From 5a25d6697b091735445723334ae751bd47485cbe Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Fri, 19 Apr 2024 16:25:39 +1000 Subject: [PATCH] Progress --- router/peer.go | 65 +++++++++++++++++++++++++------------------------ router/route.go | 11 +++++++++ router/rtmp.go | 8 ++++-- 3 files changed, 50 insertions(+), 34 deletions(-) diff --git a/router/peer.go b/router/peer.go index 320e329..8ce75a1 100644 --- a/router/peer.go +++ b/router/peer.go @@ -91,7 +91,7 @@ func (p *Peer) Send(pkt aurp.Packet) (int, error) { if _, err := pkt.WriteTo(&b); err != nil { return 0, err } - log.Printf("Sending %T (len %d) to %v", pkt, b.Len(), p.RemoteAddr) + log.Printf("AURP Peer: Sending %T (len %d) to %v", pkt, b.Len(), p.RemoteAddr) return p.UDPConn.WriteToUDP(b.Bytes(), p.RemoteAddr) } @@ -108,7 +108,7 @@ func (p *Peer) Handle(ctx context.Context) error { // Write an Open-Req packet if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { - log.Printf("Couldn't send Open-Req packet: %v", err) + log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) return err } @@ -134,7 +134,7 @@ func (p *Peer) Handle(ctx context.Context) error { break } if sendRetries >= sendRetryLimit { - log.Printf("Send retry limit reached while waiting for Open-Rsp, closing connection") + log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection") rstate = rsUnconnected break } @@ -143,7 +143,7 @@ func (p *Peer) Handle(ctx context.Context) error { sendRetries++ lastSend = time.Now() if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { - log.Printf("Couldn't send Open-Req packet: %v", err) + log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) return err } @@ -153,7 +153,7 @@ func (p *Peer) Handle(ctx context.Context) error { break } if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil { - log.Printf("Couldn't send Tickle: %v", err) + log.Printf("AURP Peer: Couldn't send Tickle: %v", err) return err } rstate = rsWaitForTickleAck @@ -165,7 +165,7 @@ func (p *Peer) Handle(ctx context.Context) error { break } if sendRetries >= tickleRetryLimit { - log.Printf("Send retry limit reached while waiting for Tickle-Ack, closing connection") + log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection") rstate = rsUnconnected break } @@ -173,7 +173,7 @@ func (p *Peer) Handle(ctx context.Context) error { sendRetries++ lastSend = time.Now() if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil { - log.Printf("Couldn't send Tickle: %v", err) + log.Printf("AURP Peer: Couldn't send Tickle: %v", err) return err } @@ -190,7 +190,7 @@ func (p *Peer) Handle(ctx context.Context) error { switch pkt := pkt.(type) { case *aurp.OpenReqPacket: if sstate != ssUnconnected { - log.Printf("Open-Req received but sender state is not unconnected (was %v)", sstate) + log.Printf("AURP Peer: Open-Req received but sender state is not unconnected (was %v)", sstate) } // The peer tells us their connection ID in Open-Req. @@ -213,7 +213,7 @@ func (p *Peer) Handle(ctx context.Context) error { } if _, err := p.Send(orsp); err != nil { - log.Printf("Couldn't send Open-Rsp: %v", err) + log.Printf("AURP Peer: Couldn't send Open-Rsp: %v", err) return err } if orsp.RateOrErrCode >= 0 { @@ -225,7 +225,7 @@ func (p *Peer) Handle(ctx context.Context) error { lastSend = time.Now() sendRetries = 0 if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { - log.Printf("Couldn't send Open-Req packet: %v", err) + log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) return err } rstate = rsWaitForOpenRsp @@ -233,27 +233,27 @@ func (p *Peer) Handle(ctx context.Context) error { case *aurp.OpenRspPacket: if rstate != rsWaitForOpenRsp { - log.Printf("Received Open-Rsp but was not waiting for one (receiver state was %v)", rstate) + log.Printf("AURP Peer: Received Open-Rsp but was not waiting for one (receiver state was %v)", rstate) } if pkt.RateOrErrCode < 0 { // It's an error code. - log.Printf("Open-Rsp error code from peer %v: %d", p.RemoteAddr.IP, pkt.RateOrErrCode) + log.Printf("AURP Peer: Open-Rsp error code from peer %v: %d", p.RemoteAddr.IP, pkt.RateOrErrCode) rstate = rsUnconnected break } - log.Printf("Data receiver is connected!") + //log.Printf("AURP Peer: Data receiver is connected!") rstate = rsConnected // Send an RI-Req if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil { - log.Printf("Couldn't send RI-Req packet: %v", err) + log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err) return err } rstate = rsWaitForRIRsp case *aurp.RIReqPacket: if sstate != ssConnected { - log.Printf("Received RI-Req but was not expecting one (sender state was %v)", sstate) + log.Printf("AURP Peer: Received RI-Req but was not expecting one (sender state was %v)", sstate) } nets := aurp.NetworkTuples{ @@ -266,7 +266,7 @@ func (p *Peer) Handle(ctx context.Context) error { } p.Transport.LocalSeq = 1 if _, err := p.Send(p.Transport.NewRIRspPacket(aurp.RoutingFlagLast, nets)); err != nil { - log.Printf("Couldn't send RI-Rsp packet: %v", err) + log.Printf("AURP Peer: Couldn't send RI-Rsp packet: %v", err) return err } sstate = ssWaitForRIAck1 @@ -276,7 +276,7 @@ func (p *Peer) Handle(ctx context.Context) error { log.Printf("Received RI-Rsp but was not waiting for one (receiver state was %v)", rstate) } - log.Printf("Learned about these networks: %v", pkt.Networks) + log.Printf("AURP Peer: Learned about these networks: %v", pkt.Networks) for _, nt := range pkt.Networks { p.RoutingTable.UpsertRoute( @@ -291,7 +291,7 @@ func (p *Peer) Handle(ctx context.Context) error { // TODO: track which networks we don't have zone info for, and // only set SZI for those ? if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil { - log.Printf("Couldn't send RI-Ack packet: %v", err) + log.Printf("AURP Peer: Couldn't send RI-Ack packet: %v", err) return err } if pkt.Flags&aurp.RoutingFlagLast != 0 { @@ -312,7 +312,7 @@ func (p *Peer) Handle(ctx context.Context) error { continue default: - log.Printf("Received RI-Ack but was not waiting for one (sender state was %v)", sstate) + log.Printf("AURP Peer: Received RI-Ack but was not waiting for one (sender state was %v)", sstate) } sstate = ssConnected @@ -328,7 +328,7 @@ func (p *Peer) Handle(ctx context.Context) error { }, } if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil { - log.Printf("Couldn't send ZI-Rsp packet: %v", err) + log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err) } } @@ -339,14 +339,15 @@ func (p *Peer) Handle(ctx context.Context) error { case *aurp.RDPacket: if rstate == rsUnconnected || rstate == rsWaitForOpenRsp { - log.Printf("Received RD but was not expecting one (receiver state was %v)", rstate) + log.Printf("AURP Peer: Received RD but was not expecting one (receiver state was %v)", rstate) } - // TODO: Remove router from route tables - log.Printf("Router Down: error code %d %s", pkt.ErrorCode, pkt.ErrorCode) + log.Printf("AURP Peer: Router Down: error code %d %s", pkt.ErrorCode, pkt.ErrorCode) + p.RoutingTable.DeletePeer(p) + // Respond with RI-Ack if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil { - log.Printf("Couldn't send RI-Ack: %v", err) + log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err) return err } // Connection closed @@ -362,44 +363,44 @@ func (p *Peer) Handle(ctx context.Context) error { }, } if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil { - log.Printf("Couldn't send ZI-Rsp packet: %v", err) + log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err) return err } case *aurp.ZIRspPacket: - log.Printf("Learned about these zones: %v", pkt.Zones) + log.Printf("AURP Peer: Learned about these zones: %v", pkt.Zones) for _, zt := range pkt.Zones { p.ZoneTable.Upsert(ddp.Network(zt.Network), zt.Name, false) } case *aurp.GDZLReqPacket: if _, err := p.Send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil { - log.Printf("Couldn't send GDZL-Rsp packet: %v", err) + log.Printf("AURP Peer: Couldn't send GDZL-Rsp packet: %v", err) return err } case *aurp.GDZLRspPacket: - log.Printf("Received a GDZL-Rsp, but I wouldn't have sent a GDZL-Req - that's weird") + log.Printf("AURP Peer: Received a GDZL-Rsp, but I wouldn't have sent a GDZL-Req - that's weird") case *aurp.GZNReqPacket: if _, err := p.Send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil { - log.Printf("Couldn't send GZN-Rsp packet: %v", err) + log.Printf("AURP Peer: Couldn't send GZN-Rsp packet: %v", err) return err } case *aurp.GZNRspPacket: - log.Printf("Received a GZN-Rsp, but I wouldn't have sent a GZN-Req - that's weird") + log.Printf("AURP Peer: Received a GZN-Rsp, but I wouldn't have sent a GZN-Req - that's weird") case *aurp.TicklePacket: // Immediately respond with Tickle-Ack if _, err := p.Send(p.Transport.NewTickleAckPacket()); err != nil { - log.Printf("Couldn't send Tickle-Ack: %v", err) + log.Printf("AURP Peer: Couldn't send Tickle-Ack: %v", err) return err } case *aurp.TickleAckPacket: if rstate != rsWaitForTickleAck { - log.Printf("Received Tickle-Ack but was not waiting for one (receiver state was %v)", rstate) + log.Printf("AURP Peer: Received Tickle-Ack but was not waiting for one (receiver state was %v)", rstate) } rstate = rsConnected } diff --git a/router/route.go b/router/route.go index e8218d1..7ef9c72 100644 --- a/router/route.go +++ b/router/route.go @@ -72,6 +72,17 @@ func (rt *RoutingTable) LookupRoute(network ddp.Network) *Route { return bestRoute } +func (rt *RoutingTable) DeletePeer(peer *Peer) { + rt.mu.Lock() + defer rt.mu.Unlock() + + for route := range rt.routes { + if route.Peer == peer { + delete(rt.routes, route) + } + } +} + func (rt *RoutingTable) UpsertRoute(extended bool, netStart, netEnd ddp.Network, peer *Peer, metric uint8) error { if netStart > netEnd { return fmt.Errorf("invalid network range [%d, %d]", netStart, netEnd) diff --git a/router/rtmp.go b/router/rtmp.go index b63976f..c9515aa 100644 --- a/router/rtmp.go +++ b/router/rtmp.go @@ -83,7 +83,7 @@ func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket) } switch req.Function { - case 1: // RTMP Request + case rtmp.FunctionRequest: // Respond with RTMP Response respPkt := &rtmp.ResponsePacket{ SenderAddr: myAddr.Proto, @@ -115,7 +115,7 @@ func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket) log.Printf("RTMP: Couldn't send Data broadcast: %v", err) } - case 2, 3: + case rtmp.FunctionRDRSplitHorizon, rtmp.FunctionRDRComplete: // Like the Data broadcast, but solicited by a request (RDR). // TODO: handle split-horizon processing for _, dataPkt := range m.dataPackets(myAddr.Proto) { @@ -145,6 +145,10 @@ func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket) break } } + + case rtmp.FunctionLoopProbe: + log.Printf("RTMP: TODO: handle Loop Probes") + } case ddp.ProtoRTMPResp: