refactor RTMP into port

This commit is contained in:
Josh Deprez 2024-05-04 14:48:47 +10:00
parent fbd0a0371f
commit a239b7734f
Signed by: josh
SSH key fingerprint: SHA256:zZji7w1Ilh2RuUpbQcqkLPrqmRwpiCSycbF2EfKm6Kw
4 changed files with 151 additions and 189 deletions

27
main.go
View file

@ -61,7 +61,17 @@ const routingTableTemplate = `
<td>{{if $route.Extended}}{{else}}{{end}}</td>
<td>{{$route.Distance}}</td>
<td>{{$route.LastSeenAgo}}</td>
<td>{{if $route.AURPPeer}}{{$route.AURPPeer.RemoteAddr}}{{else if $route.EtherTalkPeer}}{{$route.EtherTalkPeer.PeerAddr.Network}}.{{$route.EtherTalkPeer.PeerAddr.Node}}{{else}}-{{end}}</td>
<td>
{{- with $route.AURPPeer -}}
{{.RemoteAddr}}
{{- end -}}
{{- with $route.EtherTalkPeer -}}
{{.Port.Device}} {{.PeerAddr.Network}}.{{.PeerAddr.Node}}
{{- end -}}
{{- with $route.EtherTalkDirect -}}
{{.Device}} {{.NetStart}}-{{.NetEnd}}
{{- end -}}
</td>
</tr>
{{end}}
</tbody>
@ -330,16 +340,6 @@ func main() {
aarpMachine := router.NewAARPMachine(cfg, pcapHandle, myHWAddr)
go aarpMachine.Run(ctx)
// --------------------------------- RTMP ---------------------------------
rtmpMachine := &router.RTMPMachine{
AARPMachine: aarpMachine,
Config: cfg,
PcapHandle: pcapHandle,
RoutingTable: routes,
IncomingCh: make(chan *ddp.ExtPacket, 1024),
}
go rtmpMachine.Run(ctx)
// -------------------------------- Router --------------------------------
rooter := &router.Router{
Config: cfg,
@ -348,6 +348,7 @@ func main() {
}
etherTalkPort := &router.EtherTalkPort{
Device: cfg.EtherTalk.Device,
EthernetAddr: myHWAddr,
NetStart: cfg.EtherTalk.NetStart,
NetEnd: cfg.EtherTalk.NetEnd,
@ -355,7 +356,6 @@ func main() {
AvailableZones: []string{cfg.EtherTalk.ZoneName},
PcapHandle: pcapHandle,
AARPMachine: aarpMachine,
RTMPMachine: rtmpMachine,
Router: rooter,
}
rooter.Ports = append(rooter.Ports, etherTalkPort)
@ -364,6 +364,9 @@ func main() {
zones.Upsert(etherTalkPort.NetStart, az, etherTalkPort)
}
// --------------------------------- RTMP ---------------------------------
go etherTalkPort.RunRTMP(ctx)
// ---------------------- Raw AppleTalk/AARP inbound ----------------------
wg.Add(1)
go func() {

View file

@ -19,29 +19,25 @@ package router
import (
"context"
"github.com/google/gopacket/pcap"
"github.com/sfiera/multitalk/pkg/ddp"
"github.com/sfiera/multitalk/pkg/ethernet"
"github.com/sfiera/multitalk/pkg/ethertalk"
)
// EtherTalkPeer holds data needed to exchange routes and zones with another
// router on the EtherTalk network.
// EtherTalkPeer holds data needed to forward packets to another router on the
// EtherTalk network.
type EtherTalkPeer struct {
PcapHandle *pcap.Handle
MyHWAddr ethernet.Addr
AARP *AARPMachine
Port *EtherTalkPort
PeerAddr ddp.Addr
}
// Forward forwards a DDP packet to the next router.
func (p *EtherTalkPeer) Forward(ctx context.Context, pkt *ddp.ExtPacket) error {
// TODO: AARP resolution can block
de, err := p.AARP.Resolve(ctx, p.PeerAddr)
de, err := p.Port.AARPMachine.Resolve(ctx, p.PeerAddr)
if err != nil {
return err
}
outFrame, err := ethertalk.AppleTalk(p.MyHWAddr, *pkt)
outFrame, err := ethertalk.AppleTalk(p.Port.EthernetAddr, *pkt)
if err != nil {
return err
}
@ -50,5 +46,5 @@ func (p *EtherTalkPeer) Forward(ctx context.Context, pkt *ddp.ExtPacket) error {
if err != nil {
return err
}
return p.PcapHandle.WritePacketData(outFrameRaw)
return p.Port.PcapHandle.WritePacketData(outFrameRaw)
}

View file

@ -31,6 +31,7 @@ import (
// EtherTalkPort is all the data and helpers needed for EtherTalk on one port.
type EtherTalkPort struct {
Device string
EthernetAddr ethernet.Addr
NetStart ddp.Network
NetEnd ddp.Network
@ -39,7 +40,6 @@ type EtherTalkPort struct {
AvailableZones []string
PcapHandle *pcap.Handle
AARPMachine *AARPMachine
RTMPMachine *RTMPMachine
Router *Router
}
@ -129,7 +129,9 @@ func (port *EtherTalkPort) Serve(ctx context.Context) {
switch ddpkt.DstSocket {
case 1: // The RTMP socket
port.RTMPMachine.Handle(ctx, ddpkt)
if err := port.HandleRTMP(ctx, ddpkt); err != nil {
log.Printf("RTMP: Couldn't handle: %v", err)
}
case 2: // The NIS (name information socket / NBP socket)
if err := port.HandleNBP(ctx, ddpkt); err != nil {

View file

@ -26,33 +26,113 @@ import (
"gitea.drjosh.dev/josh/jrouter/atalk/rtmp"
"gitea.drjosh.dev/josh/jrouter/status"
"github.com/google/gopacket/pcap"
"github.com/sfiera/multitalk/pkg/aarp"
"github.com/sfiera/multitalk/pkg/ddp"
"github.com/sfiera/multitalk/pkg/ethernet"
"github.com/sfiera/multitalk/pkg/ethertalk"
)
// RTMPMachine implements RTMP on an AppleTalk network attached to the router.
type RTMPMachine struct {
AARPMachine *AARPMachine
Config *Config
PcapHandle *pcap.Handle
RoutingTable *RouteTable
IncomingCh chan *ddp.ExtPacket
}
func (m *RTMPMachine) Handle(ctx context.Context, pkt *ddp.ExtPacket) {
select {
case <-ctx.Done():
case m.IncomingCh <- pkt:
func (port *EtherTalkPort) HandleRTMP(ctx context.Context, pkt *ddp.ExtPacket) error {
switch pkt.Proto {
case ddp.ProtoRTMPReq:
// I can answer RTMP requests!
req, err := rtmp.UnmarshalRequestPacket(pkt.Data)
if err != nil {
return fmt.Errorf("unmarshal Request packet: %w", err)
}
switch req.Function {
case rtmp.FunctionRequest:
// Respond with RTMP Response
respPkt := &rtmp.ResponsePacket{
SenderAddr: port.MyAddr,
Extended: true,
RangeStart: port.NetStart,
RangeEnd: port.NetEnd,
}
respPktRaw, err := respPkt.Marshal()
if err != nil {
return fmt.Errorf("marshal RTMP Response packet: %w", err)
}
ddpPkt := &ddp.ExtPacket{
ExtHeader: ddp.ExtHeader{
Size: uint16(len(respPktRaw)) + atalk.DDPExtHeaderSize,
Cksum: 0,
DstNet: pkt.SrcNet,
DstNode: pkt.SrcNode,
DstSocket: 1, // the RTMP socket
SrcNet: port.MyAddr.Network,
SrcNode: port.MyAddr.Node,
SrcSocket: 1, // the RTMP socket
Proto: ddp.ProtoRTMPResp,
},
Data: respPktRaw,
}
if err := port.Router.Forward(ctx, ddpPkt); err != nil {
return fmt.Errorf("send Response: %w", err)
}
case rtmp.FunctionRDRSplitHorizon, rtmp.FunctionRDRComplete:
// Like the Data broadcast, but solicited by a request (RDR).
splitHorizon := req.Function == rtmp.FunctionRDRSplitHorizon
for _, dataPkt := range port.rtmpDataPackets(splitHorizon) {
dataPktRaw, err := dataPkt.Marshal()
if err != nil {
return fmt.Errorf("marshal RTMP Data packet: %w", err)
}
ddpPkt := &ddp.ExtPacket{
ExtHeader: ddp.ExtHeader{
Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize,
Cksum: 0,
DstNet: pkt.SrcNet,
DstNode: pkt.SrcNode,
DstSocket: 1, // the RTMP socket
SrcNet: port.MyAddr.Network,
SrcNode: port.MyAddr.Node,
SrcSocket: 1, // the RTMP socket
Proto: ddp.ProtoRTMPResp,
},
Data: dataPktRaw,
}
if err := port.Router.Forward(ctx, ddpPkt); err != nil {
return fmt.Errorf("send Data: %w", err)
}
}
case rtmp.FunctionLoopProbe:
log.Print("RTMP: TODO: handle Loop Probes")
return nil
}
case ddp.ProtoRTMPResp:
// It's a peer router on the AppleTalk network!
log.Print("RTMP: Got Response or Data")
dataPkt, err := rtmp.UnmarshalDataPacket(pkt.Data)
if err != nil {
log.Printf("RTMP: Couldn't unmarshal RTMP Data packet: %v", err)
break
}
peer := &EtherTalkPeer{
Port: port,
PeerAddr: dataPkt.RouterAddr,
}
for _, rt := range dataPkt.NetworkTuples {
if err := port.Router.RouteTable.UpsertEthRoute(peer, rt.Extended, rt.RangeStart, rt.RangeEnd, rt.Distance+1); err != nil {
log.Printf("RTMP: Couldn't upsert EtherTalk route: %v", err)
}
}
default:
log.Printf("RTMP: invalid DDP type %d on socket 1", pkt.Proto)
}
return nil
}
// Run executes the machine.
func (m *RTMPMachine) Run(ctx context.Context) (err error) {
// RunRTMP makes periodic RTMP Data broadcasts on this port.
func (port *EtherTalkPort) RunRTMP(ctx context.Context) (err error) {
ctx, setStatus, _ := status.AddSimpleItem(ctx, "RTMP")
defer func() {
setStatus(fmt.Sprintf("Run loop stopped! Return: %v", err))
@ -61,16 +141,12 @@ func (m *RTMPMachine) Run(ctx context.Context) (err error) {
setStatus("Awaiting DDP address assignment")
// Await local address assignment before doing anything
<-m.AARPMachine.Assigned()
myAddr, ok := m.AARPMachine.Address()
if !ok {
return fmt.Errorf("AARP machine closed Assigned channel but Address is not valid")
}
<-port.AARPMachine.Assigned()
setStatus("Initial RTMP Data broadcast")
// Initial broadcast
if err := m.broadcastData(myAddr); err != nil {
if err := port.broadcastRTMPData(); err != nil {
log.Printf("RTMP: Couldn't broadcast Data: %v", err)
}
@ -86,141 +162,15 @@ func (m *RTMPMachine) Run(ctx context.Context) (err error) {
case <-bcastTicker.C:
setStatus("Broadcasting RTMP Data")
if err := m.broadcastData(myAddr); err != nil {
if err := port.broadcastRTMPData(); err != nil {
log.Printf("RTMP: Couldn't broadcast Data: %v", err)
}
case pkt := <-m.IncomingCh:
setStatus("Handling incoming packet")
switch pkt.Proto {
case ddp.ProtoRTMPReq:
// I can answer RTMP requests!
req, err := rtmp.UnmarshalRequestPacket(pkt.Data)
if err != nil {
log.Printf("RTMP: Couldn't unmarshal Request packet: %v", err)
}
// should be in the cache...
theirHWAddr, err := m.AARPMachine.Resolve(ctx, ddp.Addr{Network: pkt.SrcNet, Node: pkt.SrcNode})
if err != nil {
log.Printf("RTMP: Couldn't resolve %d.%d to a hardware address: %v", pkt.SrcNet, pkt.SrcNode, err)
continue
}
switch req.Function {
case rtmp.FunctionRequest:
// Respond with RTMP Response
respPkt := &rtmp.ResponsePacket{
SenderAddr: myAddr.Proto,
Extended: true,
RangeStart: m.Config.EtherTalk.NetStart,
RangeEnd: m.Config.EtherTalk.NetEnd,
}
respPktRaw, err := respPkt.Marshal()
if err != nil {
log.Printf("RTMP: Couldn't marshal RTMP Response packet: %v", err)
continue
}
ddpPkt := &ddp.ExtPacket{
ExtHeader: ddp.ExtHeader{
Size: uint16(len(respPktRaw)) + atalk.DDPExtHeaderSize,
Cksum: 0,
DstNet: pkt.SrcNet,
DstNode: pkt.SrcNode,
DstSocket: 1, // the RTMP socket
SrcNet: myAddr.Proto.Network,
SrcNode: myAddr.Proto.Node,
SrcSocket: 1, // the RTMP socket
Proto: ddp.ProtoRTMPResp,
},
Data: respPktRaw,
}
if err := m.send(myAddr.Hardware, theirHWAddr, ddpPkt); err != nil {
log.Printf("RTMP: Couldn't send Data broadcast: %v", err)
}
case rtmp.FunctionRDRSplitHorizon, rtmp.FunctionRDRComplete:
// Like the Data broadcast, but solicited by a request (RDR).
// TODO: handle split-horizon processing
for _, dataPkt := range m.dataPackets(myAddr.Proto) {
dataPktRaw, err := dataPkt.Marshal()
if err != nil {
log.Printf("RTMP: Couldn't marshal Data packet: %v", err)
break
}
ddpPkt := &ddp.ExtPacket{
ExtHeader: ddp.ExtHeader{
Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize,
Cksum: 0,
DstNet: pkt.SrcNet,
DstNode: pkt.SrcNode,
DstSocket: 1, // the RTMP socket
SrcNet: myAddr.Proto.Network,
SrcNode: myAddr.Proto.Node,
SrcSocket: 1, // the RTMP socket
Proto: ddp.ProtoRTMPResp,
},
Data: dataPktRaw,
}
if err := m.send(myAddr.Hardware, theirHWAddr, ddpPkt); err != nil {
log.Printf("RTMP: Couldn't send Data response: %v", err)
break
}
}
case rtmp.FunctionLoopProbe:
log.Print("RTMP: TODO: handle Loop Probes")
}
case ddp.ProtoRTMPResp:
// It's a peer router on the AppleTalk network!
log.Print("RTMP: Got Response or Data")
dataPkt, err := rtmp.UnmarshalDataPacket(pkt.Data)
if err != nil {
log.Printf("RTMP: Couldn't unmarshal RTMP Data packet: %v", err)
break
}
peer := &EtherTalkPeer{
PcapHandle: m.PcapHandle,
MyHWAddr: m.AARPMachine.myAddr.Hardware,
AARP: m.AARPMachine,
PeerAddr: dataPkt.RouterAddr,
}
for _, rt := range dataPkt.NetworkTuples {
if err := m.RoutingTable.UpsertEthRoute(peer, rt.Extended, rt.RangeStart, rt.RangeEnd, rt.Distance+1); err != nil {
log.Printf("RTMP: Couldn't upsert EtherTalk route: %v", err)
}
}
default:
log.Printf("RTMP: invalid DDP type %d on socket 1", pkt.Proto)
}
}
}
}
func (m *RTMPMachine) send(src, dst ethernet.Addr, ddpPkt *ddp.ExtPacket) error {
ethFrame, err := ethertalk.AppleTalk(src, *ddpPkt)
if err != nil {
return err
}
ethFrame.Dst = dst
ethFrameRaw, err := ethertalk.Marshal(*ethFrame)
if err != nil {
return err
}
return m.PcapHandle.WritePacketData(ethFrameRaw)
}
func (m *RTMPMachine) broadcastData(myAddr aarp.AddrPair) error {
for _, dataPkt := range m.dataPackets(myAddr.Proto) {
func (port *EtherTalkPort) broadcastRTMPData() error {
for _, dataPkt := range port.rtmpDataPackets(true) {
dataPktRaw, err := dataPkt.Marshal()
if err != nil {
return fmt.Errorf("marshal Data packet: %v", err)
@ -230,29 +180,40 @@ func (m *RTMPMachine) broadcastData(myAddr aarp.AddrPair) error {
ExtHeader: ddp.ExtHeader{
Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize,
Cksum: 0,
DstNet: 0, // this network
DstNet: 0x0000, // this network
DstNode: 0xff, // broadcast packet
DstSocket: 1, // the RTMP socket
SrcNet: myAddr.Proto.Network,
SrcNode: myAddr.Proto.Node,
SrcNet: port.MyAddr.Network,
SrcNode: port.MyAddr.Node,
SrcSocket: 1, // the RTMP socket
Proto: ddp.ProtoRTMPResp,
},
Data: dataPktRaw,
}
if err := m.send(myAddr.Hardware, ethertalk.AppleTalkBroadcast, ddpPkt); err != nil {
if err := port.Broadcast(ddpPkt); err != nil {
return err
}
}
return nil
}
func (m *RTMPMachine) dataPackets(myAddr ddp.Addr) []*rtmp.DataPacket {
func (port *EtherTalkPort) rtmpDataPackets(splitHorizon bool) []*rtmp.DataPacket {
// Build up a slice of routing tuples.
routes := m.RoutingTable.ValidRoutes()
routes := port.Router.RouteTable.ValidRoutes()
tuples := make([]rtmp.NetworkTuple, 0, len(routes))
for _, rt := range routes {
if rt.EtherTalkDirect == port {
// If the route is actually a direct connection to this port,
// don't include it.
// (It's manually set as the first tuple anyway.)
continue
}
if splitHorizon && rt.EtherTalkPeer.Port == port {
// If the route is through a peer accessible on this port, don't
// include it.
continue
}
tuples = append(tuples, rtmp.NetworkTuple{
Extended: rt.Extended,
RangeStart: rt.NetStart,
@ -266,8 +227,8 @@ func (m *RTMPMachine) dataPackets(myAddr ddp.Addr) []*rtmp.DataPacket {
// TODO: support non-extended local networks (LocalTalk)
first := rtmp.NetworkTuple{
Extended: true,
RangeStart: m.Config.EtherTalk.NetStart,
RangeEnd: m.Config.EtherTalk.NetEnd,
RangeStart: port.NetStart,
RangeEnd: port.NetEnd,
Distance: 0,
}
@ -287,7 +248,7 @@ func (m *RTMPMachine) dataPackets(myAddr ddp.Addr) []*rtmp.DataPacket {
rem = rem[len(chunk)-1:]
packets = append(packets, &rtmp.DataPacket{
RouterAddr: myAddr,
RouterAddr: port.MyAddr,
Extended: true,
NetworkTuples: chunk,
})