Send multiple RTMP data packets when routing table is big

This commit is contained in:
Josh Deprez 2024-04-19 15:36:47 +10:00
parent 52b08a83f1
commit 208b273bd1
No known key found for this signature in database
2 changed files with 96 additions and 64 deletions

View file

@ -39,6 +39,14 @@ type NetworkTuple struct {
Distance uint8
}
func (nt *NetworkTuple) Size() int {
if nt.Extended {
return 6
} else {
return 3
}
}
// Marshal marshals an RTMP Data packet.
func (dp *DataPacket) Marshal() ([]byte, error) {
b := bytes.NewBuffer(nil)

View file

@ -118,37 +118,38 @@ func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket)
case 2, 3:
// Like the Data broadcast, but solicited by a request (RDR).
// TODO: handle split-horizon processing
dataPkt := m.dataPacket(myAddr.Proto)
for _, dataPkt := range m.dataPackets(myAddr.Proto) {
dataPktRaw, err := dataPkt.Marshal()
if err != nil {
log.Printf("RTMP: Couldn't marshal Data packet: %v", err)
break
}
dataPktRaw, err := dataPkt.Marshal()
if err != nil {
log.Printf("RTMP: Couldn't marshal Data packet: %v", err)
continue
}
ddpPkt := &ddp.ExtPacket{
ExtHeader: ddp.ExtHeader{
Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize,
Cksum: 0,
DstNet: pkt.SrcNet,
DstNode: pkt.SrcNode,
DstSocket: 1, // the RTMP socket
SrcNet: myAddr.Proto.Network,
SrcNode: myAddr.Proto.Node,
SrcSocket: 1, // the RTMP socket
Proto: ddp.ProtoRTMPResp,
},
Data: dataPktRaw,
}
ddpPkt := &ddp.ExtPacket{
ExtHeader: ddp.ExtHeader{
Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize,
Cksum: 0,
DstNet: pkt.SrcNet,
DstNode: pkt.SrcNode,
DstSocket: 1, // the RTMP socket
SrcNet: myAddr.Proto.Network,
SrcNode: myAddr.Proto.Node,
SrcSocket: 1, // the RTMP socket
Proto: ddp.ProtoRTMPResp,
},
Data: dataPktRaw,
}
if err := m.send(myAddr.Hardware, theirHWAddr, ddpPkt); err != nil {
log.Printf("RTMP: Couldn't send Data response: %v", err)
if err := m.send(myAddr.Hardware, theirHWAddr, ddpPkt); err != nil {
log.Printf("RTMP: Couldn't send Data response: %v", err)
break
}
}
}
case ddp.ProtoRTMPResp:
// It's a peer router on the AppleTalk network!
// TODO: integrate this information with the routing table
// TODO: integrate this information with the routing table?
log.Print("RTMP: Got Response or Data")
default:
@ -174,54 +175,77 @@ func (m *RTMPMachine) send(src, dst ethernet.Addr, ddpPkt *ddp.ExtPacket) error
}
func (m *RTMPMachine) broadcastData(myAddr aarp.AddrPair) error {
dataPkt := m.dataPacket(myAddr.Proto)
for _, dataPkt := range m.dataPackets(myAddr.Proto) {
dataPktRaw, err := dataPkt.Marshal()
if err != nil {
return fmt.Errorf("marshal Data packet: %v", err)
}
dataPktRaw, err := dataPkt.Marshal()
if err != nil {
return fmt.Errorf("marshal Data packet: %v", err)
ddpPkt := &ddp.ExtPacket{
ExtHeader: ddp.ExtHeader{
Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize,
Cksum: 0,
DstNet: 0, // this network
DstNode: 0xff, // broadcast packet
DstSocket: 1, // the RTMP socket
SrcNet: myAddr.Proto.Network,
SrcNode: myAddr.Proto.Node,
SrcSocket: 1, // the RTMP socket
Proto: ddp.ProtoRTMPResp,
},
Data: dataPktRaw,
}
if err := m.send(myAddr.Hardware, ethertalk.AppleTalkBroadcast, ddpPkt); err != nil {
return err
}
}
ddpPkt := &ddp.ExtPacket{
ExtHeader: ddp.ExtHeader{
Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize,
Cksum: 0,
DstNet: 0, // this network
DstNode: 0xff, // broadcast packet
DstSocket: 1, // the RTMP socket
SrcNet: myAddr.Proto.Network,
SrcNode: myAddr.Proto.Node,
SrcSocket: 1, // the RTMP socket
Proto: ddp.ProtoRTMPResp,
},
Data: dataPktRaw,
}
return m.send(myAddr.Hardware, ethertalk.AppleTalkBroadcast, ddpPkt)
return nil
}
func (m *RTMPMachine) dataPacket(myAddr ddp.Addr) *rtmp.DataPacket {
p := &rtmp.DataPacket{
RouterAddr: myAddr,
Extended: true,
NetworkTuples: []rtmp.NetworkTuple{
// "The first tuple in RTMP Data packets sent on extended
// networks ... indicates the network number range assigned
// to that network."
{
Extended: true,
RangeStart: m.Config.EtherTalk.NetStart,
RangeEnd: m.Config.EtherTalk.NetEnd,
Distance: 0,
},
},
}
for _, rt := range m.RoutingTable.ValidRoutes() {
p.NetworkTuples = append(p.NetworkTuples, rtmp.NetworkTuple{
func (m *RTMPMachine) dataPackets(myAddr ddp.Addr) []*rtmp.DataPacket {
// Build up a slice of routing tuples.
routes := m.RoutingTable.ValidRoutes()
tuples := make([]rtmp.NetworkTuple, 0, len(routes))
for _, rt := range routes {
tuples = append(tuples, rtmp.NetworkTuple{
Extended: rt.Extended,
RangeStart: rt.NetStart,
RangeEnd: rt.NetEnd,
Distance: rt.Distance + 1,
})
}
return p
// "The first tuple in RTMP Data packets sent on extended
// networks ... indicates the network number range assigned
// to that network."
// TODO: support non-extended local networks (LocalTalk)
first := rtmp.NetworkTuple{
Extended: true,
RangeStart: m.Config.EtherTalk.NetStart,
RangeEnd: m.Config.EtherTalk.NetEnd,
Distance: 0,
}
var packets []*rtmp.DataPacket
rem := tuples
for len(rem) > 0 {
chunk := []rtmp.NetworkTuple{first}
size := 10 // router network + 1 + router node ID + first tuple
for _, nt := range rem {
size += nt.Size()
if size > atalk.DDPMaxDataSize {
break
}
chunk = append(chunk, nt)
}
rem = rem[len(chunk)-1:]
packets = append(packets, &rtmp.DataPacket{
RouterAddr: myAddr,
Extended: true,
NetworkTuples: chunk,
})
}
return packets
}