Refactor communication

Add QoS andd retain flag
Refactor reducer
This commit is contained in:
Thomas Nordquist
2019-01-20 05:30:21 +01:00
parent 1839b551c0
commit f893d5ce60
21 changed files with 433 additions and 166 deletions

View File

@@ -1,4 +1,5 @@
import { DataSourceStateMachine } from './'
import { MqttMessage } from '../../../events'
type MessageCallback = (topic: string, payload: Buffer) => void
@@ -7,7 +8,7 @@ interface DataSource<DataSourceOptions> {
connect(options: DataSourceOptions): DataSourceStateMachine
disconnect(): void
onMessage(messageCallback: MessageCallback): void
publish(topic: string, payload: any): void
publish(msg: MqttMessage): void
topicSeparator: string
stateMachine: DataSourceStateMachine
}

View File

@@ -2,6 +2,7 @@ import * as Url from 'url'
import { Client, connect as mqttConnect } from 'mqtt'
import { DataSource, DataSourceStateMachine } from './'
import { MqttMessage } from '../../../events'
export interface MqttOptions {
url: string
@@ -15,11 +16,11 @@ export interface MqttOptions {
export class MqttSource implements DataSource<MqttOptions> {
public stateMachine: DataSourceStateMachine = new DataSourceStateMachine()
private client: Client | undefined
private messageCallback?: (topic: string, message: Buffer) => void
private messageCallback?: (topic: string, message: Buffer, packet: any) => void
private rootSubscription = '#'
public topicSeparator = '/'
public onMessage(messageCallback: (topic: string, message: Buffer) => void) {
public onMessage(messageCallback: (topic: string, message: Buffer, packet: any) => void) {
this.messageCallback = messageCallback
}
@@ -73,14 +74,14 @@ export class MqttSource implements DataSource<MqttOptions> {
})
client.on('message', (topic, message, packet) => {
this.messageCallback && this.messageCallback(topic, message)
this.messageCallback && this.messageCallback(topic, message, packet)
})
return this.stateMachine
}
public publish(topic: string, payload: any) {
this.client && this.client.publish(topic, payload)
public publish(msg: MqttMessage) {
this.client && this.client.publish(msg.topic, msg.payload, { qos: msg.qos, retain: msg.retain })
}
public disconnect() {

View File

@@ -1,15 +1,15 @@
import { Edge, Message, RingBuffer } from './'
import { EventDispatcher } from '../../../events'
import { EventDispatcher, MqttMessage } from '../../../events'
export class TreeNode {
public sourceEdge?: Edge
public message?: Message
public mqttMessage?: MqttMessage
public messageHistory: RingBuffer<Message> = new RingBuffer<Message>(3000, 100)
public edges: {[s: string]: Edge} = {}
public collapsed = false
public messages: number = 0
public lastUpdate: number = Date.now()
public onMerge = new EventDispatcher<void, TreeNode>(this)
public onEdgesChange = new EventDispatcher<void, TreeNode>(this)
public onMessage = new EventDispatcher<Message, TreeNode>(this)

View File

@@ -1,7 +1,7 @@
import {
AddMqttConnection,
EventDispatcher,
Message,
MqttMessage,
addMqttConnectionEvent,
backendEvents,
checkForUpdates,
@@ -36,19 +36,20 @@ export class ConnectionManager {
connection.connect(options)
this.handleNewMessagesForConnection(connectionId, connection)
backendEvents.subscribe(makePublishEvent(connectionId), (msg: Message) => {
this.connections[connectionId].publish(msg.topic, msg.payload)
backendEvents.subscribe(makePublishEvent(connectionId), (msg: MqttMessage) => {
this.connections[connectionId].publish(msg)
})
}
private handleNewMessagesForConnection(connectionId: string, connection: MqttSource) {
const messageEvent = makeConnectionMessageEvent(connectionId)
connection.onMessage((topic: string, payload: Buffer) => {
connection.onMessage((topic: string, payload: Buffer, packet: any) => {
let buffer = payload
if (buffer.length > 10000) {
buffer = buffer.slice(0, 10000)
}
backendEvents.emit(messageEvent, { topic, payload: buffer.toString('base64') })
backendEvents.emit(messageEvent, { topic, payload: buffer.toString(), qos: packet.qos, retain: packet.retain })
})
}