From a239b7734f1d817ef33225040e26cd5f187f2e4a Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Sat, 4 May 2024 14:48:47 +1000 Subject: [PATCH] refactor RTMP into port --- main.go | 27 +++-- router/peer_eth.go | 18 ++- router/port.go | 6 +- router/rtmp.go | 289 ++++++++++++++++++++------------------------- 4 files changed, 151 insertions(+), 189 deletions(-) diff --git a/main.go b/main.go index 2fb4cc3..9579c8b 100644 --- a/main.go +++ b/main.go @@ -61,7 +61,17 @@ const routingTableTemplate = ` {{if $route.Extended}}✅{{else}}❌{{end}} {{$route.Distance}} {{$route.LastSeenAgo}} - {{if $route.AURPPeer}}{{$route.AURPPeer.RemoteAddr}}{{else if $route.EtherTalkPeer}}{{$route.EtherTalkPeer.PeerAddr.Network}}.{{$route.EtherTalkPeer.PeerAddr.Node}}{{else}}-{{end}} + + {{- with $route.AURPPeer -}} + {{.RemoteAddr}} + {{- end -}} + {{- with $route.EtherTalkPeer -}} + {{.Port.Device}} {{.PeerAddr.Network}}.{{.PeerAddr.Node}} + {{- end -}} + {{- with $route.EtherTalkDirect -}} + {{.Device}} {{.NetStart}}-{{.NetEnd}} + {{- end -}} + {{end}} @@ -330,16 +340,6 @@ func main() { aarpMachine := router.NewAARPMachine(cfg, pcapHandle, myHWAddr) go aarpMachine.Run(ctx) - // --------------------------------- RTMP --------------------------------- - rtmpMachine := &router.RTMPMachine{ - AARPMachine: aarpMachine, - Config: cfg, - PcapHandle: pcapHandle, - RoutingTable: routes, - IncomingCh: make(chan *ddp.ExtPacket, 1024), - } - go rtmpMachine.Run(ctx) - // -------------------------------- Router -------------------------------- rooter := &router.Router{ Config: cfg, @@ -348,6 +348,7 @@ func main() { } etherTalkPort := &router.EtherTalkPort{ + Device: cfg.EtherTalk.Device, EthernetAddr: myHWAddr, NetStart: cfg.EtherTalk.NetStart, NetEnd: cfg.EtherTalk.NetEnd, @@ -355,7 +356,6 @@ func main() { AvailableZones: []string{cfg.EtherTalk.ZoneName}, PcapHandle: pcapHandle, AARPMachine: aarpMachine, - RTMPMachine: rtmpMachine, Router: rooter, } rooter.Ports = append(rooter.Ports, etherTalkPort) @@ -364,6 +364,9 @@ func main() { zones.Upsert(etherTalkPort.NetStart, az, etherTalkPort) } + // --------------------------------- RTMP --------------------------------- + go etherTalkPort.RunRTMP(ctx) + // ---------------------- Raw AppleTalk/AARP inbound ---------------------- wg.Add(1) go func() { diff --git a/router/peer_eth.go b/router/peer_eth.go index 046687e..4b181a2 100644 --- a/router/peer_eth.go +++ b/router/peer_eth.go @@ -19,29 +19,25 @@ package router import ( "context" - "github.com/google/gopacket/pcap" "github.com/sfiera/multitalk/pkg/ddp" - "github.com/sfiera/multitalk/pkg/ethernet" "github.com/sfiera/multitalk/pkg/ethertalk" ) -// EtherTalkPeer holds data needed to exchange routes and zones with another -// router on the EtherTalk network. +// EtherTalkPeer holds data needed to forward packets to another router on the +// EtherTalk network. type EtherTalkPeer struct { - PcapHandle *pcap.Handle - MyHWAddr ethernet.Addr - AARP *AARPMachine - PeerAddr ddp.Addr + Port *EtherTalkPort + PeerAddr ddp.Addr } // Forward forwards a DDP packet to the next router. func (p *EtherTalkPeer) Forward(ctx context.Context, pkt *ddp.ExtPacket) error { // TODO: AARP resolution can block - de, err := p.AARP.Resolve(ctx, p.PeerAddr) + de, err := p.Port.AARPMachine.Resolve(ctx, p.PeerAddr) if err != nil { return err } - outFrame, err := ethertalk.AppleTalk(p.MyHWAddr, *pkt) + outFrame, err := ethertalk.AppleTalk(p.Port.EthernetAddr, *pkt) if err != nil { return err } @@ -50,5 +46,5 @@ func (p *EtherTalkPeer) Forward(ctx context.Context, pkt *ddp.ExtPacket) error { if err != nil { return err } - return p.PcapHandle.WritePacketData(outFrameRaw) + return p.Port.PcapHandle.WritePacketData(outFrameRaw) } diff --git a/router/port.go b/router/port.go index 39ac386..eb8aad6 100644 --- a/router/port.go +++ b/router/port.go @@ -31,6 +31,7 @@ import ( // EtherTalkPort is all the data and helpers needed for EtherTalk on one port. type EtherTalkPort struct { + Device string EthernetAddr ethernet.Addr NetStart ddp.Network NetEnd ddp.Network @@ -39,7 +40,6 @@ type EtherTalkPort struct { AvailableZones []string PcapHandle *pcap.Handle AARPMachine *AARPMachine - RTMPMachine *RTMPMachine Router *Router } @@ -129,7 +129,9 @@ func (port *EtherTalkPort) Serve(ctx context.Context) { switch ddpkt.DstSocket { case 1: // The RTMP socket - port.RTMPMachine.Handle(ctx, ddpkt) + if err := port.HandleRTMP(ctx, ddpkt); err != nil { + log.Printf("RTMP: Couldn't handle: %v", err) + } case 2: // The NIS (name information socket / NBP socket) if err := port.HandleNBP(ctx, ddpkt); err != nil { diff --git a/router/rtmp.go b/router/rtmp.go index 04b96ce..8a55b03 100644 --- a/router/rtmp.go +++ b/router/rtmp.go @@ -26,33 +26,113 @@ import ( "gitea.drjosh.dev/josh/jrouter/atalk/rtmp" "gitea.drjosh.dev/josh/jrouter/status" - "github.com/google/gopacket/pcap" - - "github.com/sfiera/multitalk/pkg/aarp" "github.com/sfiera/multitalk/pkg/ddp" - "github.com/sfiera/multitalk/pkg/ethernet" - "github.com/sfiera/multitalk/pkg/ethertalk" ) // RTMPMachine implements RTMP on an AppleTalk network attached to the router. -type RTMPMachine struct { - AARPMachine *AARPMachine - Config *Config - PcapHandle *pcap.Handle - RoutingTable *RouteTable +func (port *EtherTalkPort) HandleRTMP(ctx context.Context, pkt *ddp.ExtPacket) error { + switch pkt.Proto { + case ddp.ProtoRTMPReq: + // I can answer RTMP requests! + req, err := rtmp.UnmarshalRequestPacket(pkt.Data) + if err != nil { + return fmt.Errorf("unmarshal Request packet: %w", err) + } - IncomingCh chan *ddp.ExtPacket -} + switch req.Function { + case rtmp.FunctionRequest: + // Respond with RTMP Response + respPkt := &rtmp.ResponsePacket{ + SenderAddr: port.MyAddr, + Extended: true, + RangeStart: port.NetStart, + RangeEnd: port.NetEnd, + } + respPktRaw, err := respPkt.Marshal() + if err != nil { + return fmt.Errorf("marshal RTMP Response packet: %w", err) + } + ddpPkt := &ddp.ExtPacket{ + ExtHeader: ddp.ExtHeader{ + Size: uint16(len(respPktRaw)) + atalk.DDPExtHeaderSize, + Cksum: 0, + DstNet: pkt.SrcNet, + DstNode: pkt.SrcNode, + DstSocket: 1, // the RTMP socket + SrcNet: port.MyAddr.Network, + SrcNode: port.MyAddr.Node, + SrcSocket: 1, // the RTMP socket + Proto: ddp.ProtoRTMPResp, + }, + Data: respPktRaw, + } -func (m *RTMPMachine) Handle(ctx context.Context, pkt *ddp.ExtPacket) { - select { - case <-ctx.Done(): - case m.IncomingCh <- pkt: + if err := port.Router.Forward(ctx, ddpPkt); err != nil { + return fmt.Errorf("send Response: %w", err) + } + + case rtmp.FunctionRDRSplitHorizon, rtmp.FunctionRDRComplete: + // Like the Data broadcast, but solicited by a request (RDR). + splitHorizon := req.Function == rtmp.FunctionRDRSplitHorizon + for _, dataPkt := range port.rtmpDataPackets(splitHorizon) { + dataPktRaw, err := dataPkt.Marshal() + if err != nil { + return fmt.Errorf("marshal RTMP Data packet: %w", err) + } + + ddpPkt := &ddp.ExtPacket{ + ExtHeader: ddp.ExtHeader{ + Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize, + Cksum: 0, + DstNet: pkt.SrcNet, + DstNode: pkt.SrcNode, + DstSocket: 1, // the RTMP socket + SrcNet: port.MyAddr.Network, + SrcNode: port.MyAddr.Node, + SrcSocket: 1, // the RTMP socket + Proto: ddp.ProtoRTMPResp, + }, + Data: dataPktRaw, + } + + if err := port.Router.Forward(ctx, ddpPkt); err != nil { + return fmt.Errorf("send Data: %w", err) + } + } + + case rtmp.FunctionLoopProbe: + log.Print("RTMP: TODO: handle Loop Probes") + return nil + } + + case ddp.ProtoRTMPResp: + // It's a peer router on the AppleTalk network! + log.Print("RTMP: Got Response or Data") + dataPkt, err := rtmp.UnmarshalDataPacket(pkt.Data) + if err != nil { + log.Printf("RTMP: Couldn't unmarshal RTMP Data packet: %v", err) + break + } + peer := &EtherTalkPeer{ + Port: port, + PeerAddr: dataPkt.RouterAddr, + } + + for _, rt := range dataPkt.NetworkTuples { + if err := port.Router.RouteTable.UpsertEthRoute(peer, rt.Extended, rt.RangeStart, rt.RangeEnd, rt.Distance+1); err != nil { + log.Printf("RTMP: Couldn't upsert EtherTalk route: %v", err) + } + } + + default: + log.Printf("RTMP: invalid DDP type %d on socket 1", pkt.Proto) } + + return nil } -// Run executes the machine. -func (m *RTMPMachine) Run(ctx context.Context) (err error) { +// RunRTMP makes periodic RTMP Data broadcasts on this port. +func (port *EtherTalkPort) RunRTMP(ctx context.Context) (err error) { ctx, setStatus, _ := status.AddSimpleItem(ctx, "RTMP") defer func() { setStatus(fmt.Sprintf("Run loop stopped! Return: %v", err)) @@ -61,16 +141,12 @@ func (m *RTMPMachine) Run(ctx context.Context) (err error) { setStatus("Awaiting DDP address assignment") // Await local address assignment before doing anything - <-m.AARPMachine.Assigned() - myAddr, ok := m.AARPMachine.Address() - if !ok { - return fmt.Errorf("AARP machine closed Assigned channel but Address is not valid") - } + <-port.AARPMachine.Assigned() setStatus("Initial RTMP Data broadcast") // Initial broadcast - if err := m.broadcastData(myAddr); err != nil { + if err := port.broadcastRTMPData(); err != nil { log.Printf("RTMP: Couldn't broadcast Data: %v", err) } @@ -86,141 +162,15 @@ func (m *RTMPMachine) Run(ctx context.Context) (err error) { case <-bcastTicker.C: setStatus("Broadcasting RTMP Data") - if err := m.broadcastData(myAddr); err != nil { + if err := port.broadcastRTMPData(); err != nil { log.Printf("RTMP: Couldn't broadcast Data: %v", err) } - - case pkt := <-m.IncomingCh: - setStatus("Handling incoming packet") - switch pkt.Proto { - case ddp.ProtoRTMPReq: - // I can answer RTMP requests! - req, err := rtmp.UnmarshalRequestPacket(pkt.Data) - if err != nil { - log.Printf("RTMP: Couldn't unmarshal Request packet: %v", err) - } - - // should be in the cache... - theirHWAddr, err := m.AARPMachine.Resolve(ctx, ddp.Addr{Network: pkt.SrcNet, Node: pkt.SrcNode}) - if err != nil { - log.Printf("RTMP: Couldn't resolve %d.%d to a hardware address: %v", pkt.SrcNet, pkt.SrcNode, err) - continue - } - - switch req.Function { - case rtmp.FunctionRequest: - // Respond with RTMP Response - respPkt := &rtmp.ResponsePacket{ - SenderAddr: myAddr.Proto, - Extended: true, - RangeStart: m.Config.EtherTalk.NetStart, - RangeEnd: m.Config.EtherTalk.NetEnd, - } - respPktRaw, err := respPkt.Marshal() - if err != nil { - log.Printf("RTMP: Couldn't marshal RTMP Response packet: %v", err) - continue - } - ddpPkt := &ddp.ExtPacket{ - ExtHeader: ddp.ExtHeader{ - Size: uint16(len(respPktRaw)) + atalk.DDPExtHeaderSize, - Cksum: 0, - DstNet: pkt.SrcNet, - DstNode: pkt.SrcNode, - DstSocket: 1, // the RTMP socket - SrcNet: myAddr.Proto.Network, - SrcNode: myAddr.Proto.Node, - SrcSocket: 1, // the RTMP socket - Proto: ddp.ProtoRTMPResp, - }, - Data: respPktRaw, - } - - if err := m.send(myAddr.Hardware, theirHWAddr, ddpPkt); err != nil { - log.Printf("RTMP: Couldn't send Data broadcast: %v", err) - } - - case rtmp.FunctionRDRSplitHorizon, rtmp.FunctionRDRComplete: - // Like the Data broadcast, but solicited by a request (RDR). - // TODO: handle split-horizon processing - for _, dataPkt := range m.dataPackets(myAddr.Proto) { - dataPktRaw, err := dataPkt.Marshal() - if err != nil { - log.Printf("RTMP: Couldn't marshal Data packet: %v", err) - break - } - - ddpPkt := &ddp.ExtPacket{ - ExtHeader: ddp.ExtHeader{ - Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize, - Cksum: 0, - DstNet: pkt.SrcNet, - DstNode: pkt.SrcNode, - DstSocket: 1, // the RTMP socket - SrcNet: myAddr.Proto.Network, - SrcNode: myAddr.Proto.Node, - SrcSocket: 1, // the RTMP socket - Proto: ddp.ProtoRTMPResp, - }, - Data: dataPktRaw, - } - - if err := m.send(myAddr.Hardware, theirHWAddr, ddpPkt); err != nil { - log.Printf("RTMP: Couldn't send Data response: %v", err) - break - } - } - - case rtmp.FunctionLoopProbe: - log.Print("RTMP: TODO: handle Loop Probes") - - } - - case ddp.ProtoRTMPResp: - // It's a peer router on the AppleTalk network! - log.Print("RTMP: Got Response or Data") - dataPkt, err := rtmp.UnmarshalDataPacket(pkt.Data) - if err != nil { - log.Printf("RTMP: Couldn't unmarshal RTMP Data packet: %v", err) - break - } - peer := &EtherTalkPeer{ - PcapHandle: m.PcapHandle, - MyHWAddr: m.AARPMachine.myAddr.Hardware, - AARP: m.AARPMachine, - PeerAddr: dataPkt.RouterAddr, - } - - for _, rt := range dataPkt.NetworkTuples { - if err := m.RoutingTable.UpsertEthRoute(peer, rt.Extended, rt.RangeStart, rt.RangeEnd, rt.Distance+1); err != nil { - log.Printf("RTMP: Couldn't upsert EtherTalk route: %v", err) - } - } - - default: - log.Printf("RTMP: invalid DDP type %d on socket 1", pkt.Proto) - } - } } } -func (m *RTMPMachine) send(src, dst ethernet.Addr, ddpPkt *ddp.ExtPacket) error { - ethFrame, err := ethertalk.AppleTalk(src, *ddpPkt) - if err != nil { - return err - } - ethFrame.Dst = dst - - ethFrameRaw, err := ethertalk.Marshal(*ethFrame) - if err != nil { - return err - } - return m.PcapHandle.WritePacketData(ethFrameRaw) -} - -func (m *RTMPMachine) broadcastData(myAddr aarp.AddrPair) error { - for _, dataPkt := range m.dataPackets(myAddr.Proto) { +func (port *EtherTalkPort) broadcastRTMPData() error { + for _, dataPkt := range port.rtmpDataPackets(true) { dataPktRaw, err := dataPkt.Marshal() if err != nil { return fmt.Errorf("marshal Data packet: %v", err) @@ -230,29 +180,40 @@ func (m *RTMPMachine) broadcastData(myAddr aarp.AddrPair) error { ExtHeader: ddp.ExtHeader{ Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize, Cksum: 0, - DstNet: 0, // this network - DstNode: 0xff, // broadcast packet - DstSocket: 1, // the RTMP socket - SrcNet: myAddr.Proto.Network, - SrcNode: myAddr.Proto.Node, + DstNet: 0x0000, // this network + DstNode: 0xff, // broadcast packet + DstSocket: 1, // the RTMP socket + SrcNet: port.MyAddr.Network, + SrcNode: port.MyAddr.Node, SrcSocket: 1, // the RTMP socket Proto: ddp.ProtoRTMPResp, }, Data: dataPktRaw, } - if err := m.send(myAddr.Hardware, ethertalk.AppleTalkBroadcast, ddpPkt); err != nil { + if err := port.Broadcast(ddpPkt); err != nil { return err } } return nil } -func (m *RTMPMachine) dataPackets(myAddr ddp.Addr) []*rtmp.DataPacket { +func (port *EtherTalkPort) rtmpDataPackets(splitHorizon bool) []*rtmp.DataPacket { // Build up a slice of routing tuples. - routes := m.RoutingTable.ValidRoutes() + routes := port.Router.RouteTable.ValidRoutes() tuples := make([]rtmp.NetworkTuple, 0, len(routes)) for _, rt := range routes { + if rt.EtherTalkDirect == port { + // If the route is actually a direct connection to this port, + // don't include it. + // (It's manually set as the first tuple anyway.) + continue + } + if splitHorizon && rt.EtherTalkPeer.Port == port { + // If the route is through a peer accessible on this port, don't + // include it. + continue + } tuples = append(tuples, rtmp.NetworkTuple{ Extended: rt.Extended, RangeStart: rt.NetStart, @@ -266,8 +227,8 @@ func (m *RTMPMachine) dataPackets(myAddr ddp.Addr) []*rtmp.DataPacket { // TODO: support non-extended local networks (LocalTalk) first := rtmp.NetworkTuple{ Extended: true, - RangeStart: m.Config.EtherTalk.NetStart, - RangeEnd: m.Config.EtherTalk.NetEnd, + RangeStart: port.NetStart, + RangeEnd: port.NetEnd, Distance: 0, } @@ -287,7 +248,7 @@ func (m *RTMPMachine) dataPackets(myAddr ddp.Addr) []*rtmp.DataPacket { rem = rem[len(chunk)-1:] packets = append(packets, &rtmp.DataPacket{ - RouterAddr: myAddr, + RouterAddr: port.MyAddr, Extended: true, NetworkTuples: chunk, })