diff --git a/main.go b/main.go
index 2fb4cc3..9579c8b 100644
--- a/main.go
+++ b/main.go
@@ -61,7 +61,17 @@ const routingTableTemplate = `
{{if $route.Extended}}✅{{else}}❌{{end}} |
{{$route.Distance}} |
{{$route.LastSeenAgo}} |
- {{if $route.AURPPeer}}{{$route.AURPPeer.RemoteAddr}}{{else if $route.EtherTalkPeer}}{{$route.EtherTalkPeer.PeerAddr.Network}}.{{$route.EtherTalkPeer.PeerAddr.Node}}{{else}}-{{end}} |
+
+ {{- with $route.AURPPeer -}}
+ {{.RemoteAddr}}
+ {{- end -}}
+ {{- with $route.EtherTalkPeer -}}
+ {{.Port.Device}} {{.PeerAddr.Network}}.{{.PeerAddr.Node}}
+ {{- end -}}
+ {{- with $route.EtherTalkDirect -}}
+ {{.Device}} {{.NetStart}}-{{.NetEnd}}
+ {{- end -}}
+ |
{{end}}
@@ -330,16 +340,6 @@ func main() {
aarpMachine := router.NewAARPMachine(cfg, pcapHandle, myHWAddr)
go aarpMachine.Run(ctx)
- // --------------------------------- RTMP ---------------------------------
- rtmpMachine := &router.RTMPMachine{
- AARPMachine: aarpMachine,
- Config: cfg,
- PcapHandle: pcapHandle,
- RoutingTable: routes,
- IncomingCh: make(chan *ddp.ExtPacket, 1024),
- }
- go rtmpMachine.Run(ctx)
-
// -------------------------------- Router --------------------------------
rooter := &router.Router{
Config: cfg,
@@ -348,6 +348,7 @@ func main() {
}
etherTalkPort := &router.EtherTalkPort{
+ Device: cfg.EtherTalk.Device,
EthernetAddr: myHWAddr,
NetStart: cfg.EtherTalk.NetStart,
NetEnd: cfg.EtherTalk.NetEnd,
@@ -355,7 +356,6 @@ func main() {
AvailableZones: []string{cfg.EtherTalk.ZoneName},
PcapHandle: pcapHandle,
AARPMachine: aarpMachine,
- RTMPMachine: rtmpMachine,
Router: rooter,
}
rooter.Ports = append(rooter.Ports, etherTalkPort)
@@ -364,6 +364,9 @@ func main() {
zones.Upsert(etherTalkPort.NetStart, az, etherTalkPort)
}
+ // --------------------------------- RTMP ---------------------------------
+ go etherTalkPort.RunRTMP(ctx)
+
// ---------------------- Raw AppleTalk/AARP inbound ----------------------
wg.Add(1)
go func() {
diff --git a/router/peer_eth.go b/router/peer_eth.go
index 046687e..4b181a2 100644
--- a/router/peer_eth.go
+++ b/router/peer_eth.go
@@ -19,29 +19,25 @@ package router
import (
"context"
- "github.com/google/gopacket/pcap"
"github.com/sfiera/multitalk/pkg/ddp"
- "github.com/sfiera/multitalk/pkg/ethernet"
"github.com/sfiera/multitalk/pkg/ethertalk"
)
-// EtherTalkPeer holds data needed to exchange routes and zones with another
-// router on the EtherTalk network.
+// EtherTalkPeer holds data needed to forward packets to another router on the
+// EtherTalk network.
type EtherTalkPeer struct {
- PcapHandle *pcap.Handle
- MyHWAddr ethernet.Addr
- AARP *AARPMachine
- PeerAddr ddp.Addr
+ Port *EtherTalkPort
+ PeerAddr ddp.Addr
}
// Forward forwards a DDP packet to the next router.
func (p *EtherTalkPeer) Forward(ctx context.Context, pkt *ddp.ExtPacket) error {
// TODO: AARP resolution can block
- de, err := p.AARP.Resolve(ctx, p.PeerAddr)
+ de, err := p.Port.AARPMachine.Resolve(ctx, p.PeerAddr)
if err != nil {
return err
}
- outFrame, err := ethertalk.AppleTalk(p.MyHWAddr, *pkt)
+ outFrame, err := ethertalk.AppleTalk(p.Port.EthernetAddr, *pkt)
if err != nil {
return err
}
@@ -50,5 +46,5 @@ func (p *EtherTalkPeer) Forward(ctx context.Context, pkt *ddp.ExtPacket) error {
if err != nil {
return err
}
- return p.PcapHandle.WritePacketData(outFrameRaw)
+ return p.Port.PcapHandle.WritePacketData(outFrameRaw)
}
diff --git a/router/port.go b/router/port.go
index 39ac386..eb8aad6 100644
--- a/router/port.go
+++ b/router/port.go
@@ -31,6 +31,7 @@ import (
// EtherTalkPort is all the data and helpers needed for EtherTalk on one port.
type EtherTalkPort struct {
+ Device string
EthernetAddr ethernet.Addr
NetStart ddp.Network
NetEnd ddp.Network
@@ -39,7 +40,6 @@ type EtherTalkPort struct {
AvailableZones []string
PcapHandle *pcap.Handle
AARPMachine *AARPMachine
- RTMPMachine *RTMPMachine
Router *Router
}
@@ -129,7 +129,9 @@ func (port *EtherTalkPort) Serve(ctx context.Context) {
switch ddpkt.DstSocket {
case 1: // The RTMP socket
- port.RTMPMachine.Handle(ctx, ddpkt)
+ if err := port.HandleRTMP(ctx, ddpkt); err != nil {
+ log.Printf("RTMP: Couldn't handle: %v", err)
+ }
case 2: // The NIS (name information socket / NBP socket)
if err := port.HandleNBP(ctx, ddpkt); err != nil {
diff --git a/router/rtmp.go b/router/rtmp.go
index 04b96ce..8a55b03 100644
--- a/router/rtmp.go
+++ b/router/rtmp.go
@@ -26,33 +26,113 @@ import (
"gitea.drjosh.dev/josh/jrouter/atalk/rtmp"
"gitea.drjosh.dev/josh/jrouter/status"
- "github.com/google/gopacket/pcap"
-
- "github.com/sfiera/multitalk/pkg/aarp"
"github.com/sfiera/multitalk/pkg/ddp"
- "github.com/sfiera/multitalk/pkg/ethernet"
- "github.com/sfiera/multitalk/pkg/ethertalk"
)
// RTMPMachine implements RTMP on an AppleTalk network attached to the router.
-type RTMPMachine struct {
- AARPMachine *AARPMachine
- Config *Config
- PcapHandle *pcap.Handle
- RoutingTable *RouteTable
+func (port *EtherTalkPort) HandleRTMP(ctx context.Context, pkt *ddp.ExtPacket) error {
+ switch pkt.Proto {
+ case ddp.ProtoRTMPReq:
+ // I can answer RTMP requests!
+ req, err := rtmp.UnmarshalRequestPacket(pkt.Data)
+ if err != nil {
+ return fmt.Errorf("unmarshal Request packet: %w", err)
+ }
- IncomingCh chan *ddp.ExtPacket
-}
+ switch req.Function {
+ case rtmp.FunctionRequest:
+ // Respond with RTMP Response
+ respPkt := &rtmp.ResponsePacket{
+ SenderAddr: port.MyAddr,
+ Extended: true,
+ RangeStart: port.NetStart,
+ RangeEnd: port.NetEnd,
+ }
+ respPktRaw, err := respPkt.Marshal()
+ if err != nil {
+ return fmt.Errorf("marshal RTMP Response packet: %w", err)
+ }
+ ddpPkt := &ddp.ExtPacket{
+ ExtHeader: ddp.ExtHeader{
+ Size: uint16(len(respPktRaw)) + atalk.DDPExtHeaderSize,
+ Cksum: 0,
+ DstNet: pkt.SrcNet,
+ DstNode: pkt.SrcNode,
+ DstSocket: 1, // the RTMP socket
+ SrcNet: port.MyAddr.Network,
+ SrcNode: port.MyAddr.Node,
+ SrcSocket: 1, // the RTMP socket
+ Proto: ddp.ProtoRTMPResp,
+ },
+ Data: respPktRaw,
+ }
-func (m *RTMPMachine) Handle(ctx context.Context, pkt *ddp.ExtPacket) {
- select {
- case <-ctx.Done():
- case m.IncomingCh <- pkt:
+ if err := port.Router.Forward(ctx, ddpPkt); err != nil {
+ return fmt.Errorf("send Response: %w", err)
+ }
+
+ case rtmp.FunctionRDRSplitHorizon, rtmp.FunctionRDRComplete:
+ // Like the Data broadcast, but solicited by a request (RDR).
+ splitHorizon := req.Function == rtmp.FunctionRDRSplitHorizon
+ for _, dataPkt := range port.rtmpDataPackets(splitHorizon) {
+ dataPktRaw, err := dataPkt.Marshal()
+ if err != nil {
+ return fmt.Errorf("marshal RTMP Data packet: %w", err)
+ }
+
+ ddpPkt := &ddp.ExtPacket{
+ ExtHeader: ddp.ExtHeader{
+ Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize,
+ Cksum: 0,
+ DstNet: pkt.SrcNet,
+ DstNode: pkt.SrcNode,
+ DstSocket: 1, // the RTMP socket
+ SrcNet: port.MyAddr.Network,
+ SrcNode: port.MyAddr.Node,
+ SrcSocket: 1, // the RTMP socket
+ Proto: ddp.ProtoRTMPResp,
+ },
+ Data: dataPktRaw,
+ }
+
+ if err := port.Router.Forward(ctx, ddpPkt); err != nil {
+ return fmt.Errorf("send Data: %w", err)
+ }
+ }
+
+ case rtmp.FunctionLoopProbe:
+ log.Print("RTMP: TODO: handle Loop Probes")
+ return nil
+ }
+
+ case ddp.ProtoRTMPResp:
+ // It's a peer router on the AppleTalk network!
+ log.Print("RTMP: Got Response or Data")
+ dataPkt, err := rtmp.UnmarshalDataPacket(pkt.Data)
+ if err != nil {
+ log.Printf("RTMP: Couldn't unmarshal RTMP Data packet: %v", err)
+ break
+ }
+ peer := &EtherTalkPeer{
+ Port: port,
+ PeerAddr: dataPkt.RouterAddr,
+ }
+
+ for _, rt := range dataPkt.NetworkTuples {
+ if err := port.Router.RouteTable.UpsertEthRoute(peer, rt.Extended, rt.RangeStart, rt.RangeEnd, rt.Distance+1); err != nil {
+ log.Printf("RTMP: Couldn't upsert EtherTalk route: %v", err)
+ }
+ }
+
+ default:
+ log.Printf("RTMP: invalid DDP type %d on socket 1", pkt.Proto)
}
+
+ return nil
}
-// Run executes the machine.
-func (m *RTMPMachine) Run(ctx context.Context) (err error) {
+// RunRTMP makes periodic RTMP Data broadcasts on this port.
+func (port *EtherTalkPort) RunRTMP(ctx context.Context) (err error) {
ctx, setStatus, _ := status.AddSimpleItem(ctx, "RTMP")
defer func() {
setStatus(fmt.Sprintf("Run loop stopped! Return: %v", err))
@@ -61,16 +141,12 @@ func (m *RTMPMachine) Run(ctx context.Context) (err error) {
setStatus("Awaiting DDP address assignment")
// Await local address assignment before doing anything
- <-m.AARPMachine.Assigned()
- myAddr, ok := m.AARPMachine.Address()
- if !ok {
- return fmt.Errorf("AARP machine closed Assigned channel but Address is not valid")
- }
+ <-port.AARPMachine.Assigned()
setStatus("Initial RTMP Data broadcast")
// Initial broadcast
- if err := m.broadcastData(myAddr); err != nil {
+ if err := port.broadcastRTMPData(); err != nil {
log.Printf("RTMP: Couldn't broadcast Data: %v", err)
}
@@ -86,141 +162,15 @@ func (m *RTMPMachine) Run(ctx context.Context) (err error) {
case <-bcastTicker.C:
setStatus("Broadcasting RTMP Data")
- if err := m.broadcastData(myAddr); err != nil {
+ if err := port.broadcastRTMPData(); err != nil {
log.Printf("RTMP: Couldn't broadcast Data: %v", err)
}
-
- case pkt := <-m.IncomingCh:
- setStatus("Handling incoming packet")
- switch pkt.Proto {
- case ddp.ProtoRTMPReq:
- // I can answer RTMP requests!
- req, err := rtmp.UnmarshalRequestPacket(pkt.Data)
- if err != nil {
- log.Printf("RTMP: Couldn't unmarshal Request packet: %v", err)
- }
-
- // should be in the cache...
- theirHWAddr, err := m.AARPMachine.Resolve(ctx, ddp.Addr{Network: pkt.SrcNet, Node: pkt.SrcNode})
- if err != nil {
- log.Printf("RTMP: Couldn't resolve %d.%d to a hardware address: %v", pkt.SrcNet, pkt.SrcNode, err)
- continue
- }
-
- switch req.Function {
- case rtmp.FunctionRequest:
- // Respond with RTMP Response
- respPkt := &rtmp.ResponsePacket{
- SenderAddr: myAddr.Proto,
- Extended: true,
- RangeStart: m.Config.EtherTalk.NetStart,
- RangeEnd: m.Config.EtherTalk.NetEnd,
- }
- respPktRaw, err := respPkt.Marshal()
- if err != nil {
- log.Printf("RTMP: Couldn't marshal RTMP Response packet: %v", err)
- continue
- }
- ddpPkt := &ddp.ExtPacket{
- ExtHeader: ddp.ExtHeader{
- Size: uint16(len(respPktRaw)) + atalk.DDPExtHeaderSize,
- Cksum: 0,
- DstNet: pkt.SrcNet,
- DstNode: pkt.SrcNode,
- DstSocket: 1, // the RTMP socket
- SrcNet: myAddr.Proto.Network,
- SrcNode: myAddr.Proto.Node,
- SrcSocket: 1, // the RTMP socket
- Proto: ddp.ProtoRTMPResp,
- },
- Data: respPktRaw,
- }
-
- if err := m.send(myAddr.Hardware, theirHWAddr, ddpPkt); err != nil {
- log.Printf("RTMP: Couldn't send Data broadcast: %v", err)
- }
-
- case rtmp.FunctionRDRSplitHorizon, rtmp.FunctionRDRComplete:
- // Like the Data broadcast, but solicited by a request (RDR).
- // TODO: handle split-horizon processing
- for _, dataPkt := range m.dataPackets(myAddr.Proto) {
- dataPktRaw, err := dataPkt.Marshal()
- if err != nil {
- log.Printf("RTMP: Couldn't marshal Data packet: %v", err)
- break
- }
-
- ddpPkt := &ddp.ExtPacket{
- ExtHeader: ddp.ExtHeader{
- Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize,
- Cksum: 0,
- DstNet: pkt.SrcNet,
- DstNode: pkt.SrcNode,
- DstSocket: 1, // the RTMP socket
- SrcNet: myAddr.Proto.Network,
- SrcNode: myAddr.Proto.Node,
- SrcSocket: 1, // the RTMP socket
- Proto: ddp.ProtoRTMPResp,
- },
- Data: dataPktRaw,
- }
-
- if err := m.send(myAddr.Hardware, theirHWAddr, ddpPkt); err != nil {
- log.Printf("RTMP: Couldn't send Data response: %v", err)
- break
- }
- }
-
- case rtmp.FunctionLoopProbe:
- log.Print("RTMP: TODO: handle Loop Probes")
-
- }
-
- case ddp.ProtoRTMPResp:
- // It's a peer router on the AppleTalk network!
- log.Print("RTMP: Got Response or Data")
- dataPkt, err := rtmp.UnmarshalDataPacket(pkt.Data)
- if err != nil {
- log.Printf("RTMP: Couldn't unmarshal RTMP Data packet: %v", err)
- break
- }
- peer := &EtherTalkPeer{
- PcapHandle: m.PcapHandle,
- MyHWAddr: m.AARPMachine.myAddr.Hardware,
- AARP: m.AARPMachine,
- PeerAddr: dataPkt.RouterAddr,
- }
-
- for _, rt := range dataPkt.NetworkTuples {
- if err := m.RoutingTable.UpsertEthRoute(peer, rt.Extended, rt.RangeStart, rt.RangeEnd, rt.Distance+1); err != nil {
- log.Printf("RTMP: Couldn't upsert EtherTalk route: %v", err)
- }
- }
-
- default:
- log.Printf("RTMP: invalid DDP type %d on socket 1", pkt.Proto)
- }
-
}
}
}
-func (m *RTMPMachine) send(src, dst ethernet.Addr, ddpPkt *ddp.ExtPacket) error {
- ethFrame, err := ethertalk.AppleTalk(src, *ddpPkt)
- if err != nil {
- return err
- }
- ethFrame.Dst = dst
-
- ethFrameRaw, err := ethertalk.Marshal(*ethFrame)
- if err != nil {
- return err
- }
- return m.PcapHandle.WritePacketData(ethFrameRaw)
-}
-
-func (m *RTMPMachine) broadcastData(myAddr aarp.AddrPair) error {
- for _, dataPkt := range m.dataPackets(myAddr.Proto) {
+func (port *EtherTalkPort) broadcastRTMPData() error {
+ for _, dataPkt := range port.rtmpDataPackets(true) {
dataPktRaw, err := dataPkt.Marshal()
if err != nil {
return fmt.Errorf("marshal Data packet: %v", err)
@@ -230,29 +180,40 @@ func (m *RTMPMachine) broadcastData(myAddr aarp.AddrPair) error {
ExtHeader: ddp.ExtHeader{
Size: uint16(len(dataPktRaw)) + atalk.DDPExtHeaderSize,
Cksum: 0,
- DstNet: 0, // this network
- DstNode: 0xff, // broadcast packet
- DstSocket: 1, // the RTMP socket
- SrcNet: myAddr.Proto.Network,
- SrcNode: myAddr.Proto.Node,
+ DstNet: 0x0000, // this network
+ DstNode: 0xff, // broadcast packet
+ DstSocket: 1, // the RTMP socket
+ SrcNet: port.MyAddr.Network,
+ SrcNode: port.MyAddr.Node,
SrcSocket: 1, // the RTMP socket
Proto: ddp.ProtoRTMPResp,
},
Data: dataPktRaw,
}
- if err := m.send(myAddr.Hardware, ethertalk.AppleTalkBroadcast, ddpPkt); err != nil {
+ if err := port.Broadcast(ddpPkt); err != nil {
return err
}
}
return nil
}
-func (m *RTMPMachine) dataPackets(myAddr ddp.Addr) []*rtmp.DataPacket {
+func (port *EtherTalkPort) rtmpDataPackets(splitHorizon bool) []*rtmp.DataPacket {
// Build up a slice of routing tuples.
- routes := m.RoutingTable.ValidRoutes()
+ routes := port.Router.RouteTable.ValidRoutes()
tuples := make([]rtmp.NetworkTuple, 0, len(routes))
for _, rt := range routes {
+ if rt.EtherTalkDirect == port {
+ // If the route is actually a direct connection to this port,
+ // don't include it.
+ // (It's manually set as the first tuple anyway.)
+ continue
+ }
+ if splitHorizon && rt.EtherTalkPeer.Port == port {
+ // If the route is through a peer accessible on this port, don't
+ // include it.
+ continue
+ }
tuples = append(tuples, rtmp.NetworkTuple{
Extended: rt.Extended,
RangeStart: rt.NetStart,
@@ -266,8 +227,8 @@ func (m *RTMPMachine) dataPackets(myAddr ddp.Addr) []*rtmp.DataPacket {
// TODO: support non-extended local networks (LocalTalk)
first := rtmp.NetworkTuple{
Extended: true,
- RangeStart: m.Config.EtherTalk.NetStart,
- RangeEnd: m.Config.EtherTalk.NetEnd,
+ RangeStart: port.NetStart,
+ RangeEnd: port.NetEnd,
Distance: 0,
}
@@ -287,7 +248,7 @@ func (m *RTMPMachine) dataPackets(myAddr ddp.Addr) []*rtmp.DataPacket {
rem = rem[len(chunk)-1:]
packets = append(packets, &rtmp.DataPacket{
- RouterAddr: myAddr,
+ RouterAddr: port.MyAddr,
Extended: true,
NetworkTuples: chunk,
})