diff --git a/main.go b/main.go index 699c5b6..4a8d31a 100644 --- a/main.go +++ b/main.go @@ -297,18 +297,7 @@ func main() { continue } - peer := &router.AURPPeer{ - Transport: &aurp.Transport{ - LocalDI: localDI, - RemoteDI: aurp.IPDomainIdentifier(raddr.IP), - LocalConnID: nextConnID, - }, - UDPConn: ln, - ConfiguredAddr: peerStr, - RemoteAddr: raddr, - ReceiveCh: make(chan aurp.Packet, 1024), - RouteTable: routes, - } + peer := router.NewAURPPeer(routes, ln, peerStr, raddr, localDI, nil, nextConnID) aurp.Inc(&nextConnID) peersMu.Lock() peers[udpAddrFromNet(raddr)] = peer @@ -404,17 +393,7 @@ func main() { continue } // New peer! - pr = &router.AURPPeer{ - Transport: &aurp.Transport{ - LocalDI: localDI, - RemoteDI: dh.SourceDI, // platinum rule - LocalConnID: nextConnID, - }, - UDPConn: ln, - RemoteAddr: raddr, - ReceiveCh: make(chan aurp.Packet, 1024), - RouteTable: routes, - } + pr = router.NewAURPPeer(routes, ln, "", raddr, localDI, dh.SourceDI, nextConnID) aurp.Inc(&nextConnID) peers[ra] = pr goPeerHandler(pr) diff --git a/router/peer_aurp.go b/router/peer_aurp.go index 395efb0..ac4a914 100644 --- a/router/peer_aurp.go +++ b/router/peer_aurp.go @@ -114,6 +114,10 @@ type AURPPeer struct { // Route table (the peer will add/remove/update routes and zones) RouteTable *RouteTable + // Event tuples yet to be sent to this peer in an RI-Upd. + pendingEventsMu sync.Mutex + pendingEvents aurp.EventTuples + // The internal states below are only set within the Handle loop, but can // be read concurrently from outside. mu sync.RWMutex @@ -126,6 +130,61 @@ type AURPPeer struct { sendRetries int } +func NewAURPPeer(routes *RouteTable, udpConn *net.UDPConn, peerAddr string, raddr *net.UDPAddr, localDI, remoteDI aurp.DomainIdentifier, connID uint16) *AURPPeer { + if remoteDI == nil { + remoteDI = aurp.IPDomainIdentifier(raddr.IP) + } + return &AURPPeer{ + Transport: &aurp.Transport{ + LocalDI: localDI, + RemoteDI: remoteDI, + LocalConnID: connID, + }, + UDPConn: udpConn, + ConfiguredAddr: peerAddr, + RemoteAddr: raddr, + ReceiveCh: make(chan aurp.Packet, 1024), + RouteTable: routes, + } +} + +func (p *AURPPeer) addPendingEvent(ec aurp.EventCode, route *Route) { + // Don't advertise routes to AURP peers to other AURP peers + if route.AURPPeer != nil { + return + } + et := aurp.EventTuple{ + EventCode: ec, + Extended: route.Extended, + RangeStart: route.NetStart, + Distance: route.Distance, + RangeEnd: route.NetEnd, + } + switch ec { + case aurp.EventCodeND, aurp.EventCodeNRC: + et.Distance = 0 // "The distance field does not apply to ND or NRC event tuples and should be set to 0." + } + p.pendingEventsMu.Lock() + defer p.pendingEventsMu.Unlock() + p.pendingEvents = append(p.pendingEvents, et) +} + +func (p *AURPPeer) RouteAdded(route *Route) { + p.addPendingEvent(aurp.EventCodeNA, route) +} + +func (p *AURPPeer) RouteDeleted(route *Route) { + p.addPendingEvent(aurp.EventCodeND, route) +} + +func (p *AURPPeer) RouteDistanceChanged(route *Route) { + p.addPendingEvent(aurp.EventCodeNDC, route) +} + +func (p *AURPPeer) RouteForwarderChanged(route *Route) { + p.addPendingEvent(aurp.EventCodeNRC, route) +} + func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error { outPkt, err := ddp.ExtMarshal(*ddpkt) if err != nil { @@ -243,6 +302,9 @@ func (p *AURPPeer) send(pkt aurp.Packet) (int, error) { } func (p *AURPPeer) Handle(ctx context.Context) error { + // Stop listening to events if the goroutine exits + defer p.RouteTable.RemoveObserver(p) + rticker := time.NewTicker(1 * time.Second) defer rticker.Stop() sticker := time.NewTicker(1 * time.Second) @@ -411,8 +473,29 @@ func (p *AURPPeer) Handle(ctx context.Context) error { if time.Since(p.lastUpdate) <= updateTimer { break } - // TODO: is there a routing update to send? + + // Are there routing updates to send? + p.pendingEventsMu.Lock() + if len(p.pendingEvents) == 0 { + p.pendingEventsMu.Unlock() + break + } + // Yes - swap the slices, release the mutex, then send them + pending := p.pendingEvents + p.pendingEvents = make(aurp.EventTuples, 0, cap(pending)) + p.pendingEventsMu.Unlock() + + // TODO: eliminate events that cancel out (e.g. NA then ND) + // TODO: split pending events to fit within a packet + p.bumpLastUpdate() + aurp.Inc(&p.Transport.LocalSeq) + lastRISent = p.Transport.NewRIUpdPacket(pending) + if _, err := p.send(lastRISent); err != nil { + log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err) + return err + } + p.setSState(SenderWaitForRIUpdAck) case SenderWaitForRIRspAck, SenderWaitForRIUpdAck: if time.Since(p.lastSend) <= sendRetryTimer { @@ -425,6 +508,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error { if p.sendRetries >= sendRetryLimit { log.Printf("AURP Peer: Send retry limit reached, closing connection") p.setSState(SenderUnconnected) + p.RouteTable.RemoveObserver(p) continue } p.incSendRetries() @@ -439,6 +523,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error { break } p.setSState(SenderUnconnected) + p.RouteTable.RemoveObserver(p) } case pkt := <-p.ReceiveCh: @@ -474,7 +559,9 @@ func (p *AURPPeer) Handle(ctx context.Context) error { return err } if orsp.RateOrErrCode >= 0 { + // Data sender is successfully in connected state p.setSState(SenderConnected) + p.RouteTable.AddObserver(p) } // If receiver is unconnected, commence connecting @@ -578,6 +665,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error { p.setSState(SenderConnected) p.resetSendRetries() + p.RouteTable.AddObserver(p) // If SZI flag is set, send ZI-Rsp (transaction) if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 { diff --git a/router/route.go b/router/route.go index ae5de0c..23cb0ef 100644 --- a/router/route.go +++ b/router/route.go @@ -55,17 +55,40 @@ func (r *Route) Valid() bool { return len(r.ZoneNames) > 0 && (r.EtherTalkPeer == nil || time.Since(r.LastSeen) <= maxRouteAge) } +type RouteTableObserver interface { + RouteAdded(*Route) + RouteDeleted(*Route) + RouteDistanceChanged(*Route) + RouteForwarderChanged(*Route) +} + type RouteTable struct { - mu sync.Mutex - routes map[*Route]struct{} + routesMu sync.RWMutex + routes map[*Route]struct{} + + observersMu sync.RWMutex + observers map[RouteTableObserver]struct{} } func NewRouteTable() *RouteTable { return &RouteTable{ - routes: make(map[*Route]struct{}), + routes: make(map[*Route]struct{}), + observers: make(map[RouteTableObserver]struct{}), } } +func (rt *RouteTable) AddObserver(obs RouteTableObserver) { + rt.observersMu.Lock() + defer rt.observersMu.Unlock() + rt.observers[obs] = struct{}{} +} + +func (rt *RouteTable) RemoveObserver(obs RouteTableObserver) { + rt.observersMu.Lock() + defer rt.observersMu.Unlock() + delete(rt.observers, obs) +} + func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) { r := &Route{ Extended: true, @@ -77,14 +100,14 @@ func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) { EtherTalkDirect: port, } - rt.mu.Lock() - defer rt.mu.Unlock() + rt.routesMu.Lock() + defer rt.routesMu.Unlock() rt.routes[r] = struct{}{} } func (rt *RouteTable) Dump() []Route { - rt.mu.Lock() - defer rt.mu.Unlock() + rt.routesMu.Lock() + defer rt.routesMu.Unlock() table := make([]Route, 0, len(rt.routes)) for r := range rt.routes { @@ -94,8 +117,8 @@ func (rt *RouteTable) Dump() []Route { } func (rt *RouteTable) LookupRoute(network ddp.Network) *Route { - rt.mu.Lock() - defer rt.mu.Unlock() + rt.routesMu.Lock() + defer rt.routesMu.Unlock() var bestRoute *Route for r := range rt.routes { @@ -117,8 +140,8 @@ func (rt *RouteTable) LookupRoute(network ddp.Network) *Route { } func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) { - rt.mu.Lock() - defer rt.mu.Unlock() + rt.routesMu.Lock() + defer rt.routesMu.Unlock() for route := range rt.routes { if route.AURPPeer == peer { @@ -128,8 +151,8 @@ func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) { } func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network) { - rt.mu.Lock() - defer rt.mu.Unlock() + rt.routesMu.Lock() + defer rt.routesMu.Unlock() for route := range rt.routes { if route.AURPPeer == peer && route.NetStart == network { @@ -139,8 +162,8 @@ func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network) } func (rt *RouteTable) UpdateAURPRouteDistance(peer *AURPPeer, network ddp.Network, distance uint8) { - rt.mu.Lock() - defer rt.mu.Unlock() + rt.routesMu.Lock() + defer rt.routesMu.Unlock() for route := range rt.routes { if route.AURPPeer == peer && route.NetStart == network { @@ -158,8 +181,8 @@ func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, n return nil, fmt.Errorf("invalid network range [%d, %d] for nonextended network", netStart, netEnd) } - rt.mu.Lock() - defer rt.mu.Unlock() + rt.routesMu.Lock() + defer rt.routesMu.Unlock() // Update? for r := range rt.routes { @@ -210,16 +233,16 @@ func (rt *RouteTable) InsertAURPRoute(peer *AURPPeer, extended bool, netStart, n AURPPeer: peer, } - rt.mu.Lock() - defer rt.mu.Unlock() + rt.routesMu.Lock() + defer rt.routesMu.Unlock() rt.routes[r] = struct{}{} return nil } // ValidRoutes returns all valid routes. func (rt *RouteTable) ValidRoutes() []*Route { - rt.mu.Lock() - defer rt.mu.Unlock() + rt.routesMu.RLock() + defer rt.routesMu.RUnlock() valid := make([]*Route, 0, len(rt.routes)) for r := range rt.routes { if r.Valid() { @@ -231,8 +254,8 @@ func (rt *RouteTable) ValidRoutes() []*Route { // ValidNonAURPRoutes returns all valid routes that were not learned via AURP. func (rt *RouteTable) ValidNonAURPRoutes() []*Route { - rt.mu.Lock() - defer rt.mu.Unlock() + rt.routesMu.RLock() + defer rt.routesMu.RUnlock() valid := make([]*Route, 0, len(rt.routes)) for r := range rt.routes { if r.AURPPeer != nil { diff --git a/router/zones.go b/router/zones.go index 42bf81d..2043c03 100644 --- a/router/zones.go +++ b/router/zones.go @@ -23,8 +23,8 @@ import ( ) func (rt *RouteTable) AddZonesToNetwork(n ddp.Network, zs ...string) { - rt.mu.Lock() - defer rt.mu.Unlock() + rt.routesMu.Lock() + defer rt.routesMu.Unlock() for r := range rt.routes { if n < r.NetStart || n > r.NetEnd { continue @@ -39,8 +39,8 @@ func (rt *RouteTable) AddZonesToNetwork(n ddp.Network, zs ...string) { func (rt *RouteTable) ZonesForNetworks(ns []ddp.Network) map[ddp.Network][]string { zs := make(map[ddp.Network][]string) - rt.mu.Lock() - defer rt.mu.Unlock() + rt.routesMu.Lock() + defer rt.routesMu.Unlock() for r := range rt.routes { if !r.Valid() { continue @@ -55,8 +55,8 @@ func (rt *RouteTable) ZonesForNetworks(ns []ddp.Network) map[ddp.Network][]strin } func (rt *RouteTable) RoutesForZone(zone string) []*Route { - rt.mu.Lock() - defer rt.mu.Unlock() + rt.routesMu.Lock() + defer rt.routesMu.Unlock() var routes []*Route for r := range rt.routes { @@ -73,8 +73,8 @@ func (rt *RouteTable) RoutesForZone(zone string) []*Route { func (rt *RouteTable) AllZoneNames() (zones []string) { defer slices.Sort(zones) - rt.mu.Lock() - defer rt.mu.Unlock() + rt.routesMu.Lock() + defer rt.routesMu.Unlock() zs := make(StringSet) for r := range rt.routes {