Compare commits

..

8 commits

6 changed files with 189 additions and 83 deletions

View file

@ -173,9 +173,12 @@ func (e EventTuples) WriteTo(w io.Writer) (int64, error) {
}
func parseEventTuples(p []byte) (EventTuples, error) {
// Each event tuple is at least 4 bytes, so we need to store at most
// len(p)/4 of them.
e := make(EventTuples, 0, len(p)/4)
// Event tuples can be 1, 4, or 6 bytes long. But the only type of length 1
// is the Null event type sent to probe whether or not the data receiver is
// still listening. If that's present there probably aren't any other
// tuples. Hence len(p)/4 (rounded up) is a reasonable estimate of max tuple
// count.
e := make(EventTuples, 0, (len(p)+3)/4)
for len(p) > 0 {
et, nextp, err := parseEventTuple(p)
if err != nil {
@ -198,6 +201,10 @@ type EventTuple struct {
func (et *EventTuple) WriteTo(w io.Writer) (int64, error) {
a := acc(w)
a.write8(uint8(et.EventCode))
if et.EventCode == EventCodeNull {
// null tuple
return a.ret()
}
a.write16(uint16(et.RangeStart))
if !et.Extended {
// non-extended tuple
@ -211,12 +218,18 @@ func (et *EventTuple) WriteTo(w io.Writer) (int64, error) {
}
func parseEventTuple(p []byte) (EventTuple, []byte, error) {
if len(p) < 4 {
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for network event tuple", len(p))
if len(p) < 1 {
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for any network event tuple", len(p))
}
var et EventTuple
et.EventCode = EventCode(p[0])
if et.EventCode == EventCodeNull {
return et, p[1:], nil
}
if len(p) < 4 {
return EventTuple{}, p, fmt.Errorf("insufficient input length %d for non-Null network event tuple", len(p))
}
et.RangeStart = ddp.Network(binary.BigEndian.Uint16(p[1:3]))
et.RangeEnd = et.RangeStart
et.Distance = p[3]

10
main.go
View file

@ -87,6 +87,11 @@ const peerTableTemplate = `
<th>Remote addr</th>
<th>Receiver state</th>
<th>Sender state</th>
<th>Last heard from</th>
<th>Last reconnect</th>
<th>Last update</th>
<th>Last send</th>
<th>Send retries</th>
</tr></thead>
<tbody>
{{range $peer := . }}
@ -95,6 +100,11 @@ const peerTableTemplate = `
<td>[redacted]</td>
<td>{{$peer.ReceiverState}}</td>
<td>{{$peer.SenderState}}</td>
<td>{{$peer.LastHeardFromAgo}}</td>
<td>{{$peer.LastReconnectAgo}}</td>
<td>{{$peer.LastUpdateAgo}}</td>
<td>{{$peer.LastSendAgo}}</td>
<td>{{$peer.SendRetries}}</td>
</tr>
{{end}}
</tbody>

View file

@ -378,10 +378,7 @@ func (e AMTEntry) Valid() bool {
// LastUpdatedAgo is a friendly string reporting how long ago the entry was
// updated/resolved.
func (e AMTEntry) LastUpdatedAgo() string {
if e.LastUpdated.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(e.LastUpdated).Truncate(time.Millisecond))
return ago(e.LastUpdated)
}
// addressMappingTable implements a concurrent-safe Address Mapping Table for

View file

@ -16,6 +16,11 @@
package router
import (
"fmt"
"time"
)
// StringSet is a set of strings.
// Yep, yet another string set implementation. Took me 2 minutes to write *shrug*
type StringSet map[string]struct{}
@ -50,3 +55,11 @@ func SetFromSlice(ss []string) StringSet {
set.Insert(ss...)
return set
}
// ago is a helper for formatting times.
func ago(t time.Time) string {
if t.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(t).Truncate(time.Millisecond))
}

View file

@ -117,9 +117,16 @@ type AURPPeer struct {
// Event tuples yet to be sent to this peer in an RI-Upd.
pendingEvents chan aurp.EventTuple
mu sync.RWMutex
rstate ReceiverState
sstate SenderState
// The internal states below are only set within the Handle loop, but can
// be read concurrently from outside.
mu sync.RWMutex
rstate ReceiverState
sstate SenderState
lastReconnect time.Time
lastHeardFrom time.Time
lastSend time.Time // TODO: clarify use of lastSend / sendRetries
lastUpdate time.Time
sendRetries int
}
func NewAURPPeer(routes *RouteTable, udpConn *net.UDPConn, peerAddr string, raddr *net.UDPAddr, localDI, remoteDI aurp.DomainIdentifier, connID uint16) *AURPPeer {
@ -181,7 +188,7 @@ func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error {
if err != nil {
return err
}
_, err = p.Send(p.Transport.NewAppleTalkPacket(outPkt))
_, err = p.send(p.Transport.NewAppleTalkPacket(outPkt))
return err
}
@ -197,6 +204,36 @@ func (p *AURPPeer) SenderState() SenderState {
return p.sstate
}
func (p *AURPPeer) LastReconnectAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastReconnect)
}
func (p *AURPPeer) LastHeardFromAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastHeardFrom)
}
func (p *AURPPeer) LastSendAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastSend)
}
func (p *AURPPeer) LastUpdateAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
return ago(p.lastUpdate)
}
func (p *AURPPeer) SendRetries() int {
p.mu.RLock()
defer p.mu.RUnlock()
return p.sendRetries
}
func (p *AURPPeer) setRState(rstate ReceiverState) {
p.mu.Lock()
defer p.mu.Unlock()
@ -209,6 +246,42 @@ func (p *AURPPeer) setSState(sstate SenderState) {
p.sstate = sstate
}
func (p *AURPPeer) incSendRetries() {
p.mu.Lock()
defer p.mu.Unlock()
p.sendRetries++
}
func (p *AURPPeer) resetSendRetries() {
p.mu.Lock()
defer p.mu.Unlock()
p.sendRetries = 0
}
func (p *AURPPeer) bumpLastHeardFrom() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastHeardFrom = time.Now()
}
func (p *AURPPeer) bumpLastReconnect() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastReconnect = time.Now()
}
func (p *AURPPeer) bumpLastSend() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastSend = time.Now()
}
func (p *AURPPeer) bumpLastUpdate() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastUpdate = time.Now()
}
func (p *AURPPeer) disconnect() {
p.mu.Lock()
defer p.mu.Unlock()
@ -216,8 +289,8 @@ func (p *AURPPeer) disconnect() {
p.sstate = SenderUnconnected
}
// Send encodes and sends pkt to the remote host.
func (p *AURPPeer) Send(pkt aurp.Packet) (int, error) {
// send encodes and sends pkt to the remote host.
func (p *AURPPeer) send(pkt aurp.Packet) (int, error) {
var b bytes.Buffer
if _, err := pkt.WriteTo(&b); err != nil {
return 0, err
@ -232,18 +305,20 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
sticker := time.NewTicker(1 * time.Second)
defer sticker.Stop()
lastReconnect := time.Now()
lastHeardFrom := time.Now()
lastSend := time.Now() // TODO: clarify use of lastSend / sendRetries
lastUpdate := time.Now()
sendRetries := 0
p.mu.Lock()
p.lastReconnect = time.Now()
p.lastHeardFrom = time.Now()
p.lastSend = time.Now() // TODO: clarify use of lastSend / sendRetries
p.lastUpdate = time.Now()
p.sendRetries = 0
p.mu.Unlock()
var lastRISent aurp.Packet
p.disconnect()
// Write an Open-Req packet
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err
}
@ -259,7 +334,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
}
// Send a best-effort Router Down before returning
lastRISent = p.Transport.NewRDPacket(aurp.ErrCodeNormalClose)
if _, err := p.Send(lastRISent); err != nil {
if _, err := p.send(lastRISent); err != nil {
log.Printf("Couldn't send RD packet: %v", err)
}
return ctx.Err()
@ -267,60 +342,60 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case <-rticker.C:
switch p.rstate {
case ReceiverWaitForOpenRsp:
if time.Since(lastSend) <= sendRetryTimer {
if time.Since(p.lastSend) <= sendRetryTimer {
break
}
if sendRetries >= sendRetryLimit {
if p.sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection")
p.setRState(ReceiverUnconnected)
break
}
// Send another Open-Req
sendRetries++
lastSend = time.Now()
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
p.incSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err
}
case ReceiverConnected:
// Check LHFT, send tickle?
if time.Since(lastHeardFrom) <= lastHeardFromTimer {
if time.Since(p.lastHeardFrom) <= lastHeardFromTimer {
break
}
if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
if _, err := p.send(p.Transport.NewTicklePacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
return err
}
p.setRState(ReceiverWaitForTickleAck)
sendRetries = 0
lastSend = time.Now()
p.resetSendRetries()
p.bumpLastSend()
case ReceiverWaitForTickleAck:
if time.Since(lastSend) <= sendRetryTimer {
if time.Since(p.lastSend) <= sendRetryTimer {
break
}
if sendRetries >= tickleRetryLimit {
if p.sendRetries >= tickleRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection")
p.setRState(ReceiverUnconnected)
p.RouteTable.DeleteAURPPeer(p)
break
}
sendRetries++
lastSend = time.Now()
if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
p.incSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewTicklePacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
return err
}
// still in Wait For Tickle-Ack
case ReceiverWaitForRIRsp:
if time.Since(lastSend) <= sendRetryTimer {
if time.Since(p.lastSend) <= sendRetryTimer {
break
}
if sendRetries >= sendRetryLimit {
if p.sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection")
p.setRState(ReceiverUnconnected)
p.RouteTable.DeleteAURPPeer(p)
@ -329,8 +404,9 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// RI-Req is stateless, so we don't need to cache the one we
// sent earlier just to send it again
sendRetries++
if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil {
p.incSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewRIReqPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
return err
}
@ -339,18 +415,18 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case ReceiverUnconnected:
// Data receiver is unconnected. If data sender is connected,
// send a null RI-Upd to check if the sender is also unconnected
if p.sstate == SenderConnected && time.Since(lastSend) > sendRetryTimer {
if sendRetries >= sendRetryLimit {
log.Print("AURP Peer: Send retry limit reached while probing sender connect, closing connection")
if p.sstate == SenderConnected && time.Since(p.lastSend) > sendRetryTimer {
if p.sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while probing sender connect, closing connection")
}
sendRetries++
lastSend = time.Now()
p.incSendRetries()
p.bumpLastSend()
aurp.Inc(&p.Transport.LocalSeq)
events := aurp.EventTuples{{
EventCode: aurp.EventCodeNull,
}}
lastRISent = p.Transport.NewRIUpdPacket(events)
if _, err := p.Send(lastRISent); err != nil {
if _, err := p.send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Upd packet: %v", err)
return err
}
@ -359,7 +435,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
if p.ConfiguredAddr != "" {
// Periodically try to reconnect, if this peer is in the config file
if time.Since(lastReconnect) <= reconnectTimer {
if time.Since(p.lastReconnect) <= reconnectTimer {
break
}
@ -372,10 +448,10 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// log.Printf("AURP Peer: resolved %q to %v", p.ConfiguredAddr, raddr)
p.RemoteAddr = raddr
lastReconnect = time.Now()
sendRetries = 0
lastSend = time.Now()
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
p.bumpLastReconnect()
p.resetSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err
}
@ -389,40 +465,41 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Do nothing
case SenderConnected:
if time.Since(lastUpdate) <= updateTimer {
if time.Since(p.lastUpdate) <= updateTimer {
break
}
// TODO: is there a routing update to send?
p.bumpLastUpdate()
case SenderWaitForRIRspAck, SenderWaitForRIUpdAck:
if time.Since(lastSend) <= sendRetryTimer {
if time.Since(p.lastSend) <= sendRetryTimer {
break
}
if lastRISent == nil {
log.Print("AURP Peer: sender retry: lastRISent = nil?")
continue
}
if sendRetries >= sendRetryLimit {
if p.sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached, closing connection")
p.setSState(SenderUnconnected)
continue
}
sendRetries++
lastSend = time.Now()
if _, err := p.Send(lastRISent); err != nil {
p.incSendRetries()
p.bumpLastSend()
if _, err := p.send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't re-send %T: %v", lastRISent, err)
return err
}
case SenderWaitForRDAck:
if time.Since(lastSend) <= sendRetryTimer {
if time.Since(p.lastSend) <= sendRetryTimer {
break
}
p.setSState(SenderUnconnected)
}
case pkt := <-p.ReceiveCh:
lastHeardFrom = time.Now()
p.bumpLastHeardFrom()
switch pkt := pkt.(type) {
case *aurp.OpenReqPacket:
@ -449,7 +526,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
orsp = p.Transport.NewOpenRspPacket(0, 1, nil)
}
if _, err := p.Send(orsp); err != nil {
if _, err := p.send(orsp); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Rsp: %v", err)
return err
}
@ -459,9 +536,9 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// If receiver is unconnected, commence connecting
if p.rstate == ReceiverUnconnected {
lastSend = time.Now()
sendRetries = 0
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
p.resetSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err
}
@ -482,8 +559,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
p.setRState(ReceiverConnected)
// Send an RI-Req
sendRetries = 0
if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil {
p.resetSendRetries()
if _, err := p.send(p.Transport.NewRIReqPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
return err
}
@ -506,7 +583,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
p.Transport.LocalSeq = 1
// TODO: Split tuples across multiple packets as required
lastRISent = p.Transport.NewRIRspPacket(aurp.RoutingFlagLast, nets)
if _, err := p.Send(lastRISent); err != nil {
if _, err := p.send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Rsp packet: %v", err)
return err
}
@ -531,7 +608,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// TODO: track which networks we don't have zone info for, and
// only set SZI for those ?
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil {
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, aurp.RoutingFlagSendZoneInfo)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack packet: %v", err)
return err
}
@ -557,7 +634,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
}
p.setSState(SenderConnected)
sendRetries = 0
p.resetSendRetries()
// If SZI flag is set, send ZI-Rsp (transaction)
if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 {
@ -582,7 +659,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
}
zones := p.RouteTable.ZonesForNetworks(nets)
// TODO: split ZI-Rsp packets similarly to ZIP Replies
if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil {
if _, err := p.send(p.Transport.NewZIRspPacket(zones)); err != nil {
log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err)
}
}
@ -593,9 +670,9 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Receiver is unconnected, but their receiver sent us an
// RI-Ack for something
// Try to reconnect?
lastSend = time.Now()
sendRetries = 0
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
p.resetSendRetries()
p.bumpLastSend()
if _, err := p.send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err
}
@ -603,7 +680,6 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
}
case *aurp.RIUpdPacket:
var ackFlag aurp.RoutingFlag
for _, et := range pkt.Events {
@ -643,7 +719,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
}
}
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, ackFlag)); err != nil {
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, ackFlag)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err)
return err
}
@ -657,7 +733,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
p.RouteTable.DeleteAURPPeer(p)
// Respond with RI-Ack
if _, err := p.Send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil {
if _, err := p.send(p.Transport.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Ack: %v", err)
return err
}
@ -667,7 +743,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case *aurp.ZIReqPacket:
// TODO: split ZI-Rsp packets similarly to ZIP Replies
zones := p.RouteTable.ZonesForNetworks(pkt.Networks)
if _, err := p.Send(p.Transport.NewZIRspPacket(zones)); err != nil {
if _, err := p.send(p.Transport.NewZIRspPacket(zones)); err != nil {
log.Printf("AURP Peer: Couldn't send ZI-Rsp packet: %v", err)
return err
}
@ -679,7 +755,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
}
case *aurp.GDZLReqPacket:
if _, err := p.Send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil {
if _, err := p.send(p.Transport.NewGDZLRspPacket(-1, nil)); err != nil {
log.Printf("AURP Peer: Couldn't send GDZL-Rsp packet: %v", err)
return err
}
@ -688,7 +764,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: Received a GDZL-Rsp, but I wouldn't have sent a GDZL-Req - that's weird")
case *aurp.GZNReqPacket:
if _, err := p.Send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil {
if _, err := p.send(p.Transport.NewGZNRspPacket(pkt.ZoneName, false, nil)); err != nil {
log.Printf("AURP Peer: Couldn't send GZN-Rsp packet: %v", err)
return err
}
@ -698,7 +774,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case *aurp.TicklePacket:
// Immediately respond with Tickle-Ack
if _, err := p.Send(p.Transport.NewTickleAckPacket()); err != nil {
if _, err := p.send(p.Transport.NewTickleAckPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle-Ack: %v", err)
return err
}

View file

@ -45,10 +45,7 @@ type Route struct {
}
func (r Route) LastSeenAgo() string {
if r.LastSeen.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(r.LastSeen).Truncate(time.Millisecond))
return ago(r.LastSeen)
}
// Valid reports whether the route is valid.