Subscribe to configures topics

This commit is contained in:
Thomas Nordquist
2019-02-17 10:15:04 +01:00
parent 3cb89fe502
commit 9c863c8339
2 changed files with 9 additions and 11 deletions

View File

@@ -29,6 +29,7 @@ export function toMqttConnection(options: ConnectionOptions): MqttOptions | unde
password: options.password, password: options.password,
tls: options.encryption, tls: options.encryption,
certValidation: options.certValidation, certValidation: options.certValidation,
subscriptions: options.subscriptions,
} }
} }
@@ -46,7 +47,7 @@ export function createEmptyConnection(): ConnectionOptions {
encryption: false, encryption: false,
password: undefined, password: undefined,
username: undefined, username: undefined,
subscriptions: ['#', '$SYS'], subscriptions: ['#', '$SYS/#'],
type: 'mqtt', type: 'mqtt',
host: '', host: '',
port: 1883, port: 1883,

View File

@@ -11,13 +11,13 @@ export interface MqttOptions {
tls: boolean tls: boolean
certValidation: boolean certValidation: boolean
clientId?: string clientId?: string
subscriptions: string[]
} }
export class MqttSource implements DataSource<MqttOptions> { export class MqttSource implements DataSource<MqttOptions> {
public stateMachine: DataSourceStateMachine = new DataSourceStateMachine() public stateMachine: DataSourceStateMachine = new DataSourceStateMachine()
private client: Client | undefined private client: Client | undefined
private messageCallback?: (topic: string, message: Buffer, packet: any) => void private messageCallback?: (topic: string, message: Buffer, packet: any) => void
private rootSubscription = '#'
public topicSeparator = '/' public topicSeparator = '/'
public onMessage(messageCallback: (topic: string, message: Buffer, packet: any) => void) { public onMessage(messageCallback: (topic: string, message: Buffer, packet: any) => void) {
@@ -61,15 +61,12 @@ export class MqttSource implements DataSource<MqttOptions> {
client.on('connect', () => { client.on('connect', () => {
this.stateMachine.setConnected(true) this.stateMachine.setConnected(true)
client.subscribe(this.rootSubscription, (err: Error) => { options.subscriptions.forEach((subscription) => {
client.subscribe(subscription, (err: Error) => {
if (err) { if (err) {
this.stateMachine.setError(err) this.stateMachine.setError(err)
} }
}) })
client.subscribe('$SYS/#', (err: Error) => {
if (err) {
console.error('failed to subscribe to sys topic', err)
}
}) })
}) })