Compare commits

..

No commits in common. "main" and "multi-port-refactor" have entirely different histories.

13 changed files with 351 additions and 600 deletions

View file

@ -11,25 +11,21 @@ Home-grown alternative implementation of Apple Internet Router 3.0
TashTalk could be a stretch goal, if I can acquire one!
## Things that used to be caveats
* Previously it would listen for all EtherTalk traffic, regardless of destination.
Now it doesn't do that, which should help it co-exist with other routers on
the same host.
* You can configure an alternate Ethernet address if you are reusing the same
network interface for multiple different EtherTalk software.
* In addition to the configured EtherTalk network and zone, it now learns routes
and zones from other EtherTalk routers, and should share them across AURP.
* There's a status server. Browse to http://\[your router\]:9459/status to see
information about the state of jrouter.
## Caveats
Things I plan to fix Real Soon Now:
* ✅ Fixed ~~It currently listens to all AppleTalk and AARP traffic on the EtherTalk port.
This might not play well with other AppleTalk software, e.g. netatalk.~~
* ✅ Fixed ~~Also it currently uses the default Ethernet address for the interface for
sending packets. I plan to add the ability to configure a different address.~~
You can now configure a different Ethernet address for the EtherTalk
interface. I haven't tested it with netatalk or tashrouter on the same
host, but I think using a distinct Ethernet address would help them coexist.
* It currently ignores other AppleTalk routers on the EtherTalk side. This is
the next main thing to implement to make it work with e.g. netatalk.
* Some packet types need splitting to fit within limits. Some of these aren't
implemented yet (mainly encapsulated). The unimplemented ones seem unlikely to
hit those limits unless you are running a lot of routers or zones locally.
implemented yet (mainly encapsulated).
* I plan to add a Prometheus metrics endpoint and at least add log levels /
verbosity config.
* The AURP implementation is mostly there, but not fully complete. The main
@ -62,7 +58,3 @@ sudo setcap 'CAP_NET_BIND_SERVICE=ep CAP_NET_RAW=ep' ~/go/bin/jrouter
* `NET_RAW` is needed for EtherTalk
TODO: instructions for non-Linux machines
## Bug reports? Feature requests? Complaints? Praise?
You can contact me on the Fediverse at @DrJosh9000@cloudisland.nz, or email me at josh.deprez@gmail.com.

View file

@ -173,12 +173,9 @@ func (e EventTuples) WriteTo(w io.Writer) (int64, error) {
}
func parseEventTuples(p []byte) (EventTuples, error) {
// Event tuples can be 1, 4, or 6 bytes long. But the only type of length 1
// is the Null event type sent to probe whether or not the data receiver is
// still listening. If that's present there probably aren't any other
// tuples. Hence len(p)/4 (rounded up) is a reasonable estimate of max tuple
// count.
e := make(EventTuples, 0, (len(p)+3)/4)
// Each event tuple is at least 4 bytes, so we need to store at most
// len(p)/4 of them.
e := make(EventTuples, 0, len(p)/4)
for len(p) > 0 {
et, nextp, err := parseEventTuple(p)
if err != nil {
@ -201,10 +198,6 @@ type EventTuple struct {
func (et *EventTuple) WriteTo(w io.Writer) (int64, error) {
a := acc(w)
a.write8(uint8(et.EventCode))
if et.EventCode == EventCodeNull {
// null tuple
return a.ret()
}
a.write16(uint16(et.RangeStart))
if !et.Extended {
// non-extended tuple
@ -218,18 +211,12 @@ func (et *EventTuple) WriteTo(w io.Writer) (int64, error) {
}
func parseEventTuple(p []byte) (EventTuple, []byte, error) {
if len(p) < 1 {
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for any network event tuple", len(p))
if len(p) < 4 {
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for network event tuple", len(p))
}
var et EventTuple
et.EventCode = EventCode(p[0])
if et.EventCode == EventCodeNull {
return et, p[1:], nil
}
if len(p) < 4 {
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for non-Null network event tuple", len(p))
}
et.RangeStart = ddp.Network(binary.BigEndian.Uint16(p[1:3]))
et.RangeEnd = et.RangeStart
et.Distance = p[3]

80
main.go
View file

@ -50,7 +50,6 @@ const routingTableTemplate = `
<thead><tr>
<th>Network range</th>
<th>Extended?</th>
<th>Zone names</th>
<th>Distance</th>
<th>Last seen</th>
<th>Port</th>
@ -59,8 +58,7 @@ const routingTableTemplate = `
{{range $route := . }}
<tr>
<td>{{$route.NetStart}}{{if not (eq $route.NetStart $route.NetEnd)}} - {{$route.NetEnd}}{{end}}</td>
<td>{{if $route.Extended}}{{else}}-{{end}}</td>
<td>{{range $route.ZoneNames.ToSlice}}{{.}}<br>{{end}}</td>
<td>{{if $route.Extended}}{{else}}{{end}}</td>
<td>{{$route.Distance}}</td>
<td>{{$route.LastSeenAgo}}</td>
<td>
@ -80,6 +78,27 @@ const routingTableTemplate = `
</table>
`
const zoneTableTemplate = `
<table>
<thead><tr>
<th>Network</th>
<th>Name</th>
<th>Local Port</th>
<th>Last seen</th>
</tr></thead>
<tbody>
{{range $zone := . }}
<tr>
<td>{{$zone.Network}}</td>
<td>{{$zone.Name}}</td>
<td>{{with $zone.LocalPort}}{{.Device}}{{else}}-{{end}}</td>
<td>{{$zone.LastSeenAgo}}</td>
</tr>
{{end}}
</tbody>
</table>
`
const peerTableTemplate = `
<table>
<thead><tr>
@ -87,11 +106,6 @@ const peerTableTemplate = `
<th>Remote addr</th>
<th>Receiver state</th>
<th>Sender state</th>
<th>Last heard from</th>
<th>Last reconnect</th>
<th>Last update</th>
<th>Last send</th>
<th>Send retries</th>
</tr></thead>
<tbody>
{{range $peer := . }}
@ -100,11 +114,6 @@ const peerTableTemplate = `
<td>{{$peer.RemoteAddr}}</td>
<td>{{$peer.ReceiverState}}</td>
<td>{{$peer.SenderState}}</td>
<td>{{$peer.LastHeardFromAgo}}</td>
<td>{{$peer.LastReconnectAgo}}</td>
<td>{{$peer.LastUpdateAgo}}</td>
<td>{{$peer.LastSendAgo}}</td>
<td>{{$peer.SendRetries}}</td>
</tr>
{{end}}
</tbody>
@ -211,6 +220,15 @@ func main() {
return rs, nil
})
zones := router.NewZoneTable()
status.AddItem(ctx, "Zone table", zoneTableTemplate, func(context.Context) (any, error) {
zs := zones.Dump()
slices.SortFunc(zs, func(za, zb router.Zone) int {
return cmp.Compare(za.Name, zb.Name)
})
return zs, nil
})
// -------------------------------- Peers ---------------------------------
var peersMu sync.Mutex
peers := make(map[udpAddr]*router.AURPPeer)
@ -297,7 +315,20 @@ func main() {
continue
}
peer := router.NewAURPPeer(routes, ln, peerStr, raddr, localDI, nil, nextConnID)
peer := &router.AURPPeer{
Config: cfg,
Transport: &aurp.Transport{
LocalDI: localDI,
RemoteDI: aurp.IPDomainIdentifier(raddr.IP),
LocalConnID: nextConnID,
},
UDPConn: ln,
ConfiguredAddr: peerStr,
RemoteAddr: raddr,
ReceiveCh: make(chan aurp.Packet, 1024),
RoutingTable: routes,
ZoneTable: zones,
}
aurp.Inc(&nextConnID)
peersMu.Lock()
peers[udpAddrFromNet(raddr)] = peer
@ -313,7 +344,7 @@ func main() {
rooter := &router.Router{
Config: cfg,
RouteTable: routes,
// ZoneTable: zones,
ZoneTable: zones,
}
etherTalkPort := &router.EtherTalkPort{
@ -322,13 +353,16 @@ func main() {
NetStart: cfg.EtherTalk.NetStart,
NetEnd: cfg.EtherTalk.NetEnd,
DefaultZoneName: cfg.EtherTalk.ZoneName,
AvailableZones: router.SetFromSlice([]string{cfg.EtherTalk.ZoneName}),
AvailableZones: []string{cfg.EtherTalk.ZoneName},
PcapHandle: pcapHandle,
AARPMachine: aarpMachine,
Router: rooter,
}
rooter.Ports = append(rooter.Ports, etherTalkPort)
routes.InsertEtherTalkDirect(etherTalkPort)
for _, az := range etherTalkPort.AvailableZones {
zones.Upsert(etherTalkPort.NetStart, az, etherTalkPort)
}
// --------------------------------- RTMP ---------------------------------
go etherTalkPort.RunRTMP(ctx)
@ -393,7 +427,19 @@ func main() {
continue
}
// New peer!
pr = router.NewAURPPeer(routes, ln, "", raddr, localDI, dh.SourceDI, nextConnID)
pr = &router.AURPPeer{
Config: cfg,
Transport: &aurp.Transport{
LocalDI: localDI,
RemoteDI: dh.SourceDI, // platinum rule
LocalConnID: nextConnID,
},
UDPConn: ln,
RemoteAddr: raddr,
ReceiveCh: make(chan aurp.Packet, 1024),
RoutingTable: routes,
ZoneTable: zones,
}
aurp.Inc(&nextConnID)
peers[ra] = pr
goPeerHandler(pr)

View file

@ -37,8 +37,6 @@ const (
maxAMTEntryAge = 30 * time.Second
aarpRequestRetransmit = 1 * time.Second
aarpRequestTimeout = 10 * time.Second
aarpBodyLength = 28 // bytes
)
const aarpStatusTemplate = `
@ -183,17 +181,8 @@ func (a *AARPMachine) Run(ctx context.Context) error {
a.incomingCh = nil
}
// sfiera/multitalk will return an "excess data" error if the
// payload is too big. Most traffic I've seen locally does not have
// this problem, but I've seen one report with some junk trailing
// data on AARP packets.
payload := ethFrame.Payload
if len(payload) > aarpBodyLength {
payload = payload[:aarpBodyLength]
}
var aapkt aarp.Packet
if err := aarp.Unmarshal(payload, &aapkt); err != nil {
if err := aarp.Unmarshal(ethFrame.Payload, &aapkt); err != nil {
log.Printf("Couldn't unmarshal AARP packet: %v", err)
continue
}
@ -389,7 +378,10 @@ func (e AMTEntry) Valid() bool {
// LastUpdatedAgo is a friendly string reporting how long ago the entry was
// updated/resolved.
func (e AMTEntry) LastUpdatedAgo() string {
return ago(e.LastUpdated)
if e.LastUpdated.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(e.LastUpdated).Truncate(time.Millisecond))
}
// addressMappingTable implements a concurrent-safe Address Mapping Table for

View file

@ -1,65 +0,0 @@
/*
Copyright 2024 Josh Deprez
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package router
import (
"fmt"
"time"
)
// StringSet is a set of strings.
// Yep, yet another string set implementation. Took me 2 minutes to write *shrug*
type StringSet map[string]struct{}
func (set StringSet) ToSlice() []string {
ss := make([]string, 0, len(set))
for s := range set {
ss = append(ss, s)
}
return ss
}
func (set StringSet) Contains(s string) bool {
_, c := set[s]
return c
}
func (set StringSet) Insert(ss ...string) {
for _, s := range ss {
set[s] = struct{}{}
}
}
func (set StringSet) Add(t StringSet) {
for s := range t {
set[s] = struct{}{}
}
}
func SetFromSlice(ss []string) StringSet {
set := make(StringSet, len(ss))
set.Insert(ss...)
return set
}
// ago is a helper for formatting times.
func ago(t time.Time) string {
if t.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(t).Truncate(time.Millisecond))
}

View file

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"log"
"slices"
"gitea.drjosh.dev/josh/jrouter/atalk"
"gitea.drjosh.dev/josh/jrouter/atalk/nbp"
@ -73,10 +74,10 @@ func (port *EtherTalkPort) handleNBPBrRq(ctx context.Context, ddpkt *ddp.ExtPack
// tuple.Zone = port.DefaultZoneName
// }
routes := port.Router.RouteTable.RoutesForZone(tuple.Zone)
zones := port.Router.ZoneTable.LookupName(tuple.Zone)
for _, route := range routes {
if outPort := route.EtherTalkDirect; outPort != nil {
for _, z := range zones {
if outPort := z.LocalPort; outPort != nil {
// If it's for a local zone, translate it to a LkUp and broadcast
// out the corresponding EtherTalk port.
// "Note: On an internet, nodes on extended networks performing lookups in
@ -146,7 +147,7 @@ func (port *EtherTalkPort) handleNBPBrRq(ctx context.Context, ddpkt *ddp.ExtPack
SrcNet: ddpkt.SrcNet,
SrcNode: ddpkt.SrcNode,
SrcSocket: ddpkt.SrcSocket,
DstNet: route.NetStart,
DstNet: z.Network,
DstNode: 0x00, // Any router for the dest network
DstSocket: 2,
Proto: ddp.ProtoNBP,
@ -169,7 +170,7 @@ func (rtr *Router) handleNBPFwdReq(ctx context.Context, ddpkt *ddp.ExtPacket, nb
tuple := &nbpkt.Tuples[0]
for _, outPort := range rtr.Ports {
if !outPort.AvailableZones.Contains(tuple.Zone) {
if !slices.Contains(outPort.AvailableZones, tuple.Zone) {
continue
}
log.Printf("NBP: Converting FwdReq to LkUp (%v)", tuple)

View file

@ -94,6 +94,9 @@ func (ss SenderState) String() string {
// AURPPeer handles the peering with a peer AURP router.
type AURPPeer struct {
// Whole router config.
Config *Config
// AURP-Tr state for producing packets.
Transport *aurp.Transport
@ -111,78 +114,15 @@ type AURPPeer struct {
// Incoming packet channel.
ReceiveCh chan aurp.Packet
// Route table (the peer will add/remove/update routes and zones)
RouteTable *RouteTable
// Routing table (the peer will add/remove/update routes)
RoutingTable *RouteTable
// Event tuples yet to be sent to this peer in an RI-Upd.
pendingEventsMu sync.Mutex
pendingEvents aurp.EventTuples
// Zone table (the peer will add/remove/update zones)
ZoneTable *ZoneTable
// The internal states below are only set within the Handle loop, but can
// be read concurrently from outside.
mu sync.RWMutex
rstate ReceiverState
sstate SenderState
lastReconnect time.Time
lastHeardFrom time.Time
lastSend time.Time // TODO: clarify use of lastSend / sendRetries
lastUpdate time.Time
sendRetries int
}
func NewAURPPeer(routes *RouteTable, udpConn *net.UDPConn, peerAddr string, raddr *net.UDPAddr, localDI, remoteDI aurp.DomainIdentifier, connID uint16) *AURPPeer {
if remoteDI == nil {
remoteDI = aurp.IPDomainIdentifier(raddr.IP)
}
return &AURPPeer{
Transport: &aurp.Transport{
LocalDI: localDI,
RemoteDI: remoteDI,
LocalConnID: connID,
},
UDPConn: udpConn,
ConfiguredAddr: peerAddr,
RemoteAddr: raddr,
ReceiveCh: make(chan aurp.Packet, 1024),
RouteTable: routes,
}
}
func (p *AURPPeer) addPendingEvent(ec aurp.EventCode, route *Route) {
// Don't advertise routes to AURP peers to other AURP peers
if route.AURPPeer != nil {
return
}
et := aurp.EventTuple{
EventCode: ec,
Extended: route.Extended,
RangeStart: route.NetStart,
Distance: route.Distance,
RangeEnd: route.NetEnd,
}
switch ec {
case aurp.EventCodeND, aurp.EventCodeNRC:
et.Distance = 0 // "The distance field does not apply to ND or NRC event tuples and should be set to 0."
}
p.pendingEventsMu.Lock()
defer p.pendingEventsMu.Unlock()
p.pendingEvents = append(p.pendingEvents, et)
}
func (p *AURPPeer) RouteAdded(route *Route) {
p.addPendingEvent(aurp.EventCodeNA, route)
}
func (p *AURPPeer) RouteDeleted(route *Route) {
p.addPendingEvent(aurp.EventCodeND, route)
}
func (p *AURPPeer) RouteDistanceChanged(route *Route) {
p.addPendingEvent(aurp.EventCodeNDC, route)
}
func (p *AURPPeer) RouteForwarderChanged(route *Route) {
p.addPendingEvent(aurp.EventCodeNRC, route)
mu sync.RWMutex
rstate ReceiverState
sstate SenderState
}
func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error {
@ -190,7 +130,7 @@ func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error {
if err != nil {
return err
}
_, err = p.send(p.Transport.NewAppleTalkPacket(outPkt))
_, err = p.Send(p.Transport.NewAppleTalkPacket(outPkt))
return err
}
@ -206,36 +146,6 @@ func (p *AURPPeer) SenderState() SenderState {
return p.sstate
}
func (p *AURPPeer) LastReconnectAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastReconnect)
}
func (p *AURPPeer) LastHeardFromAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastHeardFrom)
}
func (p *AURPPeer) LastSendAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastSend)
}
func (p *AURPPeer) LastUpdateAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastUpdate)
}
func (p *AURPPeer) SendRetries() int {
p.mu.RLock()
defer p.mu.RUnlock()
return p.sendRetries
}
func (p *AURPPeer) setRState(rstate ReceiverState) {
p.mu.Lock()
defer p.mu.Unlock()
@ -248,42 +158,6 @@ func (p *AURPPeer) setSState(sstate SenderState) {
p.sstate = sstate
}
func (p *AURPPeer) incSendRetries() {
p.mu.Lock()
defer p.mu.Unlock()
p.sendRetries++
}
func (p *AURPPeer) resetSendRetries() {
p.mu.Lock()
defer p.mu.Unlock()
p.sendRetries = 0
}
func (p *AURPPeer) bumpLastHeardFrom() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastHeardFrom = time.Now()
}
func (p *AURPPeer) bumpLastReconnect() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastReconnect = time.Now()
}
func (p *AURPPeer) bumpLastSend() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastSend = time.Now()
}
func (p *AURPPeer) bumpLastUpdate() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastUpdate = time.Now()
}
func (p *AURPPeer) disconnect() {
p.mu.Lock()
defer p.mu.Unlock()
@ -291,8 +165,8 @@ func (p *AURPPeer) disconnect() {
p.sstate = SenderUnconnected
}
// send encodes and sends pkt to the remote host.
func (p *AURPPeer) send(pkt aurp.Packet) (int, error) {
// Send encodes and sends pkt to the remote host.
func (p *AURPPeer) Send(pkt aurp.Packet) (int, error) {
var b bytes.Buffer
if _, err := pkt.WriteTo(&b); err != nil {
return 0, err
@ -302,28 +176,23 @@ func (p *AURPPeer) send(pkt aurp.Packet) (int, error) {
}
func (p *AURPPeer) Handle(ctx context.Context) error {
// Stop listening to events if the goroutine exits
defer p.RouteTable.RemoveObserver(p)
rticker := time.NewTicker(1 * time.Second)
defer rticker.Stop()
sticker := time.NewTicker(1 * time.Second)
defer sticker.Stop()
p.mu.Lock()
p.lastReconnect = time.Now()
p.lastHeardFrom = time.Now()
p.lastSend = time.Now() // TODO: clarify use of lastSend / sendRetries
p.lastUpdate = time.Now()
p.sendRetries = 0
p.mu.Unlock()
lastReconnect := time.Now()
lastHeardFrom := time.Now()
lastSend := time.Now() // TODO: clarify use of lastSend / sendRetries
lastUpdate := time.Now()
sendRetries := 0
var lastRISent aurp.Packet
p.disconnect()
// Write an Open-Req packet
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err
}
@ -339,7 +208,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
}
// Send a best-effort Router Down before returning
lastRISent = p.Transport.NewRDPacket(aurp.ErrCodeNormalClose)
if _, err := p.send(lastRISent); err != nil {
if _, err := p.Send(lastRISent); err != nil {
log.Printf("Couldn't send RD packet: %v", err)
}
return ctx.Err()
@ -347,71 +216,70 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case <-rticker.C:
switch p.rstate {
case ReceiverWaitForOpenRsp:
if time.Since(p.lastSend) <= sendRetryTimer {
if time.Since(lastSend) <= sendRetryTimer {
break
}
if p.sendRetries >= sendRetryLimit {
if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection")
p.setRState(ReceiverUnconnected)
break
}
// Send another Open-Req
p.incSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
sendRetries++
lastSend = time.Now()
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err
}
case ReceiverConnected:
// Check LHFT, send tickle?
if time.Since(p.lastHeardFrom) <= lastHeardFromTimer {
if time.Since(lastHeardFrom) <= lastHeardFromTimer {
break
}
if _, err := p.send(p.Transport.NewTicklePacket()); err != nil {
if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
return err
}
p.setRState(ReceiverWaitForTickleAck)
p.resetSendRetries()
p.bumpLastSend()
sendRetries = 0
lastSend = time.Now()
case ReceiverWaitForTickleAck:
if time.Since(p.lastSend) <= sendRetryTimer {
if time.Since(lastSend) <= sendRetryTimer {
break
}
if p.sendRetries >= tickleRetryLimit {
if sendRetries >= tickleRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection")
p.setRState(ReceiverUnconnected)
p.RouteTable.DeleteAURPPeer(p)
p.RoutingTable.DeleteAURPPeer(p)
break
}
p.incSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewTicklePacket()); err != nil {
sendRetries++
lastSend = time.Now()
if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
return err
}
// still in Wait For Tickle-Ack
case ReceiverWaitForRIRsp:
if time.Since(p.lastSend) <= sendRetryTimer {
if time.Since(lastSend) <= sendRetryTimer {
break
}
if p.sendRetries >= sendRetryLimit {
if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection")
p.setRState(ReceiverUnconnected)
p.RouteTable.DeleteAURPPeer(p)
p.RoutingTable.DeleteAURPPeer(p)
break
}
// RI-Req is stateless, so we don't need to cache the one we
// sent earlier just to send it again
p.incSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewRIReqPacket()); err != nil {
sendRetries++
if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
return err
}
@ -420,18 +288,18 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case ReceiverUnconnected:
// Data receiver is unconnected. If data sender is connected,
// send a null RI-Upd to check if the sender is also unconnected
if p.sstate == SenderConnected && time.Since(p.lastSend) > sendRetryTimer {
if p.sendRetries >= sendRetryLimit {
if p.sstate == SenderConnected && time.Since(lastSend) > sendRetryTimer {
if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while probing sender connect, closing connection")
}
p.incSendRetries()
p.bumpLastSend()
sendRetries++
lastSend = time.Now()
aurp.Inc(&p.Transport.LocalSeq)
events := aurp.EventTuples{{
EventCode: aurp.EventCodeNull,
}}
lastRISent = p.Transport.NewRIUpdPacket(events)
if _, err := p.send(lastRISent); err != nil {
if _, err := p.Send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err)
return err
}
@ -440,7 +308,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
if p.ConfiguredAddr != "" {
// Periodically try to reconnect, if this peer is in the config file
if time.Since(p.lastReconnect) <= reconnectTimer {
if time.Since(lastReconnect) <= reconnectTimer {
break
}
@ -453,10 +321,10 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: resolved %q to %v", p.ConfiguredAddr, raddr)
p.RemoteAddr = raddr
p.bumpLastReconnect()
p.resetSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
lastReconnect = time.Now()
sendRetries = 0
lastSend = time.Now()
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err
}
@ -470,64 +338,40 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Do nothing
case SenderConnected:
if time.Since(p.lastUpdate) <= updateTimer {
if time.Since(lastUpdate) <= updateTimer {
break
}
// Are there routing updates to send?
p.pendingEventsMu.Lock()
if len(p.pendingEvents) == 0 {
p.pendingEventsMu.Unlock()
break
}
// Yes - swap the slices, release the mutex, then send them
pending := p.pendingEvents
p.pendingEvents = make(aurp.EventTuples, 0, cap(pending))
p.pendingEventsMu.Unlock()
// TODO: eliminate events that cancel out (e.g. NA then ND)
// TODO: split pending events to fit within a packet
p.bumpLastUpdate()
aurp.Inc(&p.Transport.LocalSeq)
lastRISent = p.Transport.NewRIUpdPacket(pending)
if _, err := p.send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err)
return err
}
p.setSState(SenderWaitForRIUpdAck)
// TODO: is there a routing update to send?
case SenderWaitForRIRspAck, SenderWaitForRIUpdAck:
if time.Since(p.lastSend) <= sendRetryTimer {
if time.Since(lastSend) <= sendRetryTimer {
break
}
if lastRISent == nil {
log.Print("AURP Peer: sender retry: lastRISent = nil?")
continue
}
if p.sendRetries >= sendRetryLimit {
if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached, closing connection")
p.setSState(SenderUnconnected)
p.RouteTable.RemoveObserver(p)
continue
}
p.incSendRetries()
p.bumpLastSend()
if _, err := p.send(lastRISent); err != nil {
sendRetries++
lastSend = time.Now()
if _, err := p.Send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't re-send %T: %v", lastRISent, err)
return err
}
case SenderWaitForRDAck:
if time.Since(p.lastSend) <= sendRetryTimer {
if time.Since(lastSend) <= sendRetryTimer {
break
}
p.setSState(SenderUnconnected)
p.RouteTable.RemoveObserver(p)
}
case pkt := <-p.ReceiveCh:
p.bumpLastHeardFrom()
lastHeardFrom = time.Now()
switch pkt := pkt.(type) {
case *aurp.OpenReqPacket:
@ -554,21 +398,19 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
orsp = p.Transport.NewOpenRspPacket(0, 1, nil)
}
if _, err := p.send(orsp); err != nil {
if _, err := p.Send(orsp); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Rsp: %v", err)
return err
}
if orsp.RateOrErrCode >= 0 {
// Data sender is successfully in connected state
p.setSState(SenderConnected)
p.RouteTable.AddObserver(p)
}
// If receiver is unconnected, commence connecting
if p.rstate == ReceiverUnconnected {
p.resetSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
lastSend = time.Now()
sendRetries = 0
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err
}
@ -589,8 +431,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
p.setRState(ReceiverConnected)
// Send an RI-Req
p.resetSendRetries()
if _, err := p.send(p.Transport.NewRIReqPacket()); err != nil {
sendRetries = 0
if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
return err
}
@ -601,19 +443,17 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Received RI-Req but was not expecting one (sender state was %v)", p.sstate)
}
var nets aurp.NetworkTuples
for _, r := range p.RouteTable.ValidNonAURPRoutes() {
nets = append(nets, aurp.NetworkTuple{
Extended: r.Extended,
RangeStart: r.NetStart,
RangeEnd: r.NetEnd,
Distance: r.Distance,
})
nets := aurp.NetworkTuples{
{
Extended: true,
RangeStart: p.Config.EtherTalk.NetStart,
RangeEnd: p.Config.EtherTalk.NetEnd,
Distance: 0,
},
}
p.Transport.LocalSeq = 1
// TODO: Split tuples across multiple packets as required
lastRISent = p.Transport.NewRIRspPacket(aurp.RoutingFlagLast, nets)
if _, err := p.send(lastRISent); err != nil {
if _, err := p.Send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Rsp packet: %v", err)
return err
}
@ -627,7 +467,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Learned about these networks: %v", pkt.Networks)
for _, nt := range pkt.Networks {
p.RouteTable.InsertAURPRoute(
p.RoutingTable.InsertAURPRoute(
p,
nt.Extended,
ddp.Network(nt.RangeStart),
@ -638,7 +478,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// TODO: track which networks we don't have zone info for, and
// only set SZI for those ?
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil {
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack packet: %v", err)
return err
}
@ -664,33 +504,15 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
}
p.setSState(SenderConnected)
p.resetSendRetries()
p.RouteTable.AddObserver(p)
sendRetries = 0
// If SZI flag is set, send ZI-Rsp (transaction)
// TODO: split ZI-Rsp packets similarly to ZIP Replies
if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 {
// Inspect last routing info packet sent to determine
// networks to gather names for
var nets []ddp.Network
switch last := lastRISent.(type) {
case *aurp.RIRspPacket:
for _, nt := range last.Networks {
nets = append(nets, nt.RangeStart)
}
case *aurp.RIUpdPacket:
for _, et := range last.Events {
// Only networks that were added
if et.EventCode != aurp.EventCodeNA {
continue
}
nets = append(nets, et.RangeStart)
}
zones := map[ddp.Network][]string{
p.Config.EtherTalk.NetStart: {p.Config.EtherTalk.ZoneName},
}
zones := p.RouteTable.ZonesForNetworks(nets)
// TODO: split ZI-Rsp packets similarly to ZIP Replies
if _, err := p.send(p.Transport.NewZIRspPacket(zones)); err != nil {
if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil {
log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err)
}
}
@ -701,9 +523,9 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Receiver is unconnected, but their receiver sent us an
// RI-Ack for something
// Try to reconnect?
p.resetSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
lastSend = time.Now()
sendRetries = 0
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err
}
@ -711,6 +533,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
}
case *aurp.RIUpdPacket:
var ackFlag aurp.RoutingFlag
for _, et := range pkt.Events {
@ -720,7 +543,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Do nothing except respond with RI-Ack
case aurp.EventCodeNA:
if err := p.RouteTable.InsertAURPRoute(
if err := p.RoutingTable.InsertAURPRoute(
p,
et.Extended,
et.RangeStart,
@ -732,10 +555,10 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
ackFlag = aurp.RoutingFlagSendZoneInfo
case aurp.EventCodeND:
p.RouteTable.DeleteAURPPeerNetwork(p, et.RangeStart)
p.RoutingTable.DeleteAURPPeerNetwork(p, et.RangeStart)
case aurp.EventCodeNDC:
p.RouteTable.UpdateAURPRouteDistance(p, et.RangeStart, et.Distance+1)
p.RoutingTable.UpdateAURPRouteDistance(p, et.RangeStart, et.Distance+1)
case aurp.EventCodeNRC:
// "An exterior router sends a Network Route Change
@ -743,14 +566,14 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// through its local internet changes to a path through
// a tunneling port, causing split-horizoned processing
// to eliminate that networks routing information."
p.RouteTable.DeleteAURPPeerNetwork(p, et.RangeStart)
p.RoutingTable.DeleteAURPPeerNetwork(p, et.RangeStart)
case aurp.EventCodeZC:
// "This event is reserved for future use."
}
}
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, ackFlag)); err != nil {
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, ackFlag)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err)
return err
}
@ -761,10 +584,10 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
}
log.Printf("AURP Peer: Router Down: error code %d %s", pkt.ErrorCode, pkt.ErrorCode)
p.RouteTable.DeleteAURPPeer(p)
p.RoutingTable.DeleteAURPPeer(p)
// Respond with RI-Ack
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil {
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err)
return err
}
@ -773,8 +596,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case *aurp.ZIReqPacket:
// TODO: split ZI-Rsp packets similarly to ZIP Replies
zones := p.RouteTable.ZonesForNetworks(pkt.Networks)
if _, err := p.send(p.Transport.NewZIRspPacket(zones)); err != nil {
zones := p.ZoneTable.Query(pkt.Networks)
if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil {
log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err)
return err
}
@ -782,11 +605,11 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case *aurp.ZIRspPacket:
log.Printf("AURP Peer: Learned about these zones: %v", pkt.Zones)
for _, zt := range pkt.Zones {
p.RouteTable.AddZonesToNetwork(zt.Network, zt.Name)
p.ZoneTable.Upsert(ddp.Network(zt.Network), zt.Name, nil)
}
case *aurp.GDZLReqPacket:
if _, err := p.send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil {
if _, err := p.Send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil {
log.Printf("AURP Peer: Couldn't send GDZL-Rsp packet: %v", err)
return err
}
@ -795,7 +618,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Received a GDZL-Rsp, but I wouldn't have sent a GDZL-Req - that's weird")
case *aurp.GZNReqPacket:
if _, err := p.send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil {
if _, err := p.Send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil {
log.Printf("AURP Peer: Couldn't send GZN-Rsp packet: %v", err)
return err
}
@ -805,7 +628,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case *aurp.TicklePacket:
// Immediately respond with Tickle-Ack
if _, err := p.send(p.Transport.NewTickleAckPacket()); err != nil {
if _, err := p.Send(p.Transport.NewTickleAckPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle-Ack: %v", err)
return err
}

View file

@ -18,7 +18,6 @@ package router
import (
"context"
"encoding/binary"
"errors"
"io"
"log"
@ -38,7 +37,7 @@ type EtherTalkPort struct {
NetEnd ddp.Network
MyAddr ddp.Addr
DefaultZoneName string
AvailableZones StringSet
AvailableZones []string
PcapHandle *pcap.Handle
AARPMachine *AARPMachine
Router *Router
@ -80,18 +79,8 @@ func (port *EtherTalkPort) Serve(ctx context.Context) {
case ethertalk.AppleTalkProto:
// log.Print("Got an AppleTalk frame")
// Workaround for strict length checking in sfiera/multitalk
payload := ethFrame.Payload
if len(payload) < 2 {
log.Printf("Couldn't unmarshal DDP packet: too small (length = %d)", len(payload))
}
if size := binary.BigEndian.Uint16(payload[:2]) & 0x3ff; len(payload) > int(size) {
payload = payload[:size]
}
ddpkt := new(ddp.ExtPacket)
if err := ddp.ExtUnmarshal(payload, ddpkt); err != nil {
if err := ddp.ExtUnmarshal(ethFrame.Payload, ddpkt); err != nil {
log.Printf("Couldn't unmarshal DDP packet: %v", err)
continue
}

View file

@ -34,10 +34,6 @@ type Route struct {
LastSeen time.Time
// ZoneNames may be empty between learning the existence of a route and
// receiving zone information.
ZoneNames StringSet
// Exactly one of the following should be set
AURPPeer *AURPPeer // Next hop is this peer router (over AURP)
EtherTalkPeer *EtherTalkPeer // Next hop is this peer router (over EtherTalk)
@ -45,50 +41,23 @@ type Route struct {
}
func (r Route) LastSeenAgo() string {
return ago(r.LastSeen)
}
// Valid reports whether the route is valid.
// A valid route has one or more zone names, and if it is learned from a peer
// router over EtherTalk is not too old.
func (r *Route) Valid() bool {
return len(r.ZoneNames) > 0 && (r.EtherTalkPeer == nil || time.Since(r.LastSeen) <= maxRouteAge)
}
type RouteTableObserver interface {
RouteAdded(*Route)
RouteDeleted(*Route)
RouteDistanceChanged(*Route)
RouteForwarderChanged(*Route)
if r.LastSeen.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(r.LastSeen).Truncate(time.Millisecond))
}
type RouteTable struct {
routesMu sync.RWMutex
routes map[*Route]struct{}
observersMu sync.RWMutex
observers map[RouteTableObserver]struct{}
mu sync.Mutex
routes map[*Route]struct{}
}
func NewRouteTable() *RouteTable {
return &RouteTable{
routes: make(map[*Route]struct{}),
observers: make(map[RouteTableObserver]struct{}),
routes: make(map[*Route]struct{}),
}
}
func (rt *RouteTable) AddObserver(obs RouteTableObserver) {
rt.observersMu.Lock()
defer rt.observersMu.Unlock()
rt.observers[obs] = struct{}{}
}
func (rt *RouteTable) RemoveObserver(obs RouteTableObserver) {
rt.observersMu.Lock()
defer rt.observersMu.Unlock()
delete(rt.observers, obs)
}
func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) {
r := &Route{
Extended: true,
@ -96,18 +65,17 @@ func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) {
NetEnd: port.NetEnd,
Distance: 0, // we're connected directly
LastSeen: time.Now(),
ZoneNames: port.AvailableZones,
EtherTalkDirect: port,
}
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
rt.mu.Lock()
defer rt.mu.Unlock()
rt.routes[r] = struct{}{}
}
func (rt *RouteTable) Dump() []Route {
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
rt.mu.Lock()
defer rt.mu.Unlock()
table := make([]Route, 0, len(rt.routes))
for r := range rt.routes {
@ -117,15 +85,16 @@ func (rt *RouteTable) Dump() []Route {
}
func (rt *RouteTable) LookupRoute(network ddp.Network) *Route {
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
rt.mu.Lock()
defer rt.mu.Unlock()
var bestRoute *Route
for r := range rt.routes {
if network < r.NetStart || network > r.NetEnd {
continue
}
if !r.Valid() {
// Exclude EtherTalk routes that are too old
if r.EtherTalkPeer != nil && time.Since(r.LastSeen) > maxRouteAge {
continue
}
if bestRoute == nil {
@ -140,8 +109,8 @@ func (rt *RouteTable) LookupRoute(network ddp.Network) *Route {
}
func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) {
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
rt.mu.Lock()
defer rt.mu.Unlock()
for route := range rt.routes {
if route.AURPPeer == peer {
@ -151,8 +120,8 @@ func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) {
}
func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network) {
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
rt.mu.Lock()
defer rt.mu.Unlock()
for route := range rt.routes {
if route.AURPPeer == peer && route.NetStart == network {
@ -162,8 +131,8 @@ func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network)
}
func (rt *RouteTable) UpdateAURPRouteDistance(peer *AURPPeer, network ddp.Network, distance uint8) {
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
rt.mu.Lock()
defer rt.mu.Unlock()
for route := range rt.routes {
if route.AURPPeer == peer && route.NetStart == network {
@ -173,16 +142,16 @@ func (rt *RouteTable) UpdateAURPRouteDistance(peer *AURPPeer, network ddp.Networ
}
}
func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, netStart, netEnd ddp.Network, metric uint8) (*Route, error) {
func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, netStart, netEnd ddp.Network, metric uint8) error {
if netStart > netEnd {
return nil, fmt.Errorf("invalid network range [%d, %d]", netStart, netEnd)
return fmt.Errorf("invalid network range [%d, %d]", netStart, netEnd)
}
if netStart != netEnd && !extended {
return nil, fmt.Errorf("invalid network range [%d, %d] for nonextended network", netStart, netEnd)
return fmt.Errorf("invalid network range [%d, %d] for nonextended network", netStart, netEnd)
}
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
rt.mu.Lock()
defer rt.mu.Unlock()
// Update?
for r := range rt.routes {
@ -200,7 +169,7 @@ func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, n
}
r.Distance = metric
r.LastSeen = time.Now()
return r, nil
return nil
}
// Insert.
@ -213,7 +182,7 @@ func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, n
EtherTalkPeer: peer,
}
rt.routes[r] = struct{}{}
return r, nil
return nil
}
func (rt *RouteTable) InsertAURPRoute(peer *AURPPeer, extended bool, netStart, netEnd ddp.Network, metric uint8) error {
@ -233,35 +202,19 @@ func (rt *RouteTable) InsertAURPRoute(peer *AURPPeer, extended bool, netStart, n
AURPPeer: peer,
}
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
rt.mu.Lock()
defer rt.mu.Unlock()
rt.routes[r] = struct{}{}
return nil
}
// ValidRoutes returns all valid routes.
func (rt *RouteTable) ValidRoutes() []*Route {
rt.routesMu.RLock()
defer rt.routesMu.RUnlock()
rt.mu.Lock()
defer rt.mu.Unlock()
valid := make([]*Route, 0, len(rt.routes))
for r := range rt.routes {
if r.Valid() {
valid = append(valid, r)
}
}
return valid
}
// ValidNonAURPRoutes returns all valid routes that were not learned via AURP.
func (rt *RouteTable) ValidNonAURPRoutes() []*Route {
rt.routesMu.RLock()
defer rt.routesMu.RUnlock()
valid := make([]*Route, 0, len(rt.routes))
for r := range rt.routes {
if r.AURPPeer != nil {
continue
}
if !r.Valid() {
// Exclude EtherTalk routes that are too old
if r.EtherTalkPeer != nil && time.Since(r.LastSeen) > maxRouteAge {
continue
}
valid = append(valid, r)

View file

@ -26,6 +26,7 @@ import (
type Router struct {
Config *Config
RouteTable *RouteTable
ZoneTable *ZoneTable
Ports []*EtherTalkPort
}

View file

@ -24,7 +24,6 @@ import (
"gitea.drjosh.dev/josh/jrouter/atalk"
"gitea.drjosh.dev/josh/jrouter/atalk/rtmp"
"gitea.drjosh.dev/josh/jrouter/atalk/zip"
"gitea.drjosh.dev/josh/jrouter/status"
"github.com/sfiera/multitalk/pkg/ddp"
@ -102,7 +101,8 @@ func (port *EtherTalkPort) HandleRTMP(ctx context.Context, pkt *ddp.ExtPacket) e
}
case rtmp.FunctionLoopProbe:
return fmt.Errorf("TODO: handle Loop Probes")
log.Print("RTMP: TODO: handle Loop Probes")
return nil
}
case ddp.ProtoRTMPResp:
@ -110,51 +110,22 @@ func (port *EtherTalkPort) HandleRTMP(ctx context.Context, pkt *ddp.ExtPacket) e
log.Print("RTMP: Got Response or Data")
dataPkt, err := rtmp.UnmarshalDataPacket(pkt.Data)
if err != nil {
return fmt.Errorf("unmarshal RTMP Data packet: %w", err)
log.Printf("RTMP: Couldn't unmarshal RTMP Data packet: %v", err)
break
}
peer := &EtherTalkPeer{
Port: port,
PeerAddr: dataPkt.RouterAddr,
}
var noZones []ddp.Network
for _, nt := range dataPkt.NetworkTuples {
route, err := port.Router.RouteTable.UpsertEtherTalkRoute(peer, nt.Extended, nt.RangeStart, nt.RangeEnd, nt.Distance+1)
if err != nil {
return fmt.Errorf("upsert EtherTalk route: %v", err)
}
if len(route.ZoneNames) == 0 {
noZones = append(noZones, route.NetStart)
}
}
if len(noZones) > 0 {
// Send a ZIP Query for all networks we don't have zone names for.
// TODO: split networks to fit in multiple packets as needed
qryPkt, err := (&zip.QueryPacket{Networks: noZones}).Marshal()
if err != nil {
return fmt.Errorf("marshal ZIP Query packet: %w", err)
}
outDDP := &ddp.ExtPacket{
ExtHeader: ddp.ExtHeader{
Size: uint16(len(qryPkt)) + atalk.DDPExtHeaderSize,
Cksum: 0,
SrcNet: port.MyAddr.Network,
SrcNode: port.MyAddr.Node,
SrcSocket: 6,
DstNet: pkt.SrcNet,
DstNode: pkt.SrcNode,
DstSocket: 6, // ZIP socket
Proto: ddp.ProtoZIP,
},
Data: qryPkt,
}
if err := port.Send(ctx, outDDP); err != nil {
return fmt.Errorf("sending ZIP Query: %w", err)
for _, rt := range dataPkt.NetworkTuples {
if err := port.Router.RouteTable.UpsertEtherTalkRoute(peer, rt.Extended, rt.RangeStart, rt.RangeEnd, rt.Distance+1); err != nil {
log.Printf("RTMP: Couldn't upsert EtherTalk route: %v", err)
}
}
default:
return fmt.Errorf("invalid DDP type %d on socket 1", pkt.Proto)
log.Printf("RTMP: invalid DDP type %d on socket 1", pkt.Proto)
}
return nil

View file

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"log"
"slices"
"gitea.drjosh.dev/josh/jrouter/atalk"
"gitea.drjosh.dev/josh/jrouter/atalk/atp"
@ -51,9 +52,6 @@ func (port *EtherTalkPort) handleZIPZIP(ctx context.Context, ddpkt *ddp.ExtPacke
case *zip.QueryPacket:
return port.handleZIPQuery(ctx, ddpkt, zipkt)
case *zip.ReplyPacket:
return port.handleZIPReply(zipkt)
case *zip.GetNetInfoPacket:
return port.handleZIPGetNetInfo(ctx, ddpkt, zipkt)
@ -64,7 +62,7 @@ func (port *EtherTalkPort) handleZIPZIP(ctx context.Context, ddpkt *ddp.ExtPacke
func (port *EtherTalkPort) handleZIPQuery(ctx context.Context, ddpkt *ddp.ExtPacket, zipkt *zip.QueryPacket) error {
log.Printf("ZIP: Got Query for networks %v", zipkt.Networks)
networks := port.Router.RouteTable.ZonesForNetworks(zipkt.Networks)
networks := port.Router.ZoneTable.Query(zipkt.Networks)
sendReply := func(resp *zip.ReplyPacket) error {
respRaw, err := resp.Marshal()
@ -158,21 +156,11 @@ func (port *EtherTalkPort) handleZIPQuery(ctx context.Context, ddpkt *ddp.ExtPac
return nil
}
func (port *EtherTalkPort) handleZIPReply(zipkt *zip.ReplyPacket) error {
log.Printf("ZIP: Got Reply containing %v", zipkt.Networks)
// Integrate new zone information into route table.
for n, zs := range zipkt.Networks {
port.Router.RouteTable.AddZonesToNetwork(n, zs...)
}
return nil
}
func (port *EtherTalkPort) handleZIPGetNetInfo(ctx context.Context, ddpkt *ddp.ExtPacket, zipkt *zip.GetNetInfoPacket) error {
log.Printf("ZIP: Got GetNetInfo for zone %q", zipkt.ZoneName)
// The request is zoneValid if the zone name is available on this network.
zoneValid := port.AvailableZones.Contains(zipkt.ZoneName)
zoneValid := slices.Contains(port.AvailableZones, zipkt.ZoneName)
// The multicast address we return depends on the validity of the zone
// name.
@ -270,10 +258,10 @@ func (port *EtherTalkPort) handleZIPTReq(ctx context.Context, ddpkt *ddp.ExtPack
switch gzl.Function {
case zip.FunctionGetZoneList:
resp.Zones = port.Router.RouteTable.AllZoneNames()
resp.Zones = port.Router.ZoneTable.AllNames()
case zip.FunctionGetLocalZones:
resp.Zones = port.AvailableZones.ToSlice()
resp.Zones = port.AvailableZones
case zip.FunctionGetMyZone:
// Note: This shouldn't happen on extended networks (e.g. EtherTalk)

View file

@ -17,72 +17,145 @@
package router
import (
"fmt"
"slices"
"sort"
"sync"
"time"
"github.com/sfiera/multitalk/pkg/ddp"
)
func (rt *RouteTable) AddZonesToNetwork(n ddp.Network, zs ...string) {
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
for r := range rt.routes {
if n < r.NetStart || n > r.NetEnd {
continue
}
if r.ZoneNames == nil {
r.ZoneNames = make(StringSet)
}
r.ZoneNames.Insert(zs...)
//const maxZoneAge = 10 * time.Minute // TODO: confirm
type Zone struct {
Network ddp.Network
Name string
LocalPort *EtherTalkPort // nil if remote (local to another router)
LastSeen time.Time
}
func (z Zone) LastSeenAgo() string {
if z.LastSeen.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(z.LastSeen).Truncate(time.Millisecond))
}
type zoneKey struct {
network ddp.Network
name string
}
type ZoneTable struct {
mu sync.Mutex
zones map[zoneKey]*Zone
}
func NewZoneTable() *ZoneTable {
return &ZoneTable{
zones: make(map[zoneKey]*Zone),
}
}
func (rt *RouteTable) ZonesForNetworks(ns []ddp.Network) map[ddp.Network][]string {
func (zt *ZoneTable) Dump() []Zone {
zt.mu.Lock()
defer zt.mu.Unlock()
zs := make([]Zone, 0, len(zt.zones))
for _, z := range zt.zones {
zs = append(zs, *z)
}
return zs
}
func (zt *ZoneTable) Upsert(network ddp.Network, name string, localPort *EtherTalkPort) {
zt.mu.Lock()
defer zt.mu.Unlock()
key := zoneKey{network, name}
z := zt.zones[key]
if z != nil {
z.LocalPort = localPort
z.LastSeen = time.Now()
return
}
zt.zones[key] = &Zone{
Network: network,
Name: name,
LocalPort: localPort,
LastSeen: time.Now(),
}
}
func (zt *ZoneTable) Query(ns []ddp.Network) map[ddp.Network][]string {
slices.Sort(ns)
zs := make(map[ddp.Network][]string)
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
for r := range rt.routes {
if !r.Valid() {
continue
}
if _, ok := slices.BinarySearch(ns, r.NetStart); ok {
for z := range r.ZoneNames {
zs[r.NetStart] = append(zs[r.NetStart], z)
}
zt.mu.Lock()
defer zt.mu.Unlock()
for _, z := range zt.zones {
// if time.Since(z.LastSeen) > maxZoneAge {
// continue
// }
if _, ok := slices.BinarySearch(ns, z.Network); ok {
zs[z.Network] = append(zs[z.Network], z.Name)
}
}
return zs
}
func (rt *RouteTable) RoutesForZone(zone string) []*Route {
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
func (zt *ZoneTable) LookupName(name string) []*Zone {
zt.mu.Lock()
defer zt.mu.Unlock()
var routes []*Route
for r := range rt.routes {
if !r.Valid() {
continue
}
if r.ZoneNames.Contains(zone) {
routes = append(routes, r)
var zs []*Zone
for _, z := range zt.zones {
if z.Name == name {
zs = append(zs, z)
}
}
return routes
return zs
}
func (rt *RouteTable) AllZoneNames() (zones []string) {
defer slices.Sort(zones)
// func (zt *ZoneTable) LocalNames() []string {
// zt.mu.Lock()
// seen := make(map[string]struct{})
// zs := make([]string, 0, len(zt.zones))
// for _, z := range zt.zones {
// // if time.Since(z.LastSeen) > maxZoneAge {
// // continue
// // }
// if z.Local != nil {
// continue
// }
// if _, s := seen[z.Name]; s {
// continue
// }
// seen[z.Name] = struct{}{}
// zs = append(zs, z.Name)
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
// }
// zt.mu.Unlock()
zs := make(StringSet)
for r := range rt.routes {
if !r.Valid() {
// sort.Strings(zs)
// return zs
// }
func (zt *ZoneTable) AllNames() []string {
zt.mu.Lock()
seen := make(map[string]struct{})
zs := make([]string, 0, len(zt.zones))
for _, z := range zt.zones {
// if time.Since(z.LastSeen) > maxZoneAge {
// continue
// }
if _, s := seen[z.Name]; s {
continue
}
zs.Add(r.ZoneNames)
seen[z.Name] = struct{}{}
zs = append(zs, z.Name)
}
zt.mu.Unlock()
return zs.ToSlice()
sort.Strings(zs)
return zs
}