Add AEP, add AARP assigned chan

This commit is contained in:
Josh Deprez 2024-04-07 17:01:26 +10:00
parent 71880976d9
commit 1fa178687a
Signed by: josh
SSH key fingerprint: SHA256:zZji7w1Ilh2RuUpbQcqkLPrqmRwpiCSycbF2EfKm6Kw
4 changed files with 110 additions and 14 deletions

12
aarp.go
View file

@ -37,6 +37,7 @@ type AARPMachine struct {
mu sync.RWMutex mu sync.RWMutex
myAddr aarp.AddrPair myAddr aarp.AddrPair
probes int probes int
assignedCh chan struct{}
} }
// NewAARPMachine creates a new AARPMachine. // NewAARPMachine creates a new AARPMachine.
@ -48,6 +49,7 @@ func NewAARPMachine(cfg *config, pcapHandle *pcap.Handle, myHWAddr ethernet.Addr
myAddr: aarp.AddrPair{ myAddr: aarp.AddrPair{
Hardware: myHWAddr, Hardware: myHWAddr,
}, },
assignedCh: make(chan struct{}),
} }
} }
@ -59,6 +61,11 @@ func (a *AARPMachine) Address() (aarp.AddrPair, bool) {
return a.myAddr, a.assigned() return a.myAddr, a.assigned()
} }
// Assigned returns a channel that is closed when the local address is valid.
func (a *AARPMachine) Assigned() <-chan struct{} {
return a.assignedCh
}
// Run executes the machine. // Run executes the machine.
func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Packet) error { func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Packet) error {
// Initialise our DDP address with a preferred address (first network.1) // Initialise our DDP address with a preferred address (first network.1)
@ -80,6 +87,7 @@ func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Pack
case <-ticker.C: case <-ticker.C:
if a.assigned() { if a.assigned() {
close(a.assignedCh)
// No need to keep the ticker running if assigned // No need to keep the ticker running if assigned
ticker.Stop() ticker.Stop()
continue continue
@ -158,7 +166,7 @@ func (a *AARPMachine) Run(ctx context.Context, incomingCh <-chan *ethertalk.Pack
// If the address is in the cache (AMT) and is still valid, that is used. // If the address is in the cache (AMT) and is still valid, that is used.
// Otherwise, the address is resolved using AARP. // Otherwise, the address is resolved using AARP.
func (a *AARPMachine) Resolve(ctx context.Context, ddpAddr ddp.Addr) (ethernet.Addr, error) { func (a *AARPMachine) Resolve(ctx context.Context, ddpAddr ddp.Addr) (ethernet.Addr, error) {
result, waitCh := a.addressMappingTable.lookupOrWait(ddpAddr) result, waitCh := a.lookupOrWait(ddpAddr)
if waitCh == nil { if waitCh == nil {
return result, nil return result, nil
} }
@ -179,7 +187,7 @@ func (a *AARPMachine) Resolve(ctx context.Context, ddpAddr ddp.Addr) (ethernet.A
return ethernet.Addr{}, ctx.Err() return ethernet.Addr{}, ctx.Err()
case <-waitCh: case <-waitCh:
result, waitCh = a.addressMappingTable.lookupOrWait(ddpAddr) result, waitCh = a.lookupOrWait(ddpAddr)
if waitCh == nil { if waitCh == nil {
return result, nil return result, nil
} }

38
atalk/aep/aep.go Normal file
View file

@ -0,0 +1,38 @@
package aep
import "fmt"
type Function uint8
const (
EchoRequest Function = 1
EchoReply Function = 2
)
// Packet represents an AEP packet.
type Packet struct {
Function Function
Data []byte
}
// Marshal marshals an AEP packet.
func (p *Packet) Marshal() ([]byte, error) {
if p.Function < 1 || p.Function > 2 {
return nil, fmt.Errorf("invalid AEP function %d", p.Function)
}
if len(p.Data) == 0 {
return nil, fmt.Errorf("empty AEP packet")
}
return append([]byte{byte(p.Function)}, p.Data...), nil
}
// Unmarshal unmarshals an AEP packet.
func Unmarshal(data []byte) (*Packet, error) {
if len(data) < 1 {
return nil, fmt.Errorf("insufficient input length %d for AEP packet", len(data))
}
return &Packet{
Function: Function(data[0]),
Data: data[1:],
}, nil
}

47
main.go
View file

@ -20,6 +20,7 @@ import (
"context" "context"
"errors" "errors"
"flag" "flag"
"fmt"
"io" "io"
"log" "log"
"math/rand/v2" "math/rand/v2"
@ -31,6 +32,7 @@ import (
"time" "time"
"gitea.drjosh.dev/josh/jrouter/atalk" "gitea.drjosh.dev/josh/jrouter/atalk"
"gitea.drjosh.dev/josh/jrouter/atalk/aep"
"gitea.drjosh.dev/josh/jrouter/aurp" "gitea.drjosh.dev/josh/jrouter/aurp"
"github.com/google/gopacket/pcap" "github.com/google/gopacket/pcap"
"github.com/sfiera/multitalk/pkg/ddp" "github.com/sfiera/multitalk/pkg/ddp"
@ -244,6 +246,12 @@ func main() {
switch ddpkt.DstSocket { switch ddpkt.DstSocket {
case 1: // The RTMP socket case 1: // The RTMP socket
rtmpCh <- &ddpkt rtmpCh <- &ddpkt
case 4: // The AEP socket
if err := handleAEP(pcapHandle, myHWAddr, ethFrame.Src, &ddpkt); err != nil {
log.Printf("AEP: Couldn't handle: %v", err)
}
default: default:
log.Printf("DDP: No handler for socket %d", ddpkt.DstSocket) log.Printf("DDP: No handler for socket %d", ddpkt.DstSocket)
} }
@ -327,6 +335,45 @@ func main() {
} }
} }
func handleAEP(pcapHandle *pcap.Handle, src, dst ethernet.Addr, ddpkt *ddp.ExtPacket) error {
if ddpkt.Proto != ddp.ProtoAEP {
return fmt.Errorf("invalid DDP type %d on socket 4", ddpkt.Proto)
}
ep, err := aep.Unmarshal(ddpkt.Data)
if err != nil {
return err
}
switch ep.Function {
case aep.EchoReply:
// we didn't send a request? I don't think?
// we shouldn't be sending them from this socket
return fmt.Errorf("echo reply received at socket 4 why?")
case aep.EchoRequest:
// Uno Reverso the packet
// "The client can send the Echo Request datagram through any socket
// the client has open, and the Echo Reply will come back to this socket."
ddpkt.DstNet, ddpkt.SrcNet = ddpkt.SrcNet, ddpkt.DstNet
ddpkt.DstNode, ddpkt.SrcNode = ddpkt.SrcNode, ddpkt.DstNode
ddpkt.DstSocket, ddpkt.SrcSocket = ddpkt.SrcSocket, ddpkt.DstSocket
ddpkt.Data[0] = byte(aep.EchoReply)
ethFrame, err := ethertalk.AppleTalk(src, *ddpkt)
if err != nil {
return err
}
ethFrame.Dst = dst
ethFrameRaw, err := ethertalk.Marshal(*ethFrame)
if err != nil {
return err
}
return pcapHandle.WritePacketData(ethFrameRaw)
default:
return fmt.Errorf("invalid AEP function %d", ep.Function)
}
}
// Hashable net.UDPAddr // Hashable net.UDPAddr
type udpAddr struct { type udpAddr struct {
ipv4 [4]byte ipv4 [4]byte

21
rtmp.go
View file

@ -2,6 +2,7 @@ package main
import ( import (
"context" "context"
"fmt"
"log" "log"
"time" "time"
@ -19,7 +20,15 @@ type RTMPMachine struct {
pcapHandle *pcap.Handle pcapHandle *pcap.Handle
} }
// Run executes the machine.
func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket) error { func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket) error {
// Await local address assignment before doing anything
<-m.aarp.Assigned()
myAddr, ok := m.aarp.Address()
if !ok {
return fmt.Errorf("AARP machine closed Assigned channel but Address is not valid")
}
bcastTicker := time.NewTicker(10 * time.Second) bcastTicker := time.NewTicker(10 * time.Second)
defer bcastTicker.Stop() defer bcastTicker.Stop()
@ -30,10 +39,6 @@ func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket)
case <-bcastTicker.C: case <-bcastTicker.C:
// Broadcast an RTMP Data // Broadcast an RTMP Data
myAddr, ok := m.aarp.Address()
if !ok {
continue
}
dataPkt := m.dataPacket(myAddr.Proto) dataPkt := m.dataPacket(myAddr.Proto)
@ -71,10 +76,6 @@ func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket)
log.Printf("RTMP: Couldn't unmarshal Request packet: %v", err) log.Printf("RTMP: Couldn't unmarshal Request packet: %v", err)
} }
myAddr, ok := m.aarp.Address()
if !ok {
continue
}
// should be in the cache... // should be in the cache...
theirHWAddr, err := m.aarp.Resolve(ctx, ddp.Addr{Network: pkt.SrcNet, Node: pkt.SrcNode}) theirHWAddr, err := m.aarp.Resolve(ctx, ddp.Addr{Network: pkt.SrcNet, Node: pkt.SrcNode})
if err != nil { if err != nil {
@ -151,6 +152,8 @@ func (m *RTMPMachine) Run(ctx context.Context, incomingCh <-chan *ddp.ExtPacket)
// TODO: integrate this information with the routing table // TODO: integrate this information with the routing table
log.Print("RTMP: Got Response or Data") log.Print("RTMP: Got Response or Data")
default:
log.Printf("RTMP: invalid DDP type %d on socket 1", pkt.Proto)
} }
} }
@ -187,5 +190,5 @@ func (m *RTMPMachine) dataPacket(myAddr ddp.Addr) *rtmp.DataPacket {
}, },
}, },
} }
// TODO: append more networks! // TODO: append more networks! implement a route table!
} }