From 8a2c39ba8e98661efeb518a50dabfabdc998d9ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Dalfors?= Date: Wed, 15 May 2024 15:24:30 +0200 Subject: [PATCH 1/5] fix: use sparkplugb decoder only for spBv1.0 topic --- backend/src/Model/sparkplugb.ts | 7 ++++--- backend/src/index.ts | 9 ++++++++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/backend/src/Model/sparkplugb.ts b/backend/src/Model/sparkplugb.ts index f9fc7ce..b4b460d 100644 --- a/backend/src/Model/sparkplugb.ts +++ b/backend/src/Model/sparkplugb.ts @@ -9,7 +9,7 @@ const root = protobuf.parse(protocol).root export let SparkplugPayload = root.lookupType('com.cirruslink.sparkplug.protobuf.Payload') export const SparkplugDecoder = { - decode(input: Buffer): Base64Message | undefined { + decode(input: Buffer): Base64Message { try { const message = Base64Message.fromString( JSON.stringify(SparkplugPayload.toObject(SparkplugPayload.decode(new Uint8Array(input)))) @@ -17,8 +17,9 @@ export const SparkplugDecoder = { message.decoder = Decoder.SPARKPLUG return message } catch { - // ignore + const message = Base64Message.fromString("Failed to decode sparkplugb payload") + message.decoder = Decoder.NONE + return message } - return undefined }, } diff --git a/backend/src/index.ts b/backend/src/index.ts index 4cd6a92..9f0df55 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -47,9 +47,16 @@ export class ConnectionManager { buffer = buffer.slice(0, 20000) } + let decoded_payload = null + if (topic.startsWith("spBv1.0/")) { + decoded_payload = SparkplugDecoder.decode(buffer) + } else { + decoded_payload = Base64Message.fromBuffer(buffer) + } + backendEvents.emit(messageEvent, { topic, - payload: SparkplugDecoder.decode(buffer) ?? Base64Message.fromBuffer(buffer), + payload: decoded_payload, qos: packet.qos, retain: packet.retain, messageId: packet.messageId, From a346c48d3e32a62422bd56939963ebe9740178f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Dalfors?= Date: Fri, 17 May 2024 09:08:34 +0200 Subject: [PATCH 2/5] refine sparkplug detection --- backend/src/index.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index 9f0df55..0502ae3 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -48,7 +48,8 @@ export class ConnectionManager { } let decoded_payload = null - if (topic.startsWith("spBv1.0/")) { + // spell-checker: disable-next-line + if (topic.match(/spBv1\.0\/[^/]+\/(DDATA|NDATA|NCMD|DCMD|NBIRTH|DBIRTH|NDEATH|DDEATH\/[^/]+\/)/u)) { decoded_payload = SparkplugDecoder.decode(buffer) } else { decoded_payload = Base64Message.fromBuffer(buffer) From 7617430a3ff6b9f037930662b3157b60f1af3919 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Dalfors?= Date: Sat, 18 May 2024 21:42:25 +0200 Subject: [PATCH 3/5] fix regex --- backend/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index 0502ae3..7e597e3 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -49,7 +49,7 @@ export class ConnectionManager { let decoded_payload = null // spell-checker: disable-next-line - if (topic.match(/spBv1\.0\/[^/]+\/(DDATA|NDATA|NCMD|DCMD|NBIRTH|DBIRTH|NDEATH|DDEATH\/[^/]+\/)/u)) { + if (topic.match(/^spBv1\.0\/[^/]+\/[ND](DATA|CMD|DEATH|BIRTH)\/[^/]+(\/[^/]+)?$/u)) { decoded_payload = SparkplugDecoder.decode(buffer) } else { decoded_payload = Base64Message.fromBuffer(buffer) From b04f5dee16e4040d2fe497ece74d9f71c6074569 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Dalfors?= Date: Fri, 17 May 2024 12:03:35 +0200 Subject: [PATCH 4/5] feat: use tahu for sparkplug decoding --- backend/src/Model/sparkplugb.proto.ts | 204 ---------------------- backend/src/Model/sparkplugb.ts | 14 +- package.json | 3 +- res/sparkplug_b.proto | 197 ---------------------- scripts/sparkplug-client.js | 232 ++++++++++++++++++++++++++ yarn.lock | 61 ++++--- 6 files changed, 269 insertions(+), 442 deletions(-) delete mode 100644 backend/src/Model/sparkplugb.proto.ts delete mode 100644 res/sparkplug_b.proto create mode 100644 scripts/sparkplug-client.js diff --git a/backend/src/Model/sparkplugb.proto.ts b/backend/src/Model/sparkplugb.proto.ts deleted file mode 100644 index 569078c..0000000 --- a/backend/src/Model/sparkplugb.proto.ts +++ /dev/null @@ -1,204 +0,0 @@ -/* spell-checker: disable */ - -const protocol = ` -syntax = "proto2"; - -// -// To compile: -// cd client_libraries/java -// protoc --proto_path=../../ --java_out=src/main/java ../../sparkplug_b.proto -// -package com.cirruslink.sparkplug.protobuf; - -option java_package = "com.cirruslink.sparkplug.protobuf"; -option java_outer_classname = "SparkplugBProto"; - -message Payload { - /* - // Indexes of Data Types - - // Unknown placeholder for future expansion. - Unknown = 0; - - // Basic Types - Int8 = 1; - Int16 = 2; - Int32 = 3; - Int64 = 4; - UInt8 = 5; - UInt16 = 6; - UInt32 = 7; - UInt64 = 8; - Float = 9; - Double = 10; - Boolean = 11; - String = 12; - DateTime = 13; - Text = 14; - - // Additional Metric Types - UUID = 15; - DataSet = 16; - Bytes = 17; - File = 18; - Template = 19; - - // Additional PropertyValue Types - PropertySet = 20; - PropertySetList = 21; - - */ - - message Template { - - message Parameter { - optional string name = 1; - optional uint32 type = 2; - - oneof value { - uint32 int_value = 3; - uint64 long_value = 4; - float float_value = 5; - double double_value = 6; - bool boolean_value = 7; - string string_value = 8; - ParameterValueExtension extension_value = 9; - } - - message ParameterValueExtension { - extensions 1 to max; - } - } - - optional string version = 1; // The version of the Template to prevent mismatches - repeated Metric metrics = 2; // Each metric is the name of the metric and the datatype of the member but does not contain a value - repeated Parameter parameters = 3; - optional string template_ref = 4; // Reference to a template if this is extending a Template or an instance - must exist if an instance - optional bool is_definition = 5; - extensions 6 to max; - } - - message DataSet { - - message DataSetValue { - - oneof value { - uint32 int_value = 1; - uint64 long_value = 2; - float float_value = 3; - double double_value = 4; - bool boolean_value = 5; - string string_value = 6; - DataSetValueExtension extension_value = 7; - } - - message DataSetValueExtension { - extensions 1 to max; - } - } - - message Row { - repeated DataSetValue elements = 1; - extensions 2 to max; // For third party extensions - } - - optional uint64 num_of_columns = 1; - repeated string columns = 2; - repeated uint32 types = 3; - repeated Row rows = 4; - extensions 5 to max; // For third party extensions - } - - message PropertyValue { - - optional uint32 type = 1; - optional bool is_null = 2; - - oneof value { - uint32 int_value = 3; - uint64 long_value = 4; - float float_value = 5; - double double_value = 6; - bool boolean_value = 7; - string string_value = 8; - PropertySet propertyset_value = 9; - PropertySetList propertysets_value = 10; // List of Property Values - PropertyValueExtension extension_value = 11; - } - - message PropertyValueExtension { - extensions 1 to max; - } - } - - message PropertySet { - repeated string keys = 1; // Names of the properties - repeated PropertyValue values = 2; - extensions 3 to max; - } - - message PropertySetList { - repeated PropertySet propertyset = 1; - extensions 2 to max; - } - - message MetaData { - // Bytes specific metadata - optional bool is_multi_part = 1; - - // General metadata - optional string content_type = 2; // Content/Media type - optional uint64 size = 3; // File size, String size, Multi-part size, etc - optional uint64 seq = 4; // Sequence number for multi-part messages - - // File metadata - optional string file_name = 5; // File name - optional string file_type = 6; // File type (i.e. xml, json, txt, cpp, etc) - optional string md5 = 7; // md5 of data - - // Catchalls and future expansion - optional string description = 8; // Could be anything such as json or xml of custom properties - extensions 9 to max; - } - - message Metric { - - optional string name = 1; // Metric name - should only be included on birth - optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages - optional uint64 timestamp = 3; // Timestamp associated with data acquisition time - optional uint32 datatype = 4; // DataType of the metric/tag value - optional bool is_historical = 5; // If this is historical data and should not update real time tag - optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag - optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes. - optional MetaData metadata = 8; // Metadata for the payload - optional PropertySet properties = 9; - - oneof value { - uint32 int_value = 10; - uint64 long_value = 11; - float float_value = 12; - double double_value = 13; - bool boolean_value = 14; - string string_value = 15; - bytes bytes_value = 16; // Bytes, File - DataSet dataset_value = 17; - Template template_value = 18; - MetricValueExtension extension_value = 19; - } - - message MetricValueExtension { - extensions 1 to max; - } - } - - optional uint64 timestamp = 1; // Timestamp at message sending time - repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs - optional uint64 seq = 3; // Sequence number - optional string uuid = 4; // UUID to track message type in terms of schema definitions - optional bytes body = 5; // To optionally bypass the whole definition above - extensions 6 to max; // For third party extensions -} -` - -/* spell-checker: enable */ -export default protocol diff --git a/backend/src/Model/sparkplugb.ts b/backend/src/Model/sparkplugb.ts index b4b460d..d3625b8 100644 --- a/backend/src/Model/sparkplugb.ts +++ b/backend/src/Model/sparkplugb.ts @@ -1,18 +1,14 @@ -// cSpell:words protobuf -import * as protobuf from 'protobufjs' -import protocol from './sparkplugb.proto' import { Base64Message } from './Base64Message' import { Decoder } from './Decoder' - -const root = protobuf.parse(protocol).root -/* cspell:disable-next-line */ -export let SparkplugPayload = root.lookupType('com.cirruslink.sparkplug.protobuf.Payload') +import { get } from 'sparkplug-payload' +var sparkplug = get("spBv1.0") export const SparkplugDecoder = { decode(input: Buffer): Base64Message { try { - const message = Base64Message.fromString( - JSON.stringify(SparkplugPayload.toObject(SparkplugPayload.decode(new Uint8Array(input)))) + const message = Base64Message.fromString(JSON.stringify( + // @ts-ignore + sparkplug.decodePayload(new Uint8Array(input))) ) message.decoder = Decoder.SPARKPLUG return message diff --git a/package.json b/package.json index 02eaffb..bb93080 100644 --- a/package.json +++ b/package.json @@ -107,6 +107,7 @@ "semantic-release": "^23.0.8", "semantic-release-export-data": "^1.0.1", "source-map-support": "^0.5.9", + "sparkplug-client": "^3.2.4", "ts-node": "^10.9.2", "tslint": "^6.1.3", "tslint-config-airbnb": "^5.11.2", @@ -126,8 +127,8 @@ "lowdb": "^1.0.0", "mime": "^2.4.4", "mqtt": "^4.3.6", - "protobufjs": "^6.11.4", "sha1": "^1.1.1", + "sparkplug-payload": "^1.0.3", "uuid": "^8.3.2", "yarn-run-all": "^3.1.1" } diff --git a/res/sparkplug_b.proto b/res/sparkplug_b.proto deleted file mode 100644 index bda645c..0000000 --- a/res/sparkplug_b.proto +++ /dev/null @@ -1,197 +0,0 @@ -syntax = "proto2"; - -// -// To compile: -// cd client_libraries/java -// protoc --proto_path=../../ --java_out=src/main/java ../../sparkplug_b.proto -// -package com.cirruslink.sparkplug.protobuf; - -option java_package = "com.cirruslink.sparkplug.protobuf"; -option java_outer_classname = "SparkplugBProto"; - -message Payload { - /* - // Indexes of Data Types - - // Unknown placeholder for future expansion. - Unknown = 0; - - // Basic Types - Int8 = 1; - Int16 = 2; - Int32 = 3; - Int64 = 4; - UInt8 = 5; - UInt16 = 6; - UInt32 = 7; - UInt64 = 8; - Float = 9; - Double = 10; - Boolean = 11; - String = 12; - DateTime = 13; - Text = 14; - - // Additional Metric Types - UUID = 15; - DataSet = 16; - Bytes = 17; - File = 18; - Template = 19; - - // Additional PropertyValue Types - PropertySet = 20; - PropertySetList = 21; - - */ - - message Template { - - message Parameter { - optional string name = 1; - optional uint32 type = 2; - - oneof value { - uint32 int_value = 3; - uint64 long_value = 4; - float float_value = 5; - double double_value = 6; - bool boolean_value = 7; - string string_value = 8; - ParameterValueExtension extension_value = 9; - } - - message ParameterValueExtension { - extensions 1 to max; - } - } - - optional string version = 1; // The version of the Template to prevent mismatches - repeated Metric metrics = 2; // Each metric is the name of the metric and the datatype of the member but does not contain a value - repeated Parameter parameters = 3; - optional string template_ref = 4; // Reference to a template if this is extending a Template or an instance - must exist if an instance - optional bool is_definition = 5; - extensions 6 to max; - } - - message DataSet { - - message DataSetValue { - - oneof value { - uint32 int_value = 1; - uint64 long_value = 2; - float float_value = 3; - double double_value = 4; - bool boolean_value = 5; - string string_value = 6; - DataSetValueExtension extension_value = 7; - } - - message DataSetValueExtension { - extensions 1 to max; - } - } - - message Row { - repeated DataSetValue elements = 1; - extensions 2 to max; // For third party extensions - } - - optional uint64 num_of_columns = 1; - repeated string columns = 2; - repeated uint32 types = 3; - repeated Row rows = 4; - extensions 5 to max; // For third party extensions - } - - message PropertyValue { - - optional uint32 type = 1; - optional bool is_null = 2; - - oneof value { - uint32 int_value = 3; - uint64 long_value = 4; - float float_value = 5; - double double_value = 6; - bool boolean_value = 7; - string string_value = 8; - PropertySet propertyset_value = 9; - PropertySetList propertysets_value = 10; // List of Property Values - PropertyValueExtension extension_value = 11; - } - - message PropertyValueExtension { - extensions 1 to max; - } - } - - message PropertySet { - repeated string keys = 1; // Names of the properties - repeated PropertyValue values = 2; - extensions 3 to max; - } - - message PropertySetList { - repeated PropertySet propertyset = 1; - extensions 2 to max; - } - - message MetaData { - // Bytes specific metadata - optional bool is_multi_part = 1; - - // General metadata - optional string content_type = 2; // Content/Media type - optional uint64 size = 3; // File size, String size, Multi-part size, etc - optional uint64 seq = 4; // Sequence number for multi-part messages - - // File metadata - optional string file_name = 5; // File name - optional string file_type = 6; // File type (i.e. xml, json, txt, cpp, etc) - optional string md5 = 7; // md5 of data - - // Catchalls and future expansion - optional string description = 8; // Could be anything such as json or xml of custom properties - extensions 9 to max; - } - - message Metric { - - optional string name = 1; // Metric name - should only be included on birth - optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages - optional uint64 timestamp = 3; // Timestamp associated with data acquisition time - optional uint32 datatype = 4; // DataType of the metric/tag value - optional bool is_historical = 5; // If this is historical data and should not update real time tag - optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag - optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes. - optional MetaData metadata = 8; // Metadata for the payload - optional PropertySet properties = 9; - - oneof value { - uint32 int_value = 10; - uint64 long_value = 11; - float float_value = 12; - double double_value = 13; - bool boolean_value = 14; - string string_value = 15; - bytes bytes_value = 16; // Bytes, File - DataSet dataset_value = 17; - Template template_value = 18; - MetricValueExtension extension_value = 19; - } - - message MetricValueExtension { - extensions 1 to max; - } - } - - optional uint64 timestamp = 1; // Timestamp at message sending time - repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs - optional uint64 seq = 3; // Sequence number - optional string uuid = 4; // UUID to track message type in terms of schema definitions - optional bytes body = 5; // To optionally bypass the whole definition above - extensions 6 to max; // For third party extensions -} diff --git a/scripts/sparkplug-client.js b/scripts/sparkplug-client.js new file mode 100644 index 0000000..3b5414d --- /dev/null +++ b/scripts/sparkplug-client.js @@ -0,0 +1,232 @@ +/******************************************************************************** + * Copyright (c) 2016-2018 Cirrus Link Solutions and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Cirrus Link Solutions - initial implementation + ********************************************************************************/ +var SparkplugClient = require('sparkplug-client') + +/* + * Main sample function which includes the run() function for running the sample + */ +var sample = (function () { + var config = { + serverUrl: 'tcp://127.0.0.1:1883', + username: '', + password: '', + groupId: 'Sparkplug Devices', + edgeNode: 'JavaScript Edge Node', + clientId: 'JavaScriptSimpleEdgeNode', + version: 'spBv1.0', + }, + hwVersion = 'Emulated Hardware', + swVersion = 'v1.0.0', + deviceId = 'Emulated Device', + sparkPlugClient, + publishPeriod = 5000, + // Generates a random integer + randomInt = function () { + return 1 + Math.floor(Math.random() * 10) + }, + // Get BIRTH payload for the edge node + getNodeBirthPayload = function () { + return { + timestamp: new Date().getTime(), + metrics: [ + { + name: 'Node Control/Rebirth', + type: 'boolean', + value: false, + }, + { + name: 'Template1', + type: 'template', + value: { + isDefinition: true, + metrics: [ + { name: 'myBool', value: false, type: 'boolean' }, + { name: 'myInt', value: 0, type: 'int' }, + ], + parameters: [ + { + name: 'param1', + type: 'string', + value: 'value1', + }, + ], + }, + }, + ], + } + }, + // Get BIRTH payload for the device + getDeviceBirthPayload = function () { + return { + timestamp: new Date().getTime(), + metrics: [ + { name: 'my_boolean', value: Math.random() > 0.5, type: 'boolean' }, + { name: 'my_double', value: Math.random() * 0.123456789, type: 'double' }, + { name: 'my_float', value: Math.random() * 0.123, type: 'float' }, + { name: 'my_int', value: randomInt(), type: 'int' }, + { name: 'my_long', value: randomInt() * 214748364700, type: 'long' }, + { name: 'Inputs/0', value: true, type: 'boolean' }, + { name: 'Inputs/1', value: 0, type: 'int' }, + { name: 'Inputs/2', value: 1.23, type: 'float' }, + { name: 'Outputs/0', value: true, type: 'boolean' }, + { name: 'Outputs/1', value: 0, type: 'int' }, + { name: 'Outputs/2', value: 1.23, type: 'float' }, + { name: 'Properties/hw_version', value: hwVersion, type: 'string' }, + { name: 'Properties/sw_version', value: swVersion, type: 'string' }, + { + name: 'my_dataset', + type: 'dataset', + value: { + numOfColumns: 2, + types: ['string', 'string'], + columns: ['str1', 'str2'], + rows: [ + ['x', 'a'], + ['y', 'b'], + ], + }, + }, + { + name: 'TemplateInstance1', + type: 'template', + value: { + templateRef: 'Template1', + isDefinition: false, + metrics: [ + { name: 'myBool', value: true, type: 'boolean' }, + { name: 'myInt', value: 100, type: 'int' }, + ], + parameters: [ + { + name: 'param1', + type: 'string', + value: 'value2', + }, + ], + }, + }, + ], + } + }, + // Get data payload for the device + getDataPayload = function () { + return { + timestamp: new Date().getTime(), + metrics: [ + { name: 'my_boolean', value: Math.random() > 0.5, type: 'boolean' }, + { name: 'my_double', value: Math.random() * 0.123456789, type: 'double' }, + { name: 'my_float', value: Math.random() * 0.123, type: 'float' }, + { name: 'my_int', value: randomInt(), type: 'int' }, + { name: 'my_long', value: randomInt() * 214748364700, type: 'long' }, + ], + } + }, + // Runs the sample + run = function () { + // Create the SparkplugClient + sparkplugClient = SparkplugClient.newClient(config) + + // Create Incoming Message Handler + sparkplugClient.on('message', function (topic, payload) { + console.log(topic, payload) + }) + + // Create 'birth' handler + sparkplugClient.on('birth', function () { + // Publish Node BIRTH certificate + sparkplugClient.publishNodeBirth(getNodeBirthPayload()) + // Publish Device BIRTH certificate + sparkplugClient.publishDeviceBirth(deviceId, getDeviceBirthPayload()) + }) + + // Create node command handler + sparkplugClient.on('ncmd', function (payload) { + var timestamp = payload.timestamp, + metrics = payload.metrics + + if (metrics !== undefined && metrics !== null) { + for (var i = 0; i < metrics.length; i++) { + var metric = metrics[i] + if (metric.name == 'Node Control/Rebirth' && metric.value) { + console.log("Received 'Rebirth' command") + // Publish Node BIRTH certificate + sparkplugClient.publishNodeBirth(getNodeBirthPayload()) + // Publish Device BIRTH certificate + sparkplugClient.publishDeviceBirth(deviceId, getDeviceBirthPayload()) + } + } + } + }) + + // Create device command handler + sparkplugClient.on('dcmd', function (deviceId, payload) { + var timestamp = payload.timestamp, + metrics = payload.metrics, + inboundMetricMap = {}, + outboundMetric = [], + outboundPayload + + console.log('Command recevied for device ' + deviceId) + + // Loop over the metrics and store them in a map + if (metrics !== undefined && metrics !== null) { + for (var i = 0; i < metrics.length; i++) { + var metric = metrics[i] + inboundMetricMap[metric.name] = metric.value + } + } + if (inboundMetricMap['Outputs/0'] !== undefined && inboundMetricMap['Outputs/0'] !== null) { + console.log('Outputs/0: ' + inboundMetricMap['Outputs/0']) + outboundMetric.push({ name: 'Inputs/0', value: inboundMetricMap['Outputs/0'], type: 'boolean' }) + outboundMetric.push({ name: 'Outputs/0', value: inboundMetricMap['Outputs/0'], type: 'boolean' }) + console.log('Updated value for Inputs/0 ' + inboundMetricMap['Outputs/0']) + } else if (inboundMetricMap['Outputs/1'] !== undefined && inboundMetricMap['Outputs/1'] !== null) { + console.log('Outputs/1: ' + inboundMetricMap['Outputs/1']) + outboundMetric.push({ name: 'Inputs/1', value: inboundMetricMap['Outputs/1'], type: 'int' }) + outboundMetric.push({ name: 'Outputs/1', value: inboundMetricMap['Outputs/1'], type: 'int' }) + console.log('Updated value for Inputs/1 ' + inboundMetricMap['Outputs/1']) + } else if (inboundMetricMap['Outputs/2'] !== undefined && inboundMetricMap['Outputs/2'] !== null) { + console.log('Outputs/2: ' + inboundMetricMap['Outputs/2']) + outboundMetric.push({ name: 'Inputs/2', value: inboundMetricMap['Outputs/2'], type: 'float' }) + outboundMetric.push({ name: 'Outputs/2', value: inboundMetricMap['Outputs/2'], type: 'float' }) + console.log('Updated value for Inputs/2 ' + inboundMetricMap['Outputs/2']) + } + + outboundPayload = { + timestamp: new Date().getTime(), + metrics: outboundMetric, + } + + // Publish device data + sparkplugClient.publishDeviceData(deviceId, outboundPayload) + }) + + for (var i = 1; i < 101; i++) { + // Set up a device data publish for i*publishPeriod milliseconds from now + setTimeout(function () { + // Publish device data + sparkplugClient.publishDeviceData(deviceId, getDataPayload()) + + // End the client connection after the last publish + if (i === 100) { + sparkplugClient.stop() + } + }, i * publishPeriod) + } + } + + return { run: run } +})() + +// Run the sample +sample.run() diff --git a/yarn.lock b/yarn.lock index d917b26..bd3acaa 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1398,7 +1398,7 @@ resolved "https://registry.npmjs.org/@types/lodash/-/lodash-4.17.0.tgz" integrity sha512-t7dhREVv6dbNj0q17X12j7yDG4bD/DHYX7o5/DbDxobP0HnGPgpRz2Ej77aL7TZT3DSw13fqUTj8J4mMnqa7WA== -"@types/long@^4.0.1": +"@types/long@^4.0.0", "@types/long@^4.0.1": version "4.0.2" resolved "https://registry.npmjs.org/@types/long/-/long-4.0.2.tgz" integrity sha512-MqTGEo5bj5t157U6fA/BiDynNkn0YknVdh48CMPkTSpFTVmvao5UQmm7uEF6xBEo7qIMAlY/JSleYaE6VOdpaA== @@ -5074,7 +5074,7 @@ mqtt-packet@^6.8.0: debug "^4.1.1" process-nextick-args "^2.0.1" -mqtt@^4.3.6: +mqtt@^4.2.8, mqtt@^4.3.6: version "4.3.8" resolved "https://registry.npmjs.org/mqtt/-/mqtt-4.3.8.tgz" integrity sha512-2xT75uYa0kiPEF/PE0VPdavmEkoBzMT/UL9moid0rAvlCtV48qBwxD62m7Ld/4j8tSkIO1E/iqRl/S72SEOhOw== @@ -5635,6 +5635,11 @@ pacote@^18.0.0, pacote@^18.0.1, pacote@^18.0.3: ssri "^10.0.0" tar "^6.1.11" +pako@^2.0.4: + version "2.1.0" + resolved "https://registry.yarnpkg.com/pako/-/pako-2.1.0.tgz#266cc37f98c7d883545d11335c00fbd4062c9a86" + integrity sha512-w+eufiZ1WuJYgPXbV/PO3NCMEc3xqylkKHzp8bxp1uW4qaSNQUkwmLLEc3kKsfz8lpV1F8Ht3U1Cm+9Srog2ug== + parent-module@^1.0.0: version "1.0.1" resolved "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz" @@ -5982,7 +5987,7 @@ proto-list@~1.2.1: resolved "https://registry.yarnpkg.com/proto-list/-/proto-list-1.2.4.tgz#212d5bfe1318306a420f6402b8e26ff39647a849" integrity sha512-vtK/94akxsTMhe0/cbfpR+syPuszcuwhqVjJq26CuNDgFGj682oRBXOP5MJpv2r7JtE8MsiepGIqvvOTBwn2vA== -protobufjs@^6.11.4: +protobufjs@^6.11.3: version "6.11.4" resolved "https://registry.npmjs.org/protobufjs/-/protobufjs-6.11.4.tgz" integrity sha512-5kQWPaJHi1WoCpjTGszzQ32PG2F4+wRY6BmAT4Vfw56Q2FZ4YZzK20xUYQH4YkfehY1e6QSICrJquM6xXZNcrw== @@ -6619,6 +6624,25 @@ source-map@^0.6.0, source-map@^0.6.1: resolved "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz" integrity sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g== +sparkplug-client@^3.2.4: + version "3.2.4" + resolved "https://registry.yarnpkg.com/sparkplug-client/-/sparkplug-client-3.2.4.tgz#96ae0650339049099272c17a16aae7fe1278f302" + integrity sha512-Yck+cKwoS3RH+UYGyTiH9SXALT9foXQzkHCrJTUCgdS39J21NdVLIMTL9HwkkeunrxtKDakEI+UjTOIXEhWg7A== + dependencies: + debug "^4.3.4" + mqtt "^4.2.8" + pako "^2.0.4" + sparkplug-payload "^1.0.3" + +sparkplug-payload@^1.0.3: + version "1.0.3" + resolved "https://registry.yarnpkg.com/sparkplug-payload/-/sparkplug-payload-1.0.3.tgz#b8a54c9acf30b82ec9bae6ad4e2978cf89f7df2a" + integrity sha512-JAQSyuHVQQe/LzIlJdcIaD1F1c+rbXoolII3H1whQkhuZ96+G53RoPWqP9zCPZYFjvfSMHnQNIedGIZw/zjHJw== + dependencies: + "@types/long" "^4.0.0" + long "^4.0.0" + protobufjs "^6.11.3" + spawn-error-forwarder@~1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/spawn-error-forwarder/-/spawn-error-forwarder-1.0.0.tgz#1afd94738e999b0346d7b9fc373be55e07577029" @@ -6745,16 +6769,7 @@ stream-shift@^1.0.2: resolved "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz" integrity sha512-76ORR0DO1o1hlKwTbi/DM3EXWGf3ZJYO8cXX5RJwnul2DEg2oyoZyjLNoQM8WsvZiFKCRfC1O0J7iCvie3RZmQ== -"string-width-cjs@npm:string-width@^4.2.0": - version "4.2.3" - resolved "https://registry.npmjs.org/string-width/-/string-width-4.2.3.tgz" - integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g== - dependencies: - emoji-regex "^8.0.0" - is-fullwidth-code-point "^3.0.0" - strip-ansi "^6.0.1" - -string-width@^4.1.0, string-width@^4.2.0, string-width@^4.2.3: +"string-width-cjs@npm:string-width@^4.2.0", string-width@^4.1.0, string-width@^4.2.0, string-width@^4.2.3: version "4.2.3" resolved "https://registry.npmjs.org/string-width/-/string-width-4.2.3.tgz" integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g== @@ -6824,7 +6839,7 @@ string_decoder@~1.1.1: dependencies: safe-buffer "~5.1.0" -"strip-ansi-cjs@npm:strip-ansi@^6.0.1": +"strip-ansi-cjs@npm:strip-ansi@^6.0.1", strip-ansi@^6.0.0, strip-ansi@^6.0.1: version "6.0.1" resolved "https://registry.npmjs.org/strip-ansi/-/strip-ansi-6.0.1.tgz" integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A== @@ -6838,13 +6853,6 @@ strip-ansi@^3.0.0: dependencies: ansi-regex "^2.0.0" -strip-ansi@^6.0.0, strip-ansi@^6.0.1: - version "6.0.1" - resolved "https://registry.npmjs.org/strip-ansi/-/strip-ansi-6.0.1.tgz" - integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A== - dependencies: - ansi-regex "^5.0.1" - strip-ansi@^7.0.1, strip-ansi@^7.1.0: version "7.1.0" resolved "https://registry.npmjs.org/strip-ansi/-/strip-ansi-7.1.0.tgz" @@ -7536,7 +7544,7 @@ workerpool@6.2.1: resolved "https://registry.npmjs.org/workerpool/-/workerpool-6.2.1.tgz" integrity sha512-ILEIE97kDZvF9Wb9f6h5aXK4swSlKGUcOEGiIYb2OOu/IrDU9iwj0fD//SsA6E5ibwJxpEvhullJY4Sl4GcpAw== -"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0": +"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0", wrap-ansi@^7.0.0: version "7.0.0" resolved "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-7.0.0.tgz" integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q== @@ -7554,15 +7562,6 @@ wrap-ansi@^6.2.0: string-width "^4.1.0" strip-ansi "^6.0.0" -wrap-ansi@^7.0.0: - version "7.0.0" - resolved "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-7.0.0.tgz" - integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q== - dependencies: - ansi-styles "^4.0.0" - string-width "^4.1.0" - strip-ansi "^6.0.0" - wrap-ansi@^8.1.0: version "8.1.0" resolved "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-8.1.0.tgz" From c452b9f417695dbb8f0eb641e1647aa3ddbaa940 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Dalfors?= Date: Mon, 20 May 2024 13:17:09 +0200 Subject: [PATCH 5/5] add sparkplug messages to demovideo --- src/spec/demoVideo.ts | 4 +- src/spec/mock-sparkplugb.ts | 249 ++++++++++++++++++++++++++++++++++++ 2 files changed, 252 insertions(+), 1 deletion(-) create mode 100644 src/spec/mock-sparkplugb.ts diff --git a/src/spec/demoVideo.ts b/src/spec/demoVideo.ts index ef76c98..bec1b97 100644 --- a/src/spec/demoVideo.ts +++ b/src/spec/demoVideo.ts @@ -5,6 +5,7 @@ import * as path from 'path' import { ElectronApplication, _electron as electron } from 'playwright' import mockMqtt, { stop as stopMqtt } from './mock-mqtt' +import { default as MockSparkplug } from './mock-sparkplugb' import { clearOldTopics } from './scenarios/clearOldTopics' import { clearSearch, searchTree } from './scenarios/searchTree' import { clickOnHistory, createFakeMousePointer, hideText, showText, sleep } from './util' @@ -64,6 +65,7 @@ async function doStuff() { const scenes = new SceneBuilder() await scenes.record('connect', async () => { await connectTo('127.0.0.1', page) + await MockSparkplug.run() // Start sparkplug client after connect or birth topics will be missed await sleep(1000) }) @@ -142,7 +144,7 @@ async function doStuff() { }) stopMqtt() - console.log('Stopped mqtt') + console.log('Stopped mqtt client') cleanUp(scenes, electronApp) } diff --git a/src/spec/mock-sparkplugb.ts b/src/spec/mock-sparkplugb.ts new file mode 100644 index 0000000..127b218 --- /dev/null +++ b/src/spec/mock-sparkplugb.ts @@ -0,0 +1,249 @@ +/******************************************************************************** + * Copyright (c) 2016-2018 Cirrus Link Solutions and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Cirrus Link Solutions - initial implementation + ********************************************************************************/ +import * as SparkplugClient from 'sparkplug-client' +import type { UPayload } from 'sparkplug-client' +import type { UMetric } from 'sparkplug-payload/lib/sparkplugbpayload' + +/* + * Main sample function which includes the run() function for running the sample + */ + + +export interface MockSparkplugClient { + stop: () => void +} + +var sample = (function () { + var config = { + serverUrl: 'tcp://127.0.0.1:1883', + username: '', + password: '', + groupId: 'Sparkplug Devices', + edgeNode: 'JavaScript Edge Node', + clientId: 'JavaScriptSimpleEdgeNode', + version: 'spBv1.0', + }, + hwVersion = 'Emulated Hardware', + swVersion = 'v1.0.0', + deviceId = 'Emulated Device', + sparkPlugClient, + publishPeriod = 5000, + // Generates a random integer + randomInt = function () { + return 1 + Math.floor(Math.random() * 10) + }, + // Get BIRTH payload for the edge node + getNodeBirthPayload = function (): UPayload { + return { + timestamp: new Date().getTime(), + metrics: [ + { + name: 'Node Control/Rebirth', + type: 'Boolean', + value: false, + }, + { + name: 'Template1', + type: 'Template', + value: { + isDefinition: true, + metrics: [ + { name: 'myBool', value: false, type: 'Boolean' }, + { name: 'myInt', value: 0, type: 'UInt32' }, + ], + parameters: [ + { + name: 'param1', + type: 'String', + value: 'value1', + }, + ], + }, + }, + ], + } + }, + // Get BIRTH payload for the device + getDeviceBirthPayload = function (): UPayload { + return { + timestamp: new Date().getTime(), + metrics: [ + { name: 'my_boolean', value: Math.random() > 0.5, type: 'Boolean' }, + { name: 'my_double', value: Math.random() * 0.123456789, type: 'Double' }, + { name: 'my_float', value: Math.random() * 0.123, type: 'Float' }, + { name: 'my_int', value: randomInt(), type: 'Int8' }, + { name: 'my_long', value: randomInt() * 214748364700, type: 'Int64' }, + { name: 'Inputs/0', value: true, type: 'Boolean' }, + { name: 'Inputs/1', value: 0, type: 'Int8' }, + { name: 'Inputs/2', value: 1.23, type: 'UInt64' }, + { name: 'Outputs/0', value: true, type: 'Boolean' }, + { name: 'Outputs/1', value: 0, type: 'Int16' }, + { name: 'Outputs/2', value: 1.23, type: 'UInt64' }, + { name: 'Properties/hw_version', value: hwVersion, type: 'String' }, + { name: 'Properties/sw_version', value: swVersion, type: 'String' }, + { + name: 'my_dataset', + type: 'DataSet', + value: { + numOfColumns: 2, + types: ['String', 'String'], + columns: ['str1', 'str2'], + rows: [ + ['x', 'a'], + ['y', 'b'], + ], + }, + }, + { + name: 'TemplateInstance1', + type: 'Template', + value: { + templateRef: 'Template1', + isDefinition: false, + metrics: [ + { name: 'myBool', value: true, type: 'Boolean' }, + { name: 'myInt', value: 100, type: 'Int8' }, + ], + parameters: [ + { + name: 'param1', + type: 'String', + value: 'value2', + }, + ], + }, + }, + ], + } + }, + // Get data payload for the device + getDataPayload = function (): UPayload { + return { + timestamp: new Date().getTime(), + metrics: [ + { name: 'my_boolean', value: Math.random() > 0.5, type: 'Boolean' }, + { name: 'my_double', value: Math.random() * 0.123456789, type: 'Double' }, + { name: 'my_float', value: Math.random() * 0.123, type: 'UInt64' }, + { name: 'my_int', value: randomInt(), type: 'Int16' }, + { name: 'my_long', value: randomInt() * 214748364700, type: 'UInt64' }, + ], + } + }, + // Runs the sample + run = async function (): Promise { + // Create the SparkplugClient + const sparkplugClient = SparkplugClient.newClient(config) + let updateInterval: NodeJS.Timeout | null = null + const connected = new Promise((resolve) => { + + // Create 'birth' handler + sparkplugClient.on('birth', () => { + // Publish Node BIRTH certificate + sparkplugClient.publishNodeBirth(getNodeBirthPayload()) + // Publish Device BIRTH certificate + sparkplugClient.publishDeviceBirth(deviceId, getDeviceBirthPayload()) + resolve({ + stop: () => { + if (updateInterval) { + clearInterval(updateInterval) + } + sparkplugClient.stop() + } + }) + }) + }) + + // Create Incoming Message Handler + sparkplugClient.on('message', function (topic: string, payload: UPayload) { + console.log(topic, payload) + }) + + // Create node command handler + // spell-checker: disable-next-line + sparkplugClient.on('ncmd', function (payload: UPayload) { + var timestamp = payload.timestamp, + metrics = payload.metrics + + if (metrics !== undefined && metrics !== null) { + for (var i = 0; i < metrics.length; i++) { + var metric = metrics[i] + if (metric.name == 'Node Control/Rebirth' && metric.value) { + console.log("Received 'Rebirth' command") + // Publish Node BIRTH certificate + sparkplugClient.publishNodeBirth(getNodeBirthPayload()) + // Publish Device BIRTH certificate + sparkplugClient.publishDeviceBirth(deviceId, getDeviceBirthPayload()) + } + } + } + }) + + // Create device command handler + // spell-checker: disable-next-line + sparkplugClient.on('dcmd', function (deviceId: string, payload: UPayload) { + var timestamp = payload.timestamp, + metrics = payload.metrics, + inboundMetricMap: { [name: string]: any } = {}, + outboundMetric: Array = [], + outboundPayload: UPayload + + console.log('Command received for device ' + deviceId) + + // Loop over the metrics and store them in a map + if (metrics !== undefined && metrics !== null) { + for (var i = 0; i < metrics.length; i++) { + var metric = metrics[i] + if (metric.name !== undefined && metric.name !== null) { + inboundMetricMap[metric.name] = metric.value + } + } + } + if (inboundMetricMap['Outputs/0'] !== undefined && inboundMetricMap['Outputs/0'] !== null) { + console.log('Outputs/0: ' + inboundMetricMap['Outputs/0']) + outboundMetric.push({ name: 'Inputs/0', value: inboundMetricMap['Outputs/0'], type: 'Boolean' }) + outboundMetric.push({ name: 'Outputs/0', value: inboundMetricMap['Outputs/0'], type: 'Boolean' }) + console.log('Updated value for Inputs/0 ' + inboundMetricMap['Outputs/0']) + } else if (inboundMetricMap['Outputs/1'] !== undefined && inboundMetricMap['Outputs/1'] !== null) { + console.log('Outputs/1: ' + inboundMetricMap['Outputs/1']) + outboundMetric.push({ name: 'Inputs/1', value: inboundMetricMap['Outputs/1'], type: 'Int32' }) + outboundMetric.push({ name: 'Outputs/1', value: inboundMetricMap['Outputs/1'], type: 'Int32' }) + console.log('Updated value for Inputs/1 ' + inboundMetricMap['Outputs/1']) + } else if (inboundMetricMap['Outputs/2'] !== undefined && inboundMetricMap['Outputs/2'] !== null) { + console.log('Outputs/2: ' + inboundMetricMap['Outputs/2']) + outboundMetric.push({ name: 'Inputs/2', value: inboundMetricMap['Outputs/2'], type: 'UInt64' }) + outboundMetric.push({ name: 'Outputs/2', value: inboundMetricMap['Outputs/2'], type: 'UInt64' }) + console.log('Updated value for Inputs/2 ' + inboundMetricMap['Outputs/2']) + } + + outboundPayload = { + timestamp: new Date().getTime(), + metrics: outboundMetric, + } + + // Publish device data + sparkplugClient.publishDeviceData(deviceId, outboundPayload) + }) + + updateInterval = setInterval(function () { + // Publish device data + sparkplugClient.publishDeviceData(deviceId, getDataPayload()) + + + }, 2000) + return connected + } + + return { run: run } +})() + +export default sample \ No newline at end of file