Expose message Ids to the user

This commit is contained in:
Thomas Nordquist
2020-04-20 18:23:15 +02:00
parent 6fc0d3f28d
commit 07458cd712
25 changed files with 145 additions and 106 deletions

View File

@@ -1,7 +1,7 @@
import { DataSourceStateMachine } from './'
import { MqttMessage } from '../../../events'
type MessageCallback = (topic: string, payload: Buffer) => void
type MessageCallback = (topic: string, payload: Buffer, packet: any) => void
// A DataSource should automatically reconnect if connection was broken
interface DataSource<DataSourceOptions> {

View File

@@ -1,8 +1,17 @@
import { Base64Message } from './Base64Message'
import { QoS } from '../DataSource/MqttSource'
export interface Message {
value?: Base64Message
// mqtt based info
payload: Base64Message | null
messageId?: number
retain: boolean
qos: QoS
// meta info
length: number
received: Date
// Global message counter, not mqtt related
messageNumber: number
}

View File

@@ -75,13 +75,7 @@ export class Tree<ViewModel extends Destroyable> extends TreeNode<ViewModel> {
public applyUnmergedChanges() {
this.unmergedMessages.popAll().forEach(bufferedItem => {
const edges = bufferedItem.message.topic.split('/')
const node = TreeNodeFactory.fromEdgesAndValue<ViewModel>(
edges,
bufferedItem.message.payload,
bufferedItem.received
)
node.mqttMessage = bufferedItem.message
const node = TreeNodeFactory.fromMessage<ViewModel>(bufferedItem.message, bufferedItem.received)
if (!this.nodeFilter || this.nodeFilter(node)) {
this.updateWithNode(node.firstNode())

View File

@@ -1,11 +1,10 @@
import { Destroyable } from './Destroyable'
import { Edge, Message, RingBuffer, MessageHistory } from './'
import { EventDispatcher, MqttMessage } from '../../../events'
import { EventDispatcher } from '../../../events'
export class TreeNode<ViewModel extends Destroyable> {
public sourceEdge?: Edge<ViewModel>
public message?: Message
public mqttMessage?: MqttMessage
public messageHistory: MessageHistory = new RingBuffer<Message>(20000, 100)
public viewModel?: ViewModel
public edges: { [s: string]: Edge<ViewModel> } = {}
@@ -105,7 +104,7 @@ export class TreeNode<ViewModel extends Destroyable> {
}
public hasMessage() {
return this.message && this.message.value && this.message.value.length !== 0
return this.message && this.message.payload && this.message.length !== 0
}
public destroy() {
@@ -131,7 +130,6 @@ export class TreeNode<ViewModel extends Destroyable> {
public unconnectedClone() {
const node = new TreeNode<ViewModel>()
node.message = this.message
node.mqttMessage = this.mqttMessage
node.messageHistory = this.messageHistory.clone()
node.messages = this.messages
node.lastUpdate = this.lastUpdate
@@ -203,7 +201,6 @@ export class TreeNode<ViewModel extends Destroyable> {
if (node.message) {
this.setMessage(node.message)
this.onMessage.dispatch(node.message)
this.mqttMessage = node.mqttMessage
}
this.removeFromTreeIfEmpty()
@@ -236,7 +233,7 @@ export class TreeNode<ViewModel extends Destroyable> {
public childTopics(): Array<TreeNode<ViewModel>> {
if (this.cachedChildTopics === undefined) {
const initialValue = this.message && this.message.value ? [this] : []
const initialValue = this.message && this.message.payload ? [this] : []
this.cachedChildTopics = this.edgeArray
.map(e => e.target.childTopics())

View File

@@ -1,6 +1,6 @@
import { Base64Message } from './Base64Message'
import { Destroyable } from './Destroyable'
import { Edge, Tree, TreeNode } from './'
import { MqttMessage } from '../../../events'
export abstract class TreeNodeFactory {
private static messageCounter = 0
@@ -20,21 +20,23 @@ export abstract class TreeNodeFactory {
node.sourceEdge!.target = node
}
public static fromEdgesAndValue<ViewModel extends Destroyable>(
edgeNames: Array<string>,
value?: Base64Message | null,
public static fromMessage<ViewModel extends Destroyable>(
mqttMessage: MqttMessage,
receiveDate: Date = new Date()
): TreeNode<ViewModel> {
const node = new TreeNode<ViewModel>()
const edges = mqttMessage.topic.split('/')
mqttMessage.retain
node.setMessage({
value: value || undefined,
length: value ? value.length : 0,
...mqttMessage,
length: mqttMessage.payload?.length ?? 0,
received: receiveDate,
messageNumber: this.messageCounter,
})
this.messageCounter += 1
this.insertNodeAtPosition<ViewModel>(edgeNames, node)
this.insertNodeAtPosition<ViewModel>(edges, node)
return node
}

View File

@@ -1,6 +1,7 @@
import { Edge, TreeNodeFactory } from '../'
import { expect } from 'chai'
import 'mocha'
import { expect } from 'chai'
import { Edge } from '../'
import { makeTreeNode } from './makeTreeNode'
describe('Edge', () => {
it('should contain a name', () => {
@@ -9,8 +10,7 @@ describe('Edge', () => {
})
it('firstEdge should retrieve the first edge', () => {
const topics = 'foo/bar/baz'.split('/')
const leaf = TreeNodeFactory.fromEdgesAndValue(topics, undefined)
const leaf = makeTreeNode('foo/bar/baz')
const bazEdge = leaf.sourceEdge
if (!bazEdge) {
@@ -34,11 +34,8 @@ describe('Edge', () => {
})
it('hash should include change if parents are different', () => {
const topics1 = 'foo/bar/baz'.split('/')
const bazEdge1 = TreeNodeFactory.fromEdgesAndValue(topics1, undefined).sourceEdge
const topics2 = 'foo/foo/baz'.split('/')
const bazEdge2 = TreeNodeFactory.fromEdgesAndValue(topics2, undefined).sourceEdge
const bazEdge1 = makeTreeNode('foo/bar/baz').sourceEdge
const bazEdge2 = makeTreeNode('foo/foo/baz').sourceEdge
if (!bazEdge1 || !bazEdge2) {
throw Error('should not happen')

View File

@@ -1,18 +1,16 @@
import 'mocha'
import { Tree, TreeNodeFactory } from '../'
import { expect } from 'chai'
import { Tree } from '../'
import { makeTreeNode } from './makeTreeNode'
describe('Tree', () => {
it('node can be merged into a tree', () => {
const tree = new Tree()
const topics = 'foo/bar'.split('/')
const leaf = TreeNodeFactory.fromEdgesAndValue(topics, undefined)
const leaf = makeTreeNode('foo/bar')
tree.updateWithNode(leaf.firstNode())
const expectedNode = tree.findNode('foo/bar')
expect(expectedNode).to.eq(leaf)
})
})

View File

@@ -2,11 +2,11 @@ import 'mocha'
import { TreeNodeFactory } from '../'
import { expect } from 'chai'
import { makeTreeNode } from './makeTreeNode'
describe('TreeNode.findNode', () => {
it('findNode should retrieve node', () => {
const topics = 'foo/bar/baz'.split('/')
const leaf = TreeNodeFactory.fromEdgesAndValue(topics, undefined)
const leaf = makeTreeNode('foo/bar/baz')
const root = leaf.firstNode()
expect(root.sourceEdge).to.eq(undefined)

View File

@@ -1,55 +1,51 @@
import 'mocha'
import { TreeNodeFactory } from '../'
import { expect } from 'chai'
import { Base64Message } from '../Base64Message'
import { makeTreeNode } from './makeTreeNode'
describe('TreeNode', () => {
const number3 = Base64Message.fromString('3')
const number5 = Base64Message.fromString('5')
it('firstNode should retrieve first node', () => {
const topics = 'foo/bar'.split('/')
const leaf = TreeNodeFactory.fromEdgesAndValue(topics, undefined)
const leaf = makeTreeNode('foo/bar')
expect(leaf.firstNode().edges['foo']).to.not.eq(undefined)
})
it('updateWithNode should update value', () => {
const topics = 'foo/bar'.split('/')
const leaf = TreeNodeFactory.fromEdgesAndValue(topics, Base64Message.fromString('3'))
expect(Base64Message.toUnicodeString(leaf.message!.value!)).to.eq('3')
const updateLeave = TreeNodeFactory.fromEdgesAndValue(topics, Base64Message.fromString('5'))
const leaf = makeTreeNode('foo/bar', '3')
expect(Base64Message.toUnicodeString(leaf.message!.payload!)).to.eq('3')
const updateLeave = makeTreeNode('foo/bar', '5')
const root = leaf.firstNode()
root.updateWithNode(updateLeave.firstNode())
expect(root.sourceEdge).to.eq(undefined)
expect(Base64Message.toUnicodeString(leaf.message!.value!)).to.eq('5')
expect(Base64Message.toUnicodeString(leaf.message!.payload!)).to.eq('5')
})
it('updateWithNode should update intermediate nodes', () => {
const topics1 = 'foo/bar/baz'.split('/')
const leaf = TreeNodeFactory.fromEdgesAndValue(topics1, Base64Message.fromString('3'))
expect(Base64Message.toUnicodeString(leaf.message!.value!)).to.eq('3')
const leaf = makeTreeNode('foo/bar/baz', '3')
expect(Base64Message.toUnicodeString(leaf.message!.payload!)).to.eq('3')
const topics2 = 'foo/bar'.split('/')
const updateLeave = TreeNodeFactory.fromEdgesAndValue(topics2, Base64Message.fromString('5'))
const updateLeave = makeTreeNode('foo/bar', '5')
leaf.firstNode().updateWithNode(updateLeave.firstNode())
const barNode = leaf.firstNode().findNode('foo/bar')
expect(barNode && barNode.sourceEdge && barNode.sourceEdge.name).to.eq('bar')
expect(Base64Message.toUnicodeString(barNode!.message!.value!)).to.eq('5')
expect(Base64Message.toUnicodeString(barNode!.message!.payload!)).to.eq('5')
expect(leaf.sourceEdge && leaf.sourceEdge.name).to.eq('baz')
expect(Base64Message.toUnicodeString(leaf.message!.value!)).to.eq('3')
expect(Base64Message.toUnicodeString(leaf.message!.payload!)).to.eq('3')
})
it('updateWithNode should add nodes to the tree', () => {
const topics1 = 'foo/bar'.split('/')
const leaf1 = TreeNodeFactory.fromEdgesAndValue(topics1, Base64Message.fromString('foo'))
const topics2 = 'foo/bar/baz'.split('/')
const leaf2 = TreeNodeFactory.fromEdgesAndValue(topics2, Base64Message.fromString('bar'))
const leaf1 = makeTreeNode('foo/bar', 'foo')
const leaf2 = makeTreeNode('foo/bar/baz', 'bar')
leaf1.firstNode().updateWithNode(leaf2.firstNode())

View File

@@ -1,22 +1,17 @@
import 'mocha'
import { TreeNodeFactory } from '../'
import { expect } from 'chai'
import { Base64Message } from '../Base64Message'
import { makeTreeNode } from './makeTreeNode'
describe('TreeNodeFactory', () => {
it('root node must not have a sourceEdge', () => {
const topic = 'foo/bar'
const edges = topic.split('/')
const leaf = TreeNodeFactory.fromEdgesAndValue(edges, undefined)
const leaf = makeTreeNode('foo/bar')
expect(leaf.firstNode().sourceEdge).to.eq(undefined)
})
it('should create node', () => {
const topic = 'foo/bar'
const edges = topic.split('/')
const node = TreeNodeFactory.fromEdgesAndValue(edges, Base64Message.fromString('5'))
const node = makeTreeNode('foo/bar', '5')
if (!node.sourceEdge || !node.sourceEdge.source || !node.message) {
expect.fail('should not happen')
@@ -25,23 +20,21 @@ describe('TreeNodeFactory', () => {
expect(node).to.not.eq(undefined)
expect(node.sourceEdge.name).to.eq('bar')
expect(Base64Message.toUnicodeString(node.message.value!)).to.eq('5')
expect(Base64Message.toUnicodeString(node.message.payload!)).to.eq('5')
const foo = node.firstNode().findNode('foo')
expect(foo && foo.sourceEdge && foo.sourceEdge.name).to.eq('foo')
})
it('node should contain edges in order', () => {
const topic = 'foo/bar/baz'
const edges = topic.split('/')
const node = TreeNodeFactory.fromEdgesAndValue(edges, Base64Message.fromString('5'))
const node = makeTreeNode('foo/bar/baz', '5')
if (!node.sourceEdge || !node.sourceEdge.source || !node.message) {
expect.fail('should not happen')
return
}
expect(Base64Message.toUnicodeString(node.message.value!)).to.eq('5')
expect(Base64Message.toUnicodeString(node.message.payload!)).to.eq('5')
expect(node.sourceEdge.name).to.eq('baz')
const barNode = node.sourceEdge.source

View File

@@ -0,0 +1,16 @@
import { TreeNodeFactory } from '../'
import { Base64Message } from '../Base64Message'
import { TreeNode } from '../TreeNode'
import { MqttMessage } from '../../../../events'
export function makeTreeNode(topic: string, message?: string): TreeNode<any> {
const mqttMessage: MqttMessage = {
topic,
payload: message ? Base64Message.fromString(message) : null,
qos: 0,
retain: false,
messageId: undefined,
}
return TreeNodeFactory.fromMessage(mqttMessage)
}

View File

@@ -45,11 +45,13 @@ export class ConnectionManager {
if (buffer.length > 20000) {
buffer = buffer.slice(0, 20000)
}
backendEvents.emit(messageEvent, {
topic,
payload: Base64Message.fromBuffer(buffer),
qos: packet.qos,
retain: packet.retain,
messageId: packet.messageId,
})
})
}