Clean up & Add connection setup
This commit is contained in:
@@ -8,6 +8,7 @@ interface DataSource<DataSourceOptions> {
|
||||
disconnect(): void
|
||||
onMessage(messageCallback: MessageCallback): void
|
||||
topicSeparator: string
|
||||
stateMachine: DataSourceStateMachine
|
||||
}
|
||||
|
||||
export { DataSource, MessageCallback }
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { EventEmitter } from 'events'
|
||||
import { EventDispatcher } from '../../../events'
|
||||
|
||||
export interface DataSourceState {
|
||||
connecting: boolean
|
||||
@@ -6,7 +6,8 @@ export interface DataSourceState {
|
||||
error?: Error
|
||||
}
|
||||
|
||||
export class DataSourceStateMachine extends EventEmitter {
|
||||
export class DataSourceStateMachine {
|
||||
public onUpdate = new EventDispatcher<DataSourceState, DataSourceStateMachine>(this)
|
||||
private state: DataSourceState = {
|
||||
error: undefined,
|
||||
connected: false,
|
||||
@@ -19,6 +20,7 @@ export class DataSourceStateMachine extends EventEmitter {
|
||||
error: undefined,
|
||||
connecting: false,
|
||||
}
|
||||
this.onUpdate.dispatch(this.state)
|
||||
}
|
||||
|
||||
public setError(error: Error) {
|
||||
@@ -27,6 +29,7 @@ export class DataSourceStateMachine extends EventEmitter {
|
||||
connected: false,
|
||||
connecting: false,
|
||||
}
|
||||
this.onUpdate.dispatch(this.state)
|
||||
}
|
||||
|
||||
public setConnecting() {
|
||||
@@ -35,6 +38,7 @@ export class DataSourceStateMachine extends EventEmitter {
|
||||
connected: false,
|
||||
connecting: true,
|
||||
}
|
||||
this.onUpdate.dispatch(this.state)
|
||||
}
|
||||
|
||||
public toJSON() {
|
||||
|
||||
@@ -3,9 +3,14 @@ import { DataSource, DataSourceStateMachine } from './'
|
||||
|
||||
export interface MqttOptions {
|
||||
url: string
|
||||
username?: string
|
||||
password?: string
|
||||
ssl: boolean
|
||||
sslValidation: boolean
|
||||
}
|
||||
|
||||
export class MqttSource implements DataSource<MqttOptions> {
|
||||
public stateMachine: DataSourceStateMachine = new DataSourceStateMachine()
|
||||
private client: Client | undefined
|
||||
private messageCallback?: (topic: string, message: Buffer) => void
|
||||
private rootSubscription = '#'
|
||||
@@ -16,8 +21,7 @@ export class MqttSource implements DataSource<MqttOptions> {
|
||||
}
|
||||
|
||||
public connect(options: MqttOptions): DataSourceStateMachine {
|
||||
const state = new DataSourceStateMachine()
|
||||
|
||||
this.stateMachine.setConnecting()
|
||||
const client = mqttConnect(options.url, {
|
||||
resubscribe: false,
|
||||
})
|
||||
@@ -25,19 +29,19 @@ export class MqttSource implements DataSource<MqttOptions> {
|
||||
this.client = client
|
||||
|
||||
client.on('error', (error: Error) => {
|
||||
state.setError(error)
|
||||
this.stateMachine.setError(error)
|
||||
})
|
||||
|
||||
client.on('close', () => {
|
||||
state.setConnected(false)
|
||||
this.stateMachine.setConnected(false)
|
||||
})
|
||||
|
||||
client.on('reconnect', () => {
|
||||
state.setConnecting()
|
||||
this.stateMachine.setConnecting()
|
||||
})
|
||||
|
||||
client.on('connect', () => {
|
||||
state.setConnected(true)
|
||||
this.stateMachine.setConnected(true)
|
||||
client.subscribe(this.rootSubscription, (err: Error) => {
|
||||
if (err) {
|
||||
throw new Error('mqtt subscription failed')
|
||||
@@ -49,7 +53,7 @@ export class MqttSource implements DataSource<MqttOptions> {
|
||||
this.messageCallback && this.messageCallback(topic, message)
|
||||
})
|
||||
|
||||
return state
|
||||
return this.stateMachine
|
||||
}
|
||||
|
||||
public disconnect() {
|
||||
|
||||
@@ -1,4 +0,0 @@
|
||||
export class TopicProperties {
|
||||
public topicSeparator: string = '/'
|
||||
public multilevelWildcard: string | null = '#'
|
||||
}
|
||||
@@ -3,5 +3,4 @@ export { TreeNode, TreeNodeUpdateEvents } from './TreeNode'
|
||||
export { Message } from './Message'
|
||||
export { TreeNodeFactory } from './TreeNodeFactory'
|
||||
export { Tree } from './Tree'
|
||||
export { TopicProperties } from './TopicProperties'
|
||||
export { Hashable } from './Hashable'
|
||||
|
||||
@@ -1,34 +1,46 @@
|
||||
import * as socketIO from 'socket.io'
|
||||
const http = require('http')
|
||||
|
||||
import { TopicProperties, Tree, TreeNodeFactory } from './Model'
|
||||
import { addMqttConnectionEvent, backendEvents, makeConnectionStateEvent, makeConnectionMessageEvent, AddMqttConnection } from '../../events'
|
||||
import { MqttSource, DataSource } from './DataSource'
|
||||
|
||||
const options = { url: 'mqtt://nodered' }
|
||||
const dataSource = new MqttSource()
|
||||
class ConnectionManager {
|
||||
private connections: {[s: string]: DataSource<any>} = {}
|
||||
|
||||
const a: any[] = []
|
||||
|
||||
const server = http.createServer()
|
||||
const io = socketIO(server)
|
||||
io.on('connection', (client) => {
|
||||
console.log('connection')
|
||||
a.forEach((b) => {
|
||||
io.emit('message', b)
|
||||
})
|
||||
client.on('disconnect', () => { /* … */ })
|
||||
})
|
||||
server.listen(3000)
|
||||
|
||||
const state = dataSource.connect(options)
|
||||
dataSource.onMessage((topic: string, payload: Buffer) => {
|
||||
let buffer = payload
|
||||
if (a.length < 30) {
|
||||
a.push({ topic, payload: buffer.toString('base64') })
|
||||
}
|
||||
if (buffer.length > 10000) {
|
||||
buffer = buffer.slice(0, 10000)
|
||||
public manageConnections() {
|
||||
backendEvents.subscribe(addMqttConnectionEvent, this.handleConnectionRequest)
|
||||
}
|
||||
|
||||
io.emit('message', { topic, payload: buffer.toString('base64') })
|
||||
})
|
||||
private handleConnectionRequest = (event: AddMqttConnection) => {
|
||||
console.log(event)
|
||||
const connectionId = event.id
|
||||
const options = event.options
|
||||
const connection = new MqttSource()
|
||||
this.connections[connectionId] = connection
|
||||
|
||||
connection.stateMachine.onUpdate.subscribe((state) => {
|
||||
backendEvents.emit(makeConnectionStateEvent(connectionId), state)
|
||||
})
|
||||
|
||||
connection.connect(options)
|
||||
this.handleNewMessagesForConnection(connectionId, connection)
|
||||
}
|
||||
|
||||
private handleNewMessagesForConnection(connectionId: string, connection: MqttSource) {
|
||||
const messageEvent = makeConnectionMessageEvent(connectionId)
|
||||
connection.onMessage((topic: string, payload: Buffer) => {
|
||||
let buffer = payload
|
||||
if (buffer.length > 10000) {
|
||||
buffer = buffer.slice(0, 10000)
|
||||
}
|
||||
backendEvents.emit(messageEvent, { topic, payload: buffer.toString('base64') })
|
||||
})
|
||||
}
|
||||
|
||||
public removeConnection(hash: string) {
|
||||
const connection = this.connections[hash]
|
||||
connection.stateMachine
|
||||
connection.disconnect()
|
||||
delete this.connections[hash]
|
||||
}
|
||||
}
|
||||
|
||||
const connectionManager = new ConnectionManager()
|
||||
connectionManager.manageConnections()
|
||||
|
||||
Reference in New Issue
Block a user