From 331da095c3cb499aa2e09ebd0e7da04644ccf442 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Sun, 12 May 2024 18:07:27 +1000 Subject: [PATCH] AURP peer timers in status --- main.go | 10 +++ router/peer_aurp.go | 177 +++++++++++++++++++++++++++++++++----------- 2 files changed, 143 insertions(+), 44 deletions(-) diff --git a/main.go b/main.go index ac05d7b..699c5b6 100644 --- a/main.go +++ b/main.go @@ -87,6 +87,11 @@ const peerTableTemplate = ` Remote addr Receiver state Sender state + Last heard from + Last reconnect + Last update + Last send + Send retries {{range $peer := . }} @@ -95,6 +100,11 @@ const peerTableTemplate = ` {{$peer.RemoteAddr}} {{$peer.ReceiverState}} {{$peer.SenderState}} + {{$peer.LastHeardFromAgo}} + {{$peer.LastReconnectAgo}} + {{$peer.LastUpdateAgo}} + {{$peer.LastSendAgo}} + {{$peer.SendRetries}} {{end}} diff --git a/router/peer_aurp.go b/router/peer_aurp.go index e8b151c..8913151 100644 --- a/router/peer_aurp.go +++ b/router/peer_aurp.go @@ -19,6 +19,7 @@ package router import ( "bytes" "context" + "fmt" "log" "net" "sync" @@ -114,9 +115,16 @@ type AURPPeer struct { // Route table (the peer will add/remove/update routes and zones) RouteTable *RouteTable - mu sync.RWMutex - rstate ReceiverState - sstate SenderState + // The internal states below are only set within the Handle loop, but can + // be read concurrently from outside. + mu sync.RWMutex + rstate ReceiverState + sstate SenderState + lastReconnect time.Time + lastHeardFrom time.Time + lastSend time.Time // TODO: clarify use of lastSend / sendRetries + lastUpdate time.Time + sendRetries int } func (p *AURPPeer) Forward(ddpkt *ddp.ExtPacket) error { @@ -140,6 +148,48 @@ func (p *AURPPeer) SenderState() SenderState { return p.sstate } +func (p *AURPPeer) LastReconnectAgo() string { + p.mu.RLock() + defer p.mu.RUnlock() + if p.lastReconnect.IsZero() { + return "never" + } + return fmt.Sprintf("%v ago", time.Since(p.lastReconnect).Truncate(time.Millisecond)) +} + +func (p *AURPPeer) LastHeardFromAgo() string { + p.mu.RLock() + defer p.mu.RUnlock() + if p.lastHeardFrom.IsZero() { + return "never" + } + return fmt.Sprintf("%v ago", time.Since(p.lastHeardFrom).Truncate(time.Millisecond)) +} + +func (p *AURPPeer) LastSendAgo() string { + p.mu.RLock() + defer p.mu.RUnlock() + if p.lastSend.IsZero() { + return "never" + } + return fmt.Sprintf("%v ago", time.Since(p.lastSend).Truncate(time.Millisecond)) +} + +func (p *AURPPeer) LastUpdateAgo() string { + p.mu.RLock() + defer p.mu.RUnlock() + if p.lastUpdate.IsZero() { + return "never" + } + return fmt.Sprintf("%v ago", time.Since(p.lastUpdate).Truncate(time.Millisecond)) +} + +func (p *AURPPeer) SendRetries() int { + p.mu.RLock() + defer p.mu.RUnlock() + return p.sendRetries +} + func (p *AURPPeer) setRState(rstate ReceiverState) { p.mu.Lock() defer p.mu.Unlock() @@ -152,6 +202,42 @@ func (p *AURPPeer) setSState(sstate SenderState) { p.sstate = sstate } +func (p *AURPPeer) incSendRetries() { + p.mu.Lock() + defer p.mu.Unlock() + p.sendRetries++ +} + +func (p *AURPPeer) resetSendRetries() { + p.mu.Lock() + defer p.mu.Unlock() + p.sendRetries = 0 +} + +func (p *AURPPeer) bumpLastHeardFrom() { + p.mu.Lock() + defer p.mu.Unlock() + p.lastHeardFrom = time.Now() +} + +func (p *AURPPeer) bumpLastReconnect() { + p.mu.Lock() + defer p.mu.Unlock() + p.lastReconnect = time.Now() +} + +func (p *AURPPeer) bumpLastSend() { + p.mu.Lock() + defer p.mu.Unlock() + p.lastSend = time.Now() +} + +func (p *AURPPeer) bumpLastUpdate() { + p.mu.Lock() + defer p.mu.Unlock() + p.lastUpdate = time.Now() +} + func (p *AURPPeer) disconnect() { p.mu.Lock() defer p.mu.Unlock() @@ -175,11 +261,13 @@ func (p *AURPPeer) Handle(ctx context.Context) error { sticker := time.NewTicker(1 * time.Second) defer sticker.Stop() - lastReconnect := time.Now() - lastHeardFrom := time.Now() - lastSend := time.Now() // TODO: clarify use of lastSend / sendRetries - lastUpdate := time.Now() - sendRetries := 0 + p.mu.Lock() + p.lastReconnect = time.Now() + p.lastHeardFrom = time.Now() + p.lastSend = time.Now() // TODO: clarify use of lastSend / sendRetries + p.lastUpdate = time.Now() + p.sendRetries = 0 + p.mu.Unlock() var lastRISent aurp.Packet @@ -210,18 +298,18 @@ func (p *AURPPeer) Handle(ctx context.Context) error { case <-rticker.C: switch p.rstate { case ReceiverWaitForOpenRsp: - if time.Since(lastSend) <= sendRetryTimer { + if time.Since(p.lastSend) <= sendRetryTimer { break } - if sendRetries >= sendRetryLimit { + if p.sendRetries >= sendRetryLimit { log.Printf("AURP Peer: Send retry limit reached while waiting for Open-Rsp, closing connection") p.setRState(ReceiverUnconnected) break } // Send another Open-Req - sendRetries++ - lastSend = time.Now() + p.incSendRetries() + p.bumpLastSend() if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) return err @@ -229,7 +317,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error { case ReceiverConnected: // Check LHFT, send tickle? - if time.Since(lastHeardFrom) <= lastHeardFromTimer { + if time.Since(p.lastHeardFrom) <= lastHeardFromTimer { break } if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil { @@ -237,22 +325,22 @@ func (p *AURPPeer) Handle(ctx context.Context) error { return err } p.setRState(ReceiverWaitForTickleAck) - sendRetries = 0 - lastSend = time.Now() + p.resetSendRetries() + p.bumpLastSend() case ReceiverWaitForTickleAck: - if time.Since(lastSend) <= sendRetryTimer { + if time.Since(p.lastSend) <= sendRetryTimer { break } - if sendRetries >= tickleRetryLimit { + if p.sendRetries >= tickleRetryLimit { log.Printf("AURP Peer: Send retry limit reached while waiting for Tickle-Ack, closing connection") p.setRState(ReceiverUnconnected) p.RouteTable.DeleteAURPPeer(p) break } - sendRetries++ - lastSend = time.Now() + p.incSendRetries() + p.bumpLastSend() if _, err := p.Send(p.Transport.NewTicklePacket()); err != nil { log.Printf("AURP Peer: Couldn't send Tickle: %v", err) return err @@ -260,10 +348,10 @@ func (p *AURPPeer) Handle(ctx context.Context) error { // still in Wait For Tickle-Ack case ReceiverWaitForRIRsp: - if time.Since(lastSend) <= sendRetryTimer { + if time.Since(p.lastSend) <= sendRetryTimer { break } - if sendRetries >= sendRetryLimit { + if p.sendRetries >= sendRetryLimit { log.Printf("AURP Peer: Send retry limit reached while waiting for RI-Rsp, closing connection") p.setRState(ReceiverUnconnected) p.RouteTable.DeleteAURPPeer(p) @@ -272,7 +360,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error { // RI-Req is stateless, so we don't need to cache the one we // sent earlier just to send it again - sendRetries++ + p.incSendRetries() + p.bumpLastSend() if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil { log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err) return err @@ -282,12 +371,12 @@ func (p *AURPPeer) Handle(ctx context.Context) error { case ReceiverUnconnected: // Data receiver is unconnected. If data sender is connected, // send a null RI-Upd to check if the sender is also unconnected - if p.sstate == SenderConnected && time.Since(lastSend) > sendRetryTimer { - if sendRetries >= sendRetryLimit { + if p.sstate == SenderConnected && time.Since(p.lastSend) > sendRetryTimer { + if p.sendRetries >= sendRetryLimit { log.Printf("AURP Peer: Send retry limit reached while probing sender connect, closing connection") } - sendRetries++ - lastSend = time.Now() + p.incSendRetries() + p.bumpLastSend() aurp.Inc(&p.Transport.LocalSeq) events := aurp.EventTuples{{ EventCode: aurp.EventCodeNull, @@ -302,7 +391,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error { if p.ConfiguredAddr != "" { // Periodically try to reconnect, if this peer is in the config file - if time.Since(lastReconnect) <= reconnectTimer { + if time.Since(p.lastReconnect) <= reconnectTimer { break } @@ -315,9 +404,9 @@ func (p *AURPPeer) Handle(ctx context.Context) error { log.Printf("AURP Peer: resolved %q to %v", p.ConfiguredAddr, raddr) p.RemoteAddr = raddr - lastReconnect = time.Now() - sendRetries = 0 - lastSend = time.Now() + p.bumpLastReconnect() + p.resetSendRetries() + p.bumpLastSend() if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) return err @@ -332,40 +421,41 @@ func (p *AURPPeer) Handle(ctx context.Context) error { // Do nothing case SenderConnected: - if time.Since(lastUpdate) <= updateTimer { + if time.Since(p.lastUpdate) <= updateTimer { break } // TODO: is there a routing update to send? + p.bumpLastUpdate() case SenderWaitForRIRspAck, SenderWaitForRIUpdAck: - if time.Since(lastSend) <= sendRetryTimer { + if time.Since(p.lastSend) <= sendRetryTimer { break } if lastRISent == nil { log.Print("AURP Peer: sender retry: lastRISent = nil?") continue } - if sendRetries >= sendRetryLimit { + if p.sendRetries >= sendRetryLimit { log.Printf("AURP Peer: Send retry limit reached, closing connection") p.setSState(SenderUnconnected) continue } - sendRetries++ - lastSend = time.Now() + p.incSendRetries() + p.bumpLastSend() if _, err := p.Send(lastRISent); err != nil { log.Printf("AURP Peer: Couldn't re-send %T: %v", lastRISent, err) return err } case SenderWaitForRDAck: - if time.Since(lastSend) <= sendRetryTimer { + if time.Since(p.lastSend) <= sendRetryTimer { break } p.setSState(SenderUnconnected) } case pkt := <-p.ReceiveCh: - lastHeardFrom = time.Now() + p.bumpLastHeardFrom() switch pkt := pkt.(type) { case *aurp.OpenReqPacket: @@ -402,8 +492,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error { // If receiver is unconnected, commence connecting if p.rstate == ReceiverUnconnected { - lastSend = time.Now() - sendRetries = 0 + p.resetSendRetries() + p.bumpLastSend() if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) return err @@ -425,7 +515,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error { p.setRState(ReceiverConnected) // Send an RI-Req - sendRetries = 0 + p.resetSendRetries() if _, err := p.Send(p.Transport.NewRIReqPacket()); err != nil { log.Printf("AURP Peer: Couldn't send RI-Req packet: %v", err) return err @@ -500,7 +590,7 @@ func (p *AURPPeer) Handle(ctx context.Context) error { } p.setSState(SenderConnected) - sendRetries = 0 + p.resetSendRetries() // If SZI flag is set, send ZI-Rsp (transaction) if pkt.Flags&aurp.RoutingFlagSendZoneInfo != 0 { @@ -536,8 +626,8 @@ func (p *AURPPeer) Handle(ctx context.Context) error { // Receiver is unconnected, but their receiver sent us an // RI-Ack for something // Try to reconnect? - lastSend = time.Now() - sendRetries = 0 + p.resetSendRetries() + p.bumpLastSend() if _, err := p.Send(p.Transport.NewOpenReqPacket(nil)); err != nil { log.Printf("AURP Peer: Couldn't send Open-Req packet: %v", err) return err @@ -546,7 +636,6 @@ func (p *AURPPeer) Handle(ctx context.Context) error { } case *aurp.RIUpdPacket: - var ackFlag aurp.RoutingFlag for _, et := range pkt.Events {