Send RI-Upds to peers #14
4 changed files with 145 additions and 55 deletions
25
main.go
25
main.go
|
@ -297,18 +297,7 @@ func main() {
|
|||
continue
|
||||
}
|
||||
|
||||
peer := &router.AURPPeer{
|
||||
Transport: &aurp.Transport{
|
||||
LocalDI: localDI,
|
||||
RemoteDI: aurp.IPDomainIdentifier(raddr.IP),
|
||||
LocalConnID: nextConnID,
|
||||
},
|
||||
UDPConn: ln,
|
||||
ConfiguredAddr: peerStr,
|
||||
RemoteAddr: raddr,
|
||||
ReceiveCh: make(chan aurp.Packet, 1024),
|
||||
RouteTable: routes,
|
||||
}
|
||||
peer := router.NewAURPPeer(routes, ln, peerStr, raddr, localDI, nil, nextConnID)
|
||||
aurp.Inc(&nextConnID)
|
||||
peersMu.Lock()
|
||||
peers[udpAddrFromNet(raddr)] = peer
|
||||
|
@ -404,17 +393,7 @@ func main() {
|
|||
continue
|
||||
}
|
||||
// New peer!
|
||||
pr = &router.AURPPeer{
|
||||
Transport: &aurp.Transport{
|
||||
LocalDI: localDI,
|
||||
RemoteDI: dh.SourceDI, // platinum rule
|
||||
LocalConnID: nextConnID,
|
||||
},
|
||||
UDPConn: ln,
|
||||
RemoteAddr: raddr,
|
||||
ReceiveCh: make(chan aurp.Packet, 1024),
|
||||
RouteTable: routes,
|
||||
}
|
||||
pr = router.NewAURPPeer(routes, ln, "", raddr, localDI, dh.SourceDI, nextConnID)
|
||||
aurp.Inc(&nextConnID)
|
||||
peers[ra] = pr
|
||||
goPeerHandler(pr)
|
||||
|
|
|
@ -114,6 +114,10 @@ type AURPPeer struct {
|
|||
// Route table (the peer will add/remove/update routes and zones)
|
||||
RouteTable *RouteTable
|
||||
|
||||
// Event tuples yet to be sent to this peer in an RI-Upd.
|
||||
pendingEventsMu sync.Mutex
|
||||
pendingEvents aurp.EventTuples
|
||||
|
||||
// The internal states below are only set within the Handle loop, but can
|
||||
// be read concurrently from outside.
|
||||
mu sync.RWMutex
|
||||
|
@ -126,6 +130,61 @@ type AURPPeer struct {
|
|||
sendRetries int
|
||||
}
|
||||
|
||||
func NewAURPPeer(routes *RouteTable, udpConn *net.UDPConn, peerAddr string, raddr *net.UDPAddr, localDI, remoteDI aurp.DomainIdentifier, connID uint16) *AURPPeer {
|
||||
if remoteDI == nil {
|
||||
remoteDI = aurp.IPDomainIdentifier(raddr.IP)
|
||||
}
|
||||
return &AURPPeer{
|
||||
Transport: &aurp.Transport{
|
||||
LocalDI: localDI,
|
||||
RemoteDI: remoteDI,
|
||||
LocalConnID: connID,
|
||||
},
|
||||
UDPConn: udpConn,
|
||||
ConfiguredAddr: peerAddr,
|
||||
RemoteAddr: raddr,
|
||||
ReceiveCh: make(chan aurp.Packet, 1024),
|
||||
RouteTable: routes,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *AURPPeer) addPendingEvent(ec aurp.EventCode, route *Route) {
|
||||
// Don't advertise routes to AURP peers to other AURP peers
|
||||
if route.AURPPeer != nil {
|
||||
return
|
||||
}
|
||||
et := aurp.EventTuple{
|
||||
EventCode: ec,
|
||||
Extended: route.Extended,
|
||||
RangeStart: route.NetStart,
|
||||
Distance: route.Distance,
|
||||
RangeEnd: route.NetEnd,
|
||||
}
|
||||
switch ec {
|
||||
case aurp.EventCodeND, aurp.EventCodeNRC:
|
||||
et.Distance = 0 // "The distance field does not apply to ND or NRC event tuples and should be set to 0."
|
||||
}
|
||||
p.pendingEventsMu.Lock()
|
||||
defer p.pendingEventsMu.Unlock()
|
||||
p.pendingEvents = append(p.pendingEvents, et)
|
||||
}
|
||||
|
||||
func (p *AURPPeer) RouteAdded(route *Route) {
|
||||
p.addPendingEvent(aurp.EventCodeNA, route)
|
||||
}
|
||||
|
||||
func (p *AURPPeer) RouteDeleted(route *Route) {
|
||||
p.addPendingEvent(aurp.EventCodeND, route)
|
||||
}
|
||||
|
||||
func (p *AURPPeer) RouteDistanceChanged(route *Route) {
|
||||
p.addPendingEvent(aurp.EventCodeNDC, route)
|
||||
}
|
||||
|
||||
func (p *AURPPeer) RouteForwarderChanged(route *Route) {
|
||||
p.addPendingEvent(aurp.EventCodeNRC, route)
|
||||
}
|
||||
|
||||
func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error {
|
||||
outPkt, err := ddp.ExtMarshal(*ddpkt)
|
||||
if err != nil {
|
||||
|
@ -243,6 +302,9 @@ func (p *AURPPeer) send(pkt aurp.Packet) (int, error) {
|
|||
}
|
||||
|
||||
func (p *AURPPeer) Handle(ctx context.Context) error {
|
||||
// Stop listening to events if the goroutine exits
|
||||
defer p.RouteTable.RemoveObserver(p)
|
||||
|
||||
rticker := time.NewTicker(1 * time.Second)
|
||||
defer rticker.Stop()
|
||||
sticker := time.NewTicker(1 * time.Second)
|
||||
|
@ -411,8 +473,29 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
if time.Since(p.lastUpdate) <= updateTimer {
|
||||
break
|
||||
}
|
||||
// TODO: is there a routing update to send?
|
||||
|
||||
// Are there routing updates to send?
|
||||
p.pendingEventsMu.Lock()
|
||||
if len(p.pendingEvents) == 0 {
|
||||
p.pendingEventsMu.Unlock()
|
||||
break
|
||||
}
|
||||
// Yes - swap the slices, release the mutex, then send them
|
||||
pending := p.pendingEvents
|
||||
p.pendingEvents = make(aurp.EventTuples, 0, cap(pending))
|
||||
p.pendingEventsMu.Unlock()
|
||||
|
||||
// TODO: eliminate events that cancel out (e.g. NA then ND)
|
||||
// TODO: split pending events to fit within a packet
|
||||
|
||||
p.bumpLastUpdate()
|
||||
aurp.Inc(&p.Transport.LocalSeq)
|
||||
lastRISent = p.Transport.NewRIUpdPacket(pending)
|
||||
if _, err := p.send(lastRISent); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err)
|
||||
return err
|
||||
}
|
||||
p.setSState(SenderWaitForRIUpdAck)
|
||||
|
||||
case SenderWaitForRIRspAck, SenderWaitForRIUpdAck:
|
||||
if time.Since(p.lastSend) <= sendRetryTimer {
|
||||
|
@ -425,6 +508,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
if p.sendRetries >= sendRetryLimit {
|
||||
log.Printf("AURP Peer: Send retry limit reached, closing connection")
|
||||
p.setSState(SenderUnconnected)
|
||||
p.RouteTable.RemoveObserver(p)
|
||||
continue
|
||||
}
|
||||
p.incSendRetries()
|
||||
|
@ -439,6 +523,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
break
|
||||
}
|
||||
p.setSState(SenderUnconnected)
|
||||
p.RouteTable.RemoveObserver(p)
|
||||
}
|
||||
|
||||
case pkt := <-p.ReceiveCh:
|
||||
|
@ -474,7 +559,9 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
if orsp.RateOrErrCode >= 0 {
|
||||
// Data sender is successfully in connected state
|
||||
p.setSState(SenderConnected)
|
||||
p.RouteTable.AddObserver(p)
|
||||
}
|
||||
|
||||
// If receiver is unconnected, commence connecting
|
||||
|
@ -578,6 +665,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
|
||||
p.setSState(SenderConnected)
|
||||
p.resetSendRetries()
|
||||
p.RouteTable.AddObserver(p)
|
||||
|
||||
// If SZI flag is set, send ZI-Rsp (transaction)
|
||||
if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 {
|
||||
|
|
|
@ -55,17 +55,40 @@ func (r *Route) Valid() bool {
|
|||
return len(r.ZoneNames) > 0 && (r.EtherTalkPeer == nil || time.Since(r.LastSeen) <= maxRouteAge)
|
||||
}
|
||||
|
||||
type RouteTableObserver interface {
|
||||
RouteAdded(*Route)
|
||||
RouteDeleted(*Route)
|
||||
RouteDistanceChanged(*Route)
|
||||
RouteForwarderChanged(*Route)
|
||||
}
|
||||
|
||||
type RouteTable struct {
|
||||
mu sync.Mutex
|
||||
routesMu sync.RWMutex
|
||||
routes map[*Route]struct{}
|
||||
|
||||
observersMu sync.RWMutex
|
||||
observers map[RouteTableObserver]struct{}
|
||||
}
|
||||
|
||||
func NewRouteTable() *RouteTable {
|
||||
return &RouteTable{
|
||||
routes: make(map[*Route]struct{}),
|
||||
observers: make(map[RouteTableObserver]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (rt *RouteTable) AddObserver(obs RouteTableObserver) {
|
||||
rt.observersMu.Lock()
|
||||
defer rt.observersMu.Unlock()
|
||||
rt.observers[obs] = struct{}{}
|
||||
}
|
||||
|
||||
func (rt *RouteTable) RemoveObserver(obs RouteTableObserver) {
|
||||
rt.observersMu.Lock()
|
||||
defer rt.observersMu.Unlock()
|
||||
delete(rt.observers, obs)
|
||||
}
|
||||
|
||||
func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) {
|
||||
r := &Route{
|
||||
Extended: true,
|
||||
|
@ -77,14 +100,14 @@ func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) {
|
|||
EtherTalkDirect: port,
|
||||
}
|
||||
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
rt.routes[r] = struct{}{}
|
||||
}
|
||||
|
||||
func (rt *RouteTable) Dump() []Route {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
table := make([]Route, 0, len(rt.routes))
|
||||
for r := range rt.routes {
|
||||
|
@ -94,8 +117,8 @@ func (rt *RouteTable) Dump() []Route {
|
|||
}
|
||||
|
||||
func (rt *RouteTable) LookupRoute(network ddp.Network) *Route {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
var bestRoute *Route
|
||||
for r := range rt.routes {
|
||||
|
@ -117,8 +140,8 @@ func (rt *RouteTable) LookupRoute(network ddp.Network) *Route {
|
|||
}
|
||||
|
||||
func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
for route := range rt.routes {
|
||||
if route.AURPPeer == peer {
|
||||
|
@ -128,8 +151,8 @@ func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) {
|
|||
}
|
||||
|
||||
func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network) {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
for route := range rt.routes {
|
||||
if route.AURPPeer == peer && route.NetStart == network {
|
||||
|
@ -139,8 +162,8 @@ func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network)
|
|||
}
|
||||
|
||||
func (rt *RouteTable) UpdateAURPRouteDistance(peer *AURPPeer, network ddp.Network, distance uint8) {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
for route := range rt.routes {
|
||||
if route.AURPPeer == peer && route.NetStart == network {
|
||||
|
@ -158,8 +181,8 @@ func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, n
|
|||
return nil, fmt.Errorf("invalid network range [%d, %d] for nonextended network", netStart, netEnd)
|
||||
}
|
||||
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
// Update?
|
||||
for r := range rt.routes {
|
||||
|
@ -210,16 +233,16 @@ func (rt *RouteTable) InsertAURPRoute(peer *AURPPeer, extended bool, netStart, n
|
|||
AURPPeer: peer,
|
||||
}
|
||||
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
rt.routes[r] = struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValidRoutes returns all valid routes.
|
||||
func (rt *RouteTable) ValidRoutes() []*Route {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.RLock()
|
||||
defer rt.routesMu.RUnlock()
|
||||
valid := make([]*Route, 0, len(rt.routes))
|
||||
for r := range rt.routes {
|
||||
if r.Valid() {
|
||||
|
@ -231,8 +254,8 @@ func (rt *RouteTable) ValidRoutes() []*Route {
|
|||
|
||||
// ValidNonAURPRoutes returns all valid routes that were not learned via AURP.
|
||||
func (rt *RouteTable) ValidNonAURPRoutes() []*Route {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.RLock()
|
||||
defer rt.routesMu.RUnlock()
|
||||
valid := make([]*Route, 0, len(rt.routes))
|
||||
for r := range rt.routes {
|
||||
if r.AURPPeer != nil {
|
||||
|
|
|
@ -23,8 +23,8 @@ import (
|
|||
)
|
||||
|
||||
func (rt *RouteTable) AddZonesToNetwork(n ddp.Network, zs ...string) {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
for r := range rt.routes {
|
||||
if n < r.NetStart || n > r.NetEnd {
|
||||
continue
|
||||
|
@ -39,8 +39,8 @@ func (rt *RouteTable) AddZonesToNetwork(n ddp.Network, zs ...string) {
|
|||
func (rt *RouteTable) ZonesForNetworks(ns []ddp.Network) map[ddp.Network][]string {
|
||||
zs := make(map[ddp.Network][]string)
|
||||
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
for r := range rt.routes {
|
||||
if !r.Valid() {
|
||||
continue
|
||||
|
@ -55,8 +55,8 @@ func (rt *RouteTable) ZonesForNetworks(ns []ddp.Network) map[ddp.Network][]strin
|
|||
}
|
||||
|
||||
func (rt *RouteTable) RoutesForZone(zone string) []*Route {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
var routes []*Route
|
||||
for r := range rt.routes {
|
||||
|
@ -73,8 +73,8 @@ func (rt *RouteTable) RoutesForZone(zone string) []*Route {
|
|||
func (rt *RouteTable) AllZoneNames() (zones []string) {
|
||||
defer slices.Sort(zones)
|
||||
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
zs := make(StringSet)
|
||||
for r := range rt.routes {
|
||||
|
|
Loading…
Reference in a new issue