Compare commits
17 commits
no-ip-addr
...
main
Author | SHA1 | Date | |
---|---|---|---|
698e22245f | |||
ca0e81eaa1 | |||
100d70dccf | |||
0f8297e670 | |||
8d04a405f5 | |||
4dbf1b6f64 | |||
668fe5e722 | |||
2f1b4c1ce1 | |||
d063075907 | |||
d498f5d6cc | |||
a14ae919ff | |||
9c808cbf63 | |||
9e3b311b56 | |||
d25c2eb362 | |||
9fe09c0a9b | |||
331da095c3 | |||
3067daa264 |
21 changed files with 466 additions and 180 deletions
12
Dockerfile
Normal file
12
Dockerfile
Normal file
|
@ -0,0 +1,12 @@
|
|||
FROM golang:alpine AS builder
|
||||
WORKDIR /go/src/jrouter
|
||||
COPY . .
|
||||
RUN --mount=type=cache,target=/root/.cache/go-build \
|
||||
--mount=type=cache,target=/go/pkg/mod \
|
||||
apk add build-base libpcap-dev && \
|
||||
CGO_ENABLED=1 CGO_FLAGS='-static' go build -v -o jrouter .
|
||||
|
||||
FROM alpine:latest
|
||||
COPY --from=builder /go/src/jrouter/jrouter /usr/bin/
|
||||
RUN apk add libpcap
|
||||
ENTRYPOINT ["/usr/bin/jrouter", "-config", "/etc/jrouter/jrouter.yaml"]
|
56
README.md
56
README.md
|
@ -37,7 +37,7 @@ Things I plan to fix Real Soon Now:
|
|||
|
||||
Things I plan to fix At Some Point:
|
||||
|
||||
* For expediency I made it act as a _seed router_. At some point I might add
|
||||
* For expediency I made it act as a _seed router_. At some point I might add
|
||||
"soft seed" functionality.
|
||||
|
||||
|
||||
|
@ -49,16 +49,50 @@ First, set up a `jrouter.yaml` (use the one in this repo as an example).
|
|||
|
||||
TODO: explain the configuration file
|
||||
|
||||
Building and running:
|
||||
### Building and running directly
|
||||
|
||||
```shell
|
||||
sudo apt install libpcap-dev
|
||||
go install gitea.drjosh.dev/josh/jrouter@latest
|
||||
sudo setcap 'CAP_NET_BIND_SERVICE=ep CAP_NET_RAW=ep' ~/go/bin/jrouter
|
||||
~/go/bin/jrouter
|
||||
```
|
||||
1. Install [Go](https://go.dev/dl).
|
||||
2. Run these commands (for Debian-variety Linuxen, e.g. Ubuntu, Raspbian, Mint...):
|
||||
```shell
|
||||
sudo apt install git build-essential libpcap-dev
|
||||
go install drjosh.dev/jrouter@latest
|
||||
sudo setcap 'CAP_NET_BIND_SERVICE=ep CAP_NET_RAW=ep' ~/go/bin/jrouter
|
||||
```
|
||||
3. Configure `jrouter.yaml`
|
||||
4. To run:
|
||||
```shell
|
||||
~/go/bin/jrouter
|
||||
```
|
||||
|
||||
* `NET_BIND_SERVICE` is needed to bind UDP port 387 (for talking between AIRs)
|
||||
* `NET_RAW` is needed for EtherTalk
|
||||
Notes:
|
||||
|
||||
TODO: instructions for non-Linux machines
|
||||
* `git` is needed for `go install` to fetch the module
|
||||
* `build-essential` and`libpcap-dev` are needed for [gopacket](https://github.com/google/gopacket), which uses [CGo](https://pkg.go.dev/cmd/cgo)
|
||||
* `NET_BIND_SERVICE` is needed for `jrouter` to bind UDP port 387 (for talking between AIRs)
|
||||
* `NET_RAW` is needed for `jrouter` to listen for and send EtherTalk packets
|
||||
|
||||
TODO: instructions for non-Linux / non-Debian-like machines
|
||||
|
||||
### With Docker
|
||||
|
||||
1. Clone the repo and `cd` into it.
|
||||
2. `docker build -t jrouter .`
|
||||
3. Example `docker run` command:
|
||||
```shell
|
||||
docker run \
|
||||
-v ./cfg:/etc/jrouter \
|
||||
--cap-add NET_RAW \
|
||||
--net host \
|
||||
--name jrouter \
|
||||
jrouter
|
||||
```
|
||||
|
||||
Notes:
|
||||
|
||||
* Put `jrouter.yaml` inside a `cfg` directory (or some path of your choice and bind-mount it at `/etc/jrouter`) for it to find the config file.
|
||||
* `--cap-add NET_RAW` and `--net host` is needed for EtherTalk access to the network interface.
|
||||
* By using `--net host`, the default AURP port (387) will be bound without `-p`.
|
||||
|
||||
## Bug reports? Feature requests? Complaints? Praise?
|
||||
|
||||
You can contact me on the Fediverse at @DrJosh9000@cloudisland.nz, or email me at josh.deprez@gmail.com.
|
||||
|
|
|
@ -21,7 +21,7 @@ import (
|
|||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
"gitea.drjosh.dev/josh/jrouter/atalk"
|
||||
"drjosh.dev/jrouter/atalk"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
|
@ -21,8 +21,8 @@ import (
|
|||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
"gitea.drjosh.dev/josh/jrouter/atalk"
|
||||
"gitea.drjosh.dev/josh/jrouter/atalk/atp"
|
||||
"drjosh.dev/jrouter/atalk"
|
||||
"drjosh.dev/jrouter/atalk/atp"
|
||||
)
|
||||
|
||||
type GetZonesPacket struct {
|
||||
|
|
|
@ -19,7 +19,7 @@ package zip
|
|||
import (
|
||||
"testing"
|
||||
|
||||
"gitea.drjosh.dev/josh/jrouter/atalk/atp"
|
||||
"drjosh.dev/jrouter/atalk/atp"
|
||||
)
|
||||
|
||||
func FuzzUnmarshalPacket(f *testing.F) {
|
||||
|
|
|
@ -173,9 +173,12 @@ func (e EventTuples) WriteTo(w io.Writer) (int64, error) {
|
|||
}
|
||||
|
||||
func parseEventTuples(p []byte) (EventTuples, error) {
|
||||
// Each event tuple is at least 4 bytes, so we need to store at most
|
||||
// len(p)/4 of them.
|
||||
e := make(EventTuples, 0, len(p)/4)
|
||||
// Event tuples can be 1, 4, or 6 bytes long. But the only type of length 1
|
||||
// is the Null event type sent to probe whether or not the data receiver is
|
||||
// still listening. If that's present there probably aren't any other
|
||||
// tuples. Hence len(p)/4 (rounded up) is a reasonable estimate of max tuple
|
||||
// count.
|
||||
e := make(EventTuples, 0, (len(p)+3)/4)
|
||||
for len(p) > 0 {
|
||||
et, nextp, err := parseEventTuple(p)
|
||||
if err != nil {
|
||||
|
@ -198,6 +201,10 @@ type EventTuple struct {
|
|||
func (et *EventTuple) WriteTo(w io.Writer) (int64, error) {
|
||||
a := acc(w)
|
||||
a.write8(uint8(et.EventCode))
|
||||
if et.EventCode == EventCodeNull {
|
||||
// null tuple
|
||||
return a.ret()
|
||||
}
|
||||
a.write16(uint16(et.RangeStart))
|
||||
if !et.Extended {
|
||||
// non-extended tuple
|
||||
|
@ -211,12 +218,18 @@ func (et *EventTuple) WriteTo(w io.Writer) (int64, error) {
|
|||
}
|
||||
|
||||
func parseEventTuple(p []byte) (EventTuple, []byte, error) {
|
||||
if len(p) < 4 {
|
||||
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for network event tuple", len(p))
|
||||
if len(p) < 1 {
|
||||
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for any network event tuple", len(p))
|
||||
}
|
||||
|
||||
var et EventTuple
|
||||
et.EventCode = EventCode(p[0])
|
||||
if et.EventCode == EventCodeNull {
|
||||
return et, p[1:], nil
|
||||
}
|
||||
if len(p) < 4 {
|
||||
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for non-Null network event tuple", len(p))
|
||||
}
|
||||
et.RangeStart = ddp.Network(binary.BigEndian.Uint16(p[1:3]))
|
||||
et.RangeEnd = et.RangeStart
|
||||
et.Distance = p[3]
|
||||
|
|
10
go.mod
10
go.mod
|
@ -1,14 +1,12 @@
|
|||
module gitea.drjosh.dev/josh/jrouter
|
||||
module drjosh.dev/jrouter
|
||||
|
||||
go 1.22.0
|
||||
|
||||
require (
|
||||
github.com/google/go-cmp v0.6.0
|
||||
github.com/google/gopacket v1.1.19
|
||||
github.com/sfiera/multitalk v0.2.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/google/gopacket v1.1.19 // indirect
|
||||
github.com/sfiera/multitalk v0.2.0 // indirect
|
||||
golang.org/x/sys v0.5.0 // indirect
|
||||
)
|
||||
require golang.org/x/sys v0.5.0 // indirect
|
||||
|
|
10
go.sum
10
go.sum
|
@ -1,24 +1,32 @@
|
|||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
|
||||
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/sfiera/multitalk v0.2.0 h1:FLLw7L6yNUTOdjMk2EgKnOwMKeu++t5zinv5qpTC2JM=
|
||||
github.com/sfiera/multitalk v0.2.0/go.mod h1:jGkgjgiyfuRMNKwfSjo/xR6b9nd10XF4smQAtApXFHc=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
|
||||
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
|
||||
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
|
||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
|
46
main.go
46
main.go
|
@ -30,15 +30,14 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
"regexp"
|
||||
"runtime/debug"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.drjosh.dev/josh/jrouter/aurp"
|
||||
"gitea.drjosh.dev/josh/jrouter/router"
|
||||
"gitea.drjosh.dev/josh/jrouter/status"
|
||||
"drjosh.dev/jrouter/aurp"
|
||||
"drjosh.dev/jrouter/router"
|
||||
"drjosh.dev/jrouter/status"
|
||||
|
||||
"github.com/google/gopacket/pcap"
|
||||
"github.com/sfiera/multitalk/pkg/ddp"
|
||||
|
@ -87,6 +86,11 @@ const peerTableTemplate = `
|
|||
<th>Remote addr</th>
|
||||
<th>Receiver state</th>
|
||||
<th>Sender state</th>
|
||||
<th>Last heard from</th>
|
||||
<th>Last reconnect</th>
|
||||
<th>Last update</th>
|
||||
<th>Last send</th>
|
||||
<th>Send retries</th>
|
||||
</tr></thead>
|
||||
<tbody>
|
||||
{{range $peer := . }}
|
||||
|
@ -95,6 +99,11 @@ const peerTableTemplate = `
|
|||
<td>{{$peer.RemoteAddr}}</td>
|
||||
<td>{{$peer.ReceiverState}}</td>
|
||||
<td>{{$peer.SenderState}}</td>
|
||||
<td>{{$peer.LastHeardFromAgo}}</td>
|
||||
<td>{{$peer.LastReconnectAgo}}</td>
|
||||
<td>{{$peer.LastUpdateAgo}}</td>
|
||||
<td>{{$peer.LastSendAgo}}</td>
|
||||
<td>{{$peer.SendRetries}}</td>
|
||||
</tr>
|
||||
{{end}}
|
||||
</tbody>
|
||||
|
@ -107,7 +116,9 @@ var configFilePath = flag.String("config", "jrouter.yaml", "Path to configuratio
|
|||
|
||||
func main() {
|
||||
// For some reason it occasionally panics and the panics have no traceback?
|
||||
debug.SetTraceback("all")
|
||||
// This didn't help:
|
||||
// debug.SetTraceback("all")
|
||||
// I think it's calling recover in a defer too broadly.
|
||||
|
||||
flag.Parse()
|
||||
log.Println("jrouter")
|
||||
|
@ -287,18 +298,7 @@ func main() {
|
|||
continue
|
||||
}
|
||||
|
||||
peer := &router.AURPPeer{
|
||||
Transport: &aurp.Transport{
|
||||
LocalDI: localDI,
|
||||
RemoteDI: aurp.IPDomainIdentifier(raddr.IP),
|
||||
LocalConnID: nextConnID,
|
||||
},
|
||||
UDPConn: ln,
|
||||
ConfiguredAddr: peerStr,
|
||||
RemoteAddr: raddr,
|
||||
ReceiveCh: make(chan aurp.Packet, 1024),
|
||||
RouteTable: routes,
|
||||
}
|
||||
peer := router.NewAURPPeer(routes, ln, peerStr, raddr, localDI, nil, nextConnID)
|
||||
aurp.Inc(&nextConnID)
|
||||
peersMu.Lock()
|
||||
peers[udpAddrFromNet(raddr)] = peer
|
||||
|
@ -394,17 +394,7 @@ func main() {
|
|||
continue
|
||||
}
|
||||
// New peer!
|
||||
pr = &router.AURPPeer{
|
||||
Transport: &aurp.Transport{
|
||||
LocalDI: localDI,
|
||||
RemoteDI: dh.SourceDI, // platinum rule
|
||||
LocalConnID: nextConnID,
|
||||
},
|
||||
UDPConn: ln,
|
||||
RemoteAddr: raddr,
|
||||
ReceiveCh: make(chan aurp.Packet, 1024),
|
||||
RouteTable: routes,
|
||||
}
|
||||
pr = router.NewAURPPeer(routes, ln, "", raddr, localDI, dh.SourceDI, nextConnID)
|
||||
aurp.Inc(&nextConnID)
|
||||
peers[ra] = pr
|
||||
goPeerHandler(pr)
|
||||
|
|
|
@ -24,7 +24,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.drjosh.dev/josh/jrouter/status"
|
||||
"drjosh.dev/jrouter/status"
|
||||
"github.com/google/gopacket/pcap"
|
||||
"github.com/sfiera/multitalk/pkg/aarp"
|
||||
"github.com/sfiera/multitalk/pkg/ddp"
|
||||
|
@ -37,6 +37,8 @@ const (
|
|||
maxAMTEntryAge = 30 * time.Second
|
||||
aarpRequestRetransmit = 1 * time.Second
|
||||
aarpRequestTimeout = 10 * time.Second
|
||||
|
||||
aarpBodyLength = 28 // bytes
|
||||
)
|
||||
|
||||
const aarpStatusTemplate = `
|
||||
|
@ -181,8 +183,17 @@ func (a *AARPMachine) Run(ctx context.Context) error {
|
|||
a.incomingCh = nil
|
||||
}
|
||||
|
||||
// sfiera/multitalk will return an "excess data" error if the
|
||||
// payload is too big. Most traffic I've seen locally does not have
|
||||
// this problem, but I've seen one report with some junk trailing
|
||||
// data on AARP packets.
|
||||
payload := ethFrame.Payload
|
||||
if len(payload) > aarpBodyLength {
|
||||
payload = payload[:aarpBodyLength]
|
||||
}
|
||||
|
||||
var aapkt aarp.Packet
|
||||
if err := aarp.Unmarshal(ethFrame.Payload, &aapkt); err != nil {
|
||||
if err := aarp.Unmarshal(payload, &aapkt); err != nil {
|
||||
log.Printf("Couldn't unmarshal AARP packet: %v", err)
|
||||
continue
|
||||
}
|
||||
|
@ -326,6 +337,9 @@ func (a *AARPMachine) heyThatsMe(targ aarp.AddrPair) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(respFrameRaw) < 64 {
|
||||
respFrameRaw = append(respFrameRaw, make([]byte, 64-len(respFrameRaw))...)
|
||||
}
|
||||
return a.pcapHandle.WritePacketData(respFrameRaw)
|
||||
}
|
||||
|
||||
|
@ -339,6 +353,9 @@ func (a *AARPMachine) probe() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(probeFrameRaw) < 64 {
|
||||
probeFrameRaw = append(probeFrameRaw, make([]byte, 64-len(probeFrameRaw))...)
|
||||
}
|
||||
return a.pcapHandle.WritePacketData(probeFrameRaw)
|
||||
}
|
||||
|
||||
|
@ -352,6 +369,9 @@ func (a *AARPMachine) request(ddpAddr ddp.Addr) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(reqFrameRaw) < 64 {
|
||||
reqFrameRaw = append(reqFrameRaw, make([]byte, 64-len(reqFrameRaw))...)
|
||||
}
|
||||
return a.pcapHandle.WritePacketData(reqFrameRaw)
|
||||
}
|
||||
|
||||
|
@ -378,10 +398,7 @@ func (e AMTEntry) Valid() bool {
|
|||
// LastUpdatedAgo is a friendly string reporting how long ago the entry was
|
||||
// updated/resolved.
|
||||
func (e AMTEntry) LastUpdatedAgo() string {
|
||||
if e.LastUpdated.IsZero() {
|
||||
return "never"
|
||||
}
|
||||
return fmt.Sprintf("%v ago", time.Since(e.LastUpdated).Truncate(time.Millisecond))
|
||||
return ago(e.LastUpdated)
|
||||
}
|
||||
|
||||
// addressMappingTable implements a concurrent-safe Address Mapping Table for
|
||||
|
|
|
@ -20,7 +20,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
|
||||
"gitea.drjosh.dev/josh/jrouter/atalk/aep"
|
||||
"drjosh.dev/jrouter/atalk/aep"
|
||||
"github.com/sfiera/multitalk/pkg/ddp"
|
||||
)
|
||||
|
||||
|
|
|
@ -16,6 +16,11 @@
|
|||
|
||||
package router
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// StringSet is a set of strings.
|
||||
// Yep, yet another string set implementation. Took me 2 minutes to write *shrug*
|
||||
type StringSet map[string]struct{}
|
||||
|
@ -50,3 +55,11 @@ func SetFromSlice(ss []string) StringSet {
|
|||
set.Insert(ss...)
|
||||
return set
|
||||
}
|
||||
|
||||
// ago is a helper for formatting times.
|
||||
func ago(t time.Time) string {
|
||||
if t.IsZero() {
|
||||
return "never"
|
||||
}
|
||||
return fmt.Sprintf("%v ago", time.Since(t).Truncate(time.Millisecond))
|
||||
}
|
||||
|
|
|
@ -21,8 +21,8 @@ import (
|
|||
"fmt"
|
||||
"log"
|
||||
|
||||
"gitea.drjosh.dev/josh/jrouter/atalk"
|
||||
"gitea.drjosh.dev/josh/jrouter/atalk/nbp"
|
||||
"drjosh.dev/jrouter/atalk"
|
||||
"drjosh.dev/jrouter/atalk/nbp"
|
||||
"github.com/sfiera/multitalk/pkg/ddp"
|
||||
)
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
|
||||
"gitea.drjosh.dev/josh/jrouter/atalk/nbp"
|
||||
"drjosh.dev/jrouter/atalk/nbp"
|
||||
"github.com/sfiera/multitalk/pkg/ddp"
|
||||
)
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.drjosh.dev/josh/jrouter/aurp"
|
||||
"drjosh.dev/jrouter/aurp"
|
||||
"github.com/sfiera/multitalk/pkg/ddp"
|
||||
)
|
||||
|
||||
|
@ -114,9 +114,75 @@ type AURPPeer struct {
|
|||
// Route table (the peer will add/remove/update routes and zones)
|
||||
RouteTable *RouteTable
|
||||
|
||||
mu sync.RWMutex
|
||||
rstate ReceiverState
|
||||
sstate SenderState
|
||||
// Event tuples yet to be sent to this peer in an RI-Upd.
|
||||
pendingEventsMu sync.Mutex
|
||||
pendingEvents aurp.EventTuples
|
||||
|
||||
// The internal states below are only set within the Handle loop, but can
|
||||
// be read concurrently from outside.
|
||||
mu sync.RWMutex
|
||||
rstate ReceiverState
|
||||
sstate SenderState
|
||||
lastReconnect time.Time
|
||||
lastHeardFrom time.Time
|
||||
lastSend time.Time // TODO: clarify use of lastSend / sendRetries
|
||||
lastUpdate time.Time
|
||||
sendRetries int
|
||||
}
|
||||
|
||||
func NewAURPPeer(routes *RouteTable, udpConn *net.UDPConn, peerAddr string, raddr *net.UDPAddr, localDI, remoteDI aurp.DomainIdentifier, connID uint16) *AURPPeer {
|
||||
if remoteDI == nil {
|
||||
remoteDI = aurp.IPDomainIdentifier(raddr.IP)
|
||||
}
|
||||
return &AURPPeer{
|
||||
Transport: &aurp.Transport{
|
||||
LocalDI: localDI,
|
||||
RemoteDI: remoteDI,
|
||||
LocalConnID: connID,
|
||||
},
|
||||
UDPConn: udpConn,
|
||||
ConfiguredAddr: peerAddr,
|
||||
RemoteAddr: raddr,
|
||||
ReceiveCh: make(chan aurp.Packet, 1024),
|
||||
RouteTable: routes,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *AURPPeer) addPendingEvent(ec aurp.EventCode, route *Route) {
|
||||
// Don't advertise routes to AURP peers to other AURP peers
|
||||
if route.AURPPeer != nil {
|
||||
return
|
||||
}
|
||||
et := aurp.EventTuple{
|
||||
EventCode: ec,
|
||||
Extended: route.Extended,
|
||||
RangeStart: route.NetStart,
|
||||
Distance: route.Distance,
|
||||
RangeEnd: route.NetEnd,
|
||||
}
|
||||
switch ec {
|
||||
case aurp.EventCodeND, aurp.EventCodeNRC:
|
||||
et.Distance = 0 // "The distance field does not apply to ND or NRC event tuples and should be set to 0."
|
||||
}
|
||||
p.pendingEventsMu.Lock()
|
||||
defer p.pendingEventsMu.Unlock()
|
||||
p.pendingEvents = append(p.pendingEvents, et)
|
||||
}
|
||||
|
||||
func (p *AURPPeer) RouteAdded(route *Route) {
|
||||
p.addPendingEvent(aurp.EventCodeNA, route)
|
||||
}
|
||||
|
||||
func (p *AURPPeer) RouteDeleted(route *Route) {
|
||||
p.addPendingEvent(aurp.EventCodeND, route)
|
||||
}
|
||||
|
||||
func (p *AURPPeer) RouteDistanceChanged(route *Route) {
|
||||
p.addPendingEvent(aurp.EventCodeNDC, route)
|
||||
}
|
||||
|
||||
func (p *AURPPeer) RouteForwarderChanged(route *Route) {
|
||||
p.addPendingEvent(aurp.EventCodeNRC, route)
|
||||
}
|
||||
|
||||
func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error {
|
||||
|
@ -124,7 +190,7 @@ func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = p.Send(p.Transport.NewAppleTalkPacket(outPkt))
|
||||
_, err = p.send(p.Transport.NewAppleTalkPacket(outPkt))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -140,6 +206,36 @@ func (p *AURPPeer) SenderState() SenderState {
|
|||
return p.sstate
|
||||
}
|
||||
|
||||
func (p *AURPPeer) LastReconnectAgo() string {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return ago(p.lastReconnect)
|
||||
}
|
||||
|
||||
func (p *AURPPeer) LastHeardFromAgo() string {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return ago(p.lastHeardFrom)
|
||||
}
|
||||
|
||||
func (p *AURPPeer) LastSendAgo() string {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return ago(p.lastSend)
|
||||
}
|
||||
|
||||
func (p *AURPPeer) LastUpdateAgo() string {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return ago(p.lastUpdate)
|
||||
}
|
||||
|
||||
func (p *AURPPeer) SendRetries() int {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return p.sendRetries
|
||||
}
|
||||
|
||||
func (p *AURPPeer) setRState(rstate ReceiverState) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
@ -152,6 +248,42 @@ func (p *AURPPeer) setSState(sstate SenderState) {
|
|||
p.sstate = sstate
|
||||
}
|
||||
|
||||
func (p *AURPPeer) incSendRetries() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.sendRetries++
|
||||
}
|
||||
|
||||
func (p *AURPPeer) resetSendRetries() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.sendRetries = 0
|
||||
}
|
||||
|
||||
func (p *AURPPeer) bumpLastHeardFrom() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.lastHeardFrom = time.Now()
|
||||
}
|
||||
|
||||
func (p *AURPPeer) bumpLastReconnect() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.lastReconnect = time.Now()
|
||||
}
|
||||
|
||||
func (p *AURPPeer) bumpLastSend() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.lastSend = time.Now()
|
||||
}
|
||||
|
||||
func (p *AURPPeer) bumpLastUpdate() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.lastUpdate = time.Now()
|
||||
}
|
||||
|
||||
func (p *AURPPeer) disconnect() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
@ -159,8 +291,8 @@ func (p *AURPPeer) disconnect() {
|
|||
p.sstate = SenderUnconnected
|
||||
}
|
||||
|
||||
// Send encodes and sends pkt to the remote host.
|
||||
func (p *AURPPeer) Send(pkt aurp.Packet) (int, error) {
|
||||
// send encodes and sends pkt to the remote host.
|
||||
func (p *AURPPeer) send(pkt aurp.Packet) (int, error) {
|
||||
var b bytes.Buffer
|
||||
if _, err := pkt.WriteTo(&b); err != nil {
|
||||
return 0, err
|
||||
|
@ -170,23 +302,28 @@ func (p *AURPPeer) Send(pkt aurp.Packet) (int, error) {
|
|||
}
|
||||
|
||||
func (p *AURPPeer) Handle(ctx context.Context) error {
|
||||
// Stop listening to events if the goroutine exits
|
||||
defer p.RouteTable.RemoveObserver(p)
|
||||
|
||||
rticker := time.NewTicker(1 * time.Second)
|
||||
defer rticker.Stop()
|
||||
sticker := time.NewTicker(1 * time.Second)
|
||||
defer sticker.Stop()
|
||||
|
||||
lastReconnect := time.Now()
|
||||
lastHeardFrom := time.Now()
|
||||
lastSend := time.Now() // TODO: clarify use of lastSend / sendRetries
|
||||
lastUpdate := time.Now()
|
||||
sendRetries := 0
|
||||
p.mu.Lock()
|
||||
p.lastReconnect = time.Now()
|
||||
p.lastHeardFrom = time.Now()
|
||||
p.lastSend = time.Now() // TODO: clarify use of lastSend / sendRetries
|
||||
p.lastUpdate = time.Now()
|
||||
p.sendRetries = 0
|
||||
p.mu.Unlock()
|
||||
|
||||
var lastRISent aurp.Packet
|
||||
|
||||
p.disconnect()
|
||||
|
||||
// Write an Open-Req packet
|
||||
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
|
||||
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
|
||||
return err
|
||||
}
|
||||
|
@ -202,7 +339,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
}
|
||||
// Send a best-effort Router Down before returning
|
||||
lastRISent = p.Transport.NewRDPacket(aurp.ErrCodeNormalClose)
|
||||
if _, err := p.Send(lastRISent); err != nil {
|
||||
if _, err := p.send(lastRISent); err != nil {
|
||||
log.Printf("Couldn't send RD packet: %v", err)
|
||||
}
|
||||
return ctx.Err()
|
||||
|
@ -210,60 +347,60 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
case <-rticker.C:
|
||||
switch p.rstate {
|
||||
case ReceiverWaitForOpenRsp:
|
||||
if time.Since(lastSend) <= sendRetryTimer {
|
||||
if time.Since(p.lastSend) <= sendRetryTimer {
|
||||
break
|
||||
}
|
||||
if sendRetries >= sendRetryLimit {
|
||||
if p.sendRetries >= sendRetryLimit {
|
||||
log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection")
|
||||
p.setRState(ReceiverUnconnected)
|
||||
break
|
||||
}
|
||||
|
||||
// Send another Open-Req
|
||||
sendRetries++
|
||||
lastSend = time.Now()
|
||||
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
|
||||
p.incSendRetries()
|
||||
p.bumpLastSend()
|
||||
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
case ReceiverConnected:
|
||||
// Check LHFT, send tickle?
|
||||
if time.Since(lastHeardFrom) <= lastHeardFromTimer {
|
||||
if time.Since(p.lastHeardFrom) <= lastHeardFromTimer {
|
||||
break
|
||||
}
|
||||
if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
|
||||
if _, err := p.send(p.Transport.NewTicklePacket()); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
|
||||
return err
|
||||
}
|
||||
p.setRState(ReceiverWaitForTickleAck)
|
||||
sendRetries = 0
|
||||
lastSend = time.Now()
|
||||
p.resetSendRetries()
|
||||
p.bumpLastSend()
|
||||
|
||||
case ReceiverWaitForTickleAck:
|
||||
if time.Since(lastSend) <= sendRetryTimer {
|
||||
if time.Since(p.lastSend) <= sendRetryTimer {
|
||||
break
|
||||
}
|
||||
if sendRetries >= tickleRetryLimit {
|
||||
if p.sendRetries >= tickleRetryLimit {
|
||||
log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection")
|
||||
p.setRState(ReceiverUnconnected)
|
||||
p.RouteTable.DeleteAURPPeer(p)
|
||||
break
|
||||
}
|
||||
|
||||
sendRetries++
|
||||
lastSend = time.Now()
|
||||
if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
|
||||
p.incSendRetries()
|
||||
p.bumpLastSend()
|
||||
if _, err := p.send(p.Transport.NewTicklePacket()); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
|
||||
return err
|
||||
}
|
||||
// still in Wait For Tickle-Ack
|
||||
|
||||
case ReceiverWaitForRIRsp:
|
||||
if time.Since(lastSend) <= sendRetryTimer {
|
||||
if time.Since(p.lastSend) <= sendRetryTimer {
|
||||
break
|
||||
}
|
||||
if sendRetries >= sendRetryLimit {
|
||||
if p.sendRetries >= sendRetryLimit {
|
||||
log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection")
|
||||
p.setRState(ReceiverUnconnected)
|
||||
p.RouteTable.DeleteAURPPeer(p)
|
||||
|
@ -272,8 +409,9 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
|
||||
// RI-Req is stateless, so we don't need to cache the one we
|
||||
// sent earlier just to send it again
|
||||
sendRetries++
|
||||
if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil {
|
||||
p.incSendRetries()
|
||||
p.bumpLastSend()
|
||||
if _, err := p.send(p.Transport.NewRIReqPacket()); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
|
||||
return err
|
||||
}
|
||||
|
@ -282,18 +420,18 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
case ReceiverUnconnected:
|
||||
// Data receiver is unconnected. If data sender is connected,
|
||||
// send a null RI-Upd to check if the sender is also unconnected
|
||||
if p.sstate == SenderConnected && time.Since(lastSend) > sendRetryTimer {
|
||||
if sendRetries >= sendRetryLimit {
|
||||
if p.sstate == SenderConnected && time.Since(p.lastSend) > sendRetryTimer {
|
||||
if p.sendRetries >= sendRetryLimit {
|
||||
log.Printf("AURP Peer: Send retry limit reached while probing sender connect, closing connection")
|
||||
}
|
||||
sendRetries++
|
||||
lastSend = time.Now()
|
||||
p.incSendRetries()
|
||||
p.bumpLastSend()
|
||||
aurp.Inc(&p.Transport.LocalSeq)
|
||||
events := aurp.EventTuples{{
|
||||
EventCode: aurp.EventCodeNull,
|
||||
}}
|
||||
lastRISent = p.Transport.NewRIUpdPacket(events)
|
||||
if _, err := p.Send(lastRISent); err != nil {
|
||||
if _, err := p.send(lastRISent); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err)
|
||||
return err
|
||||
}
|
||||
|
@ -302,7 +440,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
|
||||
if p.ConfiguredAddr != "" {
|
||||
// Periodically try to reconnect, if this peer is in the config file
|
||||
if time.Since(lastReconnect) <= reconnectTimer {
|
||||
if time.Since(p.lastReconnect) <= reconnectTimer {
|
||||
break
|
||||
}
|
||||
|
||||
|
@ -315,10 +453,10 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
log.Printf("AURP Peer: resolved %q to %v", p.ConfiguredAddr, raddr)
|
||||
p.RemoteAddr = raddr
|
||||
|
||||
lastReconnect = time.Now()
|
||||
sendRetries = 0
|
||||
lastSend = time.Now()
|
||||
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
|
||||
p.bumpLastReconnect()
|
||||
p.resetSendRetries()
|
||||
p.bumpLastSend()
|
||||
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
|
||||
return err
|
||||
}
|
||||
|
@ -332,40 +470,64 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
// Do nothing
|
||||
|
||||
case SenderConnected:
|
||||
if time.Since(lastUpdate) <= updateTimer {
|
||||
if time.Since(p.lastUpdate) <= updateTimer {
|
||||
break
|
||||
}
|
||||
// TODO: is there a routing update to send?
|
||||
|
||||
// Are there routing updates to send?
|
||||
p.pendingEventsMu.Lock()
|
||||
if len(p.pendingEvents) == 0 {
|
||||
p.pendingEventsMu.Unlock()
|
||||
break
|
||||
}
|
||||
// Yes - swap the slices, release the mutex, then send them
|
||||
pending := p.pendingEvents
|
||||
p.pendingEvents = make(aurp.EventTuples, 0, cap(pending))
|
||||
p.pendingEventsMu.Unlock()
|
||||
|
||||
// TODO: eliminate events that cancel out (e.g. NA then ND)
|
||||
// TODO: split pending events to fit within a packet
|
||||
|
||||
p.bumpLastUpdate()
|
||||
aurp.Inc(&p.Transport.LocalSeq)
|
||||
lastRISent = p.Transport.NewRIUpdPacket(pending)
|
||||
if _, err := p.send(lastRISent); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err)
|
||||
return err
|
||||
}
|
||||
p.setSState(SenderWaitForRIUpdAck)
|
||||
|
||||
case SenderWaitForRIRspAck, SenderWaitForRIUpdAck:
|
||||
if time.Since(lastSend) <= sendRetryTimer {
|
||||
if time.Since(p.lastSend) <= sendRetryTimer {
|
||||
break
|
||||
}
|
||||
if lastRISent == nil {
|
||||
log.Print("AURP Peer: sender retry: lastRISent = nil?")
|
||||
continue
|
||||
}
|
||||
if sendRetries >= sendRetryLimit {
|
||||
if p.sendRetries >= sendRetryLimit {
|
||||
log.Printf("AURP Peer: Send retry limit reached, closing connection")
|
||||
p.setSState(SenderUnconnected)
|
||||
p.RouteTable.RemoveObserver(p)
|
||||
continue
|
||||
}
|
||||
sendRetries++
|
||||
lastSend = time.Now()
|
||||
if _, err := p.Send(lastRISent); err != nil {
|
||||
p.incSendRetries()
|
||||
p.bumpLastSend()
|
||||
if _, err := p.send(lastRISent); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't re-send %T: %v", lastRISent, err)
|
||||
return err
|
||||
}
|
||||
|
||||
case SenderWaitForRDAck:
|
||||
if time.Since(lastSend) <= sendRetryTimer {
|
||||
if time.Since(p.lastSend) <= sendRetryTimer {
|
||||
break
|
||||
}
|
||||
p.setSState(SenderUnconnected)
|
||||
p.RouteTable.RemoveObserver(p)
|
||||
}
|
||||
|
||||
case pkt := <-p.ReceiveCh:
|
||||
lastHeardFrom = time.Now()
|
||||
p.bumpLastHeardFrom()
|
||||
|
||||
switch pkt := pkt.(type) {
|
||||
case *aurp.OpenReqPacket:
|
||||
|
@ -392,19 +554,21 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
orsp = p.Transport.NewOpenRspPacket(0, 1, nil)
|
||||
}
|
||||
|
||||
if _, err := p.Send(orsp); err != nil {
|
||||
if _, err := p.send(orsp); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send Open-Rsp: %v", err)
|
||||
return err
|
||||
}
|
||||
if orsp.RateOrErrCode >= 0 {
|
||||
// Data sender is successfully in connected state
|
||||
p.setSState(SenderConnected)
|
||||
p.RouteTable.AddObserver(p)
|
||||
}
|
||||
|
||||
// If receiver is unconnected, commence connecting
|
||||
if p.rstate == ReceiverUnconnected {
|
||||
lastSend = time.Now()
|
||||
sendRetries = 0
|
||||
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
|
||||
p.resetSendRetries()
|
||||
p.bumpLastSend()
|
||||
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
|
||||
return err
|
||||
}
|
||||
|
@ -425,8 +589,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
p.setRState(ReceiverConnected)
|
||||
|
||||
// Send an RI-Req
|
||||
sendRetries = 0
|
||||
if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil {
|
||||
p.resetSendRetries()
|
||||
if _, err := p.send(p.Transport.NewRIReqPacket()); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
|
||||
return err
|
||||
}
|
||||
|
@ -449,7 +613,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
p.Transport.LocalSeq = 1
|
||||
// TODO: Split tuples across multiple packets as required
|
||||
lastRISent = p.Transport.NewRIRspPacket(aurp.RoutingFlagLast, nets)
|
||||
if _, err := p.Send(lastRISent); err != nil {
|
||||
if _, err := p.send(lastRISent); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send RI-Rsp packet: %v", err)
|
||||
return err
|
||||
}
|
||||
|
@ -474,7 +638,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
|
||||
// TODO: track which networks we don't have zone info for, and
|
||||
// only set SZI for those ?
|
||||
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil {
|
||||
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send RI-Ack packet: %v", err)
|
||||
return err
|
||||
}
|
||||
|
@ -500,7 +664,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
}
|
||||
|
||||
p.setSState(SenderConnected)
|
||||
sendRetries = 0
|
||||
p.resetSendRetries()
|
||||
p.RouteTable.AddObserver(p)
|
||||
|
||||
// If SZI flag is set, send ZI-Rsp (transaction)
|
||||
if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 {
|
||||
|
@ -525,7 +690,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
}
|
||||
zones := p.RouteTable.ZonesForNetworks(nets)
|
||||
// TODO: split ZI-Rsp packets similarly to ZIP Replies
|
||||
if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil {
|
||||
if _, err := p.send(p.Transport.NewZIRspPacket(zones)); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -536,9 +701,9 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
// Receiver is unconnected, but their receiver sent us an
|
||||
// RI-Ack for something
|
||||
// Try to reconnect?
|
||||
lastSend = time.Now()
|
||||
sendRetries = 0
|
||||
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
|
||||
p.resetSendRetries()
|
||||
p.bumpLastSend()
|
||||
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
|
||||
return err
|
||||
}
|
||||
|
@ -546,7 +711,6 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
}
|
||||
|
||||
case *aurp.RIUpdPacket:
|
||||
|
||||
var ackFlag aurp.RoutingFlag
|
||||
|
||||
for _, et := range pkt.Events {
|
||||
|
@ -586,7 +750,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, ackFlag)); err != nil {
|
||||
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, ackFlag)); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err)
|
||||
return err
|
||||
}
|
||||
|
@ -600,7 +764,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
p.RouteTable.DeleteAURPPeer(p)
|
||||
|
||||
// Respond with RI-Ack
|
||||
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil {
|
||||
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err)
|
||||
return err
|
||||
}
|
||||
|
@ -610,7 +774,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
case *aurp.ZIReqPacket:
|
||||
// TODO: split ZI-Rsp packets similarly to ZIP Replies
|
||||
zones := p.RouteTable.ZonesForNetworks(pkt.Networks)
|
||||
if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil {
|
||||
if _, err := p.send(p.Transport.NewZIRspPacket(zones)); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err)
|
||||
return err
|
||||
}
|
||||
|
@ -622,7 +786,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
}
|
||||
|
||||
case *aurp.GDZLReqPacket:
|
||||
if _, err := p.Send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil {
|
||||
if _, err := p.send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send GDZL-Rsp packet: %v", err)
|
||||
return err
|
||||
}
|
||||
|
@ -631,7 +795,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
log.Printf("AURP Peer: Received a GDZL-Rsp, but I wouldn't have sent a GDZL-Req - that's weird")
|
||||
|
||||
case *aurp.GZNReqPacket:
|
||||
if _, err := p.Send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil {
|
||||
if _, err := p.send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send GZN-Rsp packet: %v", err)
|
||||
return err
|
||||
}
|
||||
|
@ -641,7 +805,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
|
|||
|
||||
case *aurp.TicklePacket:
|
||||
// Immediately respond with Tickle-Ack
|
||||
if _, err := p.Send(p.Transport.NewTickleAckPacket()); err != nil {
|
||||
if _, err := p.send(p.Transport.NewTickleAckPacket()); err != nil {
|
||||
log.Printf("AURP Peer: Couldn't send Tickle-Ack: %v", err)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -46,5 +46,8 @@ func (p *EtherTalkPeer) Forward(ctx context.Context, pkt *ddp.ExtPacket) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(outFrameRaw) < 64 {
|
||||
outFrameRaw = append(outFrameRaw, make([]byte, 64-len(outFrameRaw))...)
|
||||
}
|
||||
return p.Port.PcapHandle.WritePacketData(outFrameRaw)
|
||||
}
|
||||
|
|
|
@ -18,11 +18,12 @@ package router
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"io"
|
||||
"log"
|
||||
|
||||
"gitea.drjosh.dev/josh/jrouter/atalk"
|
||||
"drjosh.dev/jrouter/atalk"
|
||||
"github.com/google/gopacket/pcap"
|
||||
"github.com/sfiera/multitalk/pkg/ddp"
|
||||
"github.com/sfiera/multitalk/pkg/ethernet"
|
||||
|
@ -79,8 +80,18 @@ func (port *EtherTalkPort) Serve(ctx context.Context) {
|
|||
|
||||
case ethertalk.AppleTalkProto:
|
||||
// log.Print("Got an AppleTalk frame")
|
||||
|
||||
// Workaround for strict length checking in sfiera/multitalk
|
||||
payload := ethFrame.Payload
|
||||
if len(payload) < 2 {
|
||||
log.Printf("Couldn't unmarshal DDP packet: too small (length = %d)", len(payload))
|
||||
}
|
||||
if size := binary.BigEndian.Uint16(payload[:2]) & 0x3ff; len(payload) > int(size) {
|
||||
payload = payload[:size]
|
||||
}
|
||||
|
||||
ddpkt := new(ddp.ExtPacket)
|
||||
if err := ddp.ExtUnmarshal(ethFrame.Payload, ddpkt); err != nil {
|
||||
if err := ddp.ExtUnmarshal(payload, ddpkt); err != nil {
|
||||
log.Printf("Couldn't unmarshal DDP packet: %v", err)
|
||||
continue
|
||||
}
|
||||
|
@ -189,5 +200,8 @@ func (port *EtherTalkPort) send(dstEth ethernet.Addr, pkt *ddp.ExtPacket) error
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(outFrameRaw) < 64 {
|
||||
outFrameRaw = append(outFrameRaw, make([]byte, 64-len(outFrameRaw))...)
|
||||
}
|
||||
return port.PcapHandle.WritePacketData(outFrameRaw)
|
||||
}
|
||||
|
|
|
@ -45,10 +45,7 @@ type Route struct {
|
|||
}
|
||||
|
||||
func (r Route) LastSeenAgo() string {
|
||||
if r.LastSeen.IsZero() {
|
||||
return "never"
|
||||
}
|
||||
return fmt.Sprintf("%v ago", time.Since(r.LastSeen).Truncate(time.Millisecond))
|
||||
return ago(r.LastSeen)
|
||||
}
|
||||
|
||||
// Valid reports whether the route is valid.
|
||||
|
@ -58,17 +55,40 @@ func (r *Route) Valid() bool {
|
|||
return len(r.ZoneNames) > 0 && (r.EtherTalkPeer == nil || time.Since(r.LastSeen) <= maxRouteAge)
|
||||
}
|
||||
|
||||
type RouteTableObserver interface {
|
||||
RouteAdded(*Route)
|
||||
RouteDeleted(*Route)
|
||||
RouteDistanceChanged(*Route)
|
||||
RouteForwarderChanged(*Route)
|
||||
}
|
||||
|
||||
type RouteTable struct {
|
||||
mu sync.Mutex
|
||||
routes map[*Route]struct{}
|
||||
routesMu sync.RWMutex
|
||||
routes map[*Route]struct{}
|
||||
|
||||
observersMu sync.RWMutex
|
||||
observers map[RouteTableObserver]struct{}
|
||||
}
|
||||
|
||||
func NewRouteTable() *RouteTable {
|
||||
return &RouteTable{
|
||||
routes: make(map[*Route]struct{}),
|
||||
routes: make(map[*Route]struct{}),
|
||||
observers: make(map[RouteTableObserver]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (rt *RouteTable) AddObserver(obs RouteTableObserver) {
|
||||
rt.observersMu.Lock()
|
||||
defer rt.observersMu.Unlock()
|
||||
rt.observers[obs] = struct{}{}
|
||||
}
|
||||
|
||||
func (rt *RouteTable) RemoveObserver(obs RouteTableObserver) {
|
||||
rt.observersMu.Lock()
|
||||
defer rt.observersMu.Unlock()
|
||||
delete(rt.observers, obs)
|
||||
}
|
||||
|
||||
func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) {
|
||||
r := &Route{
|
||||
Extended: true,
|
||||
|
@ -80,14 +100,14 @@ func (rt *RouteTable) InsertEtherTalkDirect(port *EtherTalkPort) {
|
|||
EtherTalkDirect: port,
|
||||
}
|
||||
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
rt.routes[r] = struct{}{}
|
||||
}
|
||||
|
||||
func (rt *RouteTable) Dump() []Route {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
table := make([]Route, 0, len(rt.routes))
|
||||
for r := range rt.routes {
|
||||
|
@ -97,8 +117,8 @@ func (rt *RouteTable) Dump() []Route {
|
|||
}
|
||||
|
||||
func (rt *RouteTable) LookupRoute(network ddp.Network) *Route {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
var bestRoute *Route
|
||||
for r := range rt.routes {
|
||||
|
@ -120,8 +140,8 @@ func (rt *RouteTable) LookupRoute(network ddp.Network) *Route {
|
|||
}
|
||||
|
||||
func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
for route := range rt.routes {
|
||||
if route.AURPPeer == peer {
|
||||
|
@ -131,8 +151,8 @@ func (rt *RouteTable) DeleteAURPPeer(peer *AURPPeer) {
|
|||
}
|
||||
|
||||
func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network) {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
for route := range rt.routes {
|
||||
if route.AURPPeer == peer && route.NetStart == network {
|
||||
|
@ -142,8 +162,8 @@ func (rt *RouteTable) DeleteAURPPeerNetwork(peer *AURPPeer, network ddp.Network)
|
|||
}
|
||||
|
||||
func (rt *RouteTable) UpdateAURPRouteDistance(peer *AURPPeer, network ddp.Network, distance uint8) {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
for route := range rt.routes {
|
||||
if route.AURPPeer == peer && route.NetStart == network {
|
||||
|
@ -161,8 +181,8 @@ func (rt *RouteTable) UpsertEtherTalkRoute(peer *EtherTalkPeer, extended bool, n
|
|||
return nil, fmt.Errorf("invalid network range [%d, %d] for nonextended network", netStart, netEnd)
|
||||
}
|
||||
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
// Update?
|
||||
for r := range rt.routes {
|
||||
|
@ -213,16 +233,16 @@ func (rt *RouteTable) InsertAURPRoute(peer *AURPPeer, extended bool, netStart, n
|
|||
AURPPeer: peer,
|
||||
}
|
||||
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
rt.routes[r] = struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValidRoutes returns all valid routes.
|
||||
func (rt *RouteTable) ValidRoutes() []*Route {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.RLock()
|
||||
defer rt.routesMu.RUnlock()
|
||||
valid := make([]*Route, 0, len(rt.routes))
|
||||
for r := range rt.routes {
|
||||
if r.Valid() {
|
||||
|
@ -234,8 +254,8 @@ func (rt *RouteTable) ValidRoutes() []*Route {
|
|||
|
||||
// ValidNonAURPRoutes returns all valid routes that were not learned via AURP.
|
||||
func (rt *RouteTable) ValidNonAURPRoutes() []*Route {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.RLock()
|
||||
defer rt.routesMu.RUnlock()
|
||||
valid := make([]*Route, 0, len(rt.routes))
|
||||
for r := range rt.routes {
|
||||
if r.AURPPeer != nil {
|
||||
|
|
|
@ -22,10 +22,10 @@ import (
|
|||
"log"
|
||||
"time"
|
||||
|
||||
"gitea.drjosh.dev/josh/jrouter/atalk"
|
||||
"gitea.drjosh.dev/josh/jrouter/atalk/rtmp"
|
||||
"gitea.drjosh.dev/josh/jrouter/atalk/zip"
|
||||
"gitea.drjosh.dev/josh/jrouter/status"
|
||||
"drjosh.dev/jrouter/atalk"
|
||||
"drjosh.dev/jrouter/atalk/rtmp"
|
||||
"drjosh.dev/jrouter/atalk/zip"
|
||||
"drjosh.dev/jrouter/status"
|
||||
|
||||
"github.com/sfiera/multitalk/pkg/ddp"
|
||||
)
|
||||
|
|
|
@ -21,9 +21,9 @@ import (
|
|||
"fmt"
|
||||
"log"
|
||||
|
||||
"gitea.drjosh.dev/josh/jrouter/atalk"
|
||||
"gitea.drjosh.dev/josh/jrouter/atalk/atp"
|
||||
"gitea.drjosh.dev/josh/jrouter/atalk/zip"
|
||||
"drjosh.dev/jrouter/atalk"
|
||||
"drjosh.dev/jrouter/atalk/atp"
|
||||
"drjosh.dev/jrouter/atalk/zip"
|
||||
"github.com/sfiera/multitalk/pkg/ddp"
|
||||
"github.com/sfiera/multitalk/pkg/ethernet"
|
||||
)
|
||||
|
|
|
@ -23,8 +23,8 @@ import (
|
|||
)
|
||||
|
||||
func (rt *RouteTable) AddZonesToNetwork(n ddp.Network, zs ...string) {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
for r := range rt.routes {
|
||||
if n < r.NetStart || n > r.NetEnd {
|
||||
continue
|
||||
|
@ -39,8 +39,8 @@ func (rt *RouteTable) AddZonesToNetwork(n ddp.Network, zs ...string) {
|
|||
func (rt *RouteTable) ZonesForNetworks(ns []ddp.Network) map[ddp.Network][]string {
|
||||
zs := make(map[ddp.Network][]string)
|
||||
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
for r := range rt.routes {
|
||||
if !r.Valid() {
|
||||
continue
|
||||
|
@ -55,8 +55,8 @@ func (rt *RouteTable) ZonesForNetworks(ns []ddp.Network) map[ddp.Network][]strin
|
|||
}
|
||||
|
||||
func (rt *RouteTable) RoutesForZone(zone string) []*Route {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
var routes []*Route
|
||||
for r := range rt.routes {
|
||||
|
@ -73,8 +73,8 @@ func (rt *RouteTable) RoutesForZone(zone string) []*Route {
|
|||
func (rt *RouteTable) AllZoneNames() (zones []string) {
|
||||
defer slices.Sort(zones)
|
||||
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
rt.routesMu.Lock()
|
||||
defer rt.routesMu.Unlock()
|
||||
|
||||
zs := make(StringSet)
|
||||
for r := range rt.routes {
|
||||
|
|
Loading…
Reference in a new issue