This commit is contained in:
Josh Deprez 2024-04-19 16:25:39 +10:00
parent 183e2d079b
commit 5a25d6697b
No known key found for this signature in database
3 changed files with 50 additions and 34 deletions

View file

@ -91,7 +91,7 @@ func (p *Peer) Send(pkt aurp.Packet) (int, error) {
if _, err := pkt.WriteTo(&b); err != nil { if _, err := pkt.WriteTo(&b); err != nil {
return 0, err return 0, err
} }
log.Printf("Sending %T (len %d) to %v", pkt, b.Len(), p.RemoteAddr) log.Printf("AURP Peer: Sending %T (len %d) to %v", pkt, b.Len(), p.RemoteAddr)
return p.UDPConn.WriteToUDP(b.Bytes(), p.RemoteAddr) return p.UDPConn.WriteToUDP(b.Bytes(), p.RemoteAddr)
} }
@ -108,7 +108,7 @@ func (p *Peer) Handle(ctx context.Context) error {
// Write an Open-Req packet // Write an Open-Req packet
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
@ -134,7 +134,7 @@ func (p *Peer) Handle(ctx context.Context) error {
break break
} }
if sendRetries >= sendRetryLimit { if sendRetries >= sendRetryLimit {
log.Printf("Send retry limit reached while waiting for Open-Rsp, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection")
rstate = rsUnconnected rstate = rsUnconnected
break break
} }
@ -143,7 +143,7 @@ func (p *Peer) Handle(ctx context.Context) error {
sendRetries++ sendRetries++
lastSend = time.Now() lastSend = time.Now()
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
@ -153,7 +153,7 @@ func (p *Peer) Handle(ctx context.Context) error {
break break
} }
if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil { if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
log.Printf("Couldn't send Tickle: %v", err) log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
return err return err
} }
rstate = rsWaitForTickleAck rstate = rsWaitForTickleAck
@ -165,7 +165,7 @@ func (p *Peer) Handle(ctx context.Context) error {
break break
} }
if sendRetries >= tickleRetryLimit { if sendRetries >= tickleRetryLimit {
log.Printf("Send retry limit reached while waiting for Tickle-Ack, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection")
rstate = rsUnconnected rstate = rsUnconnected
break break
} }
@ -173,7 +173,7 @@ func (p *Peer) Handle(ctx context.Context) error {
sendRetries++ sendRetries++
lastSend = time.Now() lastSend = time.Now()
if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil { if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
log.Printf("Couldn't send Tickle: %v", err) log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
return err return err
} }
@ -190,7 +190,7 @@ func (p *Peer) Handle(ctx context.Context) error {
switch pkt := pkt.(type) { switch pkt := pkt.(type) {
case *aurp.OpenReqPacket: case *aurp.OpenReqPacket:
if sstate != ssUnconnected { if sstate != ssUnconnected {
log.Printf("Open-Req received but sender state is not unconnected (was %v)", sstate) log.Printf("AURP Peer: Open-Req received but sender state is not unconnected (was %v)", sstate)
} }
// The peer tells us their connection ID in Open-Req. // The peer tells us their connection ID in Open-Req.
@ -213,7 +213,7 @@ func (p *Peer) Handle(ctx context.Context) error {
} }
if _, err := p.Send(orsp); err != nil { if _, err := p.Send(orsp); err != nil {
log.Printf("Couldn't send Open-Rsp: %v", err) log.Printf("AURP Peer: Couldn't send Open-Rsp: %v", err)
return err return err
} }
if orsp.RateOrErrCode >= 0 { if orsp.RateOrErrCode >= 0 {
@ -225,7 +225,7 @@ func (p *Peer) Handle(ctx context.Context) error {
lastSend = time.Now() lastSend = time.Now()
sendRetries = 0 sendRetries = 0
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
rstate = rsWaitForOpenRsp rstate = rsWaitForOpenRsp
@ -233,27 +233,27 @@ func (p *Peer) Handle(ctx context.Context) error {
case *aurp.OpenRspPacket: case *aurp.OpenRspPacket:
if rstate != rsWaitForOpenRsp { if rstate != rsWaitForOpenRsp {
log.Printf("Received Open-Rsp but was not waiting for one (receiver state was %v)", rstate) log.Printf("AURP Peer: Received Open-Rsp but was not waiting for one (receiver state was %v)", rstate)
} }
if pkt.RateOrErrCode < 0 { if pkt.RateOrErrCode < 0 {
// It's an error code. // It's an error code.
log.Printf("Open-Rsp error code from peer %v: %d", p.RemoteAddr.IP, pkt.RateOrErrCode) log.Printf("AURP Peer: Open-Rsp error code from peer %v: %d", p.RemoteAddr.IP, pkt.RateOrErrCode)
rstate = rsUnconnected rstate = rsUnconnected
break break
} }
log.Printf("Data receiver is connected!") //log.Printf("AURP Peer: Data receiver is connected!")
rstate = rsConnected rstate = rsConnected
// Send an RI-Req // Send an RI-Req
if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil { if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil {
log.Printf("Couldn't send RI-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
return err return err
} }
rstate = rsWaitForRIRsp rstate = rsWaitForRIRsp
case *aurp.RIReqPacket: case *aurp.RIReqPacket:
if sstate != ssConnected { if sstate != ssConnected {
log.Printf("Received RI-Req but was not expecting one (sender state was %v)", sstate) log.Printf("AURP Peer: Received RI-Req but was not expecting one (sender state was %v)", sstate)
} }
nets := aurp.NetworkTuples{ nets := aurp.NetworkTuples{
@ -266,7 +266,7 @@ func (p *Peer) Handle(ctx context.Context) error {
} }
p.Transport.LocalSeq = 1 p.Transport.LocalSeq = 1
if _, err := p.Send(p.Transport.NewRIRspPacket(aurp.RoutingFlagLast, nets)); err != nil { if _, err := p.Send(p.Transport.NewRIRspPacket(aurp.RoutingFlagLast, nets)); err != nil {
log.Printf("Couldn't send RI-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Rsp packet: %v", err)
return err return err
} }
sstate = ssWaitForRIAck1 sstate = ssWaitForRIAck1
@ -276,7 +276,7 @@ func (p *Peer) Handle(ctx context.Context) error {
log.Printf("Received RI-Rsp but was not waiting for one (receiver state was %v)", rstate) log.Printf("Received RI-Rsp but was not waiting for one (receiver state was %v)", rstate)
} }
log.Printf("Learned about these networks: %v", pkt.Networks) log.Printf("AURP Peer: Learned about these networks: %v", pkt.Networks)
for _, nt := range pkt.Networks { for _, nt := range pkt.Networks {
p.RoutingTable.UpsertRoute( p.RoutingTable.UpsertRoute(
@ -291,7 +291,7 @@ func (p *Peer) Handle(ctx context.Context) error {
// TODO: track which networks we don't have zone info for, and // TODO: track which networks we don't have zone info for, and
// only set SZI for those ? // only set SZI for those ?
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil { if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil {
log.Printf("Couldn't send RI-Ack packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Ack packet: %v", err)
return err return err
} }
if pkt.Flags&aurp.RoutingFlagLast != 0 { if pkt.Flags&aurp.RoutingFlagLast != 0 {
@ -312,7 +312,7 @@ func (p *Peer) Handle(ctx context.Context) error {
continue continue
default: default:
log.Printf("Received RI-Ack but was not waiting for one (sender state was %v)", sstate) log.Printf("AURP Peer: Received RI-Ack but was not waiting for one (sender state was %v)", sstate)
} }
sstate = ssConnected sstate = ssConnected
@ -328,7 +328,7 @@ func (p *Peer) Handle(ctx context.Context) error {
}, },
} }
if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil { if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil {
log.Printf("Couldn't send ZI-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err)
} }
} }
@ -339,14 +339,15 @@ func (p *Peer) Handle(ctx context.Context) error {
case *aurp.RDPacket: case *aurp.RDPacket:
if rstate == rsUnconnected || rstate == rsWaitForOpenRsp { if rstate == rsUnconnected || rstate == rsWaitForOpenRsp {
log.Printf("Received RD but was not expecting one (receiver state was %v)", rstate) log.Printf("AURP Peer: Received RD but was not expecting one (receiver state was %v)", rstate)
} }
// TODO: Remove router from route tables
log.Printf("Router Down: error code %d %s", pkt.ErrorCode, pkt.ErrorCode) log.Printf("AURP Peer: Router Down: error code %d %s", pkt.ErrorCode, pkt.ErrorCode)
p.RoutingTable.DeletePeer(p)
// Respond with RI-Ack // Respond with RI-Ack
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil { if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil {
log.Printf("Couldn't send RI-Ack: %v", err) log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err)
return err return err
} }
// Connection closed // Connection closed
@ -362,44 +363,44 @@ func (p *Peer) Handle(ctx context.Context) error {
}, },
} }
if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil { if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil {
log.Printf("Couldn't send ZI-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err)
return err return err
} }
case *aurp.ZIRspPacket: case *aurp.ZIRspPacket:
log.Printf("Learned about these zones: %v", pkt.Zones) log.Printf("AURP Peer: Learned about these zones: %v", pkt.Zones)
for _, zt := range pkt.Zones { for _, zt := range pkt.Zones {
p.ZoneTable.Upsert(ddp.Network(zt.Network), zt.Name, false) p.ZoneTable.Upsert(ddp.Network(zt.Network), zt.Name, false)
} }
case *aurp.GDZLReqPacket: case *aurp.GDZLReqPacket:
if _, err := p.Send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil { if _, err := p.Send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil {
log.Printf("Couldn't send GDZL-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send GDZL-Rsp packet: %v", err)
return err return err
} }
case *aurp.GDZLRspPacket: case *aurp.GDZLRspPacket:
log.Printf("Received a GDZL-Rsp, but I wouldn't have sent a GDZL-Req - that's weird") log.Printf("AURP Peer: Received a GDZL-Rsp, but I wouldn't have sent a GDZL-Req - that's weird")
case *aurp.GZNReqPacket: case *aurp.GZNReqPacket:
if _, err := p.Send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil { if _, err := p.Send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil {
log.Printf("Couldn't send GZN-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send GZN-Rsp packet: %v", err)
return err return err
} }
case *aurp.GZNRspPacket: case *aurp.GZNRspPacket:
log.Printf("Received a GZN-Rsp, but I wouldn't have sent a GZN-Req - that's weird") log.Printf("AURP Peer: Received a GZN-Rsp, but I wouldn't have sent a GZN-Req - that's weird")
case *aurp.TicklePacket: case *aurp.TicklePacket:
// Immediately respond with Tickle-Ack // Immediately respond with Tickle-Ack
if _, err := p.Send(p.Transport.NewTickleAckPacket()); err != nil { if _, err := p.Send(p.Transport.NewTickleAckPacket()); err != nil {
log.Printf("Couldn't send Tickle-Ack: %v", err) log.Printf("AURP Peer: Couldn't send Tickle-Ack: %v", err)
return err return err
} }
case *aurp.TickleAckPacket: case *aurp.TickleAckPacket:
if rstate != rsWaitForTickleAck { if rstate != rsWaitForTickleAck {
log.Printf("Received Tickle-Ack but was not waiting for one (receiver state was %v)", rstate) log.Printf("AURP Peer: Received Tickle-Ack but was not waiting for one (receiver state was %v)", rstate)
} }
rstate = rsConnected rstate = rsConnected
} }

View file

@ -72,6 +72,17 @@ func (rt *RoutingTable) LookupRoute(network ddp.Network) *Route {
return bestRoute return bestRoute
} }
func (rt *RoutingTable) DeletePeer(peer *Peer) {
rt.mu.Lock()
defer rt.mu.Unlock()
for route := range rt.routes {
if route.Peer == peer {
delete(rt.routes, route)
}
}
}
func (rt *RoutingTable) UpsertRoute(extended bool, netStart, netEnd ddp.Network, peer *Peer, metric uint8) error { func (rt *RoutingTable) UpsertRoute(extended bool, netStart, netEnd ddp.Network, peer *Peer, metric uint8) error {
if netStart > netEnd { if netStart > netEnd {
return fmt.Errorf("invalid network range [%d, %d]", netStart, netEnd) return fmt.Errorf("invalid network range [%d, %d]", netStart, netEnd)

View file

@ -83,7 +83,7 @@ func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket)
} }
switch req.Function { switch req.Function {
case 1: // RTMP Request case rtmp.FunctionRequest:
// Respond with RTMP Response // Respond with RTMP Response
respPkt := &rtmp.ResponsePacket{ respPkt := &rtmp.ResponsePacket{
SenderAddr: myAddr.Proto, SenderAddr: myAddr.Proto,
@ -115,7 +115,7 @@ func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket)
log.Printf("RTMP: Couldn't send Data broadcast: %v", err) log.Printf("RTMP: Couldn't send Data broadcast: %v", err)
} }
case 2, 3: case rtmp.FunctionRDRSplitHorizon, rtmp.FunctionRDRComplete:
// Like the Data broadcast, but solicited by a request (RDR). // Like the Data broadcast, but solicited by a request (RDR).
// TODO: handle split-horizon processing // TODO: handle split-horizon processing
for _, dataPkt := range m.dataPackets(myAddr.Proto) { for _, dataPkt := range m.dataPackets(myAddr.Proto) {
@ -145,6 +145,10 @@ func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket)
break break
} }
} }
case rtmp.FunctionLoopProbe:
log.Printf("RTMP: TODO: handle Loop Probes")
} }
case ddp.ProtoRTMPResp: case ddp.ProtoRTMPResp: