From 7cb0a31100582a2da8bba68a92819f17f350162a Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Sun, 7 Apr 2024 12:09:58 +1000 Subject: [PATCH] AARP machine improvements --- aarp.go | 150 ++++++++++++++++++++++++++------------------------------ 1 file changed, 70 insertions(+), 80 deletions(-) diff --git a/aarp.go b/aarp.go index eb873d8..a368bb2 100644 --- a/aarp.go +++ b/aarp.go @@ -21,52 +21,57 @@ const ( aarpRequestTimeout = 10 * time.Second ) -type aarpState int - -const ( - aarpStateProbing aarpState = iota - aarpStateAssigned -) - // AARPMachine maintains both an Address Mapping Table and handles AARP packets // (sending and receiving requests, responses, and probes). This process assumes // a particular network range rather than using the startup range, since this // program is a seed router. type AARPMachine struct { - *AMT + *addressMappingTable cfg *config pcapHandle *pcap.Handle - state aarpState - probes int - + // The Run goroutine is responsible for all writes to myAddr.Proto and + // probes, so this mutex is not used to enforce a single writer, only + // consistent reads + mu sync.RWMutex myAddr aarp.AddrPair + probes int } +// NewAARPMachine creates a new AARPMachine. func NewAARPMachine(cfg *config, pcapHandle *pcap.Handle, myHWAddr ethernet.Addr) *AARPMachine { return &AARPMachine{ - AMT: new(AMT), - cfg: cfg, - pcapHandle: pcapHandle, + addressMappingTable: new(addressMappingTable), + cfg: cfg, + pcapHandle: pcapHandle, myAddr: aarp.AddrPair{ Hardware: myHWAddr, }, } } +// Address returns the address of this node, and reports if the address is valid +// (i.e. not tentative). +func (a *AARPMachine) Address() (aarp.AddrPair, bool) { + a.mu.RLock() + defer a.mu.RUnlock() + return a.myAddr, a.assigned() +} + +// Run executes the machine. func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Packet) error { - ticker := time.NewTicker(200 * time.Millisecond) // 200ms is the AARP probe retransmit - defer ticker.Stop() - - a.state = aarpStateProbing - a.probes = 0 - // Initialise our DDP address with a preferred address (first network.1) + a.mu.Lock() + a.probes = 0 a.myAddr.Proto = ddp.Addr{ Network: ddp.Network(a.cfg.EtherTalk.NetStart), Node: 1, } + a.mu.Unlock() + + ticker := time.NewTicker(200 * time.Millisecond) // 200ms is the AARP probe retransmit + defer ticker.Stop() for { select { @@ -74,21 +79,18 @@ func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Pack return ctx.Err() case <-ticker.C: - switch a.state { - case aarpStateAssigned: + if a.assigned() { // No need to keep the ticker running if assigned ticker.Stop() + continue + } - case aarpStateProbing: - if a.probes >= 10 { - a.state = aarpStateAssigned - continue - } - a.probes++ - if err := a.probe(); err != nil { - log.Printf("Couldn't broadcast a Probe: %v", err) - continue - } + a.mu.Lock() + a.probes++ + a.mu.Unlock() + + if err := a.probe(); err != nil { + log.Printf("Couldn't broadcast a Probe: %v", err) } case ethFrame, ok := <-incomingCh: @@ -106,15 +108,13 @@ func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Pack case aarp.RequestOp: log.Printf("AARP: Who has %v? Tell %v", aapkt.Dst.Proto, aapkt.Src.Proto) // Glean that aapkt.Src.Proto -> aapkt.Src.Hardware - a.AMT.Learn(aapkt.Src.Proto, aapkt.Src.Hardware) + a.addressMappingTable.Learn(aapkt.Src.Proto, aapkt.Src.Hardware) log.Printf("AARP: Gleaned that %v -> %v", aapkt.Src.Proto, aapkt.Src.Hardware) - if aapkt.Dst.Proto != a.myAddr.Proto { - continue - } - if a.state != aarpStateAssigned { + if !(aapkt.Dst.Proto == a.myAddr.Proto && a.assigned()) { continue } + // Hey that's me! Let them know! if err := a.heyThatsMe(aapkt.Src); err != nil { log.Printf("AARP: Couldn't respond to Request: %v", err) @@ -123,12 +123,12 @@ func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Pack case aarp.ResponseOp: log.Printf("AARP: %v is at %v", aapkt.Dst.Proto, aapkt.Dst.Hardware) - a.AMT.Learn(aapkt.Dst.Proto, aapkt.Dst.Hardware) + a.addressMappingTable.Learn(aapkt.Dst.Proto, aapkt.Dst.Hardware) if aapkt.Dst.Proto != a.myAddr.Proto { continue } - if a.state == aarpStateProbing { + if !a.assigned() { a.reroll() } @@ -139,19 +139,17 @@ func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Pack if aapkt.Dst.Proto != a.myAddr.Proto { continue } - switch a.state { - case aarpStateProbing: + if !a.assigned() { // Another node is probing for the same address! Unlucky a.reroll() + continue + } - case aarpStateAssigned: - if err := a.heyThatsMe(aapkt.Src); err != nil { - log.Printf("AARP: Couldn't respond to Probe: %v", err) - continue - } + if err := a.heyThatsMe(aapkt.Src); err != nil { + log.Printf("AARP: Couldn't respond to Probe: %v", err) + continue } } - } } } @@ -160,9 +158,8 @@ func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Pack // If the address is in the cache (AMT) and is still valid, that is used. // Otherwise, the address is resolved using AARP. func (a *AARPMachine) Resolve(ctx context.Context, ddpAddr ddp.Addr) (ethernet.Addr, error) { - // try the cache first - result, ok := a.AMT.Lookup(ddpAddr) - if ok { + result, waitCh := a.addressMappingTable.lookupOrWait(ddpAddr) + if waitCh == nil { return result, nil } @@ -177,18 +174,15 @@ func (a *AARPMachine) Resolve(ctx context.Context, ddpAddr ddp.Addr) (ethernet.A defer cancel() for { - // We might have a result already - result, ok := a.AMT.Lookup(ddpAddr) - if ok { - return result, nil - } - select { case <-ctx.Done(): return ethernet.Addr{}, ctx.Err() - case <-a.AMT.Wait(ddpAddr): - // Should have a result now. + case <-waitCh: + result, waitCh = a.addressMappingTable.lookupOrWait(ddpAddr) + if waitCh == nil { + return result, nil + } case <-ticker.C: if err := a.request(ddpAddr); err != nil { @@ -198,11 +192,15 @@ func (a *AARPMachine) Resolve(ctx context.Context, ddpAddr ddp.Addr) (ethernet.A } } +func (a *AARPMachine) assigned() bool { return a.probes >= 10 } + // Re-roll a local address func (a *AARPMachine) reroll() { + a.mu.Lock() + defer a.mu.Unlock() if a.cfg.EtherTalk.NetStart != a.cfg.EtherTalk.NetEnd { // Pick a new network number at random - a.myAddr.Proto.Network = rand.N[ddp.Network]( + a.myAddr.Proto.Network = rand.N( a.cfg.EtherTalk.NetEnd-a.cfg.EtherTalk.NetStart+1, ) + a.cfg.EtherTalk.NetStart } @@ -245,7 +243,7 @@ func (a *AARPMachine) probe() error { } // Broadcast an AARP Request -func (a AARPMachine) request(ddpAddr ddp.Addr) error { +func (a *AARPMachine) request(ddpAddr ddp.Addr) error { reqFrame, err := ethertalk.AARP(a.myAddr.Hardware, aarp.Request(a.myAddr, ddpAddr)) if err != nil { return err @@ -263,15 +261,15 @@ type amtEntry struct { updated chan struct{} } -// AMT implements a concurrent-safe Address Mapping Table for AppleTalk (DDP) -// addresses to Ethernet hardware addresses. -type AMT struct { - mu sync.RWMutex +// addressMappingTable implements a concurrent-safe Address Mapping Table for +// AppleTalk (DDP) addresses to Ethernet hardware addresses. +type addressMappingTable struct { + mu sync.Mutex table map[ddp.Addr]*amtEntry } // Learn adds or updates an AMT entry. -func (t *AMT) Learn(ddpAddr ddp.Addr, hwAddr ethernet.Addr) { +func (t *addressMappingTable) Learn(ddpAddr ddp.Addr, hwAddr ethernet.Addr) { t.mu.Lock() defer t.mu.Unlock() if t.table == nil { @@ -297,29 +295,21 @@ func (t *AMT) Learn(ddpAddr ddp.Addr, hwAddr ethernet.Addr) { oldEnt.updated = make(chan struct{}) } -// Wait returns a channel that is closed when the entry for ddpAddr is updated. -func (t *AMT) Wait(ddpAddr ddp.Addr) <-chan struct{} { +// lookupOrWait returns either the valid cached Ethernet address for the given +// DDP address, or a channel that is closed when the entry is updated. +func (t *addressMappingTable) lookupOrWait(ddpAddr ddp.Addr) (ethernet.Addr, <-chan struct{}) { t.mu.Lock() defer t.mu.Unlock() if t.table == nil { t.table = make(map[ddp.Addr]*amtEntry) } - oldEnt := t.table[ddpAddr] - if oldEnt != nil { - return oldEnt.updated + ent, ok := t.table[ddpAddr] + if ok && time.Since(ent.last) < maxAMTEntryAge { + return ent.hwAddr, nil } ch := make(chan struct{}) t.table[ddpAddr] = &amtEntry{ updated: ch, } - return ch -} - -// Lookup searches for a non-expired entry in the table only. It does not send -// any packets. -func (t *AMT) Lookup(ddpAddr ddp.Addr) (ethernet.Addr, bool) { - t.mu.RLock() - defer t.mu.RUnlock() - ent, ok := t.table[ddpAddr] - return ent.hwAddr, ok && time.Since(ent.last) < maxAMTEntryAge + return ethernet.Addr{}, ch }