diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 54d8549b..7601a3d8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,7 +19,7 @@ jobs: # os: [ubuntu-latest] experimental: [false] # python-version: ["2.7","3.4","3.5","3.6","3.7","3.8","3.9","3.10","3.11","3.12"] - python-version: ["3.7","3.8","3.9","3.10","3.11","3.12"] + python-version: ["3.8","3.9","3.10","3.11","3.12", "3.13"] fail-fast: false steps: - uses: actions/checkout@v4 diff --git a/appveyor.yml b/appveyor.yml index f993c54e..33e35883 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -7,7 +7,6 @@ skip_branch_with_pr: true environment: matrix: - - TOXENV: py37 - TOXENV: py38 - TOXENV: py39 - TOXENV: py310 diff --git a/src/canmatrix/formats/dbc.py b/src/canmatrix/formats/dbc.py index 84400218..3bf6b90c 100644 --- a/src/canmatrix/formats/dbc.py +++ b/src/canmatrix/formats/dbc.py @@ -223,6 +223,8 @@ def dump(in_db, f, **options): # remove "-" from frame names if compatibility: frame.name = re.sub("[^A-Za-z0-9]", whitespace_replacement, frame.name) + if frame.name[0].isdigit(): + frame.name = "_" + frame.name duplicate_signal_totals = collections.Counter(normalized_names.values()) duplicate_signal_counter = collections.Counter() # type: typing.Counter[str] @@ -459,7 +461,7 @@ def dump(in_db, f, **options): if frame.is_complex_multiplexed: for signal in frame.signals: if signal.muxer_for_signal is not None: - f.write(("SG_MUL_VAL_ %d %s %s " % (frame.arbitration_id.to_compound_integer(), signal.name, signal.muxer_for_signal)).encode(dbc_export_encoding, ignore_encoding_errors)) + f.write(("SG_MUL_VAL_ %d %s %s " % (frame.arbitration_id.to_compound_integer(), output_names[frame][signal], signal.muxer_for_signal)).encode(dbc_export_encoding, ignore_encoding_errors)) f.write((", ".join(["%d-%d" % (a, b) for a, b in signal.mux_val_grp])).encode(dbc_export_encoding, ignore_encoding_errors)) f.write(";\n".encode(dbc_export_encoding, ignore_encoding_errors)) diff --git a/src/canmatrix/formats/eds.py b/src/canmatrix/formats/eds.py index 7c5c7e6f..11c37474 100644 --- a/src/canmatrix/formats/eds.py +++ b/src/canmatrix/formats/eds.py @@ -4,7 +4,9 @@ import canopen.objectdictionary.eds import canopen.objectdictionary.datatypes import codecs - +import copy +import re +import math logger = logging.getLogger(__name__) @@ -33,129 +35,224 @@ 0x82: 'Reset Communication'} -def get_signals(parent_object, signal_receiver): - signals = [] - position = 0 - for sub in range(1, len(parent_object)): - name = parent_object[sub].name - size = datatype_mapping[parent_object[sub].data_type][1] - unsigned = "UNSIGNED" in datatype_mapping[parent_object[sub].data_type][0] - signal = canmatrix.Signal(name = name, receivers=[signal_receiver], size=size, start_bit = position, is_signed = not unsigned) - position += size - signals.append(signal) - return signals +def name_cleanup(in_str): + rets_str = re.sub("[^A-Za-z0-9]", '_', in_str) + return rets_str +def get_bit_length(data_type_code): + return datatype_mapping[data_type_code][1] +def get_data_type_name(data_type_code): + return datatype_mapping[data_type_code][0] + +def format_index(index, subindex): + return f"Index: 0x{index:04X}{subindex:02X}" def load(f, **options): # type: (typing.IO, **typing.Any) -> canmatrix.CanMatrix eds_import_encoding = options.get("edsImportEncoding", 'iso-8859-1') - node_id = options.get("eds_node_id", 1) + node_id = options.get("eds_node_id", 0) + generic = options.get("generic", False) fp = codecs.getreader(eds_import_encoding)(f) od = canopen.objectdictionary.eds.import_eds(fp, node_id) db = canmatrix.CanMatrix() + signal_group_counter = 1 node_name = od.device_information.product_name + if len(node_name) == 0: + node_name = "DUMMY" plc_name = "PLC" + if generic is True: + nm_out = canmatrix.canmatrix.Frame(name="NMT_Out_Request", size=2, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0), transmitters=[plc_name]) + sig_cmd = canmatrix.canmatrix.Signal(name="nmt_CMD", size=8, start_bit = 0, receivers=[node_name]) + for val, val_name in cmd_values.items(): + sig_cmd.add_values(val, val_name) + nm_out.add_signal(sig_cmd) + nm_out.add_signal(canmatrix.canmatrix.Signal(name="Node_ID", size=8, start_bit = 8, receivers=[node_name])) + db.add_frame(nm_out) + + nm_responde = canmatrix.canmatrix.Frame(name="NMT_Response_Frame_In", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x700+node_id), transmitters=[node_name]) + response_sig1 = canmatrix.canmatrix.Signal(name="NMT_Response_1", size=32, start_bit = 0, receivers=[plc_name]) + nm_responde.add_signal(response_sig1) + response_sig2 = canmatrix.canmatrix.Signal(name="NMT_Response_1", size=32, start_bit = 32, receivers=[plc_name]) + nm_responde.add_signal(response_sig2) + db.add_frame(nm_responde) + + sync = canmatrix.canmatrix.Frame(name="SYNC", size=0, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x80), transmitters=[plc_name]) + db.add_frame(sync) + + emcy = canmatrix.canmatrix.Frame(name="EMCY", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x80+node_id), transmitters=[node_name]) + emcy.add_signal(canmatrix.canmatrix.Signal(name="EMCY_Error_Code", size=16, start_bit=0, receivers=[plc_name])) + emcy.add_signal(canmatrix.canmatrix.Signal(name="E_Reg", size=8, start_bit=16, receivers=[plc_name])) + emcy.add_signal(canmatrix.canmatrix.Signal(name="E_Number", size=8, start_bit=24, receivers=[plc_name])) + db.add_frame(emcy) - nm_out = canmatrix.canmatrix.Frame(name="NMT_Out_Request", size=2, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0), transmitters=[plc_name]) - sig_cmd = canmatrix.canmatrix.Signal(name="nmt_CMD", size=8, start_bit = 0, receivers=[node_name]) - for val, val_name in cmd_values.items(): - sig_cmd.add_values(val, val_name) - nm_out.add_signal(sig_cmd) - nm_out.add_signal(canmatrix.canmatrix.Signal(name="Node_ID", size=8, start_bit = 8, receivers=[node_name])) - db.add_frame(nm_out) - - nm_responde = canmatrix.canmatrix.Frame(name="NMT_Response_Frame_In", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x700+node_id), transmitters=[node_name]) - response_sig1 = canmatrix.canmatrix.Signal(name="NMT_Response_1", size=32, start_bit = 0, receivers=[plc_name]) - nm_responde.add_signal(response_sig1) - response_sig2 = canmatrix.canmatrix.Signal(name="NMT_Response_1", size=32, start_bit = 32, receivers=[plc_name]) - nm_responde.add_signal(response_sig2) - db.add_frame(nm_responde) - - sync = canmatrix.canmatrix.Frame(name="SYNC", size=0, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x80), transmitters=[plc_name]) - db.add_frame(sync) - - emcy = canmatrix.canmatrix.Frame(name="EMCY", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x80+node_id), transmitters=[node_name]) - emcy.add_signal(canmatrix.canmatrix.Signal(name="EMCY_Error_Code", size=16, start_bit=0, receivers=[plc_name])) - emcy.add_signal(canmatrix.canmatrix.Signal(name="E_Reg", size=8, start_bit=16, receivers=[plc_name])) - emcy.add_signal(canmatrix.canmatrix.Signal(name="E_Number", size=8, start_bit=24, receivers=[plc_name])) - db.add_frame(emcy) - - sdo_down = canmatrix.canmatrix.Frame(name="SDO_download", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x600+node_id), transmitters=[node_name]) - sig_cmd = canmatrix.canmatrix.Signal(name="sdo_down_CMD", size=8, start_bit=0, receivers=[plc_name]) + sdo_down = canmatrix.canmatrix.Frame(name="SDO_receive", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x600+node_id), transmitters=[node_name]) + sig_cmd = canmatrix.canmatrix.Signal(name="CCS", size=3, start_bit=5, receivers=[plc_name], is_signed=False) sig_cmd.is_multiplexer = True + sdo_down.is_complex_multiplexed = True sig_cmd.multiplex = "Multiplexor" sdo_down.add_signal(sig_cmd) - sdo_down.add_signal(canmatrix.canmatrix.Signal(name="sdo_down_IDX", size=16, start_bit=8, receivers=[plc_name])) - sdo_down.add_signal(canmatrix.canmatrix.Signal(name="sdo_down_SUBIDX", size=8, start_bit=24, receivers=[plc_name])) - sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data8", size=8, start_bit=32, receivers=[plc_name], multiplex=0x2f)) - sig_cmd.add_values(0x2f, "8_bytes") - sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data16", size=16, start_bit=32, receivers=[plc_name], multiplex=0x2b)) - sig_cmd.add_values(0x2b, "16_bytes") - sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data24", size=24, start_bit=32, receivers=[plc_name], multiplex=0x27)) - sig_cmd.add_values(0x27, "3_bytes") - sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data32", size=32, start_bit=32, receivers=[plc_name], multiplex=0x23)) - sig_cmd.add_values(0x23, "4_bytes") - sig_cmd.add_values(0x40, "upload_request") + index = canmatrix.canmatrix.Signal(name="IDX", size=24, start_bit=8, receivers=[plc_name]) + index.multiplex = "Multiplexor" + index.is_multiplexer = True + index.mux_val = 1 + index.mux_val_grp.append([ 2, 2]) + index.muxer_for_signal = "CCS" + sdo_down.add_signal(index) db.add_frame(sdo_down) - sdo_up = canmatrix.canmatrix.Frame(name="SDO_upload", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x580+node_id), transmitters=[plc_name]) - sig_cmd = canmatrix.canmatrix.Signal(name="sdo_state", size=8, start_bit=0, receivers=[node_name]) + sdo_up = canmatrix.canmatrix.Frame(name="SDO_transmit", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x580+node_id), transmitters=[plc_name]) + sig_cmd = canmatrix.canmatrix.Signal(name="SCS", size=3, start_bit=5, is_signed=False) sig_cmd.is_multiplexer = True + sdo_up.is_complex_multiplexed = True sig_cmd.multiplex = "Multiplexor" sdo_up.add_signal(sig_cmd) - sdo_up.add_signal(canmatrix.canmatrix.Signal(name="sdo_uo_IDX", size=16, start_bit=8, receivers=[node_name])) - sdo_up.add_signal(canmatrix.canmatrix.Signal(name="sdo_up_SUBIDX", size=8, start_bit=24, receivers=[node_name])) - sdo_up.add_signal(canmatrix.canmatrix.Signal(name="error_code", size=32, start_bit=32, receivers=[node_name], multiplex=0x80)) - sig_cmd.add_values(0x80, "upload_error") - - sdo_up.add_signal(canmatrix.canmatrix.Signal(name="data8", size=8, start_bit=32, receivers=[plc_name], multiplex=0x4f)) - sig_cmd.add_values(0x2f, "8_bytes") - sdo_up.add_signal(canmatrix.canmatrix.Signal(name="data16", size=16, start_bit=32, receivers=[plc_name], multiplex=0x4b)) - sig_cmd.add_values(0x2b, "16_bytes") - sdo_up.add_signal(canmatrix.canmatrix.Signal(name="data24", size=24, start_bit=32, receivers=[plc_name], multiplex=0x47)) - sig_cmd.add_values(0x27, "3_bytes") - sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data32", size=32, start_bit=32, receivers=[plc_name], multiplex=0x43)) - sig_cmd.add_values(0x23, "4_bytes") + + index = canmatrix.canmatrix.Signal(name="IDX", size=24, start_bit=8) + index.multiplex = "Multiplexor" + index.is_multiplexer = True + index.mux_val = 2 + index.mux_val_grp.append([ 2, 2]) + index.muxer_for_signal = "SCS" + sdo_up.add_signal(index) db.add_frame(sdo_up) - # RX Can-Ids ... - for index in range(0x1400, 0x1408): - if index in od: - # store canid in object... - od[index+0x200].canid = od[index][1].default + for obj in od.values(): + if isinstance(obj, canopen.objectdictionary.ODVariable): + subindex = 0 + combined_value = int(f"{subindex:02X}{obj.index:04X}", 16) + signal_name = name_cleanup(obj.name) + size = get_bit_length(obj.data_type) + if size == 0: + logger.info("Ignoring " + signal_name + " size 0") + continue + new_sig = canmatrix.canmatrix.Signal(name=signal_name, size=size, start_bit=32, receivers=[plc_name]) + datatype_name = get_data_type_name(obj.data_type) + if "UNSIGNED" in datatype_name: + new_sig.is_signed = False + new_sig.mux_val = combined_value + new_sig.mux_val_grp.append([ combined_value, combined_value]) + new_sig.muxer_for_signal = "IDX" + sdo_down.add_signal(new_sig) + up_sig = copy.deepcopy(new_sig) + up_sig.muxer_for_signal = "IDX" - ##RX PDOs - for index in range(0x1600, 0x1608): - if index in od: - pdo_name = od[index].name.replace(" ", "_") - frame = canmatrix.canmatrix.Frame(name=pdo_name, transmitters=[plc_name]) - db.add_frame(frame) - frame_id = od[index].canid + up_sig.receivers = [] + sdo_up.add_signal(up_sig) + elif isinstance(obj, canopen.objectdictionary.ODRecord): + members = [] + for subobj in obj.values(): + combined_value = int(f"{subobj.subindex:02X}{obj.index:04X}", 16) + signal_name = name_cleanup(subobj.name) + size = get_bit_length(subobj.data_type) + if size == 0: + logger.info("Ignoring " + signal_name + " size 0") + continue + + new_sig = canmatrix.canmatrix.Signal(name=signal_name, size=size, start_bit=32, receivers=[plc_name]) + datatype_name = get_data_type_name(subobj.data_type) + if "UNSIGNED" in datatype_name: + new_sig.is_signed = False + new_sig.mux_val = combined_value + new_sig.mux_val_grp.append([ combined_value, combined_value]) + new_sig.muxer_for_signal = "IDX" + sdo_down.add_signal(new_sig) + up_sig = copy.deepcopy(new_sig) + up_sig.muxer_for_signal = "IDX" + + up_sig.receivers = [] + sdo_up.add_signal(up_sig) + if len(members) > 0: + sdo_down.add_signal_group("SG_R_" + name_cleanup(obj.name), signal_group_counter, members) + signal_group_counter += 1 + + elif isinstance(obj, canopen.objectdictionary.ODArray): + members = [] + for subobj in obj.values(): + combined_value = int(f"{subobj.subindex:02X}{obj.index:04X}", 16) + signal_name = name_cleanup(subobj.name) + size = get_bit_length(subobj.data_type) + if size == 0: + logger.info("Ignoring " + signal_name + " size 0") + continue + + new_sig = canmatrix.canmatrix.Signal(name=signal_name, size=size, start_bit=32, receivers=[plc_name]) + datatype_name = get_data_type_name(subobj.data_type) + if "UNSIGNED" in datatype_name: + new_sig.is_signed = False + new_sig.mux_val = combined_value + new_sig.mux_val_grp.append([ combined_value, combined_value]) + new_sig.muxer_for_signal = "IDX" + sdo_down.add_signal(new_sig) + members.append(signal_name) + up_sig = copy.deepcopy(new_sig) + up_sig.muxer_for_signal = "IDX" + up_sig.receivers = [] + sdo_up.add_signal(up_sig) + if len(members) > 0: + sdo_down.add_signal_group("SG_A_" + name_cleanup(obj.name), signal_group_counter, members) + signal_group_counter += 1 + + + for start_index, rx_tx_config in {0x1400 : {"transmitter": [], "receiver": [node_name]}, 0x1800: {"transmitter": [node_name], "receiver": []}}.items(): + for comm_index in range(start_index, start_index + 0x8): + map_index = comm_index + 0x200 + if comm_index not in od or map_index not in od: + continue + + # Retrieve the COB-ID + comm_param = od[comm_index] #od.get(comm_index) + cob_id_entry = comm_param.get(1) if comm_param else None + if not cob_id_entry or cob_id_entry.default is None: + # print(f" Warning: No valid COB-ID found for {pdo_type} PDO at index 0x{comm_index:04X}. Skipping.") + continue + cob_id = cob_id_entry.default & 0x7FF + pdo_name = name_cleanup(od[comm_index].name) + frame = canmatrix.canmatrix.Frame(name=pdo_name, transmitters=rx_tx_config["transmitter"]) + frame_id = cob_id frame.arbitration_id = canmatrix.ArbitrationId(id=frame_id) - frame.size = 8 - signals = get_signals(od[index], node_name) - for sig in signals: - frame.add_signal(sig) - - # RT Can-Ids ... - for index in range(0x1800, 0x1808): - if index in od: - # store canid in object... - od[index+0x200].canid = od[index][1].default - 0x40000000 - - #TX - for index in range(0x1A00, 0x1A08): - if index in od: - frame = canmatrix.canmatrix.Frame(name=pdo_name, transmitters=[node_name]) db.add_frame(frame) - pdo_name = od[index].name.replace(" ", "_") - frame_id = od[index].canid - frame.arbitration_id = canmatrix.ArbitrationId(id=frame_id) - frame.size = 8 - signals = get_signals(od[index], plc_name) - for sig in signals: - frame.add_signal(sig) + mapping_param = od.get(map_index) + if not mapping_param: + # print(f" Warning: No mapping parameter found for {pdo_type} PDO at index 0x{map_index:04X}.") + continue + num_entries = mapping_param[0].default if 0 in mapping_param else 0 + current_bit_start = 0 + for subindex in range(1, num_entries + 1): + mapping_entry = mapping_param.get(subindex) + if not mapping_entry or mapping_entry.default is None: + #print(f" Warning: Subindex {subindex} missing for mapping parameter at 0x{map_index:04X}.") + continue + + # Decode the mapping entry + mapping_value = mapping_entry.default + obj_index = (mapping_value >> 16) & 0xFFFF + obj_subindex = (mapping_value >> 8) & 0xFF + bit_length = mapping_value & 0xFF + + # Fetch the mapped object + mapped_obj = od.get_variable(obj_index, obj_subindex) + if not mapped_obj: + #print(f" Warning: Could not find object at Index: 0x{obj_index:04X}, Subindex: {obj_subindex}.") + current_bit_start += bit_length + continue + signal_name = name_cleanup(mapped_obj.name) + new_sig = canmatrix.Signal(signal_name, size=bit_length, start_bit=current_bit_start) + datatype_name = get_data_type_name(mapping_entry.data_type) + if "UNSIGNED" in datatype_name: + new_sig.is_signed = False + new_sig.factor = mapped_obj.factor + if mapped_obj.min is not None: + new_sig.min = mapped_obj.min + new_sig.offset = mapped_obj.min + if mapped_obj.max is not None: + new_sig.max = mapped_obj.max + new_sig.receivers = rx_tx_config["receiver"] + frame.add_signal(new_sig) + current_bit_start += bit_length + frame.size = math.ceil(current_bit_start/8) db.update_ecu_list() + for ecu in db.ecus: + db.rename_ecu(ecu.name, name_cleanup(ecu.name)) return db