/* Copyright 2024 Josh Deprez Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package router import ( "bytes" "context" "log" "net" "time" "gitea.drjosh.dev/josh/jrouter/aurp" "github.com/sfiera/multitalk/pkg/ddp" ) const ( // TODO: check these parameters lastHeardFromTimer = 90 * time.Second tickleRetryLimit = 10 sendRetryTimer = 10 * time.Second sendRetryLimit = 5 reconnectTimer = 10 * time.Minute updateTimer = 10 * time.Second ) type receiverState int const ( rsUnconnected receiverState = iota rsConnected rsWaitForOpenRsp rsWaitForRIRsp rsWaitForTickleAck ) func (rs receiverState) String() string { return map[receiverState]string{ rsUnconnected: "unconnected", rsConnected: "connected", rsWaitForOpenRsp: "waiting for Open-Rsp", rsWaitForRIRsp: "waiting for RI-Rsp", rsWaitForTickleAck: "waiting for Tickle-Ack", }[rs] } type senderState int const ( ssUnconnected senderState = iota ssConnected ssWaitForRIRspAck ssWaitForRIUpdAck ssWaitForRDAck ) func (ss senderState) String() string { return map[senderState]string{ ssUnconnected: "unconnected", ssConnected: "connected", ssWaitForRIRspAck: "waiting for RI-Ack for RI-Rsp", ssWaitForRIUpdAck: "waiting for RI-Ack for RI-Upd", ssWaitForRDAck: "waiting for RI-Ack for RD", }[ss] } type Peer struct { Config *Config Transport *aurp.Transport UDPConn *net.UDPConn RemoteAddr *net.UDPAddr RecieveCh chan aurp.Packet RoutingTable *RoutingTable ZoneTable *ZoneTable Reconnect bool } // Send encodes and sends pkt to the remote host. func (p *Peer) Send(pkt aurp.Packet) (int, error) { var b bytes.Buffer if _, err := pkt.WriteTo(&b); err != nil { return 0, err } log.Printf("AURP Peer: Sending %T (len %d) to %v", pkt, b.Len(), p.RemoteAddr) return p.UDPConn.WriteToUDP(b.Bytes(), p.RemoteAddr) } func (p *Peer) Handle(ctx context.Context) error { rticker := time.NewTicker(1 * time.Second) defer rticker.Stop() sticker := time.NewTicker(1 * time.Second) defer sticker.Stop() lastReconnect := time.Now() lastHeardFrom := time.Now() lastSend := time.Now() lastUpdate := time.Now() sendRetries := 0 var lastRISent aurp.Packet rstate := rsUnconnected sstate := ssUnconnected // Write an Open-Req packet if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) return err } rstate = rsWaitForOpenRsp for { select { case <-ctx.Done(): if sstate == ssUnconnected { // Return immediately return ctx.Err() } // Send a best-effort Router Down before returning lastRISent = p.Transport.NewRDPacket(aurp.ErrCodeNormalClose) if _, err := p.Send(lastRISent); err != nil { log.Printf("Couldn't send RD packet: %v", err) } return ctx.Err() case <-rticker.C: switch rstate { case rsWaitForOpenRsp: if time.Since(lastSend) <= sendRetryTimer { break } if sendRetries >= sendRetryLimit { log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection") rstate = rsUnconnected break } // Send another Open-Req sendRetries++ lastSend = time.Now() if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) return err } case rsConnected: // Check LHFT, send tickle? if time.Since(lastHeardFrom) <= lastHeardFromTimer { break } if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil { log.Printf("AURP Peer: Couldn't send Tickle: %v", err) return err } rstate = rsWaitForTickleAck sendRetries = 0 lastSend = time.Now() case rsWaitForTickleAck: if time.Since(lastSend) <= sendRetryTimer { break } if sendRetries >= tickleRetryLimit { log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection") rstate = rsUnconnected p.RoutingTable.DeletePeer(p) break } sendRetries++ lastSend = time.Now() if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil { log.Printf("AURP Peer: Couldn't send Tickle: %v", err) return err } // still in Wait For Tickle-Ack case rsWaitForRIRsp: if time.Since(lastSend) <= sendRetryTimer { break } if sendRetries >= sendRetryLimit { log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection") rstate = rsUnconnected p.RoutingTable.DeletePeer(p) break } // RI-Req is stateless, so we don't need to cache the one we // sent earlier just to send it again sendRetries++ if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil { log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err) return err } // still in Wait For RI-Rsp case rsUnconnected: // Data receiver is unconnected. If data sender is connected, // send a null RI-Upd to check if the sender is also unconnected if sstate == ssConnected && time.Since(lastSend) > sendRetryTimer { if sendRetries >= sendRetryLimit { log.Printf("AURP Peer: Send retry limit reached while probing sender connect, closing connection") } sendRetries++ lastSend = time.Now() aurp.Inc(&p.Transport.LocalSeq) events := aurp.EventTuples{{ EventCode: aurp.EventCodeNull, }} lastRISent = p.Transport.NewRIUpdPacket(events) if _, err := p.Send(lastRISent); err != nil { log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err) return err } sstate = ssWaitForRIUpdAck } if p.Reconnect { // Periodically try to reconnect, if this peer is in the config file if time.Since(lastReconnect) <= reconnectTimer { break } lastReconnect = time.Now() sendRetries = 0 lastSend = time.Now() if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) return err } rstate = rsWaitForOpenRsp } } case <-sticker.C: switch sstate { case ssUnconnected: // Do nothing case ssConnected: if time.Since(lastUpdate) <= updateTimer { break } // TODO: is there a routing update to send? case ssWaitForRIRspAck, ssWaitForRIUpdAck: if time.Since(lastSend) <= sendRetryTimer { break } if lastRISent == nil { log.Print("AURP Peer: sender retry: lastRISent = nil?") continue } if sendRetries >= sendRetryLimit { log.Printf("AURP Peer: Send retry limit reached, closing connection") sstate = ssUnconnected continue } sendRetries++ lastSend = time.Now() if _, err := p.Send(lastRISent); err != nil { log.Printf("AURP Peer: Couldn't re-send %T: %v", lastRISent, err) return err } case ssWaitForRDAck: if time.Since(lastSend) <= sendRetryTimer { break } sstate = ssUnconnected } case pkt := <-p.RecieveCh: lastHeardFrom = time.Now() switch pkt := pkt.(type) { case *aurp.OpenReqPacket: if sstate != ssUnconnected { log.Printf("AURP Peer: Open-Req received but sender state is not unconnected (was %v)", sstate) } // The peer tells us their connection ID in Open-Req. p.Transport.RemoteConnID = pkt.ConnectionID // Formulate a response. var orsp *aurp.OpenRspPacket switch { case pkt.Version != 1: // Respond with Open-Rsp with unknown version error. orsp = p.Transport.NewOpenRspPacket(0, int16(aurp.ErrCodeInvalidVersion), nil) case len(pkt.Options) > 0: // Options? OPTIONS? We don't accept no stinkin' _options_ orsp = p.Transport.NewOpenRspPacket(0, int16(aurp.ErrCodeOptionNegotiation), nil) default: // Accept it I guess. orsp = p.Transport.NewOpenRspPacket(0, 1, nil) } if _, err := p.Send(orsp); err != nil { log.Printf("AURP Peer: Couldn't send Open-Rsp: %v", err) return err } if orsp.RateOrErrCode >= 0 { sstate = ssConnected } // If receiver is unconnected, commence connecting if rstate == rsUnconnected { lastSend = time.Now() sendRetries = 0 if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) return err } rstate = rsWaitForOpenRsp } case *aurp.OpenRspPacket: if rstate != rsWaitForOpenRsp { log.Printf("AURP Peer: Received Open-Rsp but was not waiting for one (receiver state was %v)", rstate) } if pkt.RateOrErrCode < 0 { // It's an error code. log.Printf("AURP Peer: Open-Rsp error code from peer %v: %d", p.RemoteAddr.IP, pkt.RateOrErrCode) rstate = rsUnconnected break } //log.Printf("AURP Peer: Data receiver is connected!") rstate = rsConnected // Send an RI-Req sendRetries = 0 if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil { log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err) return err } rstate = rsWaitForRIRsp case *aurp.RIReqPacket: if sstate != ssConnected { log.Printf("AURP Peer: Received RI-Req but was not expecting one (sender state was %v)", sstate) } nets := aurp.NetworkTuples{ { Extended: true, RangeStart: uint16(p.Config.EtherTalk.NetStart), RangeEnd: uint16(p.Config.EtherTalk.NetEnd), Distance: 0, }, } p.Transport.LocalSeq = 1 lastRISent = p.Transport.NewRIRspPacket(aurp.RoutingFlagLast, nets) if _, err := p.Send(lastRISent); err != nil { log.Printf("AURP Peer: Couldn't send RI-Rsp packet: %v", err) return err } sstate = ssWaitForRIRspAck case *aurp.RIRspPacket: if rstate != rsWaitForRIRsp { log.Printf("Received RI-Rsp but was not waiting for one (receiver state was %v)", rstate) } log.Printf("AURP Peer: Learned about these networks: %v", pkt.Networks) for _, nt := range pkt.Networks { p.RoutingTable.UpsertRoute( nt.Extended, ddp.Network(nt.RangeStart), ddp.Network(nt.RangeEnd), p, nt.Distance, ) } // TODO: track which networks we don't have zone info for, and // only set SZI for those ? if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil { log.Printf("AURP Peer: Couldn't send RI-Ack packet: %v", err) return err } if pkt.Flags&aurp.RoutingFlagLast != 0 { // No longer waiting for an RI-Rsp rstate = rsConnected } case *aurp.RIAckPacket: switch sstate { case ssWaitForRIRspAck: // We sent an RI-Rsp, this is the RI-Ack we expected. case ssWaitForRIUpdAck: // We sent an RI-Upd, this is the RI-Ack we expected. case ssWaitForRDAck: // We sent an RD... Why are we here? continue default: log.Printf("AURP Peer: Received RI-Ack but was not waiting for one (sender state was %v)", sstate) } sstate = ssConnected sendRetries = 0 // If SZI flag is set, send ZI-Rsp (transaction) // TODO: split ZI-Rsp packets similarly to ZIP Replies if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 { zones := map[ddp.Network][]string{ p.Config.EtherTalk.NetStart: {p.Config.EtherTalk.ZoneName}, } if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil { log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err) } } // TODO: Continue sending next RI-Rsp (streamed)? case *aurp.RIUpdPacket: // TODO: Integrate info into route table for _, et := range pkt.Events { log.Printf("AURP Peer: RI-Upd event %v", et) } if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil { log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err) return err } case *aurp.RDPacket: if rstate == rsUnconnected || rstate == rsWaitForOpenRsp { log.Printf("AURP Peer: Received RD but was not expecting one (receiver state was %v)", rstate) } log.Printf("AURP Peer: Router Down: error code %d %s", pkt.ErrorCode, pkt.ErrorCode) p.RoutingTable.DeletePeer(p) // Respond with RI-Ack if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil { log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err) return err } // Connections closed rstate = rsUnconnected sstate = ssUnconnected case *aurp.ZIReqPacket: // TODO: split ZI-Rsp packets similarly to ZIP Replies zones := p.ZoneTable.Query(pkt.Networks) if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil { log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err) return err } case *aurp.ZIRspPacket: log.Printf("AURP Peer: Learned about these zones: %v", pkt.Zones) for _, zt := range pkt.Zones { p.ZoneTable.Upsert(ddp.Network(zt.Network), zt.Name, false) } case *aurp.GDZLReqPacket: if _, err := p.Send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil { log.Printf("AURP Peer: Couldn't send GDZL-Rsp packet: %v", err) return err } case *aurp.GDZLRspPacket: log.Printf("AURP Peer: Received a GDZL-Rsp, but I wouldn't have sent a GDZL-Req - that's weird") case *aurp.GZNReqPacket: if _, err := p.Send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil { log.Printf("AURP Peer: Couldn't send GZN-Rsp packet: %v", err) return err } case *aurp.GZNRspPacket: log.Printf("AURP Peer: Received a GZN-Rsp, but I wouldn't have sent a GZN-Req - that's weird") case *aurp.TicklePacket: // Immediately respond with Tickle-Ack if _, err := p.Send(p.Transport.NewTickleAckPacket()); err != nil { log.Printf("AURP Peer: Couldn't send Tickle-Ack: %v", err) return err } case *aurp.TickleAckPacket: if rstate != rsWaitForTickleAck { log.Printf("AURP Peer: Received Tickle-Ack but was not waiting for one (receiver state was %v)", rstate) } rstate = rsConnected } } } }