Use data in RI-Upds, add peer table to status

This commit is contained in:
Josh Deprez 2024-04-26 16:15:02 +10:00
parent 088b1d1a93
commit 5790b9d616
No known key found for this signature in database
5 changed files with 321 additions and 137 deletions

View file

@ -21,6 +21,8 @@ import (
"fmt" "fmt"
"io" "io"
"strings" "strings"
"github.com/sfiera/multitalk/pkg/ddp"
) )
type RIReqPacket struct { type RIReqPacket struct {
@ -115,15 +117,15 @@ func parseNetworkTuples(p []byte) (NetworkTuples, error) {
type NetworkTuple struct { type NetworkTuple struct {
Extended bool Extended bool
RangeStart uint16 RangeStart ddp.Network
Distance uint8 Distance uint8
RangeEnd uint16 RangeEnd ddp.Network
// 0x00 for extended tuples // 0x00 for extended tuples
} }
func (nt *NetworkTuple) WriteTo(w io.Writer) (int64, error) { func (nt *NetworkTuple) WriteTo(w io.Writer) (int64, error) {
a := acc(w) a := acc(w)
a.write16(nt.RangeStart) a.write16(uint16(nt.RangeStart))
if !nt.Extended { if !nt.Extended {
// non-extended tuple // non-extended tuple
a.write8(nt.Distance) a.write8(nt.Distance)
@ -131,7 +133,7 @@ func (nt *NetworkTuple) WriteTo(w io.Writer) (int64, error) {
} }
// extended tuple // extended tuple
a.write8(nt.Distance | 0x80) a.write8(nt.Distance | 0x80)
a.write16(nt.RangeEnd) a.write16(uint16(nt.RangeEnd))
a.write8(0x00) a.write8(0x00)
return a.ret() return a.ret()
} }
@ -142,7 +144,7 @@ func parseNetworkTuple(p []byte) (NetworkTuple, []byte, error) {
} }
var nt NetworkTuple var nt NetworkTuple
nt.RangeStart = binary.BigEndian.Uint16(p[:2]) nt.RangeStart = ddp.Network(binary.BigEndian.Uint16(p[:2]))
nt.RangeEnd = nt.RangeStart nt.RangeEnd = nt.RangeStart
nt.Distance = p[2] nt.Distance = p[2]
nt.Extended = nt.Distance&0x80 != 0 nt.Extended = nt.Distance&0x80 != 0
@ -156,7 +158,7 @@ func parseNetworkTuple(p []byte) (NetworkTuple, []byte, error) {
} }
nt.Distance &^= 0x80 nt.Distance &^= 0x80
nt.RangeEnd = binary.BigEndian.Uint16(p[3:5]) nt.RangeEnd = ddp.Network(binary.BigEndian.Uint16(p[3:5]))
return nt, p[6:], nil return nt, p[6:], nil
} }
@ -188,15 +190,15 @@ func parseEventTuples(p []byte) (EventTuples, error) {
type EventTuple struct { type EventTuple struct {
EventCode EventCode EventCode EventCode
Extended bool Extended bool
RangeStart uint16 RangeStart ddp.Network
Distance uint8 Distance uint8
RangeEnd uint16 RangeEnd ddp.Network
} }
func (et *EventTuple) WriteTo(w io.Writer) (int64, error) { func (et *EventTuple) WriteTo(w io.Writer) (int64, error) {
a := acc(w) a := acc(w)
a.write8(uint8(et.EventCode)) a.write8(uint8(et.EventCode))
a.write16(et.RangeStart) a.write16(uint16(et.RangeStart))
if !et.Extended { if !et.Extended {
// non-extended tuple // non-extended tuple
a.write8(et.Distance) a.write8(et.Distance)
@ -204,7 +206,7 @@ func (et *EventTuple) WriteTo(w io.Writer) (int64, error) {
} }
// extended tuple // extended tuple
a.write8(et.Distance | 0x80) a.write8(et.Distance | 0x80)
a.write16(et.RangeEnd) a.write16(uint16(et.RangeEnd))
return a.ret() return a.ret()
} }
@ -215,7 +217,7 @@ func parseEventTuple(p []byte) (EventTuple, []byte, error) {
var et EventTuple var et EventTuple
et.EventCode = EventCode(p[0]) et.EventCode = EventCode(p[0])
et.RangeStart = binary.BigEndian.Uint16(p[1:3]) et.RangeStart = ddp.Network(binary.BigEndian.Uint16(p[1:3]))
et.RangeEnd = et.RangeStart et.RangeEnd = et.RangeStart
et.Distance = p[3] et.Distance = p[3]
et.Extended = et.Distance&0x80 != 0 et.Extended = et.Distance&0x80 != 0
@ -229,28 +231,48 @@ func parseEventTuple(p []byte) (EventTuple, []byte, error) {
} }
et.Distance &^= 0x80 et.Distance &^= 0x80
et.RangeEnd = binary.BigEndian.Uint16(p[4:6]) et.RangeEnd = ddp.Network(binary.BigEndian.Uint16(p[4:6]))
return et, p[6:], nil return et, p[6:], nil
} }
type EventCode uint8 type EventCode uint8
const ( const (
// Null event
EventCodeNull EventCode = 0 EventCodeNull EventCode = 0
// Network added event
EventCodeNA EventCode = 1 EventCodeNA EventCode = 1
// Network deleted event
EventCodeND EventCode = 2 EventCodeND EventCode = 2
// Network route change event
EventCodeNRC EventCode = 3 EventCodeNRC EventCode = 3
// Network distance change event
EventCodeNDC EventCode = 4 EventCodeNDC EventCode = 4
// Network zone change event
// Note: "The ZC event tuple is not yet defined."
EventCodeZC EventCode = 5 EventCodeZC EventCode = 5
) )
func (ec EventCode) String() string { func (ec EventCode) String() string {
return map[EventCode]string{ switch ec {
EventCodeNull: "null", case EventCodeNull:
EventCodeNA: "network added", return "null"
EventCodeND: "network deleted", case EventCodeNA:
EventCodeNRC: "network route change", return "network added"
EventCodeNDC: "network distance change", case EventCodeND:
EventCodeZC: "zone name change", return "network deleted"
}[ec] case EventCodeNRC:
return "network route change"
case EventCodeNDC:
return "network distance change"
case EventCodeZC:
return "zone name change"
default:
return "unknown"
}
} }

62
main.go
View file

@ -91,6 +91,27 @@ const zoneTableTemplate = `
</table> </table>
` `
const peerTableTemplate = `
<table>
<thead><tr>
<th>Configured addr</th>
<th>Remote addr</th>
<th>Receiver state</th>
<th>Sender state</th>
</tr></thead>
<tbody>
{{range $peer := . }}
<tr>
<td>{{$peer.ConfiguredAddr}}</td>
<td>{{$peer.RemoteAddr}}</td>
<td>{{$peer.ReceiverState}}</td>
<td>{{$peer.SenderState}}</td>
</tr>
{{end}}
</tbody>
</table>
`
var hasPortRE = regexp.MustCompile(`:\d+$`) var hasPortRE = regexp.MustCompile(`:\d+$`)
var configFilePath = flag.String("config", "jrouter.yaml", "Path to configuration file to use") var configFilePath = flag.String("config", "jrouter.yaml", "Path to configuration file to use")
@ -136,12 +157,6 @@ func main() {
log.Printf("EtherTalk configuration: %+v", cfg.EtherTalk) log.Printf("EtherTalk configuration: %+v", cfg.EtherTalk)
peers := make(map[udpAddr]*router.Peer)
var nextConnID uint16
for nextConnID == 0 {
nextConnID = uint16(rand.IntN(0x10000))
}
ln, err := net.ListenUDP("udp4", &net.UDPAddr{Port: int(cfg.ListenPort)}) ln, err := net.ListenUDP("udp4", &net.UDPAddr{Port: int(cfg.ListenPort)})
if err != nil { if err != nil {
log.Fatalf("Couldn't listen on udp4:387: %v", err) log.Fatalf("Couldn't listen on udp4:387: %v", err)
@ -208,6 +223,23 @@ func main() {
}) })
// -------------------------------- Peers --------------------------------- // -------------------------------- Peers ---------------------------------
var peersMu sync.Mutex
peers := make(map[udpAddr]*router.Peer)
status.AddItem(ctx, "AURP Peers", peerTableTemplate, func(context.Context) (any, error) {
peersMu.Lock()
peerInfo := make([]*router.Peer, 0, len(peers))
for _, p := range peers {
peerInfo = append(peerInfo, p)
}
peersMu.Unlock()
return peerInfo, nil
})
var nextConnID uint16
for nextConnID == 0 {
nextConnID = uint16(rand.IntN(0x10000))
}
var wg sync.WaitGroup var wg sync.WaitGroup
goPeerHandler := func(p *router.Peer) { goPeerHandler := func(p *router.Peer) {
wg.Add(1) wg.Add(1)
@ -268,14 +300,16 @@ func main() {
LocalConnID: nextConnID, LocalConnID: nextConnID,
}, },
UDPConn: ln, UDPConn: ln,
ConfiguredAddr: peerStr,
RemoteAddr: raddr, RemoteAddr: raddr,
RecieveCh: make(chan aurp.Packet, 1024), ReceiveCh: make(chan aurp.Packet, 1024),
RoutingTable: routes, RoutingTable: routes,
ZoneTable: zones, ZoneTable: zones,
Reconnect: true,
} }
aurp.Inc(&nextConnID) aurp.Inc(&nextConnID)
peersMu.Lock()
peers[udpAddrFromNet(raddr)] = peer peers[udpAddrFromNet(raddr)] = peer
peersMu.Unlock()
goPeerHandler(peer) goPeerHandler(peer)
} }
@ -471,8 +505,14 @@ func main() {
// Existing peer? // Existing peer?
ra := udpAddrFromNet(raddr) ra := udpAddrFromNet(raddr)
peersMu.Lock()
pr := peers[ra] pr := peers[ra]
if pr == nil { if pr == nil {
if !cfg.OpenPeering {
log.Printf("AURP: Got packet from %v but it's not in my config and open peering is disabled; dropping the packet", raddr)
peersMu.Unlock()
continue
}
// New peer! // New peer!
pr = &router.Peer{ pr = &router.Peer{
Config: cfg, Config: cfg,
@ -483,22 +523,22 @@ func main() {
}, },
UDPConn: ln, UDPConn: ln,
RemoteAddr: raddr, RemoteAddr: raddr,
RecieveCh: make(chan aurp.Packet, 1024), ReceiveCh: make(chan aurp.Packet, 1024),
RoutingTable: routes, RoutingTable: routes,
ZoneTable: zones, ZoneTable: zones,
Reconnect: false,
} }
aurp.Inc(&nextConnID) aurp.Inc(&nextConnID)
peers[ra] = pr peers[ra] = pr
goPeerHandler(pr) goPeerHandler(pr)
} }
peersMu.Unlock()
switch dh.PacketType { switch dh.PacketType {
case aurp.PacketTypeRouting: case aurp.PacketTypeRouting:
// It's AURP routing data. // It's AURP routing data.
// Pass the packet to the goroutine in charge of this peer. // Pass the packet to the goroutine in charge of this peer.
select { select {
case pr.RecieveCh <- pkt: case pr.ReceiveCh <- pkt:
// That's it for us. // That's it for us.
case <-ctx.Done(): case <-ctx.Done():

View file

@ -21,6 +21,7 @@ import (
"context" "context"
"log" "log"
"net" "net"
"sync"
"time" "time"
"gitea.drjosh.dev/josh/jrouter/aurp" "gitea.drjosh.dev/josh/jrouter/aurp"
@ -37,69 +38,122 @@ const (
updateTimer = 10 * time.Second updateTimer = 10 * time.Second
) )
type receiverState int type ReceiverState int
const ( const (
rsUnconnected receiverState = iota ReceiverUnconnected ReceiverState = iota
rsConnected ReceiverConnected
rsWaitForOpenRsp ReceiverWaitForOpenRsp
rsWaitForRIRsp ReceiverWaitForRIRsp
rsWaitForTickleAck ReceiverWaitForTickleAck
) )
func (rs receiverState) String() string { func (rs ReceiverState) String() string {
switch rs { switch rs {
case rsUnconnected: case ReceiverUnconnected:
return "unconnected" return "unconnected"
case rsConnected: case ReceiverConnected:
return "connected" return "connected"
case rsWaitForOpenRsp: case ReceiverWaitForOpenRsp:
return "waiting for Open-Rsp" return "waiting for Open-Rsp"
case rsWaitForRIRsp: case ReceiverWaitForRIRsp:
return "waiting for RI-Rsp" return "waiting for RI-Rsp"
case rsWaitForTickleAck: case ReceiverWaitForTickleAck:
return "waiting for Tickle-Ack" return "waiting for Tickle-Ack"
default: default:
return "unknown" return "unknown"
} }
} }
type senderState int type SenderState int
const ( const (
ssUnconnected senderState = iota SenderUnconnected SenderState = iota
ssConnected SenderConnected
ssWaitForRIRspAck SenderWaitForRIRspAck
ssWaitForRIUpdAck SenderWaitForRIUpdAck
ssWaitForRDAck SenderWaitForRDAck
) )
func (ss senderState) String() string { func (ss SenderState) String() string {
switch ss { switch ss {
case ssUnconnected: case SenderUnconnected:
return "unconnected" return "unconnected"
case ssConnected: case SenderConnected:
return "connected" return "connected"
case ssWaitForRIRspAck: case SenderWaitForRIRspAck:
return "waiting for RI-Ack for RI-Rsp" return "waiting for RI-Ack for RI-Rsp"
case ssWaitForRIUpdAck: case SenderWaitForRIUpdAck:
return "waiting for RI-Ack for RI-Upd" return "waiting for RI-Ack for RI-Upd"
case ssWaitForRDAck: case SenderWaitForRDAck:
return "waiting for RI-Ack for RD" return "waiting for RI-Ack for RD"
default: default:
return "unknown" return "unknown"
} }
} }
// Peer handles the peering with a peer AURP router.
type Peer struct { type Peer struct {
// Whole router config.
Config *Config Config *Config
// AURP-Tr state for producing packets.
Transport *aurp.Transport Transport *aurp.Transport
// Connection to reply to packets on.
UDPConn *net.UDPConn UDPConn *net.UDPConn
// The string that appeared in the config file / peer list file (with a
// ":387" appended as necessary).
// May be empty if this peer was not configured (it connected to us).
ConfiguredAddr string
// The resolved address of the peer.
RemoteAddr *net.UDPAddr RemoteAddr *net.UDPAddr
RecieveCh chan aurp.Packet
// Incoming packet channel.
ReceiveCh chan aurp.Packet
// Routing table (the peer will add/remove/update routes)
RoutingTable *RoutingTable RoutingTable *RoutingTable
// Zone table (the peer will add/remove/update zones)
ZoneTable *ZoneTable ZoneTable *ZoneTable
Reconnect bool
mu sync.RWMutex
rstate ReceiverState
sstate SenderState
}
func (p *Peer) ReceiverState() ReceiverState {
p.mu.RLock()
defer p.mu.RUnlock()
return p.rstate
}
func (p *Peer) SenderState() SenderState {
p.mu.RLock()
defer p.mu.RUnlock()
return p.sstate
}
func (p *Peer) setRState(rstate ReceiverState) {
p.mu.Lock()
defer p.mu.Unlock()
p.rstate = rstate
}
func (p *Peer) setSState(sstate SenderState) {
p.mu.Lock()
defer p.mu.Unlock()
p.sstate = sstate
}
func (p *Peer) disconnect() {
p.mu.Lock()
defer p.mu.Unlock()
p.rstate = ReceiverUnconnected
p.sstate = SenderUnconnected
} }
// Send encodes and sends pkt to the remote host. // Send encodes and sends pkt to the remote host.
@ -126,8 +180,7 @@ func (p *Peer) Handle(ctx context.Context) error {
var lastRISent aurp.Packet var lastRISent aurp.Packet
rstate := rsUnconnected p.disconnect()
sstate := ssUnconnected
// Write an Open-Req packet // Write an Open-Req packet
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
@ -135,12 +188,12 @@ func (p *Peer) Handle(ctx context.Context) error {
return err return err
} }
rstate = rsWaitForOpenRsp p.setRState(ReceiverWaitForOpenRsp)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
if sstate == ssUnconnected { if p.sstate == SenderUnconnected {
// Return immediately // Return immediately
return ctx.Err() return ctx.Err()
} }
@ -152,14 +205,14 @@ func (p *Peer) Handle(ctx context.Context) error {
return ctx.Err() return ctx.Err()
case <-rticker.C: case <-rticker.C:
switch rstate { switch p.rstate {
case rsWaitForOpenRsp: case ReceiverWaitForOpenRsp:
if time.Since(lastSend) <= sendRetryTimer { if time.Since(lastSend) <= sendRetryTimer {
break break
} }
if sendRetries >= sendRetryLimit { if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection")
rstate = rsUnconnected p.setRState(ReceiverUnconnected)
break break
} }
@ -171,7 +224,7 @@ func (p *Peer) Handle(ctx context.Context) error {
return err return err
} }
case rsConnected: case ReceiverConnected:
// Check LHFT, send tickle? // Check LHFT, send tickle?
if time.Since(lastHeardFrom) <= lastHeardFromTimer { if time.Since(lastHeardFrom) <= lastHeardFromTimer {
break break
@ -180,17 +233,17 @@ func (p *Peer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Couldn't send Tickle: %v", err) log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
return err return err
} }
rstate = rsWaitForTickleAck p.setRState(ReceiverWaitForTickleAck)
sendRetries = 0 sendRetries = 0
lastSend = time.Now() lastSend = time.Now()
case rsWaitForTickleAck: case ReceiverWaitForTickleAck:
if time.Since(lastSend) <= sendRetryTimer { if time.Since(lastSend) <= sendRetryTimer {
break break
} }
if sendRetries >= tickleRetryLimit { if sendRetries >= tickleRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection")
rstate = rsUnconnected p.setRState(ReceiverUnconnected)
p.RoutingTable.DeletePeer(p) p.RoutingTable.DeletePeer(p)
break break
} }
@ -203,13 +256,13 @@ func (p *Peer) Handle(ctx context.Context) error {
} }
// still in Wait For Tickle-Ack // still in Wait For Tickle-Ack
case rsWaitForRIRsp: case ReceiverWaitForRIRsp:
if time.Since(lastSend) <= sendRetryTimer { if time.Since(lastSend) <= sendRetryTimer {
break break
} }
if sendRetries >= sendRetryLimit { if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection")
rstate = rsUnconnected p.setRState(ReceiverUnconnected)
p.RoutingTable.DeletePeer(p) p.RoutingTable.DeletePeer(p)
break break
} }
@ -223,10 +276,10 @@ func (p *Peer) Handle(ctx context.Context) error {
} }
// still in Wait For RI-Rsp // still in Wait For RI-Rsp
case rsUnconnected: case ReceiverUnconnected:
// Data receiver is unconnected. If data sender is connected, // Data receiver is unconnected. If data sender is connected,
// send a null RI-Upd to check if the sender is also unconnected // send a null RI-Upd to check if the sender is also unconnected
if sstate == ssConnected && time.Since(lastSend) > sendRetryTimer { if p.sstate == SenderConnected && time.Since(lastSend) > sendRetryTimer {
if sendRetries >= sendRetryLimit { if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while probing sender connect, closing connection") log.Printf("AURP Peer: Send retry limit reached while probing sender connect, closing connection")
} }
@ -241,15 +294,24 @@ func (p *Peer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err)
return err return err
} }
sstate = ssWaitForRIUpdAck p.setSState(SenderWaitForRIUpdAck)
} }
if p.Reconnect { if p.ConfiguredAddr != "" {
// Periodically try to reconnect, if this peer is in the config file // Periodically try to reconnect, if this peer is in the config file
if time.Since(lastReconnect) <= reconnectTimer { if time.Since(lastReconnect) <= reconnectTimer {
break break
} }
// In case it's a DNS name, re-resolve it before reconnecting
raddr, err := net.ResolveUDPAddr("udp4", p.ConfiguredAddr)
if err != nil {
log.Printf("couldn't resolve UDP address, skipping: %v", err)
break
}
log.Printf("AURP Peer: resolved %q to %v", p.ConfiguredAddr, raddr)
p.RemoteAddr = raddr
lastReconnect = time.Now() lastReconnect = time.Now()
sendRetries = 0 sendRetries = 0
lastSend = time.Now() lastSend = time.Now()
@ -257,22 +319,22 @@ func (p *Peer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
rstate = rsWaitForOpenRsp p.setRState(ReceiverWaitForOpenRsp)
} }
} }
case <-sticker.C: case <-sticker.C:
switch sstate { switch p.sstate {
case ssUnconnected: case SenderUnconnected:
// Do nothing // Do nothing
case ssConnected: case SenderConnected:
if time.Since(lastUpdate) <= updateTimer { if time.Since(lastUpdate) <= updateTimer {
break break
} }
// TODO: is there a routing update to send? // TODO: is there a routing update to send?
case ssWaitForRIRspAck, ssWaitForRIUpdAck: case SenderWaitForRIRspAck, SenderWaitForRIUpdAck:
if time.Since(lastSend) <= sendRetryTimer { if time.Since(lastSend) <= sendRetryTimer {
break break
} }
@ -282,7 +344,7 @@ func (p *Peer) Handle(ctx context.Context) error {
} }
if sendRetries >= sendRetryLimit { if sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached, closing connection") log.Printf("AURP Peer: Send retry limit reached, closing connection")
sstate = ssUnconnected p.setSState(SenderUnconnected)
continue continue
} }
sendRetries++ sendRetries++
@ -292,20 +354,20 @@ func (p *Peer) Handle(ctx context.Context) error {
return err return err
} }
case ssWaitForRDAck: case SenderWaitForRDAck:
if time.Since(lastSend) <= sendRetryTimer { if time.Since(lastSend) <= sendRetryTimer {
break break
} }
sstate = ssUnconnected p.setSState(SenderUnconnected)
} }
case pkt := <-p.RecieveCh: case pkt := <-p.ReceiveCh:
lastHeardFrom = time.Now() lastHeardFrom = time.Now()
switch pkt := pkt.(type) { switch pkt := pkt.(type) {
case *aurp.OpenReqPacket: case *aurp.OpenReqPacket:
if sstate != ssUnconnected { if p.sstate != SenderUnconnected {
log.Printf("AURP Peer: Open-Req received but sender state is not unconnected (was %v)", sstate) log.Printf("AURP Peer: Open-Req received but sender state is not unconnected (was %v)", p.sstate)
} }
// The peer tells us their connection ID in Open-Req. // The peer tells us their connection ID in Open-Req.
@ -332,32 +394,32 @@ func (p *Peer) Handle(ctx context.Context) error {
return err return err
} }
if orsp.RateOrErrCode >= 0 { if orsp.RateOrErrCode >= 0 {
sstate = ssConnected p.setSState(SenderConnected)
} }
// If receiver is unconnected, commence connecting // If receiver is unconnected, commence connecting
if rstate == rsUnconnected { if p.rstate == ReceiverUnconnected {
lastSend = time.Now() lastSend = time.Now()
sendRetries = 0 sendRetries = 0
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
rstate = rsWaitForOpenRsp p.setRState(ReceiverWaitForOpenRsp)
} }
case *aurp.OpenRspPacket: case *aurp.OpenRspPacket:
if rstate != rsWaitForOpenRsp { if p.rstate != ReceiverWaitForOpenRsp {
log.Printf("AURP Peer: Received Open-Rsp but was not waiting for one (receiver state was %v)", rstate) log.Printf("AURP Peer: Received Open-Rsp but was not waiting for one (receiver state was %v)", p.rstate)
} }
if pkt.RateOrErrCode < 0 { if pkt.RateOrErrCode < 0 {
// It's an error code. // It's an error code.
log.Printf("AURP Peer: Open-Rsp error code from peer %v: %d", p.RemoteAddr.IP, pkt.RateOrErrCode) log.Printf("AURP Peer: Open-Rsp error code from peer %v: %d", p.RemoteAddr.IP, pkt.RateOrErrCode)
rstate = rsUnconnected p.setRState(ReceiverUnconnected)
break break
} }
//log.Printf("AURP Peer: Data receiver is connected!") //log.Printf("AURP Peer: Data receiver is connected!")
rstate = rsConnected p.setRState(ReceiverConnected)
// Send an RI-Req // Send an RI-Req
sendRetries = 0 sendRetries = 0
@ -365,18 +427,18 @@ func (p *Peer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
return err return err
} }
rstate = rsWaitForRIRsp p.setRState(ReceiverWaitForRIRsp)
case *aurp.RIReqPacket: case *aurp.RIReqPacket:
if sstate != ssConnected { if p.sstate != SenderConnected {
log.Printf("AURP Peer: Received RI-Req but was not expecting one (sender state was %v)", sstate) log.Printf("AURP Peer: Received RI-Req but was not expecting one (sender state was %v)", p.sstate)
} }
nets := aurp.NetworkTuples{ nets := aurp.NetworkTuples{
{ {
Extended: true, Extended: true,
RangeStart: uint16(p.Config.EtherTalk.NetStart), RangeStart: p.Config.EtherTalk.NetStart,
RangeEnd: uint16(p.Config.EtherTalk.NetEnd), RangeEnd: p.Config.EtherTalk.NetEnd,
Distance: 0, Distance: 0,
}, },
} }
@ -386,21 +448,21 @@ func (p *Peer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Couldn't send RI-Rsp packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Rsp packet: %v", err)
return err return err
} }
sstate = ssWaitForRIRspAck p.setSState(SenderWaitForRIRspAck)
case *aurp.RIRspPacket: case *aurp.RIRspPacket:
if rstate != rsWaitForRIRsp { if p.rstate != ReceiverWaitForRIRsp {
log.Printf("Received RI-Rsp but was not waiting for one (receiver state was %v)", rstate) log.Printf("Received RI-Rsp but was not waiting for one (receiver state was %v)", p.rstate)
} }
log.Printf("AURP Peer: Learned about these networks: %v", pkt.Networks) log.Printf("AURP Peer: Learned about these networks: %v", pkt.Networks)
for _, nt := range pkt.Networks { for _, nt := range pkt.Networks {
p.RoutingTable.UpsertRoute( p.RoutingTable.InsertRoute(
p,
nt.Extended, nt.Extended,
ddp.Network(nt.RangeStart), ddp.Network(nt.RangeStart),
ddp.Network(nt.RangeEnd), ddp.Network(nt.RangeEnd),
p,
nt.Distance+1, nt.Distance+1,
) )
} }
@ -413,26 +475,26 @@ func (p *Peer) Handle(ctx context.Context) error {
} }
if pkt.Flags&aurp.RoutingFlagLast != 0 { if pkt.Flags&aurp.RoutingFlagLast != 0 {
// No longer waiting for an RI-Rsp // No longer waiting for an RI-Rsp
rstate = rsConnected p.setRState(ReceiverConnected)
} }
case *aurp.RIAckPacket: case *aurp.RIAckPacket:
switch sstate { switch p.sstate {
case ssWaitForRIRspAck: case SenderWaitForRIRspAck:
// We sent an RI-Rsp, this is the RI-Ack we expected. // We sent an RI-Rsp, this is the RI-Ack we expected.
case ssWaitForRIUpdAck: case SenderWaitForRIUpdAck:
// We sent an RI-Upd, this is the RI-Ack we expected. // We sent an RI-Upd, this is the RI-Ack we expected.
case ssWaitForRDAck: case SenderWaitForRDAck:
// We sent an RD... Why are we here? // We sent an RD... Why are we here?
continue continue
default: default:
log.Printf("AURP Peer: Received RI-Ack but was not waiting for one (sender state was %v)", sstate) log.Printf("AURP Peer: Received RI-Ack but was not waiting for one (sender state was %v)", p.sstate)
} }
sstate = ssConnected p.setSState(SenderConnected)
sendRetries = 0 sendRetries = 0
// If SZI flag is set, send ZI-Rsp (transaction) // If SZI flag is set, send ZI-Rsp (transaction)
@ -448,7 +510,7 @@ func (p *Peer) Handle(ctx context.Context) error {
// TODO: Continue sending next RI-Rsp (streamed)? // TODO: Continue sending next RI-Rsp (streamed)?
if rstate == rsUnconnected { if p.rstate == ReceiverUnconnected {
// Receiver is unconnected, but their receiver sent us an // Receiver is unconnected, but their receiver sent us an
// RI-Ack for something // RI-Ack for something
// Try to reconnect? // Try to reconnect?
@ -458,23 +520,58 @@ func (p *Peer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
} }
rstate = rsWaitForOpenRsp p.setRState(ReceiverWaitForOpenRsp)
} }
case *aurp.RIUpdPacket: case *aurp.RIUpdPacket:
// TODO: Integrate info into route table
var ackFlag aurp.RoutingFlag
for _, et := range pkt.Events { for _, et := range pkt.Events {
log.Printf("AURP Peer: RI-Upd event %v", et) log.Printf("AURP Peer: RI-Upd event %v", et)
switch et.EventCode {
case aurp.EventCodeNull:
// Do nothing except respond with RI-Ack
case aurp.EventCodeNA:
if err := p.RoutingTable.InsertRoute(
p,
et.Extended,
et.RangeStart,
et.RangeEnd,
et.Distance+1,
); err != nil {
log.Printf("AURP Peer: couldn't insert route: %v", err)
}
ackFlag = aurp.RoutingFlagSendZoneInfo
case aurp.EventCodeND:
p.RoutingTable.DeletePeerNetwork(p, et.RangeStart)
case aurp.EventCodeNDC:
p.RoutingTable.UpdateRouteDistance(p, et.RangeStart, et.Distance+1)
case aurp.EventCodeNRC:
// "An exterior router sends a Network Route Change
// (NRC) event if the path to an exported network
// through its local internet changes to a path through
// a tunneling port, causing split-horizoned processing
// to eliminate that networks routing information."
p.RoutingTable.DeletePeerNetwork(p, et.RangeStart)
case aurp.EventCodeZC:
// "This event is reserved for future use."
}
} }
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil { if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, ackFlag)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err) log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err)
return err return err
} }
case *aurp.RDPacket: case *aurp.RDPacket:
if rstate == rsUnconnected || rstate == rsWaitForOpenRsp { if p.rstate == ReceiverUnconnected || p.rstate == ReceiverWaitForOpenRsp {
log.Printf("AURP Peer: Received RD but was not expecting one (receiver state was %v)", rstate) log.Printf("AURP Peer: Received RD but was not expecting one (receiver state was %v)", p.rstate)
} }
log.Printf("AURP Peer: Router Down: error code %d %s", pkt.ErrorCode, pkt.ErrorCode) log.Printf("AURP Peer: Router Down: error code %d %s", pkt.ErrorCode, pkt.ErrorCode)
@ -486,8 +583,7 @@ func (p *Peer) Handle(ctx context.Context) error {
return err return err
} }
// Connections closed // Connections closed
rstate = rsUnconnected p.disconnect()
sstate = ssUnconnected
case *aurp.ZIReqPacket: case *aurp.ZIReqPacket:
// TODO: split ZI-Rsp packets similarly to ZIP Replies // TODO: split ZI-Rsp packets similarly to ZIP Replies
@ -529,10 +625,10 @@ func (p *Peer) Handle(ctx context.Context) error {
} }
case *aurp.TickleAckPacket: case *aurp.TickleAckPacket:
if rstate != rsWaitForTickleAck { if p.rstate != ReceiverWaitForTickleAck {
log.Printf("AURP Peer: Received Tickle-Ack but was not waiting for one (receiver state was %v)", rstate) log.Printf("AURP Peer: Received Tickle-Ack but was not waiting for one (receiver state was %v)", p.rstate)
} }
rstate = rsConnected p.setRState(ReceiverConnected)
} }
} }
} }

View file

@ -101,12 +101,36 @@ func (rt *RoutingTable) DeletePeer(peer *Peer) {
} }
} }
func (rt *RoutingTable) UpsertRoute(extended bool, netStart, netEnd ddp.Network, peer *Peer, metric uint8) error { func (rt *RoutingTable) DeletePeerNetwork(peer *Peer, network ddp.Network) {
rt.mu.Lock()
defer rt.mu.Unlock()
for route := range rt.routes {
if route.Peer == peer && route.NetStart == network {
delete(rt.routes, route)
}
}
}
func (rt *RoutingTable) UpdateRouteDistance(peer *Peer, network ddp.Network, distance uint8) {
rt.mu.Lock()
defer rt.mu.Unlock()
for route := range rt.routes {
if route.Peer == peer && route.NetStart == network {
route.Distance = distance
route.LastSeen = time.Now()
}
}
}
func (rt *RoutingTable) InsertRoute(peer *Peer, extended bool, netStart, netEnd ddp.Network, metric uint8) error {
if netStart > netEnd { if netStart > netEnd {
return fmt.Errorf("invalid network range [%d, %d]", netStart, netEnd) return fmt.Errorf("invalid network range [%d, %d]", netStart, netEnd)
} }
if netStart != netEnd && !extended {
// TODO: handle the Update part of "Upsert" return fmt.Errorf("invalid network range [%d, %d] for nonextended network", netStart, netEnd)
}
r := &Route{ r := &Route{
Extended: extended, Extended: extended,

View file

@ -25,7 +25,9 @@ import (
"gitea.drjosh.dev/josh/jrouter/atalk" "gitea.drjosh.dev/josh/jrouter/atalk"
"gitea.drjosh.dev/josh/jrouter/atalk/rtmp" "gitea.drjosh.dev/josh/jrouter/atalk/rtmp"
"gitea.drjosh.dev/josh/jrouter/status" "gitea.drjosh.dev/josh/jrouter/status"
"github.com/google/gopacket/pcap" "github.com/google/gopacket/pcap"
"github.com/sfiera/multitalk/pkg/aarp" "github.com/sfiera/multitalk/pkg/aarp"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
"github.com/sfiera/multitalk/pkg/ethernet" "github.com/sfiera/multitalk/pkg/ethernet"