import { ChangeBuffer } from './ChangeBuffer'
import { Destroyable } from './Destroyable'
import { EventBusInterface, EventDispatcher, makeConnectionMessageEvent, MqttMessage } from '../../../events'
import { TreeNode } from './'
import { TreeNodeFactory } from './TreeNodeFactory'

/**
 * A `TreeNode` that is fed by an event bus.
 *
 * Incoming MQTT messages are buffered in a `ChangeBuffer` and merged into the
 * tree in batches by a periodic update loop. `didUpdate` is dispatched after
 * every batch.
 */
export class Tree extends TreeNode {
  /** Id passed to the most recent `updateWithConnection` call. */
  public connectionId?: string
  /** Event bus the tree is currently subscribed to, if any. */
  public updateSource?: EventBusInterface
  /** Optional predicate; nodes for which it returns `false` are not merged. */
  public nodeFilter?: (node: TreeNode) => boolean
  /** Event descriptor created by `makeConnectionMessageEvent` for the current connection. */
  private subscriptionEvent?: any
  public isTree = true
  /** Random string generated once per instance and returned by `hash()`. */
  private cachedHash = `${Math.random()}`
  /** Messages received but not yet merged into the tree. */
  private unmergedMessages: ChangeBuffer = new ChangeBuffer()
  /** Dispatched after each batch of buffered messages has been applied. */
  public didUpdate = new EventDispatcher()
  /** Handle returned by `setInterval` for the update loop. */
  public updateInterval: any
  private paused: boolean = false
  /** Guards against scheduling a new merge while a previous one is still pending. */
  private applyChangesHasCompleted = true

  constructor() {
    super(undefined, undefined)
  }

  /** Bus listener: only buffers the message; merging happens in the update loop. */
  private handleNewData = (msg: MqttMessage) => {
    this.unmergedMessages.push(msg)
  }

  /**
   * Starts the periodic update loop (every 300 ms).
   *
   * Each tick schedules one merge unless the tree is paused or a previously
   * scheduled merge has not finished yet. When `requestIdleCallback` is
   * available the merge is deferred to idle time (500 ms timeout), otherwise
   * it runs synchronously inside the tick.
   */
  private runUpdates() {
    // Never keep two loops alive for the same tree.
    if (this.updateInterval) {
      clearInterval(this.updateInterval)
    }

    this.updateInterval = setInterval(() => {
      if (this.paused || !this.applyChangesHasCompleted) {
        return
      }
      this.applyChangesHasCompleted = false

      // `typeof` guard: referencing an undeclared `window` would throw a ReferenceError.
      const canUseIdleCallback =
        typeof window !== 'undefined' && typeof (window as any).requestIdleCallback === 'function'
      if (canUseIdleCallback) {
        ;(window as any).requestIdleCallback(() => this.applyUnmergedChanges(), { timeout: 500 })
      } else {
        this.applyUnmergedChanges()
      }
    }, 300)
  }

  /**
   * Tears the tree down: stops the update loop, unsubscribes from the event
   * bus and removes all `didUpdate` listeners.
   */
  public destroy() {
    super.destroy()
    if (this.updateInterval) {
      clearInterval(this.updateInterval)
      this.updateInterval = undefined
    }
    if (this.updateSource) {
      this.updateSource.unsubscribe(this.subscriptionEvent, this.handleNewData)
    }
    this.updateSource = undefined
    this.didUpdate.removeAllListeners()
  }

  /**
   * Subscribes the tree to the messages of a connection and starts the update loop.
   *
   * Calling this again replaces the previous subscription and update loop
   * instead of stacking a second one on top.
   *
   * @param emitter Event bus to subscribe to.
   * @param connectionId Connection whose message event is subscribed.
   * @param nodeFilter Optional predicate deciding whether a node is merged.
   */
  public updateWithConnection(
    emitter: EventBusInterface,
    connectionId: string,
    nodeFilter?: (node: TreeNode) => boolean
  ) {
    // Drop any earlier subscription first; otherwise every message would be buffered twice.
    this.stopUpdating()

    this.updateSource = emitter
    this.connectionId = connectionId
    this.nodeFilter = nodeFilter
    this.subscriptionEvent = makeConnectionMessageEvent(connectionId)
    this.updateSource.subscribe(this.subscriptionEvent, this.handleNewData)
    this.runUpdates()
  }

  /** Returns the per-instance random string generated at construction. */
  public hash() {
    return this.cachedHash
  }

  /** Suspends scheduled merges. Incoming messages are still buffered. */
  public pause() {
    this.paused = true
  }

  /** Re-enables scheduled merges after `pause()`. */
  public resume() {
    this.paused = false
  }

  /**
   * Drains the message buffer and merges every message into the tree, then
   * dispatches `didUpdate`.
   *
   * The topic is split on `/` to build the node path. Nodes rejected by
   * `nodeFilter` are skipped. If merging or a listener throws, the error
   * propagates to the caller, but the completion flag is still reset so the
   * update loop keeps running.
   */
  public applyUnmergedChanges() {
    try {
      this.unmergedMessages.popAll().forEach(bufferedItem => {
        const edges = bufferedItem.message.topic.split('/')
        const node = TreeNodeFactory.fromEdgesAndValue(edges, bufferedItem.message.payload, bufferedItem.received)
        node.mqttMessage = bufferedItem.message

        if (!this.nodeFilter || this.nodeFilter(node)) {
          this.updateWithNode(node.firstNode())
        }
      })

      this.didUpdate.dispatch()
    } finally {
      // Must be reset on failure too, or the interval loop would stall forever.
      this.applyChangesHasCompleted = true
    }
  }

  /** Returns the buffer of messages that have not been merged yet. */
  public unmergedChanges(): ChangeBuffer {
    return this.unmergedMessages
  }

  /**
   * Unsubscribes from the event bus so no further messages are buffered.
   * The interval timer is left running; already buffered messages are still applied.
   */
  public stopUpdating() {
    if (this.subscriptionEvent && this.updateSource) {
      this.updateSource.unsubscribe(this.subscriptionEvent, this.handleNewData)
    }
  }
}