Table unification #2

Merged
josh merged 6 commits from table-unification into main 2024-05-05 18:15:46 +10:00
7 changed files with 112 additions and 179 deletions
Showing only changes of commit 7e9fe4ff98 - Show all commits

37
main.go
View file

@ -50,6 +50,7 @@ const routingTableTemplate = `
<thead><tr> <thead><tr>
<th>Network range</th> <th>Network range</th>
<th>Extended?</th> <th>Extended?</th>
<th>Zone names</th>
<th>Distance</th> <th>Distance</th>
<th>Last seen</th> <th>Last seen</th>
<th>Port</th> <th>Port</th>
@ -58,7 +59,8 @@ const routingTableTemplate = `
{{range $route := . }} {{range $route := . }}
<tr> <tr>
<td>{{$route.NetStart}}{{if not (eq $route.NetStart $route.NetEnd)}} - {{$route.NetEnd}}{{end}}</td> <td>{{$route.NetStart}}{{if not (eq $route.NetStart $route.NetEnd)}} - {{$route.NetEnd}}{{end}}</td>
<td>{{if $route.Extended}}{{else}}{{end}}</td> <td>{{if $route.Extended}}{{else}}-{{end}}</td>
<td>{{range $route.ZoneNames}}{{.}}<br>{{end}}</td>
<td>{{$route.Distance}}</td> <td>{{$route.Distance}}</td>
<td>{{$route.LastSeenAgo}}</td> <td>{{$route.LastSeenAgo}}</td>
<td> <td>
@ -220,14 +222,14 @@ func main() {
return rs, nil return rs, nil
}) })
zones := router.NewZoneTable() // zones := router.NewZoneTable()
status.AddItem(ctx, "Zone table", zoneTableTemplate, func(context.Context) (any, error) { // status.AddItem(ctx, "Zone table", zoneTableTemplate, func(context.Context) (any, error) {
zs := zones.Dump() // zs := zones.Dump()
slices.SortFunc(zs, func(za, zb router.Zone) int { // slices.SortFunc(zs, func(za, zb router.Zone) int {
return cmp.Compare(za.Name, zb.Name) // return cmp.Compare(za.Name, zb.Name)
}) // })
return zs, nil // return zs, nil
}) // })
// -------------------------------- Peers --------------------------------- // -------------------------------- Peers ---------------------------------
var peersMu sync.Mutex var peersMu sync.Mutex
@ -326,8 +328,7 @@ func main() {
ConfiguredAddr: peerStr, ConfiguredAddr: peerStr,
RemoteAddr: raddr, RemoteAddr: raddr,
ReceiveCh: make(chan aurp.Packet, 1024), ReceiveCh: make(chan aurp.Packet, 1024),
RoutingTable: routes, RouteTable: routes,
ZoneTable: zones,
} }
aurp.Inc(&nextConnID) aurp.Inc(&nextConnID)
peersMu.Lock() peersMu.Lock()
@ -344,7 +345,7 @@ func main() {
rooter := &router.Router{ rooter := &router.Router{
Config: cfg, Config: cfg,
RouteTable: routes, RouteTable: routes,
ZoneTable: zones, // ZoneTable: zones,
} }
etherTalkPort := &router.EtherTalkPort{ etherTalkPort := &router.EtherTalkPort{
@ -360,9 +361,6 @@ func main() {
} }
rooter.Ports = append(rooter.Ports, etherTalkPort) rooter.Ports = append(rooter.Ports, etherTalkPort)
routes.InsertEtherTalkDirect(etherTalkPort) routes.InsertEtherTalkDirect(etherTalkPort)
for _, az := range etherTalkPort.AvailableZones {
zones.Upsert(etherTalkPort.NetStart, az, etherTalkPort)
}
// --------------------------------- RTMP --------------------------------- // --------------------------------- RTMP ---------------------------------
go etherTalkPort.RunRTMP(ctx) go etherTalkPort.RunRTMP(ctx)
@ -434,11 +432,10 @@ func main() {
RemoteDI: dh.SourceDI, // platinum rule RemoteDI: dh.SourceDI, // platinum rule
LocalConnID: nextConnID, LocalConnID: nextConnID,
}, },
UDPConn: ln, UDPConn: ln,
RemoteAddr: raddr, RemoteAddr: raddr,
ReceiveCh: make(chan aurp.Packet, 1024), ReceiveCh: make(chan aurp.Packet, 1024),
RoutingTable: routes, RouteTable: routes,
ZoneTable: zones,
} }
aurp.Inc(&nextConnID) aurp.Inc(&nextConnID)
peers[ra] = pr peers[ra] = pr

View file

@ -74,10 +74,10 @@ func (port *EtherTalkPort) handleNBPBrRq(ctx context.Context, ddpkt *ddp.ExtPack
// tuple.Zone = port.DefaultZoneName // tuple.Zone = port.DefaultZoneName
// } // }
zones := port.Router.ZoneTable.LookupName(tuple.Zone) routes := port.Router.RouteTable.RoutesForZone(tuple.Zone)
for _, z := range zones { for _, route := range routes {
if outPort := z.LocalPort; outPort != nil { if outPort := route.EtherTalkDirect; outPort != nil {
// If it's for a local zone, translate it to a LkUp and broadcast // If it's for a local zone, translate it to a LkUp and broadcast
// out the corresponding EtherTalk port. // out the corresponding EtherTalk port.
// "Note: On an internet, nodes on extended networks performing lookups in // "Note: On an internet, nodes on extended networks performing lookups in
@ -147,7 +147,7 @@ func (port *EtherTalkPort) handleNBPBrRq(ctx context.Context, ddpkt *ddp.ExtPack
SrcNet: ddpkt.SrcNet, SrcNet: ddpkt.SrcNet,
SrcNode: ddpkt.SrcNode, SrcNode: ddpkt.SrcNode,
SrcSocket: ddpkt.SrcSocket, SrcSocket: ddpkt.SrcSocket,
DstNet: z.Network, DstNet: route.NetStart,
DstNode: 0x00, // Any router for the dest network DstNode: 0x00, // Any router for the dest network
DstSocket: 2, DstSocket: 2,
Proto: ddp.ProtoNBP, Proto: ddp.ProtoNBP,

View file

@ -114,11 +114,8 @@ type AURPPeer struct {
// Incoming packet channel. // Incoming packet channel.
ReceiveCh chan aurp.Packet ReceiveCh chan aurp.Packet
// Routing table (the peer will add/remove/update routes) // Route table (the peer will add/remove/update routes and zones)
RoutingTable *RouteTable RouteTable *RouteTable
// Zone table (the peer will add/remove/update zones)
ZoneTable *ZoneTable
mu sync.RWMutex mu sync.RWMutex
rstate ReceiverState rstate ReceiverState
@ -253,7 +250,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
if sendRetries >= tickleRetryLimit { if sendRetries >= tickleRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection")
p.setRState(ReceiverUnconnected) p.setRState(ReceiverUnconnected)
p.RoutingTable.DeleteAURPPeer(p) p.RouteTable.DeleteAURPPeer(p)
break break
} }
@ -272,7 +269,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
if sendRetries >= sendRetryLimit { if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection")
p.setRState(ReceiverUnconnected) p.setRState(ReceiverUnconnected)
p.RoutingTable.DeleteAURPPeer(p) p.RouteTable.DeleteAURPPeer(p)
break break
} }
@ -467,7 +464,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Learned about these networks: %v", pkt.Networks) log.Printf("AURP Peer: Learned about these networks: %v", pkt.Networks)
for _, nt := range pkt.Networks { for _, nt := range pkt.Networks {
p.RoutingTable.InsertAURPRoute( p.RouteTable.InsertAURPRoute(
p, p,
nt.Extended, nt.Extended,
ddp.Network(nt.RangeStart), ddp.Network(nt.RangeStart),
@ -543,7 +540,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Do nothing except respond with RI-Ack // Do nothing except respond with RI-Ack
case aurp.EventCodeNA: case aurp.EventCodeNA:
if err := p.RoutingTable.InsertAURPRoute( if err := p.RouteTable.InsertAURPRoute(
p, p,
et.Extended, et.Extended,
et.RangeStart, et.RangeStart,
@ -555,10 +552,10 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
ackFlag = aurp.RoutingFlagSendZoneInfo ackFlag = aurp.RoutingFlagSendZoneInfo
case aurp.EventCodeND: case aurp.EventCodeND:
p.RoutingTable.DeleteAURPPeerNetwork(p, et.RangeStart) p.RouteTable.DeleteAURPPeerNetwork(p, et.RangeStart)
case aurp.EventCodeNDC: case aurp.EventCodeNDC:
p.RoutingTable.UpdateAURPRouteDistance(p, et.RangeStart, et.Distance+1) p.RouteTable.UpdateAURPRouteDistance(p, et.RangeStart, et.Distance+1)
case aurp.EventCodeNRC: case aurp.EventCodeNRC:
// "An exterior router sends a Network Route Change // "An exterior router sends a Network Route Change
@ -566,7 +563,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// through its local internet changes to a path through // through its local internet changes to a path through
// a tunneling port, causing split-horizoned processing // a tunneling port, causing split-horizoned processing
// to eliminate that networks routing information." // to eliminate that networks routing information."
p.RoutingTable.DeleteAURPPeerNetwork(p, et.RangeStart) p.RouteTable.DeleteAURPPeerNetwork(p, et.RangeStart)
case aurp.EventCodeZC: case aurp.EventCodeZC:
// "This event is reserved for future use." // "This event is reserved for future use."
@ -584,7 +581,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
} }
log.Printf("AURP Peer: Router Down: error code %d %s", pkt.ErrorCode, pkt.ErrorCode) log.Printf("AURP Peer: Router Down: error code %d %s", pkt.ErrorCode, pkt.ErrorCode)
p.RoutingTable.DeleteAURPPeer(p) p.RouteTable.DeleteAURPPeer(p)
// Respond with RI-Ack // Respond with RI-Ack
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil { if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil {
@ -596,7 +593,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case *aurp.ZIReqPacket: case *aurp.ZIReqPacket:
// TODO: split ZI-Rsp packets similarly to ZIP Replies // TODO: split ZI-Rsp packets similarly to ZIP Replies
zones := p.ZoneTable.Query(pkt.Networks) zones := p.RouteTable.ZonesForNetworks(pkt.Networks)
if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil { if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil {
log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err)
return err return err
@ -605,7 +602,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case *aurp.ZIRspPacket: case *aurp.ZIRspPacket:
log.Printf("AURP Peer: Learned about these zones: %v", pkt.Zones) log.Printf("AURP Peer: Learned about these zones: %v", pkt.Zones)
for _, zt := range pkt.Zones { for _, zt := range pkt.Zones {
p.ZoneTable.Upsert(ddp.Network(zt.Network), zt.Name, nil) p.RouteTable.AddZoneToNetwork(zt.Network, zt.Name)
} }
case *aurp.GDZLReqPacket: case *aurp.GDZLReqPacket:

View file

@ -34,6 +34,10 @@ type Route struct {
LastSeen time.Time LastSeen time.Time
// ZoneNames may be empty between learning the existence of a route and
// receiving zone information.
ZoneNames []string
// Exactly one of the following should be set // Exactly one of the following should be set
AURPPeer *AURPPeer // Next hop is this peer router (over AURP) AURPPeer *AURPPeer // Next hop is this peer router (over AURP)
EtherTalkPeer *EtherTalkPeer // Next hop is this peer router (over EtherTalk) EtherTalkPeer *EtherTalkPeer // Next hop is this peer router (over EtherTalk)
@ -47,6 +51,10 @@ func (r Route) LastSeenAgo() string {
return fmt.Sprintf("%v ago", time.Since(r.LastSeen).Truncate(time.Millisecond)) return fmt.Sprintf("%v ago", time.Since(r.LastSeen).Truncate(time.Millisecond))
} }
func (r *Route) Valid() bool {
return r.EtherTalkPeer == nil || time.Since(r.LastSeen) <= maxRouteAge
}
type RouteTable struct { type RouteTable struct {
mu sync.Mutex mu sync.Mutex
routes map[*Route]struct{} routes map[*Route]struct{}
@ -65,6 +73,7 @@ func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) {
NetEnd: port.NetEnd, NetEnd: port.NetEnd,
Distance: 0, // we're connected directly Distance: 0, // we're connected directly
LastSeen: time.Now(), LastSeen: time.Now(),
ZoneNames: port.AvailableZones,
EtherTalkDirect: port, EtherTalkDirect: port,
} }
@ -93,8 +102,7 @@ func (rt *RouteTable) LookupRoute(network ddp.Network) *Route {
if network < r.NetStart || network > r.NetEnd { if network < r.NetStart || network > r.NetEnd {
continue continue
} }
// Exclude EtherTalk routes that are too old if !r.Valid() {
if r.EtherTalkPeer != nil && time.Since(r.LastSeen) > maxRouteAge {
continue continue
} }
if bestRoute == nil { if bestRoute == nil {
@ -213,11 +221,9 @@ func (rt *RouteTable) ValidRoutes() []*Route {
defer rt.mu.Unlock() defer rt.mu.Unlock()
valid := make([]*Route, 0, len(rt.routes)) valid := make([]*Route, 0, len(rt.routes))
for r := range rt.routes { for r := range rt.routes {
// Exclude EtherTalk routes that are too old if r.Valid() {
if r.EtherTalkPeer != nil && time.Since(r.LastSeen) > maxRouteAge { valid = append(valid, r)
continue
} }
valid = append(valid, r)
} }
return valid return valid
} }

View file

@ -26,7 +26,6 @@ import (
type Router struct { type Router struct {
Config *Config Config *Config
RouteTable *RouteTable RouteTable *RouteTable
ZoneTable *ZoneTable
Ports []*EtherTalkPort Ports []*EtherTalkPort
} }

View file

@ -62,7 +62,7 @@ func (port *EtherTalkPort) handleZIPZIP(ctx context.Context, ddpkt *ddp.ExtPacke
func (port *EtherTalkPort) handleZIPQuery(ctx context.Context, ddpkt *ddp.ExtPacket, zipkt *zip.QueryPacket) error { func (port *EtherTalkPort) handleZIPQuery(ctx context.Context, ddpkt *ddp.ExtPacket, zipkt *zip.QueryPacket) error {
log.Printf("ZIP: Got Query for networks %v", zipkt.Networks) log.Printf("ZIP: Got Query for networks %v", zipkt.Networks)
networks := port.Router.ZoneTable.Query(zipkt.Networks) networks := port.Router.RouteTable.ZonesForNetworks(zipkt.Networks)
sendReply := func(resp *zip.ReplyPacket) error { sendReply := func(resp *zip.ReplyPacket) error {
respRaw, err := resp.Marshal() respRaw, err := resp.Marshal()
@ -258,7 +258,7 @@ func (port *EtherTalkPort) handleZIPTReq(ctx context.Context, ddpkt *ddp.ExtPack
switch gzl.Function { switch gzl.Function {
case zip.FunctionGetZoneList: case zip.FunctionGetZoneList:
resp.Zones = port.Router.ZoneTable.AllNames() resp.Zones = port.Router.RouteTable.AllZoneNames()
case zip.FunctionGetLocalZones: case zip.FunctionGetLocalZones:
resp.Zones = port.AvailableZones resp.Zones = port.AvailableZones

View file

@ -17,145 +17,79 @@
package router package router
import ( import (
"fmt"
"slices" "slices"
"sort"
"sync"
"time"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
) )
//const maxZoneAge = 10 * time.Minute // TODO: confirm func (rt *RouteTable) AddZoneToNetwork(n ddp.Network, z string) {
rt.mu.Lock()
type Zone struct { defer rt.mu.Unlock()
Network ddp.Network for r := range rt.routes {
Name string if n < r.NetStart || n > r.NetEnd {
LocalPort *EtherTalkPort // nil if remote (local to another router)
LastSeen time.Time
}
func (z Zone) LastSeenAgo() string {
if z.LastSeen.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(z.LastSeen).Truncate(time.Millisecond))
}
type zoneKey struct {
network ddp.Network
name string
}
type ZoneTable struct {
mu sync.Mutex
zones map[zoneKey]*Zone
}
func NewZoneTable() *ZoneTable {
return &ZoneTable{
zones: make(map[zoneKey]*Zone),
}
}
func (zt *ZoneTable) Dump() []Zone {
zt.mu.Lock()
defer zt.mu.Unlock()
zs := make([]Zone, 0, len(zt.zones))
for _, z := range zt.zones {
zs = append(zs, *z)
}
return zs
}
func (zt *ZoneTable) Upsert(network ddp.Network, name string, localPort *EtherTalkPort) {
zt.mu.Lock()
defer zt.mu.Unlock()
key := zoneKey{network, name}
z := zt.zones[key]
if z != nil {
z.LocalPort = localPort
z.LastSeen = time.Now()
return
}
zt.zones[key] = &Zone{
Network: network,
Name: name,
LocalPort: localPort,
LastSeen: time.Now(),
}
}
func (zt *ZoneTable) Query(ns []ddp.Network) map[ddp.Network][]string {
slices.Sort(ns)
zs := make(map[ddp.Network][]string)
zt.mu.Lock()
defer zt.mu.Unlock()
for _, z := range zt.zones {
// if time.Since(z.LastSeen) > maxZoneAge {
// continue
// }
if _, ok := slices.BinarySearch(ns, z.Network); ok {
zs[z.Network] = append(zs[z.Network], z.Name)
}
}
return zs
}
func (zt *ZoneTable) LookupName(name string) []*Zone {
zt.mu.Lock()
defer zt.mu.Unlock()
var zs []*Zone
for _, z := range zt.zones {
if z.Name == name {
zs = append(zs, z)
}
}
return zs
}
// func (zt *ZoneTable) LocalNames() []string {
// zt.mu.Lock()
// seen := make(map[string]struct{})
// zs := make([]string, 0, len(zt.zones))
// for _, z := range zt.zones {
// // if time.Since(z.LastSeen) > maxZoneAge {
// // continue
// // }
// if z.Local != nil {
// continue
// }
// if _, s := seen[z.Name]; s {
// continue
// }
// seen[z.Name] = struct{}{}
// zs = append(zs, z.Name)
// }
// zt.mu.Unlock()
// sort.Strings(zs)
// return zs
// }
func (zt *ZoneTable) AllNames() []string {
zt.mu.Lock()
seen := make(map[string]struct{})
zs := make([]string, 0, len(zt.zones))
for _, z := range zt.zones {
// if time.Since(z.LastSeen) > maxZoneAge {
// continue
// }
if _, s := seen[z.Name]; s {
continue continue
} }
seen[z.Name] = struct{}{} if !r.Valid() {
zs = append(zs, z.Name) continue
}
if slices.Contains(r.ZoneNames, z) {
continue
}
r.ZoneNames = append(r.ZoneNames, z)
} }
zt.mu.Unlock() }
sort.Strings(zs) func (rt *RouteTable) ZonesForNetworks(ns []ddp.Network) map[ddp.Network][]string {
zs := make(map[ddp.Network][]string)
rt.mu.Lock()
defer rt.mu.Unlock()
for r := range rt.routes {
if !r.Valid() {
continue
}
if _, ok := slices.BinarySearch(ns, r.NetStart); ok {
zs[r.NetStart] = append(zs[r.NetStart], r.ZoneNames...)
}
}
return zs return zs
} }
func (rt *RouteTable) RoutesForZone(zone string) []*Route {
rt.mu.Lock()
defer rt.mu.Unlock()
var routes []*Route
for r := range rt.routes {
if !r.Valid() {
continue
}
if slices.Contains(r.ZoneNames, zone) {
routes = append(routes, r)
}
}
return routes
}
func (rt *RouteTable) AllZoneNames() (zones []string) {
defer slices.Sort(zones)
rt.mu.Lock()
defer rt.mu.Unlock()
seen := make(map[string]struct{})
for r := range rt.routes {
if !r.Valid() {
continue
}
for _, z := range r.ZoneNames {
if _, s := seen[z]; s {
continue
}
seen[z] = struct{}{}
zones = append(zones, z)
}
}
return zones
}