Compare commits

..

2 commits

Author SHA1 Message Date
6e97109946
Redact list source 2024-05-08 15:41:39 +10:00
67daa8a4ae
Remove IP addresses from logs and status page 2024-05-08 15:33:42 +10:00
21 changed files with 197 additions and 483 deletions

View file

@ -1,12 +0,0 @@
FROM golang:alpine AS builder
WORKDIR /go/src/jrouter
COPY . .
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/go/pkg/mod \
apk add build-base libpcap-dev && \
CGO_ENABLED=1 CGO_FLAGS='-static' go build -v -o jrouter .
FROM alpine:latest
COPY --from=builder /go/src/jrouter/jrouter /usr/bin/
RUN apk add libpcap
ENTRYPOINT ["/usr/bin/jrouter", "-config", "/etc/jrouter/jrouter.yaml"]

View file

@ -49,50 +49,16 @@ First, set up a `jrouter.yaml` (use the one in this repo as an example).
TODO: explain the configuration file TODO: explain the configuration file
### Building and running directly Building and running:
1. Install [Go](https://go.dev/dl). ```shell
2. Run these commands (for Debian-variety Linuxen, e.g. Ubuntu, Raspbian, Mint...): sudo apt install libpcap-dev
```shell go install gitea.drjosh.dev/josh/jrouter@latest
sudo apt install git build-essential libpcap-dev sudo setcap 'CAP_NET_BIND_SERVICE=ep CAP_NET_RAW=ep' ~/go/bin/jrouter
go install drjosh.dev/jrouter@latest ~/go/bin/jrouter
sudo setcap 'CAP_NET_BIND_SERVICE=ep CAP_NET_RAW=ep' ~/go/bin/jrouter ```
```
3. Configure `jrouter.yaml`
4. To run:
```shell
~/go/bin/jrouter
```
Notes: * `NET_BIND_SERVICE` is needed to bind UDP port 387 (for talking between AIRs)
* `NET_RAW` is needed for EtherTalk
* `git` is needed for `go install` to fetch the module TODO: instructions for non-Linux machines
* `build-essential` and`libpcap-dev` are needed for [gopacket](https://github.com/google/gopacket), which uses [CGo](https://pkg.go.dev/cmd/cgo)
* `NET_BIND_SERVICE` is needed for `jrouter` to bind UDP port 387 (for talking between AIRs)
* `NET_RAW` is needed for `jrouter` to listen for and send EtherTalk packets
TODO: instructions for non-Linux / non-Debian-like machines
### With Docker
1. Clone the repo and `cd` into it.
2. `docker build -t jrouter .`
3. Example `docker run` command:
```shell
docker run \
-v ./cfg:/etc/jrouter \
--cap-add NET_RAW \
--net host \
--name jrouter \
jrouter
```
Notes:
* Put `jrouter.yaml` inside a `cfg` directory (or some path of your choice and bind-mount it at `/etc/jrouter`) for it to find the config file.
* `--cap-add NET_RAW` and `--net host` is needed for EtherTalk access to the network interface.
* By using `--net host`, the default AURP port (387) will be bound without `-p`.
## Bug reports? Feature requests? Complaints? Praise?
You can contact me on the Fediverse at @DrJosh9000@cloudisland.nz, or email me at josh.deprez@gmail.com.

View file

@ -21,7 +21,7 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"drjosh.dev/jrouter/atalk" "gitea.drjosh.dev/josh/jrouter/atalk"
) )
const ( const (

View file

@ -21,8 +21,8 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"drjosh.dev/jrouter/atalk" "gitea.drjosh.dev/josh/jrouter/atalk"
"drjosh.dev/jrouter/atalk/atp" "gitea.drjosh.dev/josh/jrouter/atalk/atp"
) )
type GetZonesPacket struct { type GetZonesPacket struct {

View file

@ -19,7 +19,7 @@ package zip
import ( import (
"testing" "testing"
"drjosh.dev/jrouter/atalk/atp" "gitea.drjosh.dev/josh/jrouter/atalk/atp"
) )
func FuzzUnmarshalPacket(f *testing.F) { func FuzzUnmarshalPacket(f *testing.F) {

View file

@ -173,12 +173,9 @@ func (e EventTuples) WriteTo(w io.Writer) (int64, error) {
} }
func parseEventTuples(p []byte) (EventTuples, error) { func parseEventTuples(p []byte) (EventTuples, error) {
// Event tuples can be 1, 4, or 6 bytes long. But the only type of length 1 // Each event tuple is at least 4 bytes, so we need to store at most
// is the Null event type sent to probe whether or not the data receiver is // len(p)/4 of them.
// still listening. If that's present there probably aren't any other e := make(EventTuples, 0, len(p)/4)
// tuples. Hence len(p)/4 (rounded up) is a reasonable estimate of max tuple
// count.
e := make(EventTuples, 0, (len(p)+3)/4)
for len(p) > 0 { for len(p) > 0 {
et, nextp, err := parseEventTuple(p) et, nextp, err := parseEventTuple(p)
if err != nil { if err != nil {
@ -201,10 +198,6 @@ type EventTuple struct {
func (et *EventTuple) WriteTo(w io.Writer) (int64, error) { func (et *EventTuple) WriteTo(w io.Writer) (int64, error) {
a := acc(w) a := acc(w)
a.write8(uint8(et.EventCode)) a.write8(uint8(et.EventCode))
if et.EventCode == EventCodeNull {
// null tuple
return a.ret()
}
a.write16(uint16(et.RangeStart)) a.write16(uint16(et.RangeStart))
if !et.Extended { if !et.Extended {
// non-extended tuple // non-extended tuple
@ -218,18 +211,12 @@ func (et *EventTuple) WriteTo(w io.Writer) (int64, error) {
} }
func parseEventTuple(p []byte) (EventTuple, []byte, error) { func parseEventTuple(p []byte) (EventTuple, []byte, error) {
if len(p) < 1 { if len(p) < 4 {
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for any network event tuple", len(p)) return EventTuple{}, p, fmt.Errorf("insufficient input length %d for network event tuple", len(p))
} }
var et EventTuple var et EventTuple
et.EventCode = EventCode(p[0]) et.EventCode = EventCode(p[0])
if et.EventCode == EventCodeNull {
return et, p[1:], nil
}
if len(p) < 4 {
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for non-Null network event tuple", len(p))
}
et.RangeStart = ddp.Network(binary.BigEndian.Uint16(p[1:3])) et.RangeStart = ddp.Network(binary.BigEndian.Uint16(p[1:3]))
et.RangeEnd = et.RangeStart et.RangeEnd = et.RangeStart
et.Distance = p[3] et.Distance = p[3]

10
go.mod
View file

@ -1,12 +1,14 @@
module drjosh.dev/jrouter module gitea.drjosh.dev/josh/jrouter
go 1.22.0 go 1.22.0
require ( require (
github.com/google/go-cmp v0.6.0 github.com/google/go-cmp v0.6.0
github.com/google/gopacket v1.1.19
github.com/sfiera/multitalk v0.2.0
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )
require golang.org/x/sys v0.5.0 // indirect require (
github.com/google/gopacket v1.1.19 // indirect
github.com/sfiera/multitalk v0.2.0 // indirect
golang.org/x/sys v0.5.0 // indirect
)

10
go.sum
View file

@ -1,32 +1,24 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sfiera/multitalk v0.2.0 h1:FLLw7L6yNUTOdjMk2EgKnOwMKeu++t5zinv5qpTC2JM= github.com/sfiera/multitalk v0.2.0 h1:FLLw7L6yNUTOdjMk2EgKnOwMKeu++t5zinv5qpTC2JM=
github.com/sfiera/multitalk v0.2.0/go.mod h1:jGkgjgiyfuRMNKwfSjo/xR6b9nd10XF4smQAtApXFHc= github.com/sfiera/multitalk v0.2.0/go.mod h1:jGkgjgiyfuRMNKwfSjo/xR6b9nd10XF4smQAtApXFHc=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

72
main.go
View file

@ -30,14 +30,15 @@ import (
"os" "os"
"os/signal" "os/signal"
"regexp" "regexp"
"runtime/debug"
"slices" "slices"
"strings" "strings"
"sync" "sync"
"time" "time"
"drjosh.dev/jrouter/aurp" "gitea.drjosh.dev/josh/jrouter/aurp"
"drjosh.dev/jrouter/router" "gitea.drjosh.dev/josh/jrouter/router"
"drjosh.dev/jrouter/status" "gitea.drjosh.dev/josh/jrouter/status"
"github.com/google/gopacket/pcap" "github.com/google/gopacket/pcap"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
@ -64,13 +65,13 @@ const routingTableTemplate = `
<td>{{$route.LastSeenAgo}}</td> <td>{{$route.LastSeenAgo}}</td>
<td> <td>
{{- with $route.AURPPeer -}} {{- with $route.AURPPeer -}}
{{.RemoteAddr}} [redacted]
{{- end -}} {{- end -}}
{{- with $route.EtherTalkPeer -}} {{- with $route.EtherTalkPeer -}}
{{.Port.Device}} {{.PeerAddr.Network}}.{{.PeerAddr.Node}} {{.Port.Device}} {{.PeerAddr.Network}}.{{.PeerAddr.Node}}
{{- end -}} {{- end -}}
{{- with $route.EtherTalkDirect -}} {{- with $route.EtherTalkDirect -}}
{{.Device}} {{.NetStart}}-{{.NetEnd}} {{.Device}}
{{- end -}} {{- end -}}
</td> </td>
</tr> </tr>
@ -86,24 +87,14 @@ const peerTableTemplate = `
<th>Remote addr</th> <th>Remote addr</th>
<th>Receiver state</th> <th>Receiver state</th>
<th>Sender state</th> <th>Sender state</th>
<th>Last heard from</th>
<th>Last reconnect</th>
<th>Last update</th>
<th>Last send</th>
<th>Send retries</th>
</tr></thead> </tr></thead>
<tbody> <tbody>
{{range $peer := . }} {{range $peer := . }}
<tr> <tr>
<td>{{$peer.ConfiguredAddr}}</td> <td>[redacted]</td>
<td>{{$peer.RemoteAddr}}</td> <td>[redacted]</td>
<td>{{$peer.ReceiverState}}</td> <td>{{$peer.ReceiverState}}</td>
<td>{{$peer.SenderState}}</td> <td>{{$peer.SenderState}}</td>
<td>{{$peer.LastHeardFromAgo}}</td>
<td>{{$peer.LastReconnectAgo}}</td>
<td>{{$peer.LastUpdateAgo}}</td>
<td>{{$peer.LastSendAgo}}</td>
<td>{{$peer.SendRetries}}</td>
</tr> </tr>
{{end}} {{end}}
</tbody> </tbody>
@ -116,9 +107,7 @@ var configFilePath = flag.String("config", "jrouter.yaml", "Path to configuratio
func main() { func main() {
// For some reason it occasionally panics and the panics have no traceback? // For some reason it occasionally panics and the panics have no traceback?
// This didn't help: debug.SetTraceback("all")
// debug.SetTraceback("all")
// I think it's calling recover in a defer too broadly.
flag.Parse() flag.Parse()
log.Println("jrouter") log.Println("jrouter")
@ -153,9 +142,9 @@ func main() {
} }
localDI := aurp.IPDomainIdentifier(localIP) localDI := aurp.IPDomainIdentifier(localIP)
log.Printf("Using %v as local domain identifier", localIP) // log.Printf("Using %v as local domain identifier", localIP)
log.Printf("EtherTalk configuration: %+v", cfg.EtherTalk) // log.Printf("EtherTalk configuration: %+v", cfg.EtherTalk)
ln, err := net.ListenUDP("udp4", &net.UDPAddr{Port: int(cfg.ListenPort)}) ln, err := net.ListenUDP("udp4", &net.UDPAddr{Port: int(cfg.ListenPort)})
if err != nil { if err != nil {
@ -257,12 +246,12 @@ func main() {
// ------------------------- Configured peer setup ------------------------ // ------------------------- Configured peer setup ------------------------
if cfg.PeerListURL != "" { if cfg.PeerListURL != "" {
log.Printf("Fetching peer list from %s...", cfg.PeerListURL) log.Print("Fetching peer list...")
existing := len(cfg.Peers) existing := len(cfg.Peers)
func() { func() {
resp, err := http.Get(cfg.PeerListURL) resp, err := http.Get(cfg.PeerListURL)
if err != nil { if err != nil {
log.Fatalf("Couldn't fetch peer list: %v", err) log.Fatalf("Couldn't fetch peer list!")
} }
defer resp.Body.Close() defer resp.Body.Close()
@ -288,17 +277,28 @@ func main() {
raddr, err := net.ResolveUDPAddr("udp4", peerStr) raddr, err := net.ResolveUDPAddr("udp4", peerStr)
if err != nil { if err != nil {
log.Printf("couldn't resolve UDP address, skipping: %v", err) log.Print("couldn't resolve UDP address, skipping peer")
continue continue
} }
log.Printf("resolved %q to %v", peerStr, raddr) //log.Printf("resolved %q to %v", peerStr, raddr)
if raddr.IP.Equal(localIP) { if raddr.IP.Equal(localIP) {
log.Printf("%v == %v == me, skipping", peerStr, raddr) //log.Print("peer == me, skipping")
continue continue
} }
peer := router.NewAURPPeer(routes, ln, peerStr, raddr, localDI, nil, nextConnID) peer := &router.AURPPeer{
Transport: &aurp.Transport{
LocalDI: localDI,
RemoteDI: aurp.IPDomainIdentifier(raddr.IP),
LocalConnID: nextConnID,
},
UDPConn: ln,
ConfiguredAddr: peerStr,
RemoteAddr: raddr,
ReceiveCh: make(chan aurp.Packet, 1024),
RouteTable: routes,
}
aurp.Inc(&nextConnID) aurp.Inc(&nextConnID)
peersMu.Lock() peersMu.Lock()
peers[udpAddrFromNet(raddr)] = peer peers[udpAddrFromNet(raddr)] = peer
@ -381,7 +381,7 @@ func main() {
return return
} }
log.Printf("AURP: Got %T from %v (%v)", pkt, raddr, dh.SourceDI) // log.Printf("AURP: Got %T from %v (%v)", pkt, raddr, dh.SourceDI)
// Existing peer? // Existing peer?
ra := udpAddrFromNet(raddr) ra := udpAddrFromNet(raddr)
@ -389,12 +389,22 @@ func main() {
pr := peers[ra] pr := peers[ra]
if pr == nil { if pr == nil {
if !cfg.OpenPeering { if !cfg.OpenPeering {
log.Printf("AURP: Got packet from %v but it's not in my config and open peering is disabled; dropping the packet", raddr) log.Print("AURP: Got packet but it's not in my config and open peering is disabled; dropping the packet")
peersMu.Unlock() peersMu.Unlock()
continue continue
} }
// New peer! // New peer!
pr = router.NewAURPPeer(routes, ln, "", raddr, localDI, dh.SourceDI, nextConnID) pr = &router.AURPPeer{
Transport: &aurp.Transport{
LocalDI: localDI,
RemoteDI: dh.SourceDI, // platinum rule
LocalConnID: nextConnID,
},
UDPConn: ln,
RemoteAddr: raddr,
ReceiveCh: make(chan aurp.Packet, 1024),
RouteTable: routes,
}
aurp.Inc(&nextConnID) aurp.Inc(&nextConnID)
peers[ra] = pr peers[ra] = pr
goPeerHandler(pr) goPeerHandler(pr)

View file

@ -24,7 +24,7 @@ import (
"sync" "sync"
"time" "time"
"drjosh.dev/jrouter/status" "gitea.drjosh.dev/josh/jrouter/status"
"github.com/google/gopacket/pcap" "github.com/google/gopacket/pcap"
"github.com/sfiera/multitalk/pkg/aarp" "github.com/sfiera/multitalk/pkg/aarp"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
@ -37,8 +37,6 @@ const (
maxAMTEntryAge = 30 * time.Second maxAMTEntryAge = 30 * time.Second
aarpRequestRetransmit = 1 * time.Second aarpRequestRetransmit = 1 * time.Second
aarpRequestTimeout = 10 * time.Second aarpRequestTimeout = 10 * time.Second
aarpBodyLength = 28 // bytes
) )
const aarpStatusTemplate = ` const aarpStatusTemplate = `
@ -183,17 +181,8 @@ func (a *AARPMachine) Run(ctx context.Context) error {
a.incomingCh = nil a.incomingCh = nil
} }
// sfiera/multitalk will return an "excess data" error if the
// payload is too big. Most traffic I've seen locally does not have
// this problem, but I've seen one report with some junk trailing
// data on AARP packets.
payload := ethFrame.Payload
if len(payload) > aarpBodyLength {
payload = payload[:aarpBodyLength]
}
var aapkt aarp.Packet var aapkt aarp.Packet
if err := aarp.Unmarshal(payload, &aapkt); err != nil { if err := aarp.Unmarshal(ethFrame.Payload, &aapkt); err != nil {
log.Printf("Couldn't unmarshal AARP packet: %v", err) log.Printf("Couldn't unmarshal AARP packet: %v", err)
continue continue
} }
@ -337,9 +326,6 @@ func (a *AARPMachine) heyThatsMe(targ aarp.AddrPair) error {
if err != nil { if err != nil {
return err return err
} }
if len(respFrameRaw) < 64 {
respFrameRaw = append(respFrameRaw, make([]byte, 64-len(respFrameRaw))...)
}
return a.pcapHandle.WritePacketData(respFrameRaw) return a.pcapHandle.WritePacketData(respFrameRaw)
} }
@ -353,9 +339,6 @@ func (a *AARPMachine) probe() error {
if err != nil { if err != nil {
return err return err
} }
if len(probeFrameRaw) < 64 {
probeFrameRaw = append(probeFrameRaw, make([]byte, 64-len(probeFrameRaw))...)
}
return a.pcapHandle.WritePacketData(probeFrameRaw) return a.pcapHandle.WritePacketData(probeFrameRaw)
} }
@ -369,9 +352,6 @@ func (a *AARPMachine) request(ddpAddr ddp.Addr) error {
if err != nil { if err != nil {
return err return err
} }
if len(reqFrameRaw) < 64 {
reqFrameRaw = append(reqFrameRaw, make([]byte, 64-len(reqFrameRaw))...)
}
return a.pcapHandle.WritePacketData(reqFrameRaw) return a.pcapHandle.WritePacketData(reqFrameRaw)
} }
@ -398,7 +378,10 @@ func (e AMTEntry) Valid() bool {
// LastUpdatedAgo is a friendly string reporting how long ago the entry was // LastUpdatedAgo is a friendly string reporting how long ago the entry was
// updated/resolved. // updated/resolved.
func (e AMTEntry) LastUpdatedAgo() string { func (e AMTEntry) LastUpdatedAgo() string {
return ago(e.LastUpdated) if e.LastUpdated.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(e.LastUpdated).Truncate(time.Millisecond))
} }
// addressMappingTable implements a concurrent-safe Address Mapping Table for // addressMappingTable implements a concurrent-safe Address Mapping Table for

View file

@ -20,7 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"drjosh.dev/jrouter/atalk/aep" "gitea.drjosh.dev/josh/jrouter/atalk/aep"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
) )

View file

@ -16,11 +16,6 @@
package router package router
import (
"fmt"
"time"
)
// StringSet is a set of strings. // StringSet is a set of strings.
// Yep, yet another string set implementation. Took me 2 minutes to write *shrug* // Yep, yet another string set implementation. Took me 2 minutes to write *shrug*
type StringSet map[string]struct{} type StringSet map[string]struct{}
@ -55,11 +50,3 @@ func SetFromSlice(ss []string) StringSet {
set.Insert(ss...) set.Insert(ss...)
return set return set
} }
// ago is a helper for formatting times.
func ago(t time.Time) string {
if t.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(t).Truncate(time.Millisecond))
}

View file

@ -21,8 +21,8 @@ import (
"fmt" "fmt"
"log" "log"
"drjosh.dev/jrouter/atalk" "gitea.drjosh.dev/josh/jrouter/atalk"
"drjosh.dev/jrouter/atalk/nbp" "gitea.drjosh.dev/josh/jrouter/atalk/nbp"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
) )

View file

@ -20,7 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"drjosh.dev/jrouter/atalk/nbp" "gitea.drjosh.dev/josh/jrouter/atalk/nbp"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
) )

View file

@ -24,7 +24,7 @@ import (
"sync" "sync"
"time" "time"
"drjosh.dev/jrouter/aurp" "gitea.drjosh.dev/josh/jrouter/aurp"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
) )
@ -114,75 +114,9 @@ type AURPPeer struct {
// Route table (the peer will add/remove/update routes and zones) // Route table (the peer will add/remove/update routes and zones)
RouteTable *RouteTable RouteTable *RouteTable
// Event tuples yet to be sent to this peer in an RI-Upd. mu sync.RWMutex
pendingEventsMu sync.Mutex rstate ReceiverState
pendingEvents aurp.EventTuples sstate SenderState
// The internal states below are only set within the Handle loop, but can
// be read concurrently from outside.
mu sync.RWMutex
rstate ReceiverState
sstate SenderState
lastReconnect time.Time
lastHeardFrom time.Time
lastSend time.Time // TODO: clarify use of lastSend / sendRetries
lastUpdate time.Time
sendRetries int
}
func NewAURPPeer(routes *RouteTable, udpConn *net.UDPConn, peerAddr string, raddr *net.UDPAddr, localDI, remoteDI aurp.DomainIdentifier, connID uint16) *AURPPeer {
if remoteDI == nil {
remoteDI = aurp.IPDomainIdentifier(raddr.IP)
}
return &AURPPeer{
Transport: &aurp.Transport{
LocalDI: localDI,
RemoteDI: remoteDI,
LocalConnID: connID,
},
UDPConn: udpConn,
ConfiguredAddr: peerAddr,
RemoteAddr: raddr,
ReceiveCh: make(chan aurp.Packet, 1024),
RouteTable: routes,
}
}
func (p *AURPPeer) addPendingEvent(ec aurp.EventCode, route *Route) {
// Don't advertise routes to AURP peers to other AURP peers
if route.AURPPeer != nil {
return
}
et := aurp.EventTuple{
EventCode: ec,
Extended: route.Extended,
RangeStart: route.NetStart,
Distance: route.Distance,
RangeEnd: route.NetEnd,
}
switch ec {
case aurp.EventCodeND, aurp.EventCodeNRC:
et.Distance = 0 // "The distance field does not apply to ND or NRC event tuples and should be set to 0."
}
p.pendingEventsMu.Lock()
defer p.pendingEventsMu.Unlock()
p.pendingEvents = append(p.pendingEvents, et)
}
func (p *AURPPeer) RouteAdded(route *Route) {
p.addPendingEvent(aurp.EventCodeNA, route)
}
func (p *AURPPeer) RouteDeleted(route *Route) {
p.addPendingEvent(aurp.EventCodeND, route)
}
func (p *AURPPeer) RouteDistanceChanged(route *Route) {
p.addPendingEvent(aurp.EventCodeNDC, route)
}
func (p *AURPPeer) RouteForwarderChanged(route *Route) {
p.addPendingEvent(aurp.EventCodeNRC, route)
} }
func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error { func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error {
@ -190,7 +124,7 @@ func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error {
if err != nil { if err != nil {
return err return err
} }
_, err = p.send(p.Transport.NewAppleTalkPacket(outPkt)) _, err = p.Send(p.Transport.NewAppleTalkPacket(outPkt))
return err return err
} }
@ -206,36 +140,6 @@ func (p *AURPPeer) SenderState() SenderState {
return p.sstate return p.sstate
} }
func (p *AURPPeer) LastReconnectAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastReconnect)
}
func (p *AURPPeer) LastHeardFromAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastHeardFrom)
}
func (p *AURPPeer) LastSendAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastSend)
}
func (p *AURPPeer) LastUpdateAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastUpdate)
}
func (p *AURPPeer) SendRetries() int {
p.mu.RLock()
defer p.mu.RUnlock()
return p.sendRetries
}
func (p *AURPPeer) setRState(rstate ReceiverState) { func (p *AURPPeer) setRState(rstate ReceiverState) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
@ -248,42 +152,6 @@ func (p *AURPPeer) setSState(sstate SenderState) {
p.sstate = sstate p.sstate = sstate
} }
func (p *AURPPeer) incSendRetries() {
p.mu.Lock()
defer p.mu.Unlock()
p.sendRetries++
}
func (p *AURPPeer) resetSendRetries() {
p.mu.Lock()
defer p.mu.Unlock()
p.sendRetries = 0
}
func (p *AURPPeer) bumpLastHeardFrom() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastHeardFrom = time.Now()
}
func (p *AURPPeer) bumpLastReconnect() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastReconnect = time.Now()
}
func (p *AURPPeer) bumpLastSend() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastSend = time.Now()
}
func (p *AURPPeer) bumpLastUpdate() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastUpdate = time.Now()
}
func (p *AURPPeer) disconnect() { func (p *AURPPeer) disconnect() {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
@ -291,39 +159,34 @@ func (p *AURPPeer) disconnect() {
p.sstate = SenderUnconnected p.sstate = SenderUnconnected
} }
// send encodes and sends pkt to the remote host. // Send encodes and sends pkt to the remote host.
func (p *AURPPeer) send(pkt aurp.Packet) (int, error) { func (p *AURPPeer) Send(pkt aurp.Packet) (int, error) {
var b bytes.Buffer var b bytes.Buffer
if _, err := pkt.WriteTo(&b); err != nil { if _, err := pkt.WriteTo(&b); err != nil {
return 0, err return 0, err
} }
log.Printf("AURP Peer: Sending %T (len %d) to %v", pkt, b.Len(), p.RemoteAddr) // log.Printf("AURP Peer: Sending %T (len %d) to %v", pkt, b.Len(), p.RemoteAddr)
return p.UDPConn.WriteToUDP(b.Bytes(), p.RemoteAddr) return p.UDPConn.WriteToUDP(b.Bytes(), p.RemoteAddr)
} }
func (p *AURPPeer) Handle(ctx context.Context) error { func (p *AURPPeer) Handle(ctx context.Context) error {
// Stop listening to events if the goroutine exits
defer p.RouteTable.RemoveObserver(p)
rticker := time.NewTicker(1 * time.Second) rticker := time.NewTicker(1 * time.Second)
defer rticker.Stop() defer rticker.Stop()
sticker := time.NewTicker(1 * time.Second) sticker := time.NewTicker(1 * time.Second)
defer sticker.Stop() defer sticker.Stop()
p.mu.Lock() lastReconnect := time.Now()
p.lastReconnect = time.Now() lastHeardFrom := time.Now()
p.lastHeardFrom = time.Now() lastSend := time.Now() // TODO: clarify use of lastSend / sendRetries
p.lastSend = time.Now() // TODO: clarify use of lastSend / sendRetries lastUpdate := time.Now()
p.lastUpdate = time.Now() sendRetries := 0
p.sendRetries = 0
p.mu.Unlock()
var lastRISent aurp.Packet var lastRISent aurp.Packet
p.disconnect() p.disconnect()
// Write an Open-Req packet // Write an Open-Req packet
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
@ -339,7 +202,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
} }
// Send a best-effort Router Down before returning // Send a best-effort Router Down before returning
lastRISent = p.Transport.NewRDPacket(aurp.ErrCodeNormalClose) lastRISent = p.Transport.NewRDPacket(aurp.ErrCodeNormalClose)
if _, err := p.send(lastRISent); err != nil { if _, err := p.Send(lastRISent); err != nil {
log.Printf("Couldn't send RD packet: %v", err) log.Printf("Couldn't send RD packet: %v", err)
} }
return ctx.Err() return ctx.Err()
@ -347,60 +210,60 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case <-rticker.C: case <-rticker.C:
switch p.rstate { switch p.rstate {
case ReceiverWaitForOpenRsp: case ReceiverWaitForOpenRsp:
if time.Since(p.lastSend) <= sendRetryTimer { if time.Since(lastSend) <= sendRetryTimer {
break break
} }
if p.sendRetries >= sendRetryLimit { if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection")
p.setRState(ReceiverUnconnected) p.setRState(ReceiverUnconnected)
break break
} }
// Send another Open-Req // Send another Open-Req
p.incSendRetries() sendRetries++
p.bumpLastSend() lastSend = time.Now()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
case ReceiverConnected: case ReceiverConnected:
// Check LHFT, send tickle? // Check LHFT, send tickle?
if time.Since(p.lastHeardFrom) <= lastHeardFromTimer { if time.Since(lastHeardFrom) <= lastHeardFromTimer {
break break
} }
if _, err := p.send(p.Transport.NewTicklePacket()); err != nil { if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle: %v", err) log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
return err return err
} }
p.setRState(ReceiverWaitForTickleAck) p.setRState(ReceiverWaitForTickleAck)
p.resetSendRetries() sendRetries = 0
p.bumpLastSend() lastSend = time.Now()
case ReceiverWaitForTickleAck: case ReceiverWaitForTickleAck:
if time.Since(p.lastSend) <= sendRetryTimer { if time.Since(lastSend) <= sendRetryTimer {
break break
} }
if p.sendRetries >= tickleRetryLimit { if sendRetries >= tickleRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection")
p.setRState(ReceiverUnconnected) p.setRState(ReceiverUnconnected)
p.RouteTable.DeleteAURPPeer(p) p.RouteTable.DeleteAURPPeer(p)
break break
} }
p.incSendRetries() sendRetries++
p.bumpLastSend() lastSend = time.Now()
if _, err := p.send(p.Transport.NewTicklePacket()); err != nil { if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle: %v", err) log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
return err return err
} }
// still in Wait For Tickle-Ack // still in Wait For Tickle-Ack
case ReceiverWaitForRIRsp: case ReceiverWaitForRIRsp:
if time.Since(p.lastSend) <= sendRetryTimer { if time.Since(lastSend) <= sendRetryTimer {
break break
} }
if p.sendRetries >= sendRetryLimit { if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection")
p.setRState(ReceiverUnconnected) p.setRState(ReceiverUnconnected)
p.RouteTable.DeleteAURPPeer(p) p.RouteTable.DeleteAURPPeer(p)
@ -409,9 +272,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// RI-Req is stateless, so we don't need to cache the one we // RI-Req is stateless, so we don't need to cache the one we
// sent earlier just to send it again // sent earlier just to send it again
p.incSendRetries() sendRetries++
p.bumpLastSend() if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil {
if _, err := p.send(p.Transport.NewRIReqPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
return err return err
} }
@ -420,18 +282,18 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case ReceiverUnconnected: case ReceiverUnconnected:
// Data receiver is unconnected. If data sender is connected, // Data receiver is unconnected. If data sender is connected,
// send a null RI-Upd to check if the sender is also unconnected // send a null RI-Upd to check if the sender is also unconnected
if p.sstate == SenderConnected && time.Since(p.lastSend) > sendRetryTimer { if p.sstate == SenderConnected && time.Since(lastSend) > sendRetryTimer {
if p.sendRetries >= sendRetryLimit { if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while probing sender connect, closing connection") log.Print("AURP Peer: Send retry limit reached while probing sender connect, closing connection")
} }
p.incSendRetries() sendRetries++
p.bumpLastSend() lastSend = time.Now()
aurp.Inc(&p.Transport.LocalSeq) aurp.Inc(&p.Transport.LocalSeq)
events := aurp.EventTuples{{ events := aurp.EventTuples{{
EventCode: aurp.EventCodeNull, EventCode: aurp.EventCodeNull,
}} }}
lastRISent = p.Transport.NewRIUpdPacket(events) lastRISent = p.Transport.NewRIUpdPacket(events)
if _, err := p.send(lastRISent); err != nil { if _, err := p.Send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err)
return err return err
} }
@ -440,23 +302,23 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
if p.ConfiguredAddr != "" { if p.ConfiguredAddr != "" {
// Periodically try to reconnect, if this peer is in the config file // Periodically try to reconnect, if this peer is in the config file
if time.Since(p.lastReconnect) <= reconnectTimer { if time.Since(lastReconnect) <= reconnectTimer {
break break
} }
// In case it's a DNS name, re-resolve it before reconnecting // In case it's a DNS name, re-resolve it before reconnecting
raddr, err := net.ResolveUDPAddr("udp4", p.ConfiguredAddr) raddr, err := net.ResolveUDPAddr("udp4", p.ConfiguredAddr)
if err != nil { if err != nil {
log.Printf("couldn't resolve UDP address, skipping: %v", err) log.Print("couldn't resolve UDP address, skipping")
break break
} }
log.Printf("AURP Peer: resolved %q to %v", p.ConfiguredAddr, raddr) // log.Printf("AURP Peer: resolved %q to %v", p.ConfiguredAddr, raddr)
p.RemoteAddr = raddr p.RemoteAddr = raddr
p.bumpLastReconnect() lastReconnect = time.Now()
p.resetSendRetries() sendRetries = 0
p.bumpLastSend() lastSend = time.Now()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
@ -470,64 +332,40 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Do nothing // Do nothing
case SenderConnected: case SenderConnected:
if time.Since(p.lastUpdate) <= updateTimer { if time.Since(lastUpdate) <= updateTimer {
break break
} }
// TODO: is there a routing update to send?
// Are there routing updates to send?
p.pendingEventsMu.Lock()
if len(p.pendingEvents) == 0 {
p.pendingEventsMu.Unlock()
break
}
// Yes - swap the slices, release the mutex, then send them
pending := p.pendingEvents
p.pendingEvents = make(aurp.EventTuples, 0, cap(pending))
p.pendingEventsMu.Unlock()
// TODO: eliminate events that cancel out (e.g. NA then ND)
// TODO: split pending events to fit within a packet
p.bumpLastUpdate()
aurp.Inc(&p.Transport.LocalSeq)
lastRISent = p.Transport.NewRIUpdPacket(pending)
if _, err := p.send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err)
return err
}
p.setSState(SenderWaitForRIUpdAck)
case SenderWaitForRIRspAck, SenderWaitForRIUpdAck: case SenderWaitForRIRspAck, SenderWaitForRIUpdAck:
if time.Since(p.lastSend) <= sendRetryTimer { if time.Since(lastSend) <= sendRetryTimer {
break break
} }
if lastRISent == nil { if lastRISent == nil {
log.Print("AURP Peer: sender retry: lastRISent = nil?") log.Print("AURP Peer: sender retry: lastRISent = nil?")
continue continue
} }
if p.sendRetries >= sendRetryLimit { if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached, closing connection") log.Printf("AURP Peer: Send retry limit reached, closing connection")
p.setSState(SenderUnconnected) p.setSState(SenderUnconnected)
p.RouteTable.RemoveObserver(p)
continue continue
} }
p.incSendRetries() sendRetries++
p.bumpLastSend() lastSend = time.Now()
if _, err := p.send(lastRISent); err != nil { if _, err := p.Send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't re-send %T: %v", lastRISent, err) log.Printf("AURP Peer: Couldn't re-send %T: %v", lastRISent, err)
return err return err
} }
case SenderWaitForRDAck: case SenderWaitForRDAck:
if time.Since(p.lastSend) <= sendRetryTimer { if time.Since(lastSend) <= sendRetryTimer {
break break
} }
p.setSState(SenderUnconnected) p.setSState(SenderUnconnected)
p.RouteTable.RemoveObserver(p)
} }
case pkt := <-p.ReceiveCh: case pkt := <-p.ReceiveCh:
p.bumpLastHeardFrom() lastHeardFrom = time.Now()
switch pkt := pkt.(type) { switch pkt := pkt.(type) {
case *aurp.OpenReqPacket: case *aurp.OpenReqPacket:
@ -554,21 +392,19 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
orsp = p.Transport.NewOpenRspPacket(0, 1, nil) orsp = p.Transport.NewOpenRspPacket(0, 1, nil)
} }
if _, err := p.send(orsp); err != nil { if _, err := p.Send(orsp); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Rsp: %v", err) log.Printf("AURP Peer: Couldn't send Open-Rsp: %v", err)
return err return err
} }
if orsp.RateOrErrCode >= 0 { if orsp.RateOrErrCode >= 0 {
// Data sender is successfully in connected state
p.setSState(SenderConnected) p.setSState(SenderConnected)
p.RouteTable.AddObserver(p)
} }
// If receiver is unconnected, commence connecting // If receiver is unconnected, commence connecting
if p.rstate == ReceiverUnconnected { if p.rstate == ReceiverUnconnected {
p.resetSendRetries() lastSend = time.Now()
p.bumpLastSend() sendRetries = 0
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
@ -589,8 +425,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
p.setRState(ReceiverConnected) p.setRState(ReceiverConnected)
// Send an RI-Req // Send an RI-Req
p.resetSendRetries() sendRetries = 0
if _, err := p.send(p.Transport.NewRIReqPacket()); err != nil { if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
return err return err
} }
@ -613,7 +449,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
p.Transport.LocalSeq = 1 p.Transport.LocalSeq = 1
// TODO: Split tuples across multiple packets as required // TODO: Split tuples across multiple packets as required
lastRISent = p.Transport.NewRIRspPacket(aurp.RoutingFlagLast, nets) lastRISent = p.Transport.NewRIRspPacket(aurp.RoutingFlagLast, nets)
if _, err := p.send(lastRISent); err != nil { if _, err := p.Send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Rsp packet: %v", err)
return err return err
} }
@ -638,7 +474,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// TODO: track which networks we don't have zone info for, and // TODO: track which networks we don't have zone info for, and
// only set SZI for those ? // only set SZI for those ?
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil { if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Ack packet: %v", err)
return err return err
} }
@ -664,8 +500,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
} }
p.setSState(SenderConnected) p.setSState(SenderConnected)
p.resetSendRetries() sendRetries = 0
p.RouteTable.AddObserver(p)
// If SZI flag is set, send ZI-Rsp (transaction) // If SZI flag is set, send ZI-Rsp (transaction)
if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 { if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 {
@ -690,7 +525,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
} }
zones := p.RouteTable.ZonesForNetworks(nets) zones := p.RouteTable.ZonesForNetworks(nets)
// TODO: split ZI-Rsp packets similarly to ZIP Replies // TODO: split ZI-Rsp packets similarly to ZIP Replies
if _, err := p.send(p.Transport.NewZIRspPacket(zones)); err != nil { if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil {
log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err)
} }
} }
@ -701,9 +536,9 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Receiver is unconnected, but their receiver sent us an // Receiver is unconnected, but their receiver sent us an
// RI-Ack for something // RI-Ack for something
// Try to reconnect? // Try to reconnect?
p.resetSendRetries() lastSend = time.Now()
p.bumpLastSend() sendRetries = 0
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
@ -711,6 +546,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
} }
case *aurp.RIUpdPacket: case *aurp.RIUpdPacket:
var ackFlag aurp.RoutingFlag var ackFlag aurp.RoutingFlag
for _, et := range pkt.Events { for _, et := range pkt.Events {
@ -750,7 +586,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
} }
} }
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, ackFlag)); err != nil { if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, ackFlag)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err) log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err)
return err return err
} }
@ -764,7 +600,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
p.RouteTable.DeleteAURPPeer(p) p.RouteTable.DeleteAURPPeer(p)
// Respond with RI-Ack // Respond with RI-Ack
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil { if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err) log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err)
return err return err
} }
@ -774,7 +610,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case *aurp.ZIReqPacket: case *aurp.ZIReqPacket:
// TODO: split ZI-Rsp packets similarly to ZIP Replies // TODO: split ZI-Rsp packets similarly to ZIP Replies
zones := p.RouteTable.ZonesForNetworks(pkt.Networks) zones := p.RouteTable.ZonesForNetworks(pkt.Networks)
if _, err := p.send(p.Transport.NewZIRspPacket(zones)); err != nil { if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil {
log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err)
return err return err
} }
@ -786,7 +622,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
} }
case *aurp.GDZLReqPacket: case *aurp.GDZLReqPacket:
if _, err := p.send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil { if _, err := p.Send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil {
log.Printf("AURP Peer: Couldn't send GDZL-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send GDZL-Rsp packet: %v", err)
return err return err
} }
@ -795,7 +631,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Received a GDZL-Rsp, but I wouldn't have sent a GDZL-Req - that's weird") log.Printf("AURP Peer: Received a GDZL-Rsp, but I wouldn't have sent a GDZL-Req - that's weird")
case *aurp.GZNReqPacket: case *aurp.GZNReqPacket:
if _, err := p.send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil { if _, err := p.Send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil {
log.Printf("AURP Peer: Couldn't send GZN-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send GZN-Rsp packet: %v", err)
return err return err
} }
@ -805,7 +641,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case *aurp.TicklePacket: case *aurp.TicklePacket:
// Immediately respond with Tickle-Ack // Immediately respond with Tickle-Ack
if _, err := p.send(p.Transport.NewTickleAckPacket()); err != nil { if _, err := p.Send(p.Transport.NewTickleAckPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle-Ack: %v", err) log.Printf("AURP Peer: Couldn't send Tickle-Ack: %v", err)
return err return err
} }

View file

@ -46,8 +46,5 @@ func (p *EtherTalkPeer) Forward(ctx context.Context, pkt *ddp.ExtPacket) error {
if err != nil { if err != nil {
return err return err
} }
if len(outFrameRaw) < 64 {
outFrameRaw = append(outFrameRaw, make([]byte, 64-len(outFrameRaw))...)
}
return p.Port.PcapHandle.WritePacketData(outFrameRaw) return p.Port.PcapHandle.WritePacketData(outFrameRaw)
} }

View file

@ -18,12 +18,11 @@ package router
import ( import (
"context" "context"
"encoding/binary"
"errors" "errors"
"io" "io"
"log" "log"
"drjosh.dev/jrouter/atalk" "gitea.drjosh.dev/josh/jrouter/atalk"
"github.com/google/gopacket/pcap" "github.com/google/gopacket/pcap"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
"github.com/sfiera/multitalk/pkg/ethernet" "github.com/sfiera/multitalk/pkg/ethernet"
@ -80,18 +79,8 @@ func (port *EtherTalkPort) Serve(ctx context.Context) {
case ethertalk.AppleTalkProto: case ethertalk.AppleTalkProto:
// log.Print("Got an AppleTalk frame") // log.Print("Got an AppleTalk frame")
// Workaround for strict length checking in sfiera/multitalk
payload := ethFrame.Payload
if len(payload) < 2 {
log.Printf("Couldn't unmarshal DDP packet: too small (length = %d)", len(payload))
}
if size := binary.BigEndian.Uint16(payload[:2]) & 0x3ff; len(payload) > int(size) {
payload = payload[:size]
}
ddpkt := new(ddp.ExtPacket) ddpkt := new(ddp.ExtPacket)
if err := ddp.ExtUnmarshal(payload, ddpkt); err != nil { if err := ddp.ExtUnmarshal(ethFrame.Payload, ddpkt); err != nil {
log.Printf("Couldn't unmarshal DDP packet: %v", err) log.Printf("Couldn't unmarshal DDP packet: %v", err)
continue continue
} }
@ -200,8 +189,5 @@ func (port *EtherTalkPort) send(dstEth ethernet.Addr, pkt *ddp.ExtPacket) error
if err != nil { if err != nil {
return err return err
} }
if len(outFrameRaw) < 64 {
outFrameRaw = append(outFrameRaw, make([]byte, 64-len(outFrameRaw))...)
}
return port.PcapHandle.WritePacketData(outFrameRaw) return port.PcapHandle.WritePacketData(outFrameRaw)
} }

View file

@ -45,7 +45,10 @@ type Route struct {
} }
func (r Route) LastSeenAgo() string { func (r Route) LastSeenAgo() string {
return ago(r.LastSeen) if r.LastSeen.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(r.LastSeen).Truncate(time.Millisecond))
} }
// Valid reports whether the route is valid. // Valid reports whether the route is valid.
@ -55,40 +58,17 @@ func (r *Route) Valid() bool {
return len(r.ZoneNames) > 0 && (r.EtherTalkPeer == nil || time.Since(r.LastSeen) <= maxRouteAge) return len(r.ZoneNames) > 0 && (r.EtherTalkPeer == nil || time.Since(r.LastSeen) <= maxRouteAge)
} }
type RouteTableObserver interface {
RouteAdded(*Route)
RouteDeleted(*Route)
RouteDistanceChanged(*Route)
RouteForwarderChanged(*Route)
}
type RouteTable struct { type RouteTable struct {
routesMu sync.RWMutex mu sync.Mutex
routes map[*Route]struct{} routes map[*Route]struct{}
observersMu sync.RWMutex
observers map[RouteTableObserver]struct{}
} }
func NewRouteTable() *RouteTable { func NewRouteTable() *RouteTable {
return &RouteTable{ return &RouteTable{
routes: make(map[*Route]struct{}), routes: make(map[*Route]struct{}),
observers: make(map[RouteTableObserver]struct{}),
} }
} }
func (rt *RouteTable) AddObserver(obs RouteTableObserver) {
rt.observersMu.Lock()
defer rt.observersMu.Unlock()
rt.observers[obs] = struct{}{}
}
func (rt *RouteTable) RemoveObserver(obs RouteTableObserver) {
rt.observersMu.Lock()
defer rt.observersMu.Unlock()
delete(rt.observers, obs)
}
func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) { func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) {
r := &Route{ r := &Route{
Extended: true, Extended: true,
@ -100,14 +80,14 @@ func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) {
EtherTalkDirect: port, EtherTalkDirect: port,
} }
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
rt.routes[r] = struct{}{} rt.routes[r] = struct{}{}
} }
func (rt *RouteTable) Dump() []Route { func (rt *RouteTable) Dump() []Route {
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
table := make([]Route, 0, len(rt.routes)) table := make([]Route, 0, len(rt.routes))
for r := range rt.routes { for r := range rt.routes {
@ -117,8 +97,8 @@ func (rt *RouteTable) Dump() []Route {
} }
func (rt *RouteTable) LookupRoute(network ddp.Network) *Route { func (rt *RouteTable) LookupRoute(network ddp.Network) *Route {
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
var bestRoute *Route var bestRoute *Route
for r := range rt.routes { for r := range rt.routes {
@ -140,8 +120,8 @@ func (rt *RouteTable) LookupRoute(network ddp.Network) *Route {
} }
func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) { func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) {
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
for route := range rt.routes { for route := range rt.routes {
if route.AURPPeer == peer { if route.AURPPeer == peer {
@ -151,8 +131,8 @@ func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) {
} }
func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network) { func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network) {
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
for route := range rt.routes { for route := range rt.routes {
if route.AURPPeer == peer && route.NetStart == network { if route.AURPPeer == peer && route.NetStart == network {
@ -162,8 +142,8 @@ func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network)
} }
func (rt *RouteTable) UpdateAURPRouteDistance(peer *AURPPeer, network ddp.Network, distance uint8) { func (rt *RouteTable) UpdateAURPRouteDistance(peer *AURPPeer, network ddp.Network, distance uint8) {
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
for route := range rt.routes { for route := range rt.routes {
if route.AURPPeer == peer && route.NetStart == network { if route.AURPPeer == peer && route.NetStart == network {
@ -181,8 +161,8 @@ func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, n
return nil, fmt.Errorf("invalid network range [%d, %d] for nonextended network", netStart, netEnd) return nil, fmt.Errorf("invalid network range [%d, %d] for nonextended network", netStart, netEnd)
} }
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
// Update? // Update?
for r := range rt.routes { for r := range rt.routes {
@ -233,16 +213,16 @@ func (rt *RouteTable) InsertAURPRoute(peer *AURPPeer, extended bool, netStart, n
AURPPeer: peer, AURPPeer: peer,
} }
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
rt.routes[r] = struct{}{} rt.routes[r] = struct{}{}
return nil return nil
} }
// ValidRoutes returns all valid routes. // ValidRoutes returns all valid routes.
func (rt *RouteTable) ValidRoutes() []*Route { func (rt *RouteTable) ValidRoutes() []*Route {
rt.routesMu.RLock() rt.mu.Lock()
defer rt.routesMu.RUnlock() defer rt.mu.Unlock()
valid := make([]*Route, 0, len(rt.routes)) valid := make([]*Route, 0, len(rt.routes))
for r := range rt.routes { for r := range rt.routes {
if r.Valid() { if r.Valid() {
@ -254,8 +234,8 @@ func (rt *RouteTable) ValidRoutes() []*Route {
// ValidNonAURPRoutes returns all valid routes that were not learned via AURP. // ValidNonAURPRoutes returns all valid routes that were not learned via AURP.
func (rt *RouteTable) ValidNonAURPRoutes() []*Route { func (rt *RouteTable) ValidNonAURPRoutes() []*Route {
rt.routesMu.RLock() rt.mu.Lock()
defer rt.routesMu.RUnlock() defer rt.mu.Unlock()
valid := make([]*Route, 0, len(rt.routes)) valid := make([]*Route, 0, len(rt.routes))
for r := range rt.routes { for r := range rt.routes {
if r.AURPPeer != nil { if r.AURPPeer != nil {

View file

@ -22,10 +22,10 @@ import (
"log" "log"
"time" "time"
"drjosh.dev/jrouter/atalk" "gitea.drjosh.dev/josh/jrouter/atalk"
"drjosh.dev/jrouter/atalk/rtmp" "gitea.drjosh.dev/josh/jrouter/atalk/rtmp"
"drjosh.dev/jrouter/atalk/zip" "gitea.drjosh.dev/josh/jrouter/atalk/zip"
"drjosh.dev/jrouter/status" "gitea.drjosh.dev/josh/jrouter/status"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
) )

View file

@ -21,9 +21,9 @@ import (
"fmt" "fmt"
"log" "log"
"drjosh.dev/jrouter/atalk" "gitea.drjosh.dev/josh/jrouter/atalk"
"drjosh.dev/jrouter/atalk/atp" "gitea.drjosh.dev/josh/jrouter/atalk/atp"
"drjosh.dev/jrouter/atalk/zip" "gitea.drjosh.dev/josh/jrouter/atalk/zip"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
"github.com/sfiera/multitalk/pkg/ethernet" "github.com/sfiera/multitalk/pkg/ethernet"
) )

View file

@ -23,8 +23,8 @@ import (
) )
func (rt *RouteTable) AddZonesToNetwork(n ddp.Network, zs ...string) { func (rt *RouteTable) AddZonesToNetwork(n ddp.Network, zs ...string) {
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
for r := range rt.routes { for r := range rt.routes {
if n < r.NetStart || n > r.NetEnd { if n < r.NetStart || n > r.NetEnd {
continue continue
@ -39,8 +39,8 @@ func (rt *RouteTable) AddZonesToNetwork(n ddp.Network, zs ...string) {
func (rt *RouteTable) ZonesForNetworks(ns []ddp.Network) map[ddp.Network][]string { func (rt *RouteTable) ZonesForNetworks(ns []ddp.Network) map[ddp.Network][]string {
zs := make(map[ddp.Network][]string) zs := make(map[ddp.Network][]string)
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
for r := range rt.routes { for r := range rt.routes {
if !r.Valid() { if !r.Valid() {
continue continue
@ -55,8 +55,8 @@ func (rt *RouteTable) ZonesForNetworks(ns []ddp.Network) map[ddp.Network][]strin
} }
func (rt *RouteTable) RoutesForZone(zone string) []*Route { func (rt *RouteTable) RoutesForZone(zone string) []*Route {
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
var routes []*Route var routes []*Route
for r := range rt.routes { for r := range rt.routes {
@ -73,8 +73,8 @@ func (rt *RouteTable) RoutesForZone(zone string) []*Route {
func (rt *RouteTable) AllZoneNames() (zones []string) { func (rt *RouteTable) AllZoneNames() (zones []string) {
defer slices.Sort(zones) defer slices.Sort(zones)
rt.routesMu.Lock() rt.mu.Lock()
defer rt.routesMu.Unlock() defer rt.mu.Unlock()
zs := make(StringSet) zs := make(StringSet)
for r := range rt.routes { for r := range rt.routes {