Sharkey/packages/backend/src/queue/processors/DeliverProcessorService.ts

136 lines
4.4 KiB
TypeScript
Raw Normal View History

/*
* SPDX-FileCopyrightText: syuilo and other misskey contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/
2022-09-17 21:27:08 +03:00
import { Inject, Injectable } from '@nestjs/common';
import * as Bull from 'bullmq';
2022-09-17 21:27:08 +03:00
import { DI } from '@/di-symbols.js';
import type { InstancesRepository } from '@/models/index.js';
2022-09-17 21:27:08 +03:00
import type Logger from '@/logger.js';
import { MetaService } from '@/core/MetaService.js';
2022-12-04 03:16:03 +02:00
import { ApRequestService } from '@/core/activitypub/ApRequestService.js';
2022-09-17 21:27:08 +03:00
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
import { MemorySingleCache } from '@/misc/cache.js';
import type { MiInstance } from '@/models/entities/Instance.js';
2022-09-17 21:27:08 +03:00
import InstanceChart from '@/core/chart/charts/instance.js';
import ApRequestChart from '@/core/chart/charts/ap-request.js';
import FederationChart from '@/core/chart/charts/federation.js';
import { StatusError } from '@/misc/status-error.js';
import { UtilityService } from '@/core/UtilityService.js';
2023-01-03 01:48:00 +02:00
import { bindThis } from '@/decorators.js';
2022-09-17 21:27:08 +03:00
import { QueueLoggerService } from '../QueueLoggerService.js';
import type { DeliverJobData } from '../types.js';
/**
 * Queue processor for "deliver" jobs: sends the job's content to the remote
 * inbox given by `job.data.to` via `ApRequestService.signedPost`, then records
 * the outcome on the federated-instance record and in the charts.
 *
 * Bookkeeping (instance state, charts, metadata refresh) is started in the
 * background and never awaited by the job. Failures in that bookkeeping are
 * logged rather than left as unhandled promise rejections.
 */
@Injectable()
export class DeliverProcessorService {
	private logger: Logger;
	private suspendedHostsCache: MemorySingleCache<MiInstance[]>;
	// NOTE(review): not read or assigned anywhere in this class — candidate for removal.
	private latest: string | null;

	constructor(
		@Inject(DI.instancesRepository)
		private instancesRepository: InstancesRepository,

		private metaService: MetaService,
		private utilityService: UtilityService,
		private federatedInstanceService: FederatedInstanceService,
		private fetchInstanceMetadataService: FetchInstanceMetadataService,
		private apRequestService: ApRequestService,
		private instanceChart: InstanceChart,
		private apRequestChart: ApRequestChart,
		private federationChart: FederationChart,
		private queueLoggerService: QueueLoggerService,
	) {
		this.logger = this.queueLoggerService.logger.createSubLogger('deliver');
		// Presumably a one-hour lifetime expressed in milliseconds — confirm against MemorySingleCache.
		this.suspendedHostsCache = new MemorySingleCache<MiInstance[]>(1000 * 60 * 60);
	}

	/**
	 * Processes a single deliver job.
	 *
	 * @param job Job whose data carries the sender (`user`), the target inbox URL (`to`),
	 *   the payload (`content`) and the `isSharedInbox` flag.
	 * @returns `'skip (blocked)'` or `'skip (suspended)'` when the target host must not be
	 *   contacted, otherwise `'Success'` once the signed POST has completed.
	 * @throws Bull.UnrecoverableError when the request failed with a `StatusError` flagged as a
	 *   client error (BullMQ does not retry such jobs).
	 * @throws Error when the request failed with any other `StatusError`.
	 * @throws The original error, unchanged, for failures that are not a `StatusError`.
	 */
	@bindThis
	public async process(job: Bull.Job<DeliverJobData>): Promise<string> {
		const { host } = new URL(job.data.to);

		// Abort if the destination host is blocked.
		const meta = await this.metaService.fetch();
		const punyHost = this.utilityService.toPuny(host);
		if (this.utilityService.isBlockedHost(meta.blockedHosts, punyHost)) {
			return 'skip (blocked)';
		}

		// Abort if the destination instance is suspended.
		let suspendedHosts = this.suspendedHostsCache.get();
		if (suspendedHosts == null) {
			suspendedHosts = await this.instancesRepository.find({
				where: {
					isSuspended: true,
				},
			});
			this.suspendedHostsCache.set(suspendedHosts);
		}
		if (suspendedHosts.some(instance => instance.host === punyHost)) {
			return 'skip (suspended)';
		}

		try {
			await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content);

			// Update stats (background; the job does not wait for it).
			void this.recordDeliveryResult(host, true, meta.enableChartsForFederatedInstances);

			return 'Success';
		} catch (res) {
			// Update stats (background; the job does not wait for it).
			void this.recordDeliveryResult(host, false, meta.enableChartsForFederatedInstances);

			if (!(res instanceof StatusError)) {
				// DNS error, socket error, timeout ...
				throw res;
			}

			if (!res.isClientError) {
				// 5xx etc.
				throw new Error(`${res.statusCode} ${res.statusMessage}`);
			}

			// 4xx
			if (job.data.isSharedInbox && res.statusCode === 410) {
				// The remote has explicitly declared that it is gone, so stop delivering to it.
				void this.suspendInstance(host);
				throw new Bull.UnrecoverableError(`${host} is gone`);
			}
			throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`);
		}
	}

	/**
	 * Records the outcome of a delivery attempt: flips `isNotResponding` on the instance
	 * record when it no longer matches the outcome, refreshes instance metadata after a
	 * success, and feeds the request/federation/instance charts.
	 *
	 * Never rejects: every failure is logged, so callers may safely discard the promise.
	 */
	private async recordDeliveryResult(host: string, succeeded: boolean, enableInstanceCharts: boolean): Promise<void> {
		try {
			const instance = await this.federatedInstanceService.fetch(host);

			// Return values are collected so that any rejection among them is observed below.
			const pending: unknown[] = [];

			const flagIsStale = succeeded ? instance.isNotResponding : !instance.isNotResponding;
			if (flagIsStale) {
				pending.push(this.federatedInstanceService.update(instance.id, {
					isNotResponding: !succeeded,
				}));
			}

			if (succeeded) {
				pending.push(this.fetchInstanceMetadataService.fetchInstanceMetadata(instance));
				pending.push(this.apRequestChart.deliverSucc());
			} else {
				pending.push(this.apRequestChart.deliverFail());
			}

			pending.push(this.federationChart.deliverd(instance.host, succeeded));
			if (enableInstanceCharts) {
				pending.push(this.instanceChart.requestSent(instance.host, succeeded));
			}

			for (const outcome of await Promise.allSettled(pending)) {
				if (outcome.status === 'rejected') {
					this.logBackgroundError(`updating delivery stats for ${host}`, outcome.reason);
				}
			}
		} catch (err) {
			this.logBackgroundError(`updating delivery stats for ${host}`, err);
		}
	}

	/**
	 * Marks the instance record for `host` as suspended.
	 *
	 * Never rejects: failures are logged, so callers may safely discard the promise.
	 */
	private async suspendInstance(host: string): Promise<void> {
		try {
			const instance = await this.federatedInstanceService.fetch(host);
			await this.federatedInstanceService.update(instance.id, {
				isSuspended: true,
			});
		} catch (err) {
			this.logBackgroundError(`suspending ${host}`, err);
		}
	}

	/** Logs a failure of background bookkeeping without affecting the job result. */
	private logBackgroundError(context: string, err: unknown): void {
		const detail = err instanceof Error ? (err.stack ?? err.message) : String(err);
		this.logger.error(`Failed while ${context}: ${detail}`);
	}
}