diff --git a/code-generation/language-python/src/main/resources/templates/python/data-io-template.python.ftlh b/code-generation/language-python/src/main/resources/templates/python/data-io-template.python.ftlh index cbef5a873be..8605746db96 100644 --- a/code-generation/language-python/src/main/resources/templates/python/data-io-template.python.ftlh +++ b/code-generation/language-python/src/main/resources/templates/python/data-io-template.python.ftlh @@ -78,7 +78,7 @@ class ${type.name}: <#else> ${helper.camelCaseToSnakeCase(helper.toParseExpression(dataIoTypeDefinition.switchField.orElseThrow(), discriminatorType, discriminatorValueTerm, parserArguments))} - <#sep> and + <#sep> & : # ${case.name} <#else> @@ -102,7 +102,7 @@ class ${type.name}: <@emitImport import="from typing import List" /> <@emitImport import="from plc4py.api.value.PlcValue import PlcValue" /> ${helper.camelCaseToSnakeCase(arrayField.name)}: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): ${helper.camelCaseToSnakeCase(arrayField.name)}.append(${helper.getPlcValueTypeForTypeReference(elementTypeReference)}(${helper.getLanguageTypeNameForTypeReference(elementTypeReference, false)}(<#if elementTypeReference.isSimpleTypeReference()>${helper.getReadBufferReadMethodCall(elementTypeReference.asSimpleTypeReference().orElseThrow(), "", arrayField)})<#else>${elementTypeReference.asComplexTypeReference().orElseThrow().name}IO.static_parse(read_buffer<#if elementTypeReference.params.isPresent()>, <#list elementTypeReference.params.orElseThrow() as parserArgument>(${helper.getLanguageTypeNameForTypeReference(helper.getArgumentType(elementTypeReference, parserArgument?index), true)}) (${helper.toParseExpression(arrayField, elementTypeReference, parserArgument,parserArguments)})<#sep>, ))) <#-- In all other cases do we have to work with a list, that is later converted to an array --> @@ -340,9 +340,8 @@ class ${type.name}: def static_serialize(write_buffer: WriteBuffer, _value: PlcValue<#if type.parserArguments.isPresent()>, <#list type.parserArguments.orElseThrow() as parserArgument>${helper.camelCaseToSnakeCase(parserArgument.name)}: ${helper.getLanguageTypeNameForTypeReference(parserArgument.type, false)}<#sep>, , byte_order: ByteOrder) -> None: <#assign defaultCaseOutput=false> <#assign dataIoTypeDefinition=type.asDataIoTypeDefinition().orElseThrow()> - <#list dataIoTypeDefinition.switchField.orElseThrow().cases as case> - <#if case.discriminatorValueTerms?has_content> - if <@compress single_line=true> + if <#list dataIoTypeDefinition.switchField.orElseThrow().cases as case><#if case.discriminatorValueTerms?has_content> <@compress single_line=true> + <#list case.discriminatorValueTerms as discriminatorValueTerm> <#assign discriminatorExpression=dataIoTypeDefinition.switchField.orElseThrow().discriminatorExpressions[discriminatorValueTerm?index].asLiteral().orElseThrow().asVariableLiteral().orElseThrow()> <#assign discriminatorType=helper.getDiscriminatorTypes()[discriminatorExpression.name]> @@ -436,7 +435,7 @@ class ${type.name}: <#break> - <#sep> + <#sep><@compress single_line=true>elif <@emitImport import="import math" /> diff --git a/sandbox/plc4py/plc4py/api/messages/PlcMessage.py b/sandbox/plc4py/plc4py/api/messages/PlcMessage.py index 1558207054c..6ac3e59867b 100644 --- a/sandbox/plc4py/plc4py/api/messages/PlcMessage.py +++ b/sandbox/plc4py/plc4py/api/messages/PlcMessage.py @@ -16,7 +16,8 @@ # specific language governing permissions and limitations # under the License. # +from plc4py.spi.values.Common import Serializable -class PlcMessage: +class PlcMessage(Serializable): pass diff --git a/sandbox/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py b/sandbox/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py index ab4b18171ae..c2cb165c818 100644 --- a/sandbox/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py +++ b/sandbox/plc4py/plc4py/protocols/modbus/readwrite/DataItem.py @@ -51,7 +51,7 @@ class DataItem: def static_parse( read_buffer: ReadBuffer, data_type: ModbusDataType, number_of_values: int ): - if data_type == ModbusDataType.BOOL and number_of_values == int(1): # BOOL + if data_type == ModbusDataType.BOOL & number_of_values == int(1): # BOOL # Reserved Field (Compartmentalized so the "reserved" variable can't leak) reserved: int = read_buffer.read_unsigned_int(15, logical_name="") @@ -73,11 +73,11 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append(PlcBOOL(bool(read_buffer.read_bit("")))) return PlcList(value) - if data_type == ModbusDataType.BYTE and number_of_values == int(1): # BYTE + if data_type == ModbusDataType.BYTE & number_of_values == int(1): # BYTE # Reserved Field (Compartmentalized so the "reserved" variable can't leak) reserved: int = read_buffer.read_unsigned_short(8, logical_name="") @@ -99,7 +99,7 @@ def static_parse( # Count array item_count: int = int(number_of_values * int(8)) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append(PlcBOOL(bool(read_buffer.read_bit("")))) return PlcList(value) @@ -121,7 +121,7 @@ def static_parse( value: int = read_buffer.read_unsigned_long(64, logical_name="") return PlcLWORD(value) - if data_type == ModbusDataType.SINT and number_of_values == int(1): # SINT + if data_type == ModbusDataType.SINT & number_of_values == int(1): # SINT # Reserved Field (Compartmentalized so the "reserved" variable can't leak) reserved: int = read_buffer.read_unsigned_short(8, logical_name="") @@ -143,13 +143,13 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcSINT(int(read_buffer.read_signed_byte(8, logical_name=""))) ) return PlcList(value) - if data_type == ModbusDataType.INT and number_of_values == int(1): # INT + if data_type == ModbusDataType.INT & number_of_values == int(1): # INT # Simple Field (value) value: int = read_buffer.read_short(16, logical_name="") @@ -160,11 +160,11 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append(PlcINT(int(read_buffer.read_short(16, logical_name="")))) return PlcList(value) - if data_type == ModbusDataType.DINT and number_of_values == int(1): # DINT + if data_type == ModbusDataType.DINT & number_of_values == int(1): # DINT # Simple Field (value) value: int = read_buffer.read_int(32, logical_name="") @@ -175,11 +175,11 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append(PlcDINT(int(read_buffer.read_int(32, logical_name="")))) return PlcList(value) - if data_type == ModbusDataType.LINT and number_of_values == int(1): # LINT + if data_type == ModbusDataType.LINT & number_of_values == int(1): # LINT # Simple Field (value) value: int = read_buffer.read_long(64, logical_name="") @@ -190,11 +190,11 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append(PlcLINT(int(read_buffer.read_long(64, logical_name="")))) return PlcList(value) - if data_type == ModbusDataType.USINT and number_of_values == int(1): # USINT + if data_type == ModbusDataType.USINT & number_of_values == int(1): # USINT # Reserved Field (Compartmentalized so the "reserved" variable can't leak) reserved: int = read_buffer.read_unsigned_short(8, logical_name="") @@ -216,13 +216,13 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcUINT(int(read_buffer.read_unsigned_short(8, logical_name=""))) ) return PlcList(value) - if data_type == ModbusDataType.UINT and number_of_values == int(1): # UINT + if data_type == ModbusDataType.UINT & number_of_values == int(1): # UINT # Simple Field (value) value: int = read_buffer.read_unsigned_int(16, logical_name="") @@ -233,13 +233,13 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcUDINT(int(read_buffer.read_unsigned_int(16, logical_name=""))) ) return PlcList(value) - if data_type == ModbusDataType.UDINT and number_of_values == int(1): # UDINT + if data_type == ModbusDataType.UDINT & number_of_values == int(1): # UDINT # Simple Field (value) value: int = read_buffer.read_unsigned_long(32, logical_name="") @@ -250,13 +250,13 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcULINT(int(read_buffer.read_unsigned_long(32, logical_name=""))) ) return PlcList(value) - if data_type == ModbusDataType.ULINT and number_of_values == int(1): # ULINT + if data_type == ModbusDataType.ULINT & number_of_values == int(1): # ULINT # Simple Field (value) value: int = read_buffer.read_unsigned_long(64, logical_name="") @@ -267,13 +267,13 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcLINT(int(read_buffer.read_unsigned_long(64, logical_name=""))) ) return PlcList(value) - if data_type == ModbusDataType.REAL and number_of_values == int(1): # REAL + if data_type == ModbusDataType.REAL & number_of_values == int(1): # REAL # Simple Field (value) value: float = read_buffer.read_float(32, logical_name="") @@ -284,13 +284,13 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcREAL(float(read_buffer.read_float(32, logical_name=""))) ) return PlcList(value) - if data_type == ModbusDataType.LREAL and number_of_values == int(1): # LREAL + if data_type == ModbusDataType.LREAL & number_of_values == int(1): # LREAL # Simple Field (value) value: float = read_buffer.read_double(64, logical_name="") @@ -301,13 +301,13 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcLREAL(float(read_buffer.read_double(64, logical_name=""))) ) return PlcList(value) - if data_type == ModbusDataType.CHAR and number_of_values == int(1): # CHAR + if data_type == ModbusDataType.CHAR & number_of_values == int(1): # CHAR # Simple Field (value) value: str = read_buffer.read_str(8, logical_name="", encoding="") @@ -318,7 +318,7 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcSTRING( str(read_buffer.read_str(8, logical_name="", encoding="")) @@ -326,7 +326,7 @@ def static_parse( ) return PlcList(value) - if data_type == ModbusDataType.WCHAR and number_of_values == int(1): # WCHAR + if data_type == ModbusDataType.WCHAR & number_of_values == int(1): # WCHAR # Simple Field (value) value: str = read_buffer.read_str(16, logical_name="", encoding="") @@ -337,7 +337,7 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcSTRING( str(read_buffer.read_str(16, logical_name="", encoding="")) @@ -362,7 +362,7 @@ def static_serialize( # Simple Field (value) value: bool = _value.get_bool() write_buffer.write_bit((value), "value") - if data_type == ModbusDataType.BOOL: # List + elif data_type == ModbusDataType.BOOL: # List values: PlcList = _value @@ -370,14 +370,14 @@ def static_serialize( value: bool = val.get_bool() write_buffer.write_bit((value), "value") - if data_type == ModbusDataType.BYTE and number_of_values == int(1): # BYTE + elif data_type == ModbusDataType.BYTE and number_of_values == int(1): # BYTE # Reserved Field write_buffer.write_byte(int(0x00), 8, "int0x00") # Simple Field (value) value: int = _value.get_int() write_buffer.write_byte((value), 8, "value") - if data_type == ModbusDataType.BYTE: # List + elif data_type == ModbusDataType.BYTE: # List values: PlcList = _value @@ -385,29 +385,29 @@ def static_serialize( value: bool = val.get_bool() write_buffer.write_bit((value), "value") - if data_type == ModbusDataType.WORD: # WORD + elif data_type == ModbusDataType.WORD: # WORD # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_short((value), 16, "value") - if data_type == ModbusDataType.DWORD: # DWORD + elif data_type == ModbusDataType.DWORD: # DWORD # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_int((value), 32, "value") - if data_type == ModbusDataType.LWORD: # LWORD + elif data_type == ModbusDataType.LWORD: # LWORD # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_long((value), 64, "value") - if data_type == ModbusDataType.SINT and number_of_values == int(1): # SINT + elif data_type == ModbusDataType.SINT and number_of_values == int(1): # SINT # Reserved Field write_buffer.write_byte(int(0x00), 8, "int0x00") # Simple Field (value) value: int = _value.get_int() write_buffer.write_signed_byte((value), 8, "value") - if data_type == ModbusDataType.SINT: # List + elif data_type == ModbusDataType.SINT: # List values: PlcList = _value @@ -415,12 +415,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_signed_byte((value), 8, "value") - if data_type == ModbusDataType.INT and number_of_values == int(1): # INT + elif data_type == ModbusDataType.INT and number_of_values == int(1): # INT # Simple Field (value) value: int = _value.get_int() write_buffer.write_short((value), 16, "value") - if data_type == ModbusDataType.INT: # List + elif data_type == ModbusDataType.INT: # List values: PlcList = _value @@ -428,12 +428,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_short((value), 16, "value") - if data_type == ModbusDataType.DINT and number_of_values == int(1): # DINT + elif data_type == ModbusDataType.DINT and number_of_values == int(1): # DINT # Simple Field (value) value: int = _value.get_int() write_buffer.write_int((value), 32, "value") - if data_type == ModbusDataType.DINT: # List + elif data_type == ModbusDataType.DINT: # List values: PlcList = _value @@ -441,12 +441,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_int((value), 32, "value") - if data_type == ModbusDataType.LINT and number_of_values == int(1): # LINT + elif data_type == ModbusDataType.LINT and number_of_values == int(1): # LINT # Simple Field (value) value: int = _value.get_int() write_buffer.write_long((value), 64, "value") - if data_type == ModbusDataType.LINT: # List + elif data_type == ModbusDataType.LINT: # List values: PlcList = _value @@ -454,14 +454,14 @@ def static_serialize( value: int = val.get_int() write_buffer.write_long((value), 64, "value") - if data_type == ModbusDataType.USINT and number_of_values == int(1): # USINT + elif data_type == ModbusDataType.USINT and number_of_values == int(1): # USINT # Reserved Field write_buffer.write_byte(int(0x00), 8, "int0x00") # Simple Field (value) value: int = _value.get_int() write_buffer.write_byte((value), 8, "value") - if data_type == ModbusDataType.USINT: # List + elif data_type == ModbusDataType.USINT: # List values: PlcList = _value @@ -469,12 +469,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_byte((value), 8, "value") - if data_type == ModbusDataType.UINT and number_of_values == int(1): # UINT + elif data_type == ModbusDataType.UINT and number_of_values == int(1): # UINT # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_short((value), 16, "value") - if data_type == ModbusDataType.UINT: # List + elif data_type == ModbusDataType.UINT: # List values: PlcList = _value @@ -482,12 +482,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_unsigned_short((value), 16, "value") - if data_type == ModbusDataType.UDINT and number_of_values == int(1): # UDINT + elif data_type == ModbusDataType.UDINT and number_of_values == int(1): # UDINT # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_int((value), 32, "value") - if data_type == ModbusDataType.UDINT: # List + elif data_type == ModbusDataType.UDINT: # List values: PlcList = _value @@ -495,12 +495,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_unsigned_int((value), 32, "value") - if data_type == ModbusDataType.ULINT and number_of_values == int(1): # ULINT + elif data_type == ModbusDataType.ULINT and number_of_values == int(1): # ULINT # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_long((value), 64, "value") - if data_type == ModbusDataType.ULINT: # List + elif data_type == ModbusDataType.ULINT: # List values: PlcList = _value @@ -508,12 +508,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_unsigned_long((value), 64, "value") - if data_type == ModbusDataType.REAL and number_of_values == int(1): # REAL + elif data_type == ModbusDataType.REAL and number_of_values == int(1): # REAL # Simple Field (value) value: float = _value.get_float() write_buffer.write_float((value), 32, "value") - if data_type == ModbusDataType.REAL: # List + elif data_type == ModbusDataType.REAL: # List values: PlcList = _value @@ -521,12 +521,12 @@ def static_serialize( value: float = val.get_float() write_buffer.write_float((value), 32, "value") - if data_type == ModbusDataType.LREAL and number_of_values == int(1): # LREAL + elif data_type == ModbusDataType.LREAL and number_of_values == int(1): # LREAL # Simple Field (value) value: float = _value.get_float() write_buffer.write_double((value), 64, "value") - if data_type == ModbusDataType.LREAL: # List + elif data_type == ModbusDataType.LREAL: # List values: PlcList = _value @@ -534,12 +534,12 @@ def static_serialize( value: float = val.get_float() write_buffer.write_double((value), 64, "value") - if data_type == ModbusDataType.CHAR and number_of_values == int(1): # CHAR + elif data_type == ModbusDataType.CHAR and number_of_values == int(1): # CHAR # Simple Field (value) value: str = _value.get_str() write_buffer.write_str((value), 8, "UTF-8", "value") - if data_type == ModbusDataType.CHAR: # List + elif data_type == ModbusDataType.CHAR: # List values: PlcList = _value @@ -547,12 +547,12 @@ def static_serialize( value: str = val.get_str() write_buffer.write_str((value), 8, "UTF-8", "value") - if data_type == ModbusDataType.WCHAR and number_of_values == int(1): # WCHAR + elif data_type == ModbusDataType.WCHAR and number_of_values == int(1): # WCHAR # Simple Field (value) value: str = _value.get_str() write_buffer.write_str((value), 16, "UTF-16", "value") - if data_type == ModbusDataType.WCHAR: # List + elif data_type == ModbusDataType.WCHAR: # List values: PlcList = _value diff --git a/sandbox/plc4py/plc4py/protocols/simulated/readwrite/DataItem.py b/sandbox/plc4py/plc4py/protocols/simulated/readwrite/DataItem.py index 17030d0c133..f9852b52b77 100644 --- a/sandbox/plc4py/plc4py/protocols/simulated/readwrite/DataItem.py +++ b/sandbox/plc4py/plc4py/protocols/simulated/readwrite/DataItem.py @@ -47,7 +47,7 @@ class DataItem: @staticmethod def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int): - if data_type == "_bool" and number_of_values == int(1): # BOOL + if data_type == "_bool" & number_of_values == int(1): # BOOL # Simple Field (value) value: bool = read_buffer.read_bit("") @@ -58,11 +58,11 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append(PlcBOOL(bool(read_buffer.read_bit("")))) return PlcList(value) - if data_type == "_byte" and number_of_values == int(1): # BYTE + if data_type == "_byte" & number_of_values == int(1): # BYTE # Simple Field (value) value: int = read_buffer.read_unsigned_short(8, logical_name="") @@ -73,13 +73,13 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcUINT(int(read_buffer.read_unsigned_short(8, logical_name=""))) ) return PlcList(value) - if data_type == "_word" and number_of_values == int(1): # WORD + if data_type == "_word" & number_of_values == int(1): # WORD # Simple Field (value) value: int = read_buffer.read_unsigned_int(16, logical_name="") @@ -90,13 +90,13 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcUDINT(int(read_buffer.read_unsigned_int(16, logical_name=""))) ) return PlcList(value) - if data_type == "_dword" and number_of_values == int(1): # DWORD + if data_type == "_dword" & number_of_values == int(1): # DWORD # Simple Field (value) value: int = read_buffer.read_unsigned_long(32, logical_name="") @@ -107,13 +107,13 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcULINT(int(read_buffer.read_unsigned_long(32, logical_name=""))) ) return PlcList(value) - if data_type == "_lword" and number_of_values == int(1): # LWORD + if data_type == "_lword" & number_of_values == int(1): # LWORD # Simple Field (value) value: int = read_buffer.read_unsigned_long(64, logical_name="") @@ -124,13 +124,13 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcLINT(int(read_buffer.read_unsigned_long(64, logical_name=""))) ) return PlcList(value) - if data_type == "_sint" and number_of_values == int(1): # SINT + if data_type == "_sint" & number_of_values == int(1): # SINT # Simple Field (value) value: int = read_buffer.read_signed_byte(8, logical_name="") @@ -141,13 +141,13 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcSINT(int(read_buffer.read_signed_byte(8, logical_name=""))) ) return PlcList(value) - if data_type == "_int" and number_of_values == int(1): # INT + if data_type == "_int" & number_of_values == int(1): # INT # Simple Field (value) value: int = read_buffer.read_short(16, logical_name="") @@ -158,11 +158,11 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append(PlcINT(int(read_buffer.read_short(16, logical_name="")))) return PlcList(value) - if data_type == "_dint" and number_of_values == int(1): # DINT + if data_type == "_dint" & number_of_values == int(1): # DINT # Simple Field (value) value: int = read_buffer.read_int(32, logical_name="") @@ -173,11 +173,11 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append(PlcDINT(int(read_buffer.read_int(32, logical_name="")))) return PlcList(value) - if data_type == "_lint" and number_of_values == int(1): # LINT + if data_type == "_lint" & number_of_values == int(1): # LINT # Simple Field (value) value: int = read_buffer.read_long(64, logical_name="") @@ -188,11 +188,11 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append(PlcLINT(int(read_buffer.read_long(64, logical_name="")))) return PlcList(value) - if data_type == "_usint" and number_of_values == int(1): # USINT + if data_type == "_usint" & number_of_values == int(1): # USINT # Simple Field (value) value: int = read_buffer.read_unsigned_short(8, logical_name="") @@ -203,13 +203,13 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcUINT(int(read_buffer.read_unsigned_short(8, logical_name=""))) ) return PlcList(value) - if data_type == "_uint" and number_of_values == int(1): # UINT + if data_type == "_uint" & number_of_values == int(1): # UINT # Simple Field (value) value: int = read_buffer.read_unsigned_int(16, logical_name="") @@ -220,13 +220,13 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcUDINT(int(read_buffer.read_unsigned_int(16, logical_name=""))) ) return PlcList(value) - if data_type == "_udint" and number_of_values == int(1): # UDINT + if data_type == "_udint" & number_of_values == int(1): # UDINT # Simple Field (value) value: int = read_buffer.read_unsigned_long(32, logical_name="") @@ -237,13 +237,13 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcULINT(int(read_buffer.read_unsigned_long(32, logical_name=""))) ) return PlcList(value) - if data_type == "_ulint" and number_of_values == int(1): # ULINT + if data_type == "_ulint" & number_of_values == int(1): # ULINT # Simple Field (value) value: int = read_buffer.read_unsigned_long(64, logical_name="") @@ -254,13 +254,13 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcLINT(int(read_buffer.read_unsigned_long(64, logical_name=""))) ) return PlcList(value) - if data_type == "_real" and number_of_values == int(1): # REAL + if data_type == "_real" & number_of_values == int(1): # REAL # Simple Field (value) value: float = read_buffer.read_float(32, logical_name="") @@ -271,13 +271,13 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcREAL(float(read_buffer.read_float(32, logical_name=""))) ) return PlcList(value) - if data_type == "_lreal" and number_of_values == int(1): # LREAL + if data_type == "_lreal" & number_of_values == int(1): # LREAL # Simple Field (value) value: float = read_buffer.read_double(64, logical_name="") @@ -288,13 +288,13 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcLREAL(float(read_buffer.read_double(64, logical_name=""))) ) return PlcList(value) - if data_type == "_char" and number_of_values == int(1): # CHAR + if data_type == "_char" & number_of_values == int(1): # CHAR # Simple Field (value) value: str = read_buffer.read_str(8, logical_name="", encoding="") @@ -305,7 +305,7 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcSTRING( str(read_buffer.read_str(8, logical_name="", encoding="")) @@ -313,7 +313,7 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) ) return PlcList(value) - if data_type == "_wchar" and number_of_values == int(1): # WCHAR + if data_type == "_wchar" & number_of_values == int(1): # WCHAR # Simple Field (value) value: str = read_buffer.read_str(16, logical_name="", encoding="") @@ -324,7 +324,7 @@ def static_parse(read_buffer: ReadBuffer, data_type: str, number_of_values: int) # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcSTRING( str(read_buffer.read_str(16, logical_name="", encoding="")) @@ -359,7 +359,7 @@ def static_serialize( # Simple Field (value) value: bool = _value.get_bool() write_buffer.write_bit((value), "value") - if data_type == "BOOL": # List + elif data_type == "BOOL": # List values: PlcList = _value @@ -367,12 +367,12 @@ def static_serialize( value: bool = val.get_bool() write_buffer.write_bit((value), "value") - if data_type == "BYTE" and number_of_values == int(1): # BYTE + elif data_type == "BYTE" and number_of_values == int(1): # BYTE # Simple Field (value) value: int = _value.get_int() write_buffer.write_byte((value), 8, "value") - if data_type == "BYTE": # List + elif data_type == "BYTE": # List values: PlcList = _value @@ -380,12 +380,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_byte((value), 8, "value") - if data_type == "WORD" and number_of_values == int(1): # WORD + elif data_type == "WORD" and number_of_values == int(1): # WORD # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_short((value), 16, "value") - if data_type == "WORD": # List + elif data_type == "WORD": # List values: PlcList = _value @@ -393,12 +393,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_unsigned_short((value), 16, "value") - if data_type == "DWORD" and number_of_values == int(1): # DWORD + elif data_type == "DWORD" and number_of_values == int(1): # DWORD # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_int((value), 32, "value") - if data_type == "DWORD": # List + elif data_type == "DWORD": # List values: PlcList = _value @@ -406,12 +406,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_unsigned_int((value), 32, "value") - if data_type == "LWORD" and number_of_values == int(1): # LWORD + elif data_type == "LWORD" and number_of_values == int(1): # LWORD # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_long((value), 64, "value") - if data_type == "LWORD": # List + elif data_type == "LWORD": # List values: PlcList = _value @@ -419,12 +419,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_unsigned_long((value), 64, "value") - if data_type == "SINT" and number_of_values == int(1): # SINT + elif data_type == "SINT" and number_of_values == int(1): # SINT # Simple Field (value) value: int = _value.get_int() write_buffer.write_signed_byte((value), 8, "value") - if data_type == "SINT": # List + elif data_type == "SINT": # List values: PlcList = _value @@ -432,12 +432,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_signed_byte((value), 8, "value") - if data_type == "INT" and number_of_values == int(1): # INT + elif data_type == "INT" and number_of_values == int(1): # INT # Simple Field (value) value: int = _value.get_int() write_buffer.write_short((value), 16, "value") - if data_type == "INT": # List + elif data_type == "INT": # List values: PlcList = _value @@ -445,12 +445,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_short((value), 16, "value") - if data_type == "DINT" and number_of_values == int(1): # DINT + elif data_type == "DINT" and number_of_values == int(1): # DINT # Simple Field (value) value: int = _value.get_int() write_buffer.write_int((value), 32, "value") - if data_type == "DINT": # List + elif data_type == "DINT": # List values: PlcList = _value @@ -458,12 +458,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_int((value), 32, "value") - if data_type == "LINT" and number_of_values == int(1): # LINT + elif data_type == "LINT" and number_of_values == int(1): # LINT # Simple Field (value) value: int = _value.get_int() write_buffer.write_long((value), 64, "value") - if data_type == "LINT": # List + elif data_type == "LINT": # List values: PlcList = _value @@ -471,12 +471,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_long((value), 64, "value") - if data_type == "USINT" and number_of_values == int(1): # USINT + elif data_type == "USINT" and number_of_values == int(1): # USINT # Simple Field (value) value: int = _value.get_int() write_buffer.write_byte((value), 8, "value") - if data_type == "USINT": # List + elif data_type == "USINT": # List values: PlcList = _value @@ -484,12 +484,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_byte((value), 8, "value") - if data_type == "UINT" and number_of_values == int(1): # UINT + elif data_type == "UINT" and number_of_values == int(1): # UINT # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_short((value), 16, "value") - if data_type == "UINT": # List + elif data_type == "UINT": # List values: PlcList = _value @@ -497,12 +497,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_unsigned_short((value), 16, "value") - if data_type == "UDINT" and number_of_values == int(1): # UDINT + elif data_type == "UDINT" and number_of_values == int(1): # UDINT # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_int((value), 32, "value") - if data_type == "UDINT": # List + elif data_type == "UDINT": # List values: PlcList = _value @@ -510,12 +510,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_unsigned_int((value), 32, "value") - if data_type == "ULINT" and number_of_values == int(1): # ULINT + elif data_type == "ULINT" and number_of_values == int(1): # ULINT # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_long((value), 64, "value") - if data_type == "ULINT": # List + elif data_type == "ULINT": # List values: PlcList = _value @@ -523,12 +523,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_unsigned_long((value), 64, "value") - if data_type == "REAL" and number_of_values == int(1): # REAL + elif data_type == "REAL" and number_of_values == int(1): # REAL # Simple Field (value) value: float = _value.get_float() write_buffer.write_float((value), 32, "value") - if data_type == "REAL": # List + elif data_type == "REAL": # List values: PlcList = _value @@ -536,12 +536,12 @@ def static_serialize( value: float = val.get_float() write_buffer.write_float((value), 32, "value") - if data_type == "LREAL" and number_of_values == int(1): # LREAL + elif data_type == "LREAL" and number_of_values == int(1): # LREAL # Simple Field (value) value: float = _value.get_float() write_buffer.write_double((value), 64, "value") - if data_type == "LREAL": # List + elif data_type == "LREAL": # List values: PlcList = _value @@ -549,12 +549,12 @@ def static_serialize( value: float = val.get_float() write_buffer.write_double((value), 64, "value") - if data_type == "CHAR" and number_of_values == int(1): # CHAR + elif data_type == "CHAR" and number_of_values == int(1): # CHAR # Simple Field (value) value: str = _value.get_str() write_buffer.write_str((value), 8, "UTF-8", "value") - if data_type == "CHAR": # List + elif data_type == "CHAR": # List values: PlcList = _value @@ -562,12 +562,12 @@ def static_serialize( value: str = val.get_str() write_buffer.write_str((value), 8, "UTF-8", "value") - if data_type == "WCHAR" and number_of_values == int(1): # WCHAR + elif data_type == "WCHAR" and number_of_values == int(1): # WCHAR # Simple Field (value) value: str = _value.get_str() write_buffer.write_str((value), 16, "UTF-16", "value") - if data_type == "WCHAR": # List + elif data_type == "WCHAR": # List values: PlcList = _value @@ -575,12 +575,12 @@ def static_serialize( value: str = val.get_str() write_buffer.write_str((value), 16, "UTF-16", "value") - if data_type == "STRING": # STRING + elif data_type == "STRING": # STRING # Simple Field (value) value: str = _value.get_str() write_buffer.write_str((value), 255, "UTF-8", "value") - if data_type == "WSTRING": # STRING + elif data_type == "WSTRING": # STRING # Simple Field (value) value: str = _value.get_str() diff --git a/sandbox/plc4py/plc4py/protocols/umas/readwrite/DataItem.py b/sandbox/plc4py/plc4py/protocols/umas/readwrite/DataItem.py index 785889ba84d..6458cba3ba1 100644 --- a/sandbox/plc4py/plc4py/protocols/umas/readwrite/DataItem.py +++ b/sandbox/plc4py/plc4py/protocols/umas/readwrite/DataItem.py @@ -45,7 +45,7 @@ class DataItem: def static_parse( read_buffer: ReadBuffer, data_type: UmasDataType, number_of_values: int ): - if data_type == UmasDataType.BOOL and number_of_values == int(1): # BOOL + if data_type == UmasDataType.BOOL & number_of_values == int(1): # BOOL # Reserved Field (Compartmentalized so the "reserved" variable can't leak) reserved: int = read_buffer.read_unsigned_short(7, logical_name="") @@ -67,11 +67,11 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append(PlcBOOL(bool(read_buffer.read_bit("")))) return PlcList(value) - if data_type == UmasDataType.BYTE and number_of_values == int(1): # BYTE + if data_type == UmasDataType.BYTE & number_of_values == int(1): # BYTE # Simple Field (value) value: int = read_buffer.read_unsigned_short(8, logical_name="") @@ -82,7 +82,7 @@ def static_parse( # Count array item_count: int = int(number_of_values * int(8)) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append(PlcBOOL(bool(read_buffer.read_bit("")))) return PlcList(value) @@ -98,7 +98,7 @@ def static_parse( value: int = read_buffer.read_unsigned_long(32, logical_name="") return PlcDWORD(value) - if data_type == UmasDataType.INT and number_of_values == int(1): # INT + if data_type == UmasDataType.INT & number_of_values == int(1): # INT # Simple Field (value) value: int = read_buffer.read_short(16, logical_name="") @@ -109,11 +109,11 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append(PlcINT(int(read_buffer.read_short(16, logical_name="")))) return PlcList(value) - if data_type == UmasDataType.DINT and number_of_values == int(1): # DINT + if data_type == UmasDataType.DINT & number_of_values == int(1): # DINT # Simple Field (value) value: int = read_buffer.read_int(32, logical_name="") @@ -124,11 +124,11 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append(PlcDINT(int(read_buffer.read_int(32, logical_name="")))) return PlcList(value) - if data_type == UmasDataType.UINT and number_of_values == int(1): # UINT + if data_type == UmasDataType.UINT & number_of_values == int(1): # UINT # Simple Field (value) value: int = read_buffer.read_unsigned_int(16, logical_name="") @@ -139,13 +139,13 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcUDINT(int(read_buffer.read_unsigned_int(16, logical_name=""))) ) return PlcList(value) - if data_type == UmasDataType.UDINT and number_of_values == int(1): # UDINT + if data_type == UmasDataType.UDINT & number_of_values == int(1): # UDINT # Simple Field (value) value: int = read_buffer.read_unsigned_long(32, logical_name="") @@ -156,13 +156,13 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcULINT(int(read_buffer.read_unsigned_long(32, logical_name=""))) ) return PlcList(value) - if data_type == UmasDataType.REAL and number_of_values == int(1): # REAL + if data_type == UmasDataType.REAL & number_of_values == int(1): # REAL # Simple Field (value) value: float = read_buffer.read_float(32, logical_name="") @@ -173,13 +173,13 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcREAL(float(read_buffer.read_float(32, logical_name=""))) ) return PlcList(value) - if data_type == UmasDataType.STRING and number_of_values == int(1): # STRING + if data_type == UmasDataType.STRING & number_of_values == int(1): # STRING # Manual Field (value) value: str = (str)( StaticHelper.parse_terminated_string_bytes( @@ -193,7 +193,7 @@ def static_parse( # Count array item_count: int = int(number_of_values) value: List[PlcValue] = [] - for cur_item in range(item_count): + for _ in range(item_count): value.append( PlcREAL(float(read_buffer.read_float(32, logical_name=""))) ) @@ -216,7 +216,7 @@ def static_serialize( # Simple Field (value) value: bool = _value.get_bool() write_buffer.write_bit((value), "value") - if data_type == UmasDataType.BOOL: # List + elif data_type == UmasDataType.BOOL: # List values: PlcList = _value @@ -224,12 +224,12 @@ def static_serialize( value: bool = val.get_bool() write_buffer.write_bit((value), "value") - if data_type == UmasDataType.BYTE and number_of_values == int(1): # BYTE + elif data_type == UmasDataType.BYTE and number_of_values == int(1): # BYTE # Simple Field (value) value: int = _value.get_int() write_buffer.write_byte((value), 8, "value") - if data_type == UmasDataType.BYTE: # List + elif data_type == UmasDataType.BYTE: # List values: PlcList = _value @@ -237,22 +237,22 @@ def static_serialize( value: bool = val.get_bool() write_buffer.write_bit((value), "value") - if data_type == UmasDataType.WORD: # WORD + elif data_type == UmasDataType.WORD: # WORD # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_short((value), 16, "value") - if data_type == UmasDataType.DWORD: # DWORD + elif data_type == UmasDataType.DWORD: # DWORD # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_int((value), 32, "value") - if data_type == UmasDataType.INT and number_of_values == int(1): # INT + elif data_type == UmasDataType.INT and number_of_values == int(1): # INT # Simple Field (value) value: int = _value.get_int() write_buffer.write_short((value), 16, "value") - if data_type == UmasDataType.INT: # List + elif data_type == UmasDataType.INT: # List values: PlcList = _value @@ -260,12 +260,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_short((value), 16, "value") - if data_type == UmasDataType.DINT and number_of_values == int(1): # DINT + elif data_type == UmasDataType.DINT and number_of_values == int(1): # DINT # Simple Field (value) value: int = _value.get_int() write_buffer.write_int((value), 32, "value") - if data_type == UmasDataType.DINT: # List + elif data_type == UmasDataType.DINT: # List values: PlcList = _value @@ -273,12 +273,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_int((value), 32, "value") - if data_type == UmasDataType.UINT and number_of_values == int(1): # UINT + elif data_type == UmasDataType.UINT and number_of_values == int(1): # UINT # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_short((value), 16, "value") - if data_type == UmasDataType.UINT: # List + elif data_type == UmasDataType.UINT: # List values: PlcList = _value @@ -286,12 +286,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_unsigned_short((value), 16, "value") - if data_type == UmasDataType.UDINT and number_of_values == int(1): # UDINT + elif data_type == UmasDataType.UDINT and number_of_values == int(1): # UDINT # Simple Field (value) value: int = _value.get_int() write_buffer.write_unsigned_int((value), 32, "value") - if data_type == UmasDataType.UDINT: # List + elif data_type == UmasDataType.UDINT: # List values: PlcList = _value @@ -299,12 +299,12 @@ def static_serialize( value: int = val.get_int() write_buffer.write_unsigned_int((value), 32, "value") - if data_type == UmasDataType.REAL and number_of_values == int(1): # REAL + elif data_type == UmasDataType.REAL and number_of_values == int(1): # REAL # Simple Field (value) value: float = _value.get_float() write_buffer.write_float((value), 32, "value") - if data_type == UmasDataType.REAL: # List + elif data_type == UmasDataType.REAL: # List values: PlcList = _value @@ -312,11 +312,11 @@ def static_serialize( value: float = val.get_float() write_buffer.write_float((value), 32, "value") - if data_type == UmasDataType.STRING and number_of_values == int(1): # STRING + elif data_type == UmasDataType.STRING and number_of_values == int(1): # STRING # Manual Field (value) serialize_terminated_string(write_buffer, self.value, self.number_of_values) - if data_type == UmasDataType.STRING: # List + elif data_type == UmasDataType.STRING: # List values: PlcList = _value diff --git a/sandbox/plc4py/plc4py/spi/values/Common.py b/sandbox/plc4py/plc4py/spi/values/Common.py index a407235b3fd..ee3d6324a1c 100644 --- a/sandbox/plc4py/plc4py/spi/values/Common.py +++ b/sandbox/plc4py/plc4py/spi/values/Common.py @@ -16,10 +16,15 @@ # specific language governing permissions and limitations # under the License. # - -from plc4py.spi.generation import WriteBuffer +from abc import abstractmethod class Serializable: - def serialize(self, write_buffer: WriteBuffer): + + @abstractmethod + def serialize(self, write_buffer): + """Serialize an object to the WriteBuffer provided. + + :param write_buffer: The WriteBuffer to serialize to + """ pass