Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send multiple MQTT measures in a batch update to CB #857

Merged
merged 10 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- Fix: allow send multiple measures in MQTT to CB in a batch (POST /v2/op/update) and sorted by TimeInstant when possible, instead of using multiples single request (#825, iotagent-node-lib#1612) (reopened)
- Add: allow MQTT single array measures (#856)
- Fix: check endpoint expression when execute http command
- Fix: use config.defaultTransport (from config.js or IOTA_DEFAULT_TRANSPORT env var) instead of magic 'HTTP' at provision device
Expand Down
66 changes: 31 additions & 35 deletions lib/commonBindings.js
Original file line number Diff line number Diff line change
Expand Up @@ -283,50 +283,46 @@ function singleMeasure(apiKey, deviceId, attribute, device, parsedMessage) {
*/
function multipleMeasures(apiKey, deviceId, device, messageObj) {
let measure;
let values;
const ctxt = fillService(context, device);
config.getLogger().debug(context, 'Processing multiple measures for device %s with apiKey %s', deviceId, apiKey);

let attributesArray = [];
for (let j = 0; j < messageObj.length; j++) {
measure = messageObj[j];
let attributesArray = [];
const values = extractAttributes(device, measure, device.payloadType);
values = extractAttributes(device, measure, device.payloadType);
if (values && values[0] && values[0][0]) {
// multi measure is extracted due to payloadType is ngsi
attributesArray = values;
// Check multimeasure from a ngsiv2/ngsild entities array
attributesArray = attributesArray.concat(values);
} else {
attributesArray = [values];
attributesArray.push(values);
}
config
.getLogger()
.debug(
context,
'Processing multiple measures for device %s with apiKey %s values %j',
deviceId,
apiKey,
attributesArray
);
iotAgentLib.update(device.name, device.type, '', attributesArray, device, function (error) {
if (error) {
config.getLogger().error(
ctxt,
/*jshint quotmark: double */
"MEASURES-002: Couldn't send the updated values to the Context Broker due to an error: %j",
/*jshint quotmark: single */
error
);
} else {
config
.getLogger()
.info(
ctxt,
'Multiple measures for device %s with apiKey %s successfully updated',
deviceId,
apiKey
);
}
finishSouthBoundTransaction(null);
});
}
config
.getLogger()
.debug(
context,
'Processing multiple measures for device %s with apiKey %s values %j',
deviceId,
apiKey,
attributesArray
);
iotAgentLib.update(device.name, device.type, '', attributesArray, device, function (error) {
if (error) {
config.getLogger().error(
ctxt,
/*jshint quotmark: double */
"MEASURES-002: Couldn't send the updated values to the Context Broker due to an error: %j",
/*jshint quotmark: single */
error
);
} else {
config
.getLogger()
.info(ctxt, 'Multiple measures for device %s with apiKey %s successfully updated', deviceId, apiKey);
}
finishSouthBoundTransaction(null);
});
}

/**
Expand Down
53 changes: 8 additions & 45 deletions test/unit/ngsiv2/MQTT_receive_measures-test2.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,8 @@ describe('MQTT: Measure reception ', function () {
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/multipleMeasuresJsonTypes.json')
)
.reply(204);

contextBrokerMock
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/multipleMeasuresJsonTypes2.json')
'/v2/op/update',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/multipleMeasuresJsonTypes4.json')
)
.reply(204);
});
Expand Down Expand Up @@ -207,20 +198,10 @@ describe('MQTT: Measure reception ', function () {
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/unprovisionedDevice.json')
'/v2/op/update',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/unprovisionedDevice4.json')
)
.reply(204);

contextBrokerUnprovMock
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/unprovisionedDevice2.json')
)
.reply(204);

request(groupCreation, function (error, response, body) {
done();
});
Expand Down Expand Up @@ -278,17 +259,8 @@ describe('MQTT: Measure reception ', function () {
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/unknownMeasures.json')
)
.reply(204);

contextBrokerMock
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/unknownMeasures2.json')
'/v2/op/update',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/unknownMeasures3.json')
)
.reply(204);
});
Expand Down Expand Up @@ -338,17 +310,8 @@ describe('MQTT: Measure reception ', function () {
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/timestampMeasure.json')
)
.reply(204);

contextBrokerMock
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/timestampMeasure2.json')
'/v2/op/update',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/timestampMeasure3.json')
)
.reply(204);
});
Expand Down
13 changes: 2 additions & 11 deletions test/unit/ngsiv2/MQTT_receive_ngsild_measures-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ describe('MQTT: NGSILD Measure reception ', function () {
async.series([iotAgentLib.clearAll, iotaJson.stop], done);
});


describe('When a publish single NGSILD entity measure with NGSILD format arrives for the HTTP binding and NGSILD is the expected payload type', function () {
const measure = {
id: 'urn:ngsi-ld:ParkingSpot:santander:daoiz_velarde_1_5:3',
Expand Down Expand Up @@ -206,16 +205,8 @@ describe('MQTT: NGSILD Measure reception ', function () {
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/ngsildPayloadMeasure.json')
)
.reply(204);
contextBrokerMock
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/ngsildPayloadMeasure2.json')
'/v2/op/update',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/ngsildPayloadMeasure3.json')
)
.reply(204);
});
Expand Down
12 changes: 2 additions & 10 deletions test/unit/ngsiv2/MQTT_receive_ngsiv2_measures-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -464,16 +464,8 @@ describe('MQTT: NGSIv2 Measure reception ', function () {
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/ngsiv2PayloadMeasure.json')
)
.reply(204);
contextBrokerMock
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/ngsiv2PayloadMeasure2.json')
'/v2/op/update',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/ngsiv2PayloadMeasure3.json')
)
.reply(204);
});
Expand Down
28 changes: 5 additions & 23 deletions test/unit/ngsiv2/amqpBinding-test2.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,10 @@ describe('AMQP Transport binding: multiple measures', function () {
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/singleMeasureAMQP3.json')
'/v2/op/update',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/multipleMeasureAMQP4.json')
)
.reply(200, utils.readExampleFile('./test/contextResponses/singleMeasureSuccess.json'));

contextBrokerMock
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/singleMeasureAMQP2.json')
)
.reply(200, utils.readExampleFile('./test/contextResponses/singleMeasureSuccess.json'));
.reply(204);
});

it('should send a single update context request with all the attributes', function (done) {
Expand All @@ -135,17 +126,8 @@ describe('AMQP Transport binding: multiple measures', function () {
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/multipleMeasure.json')
)
.reply(204);

contextBrokerMock
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/multipleMeasure2.json')
'/v2/op/update',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/multipleMeasure3.json')
)
.reply(204);
});
Expand Down
29 changes: 29 additions & 0 deletions test/unit/ngsiv2/contextRequests/multipleMeasure3.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"actionType": "append",
"entities": [
{
"id": "Second MQTT Device",
"type": "AnMQTTDevice",
"a": {
"type": "celsius",
"value": "23"
},
"b": {
"type": "degrees",
"value": "98"
}
},
{
"id": "Second MQTT Device",
"type": "AnMQTTDevice",
"a": {
"type": "celsius",
"value": "25"
},
"b": {
"type": "degrees",
"value": "100"
}
}
]
}
21 changes: 21 additions & 0 deletions test/unit/ngsiv2/contextRequests/multipleMeasureAMQP4.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"actionType": "append",
"entities": [
{
"id": "Second MQTT Device",
"type": "AnMQTTDevice",
"a": {
"type": "celsius",
"value": "23"
}
},
{
"id": "Second MQTT Device",
"type": "AnMQTTDevice",
"a": {
"type": "celsius",
"value": "25"
}
}
]
}
85 changes: 85 additions & 0 deletions test/unit/ngsiv2/contextRequests/multipleMeasuresJsonTypes4.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
{
"actionType": "append",
"entities": [
{
"id": "Second MQTT Device",
"type": "AnMQTTDevice",
"temperature": {
"type": "celsius",
"value": "87"
},
"humidity": {
"type": "degrees",
"value": "32"
},
"luminosity": {
"type": "Integer",
"value": 10
},
"pollution": {
"type": "Float",
"value": 43.4
},
"configuration": {
"type": "Object",
"value": {
"firmware": {
"version": "1.1.0",
"hash": "cf23df2207d99a74fbe169e3eba035e633b65d94"
}
}
},
"tags": {
"type": "Array",
"value": [
"iot",
"device"
]
},
"enabled": {
"type": "Boolean",
"value": true
}
},
{
"id": "Second MQTT Device",
"type": "AnMQTTDevice",
"temperature": {
"type": "celsius",
"value": "89"
},
"humidity": {
"type": "degrees",
"value": "33"
},
"luminosity": {
"type": "Integer",
"value": 10
},
"pollution": {
"type": "Float",
"value": 43.4
},
"configuration": {
"type": "Object",
"value": {
"firmware": {
"version": "1.1.0",
"hash": "cf23df2207d99a74fbe169e3eba035e633b65d94"
}
}
},
"tags": {
"type": "Array",
"value": [
"iot",
"device"
]
},
"enabled": {
"type": "Boolean",
"value": true
}
}
]
}
Loading