import { setTimeout } from 'node:timers/promises';
import { Inject, Module, OnApplicationShutdown } from '@nestjs/common';
import Bull from 'bull';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
import type { Provider } from '@nestjs/common';
import type { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData, DbJobMap } from '../queue/types.js';

/**
 * Creates a Bull queue connected to the job-queue Redis described by `config.redisForJobQueue`.
 *
 * @param config Application configuration; only `redisForJobQueue` is read here.
 * @param name Queue name passed to Bull.
 * @param limitPerSec Maximum number of jobs per 1000 ms window. Any value `<= 0`
 *   (the default is `-1`) leaves the queue without a rate limiter.
 * @returns The newly constructed Bull queue, with `apBackoff` registered as a custom backoff strategy.
 */
function q(config: Config, name: string, limitPerSec = -1) {
	const redisConfig = config.redisForJobQueue;

	return new Bull(name, {
		redis: {
			port: redisConfig.port,
			host: redisConfig.host,
			family: redisConfig.family ?? 0,
			password: redisConfig.pass,
			db: redisConfig.db ?? 0,
		},
		// Keys are stored under "<prefix>:queue" when a prefix is configured, otherwise under "queue".
		prefix: redisConfig.prefix ? `${redisConfig.prefix}:queue` : 'queue',
		limiter: limitPerSec > 0 ? {
			max: limitPerSec,
			duration: 1000,
		} : undefined,
		settings: {
			backoffStrategies: {
				apBackoff,
			},
		},
	});
}

// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
/**
 * Custom backoff strategy: exponential delay with a cap and added jitter.
 *
 * The delay is `(2^attemptsMade - 1)` minutes, capped at 8 hours, then increased
 * by a random amount of up to 20% of the capped value. Because the jitter is
 * applied after the cap, the result may exceed 8 hours by up to 20%.
 *
 * @param attemptsMade Number of attempts made so far.
 * @param err Error of the failed attempt; not used by this strategy.
 * @returns Delay in milliseconds before the next attempt.
 */
function apBackoff(attemptsMade: number, err: Error) {
	const baseDelay = 60 * 1000;	// 1min
	const maxBackoff = 8 * 60 * 60 * 1000;	// 8hours
	let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay;
	backoff = Math.min(backoff, maxBackoff);
	backoff += Math.round(backoff * Math.random() * 0.2);
	return backoff;
}

// NOTE(review): the type arguments below were missing from this file and have been
// restored from the job-data types imported from '../queue/types.js'. Confirm each
// one against that module (in particular whether DbJobData takes a type parameter
// derived from DbJobMap, which is imported here but otherwise unused).
export type SystemQueue = Bull.Queue<Record<string, unknown>>;
export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>;
export type DeliverQueue = Bull.Queue<DeliverJobData>;
export type InboxQueue = Bull.Queue<InboxJobData>;
export type DbQueue = Bull.Queue<DbJobData>;
export type RelationshipQueue = Bull.Queue<RelationshipJobData>;
export type ObjectStorageQueue = Bull.Queue<ObjectStorageJobData>;
export type WebhookDeliverQueue = Bull.Queue<WebhookDeliverJobData>;

// One provider per queue. Each is registered under the token 'queue:<name>' and
// built from the injected application config.

const $system: Provider = {
	provide: 'queue:system',
	useFactory: (config: Config) => q(config, 'system'),
	inject: [DI.config],
};

const $endedPollNotification: Provider = {
	provide: 'queue:endedPollNotification',
	useFactory: (config: Config) => q(config, 'endedPollNotification'),
	inject: [DI.config],
};

const $deliver: Provider = {
	provide: 'queue:deliver',
	// Rate limited; defaults to 128 jobs/sec when not configured.
	useFactory: (config: Config) => q(config, 'deliver', config.deliverJobPerSec ?? 128),
	inject: [DI.config],
};

const $inbox: Provider = {
	provide: 'queue:inbox',
	// Rate limited; defaults to 16 jobs/sec when not configured.
	useFactory: (config: Config) => q(config, 'inbox', config.inboxJobPerSec ?? 16),
	inject: [DI.config],
};

const $db: Provider = {
	provide: 'queue:db',
	useFactory: (config: Config) => q(config, 'db'),
	inject: [DI.config],
};

const $relationship: Provider = {
	provide: 'queue:relationship',
	// Rate limited; defaults to 64 jobs/sec when not configured.
	// The config key is spelled `relashionshipJobPerSec` in Config; it must match that declaration.
	useFactory: (config: Config) => q(config, 'relationship', config.relashionshipJobPerSec ?? 64),
	inject: [DI.config],
};

const $objectStorage: Provider = {
	provide: 'queue:objectStorage',
	useFactory: (config: Config) => q(config, 'objectStorage'),
	inject: [DI.config],
};

const $webhookDeliver: Provider = {
	provide: 'queue:webhookDeliver',
	// Fixed limit of 64 jobs/sec (not read from config).
	useFactory: (config: Config) => q(config, 'webhookDeliver', 64),
	inject: [DI.config],
};

/**
 * Provides and exports all job queues, and closes them when the application shuts down.
 */
@Module({
	imports: [
	],
	providers: [
		$system,
		$endedPollNotification,
		$deliver,
		$inbox,
		$db,
		$relationship,
		$objectStorage,
		$webhookDeliver,
	],
	exports: [
		$system,
		$endedPollNotification,
		$deliver,
		$inbox,
		$db,
		$relationship,
		$objectStorage,
		$webhookDeliver,
	],
})
export class QueueModule implements OnApplicationShutdown {
	constructor(
		@Inject('queue:system') public systemQueue: SystemQueue,
		@Inject('queue:endedPollNotification') public endedPollNotificationQueue: EndedPollNotificationQueue,
		@Inject('queue:deliver') public deliverQueue: DeliverQueue,
		@Inject('queue:inbox') public inboxQueue: InboxQueue,
		@Inject('queue:db') public dbQueue: DbQueue,
		@Inject('queue:relationship') public relationshipQueue: RelationshipQueue,
		@Inject('queue:objectStorage') public objectStorageQueue: ObjectStorageQueue,
		@Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue,
	) {}

	/**
	 * Closes every queue in parallel. The returned promise rejects if any
	 * `close()` call rejects.
	 *
	 * @param signal Shutdown signal passed by the caller; not used here.
	 */
	async onApplicationShutdown(signal: string): Promise<void> {
		if (process.env.NODE_ENV === 'test') {
			// XXX:
			// Shutting down the existing connections causes errors on Jest as
			// Misskey has asynchronous postgres/redis connections that are not
			// awaited.
			// Wait a fixed 5 seconds to give them time to finish.
			await setTimeout(5000);
		}

		await Promise.all([
			this.systemQueue.close(),
			this.endedPollNotificationQueue.close(),
			this.deliverQueue.close(),
			this.inboxQueue.close(),
			this.dbQueue.close(),
			this.relationshipQueue.close(),
			this.objectStorageQueue.close(),
			this.webhookDeliverQueue.close(),
		]);
	}
}