Improve message receive time accuracy
This commit is contained in:
@@ -1,7 +1,12 @@
|
|||||||
import { MqttMessage } from '../../../events'
|
import { MqttMessage } from '../../../events'
|
||||||
|
|
||||||
|
interface BufferedMessage {
|
||||||
|
message: MqttMessage
|
||||||
|
received: Date
|
||||||
|
}
|
||||||
|
|
||||||
export class ChangeBuffer {
|
export class ChangeBuffer {
|
||||||
private buffer: Array<MqttMessage> = []
|
private buffer: Array<BufferedMessage> = []
|
||||||
private size = 0
|
private size = 0
|
||||||
private maxSize = 100_000_000 // ~100MB
|
private maxSize = 100_000_000 // ~100MB
|
||||||
public length = 0
|
public length = 0
|
||||||
@@ -9,7 +14,7 @@ export class ChangeBuffer {
|
|||||||
|
|
||||||
public push(val: MqttMessage) {
|
public push(val: MqttMessage) {
|
||||||
if (!this.isFull()) {
|
if (!this.isFull()) {
|
||||||
this.buffer.push(val)
|
this.buffer.push({ message: val, received: new Date() })
|
||||||
this.size += this.estimatedMessageOverhead + (val.payload ? val.payload.length : 0)
|
this.size += this.estimatedMessageOverhead + (val.payload ? val.payload.length : 0)
|
||||||
this.length += 1
|
this.length += 1
|
||||||
}
|
}
|
||||||
@@ -27,7 +32,7 @@ export class ChangeBuffer {
|
|||||||
return this.size / this.maxSize
|
return this.size / this.maxSize
|
||||||
}
|
}
|
||||||
|
|
||||||
public popAll(): Array<MqttMessage> {
|
public popAll(): Array<BufferedMessage> {
|
||||||
const tmpBuffer = this.buffer
|
const tmpBuffer = this.buffer
|
||||||
this.buffer = []
|
this.buffer = []
|
||||||
this.size = 0
|
this.size = 0
|
||||||
|
|||||||
@@ -48,10 +48,14 @@ export class Tree<ViewModel extends Destroyable> extends TreeNode<ViewModel> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public applyUnmergedChanges() {
|
public applyUnmergedChanges() {
|
||||||
this.unmergedMessages.popAll().forEach(msg => {
|
this.unmergedMessages.popAll().forEach(bufferedItem => {
|
||||||
const edges = msg.topic.split('/')
|
const edges = bufferedItem.message.topic.split('/')
|
||||||
const node = TreeNodeFactory.fromEdgesAndValue<ViewModel>(edges, msg.payload)
|
const node = TreeNodeFactory.fromEdgesAndValue<ViewModel>(
|
||||||
node.mqttMessage = msg
|
edges,
|
||||||
|
bufferedItem.message.payload,
|
||||||
|
bufferedItem.received
|
||||||
|
)
|
||||||
|
node.mqttMessage = bufferedItem.message
|
||||||
|
|
||||||
if (!this.nodeFilter || this.nodeFilter(node)) {
|
if (!this.nodeFilter || this.nodeFilter(node)) {
|
||||||
this.updateWithNode(node.firstNode())
|
this.updateWithNode(node.firstNode())
|
||||||
|
|||||||
@@ -22,13 +22,14 @@ export abstract class TreeNodeFactory {
|
|||||||
|
|
||||||
public static fromEdgesAndValue<ViewModel extends Destroyable>(
|
public static fromEdgesAndValue<ViewModel extends Destroyable>(
|
||||||
edgeNames: Array<string>,
|
edgeNames: Array<string>,
|
||||||
value?: Base64Message | null
|
value?: Base64Message | null,
|
||||||
|
receiveDate: Date = new Date()
|
||||||
): TreeNode<ViewModel> {
|
): TreeNode<ViewModel> {
|
||||||
const node = new TreeNode<ViewModel>()
|
const node = new TreeNode<ViewModel>()
|
||||||
node.setMessage({
|
node.setMessage({
|
||||||
value: value || undefined,
|
value: value || undefined,
|
||||||
length: value ? value.length : 0,
|
length: value ? value.length : 0,
|
||||||
received: new Date(),
|
received: receiveDate,
|
||||||
messageNumber: this.messageCounter,
|
messageNumber: this.messageCounter,
|
||||||
})
|
})
|
||||||
this.messageCounter += 1
|
this.messageCounter += 1
|
||||||
|
|||||||
Reference in New Issue
Block a user