diff --git a/aarp.go b/aarp.go index a368bb2..5f8cc0d 100644 --- a/aarp.go +++ b/aarp.go @@ -34,9 +34,10 @@ type AARPMachine struct { // The Run goroutine is responsible for all writes to myAddr.Proto and // probes, so this mutex is not used to enforce a single writer, only // consistent reads - mu sync.RWMutex - myAddr aarp.AddrPair - probes int + mu sync.RWMutex + myAddr aarp.AddrPair + probes int + assignedCh chan struct{} } // NewAARPMachine creates a new AARPMachine. @@ -48,6 +49,7 @@ func NewAARPMachine(cfg *config, pcapHandle *pcap.Handle, myHWAddr ethernet.Addr myAddr: aarp.AddrPair{ Hardware: myHWAddr, }, + assignedCh: make(chan struct{}), } } @@ -59,6 +61,11 @@ func (a *AARPMachine) Address() (aarp.AddrPair, bool) { return a.myAddr, a.assigned() } +// Assigned returns a channel that is closed when the local address is valid. +func (a *AARPMachine) Assigned() <-chan struct{} { + return a.assignedCh +} + // Run executes the machine. func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Packet) error { // Initialise our DDP address with a preferred address (first network.1) @@ -80,6 +87,7 @@ func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Pack case <-ticker.C: if a.assigned() { + close(a.assignedCh) // No need to keep the ticker running if assigned ticker.Stop() continue @@ -158,7 +166,7 @@ func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Pack // If the address is in the cache (AMT) and is still valid, that is used. // Otherwise, the address is resolved using AARP. func (a *AARPMachine) Resolve(ctx context.Context, ddpAddr ddp.Addr) (ethernet.Addr, error) { - result, waitCh := a.addressMappingTable.lookupOrWait(ddpAddr) + result, waitCh := a.lookupOrWait(ddpAddr) if waitCh == nil { return result, nil } @@ -179,7 +187,7 @@ func (a *AARPMachine) Resolve(ctx context.Context, ddpAddr ddp.Addr) (ethernet.A return ethernet.Addr{}, ctx.Err() case <-waitCh: - result, waitCh = a.addressMappingTable.lookupOrWait(ddpAddr) + result, waitCh = a.lookupOrWait(ddpAddr) if waitCh == nil { return result, nil } diff --git a/atalk/aep/aep.go b/atalk/aep/aep.go new file mode 100644 index 0000000..bc8ee61 --- /dev/null +++ b/atalk/aep/aep.go @@ -0,0 +1,38 @@ +package aep + +import "fmt" + +type Function uint8 + +const ( + EchoRequest Function = 1 + EchoReply Function = 2 +) + +// Packet represents an AEP packet. +type Packet struct { + Function Function + Data []byte +} + +// Marshal marshals an AEP packet. +func (p *Packet) Marshal() ([]byte, error) { + if p.Function < 1 || p.Function > 2 { + return nil, fmt.Errorf("invalid AEP function %d", p.Function) + } + if len(p.Data) == 0 { + return nil, fmt.Errorf("empty AEP packet") + } + return append([]byte{byte(p.Function)}, p.Data...), nil +} + +// Unmarshal unmarshals an AEP packet. +func Unmarshal(data []byte) (*Packet, error) { + if len(data) < 1 { + return nil, fmt.Errorf("insufficient input length %d for AEP packet", len(data)) + } + return &Packet{ + Function: Function(data[0]), + Data: data[1:], + }, nil +} diff --git a/main.go b/main.go index 55f274d..a61571a 100644 --- a/main.go +++ b/main.go @@ -20,6 +20,7 @@ import ( "context" "errors" "flag" + "fmt" "io" "log" "math/rand/v2" @@ -31,6 +32,7 @@ import ( "time" "gitea.drjosh.dev/josh/jrouter/atalk" + "gitea.drjosh.dev/josh/jrouter/atalk/aep" "gitea.drjosh.dev/josh/jrouter/aurp" "github.com/google/gopacket/pcap" "github.com/sfiera/multitalk/pkg/ddp" @@ -244,6 +246,12 @@ func main() { switch ddpkt.DstSocket { case 1: // The RTMP socket rtmpCh <- &ddpkt + + case 4: // The AEP socket + if err := handleAEP(pcapHandle, myHWAddr, ethFrame.Src, &ddpkt); err != nil { + log.Printf("AEP: Couldn't handle: %v", err) + } + default: log.Printf("DDP: No handler for socket %d", ddpkt.DstSocket) } @@ -327,6 +335,45 @@ func main() { } } +func handleAEP(pcapHandle *pcap.Handle, src, dst ethernet.Addr, ddpkt *ddp.ExtPacket) error { + if ddpkt.Proto != ddp.ProtoAEP { + return fmt.Errorf("invalid DDP type %d on socket 4", ddpkt.Proto) + } + ep, err := aep.Unmarshal(ddpkt.Data) + if err != nil { + return err + } + switch ep.Function { + case aep.EchoReply: + // we didn't send a request? I don't think? + // we shouldn't be sending them from this socket + return fmt.Errorf("echo reply received at socket 4 why?") + + case aep.EchoRequest: + // Uno Reverso the packet + // "The client can send the Echo Request datagram through any socket + // the client has open, and the Echo Reply will come back to this socket." + ddpkt.DstNet, ddpkt.SrcNet = ddpkt.SrcNet, ddpkt.DstNet + ddpkt.DstNode, ddpkt.SrcNode = ddpkt.SrcNode, ddpkt.DstNode + ddpkt.DstSocket, ddpkt.SrcSocket = ddpkt.SrcSocket, ddpkt.DstSocket + ddpkt.Data[0] = byte(aep.EchoReply) + + ethFrame, err := ethertalk.AppleTalk(src, *ddpkt) + if err != nil { + return err + } + ethFrame.Dst = dst + ethFrameRaw, err := ethertalk.Marshal(*ethFrame) + if err != nil { + return err + } + return pcapHandle.WritePacketData(ethFrameRaw) + + default: + return fmt.Errorf("invalid AEP function %d", ep.Function) + } +} + // Hashable net.UDPAddr type udpAddr struct { ipv4 [4]byte diff --git a/rtmp.go b/rtmp.go index 0742b0d..e2a122a 100644 --- a/rtmp.go +++ b/rtmp.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "log" "time" @@ -19,7 +20,15 @@ type RTMPMachine struct { pcapHandle *pcap.Handle } +// Run executes the machine. func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket) error { + // Await local address assignment before doing anything + <-m.aarp.Assigned() + myAddr, ok := m.aarp.Address() + if !ok { + return fmt.Errorf("AARP machine closed Assigned channel but Address is not valid") + } + bcastTicker := time.NewTicker(10 * time.Second) defer bcastTicker.Stop() @@ -30,10 +39,6 @@ func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket) case <-bcastTicker.C: // Broadcast an RTMP Data - myAddr, ok := m.aarp.Address() - if !ok { - continue - } dataPkt := m.dataPacket(myAddr.Proto) @@ -71,10 +76,6 @@ func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket) log.Printf("RTMP: Couldn't unmarshal Request packet: %v", err) } - myAddr, ok := m.aarp.Address() - if !ok { - continue - } // should be in the cache... theirHWAddr, err := m.aarp.Resolve(ctx, ddp.Addr{Network: pkt.SrcNet, Node: pkt.SrcNode}) if err != nil { @@ -151,6 +152,8 @@ func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket) // TODO: integrate this information with the routing table log.Print("RTMP: Got Response or Data") + default: + log.Printf("RTMP: invalid DDP type %d on socket 1", pkt.Proto) } } @@ -187,5 +190,5 @@ func (m *RTMPMachine) dataPacket(myAddr ddp.Addr) *rtmp.DataPacket { }, }, } - // TODO: append more networks! + // TODO: append more networks! implement a route table! }