import WS from "ws";
import dayjs, { Dayjs } from "dayjs";
import { v4 as uuid } from "uuid";
import { EventEmitter } from "events";
import { WebSocketInterface } from "../megalodon";
import proxyAgent, { ProxyConfig } from "../proxy_config";
import MisskeyAPI from "./api_client";

/**
 * WebSocket
 * Misskey does not support HTTP streaming; it provides a WebSocket API instead,
 * so this class connects to a Misskey server over WebSocket.
 *
 * Events emitted by this class (see `_bindSocket` and `_setupParser`):
 * `connect`, `close`, `pong`, `error`, `update`, `notification`,
 * `conversation` and `parser-error`.
 */
export default class WebSocket
	extends EventEmitter
	implements WebSocketInterface
{
	public url: string;
	public channel:
		| "user"
		| "localTimeline"
		| "hybridTimeline"
		| "globalTimeline"
		| "conversation"
		| "list";
	public parser: any;
	public headers: { [key: string]: string };
	public proxyConfig: ProxyConfig | false = false;
	public listId: string | null = null;
	private _converter: MisskeyAPI.Converter;
	private _accessToken: string;
	private _reconnectInterval: number;
	private _reconnectMaxAttempts: number;
	private _reconnectCurrentAttempts: number;
	private _connectionClosed: boolean;
	private _client: WS | null = null;
	private _channelID: string;
	private _pongReceivedTimestamp: Dayjs;
	private _heartbeatInterval = 60000;
	private _pongWaiting = false;
	// Every timer scheduled by this instance that has not fired yet, so that
	// stop() can cancel pending pings and reconnect attempts.
	private _timers = new Set<ReturnType<typeof setTimeout>>();

	/**
	 * @param url Full url of websocket: e.g. wss://misskey.io/streaming
	 * @param channel Channel name is user, localTimeline, hybridTimeline, globalTimeline, conversation or list.
	 * @param accessToken The access token.
	 * @param listId This parameter is required when you specify list as channel.
	 * @param userAgent Value sent as the `User-Agent` header of the handshake.
	 * @param proxyConfig Proxy settings, or `false` to connect directly.
	 * @param converter Converter applied to received notes and notifications before they are emitted.
	 */
	constructor(
		url: string,
		channel:
			| "user"
			| "localTimeline"
			| "hybridTimeline"
			| "globalTimeline"
			| "conversation"
			| "list",
		accessToken: string,
		listId: string | undefined,
		userAgent: string,
		proxyConfig: ProxyConfig | false = false,
		converter: MisskeyAPI.Converter,
	) {
		super();
		this.url = url;
		this.parser = new Parser();
		this.channel = channel;
		this.headers = {
			"User-Agent": userAgent,
		};
		this.listId = listId === undefined ? null : listId;
		this.proxyConfig = proxyConfig;
		this._accessToken = accessToken;
		this._reconnectInterval = 10000;
		this._reconnectMaxAttempts = Infinity;
		this._reconnectCurrentAttempts = 0;
		this._connectionClosed = false;
		this._channelID = uuid();
		this._pongReceivedTimestamp = dayjs();
		this._converter = converter;
	}

	/**
	 * Start websocket connection.
	 */
	public start() {
		this._connectionClosed = false;
		this._resetRetryParams();
		this._startWebSocketConnection();
	}

	/**
	 * Extract the host (`hostname[:port]`) from a URL.
	 *
	 * The streaming URL handed to this class uses the `wss://` scheme and has a
	 * path, so stripping only a literal `https://` prefix would leave the whole
	 * URL in place. Any scheme and any trailing path/query/fragment are removed.
	 *
	 * @param baseUrl URL or bare host.
	 * @returns The host part, e.g. `misskey.io` for `wss://misskey.io/streaming`.
	 */
	private baseUrlToHost(baseUrl: string): string {
		try {
			const host = new URL(baseUrl).host;
			// A bare "host:port" parses as scheme "host:" with an empty host;
			// fall through to the textual fallback in that case.
			if (host) {
				return host;
			}
		} catch (_err) {
			// Not an absolute URL (e.g. a bare host name); handled below.
		}
		return baseUrl
			.replace(/^[a-z][a-z0-9+.-]*:\/\//i, "")
			.replace(/[/?#].*$/, "");
	}

	/**
	 * Reset connection and start new websocket connection.
	 */
	private _startWebSocketConnection() {
		this._resetConnection();
		this._setupParser();
		this._client = this._connect();
		this._bindSocket(this._client);
	}

	/**
	 * Stop current connection.
	 * Pending heartbeat and reconnect timers are cancelled, so no new
	 * connection is opened after this call.
	 */
	public stop() {
		this._connectionClosed = true;
		this._pongWaiting = false;
		this._resetConnection();
		this._resetRetryParams();
	}

	/**
	 * Clean up the current connection, its listeners and pending timers.
	 */
	private _resetConnection() {
		this._clearTimers();
		if (this._client) {
			const client = this._client;
			client.close(1000);
			client.removeAllListeners();
			// The socket is discarded, but it may still report an error
			// asynchronously (e.g. when closed before the handshake finished).
			// Without a listener Node would throw that as an uncaught exception.
			client.on("error", () => {
				/* ignore errors of a discarded socket */
			});
			this._client = null;
		}

		if (this.parser) {
			this.parser.removeAllListeners();
		}
	}

	/**
	 * Resets the parameters used in reconnect.
	 */
	private _resetRetryParams() {
		this._reconnectCurrentAttempts = 0;
	}

	/**
	 * Schedule a callback and remember the timer so it can be cancelled later.
	 * @param callback Function to run after the delay.
	 * @param delay Delay in milliseconds.
	 */
	private _setTimer(callback: () => void, delay: number): void {
		const timer = setTimeout(() => {
			this._timers.delete(timer);
			callback();
		}, delay);
		this._timers.add(timer);
	}

	/**
	 * Cancel every timer scheduled through `_setTimer` that has not fired yet.
	 */
	private _clearTimers(): void {
		this._timers.forEach((timer) => clearTimeout(timer));
		this._timers.clear();
	}

	/**
	 * Connect to the endpoint.
	 * The access token is passed as the `i` query parameter.
	 * @returns The newly created websocket client.
	 */
	private _connect(): WS {
		let options: WS.ClientOptions = {
			headers: this.headers,
		};
		if (this.proxyConfig) {
			options = Object.assign(options, {
				agent: proxyAgent(this.proxyConfig),
			});
		}
		const cli: WS = new WS(`${this.url}?i=${this._accessToken}`, options);
		return cli;
	}

	/**
	 * Send one `connect` request for a Misskey channel.
	 * @param client Socket to send the request on.
	 * @param channel Misskey channel name.
	 * @param params Optional channel parameters; omitted from the payload when undefined.
	 */
	private _sendConnect(
		client: WS,
		channel: string,
		params?: { [key: string]: unknown },
	): void {
		const body: {
			channel: string;
			id: string;
			params?: { [key: string]: unknown };
		} = {
			channel,
			id: this._channelID,
		};
		if (params !== undefined) {
			body.params = params;
		}
		client.send(JSON.stringify({ type: "connect", body }));
	}

	/**
	 * Connect the channels that correspond to `this.channel`.
	 * Does nothing when there is no client.
	 */
	private _channel() {
		const client = this._client;
		if (!client) {
			return;
		}
		switch (this.channel) {
			case "conversation":
				this._sendConnect(client, "main");
				break;
			case "user":
				// Both channels are requested with the same connection id.
				this._sendConnect(client, "main");
				this._sendConnect(client, "homeTimeline");
				break;
			case "list":
				this._sendConnect(client, "userList", { listId: this.listId });
				break;
			default:
				this._sendConnect(client, this.channel);
				break;
		}
	}

	/**
	 * Reconnects to the same endpoint after `_reconnectInterval` milliseconds.
	 * Nothing happens when the connection has been stopped by the user.
	 */
	private _reconnect() {
		if (this._connectionClosed) {
			return;
		}
		this._setTimer(() => {
			// stop() may have been called while this timer was pending.
			if (this._connectionClosed) {
				return;
			}
			// Skip reconnect when client is connecting.
			// https://github.com/websockets/ws/blob/7.2.1/lib/websocket.js#L365
			if (this._client && this._client.readyState === WS.CONNECTING) {
				return;
			}

			if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) {
				this._reconnectCurrentAttempts++;
				this._clearBinding();
				if (this._client) {
					// In reconnect, we want to close the connection immediately,
					// because reconnect is necessary when some problems occur.
					this._client.terminate();
				}
				// Call connect methods
				console.log("Reconnecting");
				this._client = this._connect();
				this._bindSocket(this._client);
			}
		}, this._reconnectInterval);
	}

	/**
	 * Clear the event bindings of the current websocket client.
	 * A no-op `error` listener is left in place so that a late error from the
	 * detached socket cannot surface as an uncaught exception.
	 */
	private _clearBinding() {
		if (this._client) {
			this._client.removeAllListeners("close");
			this._client.removeAllListeners("pong");
			this._client.removeAllListeners("open");
			this._client.removeAllListeners("message");
			this._client.removeAllListeners("error");
			this._client.on("error", () => {
				/* ignore errors of a detached socket */
			});
		}
	}

	/**
	 * Bind events for the websocket client.
	 * @param client A WebSocket instance.
	 */
	private _bindSocket(client: WS) {
		client.on("close", (code: number, _reason: Buffer) => {
			if (code === 1000) {
				this.emit("close", {});
			} else {
				console.log(`Closed connection with ${code}`);
				if (!this._connectionClosed) {
					this._reconnect();
				}
			}
		});
		client.on("pong", () => {
			this._pongWaiting = false;
			this.emit("pong", {});
			this._pongReceivedTimestamp = dayjs();
			// The timestamp is read when the timer fires, not when it is scheduled,
			// so a newer pong makes the older timer skip its ping in _checkAlive.
			this._setTimer(
				() => this._checkAlive(this._pongReceivedTimestamp),
				this._heartbeatInterval,
			);
		});
		client.on("open", () => {
			this.emit("connect", {});
			this._channel();
			// Send the first ping; later pings are driven by the pong handler.
			this._setTimer(() => {
				// The socket may have been closed or replaced in the meantime.
				if (client.readyState === WS.OPEN) {
					client.ping("");
				}
			}, 10000);
		});
		client.on("message", (data: WS.Data, isBinary: boolean) => {
			this.parser.parse(data, isBinary, this._channelID);
		});
		client.on("error", (err: Error) => {
			this.emit("error", err);
		});
	}

	/**
	 * Set up the parser listeners that convert received entities and re-emit
	 * them from this instance.
	 */
	private _setupParser() {
		this.parser.on("update", (note: MisskeyAPI.Entity.Note) => {
			this.emit(
				"update",
				this._converter.note(note, this.baseUrlToHost(this.url)),
			);
		});
		this.parser.on(
			"notification",
			(notification: MisskeyAPI.Entity.Notification) => {
				this.emit(
					"notification",
					this._converter.notification(
						notification,
						this.baseUrlToHost(this.url),
					),
				);
			},
		);
		this.parser.on("conversation", (note: MisskeyAPI.Entity.Note) => {
			this.emit(
				"conversation",
				this._converter.noteToConversation(note, this.baseUrlToHost(this.url)),
			);
		});
		this.parser.on("error", (err: Error) => {
			this.emit("parser-error", err);
		});
	}

	/**
	 * Send a ping and wait for the pong.
	 * If no pong arrives within 10 seconds, a reconnect is scheduled.
	 * @param timestamp Time at which the latest pong was received.
	 */
	private _checkAlive(timestamp: Dayjs) {
		const now: Dayjs = dayjs();
		// Block multiple calls when several pong events occurred:
		// if the elapsed time is shorter than the interval, skip the ping.
		if (
			now.diff(timestamp) > this._heartbeatInterval - 1000 &&
			!this._connectionClosed
		) {
			// Skip ping when client is connecting.
			// https://github.com/websockets/ws/blob/7.2.1/lib/websocket.js#L289
			if (this._client && this._client.readyState !== WS.CONNECTING) {
				this._pongWaiting = true;
				this._client.ping("");
				this._setTimer(() => {
					if (this._pongWaiting) {
						this._pongWaiting = false;
						this._reconnect();
					}
				}, 10000);
			}
		}
	}
}

/**
 * Parser
 * This class parses websocket messages and emits `update`, `notification`,
 * `conversation`, `heartbeat` and `error` events.
 */
export class Parser extends EventEmitter {
	/**
	 * Parse one websocket message and emit the matching event.
	 *
	 * Binary frames and empty messages are reported as `heartbeat`. Messages
	 * whose type is not `channel`, that have no body, or that belong to another
	 * channel id are ignored silently.
	 *
	 * @param data Message body of websocket.
	 * @param isBinary Whether the message was received as a binary frame.
	 * @param channelID Parse only messages which have the same channelID.
	 */
	public parse(data: WS.Data, isBinary: boolean, channelID: string) {
		const message = isBinary ? data : data.toString();
		if (typeof message !== "string") {
			this.emit("heartbeat", {});
			return;
		}

		if (message === "") {
			this.emit("heartbeat", {});
			return;
		}

		let obj: {
			type: string;
			body: {
				id: string;
				type: string;
				body: any;
			};
		};
		let body: {
			id: string;
			type: string;
			body: any;
		};

		try {
			obj = JSON.parse(message);
			if (obj.type !== "channel") {
				return;
			}
			if (!obj.body) {
				return;
			}
			body = obj.body;
			if (body.id !== channelID) {
				return;
			}
		} catch (err) {
			// Covers invalid JSON as well as payloads that are not objects
			// (e.g. the literal `null`).
			this.emit(
				"error",
				new Error(
					`Error parsing websocket reply: ${message}, error message: ${err}`,
				),
			);
			return;
		}

		switch (body.type) {
			case "note":
				this.emit("update", body.body as MisskeyAPI.Entity.Note);
				break;
			case "notification":
				this.emit("notification", body.body as MisskeyAPI.Entity.Notification);
				break;
			case "mention": {
				// Only mentions with "specified" visibility are treated as conversations.
				const note = body.body as MisskeyAPI.Entity.Note;
				if (note.visibility === "specified") {
					this.emit("conversation", note);
				}
				break;
			}
			// For renote and followed events, the same notification will be received.
			case "renote":
			case "followed":
			case "follow":
			case "unfollow":
			case "receiveFollowRequest":
			case "meUpdated":
			case "readAllNotifications":
			case "readAllUnreadSpecifiedNotes":
			case "readAllAntennas":
			case "readAllUnreadMentions":
			case "unreadNotification":
				// Ignore these events
				break;
			default:
				this.emit(
					"error",
					new Error(`Unknown event has received: ${JSON.stringify(body)}`),
				);
				break;
		}
	}
}