import { Event } from '../Events';
import { EventBusInterface } from './EventBusInterface';
import { v4 } from 'uuid';

/**
 * Identifies an RPC endpoint. The request and response channels are derived
 * from `topic` by appending `/request` and `/response` respectively.
 */
export type RpcEvent = {
  topic: string;
};

/**
 * Request/response helper layered on top of a publish/subscribe event bus.
 *
 * A caller emits `{ id, payload }` on `<topic>/request` and waits for a
 * message carrying the same `id` on `<topic>/response`. A responder registered
 * through {@link Rpc.on} runs its handler and emits `{ id, payload, error }`
 * back on `<topic>/response`.
 */
export class Rpc {
  constructor(private participant: EventBusInterface) {}

  /**
   * Sends `request` to the endpoint described by `event` and resolves with the
   * payload of the matching response.
   *
   * @param event   Endpoint descriptor; only `topic` is read.
   * @param request Payload forwarded to the responder.
   * @param timeout Maximum wait in milliseconds. Values `<= 0` (the default)
   *                disable the timeout, so the call waits indefinitely.
   * @returns The `payload` field of the response whose `id` matches this call.
   * @throws Rejects with the response's `error` when it is truthy, or with an
   *         `Error` when no matching response arrives within `timeout` ms.
   */
  async call<RpcRequest, RpcResponse>(
    event: RpcEvent,
    request: RpcRequest,
    timeout: number = 0,
  ): Promise<RpcResponse> {
    return new Promise<RpcResponse>((resolve, reject) => {
      const id = v4();
      const responseEvent: Event = { topic: `${event.topic}/response` };
      const requestEvent: Event = { topic: `${event.topic}/request` };
      let timer: ReturnType<typeof setTimeout> | undefined;

      const callback = (result: { id: string; payload: RpcResponse; error: unknown }) => {
        // Responses for every in-flight call on this topic arrive here; only
        // the one carrying our id may settle the promise and tear down the
        // subscription. Anything else is left for its own caller.
        if (result.id === id) {
          cleanup();
          if (result.error) {
            reject(result.error);
          } else {
            resolve(result.payload);
          }
        }
        console.log("received", result);
      };

      // Releases the pending timer (if any) and the response subscription.
      const cleanup = () => {
        if (timer !== undefined) {
          clearTimeout(timer);
          timer = undefined;
        }
        this.participant.unsubscribe(responseEvent as any, callback);
      };

      this.participant.subscribe(responseEvent, callback);

      // Arm the timer before emitting so that a response delivered
      // synchronously from emit() can still cancel it.
      if (timeout > 0) {
        timer = setTimeout(() => {
          timer = undefined;
          cleanup();
          reject(new Error(`Did not respond to ${event.topic} within ${timeout}ms`));
        }, timeout);
      }

      try {
        this.participant.emit(requestEvent, { id, payload: request });
      } catch (e) {
        // Do not leave the subscription or timer behind if the request could
        // not be sent at all.
        cleanup();
        reject(e);
      }
    });
  }

  /**
   * Registers `handler` as the responder for the endpoint described by `event`.
   *
   * For each message received on `<topic>/request`, the handler is awaited
   * with the message's `payload`; its result, or the value it threw, is then
   * emitted on `<topic>/response` together with the request's `id`.
   *
   * @param event   Endpoint descriptor; only `topic` is read.
   * @param handler Async function producing the response payload.
   */
  async on<RpcRequest, RpcResponse>(
    event: RpcEvent,
    handler: (request: RpcRequest) => Promise<RpcResponse>,
  ) {
    this.participant.subscribe({ topic: `${event.topic}/request` } as RpcEvent, async (request) => {
      let payload: RpcResponse | undefined;
      let error: unknown;
      try {
        payload = await handler((request as any).payload);
      } catch (e) {
        // Handler failures are forwarded to the caller instead of being thrown here.
        error = e;
      }
      console.log("Responding with", payload, error);
      this.participant.emit({ topic: `${event.topic}/response` }, { id: (request as any).id, payload, error });
    });
  }
}