diff --git a/router/peer_aurp.go b/router/peer_aurp.go index ce389ca..ac4a914 100644 --- a/router/peer_aurp.go +++ b/router/peer_aurp.go @@ -115,7 +115,8 @@ type AURPPeer struct { RouteTable *RouteTable // Event tuples yet to be sent to this peer in an RI-Upd. - pendingEvents chan aurp.EventTuple + pendingEventsMu sync.Mutex + pendingEvents aurp.EventTuples // The internal states below are only set within the Handle loop, but can // be read concurrently from outside. @@ -144,7 +145,6 @@ func NewAURPPeer(routes *RouteTable, udpConn *net.UDPConn, peerAddr string, radd RemoteAddr: raddr, ReceiveCh: make(chan aurp.Packet, 1024), RouteTable: routes, - pendingEvents: make(chan aurp.EventTuple, 1024), } } @@ -164,7 +164,9 @@ func (p *AURPPeer) addPendingEvent(ec aurp.EventCode, route *Route) { case aurp.EventCodeND, aurp.EventCodeNRC: et.Distance = 0 // "The distance field does not apply to ND or NRC event tuples and should be set to 0." } - p.pendingEvents <- et + p.pendingEventsMu.Lock() + defer p.pendingEventsMu.Unlock() + p.pendingEvents = append(p.pendingEvents, et) } func (p *AURPPeer) RouteAdded(route *Route) { @@ -300,6 +302,9 @@ func (p *AURPPeer) send(pkt aurp.Packet) (int, error) { } func (p *AURPPeer) Handle(ctx context.Context) error { + // Stop listening to events if the goroutine exits + defer p.RouteTable.RemoveObserver(p) + rticker := time.NewTicker(1 * time.Second) defer rticker.Stop() sticker := time.NewTicker(1 * time.Second) @@ -468,8 +473,29 @@ func (p *AURPPeer) Handle(ctx context.Context) error { if time.Since(p.lastUpdate) <= updateTimer { break } - // TODO: is there a routing update to send? + + // Are there routing updates to send? + p.pendingEventsMu.Lock() + if len(p.pendingEvents) == 0 { + p.pendingEventsMu.Unlock() + break + } + // Yes - swap the slices, release the mutex, then send them + pending := p.pendingEvents + p.pendingEvents = make(aurp.EventTuples, 0, cap(pending)) + p.pendingEventsMu.Unlock() + + // TODO: eliminate events that cancel out (e.g. NA then ND) + // TODO: split pending events to fit within a packet + p.bumpLastUpdate() + aurp.Inc(&p.Transport.LocalSeq) + lastRISent = p.Transport.NewRIUpdPacket(pending) + if _, err := p.send(lastRISent); err != nil { + log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err) + return err + } + p.setSState(SenderWaitForRIUpdAck) case SenderWaitForRIRspAck, SenderWaitForRIUpdAck: if time.Since(p.lastSend) <= sendRetryTimer { @@ -482,6 +508,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error { if p.sendRetries >= sendRetryLimit { log.Printf("AURP Peer: Send retry limit reached, closing connection") p.setSState(SenderUnconnected) + p.RouteTable.RemoveObserver(p) continue } p.incSendRetries() @@ -496,6 +523,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error { break } p.setSState(SenderUnconnected) + p.RouteTable.RemoveObserver(p) } case pkt := <-p.ReceiveCh: @@ -531,7 +559,9 @@ func (p *AURPPeer) Handle(ctx context.Context) error { return err } if orsp.RateOrErrCode >= 0 { + // Data sender is successfully in connected state p.setSState(SenderConnected) + p.RouteTable.AddObserver(p) } // If receiver is unconnected, commence connecting @@ -635,6 +665,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error { p.setSState(SenderConnected) p.resetSendRetries() + p.RouteTable.AddObserver(p) // If SZI flag is set, send ZI-Rsp (transaction) if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 {