Send pending routing events

This commit is contained in:
Josh Deprez 2024-06-07 16:00:43 +10:00
parent 9c808cbf63
commit a14ae919ff
No known key found for this signature in database

View file

@ -115,7 +115,8 @@ type AURPPeer struct {
RouteTable *RouteTable RouteTable *RouteTable
// Event tuples yet to be sent to this peer in an RI-Upd. // Event tuples yet to be sent to this peer in an RI-Upd.
pendingEvents chan aurp.EventTuple pendingEventsMu sync.Mutex
pendingEvents aurp.EventTuples
// The internal states below are only set within the Handle loop, but can // The internal states below are only set within the Handle loop, but can
// be read concurrently from outside. // be read concurrently from outside.
@ -144,7 +145,6 @@ func NewAURPPeer(routes *RouteTable, udpConn *net.UDPConn, peerAddr string, radd
RemoteAddr: raddr, RemoteAddr: raddr,
ReceiveCh: make(chan aurp.Packet, 1024), ReceiveCh: make(chan aurp.Packet, 1024),
RouteTable: routes, RouteTable: routes,
pendingEvents: make(chan aurp.EventTuple, 1024),
} }
} }
@ -164,7 +164,9 @@ func (p *AURPPeer) addPendingEvent(ec aurp.EventCode, route *Route) {
case aurp.EventCodeND, aurp.EventCodeNRC: case aurp.EventCodeND, aurp.EventCodeNRC:
et.Distance = 0 // "The distance field does not apply to ND or NRC event tuples and should be set to 0." et.Distance = 0 // "The distance field does not apply to ND or NRC event tuples and should be set to 0."
} }
p.pendingEvents <- et p.pendingEventsMu.Lock()
defer p.pendingEventsMu.Unlock()
p.pendingEvents = append(p.pendingEvents, et)
} }
func (p *AURPPeer) RouteAdded(route *Route) { func (p *AURPPeer) RouteAdded(route *Route) {
@ -300,6 +302,9 @@ func (p *AURPPeer) send(pkt aurp.Packet) (int, error) {
} }
func (p *AURPPeer) Handle(ctx context.Context) error { func (p *AURPPeer) Handle(ctx context.Context) error {
// Stop listening to events if the goroutine exits
defer p.RouteTable.RemoveObserver(p)
rticker := time.NewTicker(1 * time.Second) rticker := time.NewTicker(1 * time.Second)
defer rticker.Stop() defer rticker.Stop()
sticker := time.NewTicker(1 * time.Second) sticker := time.NewTicker(1 * time.Second)
@ -468,8 +473,29 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
if time.Since(p.lastUpdate) <= updateTimer { if time.Since(p.lastUpdate) <= updateTimer {
break break
} }
// TODO: is there a routing update to send?
// Are there routing updates to send?
p.pendingEventsMu.Lock()
if len(p.pendingEvents) == 0 {
p.pendingEventsMu.Unlock()
break
}
// Yes - swap the slices, release the mutex, then send them
pending := p.pendingEvents
p.pendingEvents = make(aurp.EventTuples, 0, cap(pending))
p.pendingEventsMu.Unlock()
// TODO: eliminate events that cancel out (e.g. NA then ND)
// TODO: split pending events to fit within a packet
p.bumpLastUpdate() p.bumpLastUpdate()
aurp.Inc(&p.Transport.LocalSeq)
lastRISent = p.Transport.NewRIUpdPacket(pending)
if _, err := p.send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err)
return err
}
p.setSState(SenderWaitForRIUpdAck)
case SenderWaitForRIRspAck, SenderWaitForRIUpdAck: case SenderWaitForRIRspAck, SenderWaitForRIUpdAck:
if time.Since(p.lastSend) <= sendRetryTimer { if time.Since(p.lastSend) <= sendRetryTimer {
@ -482,6 +508,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
if p.sendRetries >= sendRetryLimit { if p.sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached, closing connection") log.Printf("AURP Peer: Send retry limit reached, closing connection")
p.setSState(SenderUnconnected) p.setSState(SenderUnconnected)
p.RouteTable.RemoveObserver(p)
continue continue
} }
p.incSendRetries() p.incSendRetries()
@ -496,6 +523,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
break break
} }
p.setSState(SenderUnconnected) p.setSState(SenderUnconnected)
p.RouteTable.RemoveObserver(p)
} }
case pkt := <-p.ReceiveCh: case pkt := <-p.ReceiveCh:
@ -531,7 +559,9 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
return err return err
} }
if orsp.RateOrErrCode >= 0 { if orsp.RateOrErrCode >= 0 {
// Data sender is successfully in connected state
p.setSState(SenderConnected) p.setSState(SenderConnected)
p.RouteTable.AddObserver(p)
} }
// If receiver is unconnected, commence connecting // If receiver is unconnected, commence connecting
@ -635,6 +665,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
p.setSState(SenderConnected) p.setSState(SenderConnected)
p.resetSendRetries() p.resetSendRetries()
p.RouteTable.AddObserver(p)
// If SZI flag is set, send ZI-Rsp (transaction) // If SZI flag is set, send ZI-Rsp (transaction)
if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 { if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 {