Compare commits

..

No commits in common. "main" and "multi-port-refactor" have entirely different histories.

22 changed files with 391 additions and 704 deletions

View file

@ -1,12 +0,0 @@
FROM golang:alpine AS builder
WORKDIR /go/src/jrouter
COPY . .
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/go/pkg/mod \
apk add build-base libpcap-dev && \
CGO_ENABLED=1 CGO_FLAGS='-static' go build -v -o jrouter .
FROM alpine:latest
COPY --from=builder /go/src/jrouter/jrouter /usr/bin/
RUN apk add libpcap
ENTRYPOINT ["/usr/bin/jrouter", "-config", "/etc/jrouter/jrouter.yaml"]

View file

@ -11,25 +11,21 @@ Home-grown alternative implementation of Apple Internet Router 3.0
TashTalk could be a stretch goal, if I can acquire one! TashTalk could be a stretch goal, if I can acquire one!
## Things that used to be caveats
* Previously it would listen for all EtherTalk traffic, regardless of destination.
Now it doesn't do that, which should help it co-exist with other routers on
the same host.
* You can configure an alternate Ethernet address if you are reusing the same
network interface for multiple different EtherTalk software.
* In addition to the configured EtherTalk network and zone, it now learns routes
and zones from other EtherTalk routers, and should share them across AURP.
* There's a status server. Browse to http://\[your router\]:9459/status to see
information about the state of jrouter.
## Caveats ## Caveats
Things I plan to fix Real Soon Now: Things I plan to fix Real Soon Now:
* ✅ Fixed ~~It currently listens to all AppleTalk and AARP traffic on the EtherTalk port.
This might not play well with other AppleTalk software, e.g. netatalk.~~
* ✅ Fixed ~~Also it currently uses the default Ethernet address for the interface for
sending packets. I plan to add the ability to configure a different address.~~
You can now configure a different Ethernet address for the EtherTalk
interface. I haven't tested it with netatalk or tashrouter on the same
host, but I think using a distinct Ethernet address would help them coexist.
* It currently ignores other AppleTalk routers on the EtherTalk side. This is
the next main thing to implement to make it work with e.g. netatalk.
* Some packet types need splitting to fit within limits. Some of these aren't * Some packet types need splitting to fit within limits. Some of these aren't
implemented yet (mainly encapsulated). The unimplemented ones seem unlikely to implemented yet (mainly encapsulated).
hit those limits unless you are running a lot of routers or zones locally.
* I plan to add a Prometheus metrics endpoint and at least add log levels / * I plan to add a Prometheus metrics endpoint and at least add log levels /
verbosity config. verbosity config.
* The AURP implementation is mostly there, but not fully complete. The main * The AURP implementation is mostly there, but not fully complete. The main
@ -37,7 +33,7 @@ Things I plan to fix Real Soon Now:
Things I plan to fix At Some Point: Things I plan to fix At Some Point:
* For expediency I made it act as a _seed router_. At some point I might add * For expediency I made it act as a _seed router_. At some point I might add
"soft seed" functionality. "soft seed" functionality.
@ -49,50 +45,16 @@ First, set up a `jrouter.yaml` (use the one in this repo as an example).
TODO: explain the configuration file TODO: explain the configuration file
### Building and running directly Building and running:
1. Install [Go](https://go.dev/dl). ```shell
2. Run these commands (for Debian-variety Linuxen, e.g. Ubuntu, Raspbian, Mint...): sudo apt install libpcap-dev
```shell go install gitea.drjosh.dev/josh/jrouter@latest
sudo apt install git build-essential libpcap-dev sudo setcap 'CAP_NET_BIND_SERVICE=ep CAP_NET_RAW=ep' ~/go/bin/jrouter
go install drjosh.dev/jrouter@latest ~/go/bin/jrouter
sudo setcap 'CAP_NET_BIND_SERVICE=ep CAP_NET_RAW=ep' ~/go/bin/jrouter ```
```
3. Configure `jrouter.yaml`
4. To run:
```shell
~/go/bin/jrouter
```
Notes: * `NET_BIND_SERVICE` is needed to bind UDP port 387 (for talking between AIRs)
* `NET_RAW` is needed for EtherTalk
* `git` is needed for `go install` to fetch the module TODO: instructions for non-Linux machines
* `build-essential` and`libpcap-dev` are needed for [gopacket](https://github.com/google/gopacket), which uses [CGo](https://pkg.go.dev/cmd/cgo)
* `NET_BIND_SERVICE` is needed for `jrouter` to bind UDP port 387 (for talking between AIRs)
* `NET_RAW` is needed for `jrouter` to listen for and send EtherTalk packets
TODO: instructions for non-Linux / non-Debian-like machines
### With Docker
1. Clone the repo and `cd` into it.
2. `docker build -t jrouter .`
3. Example `docker run` command:
```shell
docker run \
-v ./cfg:/etc/jrouter \
--cap-add NET_RAW \
--net host \
--name jrouter \
jrouter
```
Notes:
* Put `jrouter.yaml` inside a `cfg` directory (or some path of your choice and bind-mount it at `/etc/jrouter`) for it to find the config file.
* `--cap-add NET_RAW` and `--net host` is needed for EtherTalk access to the network interface.
* By using `--net host`, the default AURP port (387) will be bound without `-p`.
## Bug reports? Feature requests? Complaints? Praise?
You can contact me on the Fediverse at @DrJosh9000@cloudisland.nz, or email me at josh.deprez@gmail.com.

View file

@ -21,7 +21,7 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"drjosh.dev/jrouter/atalk" "gitea.drjosh.dev/josh/jrouter/atalk"
) )
const ( const (

View file

@ -21,8 +21,8 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"drjosh.dev/jrouter/atalk" "gitea.drjosh.dev/josh/jrouter/atalk"
"drjosh.dev/jrouter/atalk/atp" "gitea.drjosh.dev/josh/jrouter/atalk/atp"
) )
type GetZonesPacket struct { type GetZonesPacket struct {

View file

@ -19,7 +19,7 @@ package zip
import ( import (
"testing" "testing"
"drjosh.dev/jrouter/atalk/atp" "gitea.drjosh.dev/josh/jrouter/atalk/atp"
) )
func FuzzUnmarshalPacket(f *testing.F) { func FuzzUnmarshalPacket(f *testing.F) {

View file

@ -173,12 +173,9 @@ func (e EventTuples) WriteTo(w io.Writer) (int64, error) {
} }
func parseEventTuples(p []byte) (EventTuples, error) { func parseEventTuples(p []byte) (EventTuples, error) {
// Event tuples can be 1, 4, or 6 bytes long. But the only type of length 1 // Each event tuple is at least 4 bytes, so we need to store at most
// is the Null event type sent to probe whether or not the data receiver is // len(p)/4 of them.
// still listening. If that's present there probably aren't any other e := make(EventTuples, 0, len(p)/4)
// tuples. Hence len(p)/4 (rounded up) is a reasonable estimate of max tuple
// count.
e := make(EventTuples, 0, (len(p)+3)/4)
for len(p) > 0 { for len(p) > 0 {
et, nextp, err := parseEventTuple(p) et, nextp, err := parseEventTuple(p)
if err != nil { if err != nil {
@ -201,10 +198,6 @@ type EventTuple struct {
func (et *EventTuple) WriteTo(w io.Writer) (int64, error) { func (et *EventTuple) WriteTo(w io.Writer) (int64, error) {
a := acc(w) a := acc(w)
a.write8(uint8(et.EventCode)) a.write8(uint8(et.EventCode))
if et.EventCode == EventCodeNull {
// null tuple
return a.ret()
}
a.write16(uint16(et.RangeStart)) a.write16(uint16(et.RangeStart))
if !et.Extended { if !et.Extended {
// non-extended tuple // non-extended tuple
@ -218,18 +211,12 @@ func (et *EventTuple) WriteTo(w io.Writer) (int64, error) {
} }
func parseEventTuple(p []byte) (EventTuple, []byte, error) { func parseEventTuple(p []byte) (EventTuple, []byte, error) {
if len(p) < 1 { if len(p) < 4 {
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for any network event tuple", len(p)) return EventTuple{}, p, fmt.Errorf("insufficient input length %d for network event tuple", len(p))
} }
var et EventTuple var et EventTuple
et.EventCode = EventCode(p[0]) et.EventCode = EventCode(p[0])
if et.EventCode == EventCodeNull {
return et, p[1:], nil
}
if len(p) < 4 {
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for non-Null network event tuple", len(p))
}
et.RangeStart = ddp.Network(binary.BigEndian.Uint16(p[1:3])) et.RangeStart = ddp.Network(binary.BigEndian.Uint16(p[1:3]))
et.RangeEnd = et.RangeStart et.RangeEnd = et.RangeStart
et.Distance = p[3] et.Distance = p[3]

10
go.mod
View file

@ -1,12 +1,14 @@
module drjosh.dev/jrouter module gitea.drjosh.dev/josh/jrouter
go 1.22.0 go 1.22.0
require ( require (
github.com/google/go-cmp v0.6.0 github.com/google/go-cmp v0.6.0
github.com/google/gopacket v1.1.19
github.com/sfiera/multitalk v0.2.0
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )
require golang.org/x/sys v0.5.0 // indirect require (
github.com/google/gopacket v1.1.19 // indirect
github.com/sfiera/multitalk v0.2.0 // indirect
golang.org/x/sys v0.5.0 // indirect
)

10
go.sum
View file

@ -1,32 +1,24 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sfiera/multitalk v0.2.0 h1:FLLw7L6yNUTOdjMk2EgKnOwMKeu++t5zinv5qpTC2JM= github.com/sfiera/multitalk v0.2.0 h1:FLLw7L6yNUTOdjMk2EgKnOwMKeu++t5zinv5qpTC2JM=
github.com/sfiera/multitalk v0.2.0/go.mod h1:jGkgjgiyfuRMNKwfSjo/xR6b9nd10XF4smQAtApXFHc= github.com/sfiera/multitalk v0.2.0/go.mod h1:jGkgjgiyfuRMNKwfSjo/xR6b9nd10XF4smQAtApXFHc=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

91
main.go
View file

@ -30,14 +30,15 @@ import (
"os" "os"
"os/signal" "os/signal"
"regexp" "regexp"
"runtime/debug"
"slices" "slices"
"strings" "strings"
"sync" "sync"
"time" "time"
"drjosh.dev/jrouter/aurp" "gitea.drjosh.dev/josh/jrouter/aurp"
"drjosh.dev/jrouter/router" "gitea.drjosh.dev/josh/jrouter/router"
"drjosh.dev/jrouter/status" "gitea.drjosh.dev/josh/jrouter/status"
"github.com/google/gopacket/pcap" "github.com/google/gopacket/pcap"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
@ -49,7 +50,6 @@ const routingTableTemplate = `
<thead><tr> <thead><tr>
<th>Network range</th> <th>Network range</th>
<th>Extended?</th> <th>Extended?</th>
<th>Zone names</th>
<th>Distance</th> <th>Distance</th>
<th>Last seen</th> <th>Last seen</th>
<th>Port</th> <th>Port</th>
@ -58,8 +58,7 @@ const routingTableTemplate = `
{{range $route := . }} {{range $route := . }}
<tr> <tr>
<td>{{$route.NetStart}}{{if not (eq $route.NetStart $route.NetEnd)}} - {{$route.NetEnd}}{{end}}</td> <td>{{$route.NetStart}}{{if not (eq $route.NetStart $route.NetEnd)}} - {{$route.NetEnd}}{{end}}</td>
<td>{{if $route.Extended}}{{else}}-{{end}}</td> <td>{{if $route.Extended}}{{else}}{{end}}</td>
<td>{{range $route.ZoneNames.ToSlice}}{{.}}<br>{{end}}</td>
<td>{{$route.Distance}}</td> <td>{{$route.Distance}}</td>
<td>{{$route.LastSeenAgo}}</td> <td>{{$route.LastSeenAgo}}</td>
<td> <td>
@ -79,6 +78,27 @@ const routingTableTemplate = `
</table> </table>
` `
const zoneTableTemplate = `
<table>
<thead><tr>
<th>Network</th>
<th>Name</th>
<th>Local Port</th>
<th>Last seen</th>
</tr></thead>
<tbody>
{{range $zone := . }}
<tr>
<td>{{$zone.Network}}</td>
<td>{{$zone.Name}}</td>
<td>{{with $zone.LocalPort}}{{.Device}}{{else}}-{{end}}</td>
<td>{{$zone.LastSeenAgo}}</td>
</tr>
{{end}}
</tbody>
</table>
`
const peerTableTemplate = ` const peerTableTemplate = `
<table> <table>
<thead><tr> <thead><tr>
@ -86,11 +106,6 @@ const peerTableTemplate = `
<th>Remote addr</th> <th>Remote addr</th>
<th>Receiver state</th> <th>Receiver state</th>
<th>Sender state</th> <th>Sender state</th>
<th>Last heard from</th>
<th>Last reconnect</th>
<th>Last update</th>
<th>Last send</th>
<th>Send retries</th>
</tr></thead> </tr></thead>
<tbody> <tbody>
{{range $peer := . }} {{range $peer := . }}
@ -99,11 +114,6 @@ const peerTableTemplate = `
<td>{{$peer.RemoteAddr}}</td> <td>{{$peer.RemoteAddr}}</td>
<td>{{$peer.ReceiverState}}</td> <td>{{$peer.ReceiverState}}</td>
<td>{{$peer.SenderState}}</td> <td>{{$peer.SenderState}}</td>
<td>{{$peer.LastHeardFromAgo}}</td>
<td>{{$peer.LastReconnectAgo}}</td>
<td>{{$peer.LastUpdateAgo}}</td>
<td>{{$peer.LastSendAgo}}</td>
<td>{{$peer.SendRetries}}</td>
</tr> </tr>
{{end}} {{end}}
</tbody> </tbody>
@ -116,9 +126,7 @@ var configFilePath = flag.String("config", "jrouter.yaml", "Path to configuratio
func main() { func main() {
// For some reason it occasionally panics and the panics have no traceback? // For some reason it occasionally panics and the panics have no traceback?
// This didn't help: debug.SetTraceback("all")
// debug.SetTraceback("all")
// I think it's calling recover in a defer too broadly.
flag.Parse() flag.Parse()
log.Println("jrouter") log.Println("jrouter")
@ -212,6 +220,15 @@ func main() {
return rs, nil return rs, nil
}) })
zones := router.NewZoneTable()
status.AddItem(ctx, "Zone table", zoneTableTemplate, func(context.Context) (any, error) {
zs := zones.Dump()
slices.SortFunc(zs, func(za, zb router.Zone) int {
return cmp.Compare(za.Name, zb.Name)
})
return zs, nil
})
// -------------------------------- Peers --------------------------------- // -------------------------------- Peers ---------------------------------
var peersMu sync.Mutex var peersMu sync.Mutex
peers := make(map[udpAddr]*router.AURPPeer) peers := make(map[udpAddr]*router.AURPPeer)
@ -298,7 +315,20 @@ func main() {
continue continue
} }
peer := router.NewAURPPeer(routes, ln, peerStr, raddr, localDI, nil, nextConnID) peer := &router.AURPPeer{
Config: cfg,
Transport: &aurp.Transport{
LocalDI: localDI,
RemoteDI: aurp.IPDomainIdentifier(raddr.IP),
LocalConnID: nextConnID,
},
UDPConn: ln,
ConfiguredAddr: peerStr,
RemoteAddr: raddr,
ReceiveCh: make(chan aurp.Packet, 1024),
RoutingTable: routes,
ZoneTable: zones,
}
aurp.Inc(&nextConnID) aurp.Inc(&nextConnID)
peersMu.Lock() peersMu.Lock()
peers[udpAddrFromNet(raddr)] = peer peers[udpAddrFromNet(raddr)] = peer
@ -314,7 +344,7 @@ func main() {
rooter := &router.Router{ rooter := &router.Router{
Config: cfg, Config: cfg,
RouteTable: routes, RouteTable: routes,
// ZoneTable: zones, ZoneTable: zones,
} }
etherTalkPort := &router.EtherTalkPort{ etherTalkPort := &router.EtherTalkPort{
@ -323,13 +353,16 @@ func main() {
NetStart: cfg.EtherTalk.NetStart, NetStart: cfg.EtherTalk.NetStart,
NetEnd: cfg.EtherTalk.NetEnd, NetEnd: cfg.EtherTalk.NetEnd,
DefaultZoneName: cfg.EtherTalk.ZoneName, DefaultZoneName: cfg.EtherTalk.ZoneName,
AvailableZones: router.SetFromSlice([]string{cfg.EtherTalk.ZoneName}), AvailableZones: []string{cfg.EtherTalk.ZoneName},
PcapHandle: pcapHandle, PcapHandle: pcapHandle,
AARPMachine: aarpMachine, AARPMachine: aarpMachine,
Router: rooter, Router: rooter,
} }
rooter.Ports = append(rooter.Ports, etherTalkPort) rooter.Ports = append(rooter.Ports, etherTalkPort)
routes.InsertEtherTalkDirect(etherTalkPort) routes.InsertEtherTalkDirect(etherTalkPort)
for _, az := range etherTalkPort.AvailableZones {
zones.Upsert(etherTalkPort.NetStart, az, etherTalkPort)
}
// --------------------------------- RTMP --------------------------------- // --------------------------------- RTMP ---------------------------------
go etherTalkPort.RunRTMP(ctx) go etherTalkPort.RunRTMP(ctx)
@ -394,7 +427,19 @@ func main() {
continue continue
} }
// New peer! // New peer!
pr = router.NewAURPPeer(routes, ln, "", raddr, localDI, dh.SourceDI, nextConnID) pr = &router.AURPPeer{
Config: cfg,
Transport: &aurp.Transport{
LocalDI: localDI,
RemoteDI: dh.SourceDI, // platinum rule
LocalConnID: nextConnID,
},
UDPConn: ln,
RemoteAddr: raddr,
ReceiveCh: make(chan aurp.Packet, 1024),
RoutingTable: routes,
ZoneTable: zones,
}
aurp.Inc(&nextConnID) aurp.Inc(&nextConnID)
peers[ra] = pr peers[ra] = pr
goPeerHandler(pr) goPeerHandler(pr)

View file

@ -24,7 +24,7 @@ import (
"sync" "sync"
"time" "time"
"drjosh.dev/jrouter/status" "gitea.drjosh.dev/josh/jrouter/status"
"github.com/google/gopacket/pcap" "github.com/google/gopacket/pcap"
"github.com/sfiera/multitalk/pkg/aarp" "github.com/sfiera/multitalk/pkg/aarp"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
@ -37,8 +37,6 @@ const (
maxAMTEntryAge = 30 * time.Second maxAMTEntryAge = 30 * time.Second
aarpRequestRetransmit = 1 * time.Second aarpRequestRetransmit = 1 * time.Second
aarpRequestTimeout = 10 * time.Second aarpRequestTimeout = 10 * time.Second
aarpBodyLength = 28 // bytes
) )
const aarpStatusTemplate = ` const aarpStatusTemplate = `
@ -183,17 +181,8 @@ func (a *AARPMachine) Run(ctx context.Context) error {
a.incomingCh = nil a.incomingCh = nil
} }
// sfiera/multitalk will return an "excess data" error if the
// payload is too big. Most traffic I've seen locally does not have
// this problem, but I've seen one report with some junk trailing
// data on AARP packets.
payload := ethFrame.Payload
if len(payload) > aarpBodyLength {
payload = payload[:aarpBodyLength]
}
var aapkt aarp.Packet var aapkt aarp.Packet
if err := aarp.Unmarshal(payload, &aapkt); err != nil { if err := aarp.Unmarshal(ethFrame.Payload, &aapkt); err != nil {
log.Printf("Couldn't unmarshal AARP packet: %v", err) log.Printf("Couldn't unmarshal AARP packet: %v", err)
continue continue
} }
@ -337,9 +326,6 @@ func (a *AARPMachine) heyThatsMe(targ aarp.AddrPair) error {
if err != nil { if err != nil {
return err return err
} }
if len(respFrameRaw) < 64 {
respFrameRaw = append(respFrameRaw, make([]byte, 64-len(respFrameRaw))...)
}
return a.pcapHandle.WritePacketData(respFrameRaw) return a.pcapHandle.WritePacketData(respFrameRaw)
} }
@ -353,9 +339,6 @@ func (a *AARPMachine) probe() error {
if err != nil { if err != nil {
return err return err
} }
if len(probeFrameRaw) < 64 {
probeFrameRaw = append(probeFrameRaw, make([]byte, 64-len(probeFrameRaw))...)
}
return a.pcapHandle.WritePacketData(probeFrameRaw) return a.pcapHandle.WritePacketData(probeFrameRaw)
} }
@ -369,9 +352,6 @@ func (a *AARPMachine) request(ddpAddr ddp.Addr) error {
if err != nil { if err != nil {
return err return err
} }
if len(reqFrameRaw) < 64 {
reqFrameRaw = append(reqFrameRaw, make([]byte, 64-len(reqFrameRaw))...)
}
return a.pcapHandle.WritePacketData(reqFrameRaw) return a.pcapHandle.WritePacketData(reqFrameRaw)
} }
@ -398,7 +378,10 @@ func (e AMTEntry) Valid() bool {
// LastUpdatedAgo is a friendly string reporting how long ago the entry was // LastUpdatedAgo is a friendly string reporting how long ago the entry was
// updated/resolved. // updated/resolved.
func (e AMTEntry) LastUpdatedAgo() string { func (e AMTEntry) LastUpdatedAgo() string {
return ago(e.LastUpdated) if e.LastUpdated.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(e.LastUpdated).Truncate(time.Millisecond))
} }
// addressMappingTable implements a concurrent-safe Address Mapping Table for // addressMappingTable implements a concurrent-safe Address Mapping Table for

View file

@ -20,7 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"drjosh.dev/jrouter/atalk/aep" "gitea.drjosh.dev/josh/jrouter/atalk/aep"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
) )

View file

@ -1,65 +0,0 @@
/*
Copyright 2024 Josh Deprez
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package router
import (
"fmt"
"time"
)
// StringSet is a set of strings.
// Yep, yet another string set implementation. Took me 2 minutes to write *shrug*
type StringSet map[string]struct{}
func (set StringSet) ToSlice() []string {
ss := make([]string, 0, len(set))
for s := range set {
ss = append(ss, s)
}
return ss
}
func (set StringSet) Contains(s string) bool {
_, c := set[s]
return c
}
func (set StringSet) Insert(ss ...string) {
for _, s := range ss {
set[s] = struct{}{}
}
}
func (set StringSet) Add(t StringSet) {
for s := range t {
set[s] = struct{}{}
}
}
func SetFromSlice(ss []string) StringSet {
set := make(StringSet, len(ss))
set.Insert(ss...)
return set
}
// ago is a helper for formatting times.
func ago(t time.Time) string {
if t.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(t).Truncate(time.Millisecond))
}

View file

@ -20,9 +20,10 @@ import (
"context" "context"
"fmt" "fmt"
"log" "log"
"slices"
"drjosh.dev/jrouter/atalk" "gitea.drjosh.dev/josh/jrouter/atalk"
"drjosh.dev/jrouter/atalk/nbp" "gitea.drjosh.dev/josh/jrouter/atalk/nbp"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
) )
@ -73,10 +74,10 @@ func (port *EtherTalkPort) handleNBPBrRq(ctx context.Context, ddpkt *ddp.ExtPack
// tuple.Zone = port.DefaultZoneName // tuple.Zone = port.DefaultZoneName
// } // }
routes := port.Router.RouteTable.RoutesForZone(tuple.Zone) zones := port.Router.ZoneTable.LookupName(tuple.Zone)
for _, route := range routes { for _, z := range zones {
if outPort := route.EtherTalkDirect; outPort != nil { if outPort := z.LocalPort; outPort != nil {
// If it's for a local zone, translate it to a LkUp and broadcast // If it's for a local zone, translate it to a LkUp and broadcast
// out the corresponding EtherTalk port. // out the corresponding EtherTalk port.
// "Note: On an internet, nodes on extended networks performing lookups in // "Note: On an internet, nodes on extended networks performing lookups in
@ -146,7 +147,7 @@ func (port *EtherTalkPort) handleNBPBrRq(ctx context.Context, ddpkt *ddp.ExtPack
SrcNet: ddpkt.SrcNet, SrcNet: ddpkt.SrcNet,
SrcNode: ddpkt.SrcNode, SrcNode: ddpkt.SrcNode,
SrcSocket: ddpkt.SrcSocket, SrcSocket: ddpkt.SrcSocket,
DstNet: route.NetStart, DstNet: z.Network,
DstNode: 0x00, // Any router for the dest network DstNode: 0x00, // Any router for the dest network
DstSocket: 2, DstSocket: 2,
Proto: ddp.ProtoNBP, Proto: ddp.ProtoNBP,
@ -169,7 +170,7 @@ func (rtr *Router) handleNBPFwdReq(ctx context.Context, ddpkt *ddp.ExtPacket, nb
tuple := &nbpkt.Tuples[0] tuple := &nbpkt.Tuples[0]
for _, outPort := range rtr.Ports { for _, outPort := range rtr.Ports {
if !outPort.AvailableZones.Contains(tuple.Zone) { if !slices.Contains(outPort.AvailableZones, tuple.Zone) {
continue continue
} }
log.Printf("NBP: Converting FwdReq to LkUp (%v)", tuple) log.Printf("NBP: Converting FwdReq to LkUp (%v)", tuple)

View file

@ -20,7 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"drjosh.dev/jrouter/atalk/nbp" "gitea.drjosh.dev/josh/jrouter/atalk/nbp"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
) )

View file

@ -24,7 +24,7 @@ import (
"sync" "sync"
"time" "time"
"drjosh.dev/jrouter/aurp" "gitea.drjosh.dev/josh/jrouter/aurp"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
) )
@ -94,6 +94,9 @@ func (ss SenderState) String() string {
// AURPPeer handles the peering with a peer AURP router. // AURPPeer handles the peering with a peer AURP router.
type AURPPeer struct { type AURPPeer struct {
// Whole router config.
Config *Config
// AURP-Tr state for producing packets. // AURP-Tr state for producing packets.
Transport *aurp.Transport Transport *aurp.Transport
@ -111,78 +114,15 @@ type AURPPeer struct {
// Incoming packet channel. // Incoming packet channel.
ReceiveCh chan aurp.Packet ReceiveCh chan aurp.Packet
// Route table (the peer will add/remove/update routes and zones) // Routing table (the peer will add/remove/update routes)
RouteTable *RouteTable RoutingTable *RouteTable
// Event tuples yet to be sent to this peer in an RI-Upd. // Zone table (the peer will add/remove/update zones)
pendingEventsMu sync.Mutex ZoneTable *ZoneTable
pendingEvents aurp.EventTuples
// The internal states below are only set within the Handle loop, but can mu sync.RWMutex
// be read concurrently from outside. rstate ReceiverState
mu sync.RWMutex sstate SenderState
rstate ReceiverState
sstate SenderState
lastReconnect time.Time
lastHeardFrom time.Time
lastSend time.Time // TODO: clarify use of lastSend / sendRetries
lastUpdate time.Time
sendRetries int
}
func NewAURPPeer(routes *RouteTable, udpConn *net.UDPConn, peerAddr string, raddr *net.UDPAddr, localDI, remoteDI aurp.DomainIdentifier, connID uint16) *AURPPeer {
if remoteDI == nil {
remoteDI = aurp.IPDomainIdentifier(raddr.IP)
}
return &AURPPeer{
Transport: &aurp.Transport{
LocalDI: localDI,
RemoteDI: remoteDI,
LocalConnID: connID,
},
UDPConn: udpConn,
ConfiguredAddr: peerAddr,
RemoteAddr: raddr,
ReceiveCh: make(chan aurp.Packet, 1024),
RouteTable: routes,
}
}
func (p *AURPPeer) addPendingEvent(ec aurp.EventCode, route *Route) {
// Don't advertise routes to AURP peers to other AURP peers
if route.AURPPeer != nil {
return
}
et := aurp.EventTuple{
EventCode: ec,
Extended: route.Extended,
RangeStart: route.NetStart,
Distance: route.Distance,
RangeEnd: route.NetEnd,
}
switch ec {
case aurp.EventCodeND, aurp.EventCodeNRC:
et.Distance = 0 // "The distance field does not apply to ND or NRC event tuples and should be set to 0."
}
p.pendingEventsMu.Lock()
defer p.pendingEventsMu.Unlock()
p.pendingEvents = append(p.pendingEvents, et)
}
func (p *AURPPeer) RouteAdded(route *Route) {
p.addPendingEvent(aurp.EventCodeNA, route)
}
func (p *AURPPeer) RouteDeleted(route *Route) {
p.addPendingEvent(aurp.EventCodeND, route)
}
func (p *AURPPeer) RouteDistanceChanged(route *Route) {
p.addPendingEvent(aurp.EventCodeNDC, route)
}
func (p *AURPPeer) RouteForwarderChanged(route *Route) {
p.addPendingEvent(aurp.EventCodeNRC, route)
} }
func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error { func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error {
@ -190,7 +130,7 @@ func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error {
if err != nil { if err != nil {
return err return err
} }
_, err = p.send(p.Transport.NewAppleTalkPacket(outPkt)) _, err = p.Send(p.Transport.NewAppleTalkPacket(outPkt))
return err return err
} }
@ -206,36 +146,6 @@ func (p *AURPPeer) SenderState() SenderState {
return p.sstate return p.sstate
} }
func (p *AURPPeer) LastReconnectAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastReconnect)
}
func (p *AURPPeer) LastHeardFromAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastHeardFrom)
}
func (p *AURPPeer) LastSendAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastSend)
}
func (p *AURPPeer) LastUpdateAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastUpdate)
}
func (p *AURPPeer) SendRetries() int {
p.mu.RLock()
defer p.mu.RUnlock()
return p.sendRetries
}
func (p *AURPPeer) setRState(rstate ReceiverState) { func (p *AURPPeer) setRState(rstate ReceiverState) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
@ -248,42 +158,6 @@ func (p *AURPPeer) setSState(sstate SenderState) {
p.sstate = sstate p.sstate = sstate
} }
func (p *AURPPeer) incSendRetries() {
p.mu.Lock()
defer p.mu.Unlock()
p.sendRetries++
}
func (p *AURPPeer) resetSendRetries() {
p.mu.Lock()
defer p.mu.Unlock()
p.sendRetries = 0
}
func (p *AURPPeer) bumpLastHeardFrom() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastHeardFrom = time.Now()
}
func (p *AURPPeer) bumpLastReconnect() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastReconnect = time.Now()
}
func (p *AURPPeer) bumpLastSend() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastSend = time.Now()
}
func (p *AURPPeer) bumpLastUpdate() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastUpdate = time.Now()
}
func (p *AURPPeer) disconnect() { func (p *AURPPeer) disconnect() {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
@ -291,8 +165,8 @@ func (p *AURPPeer) disconnect() {
p.sstate = SenderUnconnected p.sstate = SenderUnconnected
} }
// send encodes and sends pkt to the remote host. // Send encodes and sends pkt to the remote host.
func (p *AURPPeer) send(pkt aurp.Packet) (int, error) { func (p *AURPPeer) Send(pkt aurp.Packet) (int, error) {
var b bytes.Buffer var b bytes.Buffer
if _, err := pkt.WriteTo(&b); err != nil { if _, err := pkt.WriteTo(&b); err != nil {
return 0, err return 0, err
@ -302,28 +176,23 @@ func (p *AURPPeer) send(pkt aurp.Packet) (int, error) {
} }
func (p *AURPPeer) Handle(ctx context.Context) error { func (p *AURPPeer) Handle(ctx context.Context) error {
// Stop listening to events if the goroutine exits
defer p.RouteTable.RemoveObserver(p)
rticker := time.NewTicker(1 * time.Second) rticker := time.NewTicker(1 * time.Second)
defer rticker.Stop() defer rticker.Stop()
sticker := time.NewTicker(1 * time.Second) sticker := time.NewTicker(1 * time.Second)
defer sticker.Stop() defer sticker.Stop()
p.mu.Lock() lastReconnect := time.Now()
p.lastReconnect = time.Now() lastHeardFrom := time.Now()
p.lastHeardFrom = time.Now() lastSend := time.Now() // TODO: clarify use of lastSend / sendRetries
p.lastSend = time.Now() // TODO: clarify use of lastSend / sendRetries lastUpdate := time.Now()
p.lastUpdate = time.Now() sendRetries := 0
p.sendRetries = 0
p.mu.Unlock()
var lastRISent aurp.Packet var lastRISent aurp.Packet
p.disconnect() p.disconnect()
// Write an Open-Req packet // Write an Open-Req packet
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
@ -339,7 +208,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
} }
// Send a best-effort Router Down before returning // Send a best-effort Router Down before returning
lastRISent = p.Transport.NewRDPacket(aurp.ErrCodeNormalClose) lastRISent = p.Transport.NewRDPacket(aurp.ErrCodeNormalClose)
if _, err := p.send(lastRISent); err != nil { if _, err := p.Send(lastRISent); err != nil {
log.Printf("Couldn't send RD packet: %v", err) log.Printf("Couldn't send RD packet: %v", err)
} }
return ctx.Err() return ctx.Err()
@ -347,71 +216,70 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case <-rticker.C: case <-rticker.C:
switch p.rstate { switch p.rstate {
case ReceiverWaitForOpenRsp: case ReceiverWaitForOpenRsp:
if time.Since(p.lastSend) <= sendRetryTimer { if time.Since(lastSend) <= sendRetryTimer {
break break
} }
if p.sendRetries >= sendRetryLimit { if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection")
p.setRState(ReceiverUnconnected) p.setRState(ReceiverUnconnected)
break break
} }
// Send another Open-Req // Send another Open-Req
p.incSendRetries() sendRetries++
p.bumpLastSend() lastSend = time.Now()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
case ReceiverConnected: case ReceiverConnected:
// Check LHFT, send tickle? // Check LHFT, send tickle?
if time.Since(p.lastHeardFrom) <= lastHeardFromTimer { if time.Since(lastHeardFrom) <= lastHeardFromTimer {
break break
} }
if _, err := p.send(p.Transport.NewTicklePacket()); err != nil { if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle: %v", err) log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
return err return err
} }
p.setRState(ReceiverWaitForTickleAck) p.setRState(ReceiverWaitForTickleAck)
p.resetSendRetries() sendRetries = 0
p.bumpLastSend() lastSend = time.Now()
case ReceiverWaitForTickleAck: case ReceiverWaitForTickleAck:
if time.Since(p.lastSend) <= sendRetryTimer { if time.Since(lastSend) <= sendRetryTimer {
break break
} }
if p.sendRetries >= tickleRetryLimit { if sendRetries >= tickleRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection")
p.setRState(ReceiverUnconnected) p.setRState(ReceiverUnconnected)
p.RouteTable.DeleteAURPPeer(p) p.RoutingTable.DeleteAURPPeer(p)
break break
} }
p.incSendRetries() sendRetries++
p.bumpLastSend() lastSend = time.Now()
if _, err := p.send(p.Transport.NewTicklePacket()); err != nil { if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle: %v", err) log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
return err return err
} }
// still in Wait For Tickle-Ack // still in Wait For Tickle-Ack
case ReceiverWaitForRIRsp: case ReceiverWaitForRIRsp:
if time.Since(p.lastSend) <= sendRetryTimer { if time.Since(lastSend) <= sendRetryTimer {
break break
} }
if p.sendRetries >= sendRetryLimit { if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection")
p.setRState(ReceiverUnconnected) p.setRState(ReceiverUnconnected)
p.RouteTable.DeleteAURPPeer(p) p.RoutingTable.DeleteAURPPeer(p)
break break
} }
// RI-Req is stateless, so we don't need to cache the one we // RI-Req is stateless, so we don't need to cache the one we
// sent earlier just to send it again // sent earlier just to send it again
p.incSendRetries() sendRetries++
p.bumpLastSend() if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil {
if _, err := p.send(p.Transport.NewRIReqPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
return err return err
} }
@ -420,18 +288,18 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case ReceiverUnconnected: case ReceiverUnconnected:
// Data receiver is unconnected. If data sender is connected, // Data receiver is unconnected. If data sender is connected,
// send a null RI-Upd to check if the sender is also unconnected // send a null RI-Upd to check if the sender is also unconnected
if p.sstate == SenderConnected && time.Since(p.lastSend) > sendRetryTimer { if p.sstate == SenderConnected && time.Since(lastSend) > sendRetryTimer {
if p.sendRetries >= sendRetryLimit { if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while probing sender connect, closing connection") log.Printf("AURP Peer: Send retry limit reached while probing sender connect, closing connection")
} }
p.incSendRetries() sendRetries++
p.bumpLastSend() lastSend = time.Now()
aurp.Inc(&p.Transport.LocalSeq) aurp.Inc(&p.Transport.LocalSeq)
events := aurp.EventTuples{{ events := aurp.EventTuples{{
EventCode: aurp.EventCodeNull, EventCode: aurp.EventCodeNull,
}} }}
lastRISent = p.Transport.NewRIUpdPacket(events) lastRISent = p.Transport.NewRIUpdPacket(events)
if _, err := p.send(lastRISent); err != nil { if _, err := p.Send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err)
return err return err
} }
@ -440,7 +308,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
if p.ConfiguredAddr != "" { if p.ConfiguredAddr != "" {
// Periodically try to reconnect, if this peer is in the config file // Periodically try to reconnect, if this peer is in the config file
if time.Since(p.lastReconnect) <= reconnectTimer { if time.Since(lastReconnect) <= reconnectTimer {
break break
} }
@ -453,10 +321,10 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: resolved %q to %v", p.ConfiguredAddr, raddr) log.Printf("AURP Peer: resolved %q to %v", p.ConfiguredAddr, raddr)
p.RemoteAddr = raddr p.RemoteAddr = raddr
p.bumpLastReconnect() lastReconnect = time.Now()
p.resetSendRetries() sendRetries = 0
p.bumpLastSend() lastSend = time.Now()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
@ -470,64 +338,40 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Do nothing // Do nothing
case SenderConnected: case SenderConnected:
if time.Since(p.lastUpdate) <= updateTimer { if time.Since(lastUpdate) <= updateTimer {
break break
} }
// TODO: is there a routing update to send?
// Are there routing updates to send?
p.pendingEventsMu.Lock()
if len(p.pendingEvents) == 0 {
p.pendingEventsMu.Unlock()
break
}
// Yes - swap the slices, release the mutex, then send them
pending := p.pendingEvents
p.pendingEvents = make(aurp.EventTuples, 0, cap(pending))
p.pendingEventsMu.Unlock()
// TODO: eliminate events that cancel out (e.g. NA then ND)
// TODO: split pending events to fit within a packet
p.bumpLastUpdate()
aurp.Inc(&p.Transport.LocalSeq)
lastRISent = p.Transport.NewRIUpdPacket(pending)
if _, err := p.send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err)
return err
}
p.setSState(SenderWaitForRIUpdAck)
case SenderWaitForRIRspAck, SenderWaitForRIUpdAck: case SenderWaitForRIRspAck, SenderWaitForRIUpdAck:
if time.Since(p.lastSend) <= sendRetryTimer { if time.Since(lastSend) <= sendRetryTimer {
break break
} }
if lastRISent == nil { if lastRISent == nil {
log.Print("AURP Peer: sender retry: lastRISent = nil?") log.Print("AURP Peer: sender retry: lastRISent = nil?")
continue continue
} }
if p.sendRetries >= sendRetryLimit { if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached, closing connection") log.Printf("AURP Peer: Send retry limit reached, closing connection")
p.setSState(SenderUnconnected) p.setSState(SenderUnconnected)
p.RouteTable.RemoveObserver(p)
continue continue
} }
p.incSendRetries() sendRetries++
p.bumpLastSend() lastSend = time.Now()
if _, err := p.send(lastRISent); err != nil { if _, err := p.Send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't re-send %T: %v", lastRISent, err) log.Printf("AURP Peer: Couldn't re-send %T: %v", lastRISent, err)
return err return err
} }
case SenderWaitForRDAck: case SenderWaitForRDAck:
if time.Since(p.lastSend) <= sendRetryTimer { if time.Since(lastSend) <= sendRetryTimer {
break break
} }
p.setSState(SenderUnconnected) p.setSState(SenderUnconnected)
p.RouteTable.RemoveObserver(p)
} }
case pkt := <-p.ReceiveCh: case pkt := <-p.ReceiveCh:
p.bumpLastHeardFrom() lastHeardFrom = time.Now()
switch pkt := pkt.(type) { switch pkt := pkt.(type) {
case *aurp.OpenReqPacket: case *aurp.OpenReqPacket:
@ -554,21 +398,19 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
orsp = p.Transport.NewOpenRspPacket(0, 1, nil) orsp = p.Transport.NewOpenRspPacket(0, 1, nil)
} }
if _, err := p.send(orsp); err != nil { if _, err := p.Send(orsp); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Rsp: %v", err) log.Printf("AURP Peer: Couldn't send Open-Rsp: %v", err)
return err return err
} }
if orsp.RateOrErrCode >= 0 { if orsp.RateOrErrCode >= 0 {
// Data sender is successfully in connected state
p.setSState(SenderConnected) p.setSState(SenderConnected)
p.RouteTable.AddObserver(p)
} }
// If receiver is unconnected, commence connecting // If receiver is unconnected, commence connecting
if p.rstate == ReceiverUnconnected { if p.rstate == ReceiverUnconnected {
p.resetSendRetries() lastSend = time.Now()
p.bumpLastSend() sendRetries = 0
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
@ -589,8 +431,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
p.setRState(ReceiverConnected) p.setRState(ReceiverConnected)
// Send an RI-Req // Send an RI-Req
p.resetSendRetries() sendRetries = 0
if _, err := p.send(p.Transport.NewRIReqPacket()); err != nil { if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
return err return err
} }
@ -601,19 +443,17 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Received RI-Req but was not expecting one (sender state was %v)", p.sstate) log.Printf("AURP Peer: Received RI-Req but was not expecting one (sender state was %v)", p.sstate)
} }
var nets aurp.NetworkTuples nets := aurp.NetworkTuples{
for _, r := range p.RouteTable.ValidNonAURPRoutes() { {
nets = append(nets, aurp.NetworkTuple{ Extended: true,
Extended: r.Extended, RangeStart: p.Config.EtherTalk.NetStart,
RangeStart: r.NetStart, RangeEnd: p.Config.EtherTalk.NetEnd,
RangeEnd: r.NetEnd, Distance: 0,
Distance: r.Distance, },
})
} }
p.Transport.LocalSeq = 1 p.Transport.LocalSeq = 1
// TODO: Split tuples across multiple packets as required
lastRISent = p.Transport.NewRIRspPacket(aurp.RoutingFlagLast, nets) lastRISent = p.Transport.NewRIRspPacket(aurp.RoutingFlagLast, nets)
if _, err := p.send(lastRISent); err != nil { if _, err := p.Send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Rsp packet: %v", err)
return err return err
} }
@ -627,7 +467,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Learned about these networks: %v", pkt.Networks) log.Printf("AURP Peer: Learned about these networks: %v", pkt.Networks)
for _, nt := range pkt.Networks { for _, nt := range pkt.Networks {
p.RouteTable.InsertAURPRoute( p.RoutingTable.InsertAURPRoute(
p, p,
nt.Extended, nt.Extended,
ddp.Network(nt.RangeStart), ddp.Network(nt.RangeStart),
@ -638,7 +478,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// TODO: track which networks we don't have zone info for, and // TODO: track which networks we don't have zone info for, and
// only set SZI for those ? // only set SZI for those ?
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil { if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Ack packet: %v", err)
return err return err
} }
@ -664,33 +504,15 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
} }
p.setSState(SenderConnected) p.setSState(SenderConnected)
p.resetSendRetries() sendRetries = 0
p.RouteTable.AddObserver(p)
// If SZI flag is set, send ZI-Rsp (transaction) // If SZI flag is set, send ZI-Rsp (transaction)
// TODO: split ZI-Rsp packets similarly to ZIP Replies
if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 { if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 {
// Inspect last routing info packet sent to determine zones := map[ddp.Network][]string{
// networks to gather names for p.Config.EtherTalk.NetStart: {p.Config.EtherTalk.ZoneName},
var nets []ddp.Network
switch last := lastRISent.(type) {
case *aurp.RIRspPacket:
for _, nt := range last.Networks {
nets = append(nets, nt.RangeStart)
}
case *aurp.RIUpdPacket:
for _, et := range last.Events {
// Only networks that were added
if et.EventCode != aurp.EventCodeNA {
continue
}
nets = append(nets, et.RangeStart)
}
} }
zones := p.RouteTable.ZonesForNetworks(nets) if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil {
// TODO: split ZI-Rsp packets similarly to ZIP Replies
if _, err := p.send(p.Transport.NewZIRspPacket(zones)); err != nil {
log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err)
} }
} }
@ -701,9 +523,9 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Receiver is unconnected, but their receiver sent us an // Receiver is unconnected, but their receiver sent us an
// RI-Ack for something // RI-Ack for something
// Try to reconnect? // Try to reconnect?
p.resetSendRetries() lastSend = time.Now()
p.bumpLastSend() sendRetries = 0
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
@ -711,6 +533,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
} }
case *aurp.RIUpdPacket: case *aurp.RIUpdPacket:
var ackFlag aurp.RoutingFlag var ackFlag aurp.RoutingFlag
for _, et := range pkt.Events { for _, et := range pkt.Events {
@ -720,7 +543,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Do nothing except respond with RI-Ack // Do nothing except respond with RI-Ack
case aurp.EventCodeNA: case aurp.EventCodeNA:
if err := p.RouteTable.InsertAURPRoute( if err := p.RoutingTable.InsertAURPRoute(
p, p,
et.Extended, et.Extended,
et.RangeStart, et.RangeStart,
@ -732,10 +555,10 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
ackFlag = aurp.RoutingFlagSendZoneInfo ackFlag = aurp.RoutingFlagSendZoneInfo
case aurp.EventCodeND: case aurp.EventCodeND:
p.RouteTable.DeleteAURPPeerNetwork(p, et.RangeStart) p.RoutingTable.DeleteAURPPeerNetwork(p, et.RangeStart)
case aurp.EventCodeNDC: case aurp.EventCodeNDC:
p.RouteTable.UpdateAURPRouteDistance(p, et.RangeStart, et.Distance+1) p.RoutingTable.UpdateAURPRouteDistance(p, et.RangeStart, et.Distance+1)
case aurp.EventCodeNRC: case aurp.EventCodeNRC:
// "An exterior router sends a Network Route Change // "An exterior router sends a Network Route Change
@ -743,14 +566,14 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// through its local internet changes to a path through // through its local internet changes to a path through
// a tunneling port, causing split-horizoned processing // a tunneling port, causing split-horizoned processing
// to eliminate that networks routing information." // to eliminate that networks routing information."
p.RouteTable.DeleteAURPPeerNetwork(p, et.RangeStart) p.RoutingTable.DeleteAURPPeerNetwork(p, et.RangeStart)
case aurp.EventCodeZC: case aurp.EventCodeZC:
// "This event is reserved for future use." // "This event is reserved for future use."
} }
} }
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, ackFlag)); err != nil { if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, ackFlag)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err) log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err)
return err return err
} }
@ -761,10 +584,10 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
} }
log.Printf("AURP Peer: Router Down: error code %d %s", pkt.ErrorCode, pkt.ErrorCode) log.Printf("AURP Peer: Router Down: error code %d %s", pkt.ErrorCode, pkt.ErrorCode)
p.RouteTable.DeleteAURPPeer(p) p.RoutingTable.DeleteAURPPeer(p)
// Respond with RI-Ack // Respond with RI-Ack
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil { if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err) log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err)
return err return err
} }
@ -773,8 +596,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case *aurp.ZIReqPacket: case *aurp.ZIReqPacket:
// TODO: split ZI-Rsp packets similarly to ZIP Replies // TODO: split ZI-Rsp packets similarly to ZIP Replies
zones := p.RouteTable.ZonesForNetworks(pkt.Networks) zones := p.ZoneTable.Query(pkt.Networks)
if _, err := p.send(p.Transport.NewZIRspPacket(zones)); err != nil { if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil {
log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err)
return err return err
} }
@ -782,11 +605,11 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case *aurp.ZIRspPacket: case *aurp.ZIRspPacket:
log.Printf("AURP Peer: Learned about these zones: %v", pkt.Zones) log.Printf("AURP Peer: Learned about these zones: %v", pkt.Zones)
for _, zt := range pkt.Zones { for _, zt := range pkt.Zones {
p.RouteTable.AddZonesToNetwork(zt.Network, zt.Name) p.ZoneTable.Upsert(ddp.Network(zt.Network), zt.Name, nil)
} }
case *aurp.GDZLReqPacket: case *aurp.GDZLReqPacket:
if _, err := p.send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil { if _, err := p.Send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil {
log.Printf("AURP Peer: Couldn't send GDZL-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send GDZL-Rsp packet: %v", err)
return err return err
} }
@ -795,7 +618,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Received a GDZL-Rsp, but I wouldn't have sent a GDZL-Req - that's weird") log.Printf("AURP Peer: Received a GDZL-Rsp, but I wouldn't have sent a GDZL-Req - that's weird")
case *aurp.GZNReqPacket: case *aurp.GZNReqPacket:
if _, err := p.send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil { if _, err := p.Send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil {
log.Printf("AURP Peer: Couldn't send GZN-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send GZN-Rsp packet: %v", err)
return err return err
} }
@ -805,7 +628,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case *aurp.TicklePacket: case *aurp.TicklePacket:
// Immediately respond with Tickle-Ack // Immediately respond with Tickle-Ack
if _, err := p.send(p.Transport.NewTickleAckPacket()); err != nil { if _, err := p.Send(p.Transport.NewTickleAckPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle-Ack: %v", err) log.Printf("AURP Peer: Couldn't send Tickle-Ack: %v", err)
return err return err
} }

View file

@ -46,8 +46,5 @@ func (p *EtherTalkPeer) Forward(ctx context.Context, pkt *ddp.ExtPacket) error {
if err != nil { if err != nil {
return err return err
} }
if len(outFrameRaw) < 64 {
outFrameRaw = append(outFrameRaw, make([]byte, 64-len(outFrameRaw))...)
}
return p.Port.PcapHandle.WritePacketData(outFrameRaw) return p.Port.PcapHandle.WritePacketData(outFrameRaw)
} }

View file

@ -18,12 +18,11 @@ package router
import ( import (
"context" "context"
"encoding/binary"
"errors" "errors"
"io" "io"
"log" "log"
"drjosh.dev/jrouter/atalk" "gitea.drjosh.dev/josh/jrouter/atalk"
"github.com/google/gopacket/pcap" "github.com/google/gopacket/pcap"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
"github.com/sfiera/multitalk/pkg/ethernet" "github.com/sfiera/multitalk/pkg/ethernet"
@ -38,7 +37,7 @@ type EtherTalkPort struct {
NetEnd ddp.Network NetEnd ddp.Network
MyAddr ddp.Addr MyAddr ddp.Addr
DefaultZoneName string DefaultZoneName string
AvailableZones StringSet AvailableZones []string
PcapHandle *pcap.Handle PcapHandle *pcap.Handle
AARPMachine *AARPMachine AARPMachine *AARPMachine
Router *Router Router *Router
@ -80,18 +79,8 @@ func (port *EtherTalkPort) Serve(ctx context.Context) {
case ethertalk.AppleTalkProto: case ethertalk.AppleTalkProto:
// log.Print("Got an AppleTalk frame") // log.Print("Got an AppleTalk frame")
// Workaround for strict length checking in sfiera/multitalk
payload := ethFrame.Payload
if len(payload) < 2 {
log.Printf("Couldn't unmarshal DDP packet: too small (length = %d)", len(payload))
}
if size := binary.BigEndian.Uint16(payload[:2]) & 0x3ff; len(payload) > int(size) {
payload = payload[:size]
}
ddpkt := new(ddp.ExtPacket) ddpkt := new(ddp.ExtPacket)
if err := ddp.ExtUnmarshal(payload, ddpkt); err != nil { if err := ddp.ExtUnmarshal(ethFrame.Payload, ddpkt); err != nil {
log.Printf("Couldn't unmarshal DDP packet: %v", err) log.Printf("Couldn't unmarshal DDP packet: %v", err)
continue continue
} }
@ -200,8 +189,5 @@ func (port *EtherTalkPort) send(dstEth ethernet.Addr, pkt *ddp.ExtPacket) error
if err != nil { if err != nil {
return err return err
} }
if len(outFrameRaw) < 64 {
outFrameRaw = append(outFrameRaw, make([]byte, 64-len(outFrameRaw))...)
}
return port.PcapHandle.WritePacketData(outFrameRaw) return port.PcapHandle.WritePacketData(outFrameRaw)
} }

View file

@ -34,10 +34,6 @@ type Route struct {
LastSeen time.Time LastSeen time.Time
// ZoneNames may be empty between learning the existence of a route and
// receiving zone information.
ZoneNames StringSet
// Exactly one of the following should be set // Exactly one of the following should be set
AURPPeer *AURPPeer // Next hop is this peer router (over AURP) AURPPeer *AURPPeer // Next hop is this peer router (over AURP)
EtherTalkPeer *EtherTalkPeer // Next hop is this peer router (over EtherTalk) EtherTalkPeer *EtherTalkPeer // Next hop is this peer router (over EtherTalk)
@ -45,50 +41,23 @@ type Route struct {
} }
func (r Route) LastSeenAgo() string { func (r Route) LastSeenAgo() string {
return ago(r.LastSeen) if r.LastSeen.IsZero() {
} return "never"
}
// Valid reports whether the route is valid. return fmt.Sprintf("%v ago", time.Since(r.LastSeen).Truncate(time.Millisecond))
// A valid route has one or more zone names, and if it is learned from a peer
// router over EtherTalk is not too old.
func (r *Route) Valid() bool {
return len(r.ZoneNames) > 0 && (r.EtherTalkPeer == nil || time.Since(r.LastSeen) <= maxRouteAge)
}
type RouteTableObserver interface {
RouteAdded(*Route)
RouteDeleted(*Route)
RouteDistanceChanged(*Route)
RouteForwarderChanged(*Route)
} }
type RouteTable struct { type RouteTable struct {
routesMu sync.RWMutex mu sync.Mutex
routes map[*Route]struct{} routes map[*Route]struct{}
observersMu sync.RWMutex
observers map[RouteTableObserver]struct{}
} }
func NewRouteTable() *RouteTable { func NewRouteTable() *RouteTable {
return &RouteTable{ return &RouteTable{
routes: make(map[*Route]struct{}), routes: make(map[*Route]struct{}),
observers: make(map[RouteTableObserver]struct{}),
} }
} }
func (rt *RouteTable) AddObserver(obs RouteTableObserver) {
rt.observersMu.Lock()
defer rt.observersMu.Unlock()
rt.observers[obs] = struct{}{}
}
func (rt *RouteTable) RemoveObserver(obs RouteTableObserver) {
rt.observersMu.Lock()
defer rt.observersMu.Unlock()
delete(rt.observers, obs)
}
func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) { func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) {
r := &Route{ r := &Route{
Extended: true, Extended: true,
@ -96,18 +65,17 @@ func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) {
NetEnd: port.NetEnd, NetEnd: port.NetEnd,
Distance: 0, // we're connected directly Distance: 0, // we're connected directly
LastSeen: time.Now(), LastSeen: time.Now(),
ZoneNames: port.AvailableZones,
EtherTalkDirect: port, EtherTalkDirect: port,
} }
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
rt.routes[r] = struct{}{} rt.routes[r] = struct{}{}
} }
func (rt *RouteTable) Dump() []Route { func (rt *RouteTable) Dump() []Route {
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
table := make([]Route, 0, len(rt.routes)) table := make([]Route, 0, len(rt.routes))
for r := range rt.routes { for r := range rt.routes {
@ -117,15 +85,16 @@ func (rt *RouteTable) Dump() []Route {
} }
func (rt *RouteTable) LookupRoute(network ddp.Network) *Route { func (rt *RouteTable) LookupRoute(network ddp.Network) *Route {
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
var bestRoute *Route var bestRoute *Route
for r := range rt.routes { for r := range rt.routes {
if network < r.NetStart || network > r.NetEnd { if network < r.NetStart || network > r.NetEnd {
continue continue
} }
if !r.Valid() { // Exclude EtherTalk routes that are too old
if r.EtherTalkPeer != nil && time.Since(r.LastSeen) > maxRouteAge {
continue continue
} }
if bestRoute == nil { if bestRoute == nil {
@ -140,8 +109,8 @@ func (rt *RouteTable) LookupRoute(network ddp.Network) *Route {
} }
func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) { func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) {
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
for route := range rt.routes { for route := range rt.routes {
if route.AURPPeer == peer { if route.AURPPeer == peer {
@ -151,8 +120,8 @@ func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) {
} }
func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network) { func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network) {
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
for route := range rt.routes { for route := range rt.routes {
if route.AURPPeer == peer && route.NetStart == network { if route.AURPPeer == peer && route.NetStart == network {
@ -162,8 +131,8 @@ func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network)
} }
func (rt *RouteTable) UpdateAURPRouteDistance(peer *AURPPeer, network ddp.Network, distance uint8) { func (rt *RouteTable) UpdateAURPRouteDistance(peer *AURPPeer, network ddp.Network, distance uint8) {
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
for route := range rt.routes { for route := range rt.routes {
if route.AURPPeer == peer && route.NetStart == network { if route.AURPPeer == peer && route.NetStart == network {
@ -173,16 +142,16 @@ func (rt *RouteTable) UpdateAURPRouteDistance(peer *AURPPeer, network ddp.Networ
} }
} }
func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, netStart, netEnd ddp.Network, metric uint8) (*Route, error) { func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, netStart, netEnd ddp.Network, metric uint8) error {
if netStart > netEnd { if netStart > netEnd {
return nil, fmt.Errorf("invalid network range [%d, %d]", netStart, netEnd) return fmt.Errorf("invalid network range [%d, %d]", netStart, netEnd)
} }
if netStart != netEnd && !extended { if netStart != netEnd && !extended {
return nil, fmt.Errorf("invalid network range [%d, %d] for nonextended network", netStart, netEnd) return fmt.Errorf("invalid network range [%d, %d] for nonextended network", netStart, netEnd)
} }
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
// Update? // Update?
for r := range rt.routes { for r := range rt.routes {
@ -200,7 +169,7 @@ func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, n
} }
r.Distance = metric r.Distance = metric
r.LastSeen = time.Now() r.LastSeen = time.Now()
return r, nil return nil
} }
// Insert. // Insert.
@ -213,7 +182,7 @@ func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, n
EtherTalkPeer: peer, EtherTalkPeer: peer,
} }
rt.routes[r] = struct{}{} rt.routes[r] = struct{}{}
return r, nil return nil
} }
func (rt *RouteTable) InsertAURPRoute(peer *AURPPeer, extended bool, netStart, netEnd ddp.Network, metric uint8) error { func (rt *RouteTable) InsertAURPRoute(peer *AURPPeer, extended bool, netStart, netEnd ddp.Network, metric uint8) error {
@ -233,35 +202,19 @@ func (rt *RouteTable) InsertAURPRoute(peer *AURPPeer, extended bool, netStart, n
AURPPeer: peer, AURPPeer: peer,
} }
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
rt.routes[r] = struct{}{} rt.routes[r] = struct{}{}
return nil return nil
} }
// ValidRoutes returns all valid routes.
func (rt *RouteTable) ValidRoutes() []*Route { func (rt *RouteTable) ValidRoutes() []*Route {
rt.routesMu.RLock() rt.mu.Lock()
defer rt.routesMu.RUnlock() defer rt.mu.Unlock()
valid := make([]*Route, 0, len(rt.routes)) valid := make([]*Route, 0, len(rt.routes))
for r := range rt.routes { for r := range rt.routes {
if r.Valid() { // Exclude EtherTalk routes that are too old
valid = append(valid, r) if r.EtherTalkPeer != nil && time.Since(r.LastSeen) > maxRouteAge {
}
}
return valid
}
// ValidNonAURPRoutes returns all valid routes that were not learned via AURP.
func (rt *RouteTable) ValidNonAURPRoutes() []*Route {
rt.routesMu.RLock()
defer rt.routesMu.RUnlock()
valid := make([]*Route, 0, len(rt.routes))
for r := range rt.routes {
if r.AURPPeer != nil {
continue
}
if !r.Valid() {
continue continue
} }
valid = append(valid, r) valid = append(valid, r)

View file

@ -26,6 +26,7 @@ import (
type Router struct { type Router struct {
Config *Config Config *Config
RouteTable *RouteTable RouteTable *RouteTable
ZoneTable *ZoneTable
Ports []*EtherTalkPort Ports []*EtherTalkPort
} }

View file

@ -22,10 +22,9 @@ import (
"log" "log"
"time" "time"
"drjosh.dev/jrouter/atalk" "gitea.drjosh.dev/josh/jrouter/atalk"
"drjosh.dev/jrouter/atalk/rtmp" "gitea.drjosh.dev/josh/jrouter/atalk/rtmp"
"drjosh.dev/jrouter/atalk/zip" "gitea.drjosh.dev/josh/jrouter/status"
"drjosh.dev/jrouter/status"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
) )
@ -102,7 +101,8 @@ func (port *EtherTalkPort) HandleRTMP(ctx context.Context, pkt *ddp.ExtPacket) e
} }
case rtmp.FunctionLoopProbe: case rtmp.FunctionLoopProbe:
return fmt.Errorf("TODO: handle Loop Probes") log.Print("RTMP: TODO: handle Loop Probes")
return nil
} }
case ddp.ProtoRTMPResp: case ddp.ProtoRTMPResp:
@ -110,51 +110,22 @@ func (port *EtherTalkPort) HandleRTMP(ctx context.Context, pkt *ddp.ExtPacket) e
log.Print("RTMP: Got Response or Data") log.Print("RTMP: Got Response or Data")
dataPkt, err := rtmp.UnmarshalDataPacket(pkt.Data) dataPkt, err := rtmp.UnmarshalDataPacket(pkt.Data)
if err != nil { if err != nil {
return fmt.Errorf("unmarshal RTMP Data packet: %w", err) log.Printf("RTMP: Couldn't unmarshal RTMP Data packet: %v", err)
break
} }
peer := &EtherTalkPeer{ peer := &EtherTalkPeer{
Port: port, Port: port,
PeerAddr: dataPkt.RouterAddr, PeerAddr: dataPkt.RouterAddr,
} }
var noZones []ddp.Network for _, rt := range dataPkt.NetworkTuples {
for _, nt := range dataPkt.NetworkTuples { if err := port.Router.RouteTable.UpsertEtherTalkRoute(peer, rt.Extended, rt.RangeStart, rt.RangeEnd, rt.Distance+1); err != nil {
route, err := port.Router.RouteTable.UpsertEtherTalkRoute(peer, nt.Extended, nt.RangeStart, nt.RangeEnd, nt.Distance+1) log.Printf("RTMP: Couldn't upsert EtherTalk route: %v", err)
if err != nil {
return fmt.Errorf("upsert EtherTalk route: %v", err)
}
if len(route.ZoneNames) == 0 {
noZones = append(noZones, route.NetStart)
}
}
if len(noZones) > 0 {
// Send a ZIP Query for all networks we don't have zone names for.
// TODO: split networks to fit in multiple packets as needed
qryPkt, err := (&zip.QueryPacket{Networks: noZones}).Marshal()
if err != nil {
return fmt.Errorf("marshal ZIP Query packet: %w", err)
}
outDDP := &ddp.ExtPacket{
ExtHeader: ddp.ExtHeader{
Size: uint16(len(qryPkt)) + atalk.DDPExtHeaderSize,
Cksum: 0,
SrcNet: port.MyAddr.Network,
SrcNode: port.MyAddr.Node,
SrcSocket: 6,
DstNet: pkt.SrcNet,
DstNode: pkt.SrcNode,
DstSocket: 6, // ZIP socket
Proto: ddp.ProtoZIP,
},
Data: qryPkt,
}
if err := port.Send(ctx, outDDP); err != nil {
return fmt.Errorf("sending ZIP Query: %w", err)
} }
} }
default: default:
return fmt.Errorf("invalid DDP type %d on socket 1", pkt.Proto) log.Printf("RTMP: invalid DDP type %d on socket 1", pkt.Proto)
} }
return nil return nil

View file

@ -20,10 +20,11 @@ import (
"context" "context"
"fmt" "fmt"
"log" "log"
"slices"
"drjosh.dev/jrouter/atalk" "gitea.drjosh.dev/josh/jrouter/atalk"
"drjosh.dev/jrouter/atalk/atp" "gitea.drjosh.dev/josh/jrouter/atalk/atp"
"drjosh.dev/jrouter/atalk/zip" "gitea.drjosh.dev/josh/jrouter/atalk/zip"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
"github.com/sfiera/multitalk/pkg/ethernet" "github.com/sfiera/multitalk/pkg/ethernet"
) )
@ -51,9 +52,6 @@ func (port *EtherTalkPort) handleZIPZIP(ctx context.Context, ddpkt *ddp.ExtPacke
case *zip.QueryPacket: case *zip.QueryPacket:
return port.handleZIPQuery(ctx, ddpkt, zipkt) return port.handleZIPQuery(ctx, ddpkt, zipkt)
case *zip.ReplyPacket:
return port.handleZIPReply(zipkt)
case *zip.GetNetInfoPacket: case *zip.GetNetInfoPacket:
return port.handleZIPGetNetInfo(ctx, ddpkt, zipkt) return port.handleZIPGetNetInfo(ctx, ddpkt, zipkt)
@ -64,7 +62,7 @@ func (port *EtherTalkPort) handleZIPZIP(ctx context.Context, ddpkt *ddp.ExtPacke
func (port *EtherTalkPort) handleZIPQuery(ctx context.Context, ddpkt *ddp.ExtPacket, zipkt *zip.QueryPacket) error { func (port *EtherTalkPort) handleZIPQuery(ctx context.Context, ddpkt *ddp.ExtPacket, zipkt *zip.QueryPacket) error {
log.Printf("ZIP: Got Query for networks %v", zipkt.Networks) log.Printf("ZIP: Got Query for networks %v", zipkt.Networks)
networks := port.Router.RouteTable.ZonesForNetworks(zipkt.Networks) networks := port.Router.ZoneTable.Query(zipkt.Networks)
sendReply := func(resp *zip.ReplyPacket) error { sendReply := func(resp *zip.ReplyPacket) error {
respRaw, err := resp.Marshal() respRaw, err := resp.Marshal()
@ -158,21 +156,11 @@ func (port *EtherTalkPort) handleZIPQuery(ctx context.Context, ddpkt *ddp.ExtPac
return nil return nil
} }
func (port *EtherTalkPort) handleZIPReply(zipkt *zip.ReplyPacket) error {
log.Printf("ZIP: Got Reply containing %v", zipkt.Networks)
// Integrate new zone information into route table.
for n, zs := range zipkt.Networks {
port.Router.RouteTable.AddZonesToNetwork(n, zs...)
}
return nil
}
func (port *EtherTalkPort) handleZIPGetNetInfo(ctx context.Context, ddpkt *ddp.ExtPacket, zipkt *zip.GetNetInfoPacket) error { func (port *EtherTalkPort) handleZIPGetNetInfo(ctx context.Context, ddpkt *ddp.ExtPacket, zipkt *zip.GetNetInfoPacket) error {
log.Printf("ZIP: Got GetNetInfo for zone %q", zipkt.ZoneName) log.Printf("ZIP: Got GetNetInfo for zone %q", zipkt.ZoneName)
// The request is zoneValid if the zone name is available on this network. // The request is zoneValid if the zone name is available on this network.
zoneValid := port.AvailableZones.Contains(zipkt.ZoneName) zoneValid := slices.Contains(port.AvailableZones, zipkt.ZoneName)
// The multicast address we return depends on the validity of the zone // The multicast address we return depends on the validity of the zone
// name. // name.
@ -270,10 +258,10 @@ func (port *EtherTalkPort) handleZIPTReq(ctx context.Context, ddpkt *ddp.ExtPack
switch gzl.Function { switch gzl.Function {
case zip.FunctionGetZoneList: case zip.FunctionGetZoneList:
resp.Zones = port.Router.RouteTable.AllZoneNames() resp.Zones = port.Router.ZoneTable.AllNames()
case zip.FunctionGetLocalZones: case zip.FunctionGetLocalZones:
resp.Zones = port.AvailableZones.ToSlice() resp.Zones = port.AvailableZones
case zip.FunctionGetMyZone: case zip.FunctionGetMyZone:
// Note: This shouldn't happen on extended networks (e.g. EtherTalk) // Note: This shouldn't happen on extended networks (e.g. EtherTalk)

View file

@ -17,72 +17,145 @@
package router package router
import ( import (
"fmt"
"slices" "slices"
"sort"
"sync"
"time"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
) )
func (rt *RouteTable) AddZonesToNetwork(n ddp.Network, zs ...string) { //const maxZoneAge = 10 * time.Minute // TODO: confirm
rt.routesMu.Lock()
defer rt.routesMu.Unlock() type Zone struct {
for r := range rt.routes { Network ddp.Network
if n < r.NetStart || n > r.NetEnd { Name string
continue LocalPort *EtherTalkPort // nil if remote (local to another router)
} LastSeen time.Time
if r.ZoneNames == nil { }
r.ZoneNames = make(StringSet)
} func (z Zone) LastSeenAgo() string {
r.ZoneNames.Insert(zs...) if z.LastSeen.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(z.LastSeen).Truncate(time.Millisecond))
}
type zoneKey struct {
network ddp.Network
name string
}
type ZoneTable struct {
mu sync.Mutex
zones map[zoneKey]*Zone
}
func NewZoneTable() *ZoneTable {
return &ZoneTable{
zones: make(map[zoneKey]*Zone),
} }
} }
func (rt *RouteTable) ZonesForNetworks(ns []ddp.Network) map[ddp.Network][]string { func (zt *ZoneTable) Dump() []Zone {
zt.mu.Lock()
defer zt.mu.Unlock()
zs := make([]Zone, 0, len(zt.zones))
for _, z := range zt.zones {
zs = append(zs, *z)
}
return zs
}
func (zt *ZoneTable) Upsert(network ddp.Network, name string, localPort *EtherTalkPort) {
zt.mu.Lock()
defer zt.mu.Unlock()
key := zoneKey{network, name}
z := zt.zones[key]
if z != nil {
z.LocalPort = localPort
z.LastSeen = time.Now()
return
}
zt.zones[key] = &Zone{
Network: network,
Name: name,
LocalPort: localPort,
LastSeen: time.Now(),
}
}
func (zt *ZoneTable) Query(ns []ddp.Network) map[ddp.Network][]string {
slices.Sort(ns)
zs := make(map[ddp.Network][]string) zs := make(map[ddp.Network][]string)
rt.routesMu.Lock() zt.mu.Lock()
defer rt.routesMu.Unlock() defer zt.mu.Unlock()
for r := range rt.routes { for _, z := range zt.zones {
if !r.Valid() { // if time.Since(z.LastSeen) > maxZoneAge {
continue // continue
} // }
if _, ok := slices.BinarySearch(ns, r.NetStart); ok { if _, ok := slices.BinarySearch(ns, z.Network); ok {
for z := range r.ZoneNames { zs[z.Network] = append(zs[z.Network], z.Name)
zs[r.NetStart] = append(zs[r.NetStart], z)
}
} }
} }
return zs return zs
} }
func (rt *RouteTable) RoutesForZone(zone string) []*Route { func (zt *ZoneTable) LookupName(name string) []*Zone {
rt.routesMu.Lock() zt.mu.Lock()
defer rt.routesMu.Unlock() defer zt.mu.Unlock()
var routes []*Route var zs []*Zone
for r := range rt.routes { for _, z := range zt.zones {
if !r.Valid() { if z.Name == name {
continue zs = append(zs, z)
}
if r.ZoneNames.Contains(zone) {
routes = append(routes, r)
} }
} }
return routes return zs
} }
func (rt *RouteTable) AllZoneNames() (zones []string) { // func (zt *ZoneTable) LocalNames() []string {
defer slices.Sort(zones) // zt.mu.Lock()
// seen := make(map[string]struct{})
// zs := make([]string, 0, len(zt.zones))
// for _, z := range zt.zones {
// // if time.Since(z.LastSeen) > maxZoneAge {
// // continue
// // }
// if z.Local != nil {
// continue
// }
// if _, s := seen[z.Name]; s {
// continue
// }
// seen[z.Name] = struct{}{}
// zs = append(zs, z.Name)
rt.routesMu.Lock() // }
defer rt.routesMu.Unlock() // zt.mu.Unlock()
zs := make(StringSet) // sort.Strings(zs)
for r := range rt.routes { // return zs
if !r.Valid() { // }
func (zt *ZoneTable) AllNames() []string {
zt.mu.Lock()
seen := make(map[string]struct{})
zs := make([]string, 0, len(zt.zones))
for _, z := range zt.zones {
// if time.Since(z.LastSeen) > maxZoneAge {
// continue
// }
if _, s := seen[z.Name]; s {
continue continue
} }
zs.Add(r.ZoneNames) seen[z.Name] = struct{}{}
zs = append(zs, z.Name)
} }
zt.mu.Unlock()
return zs.ToSlice() sort.Strings(zs)
return zs
} }