Skip to content

Commit

Permalink
fixing typos
Browse files Browse the repository at this point in the history
  • Loading branch information
PWRxPSYCHO committed Jan 1, 2025
1 parent 373235f commit 09d790c
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 66 deletions.
49 changes: 25 additions & 24 deletions klv_parser/klv/misb_0601_19/metadata.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .uas_datalink_local_set import UasDatalinkLocalSet

_OUT_OF_RANGE_STR = "Out of Range"
_NA_OFF_EARTH = "N/A (Off-Earth)"


class Metadata:
Expand Down Expand Up @@ -241,7 +242,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data.hex() == 8000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (180 / 4294967294) * self.data_to_signed_int(self.data)


Expand All @@ -253,7 +254,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data.hex() == 8000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (360 / 4294967294) * self.data_to_signed_int(self.data)


Expand All @@ -276,7 +277,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data.hex() == 8000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (0.15 / 65534) * self.data_to_signed_int(self.data)


Expand All @@ -288,7 +289,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data.hex() == 8000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (0.15 / 65534) * self.data_to_signed_int(self.data)


Expand All @@ -300,7 +301,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data.hex() == 8000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (0.15 / 65534) * self.data_to_signed_int(self.data)


Expand All @@ -312,7 +313,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data.hex() == 8000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (0.15 / 65534) * self.data_to_signed_int(self.data)


Expand All @@ -324,7 +325,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data.hex() == 8000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (0.15 / 65534) * self.data_to_signed_int(self.data)


Expand All @@ -336,7 +337,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data.hex() == 8000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (0.15 / 65534) * self.data_to_signed_int(self.data)


Expand All @@ -348,7 +349,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data.hex() == 8000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (0.15 / 65534) * self.data_to_signed_int(self.data)


Expand All @@ -360,7 +361,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data.hex() == 8000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (0.15 / 65534) * self.data_to_signed_int(self.data)


Expand Down Expand Up @@ -429,7 +430,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data == 80000000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (180 / 4294967294) * self.data_to_signed_int(self.data)


Expand All @@ -441,7 +442,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data == 80000000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (360 / 4294967294) * self.data_to_signed_int(self.data)


Expand Down Expand Up @@ -846,7 +847,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data == 80000000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (180 / 4294967294) * self.data_to_signed_int(self.data)


Expand All @@ -858,7 +859,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data == 80000000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (360 / 4294967294) * self.data_to_signed_int(self.data)


Expand All @@ -870,7 +871,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data == 80000000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (180 / 4294967294) * self.data_to_signed_int(self.data)


Expand All @@ -882,7 +883,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data == 80000000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (360 / 4294967294) * self.data_to_signed_int(self.data)


Expand All @@ -894,7 +895,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data == 80000000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (180 / 4294967294) * self.data_to_signed_int(self.data)


Expand All @@ -906,7 +907,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data == 80000000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (360 / 4294967294) * self.data_to_signed_int(self.data)


Expand All @@ -918,7 +919,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data == 80000000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (180 / 4294967294) * self.data_to_signed_int(self.data)


Expand All @@ -930,7 +931,7 @@ def __init__(self, data, length, id):

def format_data(self):
if self.data == 80000000:
return "N/A (Off-Earth)"
return _NA_OFF_EARTH
return (360 / 4294967294) * self.data_to_signed_int(self.data)


Expand Down Expand Up @@ -1023,7 +1024,7 @@ def __init__(self, data, length, id):


# Item 98
class GeoRegistratoinLocalSet(Metadata):
class GeoRegistrationLocalSet(Metadata):
def __init__(self, data, length, id):
super().__init__(data, length, id)
self.key_type = UasDatalinkLocalSet.GEO_REGISTRATION_LOCAL_SET
Expand Down Expand Up @@ -1171,7 +1172,7 @@ def __init__(self, data, length, id):
class RadarAltimeter(Metadata):
def __init__(self, data, length, id):
super().__init__(data, length, id)
self.key_type = UasDatalinkLocalSet.RADAR_ALTIMITER
self.key_type = UasDatalinkLocalSet.RADAR_ALTIMETER


# Item 115
Expand Down Expand Up @@ -1501,7 +1502,7 @@ def __init__(self, data, length, id):
UasDatalinkLocalSet.SAR_MOTION_IMAGERY_SET: SARMotionImageryLocalSet,
UasDatalinkLocalSet.TARGET_WIDTH_EXTENDED: TargetWidthExtended,
UasDatalinkLocalSet.RANGE_IMAGE_LOCAL_SET: RangeImageLocalSet,
UasDatalinkLocalSet.GEO_REGISTRATION_LOCAL_SET: GeoRegistratoinLocalSet,
UasDatalinkLocalSet.GEO_REGISTRATION_LOCAL_SET: GeoRegistrationLocalSet,
UasDatalinkLocalSet.COMPOSITE_IMAGING_LOCAL_SET: CompositeImagingLocalSet,
UasDatalinkLocalSet.SEGMENT_LOCAL_SET: SegmentLocalSet,
UasDatalinkLocalSet.AMEND_LOCAL_SET: AmendLocalSet,
Expand All @@ -1517,7 +1518,7 @@ def __init__(self, data, length, id):
UasDatalinkLocalSet.PROPULSION_UNIT_SPEED: PropulsionUnitSpeed,
UasDatalinkLocalSet.PLATFORM_COURSE_ANGLE: PlatformCourseAngle,
UasDatalinkLocalSet.ALTITUDE_AGL: AltitudeAGL,
UasDatalinkLocalSet.RADAR_ALTIMITER: RadarAltimeter,
UasDatalinkLocalSet.RADAR_ALTIMETER: RadarAltimeter,
UasDatalinkLocalSet.CONTROL_COMMAND: ControlCommand,
UasDatalinkLocalSet.CONTROL_COMMAND_VERIFICATION_LIST: ControlCommandVerificationList,
UasDatalinkLocalSet.SENSOR_AZIMUTH_RATE: SensorAzimuthRate,
Expand Down
2 changes: 1 addition & 1 deletion klv_parser/klv/misb_0601_19/uas_datalink_local_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class UasDatalinkLocalSet(Enum):
PROPULSION_UNIT_SPEED = 111
PLATFORM_COURSE_ANGLE = 112
ALTITUDE_AGL = 113
RADAR_ALTIMITER = 114
RADAR_ALTIMETER = 114
CONTROL_COMMAND = 115
CONTROL_COMMAND_VERIFICATION_LIST = 116
SENSOR_AZIMUTH_RATE = 117
Expand Down
97 changes: 56 additions & 41 deletions klv_parser/klv/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,63 +44,78 @@ def parse_packet(value) -> Packet:


# Define a helper function to extract KLV data based on the Key-Length-Value format (16-byte key)
def parse_klv(data) -> Packet:
def parse_klv(data) -> Packet | None:
idx = 0

data_len = len(data)

while idx < data_len:
# Ensure there is enough data for the key (16 bytes) and the length byte
if idx + 16 > data_len:
logger.info("Incomplete key, skipping packet")
if not has_enough_data_for_key(idx, data_len):
break

# Extract the 16-byte key
idx += 16 # Move past key.

# Ensure there's enough data for the length byte and the value
if idx >= data_len:
logger.info("Incomplete length field, skipping packet")
if not has_enough_data_for_length(idx, data_len):
break

# Extract the length indicator
length_indicator = data[idx] & 0x80 # Check MSB of the first length byte
value = b""

if length_indicator: # Long form (MSB is set)
# BER Long format: first byte indicates length of the length field
length_of_length_field = data[idx] & 0x7F # Get the number of length bytes
idx += 1 # Move past the first byte

# Check if there are enough bytes for the length encoding
if idx + length_of_length_field > data_len:
logger.info("Incomplete long-form length, skipping packet")
return None

# Now get the actual length, which spans the next 'length_of_length_field' bytes
length = 0
for _ in range(length_of_length_field):
length = (length << 8) | data[idx] # Shift and add the next byte to length
idx += 1

# Ensure there's enough data for the value
if idx + length > data_len:
logger.info("Incomplete value, skipping packet")
return None

value = data[idx : idx + length]
idx += length # Move past the value
value, idx = parse_long_form(data, idx, data_len)
else: # Short form (MSB is not set)
# BER Short format: length is encoded in the first byte (last 7 bits)
length = data[idx] & 0x7F # Get the last 7 bits
idx += 1 # Move past the length byte
value, idx = parse_short_form(data, idx, data_len)

# Ensure there's enough data for the value
if idx + length > data_len:
logger.info("Incomplete value, skipping packet")
if value is None:
return None

value = data[idx : idx + length]
parsed_value = parse_packet(value)
idx += length # Move past the value
idx += len(value) # Move past the value

return parsed_value


def has_enough_data_for_key(idx, data_len):
if idx + 16 > data_len:
logger.info("Incomplete key, skipping packet")
return False
return True


def has_enough_data_for_length(idx, data_len):
if idx >= data_len:
logger.info("Incomplete length field, skipping packet")
return False
return True


def parse_long_form(data, idx, data_len):
length_of_length_field = data[idx] & 0x7F # Get the number of length bytes
idx += 1 # Move past the first byte

if idx + length_of_length_field > data_len:
logger.info("Incomplete long-form length, skipping packet")
return None, idx

length = 0
for _ in range(length_of_length_field):
length = (length << 8) | data[idx] # Shift and add the next byte to length
idx += 1

if idx + length > data_len:
logger.info("Incomplete value, skipping packet")
return None, idx

value = data[idx : idx + length]
idx += length # Move past the value
return value, idx


def parse_short_form(data, idx, data_len):
length = data[idx] & 0x7F # Get the last 7 bits
idx += 1 # Move past the length byte

if idx + length > data_len:
logger.info("Incomplete value, skipping packet")
return None, idx

value = data[idx : idx + length]
return value, idx

0 comments on commit 09d790c

Please sign in to comment.