From b59c5bcef1c9c3bbbe511c7e441611ba6aba9fb0 Mon Sep 17 00:00:00 2001 From: Trevor Date: Fri, 1 Dec 2023 19:19:24 -0500 Subject: [PATCH] Fix pipeline state update handling (#777) --- .gencode_hash.txt | 10 +- bin/container | 7 +- bin/set_project | 4 +- bin/test_itemcheck | 4 +- bin/test_itemized | 4 +- .../com/google/udmi/util/GeneralUtils.java | 16 +- etc/schema_itemized.out | 53 ++- etc/test_itemized.out | 4 +- etc/validator.out | 24 +- gencode/docs/configuration_pod.html | 417 ++++++++++-------- gencode/docs/monitoring.html | 4 +- .../udmi/schema/BridgePodConfiguration.java | 14 +- gencode/java/udmi/schema/Envelope.java | 2 + .../udmi/schema/configuration_pod_bridge.py | 12 +- .../main/java/daq/pubber/PointsetManager.java | 7 +- pubber/src/main/java/daq/pubber/Pubber.java | 9 +- .../main/java/daq/pubber/SystemManager.java | 11 +- schema/configuration_pod_bridge.json | 5 +- schema/envelope.json | 2 + tests/schemas/envelope/errors1.out | 2 +- udmis/Dockerfile.udmis | 3 +- udmis/bin/run | 10 + udmis/etc/k8s_udmis.yaml | 13 +- udmis/etc/prod_pod.json | 15 + .../bos/udmi/service/core/StateProcessor.java | 26 +- .../messaging/impl/LocalMessagePipe.java | 12 +- .../service/messaging/impl/MessageBase.java | 68 ++- .../messaging/impl/TraceMessagePipe.java | 34 +- .../bos/udmi/service/pod/UdmiServicePod.java | 10 +- udmis/src/test/configs/bridge_pod.json | 2 +- udmis/src/test/configs/trace_pod.json | 2 +- .../service/core/BridgeProcessorTest.java | 2 +- .../messaging/impl/TraceMessagePipeTest.java | 3 +- .../udmi/service/pod/UdmiServicePodTest.java | 12 +- 34 files changed, 478 insertions(+), 345 deletions(-) diff --git a/.gencode_hash.txt b/.gencode_hash.txt index afb2cd5f6c..911623dfe4 100644 --- a/.gencode_hash.txt +++ b/.gencode_hash.txt @@ -4,7 +4,7 @@ cde866298a8a54168d3582b50ec93160209d07338d3d6c58d215855bc35cb158 gencode/docs/c 4809ac52b172f1754cc311928c9b54611ec52837d7cce11ee03789c603cbc655 gencode/docs/config_mapping.html 08583688b20f892c0b453f41787ac01a46ac601663736bcd6ed6f57be0758e79 gencode/docs/configuration_endpoint.html 6cf94d6cb600c75cde32a64bd78acb3ed3b54adfad08dbf6bb159b467e8925c9 gencode/docs/configuration_execution.html -d0f6ea11943f46c955891b3b49a25a9bffcf558fcabb9ed2b0f7667d8413953b gencode/docs/configuration_pod.html +6f8a3766b840e96881e3573d594e28864917efb288e068d6c82de55deb576694 gencode/docs/configuration_pod.html fe6dabc7b6634449d457df758bcd441e7ecb87d8461c7e1ce26364f6b7f66f7f gencode/docs/configuration_pubber.html 69bee6bfcd2f786d95cfd563ecb497296361c8945a1422385f78d25008f38ef1 gencode/docs/event.html 0a41807b999f80bd283fcee03e6df5845e13b122d9be7c7d4723b19a334b89d5 gencode/docs/event_discovery.html @@ -13,7 +13,7 @@ d2fe30200f9a1c530d144137a7774b5b97f268aae41185bfcb25de274d399250 gencode/docs/e d39b9cbfbdd3f6b5a34396b6a7ab0e7f92a07264ce216d26e57483b22f751f37 gencode/docs/event_system.html 04a864b8b37b01a3b234ba86e7e123ebaf3c98bba497c2f06d23c4550ae8194e gencode/docs/event_validation.html fa237fe9d96c2809bf9562abdc5c4f7c7d933010beb5c54407caa4453dec30f9 gencode/docs/metadata.html -959f637f27f736c47411c29e36d1e994933f5c7f9b10aa400a60c0b12630caa3 gencode/docs/monitoring.html +0aa459d46e8dc4c1f40e574cad107ae212d08b41fdde1660c052a4dd21a197cc gencode/docs/monitoring.html 180b32717db748e164a185b163ef9a97aa83d9d6add306283d5b9852d04af947 gencode/docs/persistent_device.html 5d039d607af9ec75ee552dfe36b16c702687ea16f5663f41fc49b4533b86e00d gencode/docs/properties.html 1766f84518a315fe57e4a4bf934c0a386ad61d87091754a6bab097c686c16019 gencode/docs/readme.md @@ -33,7 +33,7 @@ ce2c747fab0d374987acc51474a52ca5b3d64659d51cffa671d5442b7114339a gencode/java/u c033a4b2c9920a4314801d1fbb7885b375a4bb890344de937ed30baf4f2c08e1 gencode/java/udmi/schema/BlobBlobsetState.java b9f903444ab08907e41eb123286434ff3207b1edd01397af3ddefb8475bbdadc gencode/java/udmi/schema/BlobsetConfig.java fcbed49f1af8b791d8c52bcbe18f65521a79d9ac3eb33ec3afd9b342ab2bfc56 gencode/java/udmi/schema/BlobsetState.java -e28c3899bf2cb08cf456dbca29ccc3d4559c1d2145e252095689b171be82b4c6 gencode/java/udmi/schema/BridgePodConfiguration.java +3ee2d42e34f36350925595a6904978698c0d92b0b5d417876bd7bf25b0780210 gencode/java/udmi/schema/BridgePodConfiguration.java 2e7c0912c6f43b8dae9e9149b7361d3640bde4f01de6e13d8e487fe01be2c2c0 gencode/java/udmi/schema/Bucket.java 0a4f6bcd5065418c1cdc6c05b900b3de31744847d25b6ab6de7aabb1e724710e gencode/java/udmi/schema/BuildingTranslation.java c47b17d70ed7fffae5cb829623088358eca22c8fa5613edd97518bed0b522620 gencode/java/udmi/schema/CapabilityValidationState.java @@ -54,7 +54,7 @@ e809df42a73ed843977e447fa3333140dcdbe7d09b4e88bf1b8fd52fbaf9ae1a gencode/java/u b01b7a05f21651a01a605466bb80b2e537059dcbb41f33734c851432da57f657 gencode/java/udmi/schema/EndpointConfiguration.java dd2eb479a8e93a851c535c8b40fbd62e152bd60e0473f3b23800ec61f798bed0 gencode/java/udmi/schema/Entry.java 06758aca1e0043ddf343b504030f47bb19260e99a82e2d66f12e86092a2434ca gencode/java/udmi/schema/Enumerate.java -05e3443f9a9da29ed561310b13be1d14459d9dfe292a438e42af2fbd2165a606 gencode/java/udmi/schema/Envelope.java +8a51984458d96d1798d067005902defa57410167a27dcfb0b730ea38a3326011 gencode/java/udmi/schema/Envelope.java e9f5c77be81486b6b8c6d88f70f2d50583d8c3fafa2ac09ead80f44b8d5e751e gencode/java/udmi/schema/Event.java 9ff3d1819b5a92713b26f56d08c77dcbb467c6ffc10f75744c6b6a227a13319b gencode/java/udmi/schema/ExecutionConfiguration.java 2e77a29988e6c17875d3f781be931e3b835bb310639f60d3bb28f24405a86bed gencode/java/udmi/schema/FamilyDiscoveryConfig.java @@ -149,7 +149,7 @@ ae3f2f71fc81b1cffc45ffe9d1c179fba8bb09a398a305e7ba42aa8aeabe125a gencode/python 14fd646b9a8638b87e4c421c9dadfb7ed2e66ad02b256217423e3b5dd6c39fd1 gencode/python/udmi/schema/configuration_execution.py e30f937983f98673b3e67ac1369fe86964d785092964f7e95cd39611f9283d7c gencode/python/udmi/schema/configuration_pod.py ccc43757750379f3c072f019b25b0ea8d970c48dd0b66e98ec2b0bfb1635a952 gencode/python/udmi/schema/configuration_pod_base.py -11c8841ed5c2a5bcaf4b44c943c8f70fcb5010f1027a025b46300435353b2432 gencode/python/udmi/schema/configuration_pod_bridge.py +ef61eea743cc2629893b18411636672cdfec0e209e58eb7918b33b43edab5196 gencode/python/udmi/schema/configuration_pod_bridge.py bed77c13436a192047a0dcdcaea7c5d7175e99a76c6c40409cce9e232ab5bc12 gencode/python/udmi/schema/configuration_pubber.py fbb4b2c04c170c0da5cdd868612429fe920e44b591fcad2522b2e047d580d537 gencode/python/udmi/schema/entry.py fcc75eec4263b11b378e30f929687cfb9619c8dfc524863b6bfb11b7e33be241 gencode/python/udmi/schema/enumeration_feature.py diff --git a/bin/container b/bin/container index 72f5345f0d..c5df81f550 100755 --- a/bin/container +++ b/bin/container @@ -62,8 +62,6 @@ fi echo Using GCP project $GCP_PROJECT echo Using udmi namespace $UDMI_NAMESPACE -LIBFILE=build/libs/${target}-1.0-SNAPSHOT-all.jar - current_user=$USER@$HOSTNAME revparse=`git rev-parse HEAD` @@ -103,6 +101,8 @@ if [[ -n $prep ]]; then bin/build + LIBFILE=build/libs/*-1.0-SNAPSHOT-all.jar + build_time=`date --utc -Imin -r $LIBFILE` cat < var/deployed_version.json { @@ -128,7 +128,8 @@ if [[ -n $push ]]; then for file in $TEMPLATES; do cp etc/$file tmp/$file sed -i tmp/$file \ - -e "s^@IMAGE-$target@^$ihash^" \ + -e "s^@IMAGE@^$ihash^" \ + -e "s^@TARGET@^$target^" \ -e "s^@UDMI_NAMESPACE@^$UDMI_NAMESPACE^" \ -e "s^@GCP_PROJECT@^$GCP_PROJECT^" if diff etc/$file tmp/$file; then diff --git a/bin/set_project b/bin/set_project index f64ba49066..181c587dd7 100755 --- a/bin/set_project +++ b/bin/set_project @@ -23,9 +23,9 @@ cd $ROOT echo Configuring for GCP project $gcp_project... current_project=$(gcloud config get project) -quota_project=$(jq -r .quota_project_id $HOME/.config/gcloud/application_default_credentials.json) +quota_project=$(jq -r .quota_project_id $HOME/.config/gcloud/application_default_credentials.json) || true -if [[ $quota_project != $gcp_project ]]; then +if [[ $quota_project != $gcp_project && -n $quota_project ]]; then echo Setting gcloud quota project to $gcp_project gcloud auth application-default set-quota-project $gcp_project fi diff --git a/bin/test_itemcheck b/bin/test_itemcheck index 02fdbf8059..7a1c8ee4e4 100755 --- a/bin/test_itemcheck +++ b/bin/test_itemcheck @@ -11,10 +11,10 @@ GOLDEN_SCHEMAS=$UDMI_ROOT/etc/schema_itemized.out failures= -echo Comparing diff $RESULTS_OUT $GOLDEN_FILE... +echo Comparing diff $RESULTS_OUT $GOLDEN_FILE diff -bu $RESULTS_OUT $GOLDEN_FILE || failures+="results " -echo Comparing diff $SCHEMA_OUT $GOLDEN_SCHEMAS... +echo Comparing diff $SCHEMA_OUT $GOLDEN_SCHEMAS diff -bu $SCHEMA_OUT $GOLDEN_SCHEMAS || failures+="schema " [[ -z $failures ]] || fail itemized checks failied because of $failures diff --git a/bin/test_itemized b/bin/test_itemized index 0106ccf4cb..1a7a6fb550 100755 --- a/bin/test_itemized +++ b/bin/test_itemized @@ -117,9 +117,9 @@ while read -u 7 action test_name pubber_opts; do mv $SITE_PATH/out $SITE_PATH/out-$test_marker if [[ -s $SCHEMA_OUT ]]; then - cat $SCHEMA_OUT | sed -e "s/^/$test_name /" >> $SCHEMA_ITEMIZED + sed -e "s/^/$test_marker $test_name /" < $SCHEMA_OUT >> $SCHEMA_ITEMIZED else - echo $test_name >> $SCHEMA_ITEMIZED + echo $test_marker $test_name >> $SCHEMA_ITEMIZED fi done 7< $INPUT_FILE diff --git a/common/src/main/java/com/google/udmi/util/GeneralUtils.java b/common/src/main/java/com/google/udmi/util/GeneralUtils.java index af2b9b898b..cb152c033b 100644 --- a/common/src/main/java/com/google/udmi/util/GeneralUtils.java +++ b/common/src/main/java/com/google/udmi/util/GeneralUtils.java @@ -319,22 +319,30 @@ public static T catchOrElse(Supplier provider, Supplier alternate) { return alternate.get(); } - public static boolean catchToFalse(Supplier provider) { + public static T catchToElse(Supplier provider, Function alternate) { try { return provider.get(); } catch (Exception e) { - return false; + return alternate.apply(e); } } - public static T catchToNull(Supplier provider) { + public static T catchToElse(Supplier provider, T alternate) { try { return provider.get(); } catch (Exception e) { - return null; + return alternate; } } + public static boolean catchToFalse(Supplier provider) { + return catchToElse(provider, false); + } + + public static T catchToNull(Supplier provider) { + return catchToElse(provider, (T) null); + } + public static U mapReplace(U previous, U added) { return added; } diff --git a/etc/schema_itemized.out b/etc/schema_itemized.out index c430be893c..de48626c61 100644 --- a/etc/schema_itemized.out +++ b/etc/schema_itemized.out @@ -1,21 +1,32 @@ -broken_config -config_logging -device_config_acked -feature_enumeration -gateway_proxy_events -pointset_remove_point -pointset_request_extraneous -pointset_sample_rate -state_make_model -system_last_update -system_last_update RESULT fail schemas event_pointset_stable STABLE 5/5 Schema violations found -system_last_update RESULT pass schemas event_system_stable STABLE 5/5 All schema validations passed -system_last_update RESULT pass schemas state_update_stable STABLE 5/5 All schema validations passed -system_min_loglevel -system_mode_restart -too_much_state -valid_serial_no -valid_serial_no RESULT pass schemas event_pointset_alpha ALPHA 5/5 All schema validations passed -valid_serial_no RESULT pass schemas event_system_alpha ALPHA 5/5 All schema validations passed -valid_serial_no RESULT pass schemas state_update_alpha ALPHA 5/5 All schema validations passed -writeback_success +01 device_config_acked +02 pointset_request_extraneous +03 pointset_remove_point +04 pointset_remove_point +05 too_much_state +06 feature_enumeration +07 valid_serial_no RESULT pass schemas event_pointset_alpha ALPHA 5/5 All schema validations passed +07 valid_serial_no RESULT pass schemas event_system_alpha ALPHA 5/5 All schema validations passed +07 valid_serial_no RESULT pass schemas state_update_alpha ALPHA 5/5 All schema validations passed +08 writeback_success +09 writeback_success +10 pointset_sample_rate +11 system_mode_restart +12 config_logging +13 broken_config +14 broken_config +15 broken_config +16 broken_config +17 broken_config +18 system_last_update RESULT fail schemas event_pointset_stable STABLE 5/5 Schema violations found +18 system_last_update RESULT fail schemas state_update_stable STABLE 5/5 Schema violations found +18 system_last_update RESULT pass schemas event_system_stable STABLE 5/5 All schema validations passed +19 state_make_model +23 valid_serial_no +24 system_last_update +25 system_min_loglevel +26 system_min_loglevel +30 gateway_proxy_events +31 gateway_proxy_events +35 gateway_proxy_events +36 gateway_proxy_events +40 device_config_acked diff --git a/etc/test_itemized.out b/etc/test_itemized.out index b3e6813978..686d3ece5b 100644 --- a/etc/test_itemized.out +++ b/etc/test_itemized.out @@ -21,7 +21,7 @@ 17 CPBLTY fail system broken_config.logging ALPHA 0/1 Timeout after 30s waiting for log category `system.config.receive` level `DEBUG` to be logged 17 RESULT fail system broken_config ALPHA 0/5 expected: but was: 18 RESULT pass system system_last_update STABLE 5/5 Sequence complete -19 RESULT fail system state_make_model BETA 0/5 Pipeline type event error: While processing message REDACTED +19 RESULT fail system state_make_model BETA 0/5 Timeout waiting for no applicable system status 23 RESULT skip system valid_serial_no ALPHA 0/0 State testing disabled 24 RESULT fail system system_last_update STABLE 0/5 Timeout waiting for state last_config matches config timestamp 25 RESULT fail system system_min_loglevel ALPHA 0/5 Received state update with no-state device @@ -29,5 +29,5 @@ 30 RESULT pass gateway gateway_proxy_events ALPHA 5/5 Sequence complete 31 RESULT fail gateway gateway_proxy_events ALPHA 0/5 Timeout waiting for Missing data from AHU-22 35 RESULT pass gateway gateway_proxy_events ALPHA 5/5 Sequence complete -36 RESULT pass gateway gateway_proxy_events ALPHA 5/5 Sequence complete +36 RESULT fail gateway gateway_proxy_events ALPHA 0/5 Received state update with no-state device 40 RESULT skip system device_config_acked BETA 0/0 No config check for proxy device diff --git a/etc/validator.out b/etc/validator.out index 9f3488d4c8..d33a4f4cf0 100644 --- a/etc/validator.out +++ b/etc/validator.out @@ -102,9 +102,9 @@ sites/udmi_site_model/out/devices/AHU-1/state.out "sub_folder" : "update", "sub_type" : "state", "status" : { - "message" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold", - "detail" : "state_update: Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold REDACTED_ERROR", - "category" : "validation.device.schema", + "message" : "Multiple validation errors", + "detail" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold; While converting to json node: 1 schema violations found", + "category" : "validation.device.multiple", "timestamp" : "REDACTED_TIMESTAMP", "level" : 500 }, @@ -114,6 +114,12 @@ sites/udmi_site_model/out/devices/AHU-1/state.out "category" : "validation.device.schema", "timestamp" : "REDACTED_TIMESTAMP", "level" : 500 + }, { + "message" : "While converting to json node: 1 schema violations found", + "detail" : "state_update: While converting to json node: 1 schema violations found REDACTED_ERROR; 1 schema violations found; /system: object instance has properties which are not allowed by the schema: [\"extraField\"]", + "category" : "validation.device.schema", + "timestamp" : "REDACTED_TIMESTAMP", + "level" : 500 } ] } :::::::::::::: @@ -316,9 +322,9 @@ sites/udmi_site_model/out/devices/GAT-123/event_invalid.out "sub_folder" : "invalid", "sub_type" : "event", "status" : { - "message" : "Multiple validation errors", - "detail" : "Unknown schema subFolder 'event_invalid' for GAT-123; While converting to json node: 1 schema violations found", - "category" : "validation.device.multiple", + "message" : "Unknown schema subFolder 'event_invalid' for GAT-123", + "detail" : "event_invalid: Unknown schema subFolder 'event_invalid' for GAT-123 REDACTED_ERROR", + "category" : "validation.device.schema", "timestamp" : "REDACTED_TIMESTAMP", "level" : 500 }, @@ -328,12 +334,6 @@ sites/udmi_site_model/out/devices/GAT-123/event_invalid.out "category" : "validation.device.schema", "timestamp" : "REDACTED_TIMESTAMP", "level" : 500 - }, { - "message" : "While converting to json node: 1 schema violations found", - "detail" : "event_invalid: While converting to json node: 1 schema violations found REDACTED_ERROR; 1 schema violations found; /subFolder: instance value (\"invalid\") not found in enum (possible values: [\"udmi\",\"update\",\"error\",\"cloud\",\"discovery\",\"mapping\",\"system\",\"gateway\",\"swarm\",\"localnet\",\"pointset\",\"validation\",\"blobset\",\"monitoring\"])", - "category" : "validation.device.schema", - "timestamp" : "REDACTED_TIMESTAMP", - "level" : 500 } ] } :::::::::::::: diff --git a/gencode/docs/configuration_pod.html b/gencode/docs/configuration_pod.html index fd2a77e827..22c2bd2b53 100644 --- a/gencode/docs/configuration_pod.html +++ b/gencode/docs/configuration_pod.html @@ -2008,6 +2008,53 @@

+
+
+
+

+ +

+
+ +
+
+ + Type: string
+ + + + + + + +
+
+
+
@@ -3103,18 +3150,18 @@

-
+
-
+

- +

-
+
Type: object
+ morf
Type: object

Parameters to define a message endpoint

@@ -3149,18 +3196,18 @@

-
+
-
+

- +

-
+
Type: enum (of string)
-Type: enum (of string)
+

Must be one of:

  • "local"
  • "pubsub"
  • "file"
  • "trace"
  • "mqtt"
@@ -3206,18 +3253,18 @@

Must be one of:

-
+
-
+

- +

-
+
Type: enum (of string)
-Type: enum (of string)
+

Must be one of:

  • "ssl"
  • "tcp"
@@ -3263,18 +3310,18 @@

Must be one of:

-
+
-
+

- +

-
+
Type: string
+ hostname
Type: string
@@ -3317,18 +3364,18 @@

-
+
-
+

- +

-
+
Type: string
+ error
Type: string

Error message container for capturing errors during parsing/handling

@@ -3372,18 +3419,18 @@

-
+
-
+

- +

-
+
Type: integer Default: 8883
+ port
Type: integer Default: 8883
@@ -3426,18 +3473,18 @@

-
+
-
+

- +

-
+
Type: integer
+ config_sync_sec
Type: integer

Delay waiting for config message on start, 0 for default, <0 to disable

@@ -3481,18 +3528,18 @@

-
+
-
+

- +

-
+
Type: string
+ client_id
Type: string
@@ -3535,18 +3582,18 @@

-
+
-
+

- +

-
+
Type: string
+ msg_prefix
Type: string

Prefix for message topics/addresses

-
Must match regular expression: ^[-_/a-zA-Z0-9]+$ +Must match regular expression: ^[-_/a-zA-Z0-9]+$ @@ -3590,18 +3637,18 @@

-
+
-
+

- +

-
+
Type: string
+ recv_id
Type: string

Id for the receiving message channel

-
Must match regular expression: ^[-_/a-zA-Z0-9#]+$ +Must match regular expression: ^[-_/a-zA-Z0-9#]+$ @@ -3645,18 +3692,18 @@

-
+
-
+

- +

-
+
Type: string
+ send_id
Type: string

Id for the sending messages channel

-
Must match regular expression: ^[-_/a-zA-Z0-9#]+$ +Must match regular expression: ^[-_/a-zA-Z0-9#]+$ @@ -3700,18 +3747,18 @@

-
+
-
+

- +

-
+
Type: string
+ distributor
Type: string

processor designation for a distributor channel

-
Must match regular expression: ^[a-z][a-z0-9]*(_[a-z0-9]+)*$ +Must match regular expression: ^[a-z][a-z0-9]*(_[a-z0-9]+)*$ @@ -3755,18 +3802,18 @@

-
+
-
+

- +

-
+
Type: object
+ auth_provider
Type: object
No Additional Properties @@ -3805,18 +3852,18 @@

-
+
-
+

- +

-
+
Type: object
+ basic
Type: object
No Additional Properties @@ -3862,18 +3909,18 @@

-
+
-
+

- +

-
+
Type: string
+ username
Type: string
@@ -3930,18 +3977,18 @@

-
+
-
+

- +

-
+
Type: string
+ password
Type: string
@@ -4002,18 +4049,18 @@

-
+
-
+

- +

-
+
Type: object
+ jwt
Type: object
No Additional Properties @@ -4059,18 +4106,18 @@

-
+
-
+

- +

-
+
Type: string
+ audience
Type: string
@@ -4135,18 +4182,18 @@

-
+
-
+

- +

-
+
Type: string
+ generation
Type: string

The timestamp of the endpoint generation

@@ -4187,7 +4234,7 @@


Example:
-
"2019-01-17T14:02:29.364Z"
+
"2019-01-17T14:02:29.364Z"
 
diff --git a/gencode/docs/monitoring.html b/gencode/docs/monitoring.html index bc4aa1eead..d7e8f363e8 100644 --- a/gencode/docs/monitoring.html +++ b/gencode/docs/monitoring.html @@ -3945,7 +3945,7 @@

subFolder

Type: enum (of string)

Must be one of:

-
  • "udmi"
  • "update"
  • "error"
  • "cloud"
  • "discovery"
  • "mapping"
  • "system"
  • "gateway"
  • "swarm"
  • "localnet"
  • "pointset"
  • "validation"
  • "blobset"
  • "monitoring"
+
  • "udmi"
  • "update"
  • "invalid"
  • "error"
  • "cloud"
  • "discovery"
  • "mapping"
  • "system"
  • "gateway"
  • "swarm"
  • "localnet"
  • "pointset"
  • "validation"
  • "blobset"
  • "monitoring"
@@ -3995,7 +3995,7 @@

subType

Type: enum (of string)

Must be one of:

-
  • "event"
  • "command"
  • "config"
  • "state"
  • "query"
  • "reply"
  • "model"
+
  • "invalid"
  • "event"
  • "command"
  • "config"
  • "state"
  • "query"
  • "reply"
  • "model"
diff --git a/gencode/java/udmi/schema/BridgePodConfiguration.java b/gencode/java/udmi/schema/BridgePodConfiguration.java index 69fa5f6d95..977baf3b1c 100644 --- a/gencode/java/udmi/schema/BridgePodConfiguration.java +++ b/gencode/java/udmi/schema/BridgePodConfiguration.java @@ -16,12 +16,15 @@ */ @JsonInclude(JsonInclude.Include.NON_NULL) @JsonPropertyOrder({ + "enabled", "from", - "to" + "morf" }) @Generated("jsonschema2pojo") public class BridgePodConfiguration { + @JsonProperty("enabled") + public String enabled; /** * Endpoint Configuration *

@@ -37,15 +40,16 @@ public class BridgePodConfiguration { * Parameters to define a message endpoint * */ - @JsonProperty("to") + @JsonProperty("morf") @JsonPropertyDescription("Parameters to define a message endpoint") - public EndpointConfiguration to; + public EndpointConfiguration morf; @Override public int hashCode() { int result = 1; result = ((result* 31)+((this.from == null)? 0 :this.from.hashCode())); - result = ((result* 31)+((this.to == null)? 0 :this.to.hashCode())); + result = ((result* 31)+((this.morf == null)? 0 :this.morf.hashCode())); + result = ((result* 31)+((this.enabled == null)? 0 :this.enabled.hashCode())); return result; } @@ -58,7 +62,7 @@ public boolean equals(Object other) { return false; } BridgePodConfiguration rhs = ((BridgePodConfiguration) other); - return (((this.from == rhs.from)||((this.from!= null)&&this.from.equals(rhs.from)))&&((this.to == rhs.to)||((this.to!= null)&&this.to.equals(rhs.to)))); + return ((((this.from == rhs.from)||((this.from!= null)&&this.from.equals(rhs.from)))&&((this.morf == rhs.morf)||((this.morf!= null)&&this.morf.equals(rhs.morf))))&&((this.enabled == rhs.enabled)||((this.enabled!= null)&&this.enabled.equals(rhs.enabled)))); } } diff --git a/gencode/java/udmi/schema/Envelope.java b/gencode/java/udmi/schema/Envelope.java index 884674e178..e96b011d64 100644 --- a/gencode/java/udmi/schema/Envelope.java +++ b/gencode/java/udmi/schema/Envelope.java @@ -116,6 +116,7 @@ public enum SubFolder { UDMI("udmi"), UPDATE("update"), + INVALID("invalid"), ERROR("error"), CLOUD("cloud"), DISCOVERY("discovery"), @@ -166,6 +167,7 @@ public static Envelope.SubFolder fromValue(String value) { @Generated("jsonschema2pojo") public enum SubType { + INVALID("invalid"), EVENT("event"), COMMAND("command"), CONFIG("config"), diff --git a/gencode/python/udmi/schema/configuration_pod_bridge.py b/gencode/python/udmi/schema/configuration_pod_bridge.py index 23188e5b55..db28c94d58 100644 --- a/gencode/python/udmi/schema/configuration_pod_bridge.py +++ b/gencode/python/udmi/schema/configuration_pod_bridge.py @@ -7,16 +7,18 @@ class BridgePodConfiguration: """Generated schema class""" def __init__(self): + self.enabled = None self.from = None - self.to = None + self.morf = None @staticmethod def from_dict(source): if not source: return None result = BridgePodConfiguration() + result.enabled = source.get('enabled') result.from = EndpointConfiguration.from_dict(source.get('from')) - result.to = EndpointConfiguration.from_dict(source.get('to')) + result.morf = EndpointConfiguration.from_dict(source.get('morf')) return result @staticmethod @@ -37,8 +39,10 @@ def expand_dict(input): def to_dict(self): result = {} + if self.enabled: + result['enabled'] = self.enabled # 5 if self.from: result['from'] = self.from.to_dict() # 4 - if self.to: - result['to'] = self.to.to_dict() # 4 + if self.morf: + result['morf'] = self.morf.to_dict() # 4 return result diff --git a/pubber/src/main/java/daq/pubber/PointsetManager.java b/pubber/src/main/java/daq/pubber/PointsetManager.java index 823874724d..a2eff7712a 100644 --- a/pubber/src/main/java/daq/pubber/PointsetManager.java +++ b/pubber/src/main/java/daq/pubber/PointsetManager.java @@ -16,7 +16,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; -import daq.pubber.Pubber.ExtraPointsetEvent; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -27,6 +26,7 @@ import udmi.schema.PointPointsetModel; import udmi.schema.PointPointsetState; import udmi.schema.PointsetConfig; +import udmi.schema.PointsetEvent; import udmi.schema.PointsetModel; import udmi.schema.PointsetState; import udmi.schema.PubberConfiguration; @@ -262,4 +262,9 @@ private void sendDevicePoints() { host.publish(pointsetEvent); } + static class ExtraPointsetEvent extends PointsetEvent { + + // This extraField exists only to trigger schema parsing errors. + public Object extraField; + } } diff --git a/pubber/src/main/java/daq/pubber/Pubber.java b/pubber/src/main/java/daq/pubber/Pubber.java index 744ae0131d..ea9ddb7a4a 100644 --- a/pubber/src/main/java/daq/pubber/Pubber.java +++ b/pubber/src/main/java/daq/pubber/Pubber.java @@ -44,7 +44,9 @@ import daq.pubber.MqttPublisher.InjectedMessage; import daq.pubber.MqttPublisher.InjectedState; import daq.pubber.MqttPublisher.PublisherException; +import daq.pubber.PointsetManager.ExtraPointsetEvent; import daq.pubber.PubSubClient.Bundle; +import daq.pubber.SystemManager.ExtraSystemState; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.OutputStream; @@ -135,7 +137,7 @@ public class Pubber extends ManagerBase implements ManagerHost { private static final ImmutableMap, String> MESSAGE_TOPIC_SUFFIX_MAP = new ImmutableMap.Builder, String>() .put(State.class, MqttDevice.STATE_TOPIC) - .put(SystemState.class, MqttDevice.STATE_TOPIC) // Used for badState option + .put(ExtraSystemState.class, MqttDevice.STATE_TOPIC) // Used for badState option .put(SystemEvent.class, getEventsSuffix("system")) .put(PointsetEvent.class, getEventsSuffix("pointset")) .put(ExtraPointsetEvent.class, getEventsSuffix("pointset")) @@ -1557,9 +1559,4 @@ public void error(String message, Throwable e) { deviceManager.localLog(message, Level.TRACE, getTimestamp(), stackTraceString(e)); } - static class ExtraPointsetEvent extends PointsetEvent { - - // This extraField exists only to trigger schema parsing errors. - public Object extraField; - } } diff --git a/pubber/src/main/java/daq/pubber/SystemManager.java b/pubber/src/main/java/daq/pubber/SystemManager.java index a3fc208b3a..66e6924051 100644 --- a/pubber/src/main/java/daq/pubber/SystemManager.java +++ b/pubber/src/main/java/daq/pubber/SystemManager.java @@ -3,6 +3,7 @@ import static com.google.udmi.util.GeneralUtils.catchOrElse; import static com.google.udmi.util.GeneralUtils.catchToNull; import static com.google.udmi.util.GeneralUtils.ifNotNullGet; +import static com.google.udmi.util.GeneralUtils.ifNotNullThen; import static com.google.udmi.util.GeneralUtils.ifNotTrueThen; import static com.google.udmi.util.GeneralUtils.isTrue; import static com.google.udmi.util.JsonUtil.getTimestamp; @@ -79,7 +80,7 @@ public class SystemManager extends ManagerBase { } private final List logentries = new ArrayList<>(); - private final SystemState systemState; + private final ExtraSystemState systemState; private final ManagerHost host; private int systemEventCount; private SystemConfig systemConfig; @@ -94,7 +95,7 @@ public SystemManager(ManagerHost host, PubberConfiguration configuration) { info("Device start time is " + getTimestamp(DEVICE_START_TIME)); - systemState = new SystemState(); + systemState = new ExtraSystemState(); systemState.operation = new StateSystemOperation(); if (!isTrue(options.noLastStart)) { @@ -108,6 +109,8 @@ public SystemManager(ManagerHost host, PubberConfiguration configuration) { systemState.serial_no = configuration.serialNo; systemState.last_config = new Date(0); + ifNotNullThen(options.extraField, value -> systemState.extraField = value); + updateState(); } @@ -296,4 +299,8 @@ public void pubberLogMessage(String logMessage, Level level, String timestamp, S logEntry.detail = detail; publishLogMessage(logEntry); } + + class ExtraSystemState extends SystemState { + public String extraField; + } } diff --git a/schema/configuration_pod_bridge.json b/schema/configuration_pod_bridge.json index 097b22311d..059f4cd71d 100644 --- a/schema/configuration_pod_bridge.json +++ b/schema/configuration_pod_bridge.json @@ -6,10 +6,13 @@ "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": false, "properties": { + "enabled": { + "type": "string" + }, "from": { "$ref": "file:configuration_endpoint.json" }, - "to": { + "morf": { "$ref": "file:configuration_endpoint.json" } } diff --git a/schema/envelope.json b/schema/envelope.json index 6a1842eba0..c5a70378b6 100644 --- a/schema/envelope.json +++ b/schema/envelope.json @@ -46,6 +46,7 @@ "enum": [ "udmi", "update", + "invalid", "error", "cloud", "discovery", @@ -62,6 +63,7 @@ }, "subType": { "enum": [ + "invalid", "event", "command", "config", diff --git a/tests/schemas/envelope/errors1.out b/tests/schemas/envelope/errors1.out index 49244df1d2..aebd2e7bba 100644 --- a/tests/schemas/envelope/errors1.out +++ b/tests/schemas/envelope/errors1.out @@ -2,5 +2,5 @@ /deviceId: ECMA 262 regex "^[A-Z]{2,6}-[0-9]{1,6}$" does not match input string "fcu-1" /deviceNumId: ECMA 262 regex "^[0-9]+$" does not match input string "921302198324X" /deviceRegistryId: ECMA 262 regex "^[a-zA-Z][-a-zA-Z0-9._+~%]*[a-zA-Z0-9]$" does not match input string "test/registry" - /subType: instance value (5) not found in enum (possible values: ["event","command","config","state","query","reply","model"]) + /subType: instance value (5) not found in enum (possible values: ["invalid","event","command","config","state","query","reply","model"]) object has missing required properties (["projectId","subFolder"]) diff --git a/udmis/Dockerfile.udmis b/udmis/Dockerfile.udmis index 8d98bb6de1..ca68e1455e 100644 --- a/udmis/Dockerfile.udmis +++ b/udmis/Dockerfile.udmis @@ -2,7 +2,7 @@ FROM alpine:latest WORKDIR /root -RUN apk add openjdk17 bash gcompat mosquitto-clients curl jq +RUN apk add bash openjdk17 gcompat curl jq # Workaround for https://github.com/grpc/grpc-java/issues/8751 ENV LD_PRELOAD=/lib/libgcompat.so.0 @@ -14,5 +14,6 @@ ADD bin/ bin/ ADD var/ var/ ENV CLEARBLADE_CONFIGURATION=/udmi/clearblade.json +ENV SHUNT_CONFIGURATION=/udmi/shunt-config.json CMD ["/root/bin/run", "var/prod_pod.json"] diff --git a/udmis/bin/run b/udmis/bin/run index 55ad9c7c9c..ea30a5400e 100755 --- a/udmis/bin/run +++ b/udmis/bin/run @@ -44,8 +44,18 @@ export UDMI_PREFIX= if [[ -n $UDMI_NAMESPACE ]]; then UDMI_PREFIX=${UDMI_NAMESPACE}~ echo Using UDMI_PREFIX $UDMI_PREFIX +else + echo No UDMI_NAMESPACE defined, so no UDMI_PREFIX fi +if [[ -n $SHUNT_CONFIGURATION && -s $SHUNT_CONFIGURATION ]]; then + export SHUNT_NAME=${UDMI_PREFIX}$(jq -r .name $SHUNT_CONFIGURATION) + export SHUNT_FROM=$(jq -r .from $SHUNT_CONFIGURATION) + export SHUNT_MORF=$(jq -r .morf $SHUNT_CONFIGURATION) + echo Configured udmi shunt $SHUNT_NAME between $SHUNT_FROM and $SHUNT_MORF +else + echo No SHUNT_CONFIGURATION defined, so no SHUNT_NAME configured. +fi if [[ -n $CLEARBLADE_PROJECT ]]; then echo CLEARBLADE_PROJECT defined, would be clobbered by value from CLEARBLADE_CONFIGURATION diff --git a/udmis/etc/k8s_udmis.yaml b/udmis/etc/k8s_udmis.yaml index 5ce9c4f31f..9cda16db66 100644 --- a/udmis/etc/k8s_udmis.yaml +++ b/udmis/etc/k8s_udmis.yaml @@ -1,24 +1,24 @@ apiVersion: apps/v1 kind: Deployment metadata: - name: udmis-pods + name: @TARGET@-pods spec: selector: matchLabels: - app: udmis + app: @TARGET@ role: master tier: backend replicas: 3 template: metadata: labels: - app: udmis + app: @TARGET@ role: master tier: backend spec: containers: - - name: udmis-core - image: @IMAGE-udmis@ + - name: @TARGET@-core + image: @IMAGE@ imagePullPolicy: Always readinessProbe: exec: @@ -51,6 +51,9 @@ spec: sources: - secret: name: clearblade.json + - secret: + name: shunt-config.json + optional: true - name: tmp emptyDir: medium: Memory diff --git a/udmis/etc/prod_pod.json b/udmis/etc/prod_pod.json index 91618d5c11..e771e298d0 100644 --- a/udmis/etc/prod_pod.json +++ b/udmis/etc/prod_pod.json @@ -48,5 +48,20 @@ "hostname": "udmis-broker", "port": "1883" } + }, + "bridges": { + "shunt": { + "enabled": "${SHUNT_NAME}", + "from": { + "hostname": "${SHUNT_FROM}", + "recv_id": "${SHUNT_NAME}-take", + "send_id": "${SHUNT_NAME}-put" + }, + "morf": { + "hostname": "${SHUNT_MORF}", + "recv_id": "${SHUNT_NAME}-take", + "send_id": "${SHUNT_NAME}-put" + } + } } } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/core/StateProcessor.java b/udmis/src/main/java/com/google/bos/udmi/service/core/StateProcessor.java index 3ad4577866..2d4e81d9e2 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/core/StateProcessor.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/core/StateProcessor.java @@ -1,7 +1,7 @@ package com.google.bos.udmi.service.core; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; -import static com.google.udmi.util.JsonUtil.convertToStrict; +import static com.google.udmi.util.JsonUtil.convertTo; import static com.google.udmi.util.JsonUtil.stringify; import static com.google.udmi.util.JsonUtil.toMap; import static com.google.udmi.util.MessageUpgrader.STATE_SCHEMA; @@ -33,9 +33,17 @@ public class StateProcessor extends ProcessorBase { @Override protected void defaultHandler(Object originalMessage) { + MessageContinuation continuation = getContinuation(originalMessage); + Envelope envelope = continuation.getEnvelope(); + envelope.subType = SubType.STATE; + envelope.subFolder = UPDATE; + + reflectMessage(envelope, stringify(originalMessage)); + Object upgradedMessage = new MessageUpgrader(STATE_SCHEMA, originalMessage).upgrade(); - StateUpdate stateMessage = convertToStrict(StateUpdate.class, upgradedMessage); - shardStateUpdate(getContinuation(originalMessage), stateMessage); + StateUpdate stateMessage = convertTo(StateUpdate.class, upgradedMessage); + shardStateUpdate(continuation, envelope, stateMessage); + updateLastStart(getContinuation(originalMessage).getEnvelope(), stateMessage); } @@ -49,11 +57,8 @@ protected void registerHandlers() { registerHandler(StateUpdate.class, this::stateHandler); } - private void shardStateUpdate(MessageContinuation continuation, StateUpdate message) { - Envelope envelope = continuation.getEnvelope(); - envelope.subType = SubType.STATE; - envelope.subFolder = UPDATE; - reflectMessage(envelope, stringify(message)); + private void shardStateUpdate(MessageContinuation continuation, Envelope envelope, + StateUpdate message) { continuation.publish(message); String originalTransaction = envelope.transactionId; AtomicInteger txnSuffix = new AtomicInteger(); @@ -80,7 +85,10 @@ private void shardStateUpdate(MessageContinuation continuation, StateUpdate mess } private void stateHandler(StateUpdate message) { - shardStateUpdate(getContinuation(message), message); + MessageContinuation continuation = getContinuation(message); + Envelope envelope = continuation.getEnvelope(); + reflectMessage(envelope, stringify(message)); + shardStateUpdate(continuation, envelope, message); } } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/LocalMessagePipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/LocalMessagePipe.java index 1256959691..de3d9766f5 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/LocalMessagePipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/LocalMessagePipe.java @@ -6,7 +6,6 @@ import com.google.bos.udmi.service.messaging.MessagePipe; import java.util.Map; -import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; @@ -46,6 +45,10 @@ public static MessagePipe fromConfig(EndpointConfiguration config) { return new LocalMessagePipe(config); } + public static void resetForTestStatic() { + NAMESPACES.clear(); + } + private BlockingQueue getQueueForScope(String name) { checkNotNull(name, "pipe name is null"); Map> namedQueues = @@ -75,13 +78,10 @@ public void resetForTest() { resetForTestStatic(); } - public static void resetForTestStatic() { - NAMESPACES.clear(); - } - @Override public String toString() { String isActive = isActive() ? "*" : "O"; - return format("%s >-%s-> %s", super.toString(), isActive, queueIdentifier(destinationQueue)); + return format("%s >-%s-> %s", super.toString(), isActive, + queueIdentifierStatic(destinationQueue)); } } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java index 2b61e193e1..da7e032125 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java @@ -1,14 +1,15 @@ package com.google.bos.udmi.service.messaging.impl; -import static com.google.bos.udmi.service.pod.UdmiServicePod.HOSTNAME; import static com.google.udmi.util.Common.SUBFOLDER_PROPERTY_KEY; +import static com.google.udmi.util.Common.SUBTYPE_PROPERTY_KEY; +import static com.google.udmi.util.GeneralUtils.catchToElse; import static com.google.udmi.util.GeneralUtils.deepCopy; import static com.google.udmi.util.GeneralUtils.friendlyStackTrace; import static com.google.udmi.util.GeneralUtils.ifNotNullGet; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; import static com.google.udmi.util.GeneralUtils.mergeObject; import static com.google.udmi.util.GeneralUtils.stackTraceString; -import static com.google.udmi.util.JsonUtil.convertToStrict; +import static com.google.udmi.util.JsonUtil.convertTo; import static com.google.udmi.util.JsonUtil.fromString; import static com.google.udmi.util.JsonUtil.parseJson; import static com.google.udmi.util.JsonUtil.stringify; @@ -48,14 +49,13 @@ public abstract class MessageBase extends ContainerBase implements MessagePipe { public static final String INVALID_ENVELOPE_KEY = "invalid"; + public static final int EXECUTION_THREADS = 4; + public static final String ERROR_MESSAGE_MARKER = "error-mark"; static final String TERMINATE_MARKER = "terminate"; private static final String DEFAULT_NAMESPACE = "default-namespace"; private static final Set HANDLED_QUEUES = new HashSet<>(); private static final long DEFAULT_POLL_TIME_SEC = 1; private static final long AWAIT_TERMINATION_SEC = 10; - public static final int EXECUTION_THREADS = 4; - public static final String ERROR_MESSAGE_MARKER = "error-mark"; - private final ExecutorService executor = Executors.newFixedThreadPool(EXECUTION_THREADS); private BlockingQueue sourceQueue; private Consumer dispatcher; @@ -66,19 +66,22 @@ public abstract class MessageBase extends ContainerBase implements MessagePipe { */ public static EndpointConfiguration combineConfig(EndpointConfiguration defaults, EndpointConfiguration defined) { - EndpointConfiguration useDefaults = ofNullable(defaults).orElseGet( - EndpointConfiguration::new); + EndpointConfiguration useDefaults = ofNullable(defaults).orElseGet(EndpointConfiguration::new); return ifNotNullGet(defined, () -> mergeObject(deepCopy(useDefaults), defined)); } static Bundle extractBundle(String bundleString) { - return ifNotNullGet(bundleString, b -> fromString(Bundle.class, b)); + return ifNotNullGet(bundleString, b -> fromString(Bundle.class, b)); } static String normalizeNamespace(String configSpace) { return ofNullable(configSpace).orElse(DEFAULT_NAMESPACE); } + protected static String queueIdentifierStatic(BlockingQueue queue) { + return format("%08x", Objects.hash(queue)); + } + protected Bundle makeExceptionBundle(Envelope envelope, Exception exception) { Bundle bundle = new Bundle(envelope, exception); bundle.envelope.subType = SubType.EVENT; @@ -104,8 +107,8 @@ protected void pushQueueEntry(BlockingQueue queue, String stringBund } } - protected void receiveMessage(Envelope envelope, Map messageMap) { - receiveMessage(toStringMap(envelope), stringify(messageMap)); + protected String queueIdentifier() { + return queueIdentifierStatic(sourceQueue); } protected void receiveMessage(Map envelopeMap, Map messageMap) { @@ -125,7 +128,8 @@ protected void receiveMessage(Map attributesMap, String messageS final Envelope envelope; try { - envelope = convertToStrict(Envelope.class, attributesMap); + sanitizeAttributeMap(attributesMap); + envelope = convertTo(Envelope.class, attributesMap); } catch (Exception e) { attributesMap.put(INVALID_ENVELOPE_KEY, "true"); receiveException(attributesMap, messageString, e, null); @@ -142,6 +146,10 @@ protected void receiveMessage(Map attributesMap, String messageS } } + protected void receiveMessage(Envelope envelope, Map messageMap) { + receiveMessage(toStringMap(envelope), stringify(messageMap)); + } + protected void setSourceQueue(BlockingQueue queueForScope) { sourceQueue = queueForScope; } @@ -226,19 +234,6 @@ private void messageLoop(String id) { } } - private void shutdownExecutor() { - debug("Shutdown of %s", this); - executor.shutdown(); - } - - protected String queueIdentifier() { - return queueIdentifier(sourceQueue); - } - - protected static String queueIdentifier(BlockingQueue queue) { - return format("%08x", Objects.hash(queue)); - } - private void receiveBundle(Bundle bundle) { receiveBundle(stringify(bundle)); } @@ -259,6 +254,31 @@ private void receiveException(Map attributesMap, String messageS receiveBundle(stringify(bundle)); } + private void sanitizeAttributeMap(Map attributesMap) { + String subFolderRaw = attributesMap.get(SUBFOLDER_PROPERTY_KEY); + if (subFolderRaw != null) { + SubFolder subFolder = catchToElse(() -> SubFolder.fromValue(subFolderRaw), SubFolder.INVALID); + if (!subFolder.value().equals(subFolderRaw)) { + debug("Coerced subFolder " + subFolderRaw + " to " + subFolder.value()); + attributesMap.put(SUBFOLDER_PROPERTY_KEY, subFolder.value()); + } + } + + String subTypeRaw = attributesMap.get(SUBTYPE_PROPERTY_KEY); + if (subTypeRaw != null) { + SubType subType = catchToElse(() -> SubType.fromValue(subTypeRaw), SubType.INVALID); + debug("Coerced subFolder " + subTypeRaw + " to " + subType.value()); + if (!subType.value().equals(subTypeRaw)) { + attributesMap.put(SUBTYPE_PROPERTY_KEY, subType.value()); + } + } + } + + private void shutdownExecutor() { + debug("Shutdown of %s", this); + executor.shutdown(); + } + @Override public void activate(Consumer bundleConsumer) { dispatcher = bundleConsumer; diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java index 23370cc9d4..45da361165 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java @@ -1,6 +1,7 @@ package com.google.bos.udmi.service.messaging.impl; import static com.google.udmi.util.Common.DEVICE_ID_KEY; +import static com.google.udmi.util.Common.PUBLISH_TIME_KEY; import static com.google.udmi.util.GeneralUtils.decodeBase64; import static com.google.udmi.util.GeneralUtils.encodeBase64; import static com.google.udmi.util.GeneralUtils.ifNotNullGet; @@ -35,15 +36,6 @@ */ public class TraceMessagePipe extends MessageBase { - private static final String DEVICES_DIR_NAME = "devices"; - private static final Map FOLDER_HACKS = new HashMap<>(); - - static { - // Some egregious hacks for dealing with legacy corner-cases data streams. - FOLDER_HACKS.put("discover", "discovery"); - FOLDER_HACKS.put("", null); - } - private final Map traceCounts = new HashMap<>(); private File traceOutFile; @@ -62,33 +54,21 @@ public static MessagePipe fromConfig(EndpointConfiguration config) { private void consumeTrace(File file) { try { Map traceBundle = asMap(file); - Envelope envelope = makeEnvelope(traceBundle); + Map envelopeMap = makeEnvelope(traceBundle); Map message = asMap(decodeBase64((String) traceBundle.get("data"))); - receiveMessage(envelope, message); + receiveMessage(envelopeMap, message); } catch (Exception e) { e.printStackTrace(); } } - @Nullable - private SubFolder getBundleSubfolder(Map attributes) { - String subFolder = attributes.get("subFolder"); - return ifNotNullGet(FOLDER_HACKS.getOrDefault(subFolder, subFolder), SubFolder::fromValue); - } - - private Envelope makeEnvelope(Map bundle) { + private Map makeEnvelope(Map bundle) { try { @SuppressWarnings("unchecked") Map attributes = (Map) bundle.get("attributes"); - Envelope envelope = new Envelope(); - envelope.subFolder = getBundleSubfolder(attributes); - envelope.subType = ifNotNullGet(attributes.get("subType"), SubType::fromValue); - envelope.deviceId = attributes.get(DEVICE_ID_KEY); - envelope.projectId = attributes.get("projectId"); - envelope.deviceRegistryId = attributes.get("deviceRegistryId"); - envelope.publishTime = ifNotNullGet(ofNullable(attributes.get(Common.PUBLISH_TIME_KEY)) - .orElse((String) bundle.get("publish_time")), JsonUtil::getDate); - return envelope; + attributes.put(PUBLISH_TIME_KEY, + ofNullable(attributes.get(PUBLISH_TIME_KEY)).orElse((String) bundle.get("publish_time"))); + return attributes; } catch (Exception e) { throw new RuntimeException("While extracting envelope from bundle", e); } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/pod/UdmiServicePod.java b/udmis/src/main/java/com/google/bos/udmi/service/pod/UdmiServicePod.java index 1efb70b703..41f01bd798 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/pod/UdmiServicePod.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/pod/UdmiServicePod.java @@ -159,9 +159,15 @@ private void createAccess(String name, IotAccess config) { } private void createBridge(String name, BridgePodConfiguration config) { + String enabled = variableSubstitution(config.enabled); + if (enabled != null && enabled.isEmpty()) { + warn("Skipping not-enabled bridge " + name); + return; + } + info(format("Creating bridge %s with enabled %s", name, config.enabled)); EndpointConfiguration from = makeConfig(config.from); - EndpointConfiguration to = makeConfig(config.to); - putComponent(name, () -> new BridgeProcessor(from, to)); + EndpointConfiguration morf = makeConfig(config.morf); + putComponent(name, () -> new BridgeProcessor(from, morf)); } private void createDistributor(String name, EndpointConfiguration config) { diff --git a/udmis/src/test/configs/bridge_pod.json b/udmis/src/test/configs/bridge_pod.json index 33ac501a75..dcb3d1492d 100644 --- a/udmis/src/test/configs/bridge_pod.json +++ b/udmis/src/test/configs/bridge_pod.json @@ -9,7 +9,7 @@ "recv_id": "from", "send_id": "to" }, - "to": { + "morf": { "recv_id": "mark", "send_id": "mend" } diff --git a/udmis/src/test/configs/trace_pod.json b/udmis/src/test/configs/trace_pod.json index f9d566a759..b3d1311438 100644 --- a/udmis/src/test/configs/trace_pod.json +++ b/udmis/src/test/configs/trace_pod.json @@ -16,7 +16,7 @@ "recv_id": "../tests/traces/simple/devices", "send_id": "out/simple.trace" }, - "to": { + "morf": { "recv_id": "mark", "send_id": "mend" } diff --git a/udmis/src/test/java/com/google/bos/udmi/service/core/BridgeProcessorTest.java b/udmis/src/test/java/com/google/bos/udmi/service/core/BridgeProcessorTest.java index 9911563fc9..8d4106e7bc 100644 --- a/udmis/src/test/java/com/google/bos/udmi/service/core/BridgeProcessorTest.java +++ b/udmis/src/test/java/com/google/bos/udmi/service/core/BridgeProcessorTest.java @@ -19,7 +19,7 @@ class BridgeProcessorTest extends MessageTestCore { Map> results = new ConcurrentHashMap<>(); @Test - public void basicTest() { + public void basicBridge() { EndpointConfiguration from = getConfiguration(false, "from"); EndpointConfiguration to = getConfiguration(false, "to"); BridgeProcessor bridgeProcessor = new BridgeProcessor(from, to); diff --git a/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipeTest.java b/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipeTest.java index 1aca542974..8a18b6117e 100644 --- a/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipeTest.java +++ b/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipeTest.java @@ -107,8 +107,7 @@ public void tracePlayback() { List errors = consumed.stream().filter(bundle -> bundle.envelope.subFolder == SubFolder.ERROR) - .map(bundle -> (String) bundle.message) - .collect(Collectors.toList()); + .map(bundle -> (String) bundle.message).toList(); assertEquals(0, errors.size(), "expected message errors"); Set devices = diff --git a/udmis/src/test/java/com/google/bos/udmi/service/pod/UdmiServicePodTest.java b/udmis/src/test/java/com/google/bos/udmi/service/pod/UdmiServicePodTest.java index de2d5ec42f..fd0aa22424 100644 --- a/udmis/src/test/java/com/google/bos/udmi/service/pod/UdmiServicePodTest.java +++ b/udmis/src/test/java/com/google/bos/udmi/service/pod/UdmiServicePodTest.java @@ -116,21 +116,21 @@ public void bridgeTest() throws Exception { EndpointConfiguration reversedFrom = combineConfig(podConfig.flow_defaults, reverseFlow(podConfig.bridges.get("test").from)); final MessageDispatcherImpl fromDispatcher = MessagePipeTestBase.getDispatcherFor(reversedFrom); - EndpointConfiguration reversedTo = - combineConfig(podConfig.flow_defaults, reverseFlow(podConfig.bridges.get("test").to)); - final MessageDispatcherImpl toDispatcher = MessagePipeTestBase.getDispatcherFor(reversedTo); + EndpointConfiguration reversedMorf = + combineConfig(podConfig.flow_defaults, reverseFlow(podConfig.bridges.get("test").morf)); + final MessageDispatcherImpl morfDispatcher = MessagePipeTestBase.getDispatcherFor(reversedMorf); CompletableFuture received = new CompletableFuture<>(); fromDispatcher.registerHandler(LocalnetModel.class, received::complete); BlockingQueue defaulted = new LinkedBlockingQueue<>(); - toDispatcher.registerHandler(Object.class, defaulted::add); + morfDispatcher.registerHandler(Object.class, defaulted::add); pod.activate(); fromDispatcher.activate(); - toDispatcher.activate(); + morfDispatcher.activate(); fromDispatcher.publish(new StateUpdate()); - toDispatcher.publish(new LocalnetModel()); + morfDispatcher.publish(new LocalnetModel()); Object polled = defaulted.poll(RECEIVE_TIMEOUT_SEC, TimeUnit.SECONDS); assertTrue(polled instanceof StateUpdate, "expected pointset state in default");