AARP machine improvements

This commit is contained in:
Josh Deprez 2024-04-07 12:09:58 +10:00
parent 7c3e508dd1
commit 7cb0a31100
Signed by: josh
SSH key fingerprint: SHA256:zZji7w1Ilh2RuUpbQcqkLPrqmRwpiCSycbF2EfKm6Kw

150
aarp.go
View file

@ -21,52 +21,57 @@ const (
aarpRequestTimeout = 10 * time.Second aarpRequestTimeout = 10 * time.Second
) )
type aarpState int
const (
aarpStateProbing aarpState = iota
aarpStateAssigned
)
// AARPMachine maintains both an Address Mapping Table and handles AARP packets // AARPMachine maintains both an Address Mapping Table and handles AARP packets
// (sending and receiving requests, responses, and probes). This process assumes // (sending and receiving requests, responses, and probes). This process assumes
// a particular network range rather than using the startup range, since this // a particular network range rather than using the startup range, since this
// program is a seed router. // program is a seed router.
type AARPMachine struct { type AARPMachine struct {
*AMT *addressMappingTable
cfg *config cfg *config
pcapHandle *pcap.Handle pcapHandle *pcap.Handle
state aarpState // The Run goroutine is responsible for all writes to myAddr.Proto and
probes int // probes, so this mutex is not used to enforce a single writer, only
// consistent reads
mu sync.RWMutex
myAddr aarp.AddrPair myAddr aarp.AddrPair
probes int
} }
// NewAARPMachine creates a new AARPMachine.
func NewAARPMachine(cfg *config, pcapHandle *pcap.Handle, myHWAddr ethernet.Addr) *AARPMachine { func NewAARPMachine(cfg *config, pcapHandle *pcap.Handle, myHWAddr ethernet.Addr) *AARPMachine {
return &AARPMachine{ return &AARPMachine{
AMT: new(AMT), addressMappingTable: new(addressMappingTable),
cfg: cfg, cfg: cfg,
pcapHandle: pcapHandle, pcapHandle: pcapHandle,
myAddr: aarp.AddrPair{ myAddr: aarp.AddrPair{
Hardware: myHWAddr, Hardware: myHWAddr,
}, },
} }
} }
// Address returns the address of this node, and reports if the address is valid
// (i.e. not tentative).
func (a *AARPMachine) Address() (aarp.AddrPair, bool) {
a.mu.RLock()
defer a.mu.RUnlock()
return a.myAddr, a.assigned()
}
// Run executes the machine.
func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Packet) error { func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Packet) error {
ticker := time.NewTicker(200 * time.Millisecond) // 200ms is the AARP probe retransmit
defer ticker.Stop()
a.state = aarpStateProbing
a.probes = 0
// Initialise our DDP address with a preferred address (first network.1) // Initialise our DDP address with a preferred address (first network.1)
a.mu.Lock()
a.probes = 0
a.myAddr.Proto = ddp.Addr{ a.myAddr.Proto = ddp.Addr{
Network: ddp.Network(a.cfg.EtherTalk.NetStart), Network: ddp.Network(a.cfg.EtherTalk.NetStart),
Node: 1, Node: 1,
} }
a.mu.Unlock()
ticker := time.NewTicker(200 * time.Millisecond) // 200ms is the AARP probe retransmit
defer ticker.Stop()
for { for {
select { select {
@ -74,21 +79,18 @@ func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Pack
return ctx.Err() return ctx.Err()
case <-ticker.C: case <-ticker.C:
switch a.state { if a.assigned() {
case aarpStateAssigned:
// No need to keep the ticker running if assigned // No need to keep the ticker running if assigned
ticker.Stop() ticker.Stop()
continue
}
case aarpStateProbing: a.mu.Lock()
if a.probes >= 10 { a.probes++
a.state = aarpStateAssigned a.mu.Unlock()
continue
} if err := a.probe(); err != nil {
a.probes++ log.Printf("Couldn't broadcast a Probe: %v", err)
if err := a.probe(); err != nil {
log.Printf("Couldn't broadcast a Probe: %v", err)
continue
}
} }
case ethFrame, ok := <-incomingCh: case ethFrame, ok := <-incomingCh:
@ -106,15 +108,13 @@ func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Pack
case aarp.RequestOp: case aarp.RequestOp:
log.Printf("AARP: Who has %v? Tell %v", aapkt.Dst.Proto, aapkt.Src.Proto) log.Printf("AARP: Who has %v? Tell %v", aapkt.Dst.Proto, aapkt.Src.Proto)
// Glean that aapkt.Src.Proto -> aapkt.Src.Hardware // Glean that aapkt.Src.Proto -> aapkt.Src.Hardware
a.AMT.Learn(aapkt.Src.Proto, aapkt.Src.Hardware) a.addressMappingTable.Learn(aapkt.Src.Proto, aapkt.Src.Hardware)
log.Printf("AARP: Gleaned that %v -> %v", aapkt.Src.Proto, aapkt.Src.Hardware) log.Printf("AARP: Gleaned that %v -> %v", aapkt.Src.Proto, aapkt.Src.Hardware)
if aapkt.Dst.Proto != a.myAddr.Proto { if !(aapkt.Dst.Proto == a.myAddr.Proto && a.assigned()) {
continue
}
if a.state != aarpStateAssigned {
continue continue
} }
// Hey that's me! Let them know! // Hey that's me! Let them know!
if err := a.heyThatsMe(aapkt.Src); err != nil { if err := a.heyThatsMe(aapkt.Src); err != nil {
log.Printf("AARP: Couldn't respond to Request: %v", err) log.Printf("AARP: Couldn't respond to Request: %v", err)
@ -123,12 +123,12 @@ func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Pack
case aarp.ResponseOp: case aarp.ResponseOp:
log.Printf("AARP: %v is at %v", aapkt.Dst.Proto, aapkt.Dst.Hardware) log.Printf("AARP: %v is at %v", aapkt.Dst.Proto, aapkt.Dst.Hardware)
a.AMT.Learn(aapkt.Dst.Proto, aapkt.Dst.Hardware) a.addressMappingTable.Learn(aapkt.Dst.Proto, aapkt.Dst.Hardware)
if aapkt.Dst.Proto != a.myAddr.Proto { if aapkt.Dst.Proto != a.myAddr.Proto {
continue continue
} }
if a.state == aarpStateProbing { if !a.assigned() {
a.reroll() a.reroll()
} }
@ -139,19 +139,17 @@ func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Pack
if aapkt.Dst.Proto != a.myAddr.Proto { if aapkt.Dst.Proto != a.myAddr.Proto {
continue continue
} }
switch a.state { if !a.assigned() {
case aarpStateProbing:
// Another node is probing for the same address! Unlucky // Another node is probing for the same address! Unlucky
a.reroll() a.reroll()
continue
}
case aarpStateAssigned: if err := a.heyThatsMe(aapkt.Src); err != nil {
if err := a.heyThatsMe(aapkt.Src); err != nil { log.Printf("AARP: Couldn't respond to Probe: %v", err)
log.Printf("AARP: Couldn't respond to Probe: %v", err) continue
continue
}
} }
} }
} }
} }
} }
@ -160,9 +158,8 @@ func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Pack
// If the address is in the cache (AMT) and is still valid, that is used. // If the address is in the cache (AMT) and is still valid, that is used.
// Otherwise, the address is resolved using AARP. // Otherwise, the address is resolved using AARP.
func (a *AARPMachine) Resolve(ctx context.Context, ddpAddr ddp.Addr) (ethernet.Addr, error) { func (a *AARPMachine) Resolve(ctx context.Context, ddpAddr ddp.Addr) (ethernet.Addr, error) {
// try the cache first result, waitCh := a.addressMappingTable.lookupOrWait(ddpAddr)
result, ok := a.AMT.Lookup(ddpAddr) if waitCh == nil {
if ok {
return result, nil return result, nil
} }
@ -177,18 +174,15 @@ func (a *AARPMachine) Resolve(ctx context.Context, ddpAddr ddp.Addr) (ethernet.A
defer cancel() defer cancel()
for { for {
// We might have a result already
result, ok := a.AMT.Lookup(ddpAddr)
if ok {
return result, nil
}
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ethernet.Addr{}, ctx.Err() return ethernet.Addr{}, ctx.Err()
case <-a.AMT.Wait(ddpAddr): case <-waitCh:
// Should have a result now. result, waitCh = a.addressMappingTable.lookupOrWait(ddpAddr)
if waitCh == nil {
return result, nil
}
case <-ticker.C: case <-ticker.C:
if err := a.request(ddpAddr); err != nil { if err := a.request(ddpAddr); err != nil {
@ -198,11 +192,15 @@ func (a *AARPMachine) Resolve(ctx context.Context, ddpAddr ddp.Addr) (ethernet.A
} }
} }
func (a *AARPMachine) assigned() bool { return a.probes >= 10 }
// Re-roll a local address // Re-roll a local address
func (a *AARPMachine) reroll() { func (a *AARPMachine) reroll() {
a.mu.Lock()
defer a.mu.Unlock()
if a.cfg.EtherTalk.NetStart != a.cfg.EtherTalk.NetEnd { if a.cfg.EtherTalk.NetStart != a.cfg.EtherTalk.NetEnd {
// Pick a new network number at random // Pick a new network number at random
a.myAddr.Proto.Network = rand.N[ddp.Network]( a.myAddr.Proto.Network = rand.N(
a.cfg.EtherTalk.NetEnd-a.cfg.EtherTalk.NetStart+1, a.cfg.EtherTalk.NetEnd-a.cfg.EtherTalk.NetStart+1,
) + a.cfg.EtherTalk.NetStart ) + a.cfg.EtherTalk.NetStart
} }
@ -245,7 +243,7 @@ func (a *AARPMachine) probe() error {
} }
// Broadcast an AARP Request // Broadcast an AARP Request
func (a AARPMachine) request(ddpAddr ddp.Addr) error { func (a *AARPMachine) request(ddpAddr ddp.Addr) error {
reqFrame, err := ethertalk.AARP(a.myAddr.Hardware, aarp.Request(a.myAddr, ddpAddr)) reqFrame, err := ethertalk.AARP(a.myAddr.Hardware, aarp.Request(a.myAddr, ddpAddr))
if err != nil { if err != nil {
return err return err
@ -263,15 +261,15 @@ type amtEntry struct {
updated chan struct{} updated chan struct{}
} }
// AMT implements a concurrent-safe Address Mapping Table for AppleTalk (DDP) // addressMappingTable implements a concurrent-safe Address Mapping Table for
// addresses to Ethernet hardware addresses. // AppleTalk (DDP) addresses to Ethernet hardware addresses.
type AMT struct { type addressMappingTable struct {
mu sync.RWMutex mu sync.Mutex
table map[ddp.Addr]*amtEntry table map[ddp.Addr]*amtEntry
} }
// Learn adds or updates an AMT entry. // Learn adds or updates an AMT entry.
func (t *AMT) Learn(ddpAddr ddp.Addr, hwAddr ethernet.Addr) { func (t *addressMappingTable) Learn(ddpAddr ddp.Addr, hwAddr ethernet.Addr) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
if t.table == nil { if t.table == nil {
@ -297,29 +295,21 @@ func (t *AMT) Learn(ddpAddr ddp.Addr, hwAddr ethernet.Addr) {
oldEnt.updated = make(chan struct{}) oldEnt.updated = make(chan struct{})
} }
// Wait returns a channel that is closed when the entry for ddpAddr is updated. // lookupOrWait returns either the valid cached Ethernet address for the given
func (t *AMT) Wait(ddpAddr ddp.Addr) <-chan struct{} { // DDP address, or a channel that is closed when the entry is updated.
func (t *addressMappingTable) lookupOrWait(ddpAddr ddp.Addr) (ethernet.Addr, <-chan struct{}) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
if t.table == nil { if t.table == nil {
t.table = make(map[ddp.Addr]*amtEntry) t.table = make(map[ddp.Addr]*amtEntry)
} }
oldEnt := t.table[ddpAddr] ent, ok := t.table[ddpAddr]
if oldEnt != nil { if ok && time.Since(ent.last) < maxAMTEntryAge {
return oldEnt.updated return ent.hwAddr, nil
} }
ch := make(chan struct{}) ch := make(chan struct{})
t.table[ddpAddr] = &amtEntry{ t.table[ddpAddr] = &amtEntry{
updated: ch, updated: ch,
} }
return ch return ethernet.Addr{}, ch
}
// Lookup searches for a non-expired entry in the table only. It does not send
// any packets.
func (t *AMT) Lookup(ddpAddr ddp.Addr) (ethernet.Addr, bool) {
t.mu.RLock()
defer t.mu.RUnlock()
ent, ok := t.table[ddpAddr]
return ent.hwAddr, ok && time.Since(ent.last) < maxAMTEntryAge
} }