Compare commits

..

26 commits

Author SHA1 Message Date
ca0e81eaa1 Add Dockerfile 2024-08-11 18:50:26 +10:00
100d70dccf
Ensure frame padding 2024-08-10 16:15:11 +10:00
0f8297e670 Update README.md 2024-08-10 15:51:50 +10:00
8d04a405f5 More build/install notes 2024-08-10 15:47:37 +10:00
4dbf1b6f64
Another strict size workaround 2024-06-16 13:08:53 +10:00
668fe5e722
Merge branch 'main' of gitea:josh/jrouter 2024-06-16 10:21:58 +10:00
2f1b4c1ce1
Workaround for trailing data on AARP packets 2024-06-16 10:21:56 +10:00
d063075907 Add contact info 2024-06-16 09:36:30 +10:00
d498f5d6cc Merge pull request 'Send RI-Upds to peers' (#14) from route-updates into main
Reviewed-on: #14
2024-06-07 16:02:23 +10:00
a14ae919ff
Send pending routing events 2024-06-07 16:00:43 +10:00
9c808cbf63
Dry up creating new AURP peers 2024-05-24 16:57:54 +10:00
9e3b311b56
Add route table observers 2024-05-24 16:57:54 +10:00
d25c2eb362
Send -> send 2024-05-12 18:31:01 +10:00
9fe09c0a9b
refactor ago 2024-05-12 18:12:27 +10:00
331da095c3
AURP peer timers in status 2024-05-12 18:07:27 +10:00
3067daa264
Fix Null event tuple coding 2024-05-12 12:08:33 +10:00
3a2cbdcde8
Merge branch 'main' of gitea:josh/jrouter 2024-05-05 18:21:32 +10:00
19af36081c
Update README 2024-05-05 18:21:31 +10:00
f9d63e8358 Merge pull request 'Table unification' (#2) from table-unification into main
Reviewed-on: #2
2024-05-05 18:15:46 +10:00
376c09d189
More cleanups 2024-05-05 18:13:40 +10:00
c1f84c3f29
Cleanup 2024-05-05 18:09:38 +10:00
95eec9564f
Fix obvious bugs 2024-05-05 18:04:54 +10:00
10d4610e0d
Zone name slices -> sets 2024-05-05 17:59:49 +10:00
5f3bfe2f76
AURP: send more routes / zones 2024-05-05 17:25:11 +10:00
7e9fe4ff98
Unify route and zone tables 2024-05-05 17:01:23 +10:00
8ce8f52776 Merge pull request 'Multi-port refactor' (#1) from multi-port-refactor into main
Reviewed-on: #1
2024-05-04 17:06:20 +10:00
15 changed files with 687 additions and 380 deletions

12
Dockerfile Normal file
View file

@ -0,0 +1,12 @@
# Build stage: compile jrouter with CGo enabled (gopacket needs libpcap headers).
FROM golang:alpine AS builder
WORKDIR /go/src/jrouter
COPY . .
# Cache the Go build cache and module cache across image builds (BuildKit).
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/go/pkg/mod \
apk add build-base libpcap-dev && \
CGO_ENABLED=1 CGO_FLAGS='-static' go build -v -o jrouter .
# Runtime stage: minimal image carrying only libpcap and the built binary.
FROM alpine:latest
COPY --from=builder /go/src/jrouter/jrouter /usr/bin/
RUN apk add libpcap
ENTRYPOINT ["/usr/bin/jrouter", "-config", "/etc/jrouter/jrouter.yaml"]

View file

@ -11,21 +11,25 @@ Home-grown alternative implementation of Apple Internet Router 3.0
TashTalk could be a stretch goal, if I can acquire one!
## Things that used to be caveats
* Previously it would listen for all EtherTalk traffic, regardless of destination.
Now it doesn't do that, which should help it co-exist with other routers on
the same host.
* You can configure an alternate Ethernet address if you are reusing the same
network interface for multiple different EtherTalk software.
* In addition to the configured EtherTalk network and zone, it now learns routes
and zones from other EtherTalk routers, and should share them across AURP.
* There's a status server. Browse to http://\[your router\]:9459/status to see
information about the state of jrouter.
## Caveats
Things I plan to fix Real Soon Now:
* ✅ Fixed ~~It currently listens to all AppleTalk and AARP traffic on the EtherTalk port.
This might not play well with other AppleTalk software, e.g. netatalk.~~
* ✅ Fixed ~~Also it currently uses the default Ethernet address for the interface for
sending packets. I plan to add the ability to configure a different address.~~
You can now configure a different Ethernet address for the EtherTalk
interface. I haven't tested it with netatalk or tashrouter on the same
host, but I think using a distinct Ethernet address would help them coexist.
* It currently ignores other AppleTalk routers on the EtherTalk side. This is
the next main thing to implement to make it work with e.g. netatalk.
* Some packet types need splitting to fit within limits. Some of these aren't
implemented yet (mainly encapsulated).
implemented yet (mainly encapsulated). The unimplemented ones seem unlikely to
hit those limits unless you are running a lot of routers or zones locally.
* I plan to add a Prometheus metrics endpoint and at least add log levels /
verbosity config.
* The AURP implementation is mostly there, but not fully complete. The main
@ -33,7 +37,7 @@ Things I plan to fix Real Soon Now:
Things I plan to fix At Some Point:
* For expediency I made it act as a _seed router_. At some point I might add
* For expediency I made it act as a _seed router_. At some point I might add
"soft seed" functionality.
@ -45,16 +49,50 @@ First, set up a `jrouter.yaml` (use the one in this repo as an example).
TODO: explain the configuration file
Building and running:
### Building and running directly
```shell
sudo apt install libpcap-dev
go install gitea.drjosh.dev/josh/jrouter@latest
sudo setcap 'CAP_NET_BIND_SERVICE=ep CAP_NET_RAW=ep' ~/go/bin/jrouter
~/go/bin/jrouter
```
1. Install [Go](https://go.dev/dl).
2. Run these commands (for Debian-variety Linuxen, e.g. Ubuntu, Raspbian, Mint...):
```shell
sudo apt install git build-essential libpcap-dev
go install gitea.drjosh.dev/josh/jrouter@latest
sudo setcap 'CAP_NET_BIND_SERVICE=ep CAP_NET_RAW=ep' ~/go/bin/jrouter
```
3. Configure `jrouter.yaml`
4. To run:
```shell
~/go/bin/jrouter
```
* `NET_BIND_SERVICE` is needed to bind UDP port 387 (for talking between AIRs)
* `NET_RAW` is needed for EtherTalk
Notes:
TODO: instructions for non-Linux machines
* `git` is needed for `go install` to fetch the module
* `build-essential` and `libpcap-dev` are needed for [gopacket](https://github.com/google/gopacket), which uses [CGo](https://pkg.go.dev/cmd/cgo)
* `NET_BIND_SERVICE` is needed for `jrouter` to bind UDP port 387 (for talking between AIRs)
* `NET_RAW` is needed for `jrouter` to listen for and send EtherTalk packets
TODO: instructions for non-Linux / non-Debian-like machines
### With Docker
1. Clone the repo and `cd` into it.
2. `docker build -t jrouter .`
3. Example `docker run` command:
```shell
docker run \
-v ./cfg:/etc/jrouter \
--cap-add NET_RAW \
--net host \
--name jrouter \
jrouter
```
Notes:
* Put `jrouter.yaml` inside a `cfg` directory (or some path of your choice and bind-mount it at `/etc/jrouter`) for it to find the config file.
* `--cap-add NET_RAW` and `--net host` are needed for EtherTalk access to the network interface.
* By using `--net host`, the default AURP port (387) will be bound without `-p`.
## Bug reports? Feature requests? Complaints? Praise?
You can contact me on the Fediverse at @DrJosh9000@cloudisland.nz, or email me at josh.deprez@gmail.com.

View file

@ -173,9 +173,12 @@ func (e EventTuples) WriteTo(w io.Writer) (int64, error) {
}
func parseEventTuples(p []byte) (EventTuples, error) {
// Each event tuple is at least 4 bytes, so we need to store at most
// len(p)/4 of them.
e := make(EventTuples, 0, len(p)/4)
// Event tuples can be 1, 4, or 6 bytes long. But the only type of length 1
// is the Null event type sent to probe whether or not the data receiver is
// still listening. If that's present there probably aren't any other
// tuples. Hence len(p)/4 (rounded up) is a reasonable estimate of max tuple
// count.
e := make(EventTuples, 0, (len(p)+3)/4)
for len(p) > 0 {
et, nextp, err := parseEventTuple(p)
if err != nil {
@ -198,6 +201,10 @@ type EventTuple struct {
func (et *EventTuple) WriteTo(w io.Writer) (int64, error) {
a := acc(w)
a.write8(uint8(et.EventCode))
if et.EventCode == EventCodeNull {
// null tuple
return a.ret()
}
a.write16(uint16(et.RangeStart))
if !et.Extended {
// non-extended tuple
@ -211,12 +218,18 @@ func (et *EventTuple) WriteTo(w io.Writer) (int64, error) {
}
func parseEventTuple(p []byte) (EventTuple, []byte, error) {
if len(p) < 4 {
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for network event tuple", len(p))
if len(p) < 1 {
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for any network event tuple", len(p))
}
var et EventTuple
et.EventCode = EventCode(p[0])
if et.EventCode == EventCodeNull {
return et, p[1:], nil
}
if len(p) < 4 {
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for non-Null network event tuple", len(p))
}
et.RangeStart = ddp.Network(binary.BigEndian.Uint16(p[1:3]))
et.RangeEnd = et.RangeStart
et.Distance = p[3]

85
main.go
View file

@ -30,7 +30,6 @@ import (
"os"
"os/signal"
"regexp"
"runtime/debug"
"slices"
"strings"
"sync"
@ -50,6 +49,7 @@ const routingTableTemplate = `
<thead><tr>
<th>Network range</th>
<th>Extended?</th>
<th>Zone names</th>
<th>Distance</th>
<th>Last seen</th>
<th>Port</th>
@ -58,7 +58,8 @@ const routingTableTemplate = `
{{range $route := . }}
<tr>
<td>{{$route.NetStart}}{{if not (eq $route.NetStart $route.NetEnd)}} - {{$route.NetEnd}}{{end}}</td>
<td>{{if $route.Extended}}{{else}}{{end}}</td>
<td>{{if $route.Extended}}{{else}}-{{end}}</td>
<td>{{range $route.ZoneNames.ToSlice}}{{.}}<br>{{end}}</td>
<td>{{$route.Distance}}</td>
<td>{{$route.LastSeenAgo}}</td>
<td>
@ -78,27 +79,6 @@ const routingTableTemplate = `
</table>
`
const zoneTableTemplate = `
<table>
<thead><tr>
<th>Network</th>
<th>Name</th>
<th>Local Port</th>
<th>Last seen</th>
</tr></thead>
<tbody>
{{range $zone := . }}
<tr>
<td>{{$zone.Network}}</td>
<td>{{$zone.Name}}</td>
<td>{{with $zone.LocalPort}}{{.Device}}{{else}}-{{end}}</td>
<td>{{$zone.LastSeenAgo}}</td>
</tr>
{{end}}
</tbody>
</table>
`
const peerTableTemplate = `
<table>
<thead><tr>
@ -106,6 +86,11 @@ const peerTableTemplate = `
<th>Remote addr</th>
<th>Receiver state</th>
<th>Sender state</th>
<th>Last heard from</th>
<th>Last reconnect</th>
<th>Last update</th>
<th>Last send</th>
<th>Send retries</th>
</tr></thead>
<tbody>
{{range $peer := . }}
@ -114,6 +99,11 @@ const peerTableTemplate = `
<td>{{$peer.RemoteAddr}}</td>
<td>{{$peer.ReceiverState}}</td>
<td>{{$peer.SenderState}}</td>
<td>{{$peer.LastHeardFromAgo}}</td>
<td>{{$peer.LastReconnectAgo}}</td>
<td>{{$peer.LastUpdateAgo}}</td>
<td>{{$peer.LastSendAgo}}</td>
<td>{{$peer.SendRetries}}</td>
</tr>
{{end}}
</tbody>
@ -126,7 +116,9 @@ var configFilePath = flag.String("config", "jrouter.yaml", "Path to configuratio
func main() {
// For some reason it occasionally panics and the panics have no traceback?
debug.SetTraceback("all")
// This didn't help:
// debug.SetTraceback("all")
// I think it's calling recover in a defer too broadly.
flag.Parse()
log.Println("jrouter")
@ -220,15 +212,6 @@ func main() {
return rs, nil
})
zones := router.NewZoneTable()
status.AddItem(ctx, "Zone table", zoneTableTemplate, func(context.Context) (any, error) {
zs := zones.Dump()
slices.SortFunc(zs, func(za, zb router.Zone) int {
return cmp.Compare(za.Name, zb.Name)
})
return zs, nil
})
// -------------------------------- Peers ---------------------------------
var peersMu sync.Mutex
peers := make(map[udpAddr]*router.AURPPeer)
@ -315,20 +298,7 @@ func main() {
continue
}
peer := &router.AURPPeer{
Config: cfg,
Transport: &aurp.Transport{
LocalDI: localDI,
RemoteDI: aurp.IPDomainIdentifier(raddr.IP),
LocalConnID: nextConnID,
},
UDPConn: ln,
ConfiguredAddr: peerStr,
RemoteAddr: raddr,
ReceiveCh: make(chan aurp.Packet, 1024),
RoutingTable: routes,
ZoneTable: zones,
}
peer := router.NewAURPPeer(routes, ln, peerStr, raddr, localDI, nil, nextConnID)
aurp.Inc(&nextConnID)
peersMu.Lock()
peers[udpAddrFromNet(raddr)] = peer
@ -344,7 +314,7 @@ func main() {
rooter := &router.Router{
Config: cfg,
RouteTable: routes,
ZoneTable: zones,
// ZoneTable: zones,
}
etherTalkPort := &router.EtherTalkPort{
@ -353,16 +323,13 @@ func main() {
NetStart: cfg.EtherTalk.NetStart,
NetEnd: cfg.EtherTalk.NetEnd,
DefaultZoneName: cfg.EtherTalk.ZoneName,
AvailableZones: []string{cfg.EtherTalk.ZoneName},
AvailableZones: router.SetFromSlice([]string{cfg.EtherTalk.ZoneName}),
PcapHandle: pcapHandle,
AARPMachine: aarpMachine,
Router: rooter,
}
rooter.Ports = append(rooter.Ports, etherTalkPort)
routes.InsertEtherTalkDirect(etherTalkPort)
for _, az := range etherTalkPort.AvailableZones {
zones.Upsert(etherTalkPort.NetStart, az, etherTalkPort)
}
// --------------------------------- RTMP ---------------------------------
go etherTalkPort.RunRTMP(ctx)
@ -427,19 +394,7 @@ func main() {
continue
}
// New peer!
pr = &router.AURPPeer{
Config: cfg,
Transport: &aurp.Transport{
LocalDI: localDI,
RemoteDI: dh.SourceDI, // platinum rule
LocalConnID: nextConnID,
},
UDPConn: ln,
RemoteAddr: raddr,
ReceiveCh: make(chan aurp.Packet, 1024),
RoutingTable: routes,
ZoneTable: zones,
}
pr = router.NewAURPPeer(routes, ln, "", raddr, localDI, dh.SourceDI, nextConnID)
aurp.Inc(&nextConnID)
peers[ra] = pr
goPeerHandler(pr)

View file

@ -37,6 +37,8 @@ const (
maxAMTEntryAge = 30 * time.Second
aarpRequestRetransmit = 1 * time.Second
aarpRequestTimeout = 10 * time.Second
aarpBodyLength = 28 // bytes
)
const aarpStatusTemplate = `
@ -181,8 +183,17 @@ func (a *AARPMachine) Run(ctx context.Context) error {
a.incomingCh = nil
}
// sfiera/multitalk will return an "excess data" error if the
// payload is too big. Most traffic I've seen locally does not have
// this problem, but I've seen one report with some junk trailing
// data on AARP packets.
payload := ethFrame.Payload
if len(payload) > aarpBodyLength {
payload = payload[:aarpBodyLength]
}
var aapkt aarp.Packet
if err := aarp.Unmarshal(ethFrame.Payload, &aapkt); err != nil {
if err := aarp.Unmarshal(payload, &aapkt); err != nil {
log.Printf("Couldn't unmarshal AARP packet: %v", err)
continue
}
@ -326,6 +337,9 @@ func (a *AARPMachine) heyThatsMe(targ aarp.AddrPair) error {
if err != nil {
return err
}
if len(respFrameRaw) < 64 {
respFrameRaw = append(respFrameRaw, make([]byte, 64-len(respFrameRaw))...)
}
return a.pcapHandle.WritePacketData(respFrameRaw)
}
@ -339,6 +353,9 @@ func (a *AARPMachine) probe() error {
if err != nil {
return err
}
if len(probeFrameRaw) < 64 {
probeFrameRaw = append(probeFrameRaw, make([]byte, 64-len(probeFrameRaw))...)
}
return a.pcapHandle.WritePacketData(probeFrameRaw)
}
@ -352,6 +369,9 @@ func (a *AARPMachine) request(ddpAddr ddp.Addr) error {
if err != nil {
return err
}
if len(reqFrameRaw) < 64 {
reqFrameRaw = append(reqFrameRaw, make([]byte, 64-len(reqFrameRaw))...)
}
return a.pcapHandle.WritePacketData(reqFrameRaw)
}
@ -378,10 +398,7 @@ func (e AMTEntry) Valid() bool {
// LastUpdatedAgo is a friendly string reporting how long ago the entry was
// updated/resolved.
func (e AMTEntry) LastUpdatedAgo() string {
if e.LastUpdated.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(e.LastUpdated).Truncate(time.Millisecond))
return ago(e.LastUpdated)
}
// addressMappingTable implements a concurrent-safe Address Mapping Table for

65
router/misc.go Normal file
View file

@ -0,0 +1,65 @@
/*
Copyright 2024 Josh Deprez
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package router
import (
"fmt"
"time"
)
// StringSet is a set of strings, stored as a map with zero-width values so
// membership tests are O(1) and entries carry no extra storage.
type StringSet map[string]struct{}

// ToSlice returns the members of the set as a slice, in unspecified order.
func (set StringSet) ToSlice() []string {
	members := make([]string, 0, len(set))
	for member := range set {
		members = append(members, member)
	}
	return members
}

// Contains reports whether s is a member of the set.
func (set StringSet) Contains(s string) bool {
	_, ok := set[s]
	return ok
}

// Insert adds each of the given strings to the set.
func (set StringSet) Insert(ss ...string) {
	for _, member := range ss {
		set[member] = struct{}{}
	}
}

// Add inserts every member of t into the set (an in-place union).
func (set StringSet) Add(t StringSet) {
	for member := range t {
		set[member] = struct{}{}
	}
}

// SetFromSlice builds a StringSet containing the elements of ss
// (duplicates collapse to a single membership).
func SetFromSlice(ss []string) StringSet {
	set := make(StringSet, len(ss))
	for _, member := range ss {
		set[member] = struct{}{}
	}
	return set
}
// ago is a helper for formatting times.
func ago(t time.Time) string {
if t.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(t).Truncate(time.Millisecond))
}

View file

@ -20,7 +20,6 @@ import (
"context"
"fmt"
"log"
"slices"
"gitea.drjosh.dev/josh/jrouter/atalk"
"gitea.drjosh.dev/josh/jrouter/atalk/nbp"
@ -74,10 +73,10 @@ func (port *EtherTalkPort) handleNBPBrRq(ctx context.Context, ddpkt *ddp.ExtPack
// tuple.Zone = port.DefaultZoneName
// }
zones := port.Router.ZoneTable.LookupName(tuple.Zone)
routes := port.Router.RouteTable.RoutesForZone(tuple.Zone)
for _, z := range zones {
if outPort := z.LocalPort; outPort != nil {
for _, route := range routes {
if outPort := route.EtherTalkDirect; outPort != nil {
// If it's for a local zone, translate it to a LkUp and broadcast
// out the corresponding EtherTalk port.
// "Note: On an internet, nodes on extended networks performing lookups in
@ -147,7 +146,7 @@ func (port *EtherTalkPort) handleNBPBrRq(ctx context.Context, ddpkt *ddp.ExtPack
SrcNet: ddpkt.SrcNet,
SrcNode: ddpkt.SrcNode,
SrcSocket: ddpkt.SrcSocket,
DstNet: z.Network,
DstNet: route.NetStart,
DstNode: 0x00, // Any router for the dest network
DstSocket: 2,
Proto: ddp.ProtoNBP,
@ -170,7 +169,7 @@ func (rtr *Router) handleNBPFwdReq(ctx context.Context, ddpkt *ddp.ExtPacket, nb
tuple := &nbpkt.Tuples[0]
for _, outPort := range rtr.Ports {
if !slices.Contains(outPort.AvailableZones, tuple.Zone) {
if !outPort.AvailableZones.Contains(tuple.Zone) {
continue
}
log.Printf("NBP: Converting FwdReq to LkUp (%v)", tuple)

View file

@ -94,9 +94,6 @@ func (ss SenderState) String() string {
// AURPPeer handles the peering with a peer AURP router.
type AURPPeer struct {
// Whole router config.
Config *Config
// AURP-Tr state for producing packets.
Transport *aurp.Transport
@ -114,15 +111,78 @@ type AURPPeer struct {
// Incoming packet channel.
ReceiveCh chan aurp.Packet
// Routing table (the peer will add/remove/update routes)
RoutingTable *RouteTable
// Route table (the peer will add/remove/update routes and zones)
RouteTable *RouteTable
// Zone table (the peer will add/remove/update zones)
ZoneTable *ZoneTable
// Event tuples yet to be sent to this peer in an RI-Upd.
pendingEventsMu sync.Mutex
pendingEvents aurp.EventTuples
mu sync.RWMutex
rstate ReceiverState
sstate SenderState
// The internal states below are only set within the Handle loop, but can
// be read concurrently from outside.
mu sync.RWMutex
rstate ReceiverState
sstate SenderState
lastReconnect time.Time
lastHeardFrom time.Time
lastSend time.Time // TODO: clarify use of lastSend / sendRetries
lastUpdate time.Time
sendRetries int
}
// NewAURPPeer assembles an AURPPeer with its transport and a buffered
// receive channel. If remoteDI is nil, a domain identifier is derived from
// the remote IP address instead.
func NewAURPPeer(routes *RouteTable, udpConn *net.UDPConn, peerAddr string, raddr *net.UDPAddr, localDI, remoteDI aurp.DomainIdentifier, connID uint16) *AURPPeer {
	di := remoteDI
	if di == nil {
		di = aurp.IPDomainIdentifier(raddr.IP)
	}
	tr := &aurp.Transport{
		LocalDI:     localDI,
		RemoteDI:    di,
		LocalConnID: connID,
	}
	return &AURPPeer{
		Transport:      tr,
		UDPConn:        udpConn,
		ConfiguredAddr: peerAddr,
		RemoteAddr:     raddr,
		ReceiveCh:      make(chan aurp.Packet, 1024),
		RouteTable:     routes,
	}
}
// addPendingEvent queues a network event tuple describing route, to be sent
// to this peer in a future RI-Upd. Routes that were themselves learned from
// an AURP peer are skipped, so AURP routes are never re-advertised to AURP.
func (p *AURPPeer) addPendingEvent(ec aurp.EventCode, route *Route) {
	if route.AURPPeer != nil {
		// Don't advertise routes to AURP peers to other AURP peers
		return
	}
	dist := route.Distance
	if ec == aurp.EventCodeND || ec == aurp.EventCodeNRC {
		// "The distance field does not apply to ND or NRC event tuples and
		// should be set to 0."
		dist = 0
	}
	et := aurp.EventTuple{
		EventCode:  ec,
		Extended:   route.Extended,
		RangeStart: route.NetStart,
		RangeEnd:   route.NetEnd,
		Distance:   dist,
	}
	p.pendingEventsMu.Lock()
	defer p.pendingEventsMu.Unlock()
	p.pendingEvents = append(p.pendingEvents, et)
}
// RouteAdded queues a Network Added (NA) event for route.
func (p *AURPPeer) RouteAdded(route *Route) {
	p.addPendingEvent(aurp.EventCodeNA, route)
}

// RouteDeleted queues a Network Deleted (ND) event for route.
func (p *AURPPeer) RouteDeleted(route *Route) {
	p.addPendingEvent(aurp.EventCodeND, route)
}

// RouteDistanceChanged queues a Network Distance Change (NDC) event for route.
func (p *AURPPeer) RouteDistanceChanged(route *Route) {
	p.addPendingEvent(aurp.EventCodeNDC, route)
}

// RouteForwarderChanged queues a Network Route Change (NRC) event for route.
func (p *AURPPeer) RouteForwarderChanged(route *Route) {
	p.addPendingEvent(aurp.EventCodeNRC, route)
}
func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error {
@ -130,7 +190,7 @@ func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error {
if err != nil {
return err
}
_, err = p.Send(p.Transport.NewAppleTalkPacket(outPkt))
_, err = p.send(p.Transport.NewAppleTalkPacket(outPkt))
return err
}
@ -146,6 +206,36 @@ func (p *AURPPeer) SenderState() SenderState {
return p.sstate
}
// LastReconnectAgo reports, as a human-friendly string, how long ago the
// last reconnect happened ("never" for the zero time).
func (p *AURPPeer) LastReconnectAgo() string {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return ago(p.lastReconnect)
}

// LastHeardFromAgo reports, as a human-friendly string, how long ago the
// peer was last heard from ("never" for the zero time).
func (p *AURPPeer) LastHeardFromAgo() string {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return ago(p.lastHeardFrom)
}

// LastSendAgo reports, as a human-friendly string, how long ago something
// was last sent to this peer ("never" for the zero time).
func (p *AURPPeer) LastSendAgo() string {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return ago(p.lastSend)
}

// LastUpdateAgo reports, as a human-friendly string, how long ago the last
// routing update was sent ("never" for the zero time).
func (p *AURPPeer) LastUpdateAgo() string {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return ago(p.lastUpdate)
}

// SendRetries returns the current value of the send retry counter.
func (p *AURPPeer) SendRetries() int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.sendRetries
}
func (p *AURPPeer) setRState(rstate ReceiverState) {
p.mu.Lock()
defer p.mu.Unlock()
@ -158,6 +248,42 @@ func (p *AURPPeer) setSState(sstate SenderState) {
p.sstate = sstate
}
// incSendRetries increments the send retry counter under the state mutex.
func (p *AURPPeer) incSendRetries() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.sendRetries++
}

// resetSendRetries resets the send retry counter to zero.
func (p *AURPPeer) resetSendRetries() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.sendRetries = 0
}

// bumpLastHeardFrom records that the peer was heard from just now.
func (p *AURPPeer) bumpLastHeardFrom() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.lastHeardFrom = time.Now()
}

// bumpLastReconnect records that a reconnect was attempted just now.
func (p *AURPPeer) bumpLastReconnect() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.lastReconnect = time.Now()
}

// bumpLastSend records that something was sent to the peer just now.
func (p *AURPPeer) bumpLastSend() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.lastSend = time.Now()
}

// bumpLastUpdate records that a routing update was sent just now.
func (p *AURPPeer) bumpLastUpdate() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.lastUpdate = time.Now()
}
func (p *AURPPeer) disconnect() {
p.mu.Lock()
defer p.mu.Unlock()
@ -165,8 +291,8 @@ func (p *AURPPeer) disconnect() {
p.sstate = SenderUnconnected
}
// Send encodes and sends pkt to the remote host.
func (p *AURPPeer) Send(pkt aurp.Packet) (int, error) {
// send encodes and sends pkt to the remote host.
func (p *AURPPeer) send(pkt aurp.Packet) (int, error) {
var b bytes.Buffer
if _, err := pkt.WriteTo(&b); err != nil {
return 0, err
@ -176,23 +302,28 @@ func (p *AURPPeer) Send(pkt aurp.Packet) (int, error) {
}
func (p *AURPPeer) Handle(ctx context.Context) error {
// Stop listening to events if the goroutine exits
defer p.RouteTable.RemoveObserver(p)
rticker := time.NewTicker(1 * time.Second)
defer rticker.Stop()
sticker := time.NewTicker(1 * time.Second)
defer sticker.Stop()
lastReconnect := time.Now()
lastHeardFrom := time.Now()
lastSend := time.Now() // TODO: clarify use of lastSend / sendRetries
lastUpdate := time.Now()
sendRetries := 0
p.mu.Lock()
p.lastReconnect = time.Now()
p.lastHeardFrom = time.Now()
p.lastSend = time.Now() // TODO: clarify use of lastSend / sendRetries
p.lastUpdate = time.Now()
p.sendRetries = 0
p.mu.Unlock()
var lastRISent aurp.Packet
p.disconnect()
// Write an Open-Req packet
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err
}
@ -208,7 +339,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
}
// Send a best-effort Router Down before returning
lastRISent = p.Transport.NewRDPacket(aurp.ErrCodeNormalClose)
if _, err := p.Send(lastRISent); err != nil {
if _, err := p.send(lastRISent); err != nil {
log.Printf("Couldn't send RD packet: %v", err)
}
return ctx.Err()
@ -216,70 +347,71 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case <-rticker.C:
switch p.rstate {
case ReceiverWaitForOpenRsp:
if time.Since(lastSend) <= sendRetryTimer {
if time.Since(p.lastSend) <= sendRetryTimer {
break
}
if sendRetries >= sendRetryLimit {
if p.sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection")
p.setRState(ReceiverUnconnected)
break
}
// Send another Open-Req
sendRetries++
lastSend = time.Now()
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
p.incSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err
}
case ReceiverConnected:
// Check LHFT, send tickle?
if time.Since(lastHeardFrom) <= lastHeardFromTimer {
if time.Since(p.lastHeardFrom) <= lastHeardFromTimer {
break
}
if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
if _, err := p.send(p.Transport.NewTicklePacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
return err
}
p.setRState(ReceiverWaitForTickleAck)
sendRetries = 0
lastSend = time.Now()
p.resetSendRetries()
p.bumpLastSend()
case ReceiverWaitForTickleAck:
if time.Since(lastSend) <= sendRetryTimer {
if time.Since(p.lastSend) <= sendRetryTimer {
break
}
if sendRetries >= tickleRetryLimit {
if p.sendRetries >= tickleRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection")
p.setRState(ReceiverUnconnected)
p.RoutingTable.DeleteAURPPeer(p)
p.RouteTable.DeleteAURPPeer(p)
break
}
sendRetries++
lastSend = time.Now()
if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
p.incSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewTicklePacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
return err
}
// still in Wait For Tickle-Ack
case ReceiverWaitForRIRsp:
if time.Since(lastSend) <= sendRetryTimer {
if time.Since(p.lastSend) <= sendRetryTimer {
break
}
if sendRetries >= sendRetryLimit {
if p.sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection")
p.setRState(ReceiverUnconnected)
p.RoutingTable.DeleteAURPPeer(p)
p.RouteTable.DeleteAURPPeer(p)
break
}
// RI-Req is stateless, so we don't need to cache the one we
// sent earlier just to send it again
sendRetries++
if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil {
p.incSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewRIReqPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
return err
}
@ -288,18 +420,18 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case ReceiverUnconnected:
// Data receiver is unconnected. If data sender is connected,
// send a null RI-Upd to check if the sender is also unconnected
if p.sstate == SenderConnected && time.Since(lastSend) > sendRetryTimer {
if sendRetries >= sendRetryLimit {
if p.sstate == SenderConnected && time.Since(p.lastSend) > sendRetryTimer {
if p.sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while probing sender connect, closing connection")
}
sendRetries++
lastSend = time.Now()
p.incSendRetries()
p.bumpLastSend()
aurp.Inc(&p.Transport.LocalSeq)
events := aurp.EventTuples{{
EventCode: aurp.EventCodeNull,
}}
lastRISent = p.Transport.NewRIUpdPacket(events)
if _, err := p.Send(lastRISent); err != nil {
if _, err := p.send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err)
return err
}
@ -308,7 +440,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
if p.ConfiguredAddr != "" {
// Periodically try to reconnect, if this peer is in the config file
if time.Since(lastReconnect) <= reconnectTimer {
if time.Since(p.lastReconnect) <= reconnectTimer {
break
}
@ -321,10 +453,10 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: resolved %q to %v", p.ConfiguredAddr, raddr)
p.RemoteAddr = raddr
lastReconnect = time.Now()
sendRetries = 0
lastSend = time.Now()
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
p.bumpLastReconnect()
p.resetSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err
}
@ -338,40 +470,64 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Do nothing
case SenderConnected:
if time.Since(lastUpdate) <= updateTimer {
if time.Since(p.lastUpdate) <= updateTimer {
break
}
// TODO: is there a routing update to send?
// Are there routing updates to send?
p.pendingEventsMu.Lock()
if len(p.pendingEvents) == 0 {
p.pendingEventsMu.Unlock()
break
}
// Yes - swap the slices, release the mutex, then send them
pending := p.pendingEvents
p.pendingEvents = make(aurp.EventTuples, 0, cap(pending))
p.pendingEventsMu.Unlock()
// TODO: eliminate events that cancel out (e.g. NA then ND)
// TODO: split pending events to fit within a packet
p.bumpLastUpdate()
aurp.Inc(&p.Transport.LocalSeq)
lastRISent = p.Transport.NewRIUpdPacket(pending)
if _, err := p.send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err)
return err
}
p.setSState(SenderWaitForRIUpdAck)
case SenderWaitForRIRspAck, SenderWaitForRIUpdAck:
if time.Since(lastSend) <= sendRetryTimer {
if time.Since(p.lastSend) <= sendRetryTimer {
break
}
if lastRISent == nil {
log.Print("AURP Peer: sender retry: lastRISent = nil?")
continue
}
if sendRetries >= sendRetryLimit {
if p.sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached, closing connection")
p.setSState(SenderUnconnected)
p.RouteTable.RemoveObserver(p)
continue
}
sendRetries++
lastSend = time.Now()
if _, err := p.Send(lastRISent); err != nil {
p.incSendRetries()
p.bumpLastSend()
if _, err := p.send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't re-send %T: %v", lastRISent, err)
return err
}
case SenderWaitForRDAck:
if time.Since(lastSend) <= sendRetryTimer {
if time.Since(p.lastSend) <= sendRetryTimer {
break
}
p.setSState(SenderUnconnected)
p.RouteTable.RemoveObserver(p)
}
case pkt := <-p.ReceiveCh:
lastHeardFrom = time.Now()
p.bumpLastHeardFrom()
switch pkt := pkt.(type) {
case *aurp.OpenReqPacket:
@ -398,19 +554,21 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
orsp = p.Transport.NewOpenRspPacket(0, 1, nil)
}
if _, err := p.Send(orsp); err != nil {
if _, err := p.send(orsp); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Rsp: %v", err)
return err
}
if orsp.RateOrErrCode >= 0 {
// Data sender is successfully in connected state
p.setSState(SenderConnected)
p.RouteTable.AddObserver(p)
}
// If receiver is unconnected, commence connecting
if p.rstate == ReceiverUnconnected {
lastSend = time.Now()
sendRetries = 0
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
p.resetSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err
}
@ -431,8 +589,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
p.setRState(ReceiverConnected)
// Send an RI-Req
sendRetries = 0
if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil {
p.resetSendRetries()
if _, err := p.send(p.Transport.NewRIReqPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
return err
}
@ -443,17 +601,19 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Received RI-Req but was not expecting one (sender state was %v)", p.sstate)
}
nets := aurp.NetworkTuples{
{
Extended: true,
RangeStart: p.Config.EtherTalk.NetStart,
RangeEnd: p.Config.EtherTalk.NetEnd,
Distance: 0,
},
var nets aurp.NetworkTuples
for _, r := range p.RouteTable.ValidNonAURPRoutes() {
nets = append(nets, aurp.NetworkTuple{
Extended: r.Extended,
RangeStart: r.NetStart,
RangeEnd: r.NetEnd,
Distance: r.Distance,
})
}
p.Transport.LocalSeq = 1
// TODO: Split tuples across multiple packets as required
lastRISent = p.Transport.NewRIRspPacket(aurp.RoutingFlagLast, nets)
if _, err := p.Send(lastRISent); err != nil {
if _, err := p.send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Rsp packet: %v", err)
return err
}
@ -467,7 +627,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Learned about these networks: %v", pkt.Networks)
for _, nt := range pkt.Networks {
p.RoutingTable.InsertAURPRoute(
p.RouteTable.InsertAURPRoute(
p,
nt.Extended,
ddp.Network(nt.RangeStart),
@ -478,7 +638,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// TODO: track which networks we don't have zone info for, and
// only set SZI for those ?
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil {
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack packet: %v", err)
return err
}
@ -504,15 +664,33 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
}
p.setSState(SenderConnected)
sendRetries = 0
p.resetSendRetries()
p.RouteTable.AddObserver(p)
// If SZI flag is set, send ZI-Rsp (transaction)
// TODO: split ZI-Rsp packets similarly to ZIP Replies
if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 {
zones := map[ddp.Network][]string{
p.Config.EtherTalk.NetStart: {p.Config.EtherTalk.ZoneName},
// Inspect last routing info packet sent to determine
// networks to gather names for
var nets []ddp.Network
switch last := lastRISent.(type) {
case *aurp.RIRspPacket:
for _, nt := range last.Networks {
nets = append(nets, nt.RangeStart)
}
case *aurp.RIUpdPacket:
for _, et := range last.Events {
// Only networks that were added
if et.EventCode != aurp.EventCodeNA {
continue
}
nets = append(nets, et.RangeStart)
}
}
if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil {
zones := p.RouteTable.ZonesForNetworks(nets)
// TODO: split ZI-Rsp packets similarly to ZIP Replies
if _, err := p.send(p.Transport.NewZIRspPacket(zones)); err != nil {
log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err)
}
}
@ -523,9 +701,9 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Receiver is unconnected, but their receiver sent us an
// RI-Ack for something
// Try to reconnect?
lastSend = time.Now()
sendRetries = 0
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
p.resetSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err
}
@ -533,7 +711,6 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
}
case *aurp.RIUpdPacket:
var ackFlag aurp.RoutingFlag
for _, et := range pkt.Events {
@ -543,7 +720,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Do nothing except respond with RI-Ack
case aurp.EventCodeNA:
if err := p.RoutingTable.InsertAURPRoute(
if err := p.RouteTable.InsertAURPRoute(
p,
et.Extended,
et.RangeStart,
@ -555,10 +732,10 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
ackFlag = aurp.RoutingFlagSendZoneInfo
case aurp.EventCodeND:
p.RoutingTable.DeleteAURPPeerNetwork(p, et.RangeStart)
p.RouteTable.DeleteAURPPeerNetwork(p, et.RangeStart)
case aurp.EventCodeNDC:
p.RoutingTable.UpdateAURPRouteDistance(p, et.RangeStart, et.Distance+1)
p.RouteTable.UpdateAURPRouteDistance(p, et.RangeStart, et.Distance+1)
case aurp.EventCodeNRC:
// "An exterior router sends a Network Route Change
@ -566,14 +743,14 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// through its local internet changes to a path through
// a tunneling port, causing split-horizoned processing
// to eliminate that networks routing information."
p.RoutingTable.DeleteAURPPeerNetwork(p, et.RangeStart)
p.RouteTable.DeleteAURPPeerNetwork(p, et.RangeStart)
case aurp.EventCodeZC:
// "This event is reserved for future use."
}
}
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, ackFlag)); err != nil {
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, ackFlag)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err)
return err
}
@ -584,10 +761,10 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
}
log.Printf("AURP Peer: Router Down: error code %d %s", pkt.ErrorCode, pkt.ErrorCode)
p.RoutingTable.DeleteAURPPeer(p)
p.RouteTable.DeleteAURPPeer(p)
// Respond with RI-Ack
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil {
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err)
return err
}
@ -596,8 +773,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case *aurp.ZIReqPacket:
// TODO: split ZI-Rsp packets similarly to ZIP Replies
zones := p.ZoneTable.Query(pkt.Networks)
if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil {
zones := p.RouteTable.ZonesForNetworks(pkt.Networks)
if _, err := p.send(p.Transport.NewZIRspPacket(zones)); err != nil {
log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err)
return err
}
@ -605,11 +782,11 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case *aurp.ZIRspPacket:
log.Printf("AURP Peer: Learned about these zones: %v", pkt.Zones)
for _, zt := range pkt.Zones {
p.ZoneTable.Upsert(ddp.Network(zt.Network), zt.Name, nil)
p.RouteTable.AddZonesToNetwork(zt.Network, zt.Name)
}
case *aurp.GDZLReqPacket:
if _, err := p.Send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil {
if _, err := p.send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil {
log.Printf("AURP Peer: Couldn't send GDZL-Rsp packet: %v", err)
return err
}
@ -618,7 +795,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Received a GDZL-Rsp, but I wouldn't have sent a GDZL-Req - that's weird")
case *aurp.GZNReqPacket:
if _, err := p.Send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil {
if _, err := p.send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil {
log.Printf("AURP Peer: Couldn't send GZN-Rsp packet: %v", err)
return err
}
@ -628,7 +805,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case *aurp.TicklePacket:
// Immediately respond with Tickle-Ack
if _, err := p.Send(p.Transport.NewTickleAckPacket()); err != nil {
if _, err := p.send(p.Transport.NewTickleAckPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle-Ack: %v", err)
return err
}

View file

@ -46,5 +46,8 @@ func (p *EtherTalkPeer) Forward(ctx context.Context, pkt *ddp.ExtPacket) error {
if err != nil {
return err
}
if len(outFrameRaw) < 64 {
outFrameRaw = append(outFrameRaw, make([]byte, 64-len(outFrameRaw))...)
}
return p.Port.PcapHandle.WritePacketData(outFrameRaw)
}

View file

@ -18,6 +18,7 @@ package router
import (
"context"
"encoding/binary"
"errors"
"io"
"log"
@ -37,7 +38,7 @@ type EtherTalkPort struct {
NetEnd ddp.Network
MyAddr ddp.Addr
DefaultZoneName string
AvailableZones []string
AvailableZones StringSet
PcapHandle *pcap.Handle
AARPMachine *AARPMachine
Router *Router
@ -79,8 +80,18 @@ func (port *EtherTalkPort) Serve(ctx context.Context) {
case ethertalk.AppleTalkProto:
// log.Print("Got an AppleTalk frame")
// Workaround for strict length checking in sfiera/multitalk
payload := ethFrame.Payload
if len(payload) < 2 {
log.Printf("Couldn't unmarshal DDP packet: too small (length = %d)", len(payload))
}
if size := binary.BigEndian.Uint16(payload[:2]) & 0x3ff; len(payload) > int(size) {
payload = payload[:size]
}
ddpkt := new(ddp.ExtPacket)
if err := ddp.ExtUnmarshal(ethFrame.Payload, ddpkt); err != nil {
if err := ddp.ExtUnmarshal(payload, ddpkt); err != nil {
log.Printf("Couldn't unmarshal DDP packet: %v", err)
continue
}
@ -189,5 +200,8 @@ func (port *EtherTalkPort) send(dstEth ethernet.Addr, pkt *ddp.ExtPacket) error
if err != nil {
return err
}
if len(outFrameRaw) < 64 {
outFrameRaw = append(outFrameRaw, make([]byte, 64-len(outFrameRaw))...)
}
return port.PcapHandle.WritePacketData(outFrameRaw)
}

View file

@ -34,6 +34,10 @@ type Route struct {
LastSeen time.Time
// ZoneNames may be empty between learning the existence of a route and
// receiving zone information.
ZoneNames StringSet
// Exactly one of the following should be set
AURPPeer *AURPPeer // Next hop is this peer router (over AURP)
EtherTalkPeer *EtherTalkPeer // Next hop is this peer router (over EtherTalk)
@ -41,23 +45,50 @@ type Route struct {
}
// LastSeenAgo returns a human-readable description of how long ago this
// route was last seen, by delegating to the ago helper.
// NOTE(review): assumes ago reports a sensible value (e.g. "never") for the
// zero time, matching the previous inline IsZero handling — confirm.
func (r Route) LastSeenAgo() string {
	return ago(r.LastSeen)
}
// Valid reports whether this route may be used. A usable route carries at
// least one zone name, and a route learned from a peer router over
// EtherTalk must additionally not have aged out.
func (r *Route) Valid() bool {
	if len(r.ZoneNames) == 0 {
		return false
	}
	if r.EtherTalkPeer == nil {
		return true
	}
	return time.Since(r.LastSeen) <= maxRouteAge
}
// RouteTableObserver is notified of changes to routes in a RouteTable.
// Observers are registered with RouteTable.AddObserver and removed with
// RouteTable.RemoveObserver.
type RouteTableObserver interface {
// RouteAdded is called when a route is added to the table.
RouteAdded(*Route)
// RouteDeleted is called when a route is removed from the table.
RouteDeleted(*Route)
// RouteDistanceChanged is called when a route's distance changes.
RouteDistanceChanged(*Route)
// RouteForwarderChanged is called when a route's next hop changes.
RouteForwarderChanged(*Route)
}
// RouteTable is the set of known routes, together with a set of observers
// notified of route changes. It is safe for concurrent use; the routes and
// observers maps are guarded by separate RWMutexes.
// (This reconstruction removes diff residue: a leftover `mu sync.Mutex`
// field and a duplicate `routes` field declaration.)
type RouteTable struct {
	routesMu sync.RWMutex
	routes   map[*Route]struct{}

	observersMu sync.RWMutex
	observers   map[RouteTableObserver]struct{}
}
// NewRouteTable returns an empty RouteTable ready for use.
// (This reconstruction removes diff residue: a duplicate `routes:` map key
// in the composite literal, which would not compile.)
func NewRouteTable() *RouteTable {
	return &RouteTable{
		routes:    make(map[*Route]struct{}),
		observers: make(map[RouteTableObserver]struct{}),
	}
}
// AddObserver registers o to be notified of subsequent route table changes.
// Registering the same observer twice has no additional effect.
func (rt *RouteTable) AddObserver(o RouteTableObserver) {
	rt.observersMu.Lock()
	defer rt.observersMu.Unlock()
	rt.observers[o] = struct{}{}
}
// RemoveObserver deregisters o, so it no longer receives change
// notifications. Removing an observer that was never added is a no-op.
func (rt *RouteTable) RemoveObserver(o RouteTableObserver) {
	rt.observersMu.Lock()
	defer rt.observersMu.Unlock()
	delete(rt.observers, o)
}
func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) {
r := &Route{
Extended: true,
@ -65,17 +96,18 @@ func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) {
NetEnd: port.NetEnd,
Distance: 0, // we're connected directly
LastSeen: time.Now(),
ZoneNames: port.AvailableZones,
EtherTalkDirect: port,
}
rt.mu.Lock()
defer rt.mu.Unlock()
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
rt.routes[r] = struct{}{}
}
func (rt *RouteTable) Dump() []Route {
rt.mu.Lock()
defer rt.mu.Unlock()
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
table := make([]Route, 0, len(rt.routes))
for r := range rt.routes {
@ -85,16 +117,15 @@ func (rt *RouteTable) Dump() []Route {
}
func (rt *RouteTable) LookupRoute(network ddp.Network) *Route {
rt.mu.Lock()
defer rt.mu.Unlock()
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
var bestRoute *Route
for r := range rt.routes {
if network < r.NetStart || network > r.NetEnd {
continue
}
// Exclude EtherTalk routes that are too old
if r.EtherTalkPeer != nil && time.Since(r.LastSeen) > maxRouteAge {
if !r.Valid() {
continue
}
if bestRoute == nil {
@ -109,8 +140,8 @@ func (rt *RouteTable) LookupRoute(network ddp.Network) *Route {
}
func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) {
rt.mu.Lock()
defer rt.mu.Unlock()
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
for route := range rt.routes {
if route.AURPPeer == peer {
@ -120,8 +151,8 @@ func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) {
}
func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network) {
rt.mu.Lock()
defer rt.mu.Unlock()
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
for route := range rt.routes {
if route.AURPPeer == peer && route.NetStart == network {
@ -131,8 +162,8 @@ func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network)
}
func (rt *RouteTable) UpdateAURPRouteDistance(peer *AURPPeer, network ddp.Network, distance uint8) {
rt.mu.Lock()
defer rt.mu.Unlock()
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
for route := range rt.routes {
if route.AURPPeer == peer && route.NetStart == network {
@ -142,16 +173,16 @@ func (rt *RouteTable) UpdateAURPRouteDistance(peer *AURPPeer, network ddp.Networ
}
}
func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, netStart, netEnd ddp.Network, metric uint8) error {
func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, netStart, netEnd ddp.Network, metric uint8) (*Route, error) {
if netStart > netEnd {
return fmt.Errorf("invalid network range [%d, %d]", netStart, netEnd)
return nil, fmt.Errorf("invalid network range [%d, %d]", netStart, netEnd)
}
if netStart != netEnd && !extended {
return fmt.Errorf("invalid network range [%d, %d] for nonextended network", netStart, netEnd)
return nil, fmt.Errorf("invalid network range [%d, %d] for nonextended network", netStart, netEnd)
}
rt.mu.Lock()
defer rt.mu.Unlock()
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
// Update?
for r := range rt.routes {
@ -169,7 +200,7 @@ func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, n
}
r.Distance = metric
r.LastSeen = time.Now()
return nil
return r, nil
}
// Insert.
@ -182,7 +213,7 @@ func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, n
EtherTalkPeer: peer,
}
rt.routes[r] = struct{}{}
return nil
return r, nil
}
func (rt *RouteTable) InsertAURPRoute(peer *AURPPeer, extended bool, netStart, netEnd ddp.Network, metric uint8) error {
@ -202,19 +233,35 @@ func (rt *RouteTable) InsertAURPRoute(peer *AURPPeer, extended bool, netStart, n
AURPPeer: peer,
}
rt.mu.Lock()
defer rt.mu.Unlock()
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
rt.routes[r] = struct{}{}
return nil
}
// ValidRoutes returns all valid routes.
func (rt *RouteTable) ValidRoutes() []*Route {
rt.mu.Lock()
defer rt.mu.Unlock()
rt.routesMu.RLock()
defer rt.routesMu.RUnlock()
valid := make([]*Route, 0, len(rt.routes))
for r := range rt.routes {
// Exclude EtherTalk routes that are too old
if r.EtherTalkPeer != nil && time.Since(r.LastSeen) > maxRouteAge {
if r.Valid() {
valid = append(valid, r)
}
}
return valid
}
// ValidNonAURPRoutes returns all valid routes that were not learned via AURP.
func (rt *RouteTable) ValidNonAURPRoutes() []*Route {
rt.routesMu.RLock()
defer rt.routesMu.RUnlock()
valid := make([]*Route, 0, len(rt.routes))
for r := range rt.routes {
if r.AURPPeer != nil {
continue
}
if !r.Valid() {
continue
}
valid = append(valid, r)

View file

@ -26,7 +26,6 @@ import (
type Router struct {
Config *Config
RouteTable *RouteTable
ZoneTable *ZoneTable
Ports []*EtherTalkPort
}

View file

@ -24,6 +24,7 @@ import (
"gitea.drjosh.dev/josh/jrouter/atalk"
"gitea.drjosh.dev/josh/jrouter/atalk/rtmp"
"gitea.drjosh.dev/josh/jrouter/atalk/zip"
"gitea.drjosh.dev/josh/jrouter/status"
"github.com/sfiera/multitalk/pkg/ddp"
@ -101,8 +102,7 @@ func (port *EtherTalkPort) HandleRTMP(ctx context.Context, pkt *ddp.ExtPacket) e
}
case rtmp.FunctionLoopProbe:
log.Print("RTMP: TODO: handle Loop Probes")
return nil
return fmt.Errorf("TODO: handle Loop Probes")
}
case ddp.ProtoRTMPResp:
@ -110,22 +110,51 @@ func (port *EtherTalkPort) HandleRTMP(ctx context.Context, pkt *ddp.ExtPacket) e
log.Print("RTMP: Got Response or Data")
dataPkt, err := rtmp.UnmarshalDataPacket(pkt.Data)
if err != nil {
log.Printf("RTMP: Couldn't unmarshal RTMP Data packet: %v", err)
break
return fmt.Errorf("unmarshal RTMP Data packet: %w", err)
}
peer := &EtherTalkPeer{
Port: port,
PeerAddr: dataPkt.RouterAddr,
}
for _, rt := range dataPkt.NetworkTuples {
if err := port.Router.RouteTable.UpsertEtherTalkRoute(peer, rt.Extended, rt.RangeStart, rt.RangeEnd, rt.Distance+1); err != nil {
log.Printf("RTMP: Couldn't upsert EtherTalk route: %v", err)
var noZones []ddp.Network
for _, nt := range dataPkt.NetworkTuples {
route, err := port.Router.RouteTable.UpsertEtherTalkRoute(peer, nt.Extended, nt.RangeStart, nt.RangeEnd, nt.Distance+1)
if err != nil {
return fmt.Errorf("upsert EtherTalk route: %v", err)
}
if len(route.ZoneNames) == 0 {
noZones = append(noZones, route.NetStart)
}
}
if len(noZones) > 0 {
// Send a ZIP Query for all networks we don't have zone names for.
// TODO: split networks to fit in multiple packets as needed
qryPkt, err := (&zip.QueryPacket{Networks: noZones}).Marshal()
if err != nil {
return fmt.Errorf("marshal ZIP Query packet: %w", err)
}
outDDP := &ddp.ExtPacket{
ExtHeader: ddp.ExtHeader{
Size: uint16(len(qryPkt)) + atalk.DDPExtHeaderSize,
Cksum: 0,
SrcNet: port.MyAddr.Network,
SrcNode: port.MyAddr.Node,
SrcSocket: 6,
DstNet: pkt.SrcNet,
DstNode: pkt.SrcNode,
DstSocket: 6, // ZIP socket
Proto: ddp.ProtoZIP,
},
Data: qryPkt,
}
if err := port.Send(ctx, outDDP); err != nil {
return fmt.Errorf("sending ZIP Query: %w", err)
}
}
default:
log.Printf("RTMP: invalid DDP type %d on socket 1", pkt.Proto)
return fmt.Errorf("invalid DDP type %d on socket 1", pkt.Proto)
}
return nil

View file

@ -20,7 +20,6 @@ import (
"context"
"fmt"
"log"
"slices"
"gitea.drjosh.dev/josh/jrouter/atalk"
"gitea.drjosh.dev/josh/jrouter/atalk/atp"
@ -52,6 +51,9 @@ func (port *EtherTalkPort) handleZIPZIP(ctx context.Context, ddpkt *ddp.ExtPacke
case *zip.QueryPacket:
return port.handleZIPQuery(ctx, ddpkt, zipkt)
case *zip.ReplyPacket:
return port.handleZIPReply(zipkt)
case *zip.GetNetInfoPacket:
return port.handleZIPGetNetInfo(ctx, ddpkt, zipkt)
@ -62,7 +64,7 @@ func (port *EtherTalkPort) handleZIPZIP(ctx context.Context, ddpkt *ddp.ExtPacke
func (port *EtherTalkPort) handleZIPQuery(ctx context.Context, ddpkt *ddp.ExtPacket, zipkt *zip.QueryPacket) error {
log.Printf("ZIP: Got Query for networks %v", zipkt.Networks)
networks := port.Router.ZoneTable.Query(zipkt.Networks)
networks := port.Router.RouteTable.ZonesForNetworks(zipkt.Networks)
sendReply := func(resp *zip.ReplyPacket) error {
respRaw, err := resp.Marshal()
@ -156,11 +158,21 @@ func (port *EtherTalkPort) handleZIPQuery(ctx context.Context, ddpkt *ddp.ExtPac
return nil
}
// handleZIPReply folds the zone names carried in a ZIP Reply into the route
// table, network by network.
func (port *EtherTalkPort) handleZIPReply(zipkt *zip.ReplyPacket) error {
	log.Printf("ZIP: Got Reply containing %v", zipkt.Networks)
	// Attach each network's zone list to the matching routes.
	for network, zoneNames := range zipkt.Networks {
		port.Router.RouteTable.AddZonesToNetwork(network, zoneNames...)
	}
	return nil
}
func (port *EtherTalkPort) handleZIPGetNetInfo(ctx context.Context, ddpkt *ddp.ExtPacket, zipkt *zip.GetNetInfoPacket) error {
log.Printf("ZIP: Got GetNetInfo for zone %q", zipkt.ZoneName)
// The request is zoneValid if the zone name is available on this network.
zoneValid := slices.Contains(port.AvailableZones, zipkt.ZoneName)
zoneValid := port.AvailableZones.Contains(zipkt.ZoneName)
// The multicast address we return depends on the validity of the zone
// name.
@ -258,10 +270,10 @@ func (port *EtherTalkPort) handleZIPTReq(ctx context.Context, ddpkt *ddp.ExtPack
switch gzl.Function {
case zip.FunctionGetZoneList:
resp.Zones = port.Router.ZoneTable.AllNames()
resp.Zones = port.Router.RouteTable.AllZoneNames()
case zip.FunctionGetLocalZones:
resp.Zones = port.AvailableZones
resp.Zones = port.AvailableZones.ToSlice()
case zip.FunctionGetMyZone:
// Note: This shouldn't happen on extended networks (e.g. EtherTalk)

View file

@ -17,145 +17,72 @@
package router
import (
"fmt"
"slices"
"sort"
"sync"
"time"
"github.com/sfiera/multitalk/pkg/ddp"
)
//const maxZoneAge = 10 * time.Minute // TODO: confirm
type Zone struct {
Network ddp.Network
Name string
LocalPort *EtherTalkPort // nil if remote (local to another router)
LastSeen time.Time
}
func (z Zone) LastSeenAgo() string {
if z.LastSeen.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(z.LastSeen).Truncate(time.Millisecond))
}
type zoneKey struct {
network ddp.Network
name string
}
type ZoneTable struct {
mu sync.Mutex
zones map[zoneKey]*Zone
}
func NewZoneTable() *ZoneTable {
return &ZoneTable{
zones: make(map[zoneKey]*Zone),
}
}
func (zt *ZoneTable) Dump() []Zone {
zt.mu.Lock()
defer zt.mu.Unlock()
zs := make([]Zone, 0, len(zt.zones))
for _, z := range zt.zones {
zs = append(zs, *z)
}
return zs
}
func (zt *ZoneTable) Upsert(network ddp.Network, name string, localPort *EtherTalkPort) {
zt.mu.Lock()
defer zt.mu.Unlock()
key := zoneKey{network, name}
z := zt.zones[key]
if z != nil {
z.LocalPort = localPort
z.LastSeen = time.Now()
return
}
zt.zones[key] = &Zone{
Network: network,
Name: name,
LocalPort: localPort,
LastSeen: time.Now(),
}
}
func (zt *ZoneTable) Query(ns []ddp.Network) map[ddp.Network][]string {
slices.Sort(ns)
zs := make(map[ddp.Network][]string)
zt.mu.Lock()
defer zt.mu.Unlock()
for _, z := range zt.zones {
// if time.Since(z.LastSeen) > maxZoneAge {
// continue
// }
if _, ok := slices.BinarySearch(ns, z.Network); ok {
zs[z.Network] = append(zs[z.Network], z.Name)
}
}
return zs
}
func (zt *ZoneTable) LookupName(name string) []*Zone {
zt.mu.Lock()
defer zt.mu.Unlock()
var zs []*Zone
for _, z := range zt.zones {
if z.Name == name {
zs = append(zs, z)
}
}
return zs
}
// func (zt *ZoneTable) LocalNames() []string {
// zt.mu.Lock()
// seen := make(map[string]struct{})
// zs := make([]string, 0, len(zt.zones))
// for _, z := range zt.zones {
// // if time.Since(z.LastSeen) > maxZoneAge {
// // continue
// // }
// if z.Local != nil {
// continue
// }
// if _, s := seen[z.Name]; s {
// continue
// }
// seen[z.Name] = struct{}{}
// zs = append(zs, z.Name)
// }
// zt.mu.Unlock()
// sort.Strings(zs)
// return zs
// }
func (zt *ZoneTable) AllNames() []string {
zt.mu.Lock()
seen := make(map[string]struct{})
zs := make([]string, 0, len(zt.zones))
for _, z := range zt.zones {
// if time.Since(z.LastSeen) > maxZoneAge {
// continue
// }
if _, s := seen[z.Name]; s {
// AddZonesToNetwork adds the given zone names to every route whose network
// range contains n, creating the route's zone set if needed.
// (This reconstruction removes diff residue: leftover old-ZoneTable lines
// referencing `seen`, `z`, and `zt.mu`, which would not compile here.)
func (rt *RouteTable) AddZonesToNetwork(n ddp.Network, zs ...string) {
	rt.routesMu.Lock()
	defer rt.routesMu.Unlock()
	for r := range rt.routes {
		if n < r.NetStart || n > r.NetEnd {
			continue
		}
		if r.ZoneNames == nil {
			r.ZoneNames = make(StringSet)
		}
		r.ZoneNames.Insert(zs...)
	}
}
sort.Strings(zs)
func (rt *RouteTable) ZonesForNetworks(ns []ddp.Network) map[ddp.Network][]string {
zs := make(map[ddp.Network][]string)
rt.routesMu.Lock()
defer rt.routesMu.Unlock()
for r := range rt.routes {
if !r.Valid() {
continue
}
if _, ok := slices.BinarySearch(ns, r.NetStart); ok {
for z := range r.ZoneNames {
zs[r.NetStart] = append(zs[r.NetStart], z)
}
}
}
return zs
}
// RoutesForZone returns all valid routes whose zone set contains zone.
func (rt *RouteTable) RoutesForZone(zone string) []*Route {
	// Read-only access: take a read lock, consistent with ValidRoutes,
	// rather than the exclusive lock the original acquired.
	rt.routesMu.RLock()
	defer rt.routesMu.RUnlock()
	var routes []*Route
	for r := range rt.routes {
		if !r.Valid() {
			continue
		}
		if r.ZoneNames.Contains(zone) {
			routes = append(routes, r)
		}
	}
	return routes
}
// AllZoneNames returns a sorted list of every zone name attached to any
// valid route.
func (rt *RouteTable) AllZoneNames() []string {
	rt.routesMu.RLock()
	defer rt.routesMu.RUnlock()
	zs := make(StringSet)
	for r := range rt.routes {
		if !r.Valid() {
			continue
		}
		zs.Add(r.ZoneNames)
	}
	// Sort the materialized slice directly. The previous
	// `defer slices.Sort(zones)` evaluated its argument — the then-nil
	// `zones` slice header — when the defer was queued, so the slice later
	// produced by ToSlice was returned unsorted.
	zones := zs.ToSlice()
	slices.Sort(zones)
	return zones
}