chore: decode data in frontend
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import * as FileAsync from 'lowdb/adapters/FileAsync'
|
||||
import * as fs from 'fs-extra'
|
||||
import * as lowdb from 'lowdb'
|
||||
import * as path from 'path'
|
||||
import FileAsync from 'lowdb/adapters/FileAsync'
|
||||
import fs from 'fs-extra'
|
||||
import lowdb from 'lowdb'
|
||||
import path from 'path'
|
||||
import { backendRpc } from '../../events'
|
||||
import { storageClearEvent, storageLoadEvent, storageStoreEvent } from '../../events/StorageEvents'
|
||||
|
||||
|
||||
@@ -98,7 +98,7 @@ export class MqttSource implements DataSource<MqttOptions> {
|
||||
|
||||
public publish(msg: MqttMessage) {
|
||||
if (this.client) {
|
||||
this.client.publish(msg.topic, msg.payload ? Base64Message.toUnicodeString(msg.payload) : '', {
|
||||
this.client.publish(msg.topic, msg.payload?.toBuffer() ?? '', {
|
||||
qos: msg.qos,
|
||||
retain: msg.retain,
|
||||
})
|
||||
|
||||
@@ -3,93 +3,55 @@ import { Decoder } from './Decoder'
|
||||
import { TopicDataType } from './TreeNode'
|
||||
|
||||
export class Base64Message {
|
||||
private base64Message: string
|
||||
public base64Message: string
|
||||
private unicodeValue: string
|
||||
public error?: string
|
||||
public decoder: Decoder
|
||||
public length: number
|
||||
|
||||
private constructor(base64Str: string) {
|
||||
this.base64Message = base64Str
|
||||
this.unicodeValue = Base64.decode(base64Str)
|
||||
this.length = base64Str.length
|
||||
constructor(base64Str?: string, error?: string) {
|
||||
this.base64Message = base64Str ?? ''
|
||||
this.error = error
|
||||
this.unicodeValue = Base64.decode(base64Str ?? '')
|
||||
this.length = base64Str?.length ?? 0
|
||||
this.decoder = Decoder.NONE
|
||||
}
|
||||
|
||||
public static toUnicodeString(message: Base64Message) {
|
||||
return message.unicodeValue || ''
|
||||
public toUnicodeString() {
|
||||
return this.unicodeValue || ''
|
||||
}
|
||||
|
||||
public static fromBuffer(buffer: Buffer) {
|
||||
return new Base64Message(buffer.toString('base64'))
|
||||
}
|
||||
|
||||
public toBuffer(): Buffer {
|
||||
return Buffer.from(this.base64Message, 'base64')
|
||||
}
|
||||
|
||||
public static fromString(str: string) {
|
||||
return new Base64Message(Base64.encode(str))
|
||||
}
|
||||
|
||||
/* Raw message conversions ('uint8' | 'uint16' | 'uint32' | 'uint64' | 'int8' | 'int16' | 'int32' | 'int64' | 'float' | 'double') */
|
||||
public static format(message: Base64Message | null, type: TopicDataType = 'string'): [string, 'json' | undefined] {
|
||||
if (!message) {
|
||||
return ['', undefined]
|
||||
}
|
||||
|
||||
public format(type: TopicDataType = 'string'): [string, 'json' | undefined] {
|
||||
try {
|
||||
switch (type) {
|
||||
case 'json': {
|
||||
const json = JSON.parse(Base64Message.toUnicodeString(message))
|
||||
const json = JSON.parse(this.toUnicodeString())
|
||||
return [JSON.stringify(json, undefined, ' '), 'json']
|
||||
}
|
||||
case 'hex': {
|
||||
const hex = Base64Message.toHex(message)
|
||||
const hex = Base64Message.toHex(this)
|
||||
return [hex, undefined]
|
||||
}
|
||||
case 'uint8': {
|
||||
const uint = Base64Message.toUInt(message, 1)
|
||||
return [uint ? uint : '', undefined]
|
||||
}
|
||||
case 'uint16': {
|
||||
const uint = Base64Message.toUInt(message, 2)
|
||||
return [uint ? uint : '', undefined]
|
||||
}
|
||||
case 'uint32': {
|
||||
const uint = Base64Message.toUInt(message, 4)
|
||||
return [uint ? uint : '', undefined]
|
||||
}
|
||||
case 'uint64': {
|
||||
const uint = Base64Message.toUInt(message, 8)
|
||||
return [uint ? uint : '', undefined]
|
||||
}
|
||||
case 'int8': {
|
||||
const int = Base64Message.toInt(message, 1)
|
||||
return [int ? int : '', undefined]
|
||||
}
|
||||
case 'int16': {
|
||||
const int = Base64Message.toInt(message, 2)
|
||||
return [int ? int : '', undefined]
|
||||
}
|
||||
case 'int32': {
|
||||
const int = Base64Message.toInt(message, 4)
|
||||
return [int ? int : '', undefined]
|
||||
}
|
||||
case 'int64': {
|
||||
const int = Base64Message.toInt(message, 8)
|
||||
return [int ? int : '', undefined]
|
||||
}
|
||||
case 'float': {
|
||||
const float = Base64Message.toFloat(message, 4)
|
||||
return [float ? float : '', undefined]
|
||||
}
|
||||
case 'double': {
|
||||
const float = Base64Message.toFloat(message, 8)
|
||||
return [float ? float : '', undefined]
|
||||
}
|
||||
default: {
|
||||
const str = Base64Message.toUnicodeString(message)
|
||||
const str = this.toUnicodeString()
|
||||
return [str, undefined]
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
const str = Base64Message.toUnicodeString(message)
|
||||
const str = this.toUnicodeString()
|
||||
return [str, undefined]
|
||||
}
|
||||
}
|
||||
@@ -105,89 +67,6 @@ export class Base64Message {
|
||||
return str.trimRight()
|
||||
}
|
||||
|
||||
public static toUInt(message: Base64Message, bytes: number) {
|
||||
const buf = Buffer.from(message.base64Message, 'base64')
|
||||
|
||||
let str: String[] = []
|
||||
switch (bytes) {
|
||||
case 1:
|
||||
for (let index = 0; index < buf.length; index += bytes) {
|
||||
str.push(buf.readUInt8(index).toString())
|
||||
}
|
||||
break
|
||||
case 2:
|
||||
for (let index = 0; index < buf.length; index += bytes) {
|
||||
str.push(buf.readUInt16LE(index).toString())
|
||||
}
|
||||
break
|
||||
case 4:
|
||||
for (let index = 0; index < buf.length; index += bytes) {
|
||||
str.push(buf.readUInt32LE(index).toString())
|
||||
}
|
||||
break
|
||||
case 8:
|
||||
for (let index = 0; index < buf.length; index += bytes) {
|
||||
str.push(buf.readBigUInt64LE(index).toString())
|
||||
}
|
||||
break
|
||||
default:
|
||||
return undefined
|
||||
}
|
||||
return str.join(', ')
|
||||
}
|
||||
|
||||
public static toInt(message: Base64Message, bytes: number) {
|
||||
const buf = Buffer.from(message.base64Message, 'base64')
|
||||
|
||||
let str: String[] = []
|
||||
switch (bytes) {
|
||||
case 1:
|
||||
for (let index = 0; index < buf.length; index += bytes) {
|
||||
str.push(buf.readInt8(index).toString())
|
||||
}
|
||||
break
|
||||
case 2:
|
||||
for (let index = 0; index < buf.length; index += bytes) {
|
||||
str.push(buf.readInt16LE(index).toString())
|
||||
}
|
||||
break
|
||||
case 4:
|
||||
for (let index = 0; index < buf.length; index += bytes) {
|
||||
str.push(buf.readInt32LE(index).toString())
|
||||
}
|
||||
break
|
||||
case 8:
|
||||
for (let index = 0; index < buf.length; index += bytes) {
|
||||
str.push(buf.readBigInt64LE(index).toString())
|
||||
}
|
||||
break
|
||||
default:
|
||||
return undefined
|
||||
}
|
||||
return str.join(', ')
|
||||
}
|
||||
|
||||
public static toFloat(message: Base64Message, bytes: number) {
|
||||
const buf = Buffer.from(message.base64Message, 'base64')
|
||||
|
||||
let str: String[] = []
|
||||
switch (bytes) {
|
||||
case 4:
|
||||
for (let index = 0; index < buf.length; index += bytes) {
|
||||
str.push(buf.readFloatLE(index).toString())
|
||||
}
|
||||
break
|
||||
case 8:
|
||||
for (let index = 0; index < buf.length; index += bytes) {
|
||||
str.push(buf.readDoubleLE(index).toString())
|
||||
}
|
||||
break
|
||||
default:
|
||||
return undefined
|
||||
}
|
||||
return str.join(', ')
|
||||
}
|
||||
|
||||
public static toDataUri(message: Base64Message, mimeType: string) {
|
||||
return `data:${mimeType};base64,${message.base64Message}`
|
||||
}
|
||||
|
||||
@@ -1,9 +1,31 @@
|
||||
import { Destroyable } from './Destroyable'
|
||||
import { Edge, Message, RingBuffer, MessageHistory } from './'
|
||||
import { EventDispatcher } from '../../../events'
|
||||
import { IDecoder, decoders } from './sparkplugb'
|
||||
import { Base64Message } from './Base64Message'
|
||||
|
||||
// export type TopicDataType = 'json' | 'string' | 'hex' | 'integer' | 'unsigned int' | 'floating point'
|
||||
export type TopicDataType = 'json' | 'string' | 'hex' | 'uint8' | 'uint16' | 'uint32' | 'uint64' | 'int8' | 'int16' | 'int32' | 'int64' | 'float' | 'double'
|
||||
export type TopicDataType =
|
||||
| 'json'
|
||||
| 'string'
|
||||
| 'hex'
|
||||
| 'uint8'
|
||||
| 'uint16'
|
||||
| 'uint32'
|
||||
| 'uint64'
|
||||
| 'int8'
|
||||
| 'int16'
|
||||
| 'int32'
|
||||
| 'int64'
|
||||
| 'float'
|
||||
| 'double'
|
||||
|
||||
function findDecoder<T extends Destroyable>(node: TreeNode<T>): IDecoder | undefined {
|
||||
return decoders.find(
|
||||
decoder =>
|
||||
decoder.canDecodeTopic?.(node.path()) || (node.message?.payload && decoder.canDecodeData?.(node.message?.payload))
|
||||
)
|
||||
}
|
||||
|
||||
export class TreeNode<ViewModel extends Destroyable> {
|
||||
public sourceEdge?: Edge<ViewModel>
|
||||
@@ -22,6 +44,28 @@ export class TreeNode<ViewModel extends Destroyable> {
|
||||
public isTree = false
|
||||
public type: TopicDataType = 'json'
|
||||
|
||||
private _decoder?: IDecoder
|
||||
|
||||
public decoderFormat?: string
|
||||
|
||||
get decoder(): IDecoder | undefined {
|
||||
if (!this._decoder) {
|
||||
this._decoder = findDecoder(this)
|
||||
}
|
||||
return this._decoder
|
||||
}
|
||||
|
||||
set decoder(override: IDecoder | undefined) {
|
||||
this._decoder = override
|
||||
this.message && this.onMerge.dispatch()
|
||||
}
|
||||
|
||||
decodeMessage(message: Message): Base64Message | null {
|
||||
const decoder = this.decoder
|
||||
|
||||
return this.decoder && message.payload ? this.decoder.decode(message.payload, this.decoderFormat) : message.payload
|
||||
}
|
||||
|
||||
private cachedPath?: string
|
||||
private cachedChildTopics?: Array<TreeNode<ViewModel>>
|
||||
private cachedLeafMessageCount?: number
|
||||
@@ -157,7 +201,7 @@ export class TreeNode<ViewModel extends Destroyable> {
|
||||
|
||||
public path(): string {
|
||||
if (!this.cachedPath) {
|
||||
return this.branch()
|
||||
this.cachedPath = this.branch()
|
||||
.map(node => node.sourceEdge && node.sourceEdge.name)
|
||||
.filter(name => name !== undefined)
|
||||
.join('/')
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { Destroyable } from './Destroyable'
|
||||
import { Edge, Tree, TreeNode } from './'
|
||||
import { MqttMessage } from '../../../events'
|
||||
import { Base64Message } from './Base64Message'
|
||||
|
||||
export abstract class TreeNodeFactory {
|
||||
private static messageCounter = 0
|
||||
@@ -30,6 +31,7 @@ export abstract class TreeNodeFactory {
|
||||
mqttMessage.retain
|
||||
node.setMessage({
|
||||
...mqttMessage,
|
||||
payload: mqttMessage.payload && new Base64Message(mqttMessage.payload?.base64Message),
|
||||
length: mqttMessage.payload?.length ?? 0,
|
||||
received: receiveDate,
|
||||
messageNumber: this.messageCounter,
|
||||
|
||||
@@ -1,21 +1,98 @@
|
||||
import { Base64Message } from './Base64Message'
|
||||
import { Decoder } from './Decoder'
|
||||
import { get } from 'sparkplug-payload'
|
||||
var sparkplug = get("spBv1.0")
|
||||
var sparkplug = get('spBv1.0')
|
||||
|
||||
export const SparkplugDecoder = {
|
||||
decode(input: Buffer): Base64Message {
|
||||
export interface IDecoder<T = string> {
|
||||
/**
|
||||
* Can be used to
|
||||
* @param topic
|
||||
*/
|
||||
formats: T[]
|
||||
canDecodeTopic?(topic: string): boolean
|
||||
canDecodeData?(data: Base64Message): boolean
|
||||
decode(input: Base64Message, format: T | string | undefined): Base64Message
|
||||
|
||||
/**
|
||||
* If this is just an intermediate decoder, next-decoder can be defined
|
||||
*/
|
||||
nextDecoder?: IDecoder
|
||||
}
|
||||
|
||||
export const SparkplugDecoder: IDecoder = {
|
||||
formats: ['Sparkplug'],
|
||||
canDecodeTopic(topic: string) {
|
||||
return !!topic.match(/spBv1\.0\/[^/]+\/(DDATA|NDATA|NCMD|DCMD|NBIRTH|DBIRTH|NDEATH|DDEATH\/[^/]+\/)/u)
|
||||
},
|
||||
decode(input: Base64Message): Base64Message {
|
||||
try {
|
||||
const message = Base64Message.fromString(JSON.stringify(
|
||||
// @ts-ignore
|
||||
sparkplug.decodePayload(new Uint8Array(input)))
|
||||
const message = Base64Message.fromString(
|
||||
JSON.stringify(
|
||||
// @ts-ignore
|
||||
sparkplug.decodePayload(new Uint8Array(input.toBuffer()))
|
||||
)
|
||||
)
|
||||
message.decoder = Decoder.SPARKPLUG
|
||||
return message
|
||||
} catch {
|
||||
const message = Base64Message.fromString("Failed to decode sparkplugb payload")
|
||||
const message = new Base64Message(undefined, 'Failed to decode sparkplugb payload')
|
||||
message.decoder = Decoder.NONE
|
||||
return message
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
export const StringDecoder: IDecoder = {
|
||||
formats: ['string'],
|
||||
decode(input: Base64Message): Base64Message {
|
||||
return input
|
||||
},
|
||||
}
|
||||
|
||||
type BinaryFormats =
|
||||
| 'int8'
|
||||
| 'int16'
|
||||
| 'int32'
|
||||
| 'int64'
|
||||
| 'uint8'
|
||||
| 'uint16'
|
||||
| 'uint32'
|
||||
| 'uint64'
|
||||
| 'float'
|
||||
| 'double'
|
||||
|
||||
/**
|
||||
* Binary decode primitive binary data type and arrays of these
|
||||
*/
|
||||
export const BinaryDecoder: IDecoder<BinaryFormats> = {
|
||||
formats: ['int8', 'int16', 'int32', 'int64', 'uint8', 'uint16', 'uint32', 'uint64', 'float', 'double'],
|
||||
decode(input: Base64Message, format: BinaryFormats): Base64Message {
|
||||
const decodingOption = {
|
||||
int8: [Buffer.prototype.readInt8, 1],
|
||||
int16: [Buffer.prototype.readInt16LE, 2],
|
||||
int32: [Buffer.prototype.readInt32LE, 4],
|
||||
int64: [Buffer.prototype.readBigInt64LE, 8],
|
||||
uint8: [Buffer.prototype.readUint8, 1],
|
||||
uint16: [Buffer.prototype.readUint16LE, 2],
|
||||
uint32: [Buffer.prototype.readUint32LE, 4],
|
||||
uint64: [Buffer.prototype.readBigUint64LE, 8],
|
||||
float: [Buffer.prototype.readFloatLE, 4],
|
||||
double: [Buffer.prototype.readDoubleLE, 8],
|
||||
} as const
|
||||
|
||||
const [readNumber, bytesToRead] = decodingOption[format]
|
||||
|
||||
const buf = input.toBuffer()
|
||||
let str: String[] = []
|
||||
if (buf.length % bytesToRead !== 0) {
|
||||
return new Base64Message(undefined, 'Data type does not align with message')
|
||||
}
|
||||
for (let index = 0; index < buf.length; index += bytesToRead) {
|
||||
str.push((readNumber as any).apply(buf, [index]).toString())
|
||||
}
|
||||
|
||||
return Base64Message.fromString(JSON.stringify(str.length === 1 ? str[0] : str))
|
||||
},
|
||||
}
|
||||
|
||||
export const decoders = [SparkplugDecoder, BinaryDecoder, StringDecoder] as const
|
||||
|
||||
@@ -48,12 +48,7 @@ export class ConnectionManager {
|
||||
}
|
||||
|
||||
let decoded_payload = null
|
||||
// spell-checker: disable-next-line
|
||||
if (topic.match(/spBv1\.0\/[^/]+\/(DDATA|NDATA|NCMD|DCMD|NBIRTH|DBIRTH|NDEATH|DDEATH\/[^/]+\/)/u)) {
|
||||
decoded_payload = SparkplugDecoder.decode(buffer)
|
||||
} else {
|
||||
decoded_payload = Base64Message.fromBuffer(buffer)
|
||||
}
|
||||
decoded_payload = Base64Message.fromBuffer(buffer)
|
||||
|
||||
backendEvents.emit(messageEvent, {
|
||||
topic,
|
||||
|
||||
Reference in New Issue
Block a user