diff --git a/atalk/rtmp/data.go b/atalk/rtmp/data.go index 77be855..2505731 100644 --- a/atalk/rtmp/data.go +++ b/atalk/rtmp/data.go @@ -1,6 +1,7 @@ package rtmp import ( + "bytes" "encoding/binary" "fmt" @@ -10,6 +11,7 @@ import ( // DataPacket represents an RTMP Data packet. type DataPacket struct { RouterAddr ddp.Addr + Extended bool NetworkTuples []NetworkTuple } @@ -21,6 +23,29 @@ type NetworkTuple struct { Distance uint8 } +// Marshal marshals an RTMP Data packet. +func (dp *DataPacket) Marshal() ([]byte, error) { + b := bytes.NewBuffer(nil) + write16(b, dp.RouterAddr.Network) + b.WriteByte(8) + b.WriteByte(byte(dp.RouterAddr.Node)) + if !dp.Extended { + write16(b, uint16(0)) + b.WriteByte(0x82) + } + for _, nt := range dp.NetworkTuples { + write16(b, nt.RangeStart) + if !nt.Extended { + b.WriteByte(nt.Distance) + continue + } + b.WriteByte(nt.Distance | 0x80) + write16(b, nt.RangeEnd) + b.WriteByte(0x82) + } + return b.Bytes(), nil +} + // UnmarshalDataPacket unmarshals a DataPacket. func UnmarshalDataPacket(data []byte) (*DataPacket, error) { if len(data) < 7 || (len(data)-4)%3 != 0 { @@ -34,6 +59,7 @@ func UnmarshalDataPacket(data []byte) (*DataPacket, error) { Network: ddp.Network(binary.BigEndian.Uint16(data[:2])), Node: ddp.Node(data[3]), }, + Extended: true, } data = data[4:] @@ -56,6 +82,7 @@ func UnmarshalDataPacket(data []byte) (*DataPacket, error) { if nt.Distance != 0x82 { return nil, fmt.Errorf("unsupported RTMP version %x", nt.Distance) } + dp.Extended = false first = false continue } diff --git a/atalk/rtmp/misc.go b/atalk/rtmp/misc.go new file mode 100644 index 0000000..ba5b14f --- /dev/null +++ b/atalk/rtmp/misc.go @@ -0,0 +1,7 @@ +package rtmp + +import "bytes" + +func write16[I ~uint16](b *bytes.Buffer, n I) { + b.Write([]byte{byte(n >> 8), byte(n & 0xff)}) +} diff --git a/atalk/rtmp/request.go b/atalk/rtmp/request.go index ce9a2ae..2acc047 100644 --- a/atalk/rtmp/request.go +++ b/atalk/rtmp/request.go @@ -7,6 +7,15 @@ type RequestPacket struct { Function uint8 } +// Marshal marshals an RTMP Request or RTMP RDR packet. +func (rp *RequestPacket) Marshal() ([]byte, error) { + if rp.Function < 1 || rp.Function > 3 { + return nil, fmt.Errorf("invalid RTMP request function %d", rp.Function) + } + return []byte{rp.Function}, nil +} + +// UnmarshalRequestPacket unmarshals an RTMP Request or RTMP RDR packet. func UnmarshalRequestPacket(data []byte) (*RequestPacket, error) { if len(data) != 1 { return nil, fmt.Errorf("invalid data length %d for RTMP Request or RTMP RDR packet", len(data)) diff --git a/atalk/rtmp/response.go b/atalk/rtmp/response.go index 2cbaec6..1844013 100644 --- a/atalk/rtmp/response.go +++ b/atalk/rtmp/response.go @@ -1,12 +1,14 @@ package rtmp import ( + "bytes" "encoding/binary" "fmt" "github.com/sfiera/multitalk/pkg/ddp" ) +// ResponsePacket represents an RTMP Response packet. type ResponsePacket struct { SenderAddr ddp.Addr Extended bool @@ -14,6 +16,24 @@ type ResponsePacket struct { RangeEnd ddp.Network } +// Marshal marshals an RTMP Response packet. +func (rp *ResponsePacket) Marshal() ([]byte, error) { + b := bytes.NewBuffer(nil) + b.Grow(10) + write16(b, rp.SenderAddr.Network) + b.WriteByte(8) + b.WriteByte(byte(rp.SenderAddr.Node)) + if !rp.Extended { + return b.Bytes(), nil + } + write16(b, rp.RangeStart) + b.WriteByte(0x80) + write16(b, rp.RangeEnd) + b.WriteByte(0x82) + return b.Bytes(), nil +} + +// UnmarshalResponsePacket unmarshals an RTMP Response packet. func UnmarshalResponsePacket(data []byte) (*ResponsePacket, error) { if len(data) != 4 && len(data) != 10 { return nil, fmt.Errorf("invalid input length %d for RTMP Response packet", len(data))