From 208b273bd15ffb15f67570adcee3f8243feaf7d4 Mon Sep 17 00:00:00 2001 From: Josh Deprez Date: Fri, 19 Apr 2024 15:36:47 +1000 Subject: [PATCH] Send multiple RTMP data packets when routing table is big --- atalk/rtmp/data.go | 8 +++ router/rtmp.go | 152 ++++++++++++++++++++++++++------------------- 2 files changed, 96 insertions(+), 64 deletions(-) diff --git a/atalk/rtmp/data.go b/atalk/rtmp/data.go index af5b390..aa14c38 100644 --- a/atalk/rtmp/data.go +++ b/atalk/rtmp/data.go @@ -39,6 +39,14 @@ type NetworkTuple struct { Distance uint8 } +func (nt *NetworkTuple) Size() int { + if nt.Extended { + return 6 + } else { + return 3 + } +} + // Marshal marshals an RTMP Data packet. func (dp *DataPacket) Marshal() ([]byte, error) { b := bytes.NewBuffer(nil) diff --git a/router/rtmp.go b/router/rtmp.go index 38463e8..b63976f 100644 --- a/router/rtmp.go +++ b/router/rtmp.go @@ -118,37 +118,38 @@ func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket) case 2, 3: // Like the Data broadcast, but solicited by a request (RDR). // TODO: handle split-horizon processing - dataPkt := m.dataPacket(myAddr.Proto) + for _, dataPkt := range m.dataPackets(myAddr.Proto) { + dataPktRaw, err := dataPkt.Marshal() + if err != nil { + log.Printf("RTMP: Couldn't marshal Data packet: %v", err) + break + } - dataPktRaw, err := dataPkt.Marshal() - if err != nil { - log.Printf("RTMP: Couldn't marshal Data packet: %v", err) - continue - } + ddpPkt := &ddp.ExtPacket{ + ExtHeader: ddp.ExtHeader{ + Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize, + Cksum: 0, + DstNet: pkt.SrcNet, + DstNode: pkt.SrcNode, + DstSocket: 1, // the RTMP socket + SrcNet: myAddr.Proto.Network, + SrcNode: myAddr.Proto.Node, + SrcSocket: 1, // the RTMP socket + Proto: ddp.ProtoRTMPResp, + }, + Data: dataPktRaw, + } - ddpPkt := &ddp.ExtPacket{ - ExtHeader: ddp.ExtHeader{ - Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize, - Cksum: 0, - DstNet: pkt.SrcNet, - DstNode: pkt.SrcNode, - DstSocket: 1, // the RTMP socket - SrcNet: myAddr.Proto.Network, - SrcNode: myAddr.Proto.Node, - SrcSocket: 1, // the RTMP socket - Proto: ddp.ProtoRTMPResp, - }, - Data: dataPktRaw, - } - - if err := m.send(myAddr.Hardware, theirHWAddr, ddpPkt); err != nil { - log.Printf("RTMP: Couldn't send Data response: %v", err) + if err := m.send(myAddr.Hardware, theirHWAddr, ddpPkt); err != nil { + log.Printf("RTMP: Couldn't send Data response: %v", err) + break + } } } case ddp.ProtoRTMPResp: // It's a peer router on the AppleTalk network! - // TODO: integrate this information with the routing table + // TODO: integrate this information with the routing table? log.Print("RTMP: Got Response or Data") default: @@ -174,54 +175,77 @@ func (m *RTMPMachine) send(src, dst ethernet.Addr, ddpPkt *ddp.ExtPacket) error } func (m *RTMPMachine) broadcastData(myAddr aarp.AddrPair) error { - dataPkt := m.dataPacket(myAddr.Proto) + for _, dataPkt := range m.dataPackets(myAddr.Proto) { + dataPktRaw, err := dataPkt.Marshal() + if err != nil { + return fmt.Errorf("marshal Data packet: %v", err) + } - dataPktRaw, err := dataPkt.Marshal() - if err != nil { - return fmt.Errorf("marshal Data packet: %v", err) + ddpPkt := &ddp.ExtPacket{ + ExtHeader: ddp.ExtHeader{ + Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize, + Cksum: 0, + DstNet: 0, // this network + DstNode: 0xff, // broadcast packet + DstSocket: 1, // the RTMP socket + SrcNet: myAddr.Proto.Network, + SrcNode: myAddr.Proto.Node, + SrcSocket: 1, // the RTMP socket + Proto: ddp.ProtoRTMPResp, + }, + Data: dataPktRaw, + } + + if err := m.send(myAddr.Hardware, ethertalk.AppleTalkBroadcast, ddpPkt); err != nil { + return err + } } - - ddpPkt := &ddp.ExtPacket{ - ExtHeader: ddp.ExtHeader{ - Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize, - Cksum: 0, - DstNet: 0, // this network - DstNode: 0xff, // broadcast packet - DstSocket: 1, // the RTMP socket - SrcNet: myAddr.Proto.Network, - SrcNode: myAddr.Proto.Node, - SrcSocket: 1, // the RTMP socket - Proto: ddp.ProtoRTMPResp, - }, - Data: dataPktRaw, - } - - return m.send(myAddr.Hardware, ethertalk.AppleTalkBroadcast, ddpPkt) + return nil } -func (m *RTMPMachine) dataPacket(myAddr ddp.Addr) *rtmp.DataPacket { - p := &rtmp.DataPacket{ - RouterAddr: myAddr, - Extended: true, - NetworkTuples: []rtmp.NetworkTuple{ - // "The first tuple in RTMP Data packets sent on extended - // networks ... indicates the network number range assigned - // to that network." - { - Extended: true, - RangeStart: m.Config.EtherTalk.NetStart, - RangeEnd: m.Config.EtherTalk.NetEnd, - Distance: 0, - }, - }, - } - for _, rt := range m.RoutingTable.ValidRoutes() { - p.NetworkTuples = append(p.NetworkTuples, rtmp.NetworkTuple{ +func (m *RTMPMachine) dataPackets(myAddr ddp.Addr) []*rtmp.DataPacket { + // Build up a slice of routing tuples. + routes := m.RoutingTable.ValidRoutes() + tuples := make([]rtmp.NetworkTuple, 0, len(routes)) + for _, rt := range routes { + tuples = append(tuples, rtmp.NetworkTuple{ Extended: rt.Extended, RangeStart: rt.NetStart, RangeEnd: rt.NetEnd, Distance: rt.Distance + 1, }) } - return p + // "The first tuple in RTMP Data packets sent on extended + // networks ... indicates the network number range assigned + // to that network." + // TODO: support non-extended local networks (LocalTalk) + first := rtmp.NetworkTuple{ + Extended: true, + RangeStart: m.Config.EtherTalk.NetStart, + RangeEnd: m.Config.EtherTalk.NetEnd, + Distance: 0, + } + + var packets []*rtmp.DataPacket + rem := tuples + for len(rem) > 0 { + chunk := []rtmp.NetworkTuple{first} + + size := 10 // router network + 1 + router node ID + first tuple + for _, nt := range rem { + size += nt.Size() + if size > atalk.DDPMaxDataSize { + break + } + chunk = append(chunk, nt) + } + rem = rem[len(chunk)-1:] + + packets = append(packets, &rtmp.DataPacket{ + RouterAddr: myAddr, + Extended: true, + NetworkTuples: chunk, + }) + } + return packets }