mirror of
https://git.joinsharkey.org/Sharkey/Sharkey.git
synced 2025-01-23 20:23:08 +02:00
parent
8bdd4fd061
commit
873444c3c6
4 changed files with 45 additions and 41 deletions
15
src/queue/get-job-info.ts
Normal file
15
src/queue/get-job-info.ts
Normal file
|
@ -0,0 +1,15 @@
|
|||
import * as Bull from 'bull';
|
||||
|
||||
export function getJobInfo(job: Bull.Job, increment = false) {
|
||||
const age = Date.now() - job.timestamp;
|
||||
|
||||
const formated = age > 60000 ? `${Math.floor(age / 1000 / 60)}m`
|
||||
: age > 10000 ? `${Math.floor(age / 1000)}s`
|
||||
: `${age}ms`;
|
||||
|
||||
// onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする
|
||||
const currentAttempts = job.attemptsMade + (increment ? 1 : 0);
|
||||
const maxAttempts = job.opts ? job.opts.attempts : 0;
|
||||
|
||||
return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
|
||||
}
|
|
@ -11,6 +11,7 @@ import processDb from './processors/db';
|
|||
import procesObjectStorage from './processors/object-storage';
|
||||
import { queueLogger } from './logger';
|
||||
import { DriveFile } from '../models/entities/drive-file';
|
||||
import { getJobInfo } from './get-job-info';
|
||||
|
||||
function initializeQueue(name: string) {
|
||||
return new Queue(name, {
|
||||
|
@ -44,19 +45,19 @@ const objectStorageLogger = queueLogger.createSubLogger('objectStorage');
|
|||
|
||||
deliverQueue
|
||||
.on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
|
||||
.on('active', (job) => deliverLogger.debug(`active id=${job.id} to=${job.data.to}`))
|
||||
.on('completed', (job, result) => deliverLogger.debug(`completed(${result}) id=${job.id} to=${job.data.to}`))
|
||||
.on('failed', (job, err) => deliverLogger.warn(`failed(${err}) id=${job.id} to=${job.data.to}`, { job, e: renderError(err) }))
|
||||
.on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
|
||||
.on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
|
||||
.on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
|
||||
.on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) }))
|
||||
.on('stalled', (job) => deliverLogger.warn(`stalled id=${job.id} to=${job.data.to}`));
|
||||
.on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
|
||||
|
||||
inboxQueue
|
||||
.on('waiting', (jobId) => inboxLogger.debug(`waiting id=${jobId}`))
|
||||
.on('active', (job) => inboxLogger.debug(`active id=${job.id}`))
|
||||
.on('completed', (job, result) => inboxLogger.debug(`completed(${result}) id=${job.id}`))
|
||||
.on('failed', (job, err) => inboxLogger.warn(`failed(${err}) id=${job.id} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) }))
|
||||
.on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
|
||||
.on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
|
||||
.on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) }))
|
||||
.on('error', (job: any, err: Error) => inboxLogger.error(`error ${err}`, { job, e: renderError(err) }))
|
||||
.on('stalled', (job) => inboxLogger.warn(`stalled id=${job.id} activity=${job.data.activity ? job.data.activity.id : 'none'}`));
|
||||
.on('stalled', (job) => inboxLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`));
|
||||
|
||||
dbQueue
|
||||
.on('waiting', (jobId) => dbLogger.debug(`waiting id=${jobId}`))
|
||||
|
|
|
@ -5,6 +5,8 @@ import Logger from '../../services/logger';
|
|||
import { Instances } from '../../models';
|
||||
import { instanceChart } from '../../services/chart';
|
||||
import { fetchNodeinfo } from '../../services/fetch-nodeinfo';
|
||||
import { fetchMeta } from '../../misc/fetch-meta';
|
||||
import { toPuny } from '../../misc/convert-host';
|
||||
|
||||
const logger = new Logger('deliver');
|
||||
|
||||
|
@ -13,6 +15,23 @@ let latest: string | null = null;
|
|||
export default async (job: Bull.Job) => {
|
||||
const { host } = new URL(job.data.to);
|
||||
|
||||
// ブロックしてたら中断
|
||||
const meta = await fetchMeta();
|
||||
if (meta.blockedHosts.includes(toPuny(host))) {
|
||||
return 'skip (blocked)';
|
||||
}
|
||||
|
||||
// closedなら中断
|
||||
const closedHosts = await Instances.find({
|
||||
where: {
|
||||
isMarkedAsClosed: true
|
||||
},
|
||||
cache: 60 * 1000
|
||||
});
|
||||
if (closedHosts.map(x => x.host).includes(toPuny(host))) {
|
||||
return 'skip (closed)';
|
||||
}
|
||||
|
||||
try {
|
||||
if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) {
|
||||
logger.debug(`delivering ${latest}`);
|
||||
|
@ -48,8 +67,6 @@ export default async (job: Bull.Job) => {
|
|||
});
|
||||
|
||||
if (res != null && res.hasOwnProperty('statusCode')) {
|
||||
logger.warn(`deliver failed: ${res.statusCode} ${res.statusMessage} to=${job.data.to}`);
|
||||
|
||||
// 4xx
|
||||
if (res.statusCode >= 400 && res.statusCode < 500) {
|
||||
// HTTPステータスコード4xxはクライアントエラーであり、それはつまり
|
||||
|
@ -61,7 +78,6 @@ export default async (job: Bull.Job) => {
|
|||
throw `${res.statusCode} ${res.statusMessage}`;
|
||||
} else {
|
||||
// DNS error, socket error, timeout ...
|
||||
logger.warn(`deliver failed: ${res} to=${job.data.to}`);
|
||||
throw res;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,15 +6,10 @@ import * as cache from 'lookup-dns-cache';
|
|||
import config from '../../config';
|
||||
import { ILocalUser } from '../../models/entities/user';
|
||||
import { publishApLogStream } from '../../services/stream';
|
||||
import { apLogger } from './logger';
|
||||
import { UserKeypairs, Instances } from '../../models';
|
||||
import { fetchMeta } from '../../misc/fetch-meta';
|
||||
import { toPuny } from '../../misc/convert-host';
|
||||
import { UserKeypairs } from '../../models';
|
||||
import { ensure } from '../../prelude/ensure';
|
||||
import * as httpsProxyAgent from 'https-proxy-agent';
|
||||
|
||||
export const logger = apLogger.createSubLogger('deliver');
|
||||
|
||||
const agent = config.proxy
|
||||
? new httpsProxyAgent(config.proxy)
|
||||
: new https.Agent({
|
||||
|
@ -24,28 +19,7 @@ const agent = config.proxy
|
|||
export default async (user: ILocalUser, url: string, object: any) => {
|
||||
const timeout = 10 * 1000;
|
||||
|
||||
const { protocol, host, hostname, port, pathname, search } = new URL(url);
|
||||
|
||||
// ブロックしてたら中断
|
||||
const meta = await fetchMeta();
|
||||
if (meta.blockedHosts.includes(toPuny(host))) {
|
||||
logger.info(`skip (blocked) ${url}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// closedなら中断
|
||||
const closedHosts = await Instances.find({
|
||||
where: {
|
||||
isMarkedAsClosed: true
|
||||
},
|
||||
cache: 60 * 1000
|
||||
});
|
||||
if (closedHosts.map(x => x.host).includes(toPuny(host))) {
|
||||
logger.info(`skip (closed) ${url}`);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info(`--> ${url}`);
|
||||
const { protocol, hostname, port, pathname, search } = new URL(url);
|
||||
|
||||
const data = JSON.stringify(object);
|
||||
|
||||
|
@ -73,10 +47,8 @@ export default async (user: ILocalUser, url: string, object: any) => {
|
|||
}
|
||||
}, res => {
|
||||
if (res.statusCode! >= 400) {
|
||||
logger.warn(`${url} --> ${res.statusCode}`);
|
||||
reject(res);
|
||||
} else {
|
||||
logger.succ(`${url} --> ${res.statusCode}`);
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
|
Loading…
Reference in a new issue