Fix lastSend tracking
This commit is contained in:
parent
d24fca67b8
commit
16be2ef41c
1 changed files with 60 additions and 38 deletions
98
peer.go
98
peer.go
|
@ -12,30 +12,30 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// TODO: check these parameters
|
// TODO: check these parameters
|
||||||
lastHeardFromTimer = 10 * time.Second
|
lastHeardFromTimer = 10 * time.Second
|
||||||
lastHeardFromRetryLimit = 10
|
tickleRetryLimit = 10
|
||||||
sendRetryTimer = 10 * time.Second
|
sendRetryTimer = 10 * time.Second
|
||||||
sendRetryLimit = 5
|
sendRetryLimit = 5
|
||||||
)
|
)
|
||||||
|
|
||||||
type receiverState int
|
type receiverState int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
receiverStateUnconnected receiverState = iota
|
rsUnconnected receiverState = iota
|
||||||
receiverStateConnected
|
rsConnected
|
||||||
receiverStateWaitForOpenRsp
|
rsWaitForOpenRsp
|
||||||
receiverStateWaitForRIRsp
|
rsWaitForRIRsp
|
||||||
receiverStateWaitForTickleAck
|
rsWaitForTickleAck
|
||||||
)
|
)
|
||||||
|
|
||||||
type senderState int
|
type senderState int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
senderStateUnconnected senderState = iota
|
ssUnconnected senderState = iota
|
||||||
senderStateConnected
|
ssConnected
|
||||||
senderStateWaitForRIAck1
|
ssWaitForRIAck1
|
||||||
senderStateWaitForRIAck2
|
ssWaitForRIAck2
|
||||||
senderStateWaitForRIAck3
|
ssWaitForRIAck3
|
||||||
)
|
)
|
||||||
|
|
||||||
type peer struct {
|
type peer struct {
|
||||||
|
@ -63,8 +63,8 @@ func (p *peer) handle(ctx context.Context) error {
|
||||||
lastSend := time.Now()
|
lastSend := time.Now()
|
||||||
sendRetries := 0
|
sendRetries := 0
|
||||||
|
|
||||||
rstate := receiverStateUnconnected
|
rstate := rsUnconnected
|
||||||
sstate := senderStateUnconnected
|
sstate := ssUnconnected
|
||||||
|
|
||||||
// Write an Open-Req packet
|
// Write an Open-Req packet
|
||||||
if _, err := p.send(p.tr.NewOpenReqPacket(nil)); err != nil {
|
if _, err := p.send(p.tr.NewOpenReqPacket(nil)); err != nil {
|
||||||
|
@ -72,12 +72,13 @@ func (p *peer) handle(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
rstate = receiverStateWaitForOpenRsp
|
rstate = rsWaitForOpenRsp
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
if rstate == receiverStateUnconnected {
|
if sstate == ssUnconnected {
|
||||||
|
// Return immediately
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
}
|
}
|
||||||
// Send a best-effort Router Down before returning
|
// Send a best-effort Router Down before returning
|
||||||
|
@ -88,31 +89,53 @@ func (p *peer) handle(ctx context.Context) error {
|
||||||
|
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
switch rstate {
|
switch rstate {
|
||||||
case receiverStateWaitForOpenRsp:
|
case rsWaitForOpenRsp:
|
||||||
if time.Since(lastSend) <= sendRetryTimer {
|
if time.Since(lastSend) <= sendRetryTimer {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if sendRetries >= sendRetryLimit {
|
if sendRetries >= sendRetryLimit {
|
||||||
log.Printf("Send retry limit reached while waiting for Open-Rsp, closing connection")
|
log.Printf("Send retry limit reached while waiting for Open-Rsp, closing connection")
|
||||||
rstate = receiverStateUnconnected
|
rstate = rsUnconnected
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send another Open-Req
|
// Send another Open-Req
|
||||||
sendRetries++
|
sendRetries++
|
||||||
|
lastSend = time.Now()
|
||||||
if _, err := p.send(p.tr.NewOpenReqPacket(nil)); err != nil {
|
if _, err := p.send(p.tr.NewOpenReqPacket(nil)); err != nil {
|
||||||
log.Printf("Couldn't send Open-Req packet: %v", err)
|
log.Printf("Couldn't send Open-Req packet: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
case receiverStateConnected:
|
case rsConnected:
|
||||||
// Check LHFT, send tickle?
|
// Check LHFT, send tickle?
|
||||||
if time.Since(lastHeardFrom) > lastHeardFromTimer {
|
if time.Since(lastHeardFrom) <= lastHeardFromTimer {
|
||||||
if _, err := p.send(p.tr.NewTicklePacket()); err != nil {
|
break
|
||||||
log.Printf("Couldn't send Tickle: %v", err)
|
}
|
||||||
}
|
if _, err := p.send(p.tr.NewTicklePacket()); err != nil {
|
||||||
|
log.Printf("Couldn't send Tickle: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
rstate = rsWaitForTickleAck
|
||||||
|
sendRetries = 0
|
||||||
|
lastSend = time.Now()
|
||||||
|
|
||||||
|
case rsWaitForTickleAck:
|
||||||
|
if time.Since(lastSend) <= sendRetryTimer {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if sendRetries >= tickleRetryLimit {
|
||||||
|
log.Printf("Send retry limit reached while waiting for Tickle-Ack, closing connection")
|
||||||
|
rstate = rsUnconnected
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
sendRetries++
|
||||||
|
lastSend = time.Now()
|
||||||
|
if _, err := p.send(p.tr.NewTicklePacket()); err != nil {
|
||||||
|
log.Printf("Couldn't send Tickle: %v", err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
rstate = receiverStateWaitForTickleAck
|
|
||||||
}
|
}
|
||||||
|
|
||||||
case pkt := <-p.recv:
|
case pkt := <-p.recv:
|
||||||
|
@ -120,7 +143,7 @@ func (p *peer) handle(ctx context.Context) error {
|
||||||
|
|
||||||
switch pkt := pkt.(type) {
|
switch pkt := pkt.(type) {
|
||||||
case *aurp.OpenReqPacket:
|
case *aurp.OpenReqPacket:
|
||||||
if sstate != senderStateUnconnected {
|
if sstate != ssUnconnected {
|
||||||
log.Printf("Open-Req received but sender state is not Unconnected (was %d); ignoring packet", sstate)
|
log.Printf("Open-Req received but sender state is not Unconnected (was %d); ignoring packet", sstate)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -144,26 +167,23 @@ func (p *peer) handle(ctx context.Context) error {
|
||||||
orsp = p.tr.NewOpenRspPacket(0, 1, nil)
|
orsp = p.tr.NewOpenRspPacket(0, 1, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Responding with %T", orsp)
|
|
||||||
|
|
||||||
if _, err := p.send(orsp); err != nil {
|
if _, err := p.send(orsp); err != nil {
|
||||||
log.Printf("Couldn't send Open-Rsp: %v", err)
|
log.Printf("Couldn't send Open-Rsp: %v", err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
if orsp.RateOrErrCode >= 0 {
|
if orsp.RateOrErrCode >= 0 {
|
||||||
sstate = senderStateConnected
|
sstate = ssConnected
|
||||||
} else {
|
|
||||||
sstate = senderStateUnconnected
|
|
||||||
}
|
}
|
||||||
|
|
||||||
case *aurp.OpenRspPacket:
|
case *aurp.OpenRspPacket:
|
||||||
if rstate != receiverStateWaitForOpenRsp {
|
if rstate != rsWaitForOpenRsp {
|
||||||
log.Printf("Received Open-Rsp but was not waiting for one (receiver state was %d)", rstate)
|
log.Printf("Received Open-Rsp but was not waiting for one (receiver state was %d)", rstate)
|
||||||
}
|
}
|
||||||
if pkt.RateOrErrCode < 0 {
|
if pkt.RateOrErrCode < 0 {
|
||||||
// It's an error code.
|
// It's an error code.
|
||||||
log.Printf("Open-Rsp error code from peer %v: %d", p.raddr.IP, pkt.RateOrErrCode)
|
log.Printf("Open-Rsp error code from peer %v: %d", p.raddr.IP, pkt.RateOrErrCode)
|
||||||
// Close the connection
|
// Close the connection
|
||||||
rstate = receiverStateUnconnected
|
rstate = rsUnconnected
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Make other requests
|
// TODO: Make other requests
|
||||||
|
@ -189,9 +209,10 @@ func (p *peer) handle(ctx context.Context) error {
|
||||||
// Respond with RI-Ack
|
// Respond with RI-Ack
|
||||||
if _, err := p.send(p.tr.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil {
|
if _, err := p.send(p.tr.NewRIAckPacket(pkt.ConnectionID, pkt.Sequence, 0)); err != nil {
|
||||||
log.Printf("Couldn't send RI-Ack: %v", err)
|
log.Printf("Couldn't send RI-Ack: %v", err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
// Connection closed
|
// Connection closed
|
||||||
rstate = receiverStateUnconnected
|
rstate = rsUnconnected
|
||||||
|
|
||||||
case *aurp.ZIReqPacket:
|
case *aurp.ZIReqPacket:
|
||||||
// TODO: Respond with ZI-Rsp
|
// TODO: Respond with ZI-Rsp
|
||||||
|
@ -203,13 +224,14 @@ func (p *peer) handle(ctx context.Context) error {
|
||||||
// Immediately respond with Tickle-Ack
|
// Immediately respond with Tickle-Ack
|
||||||
if _, err := p.send(p.tr.NewTickleAckPacket()); err != nil {
|
if _, err := p.send(p.tr.NewTickleAckPacket()); err != nil {
|
||||||
log.Printf("Couldn't send Tickle-Ack: %v", err)
|
log.Printf("Couldn't send Tickle-Ack: %v", err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
case *aurp.TickleAckPacket:
|
case *aurp.TickleAckPacket:
|
||||||
if rstate != receiverStateWaitForTickleAck {
|
if rstate != rsWaitForTickleAck {
|
||||||
log.Printf("Received Tickle-Ack but was not waiting for one (receever state was %d)", rstate)
|
log.Printf("Received Tickle-Ack but was not waiting for one (receiver state was %d)", rstate)
|
||||||
}
|
}
|
||||||
rstate = receiverStateConnected
|
rstate = rsConnected
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue