AURP peer timers in status

This commit is contained in:
Josh Deprez 2024-05-12 18:07:27 +10:00
parent 3067daa264
commit 331da095c3
Signed by: josh
SSH key fingerprint: SHA256:zZji7w1Ilh2RuUpbQcqkLPrqmRwpiCSycbF2EfKm6Kw
2 changed files with 143 additions and 44 deletions

10
main.go
View file

@ -87,6 +87,11 @@ const peerTableTemplate = `
<th>Remote addr</th> <th>Remote addr</th>
<th>Receiver state</th> <th>Receiver state</th>
<th>Sender state</th> <th>Sender state</th>
<th>Last heard from</th>
<th>Last reconnect</th>
<th>Last update</th>
<th>Last send</th>
<th>Send retries</th>
</tr></thead> </tr></thead>
<tbody> <tbody>
{{range $peer := . }} {{range $peer := . }}
@ -95,6 +100,11 @@ const peerTableTemplate = `
<td>{{$peer.RemoteAddr}}</td> <td>{{$peer.RemoteAddr}}</td>
<td>{{$peer.ReceiverState}}</td> <td>{{$peer.ReceiverState}}</td>
<td>{{$peer.SenderState}}</td> <td>{{$peer.SenderState}}</td>
<td>{{$peer.LastHeardFromAgo}}</td>
<td>{{$peer.LastReconnectAgo}}</td>
<td>{{$peer.LastUpdateAgo}}</td>
<td>{{$peer.LastSendAgo}}</td>
<td>{{$peer.SendRetries}}</td>
</tr> </tr>
{{end}} {{end}}
</tbody> </tbody>

View file

@ -19,6 +19,7 @@ package router
import ( import (
"bytes" "bytes"
"context" "context"
"fmt"
"log" "log"
"net" "net"
"sync" "sync"
@ -114,9 +115,16 @@ type AURPPeer struct {
// Route table (the peer will add/remove/update routes and zones) // Route table (the peer will add/remove/update routes and zones)
RouteTable *RouteTable RouteTable *RouteTable
mu sync.RWMutex // The internal states below are only set within the Handle loop, but can
rstate ReceiverState // be read concurrently from outside.
sstate SenderState mu sync.RWMutex
rstate ReceiverState
sstate SenderState
lastReconnect time.Time
lastHeardFrom time.Time
lastSend time.Time // TODO: clarify use of lastSend / sendRetries
lastUpdate time.Time
sendRetries int
} }
func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error { func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error {
@ -140,6 +148,48 @@ func (p *AURPPeer) SenderState() SenderState {
return p.sstate return p.sstate
} }
func (p *AURPPeer) LastReconnectAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
if p.lastReconnect.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(p.lastReconnect).Truncate(time.Millisecond))
}
func (p *AURPPeer) LastHeardFromAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
if p.lastHeardFrom.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(p.lastHeardFrom).Truncate(time.Millisecond))
}
func (p *AURPPeer) LastSendAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
if p.lastSend.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(p.lastSend).Truncate(time.Millisecond))
}
func (p *AURPPeer) LastUpdateAgo() string {
p.mu.RLock()
defer p.mu.RUnlock()
if p.lastUpdate.IsZero() {
return "never"
}
return fmt.Sprintf("%v ago", time.Since(p.lastUpdate).Truncate(time.Millisecond))
}
func (p *AURPPeer) SendRetries() int {
p.mu.RLock()
defer p.mu.RUnlock()
return p.sendRetries
}
func (p *AURPPeer) setRState(rstate ReceiverState) { func (p *AURPPeer) setRState(rstate ReceiverState) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
@ -152,6 +202,42 @@ func (p *AURPPeer) setSState(sstate SenderState) {
p.sstate = sstate p.sstate = sstate
} }
func (p *AURPPeer) incSendRetries() {
p.mu.Lock()
defer p.mu.Unlock()
p.sendRetries++
}
func (p *AURPPeer) resetSendRetries() {
p.mu.Lock()
defer p.mu.Unlock()
p.sendRetries = 0
}
func (p *AURPPeer) bumpLastHeardFrom() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastHeardFrom = time.Now()
}
func (p *AURPPeer) bumpLastReconnect() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastReconnect = time.Now()
}
func (p *AURPPeer) bumpLastSend() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastSend = time.Now()
}
func (p *AURPPeer) bumpLastUpdate() {
p.mu.Lock()
defer p.mu.Unlock()
p.lastUpdate = time.Now()
}
func (p *AURPPeer) disconnect() { func (p *AURPPeer) disconnect() {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
@ -175,11 +261,13 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
sticker := time.NewTicker(1 * time.Second) sticker := time.NewTicker(1 * time.Second)
defer sticker.Stop() defer sticker.Stop()
lastReconnect := time.Now() p.mu.Lock()
lastHeardFrom := time.Now() p.lastReconnect = time.Now()
lastSend := time.Now() // TODO: clarify use of lastSend / sendRetries p.lastHeardFrom = time.Now()
lastUpdate := time.Now() p.lastSend = time.Now() // TODO: clarify use of lastSend / sendRetries
sendRetries := 0 p.lastUpdate = time.Now()
p.sendRetries = 0
p.mu.Unlock()
var lastRISent aurp.Packet var lastRISent aurp.Packet
@ -210,18 +298,18 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case <-rticker.C: case <-rticker.C:
switch p.rstate { switch p.rstate {
case ReceiverWaitForOpenRsp: case ReceiverWaitForOpenRsp:
if time.Since(lastSend) <= sendRetryTimer { if time.Since(p.lastSend) <= sendRetryTimer {
break break
} }
if sendRetries >= sendRetryLimit { if p.sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection")
p.setRState(ReceiverUnconnected) p.setRState(ReceiverUnconnected)
break break
} }
// Send another Open-Req // Send another Open-Req
sendRetries++ p.incSendRetries()
lastSend = time.Now() p.bumpLastSend()
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
@ -229,7 +317,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case ReceiverConnected: case ReceiverConnected:
// Check LHFT, send tickle? // Check LHFT, send tickle?
if time.Since(lastHeardFrom) <= lastHeardFromTimer { if time.Since(p.lastHeardFrom) <= lastHeardFromTimer {
break break
} }
if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil { if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
@ -237,22 +325,22 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
return err return err
} }
p.setRState(ReceiverWaitForTickleAck) p.setRState(ReceiverWaitForTickleAck)
sendRetries = 0 p.resetSendRetries()
lastSend = time.Now() p.bumpLastSend()
case ReceiverWaitForTickleAck: case ReceiverWaitForTickleAck:
if time.Since(lastSend) <= sendRetryTimer { if time.Since(p.lastSend) <= sendRetryTimer {
break break
} }
if sendRetries >= tickleRetryLimit { if p.sendRetries >= tickleRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection")
p.setRState(ReceiverUnconnected) p.setRState(ReceiverUnconnected)
p.RouteTable.DeleteAURPPeer(p) p.RouteTable.DeleteAURPPeer(p)
break break
} }
sendRetries++ p.incSendRetries()
lastSend = time.Now() p.bumpLastSend()
if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil { if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil {
log.Printf("AURP Peer: Couldn't send Tickle: %v", err) log.Printf("AURP Peer: Couldn't send Tickle: %v", err)
return err return err
@ -260,10 +348,10 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// still in Wait For Tickle-Ack // still in Wait For Tickle-Ack
case ReceiverWaitForRIRsp: case ReceiverWaitForRIRsp:
if time.Since(lastSend) <= sendRetryTimer { if time.Since(p.lastSend) <= sendRetryTimer {
break break
} }
if sendRetries >= sendRetryLimit { if p.sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection") log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection")
p.setRState(ReceiverUnconnected) p.setRState(ReceiverUnconnected)
p.RouteTable.DeleteAURPPeer(p) p.RouteTable.DeleteAURPPeer(p)
@ -272,7 +360,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// RI-Req is stateless, so we don't need to cache the one we // RI-Req is stateless, so we don't need to cache the one we
// sent earlier just to send it again // sent earlier just to send it again
sendRetries++ p.incSendRetries()
p.bumpLastSend()
if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil { if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
return err return err
@ -282,12 +371,12 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
case ReceiverUnconnected: case ReceiverUnconnected:
// Data receiver is unconnected. If data sender is connected, // Data receiver is unconnected. If data sender is connected,
// send a null RI-Upd to check if the sender is also unconnected // send a null RI-Upd to check if the sender is also unconnected
if p.sstate == SenderConnected && time.Since(lastSend) > sendRetryTimer { if p.sstate == SenderConnected && time.Since(p.lastSend) > sendRetryTimer {
if sendRetries >= sendRetryLimit { if p.sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached while probing sender connect, closing connection") log.Printf("AURP Peer: Send retry limit reached while probing sender connect, closing connection")
} }
sendRetries++ p.incSendRetries()
lastSend = time.Now() p.bumpLastSend()
aurp.Inc(&p.Transport.LocalSeq) aurp.Inc(&p.Transport.LocalSeq)
events := aurp.EventTuples{{ events := aurp.EventTuples{{
EventCode: aurp.EventCodeNull, EventCode: aurp.EventCodeNull,
@ -302,7 +391,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
if p.ConfiguredAddr != "" { if p.ConfiguredAddr != "" {
// Periodically try to reconnect, if this peer is in the config file // Periodically try to reconnect, if this peer is in the config file
if time.Since(lastReconnect) <= reconnectTimer { if time.Since(p.lastReconnect) <= reconnectTimer {
break break
} }
@ -315,9 +404,9 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
log.Printf("AURP Peer: resolved %q to %v", p.ConfiguredAddr, raddr) log.Printf("AURP Peer: resolved %q to %v", p.ConfiguredAddr, raddr)
p.RemoteAddr = raddr p.RemoteAddr = raddr
lastReconnect = time.Now() p.bumpLastReconnect()
sendRetries = 0 p.resetSendRetries()
lastSend = time.Now() p.bumpLastSend()
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
@ -332,40 +421,41 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Do nothing // Do nothing
case SenderConnected: case SenderConnected:
if time.Since(lastUpdate) <= updateTimer { if time.Since(p.lastUpdate) <= updateTimer {
break break
} }
// TODO: is there a routing update to send? // TODO: is there a routing update to send?
p.bumpLastUpdate()
case SenderWaitForRIRspAck, SenderWaitForRIUpdAck: case SenderWaitForRIRspAck, SenderWaitForRIUpdAck:
if time.Since(lastSend) <= sendRetryTimer { if time.Since(p.lastSend) <= sendRetryTimer {
break break
} }
if lastRISent == nil { if lastRISent == nil {
log.Print("AURP Peer: sender retry: lastRISent = nil?") log.Print("AURP Peer: sender retry: lastRISent = nil?")
continue continue
} }
if sendRetries >= sendRetryLimit { if p.sendRetries >= sendRetryLimit {
log.Printf("AURP Peer: Send retry limit reached, closing connection") log.Printf("AURP Peer: Send retry limit reached, closing connection")
p.setSState(SenderUnconnected) p.setSState(SenderUnconnected)
continue continue
} }
sendRetries++ p.incSendRetries()
lastSend = time.Now() p.bumpLastSend()
if _, err := p.Send(lastRISent); err != nil { if _, err := p.Send(lastRISent); err != nil {
log.Printf("AURP Peer: Couldn't re-send %T: %v", lastRISent, err) log.Printf("AURP Peer: Couldn't re-send %T: %v", lastRISent, err)
return err return err
} }
case SenderWaitForRDAck: case SenderWaitForRDAck:
if time.Since(lastSend) <= sendRetryTimer { if time.Since(p.lastSend) <= sendRetryTimer {
break break
} }
p.setSState(SenderUnconnected) p.setSState(SenderUnconnected)
} }
case pkt := <-p.ReceiveCh: case pkt := <-p.ReceiveCh:
lastHeardFrom = time.Now() p.bumpLastHeardFrom()
switch pkt := pkt.(type) { switch pkt := pkt.(type) {
case *aurp.OpenReqPacket: case *aurp.OpenReqPacket:
@ -402,8 +492,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// If receiver is unconnected, commence connecting // If receiver is unconnected, commence connecting
if p.rstate == ReceiverUnconnected { if p.rstate == ReceiverUnconnected {
lastSend = time.Now() p.resetSendRetries()
sendRetries = 0 p.bumpLastSend()
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
@ -425,7 +515,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
p.setRState(ReceiverConnected) p.setRState(ReceiverConnected)
// Send an RI-Req // Send an RI-Req
sendRetries = 0 p.resetSendRetries()
if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil { if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil {
log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err)
return err return err
@ -500,7 +590,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
} }
p.setSState(SenderConnected) p.setSState(SenderConnected)
sendRetries = 0 p.resetSendRetries()
// If SZI flag is set, send ZI-Rsp (transaction) // If SZI flag is set, send ZI-Rsp (transaction)
if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 { if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 {
@ -536,8 +626,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
// Receiver is unconnected, but their receiver sent us an // Receiver is unconnected, but their receiver sent us an
// RI-Ack for something // RI-Ack for something
// Try to reconnect? // Try to reconnect?
lastSend = time.Now() p.resetSendRetries()
sendRetries = 0 p.bumpLastSend()
if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil {
log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err)
return err return err
@ -546,7 +636,6 @@ func (p *AURPPeer) Handle(ctx context.Context) error {
} }
case *aurp.RIUpdPacket: case *aurp.RIUpdPacket:
var ackFlag aurp.RoutingFlag var ackFlag aurp.RoutingFlag
for _, et := range pkt.Events { for _, et := range pkt.Events {