diff --git a/src/following/distribute.ts b/src/following/distribute.ts new file mode 100644 index 000000000..10ff98881 --- /dev/null +++ b/src/following/distribute.ts @@ -0,0 +1,42 @@ +import User, { pack as packUser } from '../models/user'; +import FollowingLog from '../models/following-log'; +import FollowedLog from '../models/followed-log'; +import event from '../publishers/stream'; +import notify from '../publishers/notify'; + +export default async (follower, followee) => Promise.all([ + // Increment following count + User.update(follower._id, { + $inc: { + followingCount: 1 + } + }), + + FollowingLog.insert({ + createdAt: new Date(), + userId: followee._id, + count: follower.followingCount + 1 + }), + + // Increment followers count + User.update({ _id: followee._id }, { + $inc: { + followersCount: 1 + } + }), + + FollowedLog.insert({ + createdAt: new Date(), + userId: follower._id, + count: followee.followersCount + 1 + }), + + followee.host === null && Promise.all([ + // Notify + notify(followee.id, follower.id, 'follow'), + + // Publish follow event + packUser(follower, followee) + .then(packed => event(followee._id, 'followed', packed)) + ]) +]); diff --git a/src/index.ts b/src/index.ts index 29c4f3431..21fb2f553 100644 --- a/src/index.ts +++ b/src/index.ts @@ -99,7 +99,7 @@ async function workerMain(opt) { if (!opt['only-server']) { // start processor - require('./processor').default(); + require('./queue').process(); } // Send a 'ready' message to parent process diff --git a/src/post/distribute.ts b/src/post/distribute.ts index ad699d6b8..f748a620c 100644 --- a/src/post/distribute.ts +++ b/src/post/distribute.ts @@ -8,7 +8,7 @@ import User, { isLocalUser } from '../models/user'; import stream, { publishChannelStream } from '../publishers/stream'; import notify from '../publishers/notify'; import pushSw from '../publishers/push-sw'; -import queue from '../queue'; +import { createHttp } from '../queue'; import watch from './watch'; export default async (user, mentions, post) => { @@ -84,7 +84,7 @@ export default async (user, mentions, post) => { } return new Promise((resolve, reject) => { - queue.create('http', { + createHttp({ type: 'deliverPost', fromId: user._id, toId: following.followerId, diff --git a/src/processor/http/perform-activitypub.ts b/src/processor/http/perform-activitypub.ts deleted file mode 100644 index 963e532fe..000000000 --- a/src/processor/http/perform-activitypub.ts +++ /dev/null @@ -1,7 +0,0 @@ -import User from '../../models/user'; -import act from '../../remote/activitypub/act'; -import Resolver from '../../remote/activitypub/resolver'; - -export default ({ data }) => User.findOne({ _id: data.actor }) - .then(actor => act(new Resolver(), actor, data.outbox)) - .then(Promise.all); diff --git a/src/processor/index.ts b/src/processor/index.ts deleted file mode 100644 index 172048dda..000000000 --- a/src/processor/index.ts +++ /dev/null @@ -1,18 +0,0 @@ -import queue from '../queue'; -import db from './db'; -import http from './http'; - -export default () => { - queue.process('db', db); - - /* - 256 is the default concurrency limit of Mozilla Firefox and Google - Chromium. - - a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google - https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff - Network.http.max-connections - MozillaZine Knowledge Base - http://kb.mozillazine.org/Network.http.max-connections - */ - queue.process('http', 256, http); -}; diff --git a/src/queue.ts b/src/queue.ts deleted file mode 100644 index 08ea13c2a..000000000 --- a/src/queue.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { createQueue } from 'kue'; -import config from './config'; - -export default createQueue({ - redis: { - port: config.redis.port, - host: config.redis.host, - auth: config.redis.pass - } -}); diff --git a/src/queue/index.ts b/src/queue/index.ts new file mode 100644 index 000000000..f90754a56 --- /dev/null +++ b/src/queue/index.ts @@ -0,0 +1,38 @@ +import { createQueue } from 'kue'; +import config from '../config'; +import db from './processors/db'; +import http from './processors/http'; + +const queue = createQueue({ + redis: { + port: config.redis.port, + host: config.redis.host, + auth: config.redis.pass + } +}); + +export function createHttp(data) { + return queue + .create('http', data) + .attempts(16) + .backoff({ delay: 16384, type: 'exponential' }); +} + +export function createDb(data) { + return queue.create('db', data); +} + +export function process() { + queue.process('db', db); + + /* + 256 is the default concurrency limit of Mozilla Firefox and Google + Chromium. + + a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google + https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff + Network.http.max-connections - MozillaZine Knowledge Base + http://kb.mozillazine.org/Network.http.max-connections + */ + queue.process('http', 256, http); +} diff --git a/src/processor/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts similarity index 59% rename from src/processor/db/delete-post-dependents.ts rename to src/queue/processors/db/delete-post-dependents.ts index 879c41ec9..6de21eb05 100644 --- a/src/processor/db/delete-post-dependents.ts +++ b/src/queue/processors/db/delete-post-dependents.ts @@ -1,9 +1,9 @@ -import Favorite from '../../models/favorite'; -import Notification from '../../models/notification'; -import PollVote from '../../models/poll-vote'; -import PostReaction from '../../models/post-reaction'; -import PostWatching from '../../models/post-watching'; -import Post from '../../models/post'; +import Favorite from '../../../models/favorite'; +import Notification from '../../../models/notification'; +import PollVote from '../../../models/poll-vote'; +import PostReaction from '../../../models/post-reaction'; +import PostWatching from '../../../models/post-watching'; +import Post from '../../../models/post'; export default async ({ data }) => Promise.all([ Favorite.remove({ postId: data._id }), diff --git a/src/processor/db/index.ts b/src/queue/processors/db/index.ts similarity index 100% rename from src/processor/db/index.ts rename to src/queue/processors/db/index.ts diff --git a/src/processor/http/deliver-post.ts b/src/queue/processors/http/deliver-post.ts similarity index 55% rename from src/processor/http/deliver-post.ts rename to src/queue/processors/http/deliver-post.ts index 48ad4f95a..e743fc5f6 100644 --- a/src/processor/http/deliver-post.ts +++ b/src/queue/processors/http/deliver-post.ts @@ -1,9 +1,9 @@ -import Post from '../../models/post'; -import User, { IRemoteUser } from '../../models/user'; -import context from '../../remote/activitypub/renderer/context'; -import renderCreate from '../../remote/activitypub/renderer/create'; -import renderNote from '../../remote/activitypub/renderer/note'; -import request from '../../remote/request'; +import Post from '../../../models/post'; +import User, { IRemoteUser } from '../../../models/user'; +import context from '../../../remote/activitypub/renderer/context'; +import renderCreate from '../../../remote/activitypub/renderer/create'; +import renderNote from '../../../remote/activitypub/renderer/note'; +import request from '../../../remote/request'; export default async ({ data }) => { const promisedTo = User.findOne({ _id: data.toId }) as Promise; diff --git a/src/processor/http/follow.ts b/src/queue/processors/http/follow.ts similarity index 74% rename from src/processor/http/follow.ts rename to src/queue/processors/http/follow.ts index ed36fa18d..4cb72828e 100644 --- a/src/processor/http/follow.ts +++ b/src/queue/processors/http/follow.ts @@ -1,13 +1,13 @@ -import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../models/user'; -import Following from '../../models/following'; -import FollowingLog from '../../models/following-log'; -import FollowedLog from '../../models/followed-log'; -import event from '../../publishers/stream'; -import notify from '../../publishers/notify'; -import context from '../../remote/activitypub/renderer/context'; -import render from '../../remote/activitypub/renderer/follow'; -import request from '../../remote/request'; -import Logger from '../../utils/logger'; +import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; +import Following from '../../../models/following'; +import FollowingLog from '../../../models/following-log'; +import FollowedLog from '../../../models/followed-log'; +import event from '../../../publishers/stream'; +import notify from '../../../publishers/notify'; +import context from '../../../remote/activitypub/renderer/context'; +import render from '../../../remote/activitypub/renderer/follow'; +import request from '../../../remote/request'; +import Logger from '../../../utils/logger'; export default async ({ data }) => { const { followerId, followeeId } = await Following.findOne({ _id: data.following }); diff --git a/src/processor/http/index.ts b/src/queue/processors/http/index.ts similarity index 100% rename from src/processor/http/index.ts rename to src/queue/processors/http/index.ts diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts new file mode 100644 index 000000000..7b84400d5 --- /dev/null +++ b/src/queue/processors/http/perform-activitypub.ts @@ -0,0 +1,7 @@ +import User from '../../../models/user'; +import act from '../../../remote/activitypub/act'; +import Resolver from '../../../remote/activitypub/resolver'; + +export default ({ data }) => User.findOne({ _id: data.actor }) + .then(actor => act(new Resolver(), actor, data.outbox)) + .then(Promise.all); diff --git a/src/processor/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts similarity index 76% rename from src/processor/http/process-inbox.ts rename to src/queue/processors/http/process-inbox.ts index f102f8d6b..de1dbd2f9 100644 --- a/src/processor/http/process-inbox.ts +++ b/src/queue/processors/http/process-inbox.ts @@ -1,9 +1,9 @@ import { verifySignature } from 'http-signature'; -import parseAcct from '../../acct/parse'; -import User, { IRemoteUser } from '../../models/user'; -import act from '../../remote/activitypub/act'; -import resolvePerson from '../../remote/activitypub/resolve-person'; -import Resolver from '../../remote/activitypub/resolver'; +import parseAcct from '../../../acct/parse'; +import User, { IRemoteUser } from '../../../models/user'; +import act from '../../../remote/activitypub/act'; +import resolvePerson from '../../../remote/activitypub/resolve-person'; +import Resolver from '../../../remote/activitypub/resolver'; export default async ({ data }): Promise => { const keyIdLower = data.signature.keyId.toLowerCase(); diff --git a/src/processor/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts similarity index 85% rename from src/processor/http/report-github-failure.ts rename to src/queue/processors/http/report-github-failure.ts index 4f6f5ccee..21683ba3c 100644 --- a/src/processor/http/report-github-failure.ts +++ b/src/queue/processors/http/report-github-failure.ts @@ -1,6 +1,6 @@ import * as request from 'request-promise-native'; -import User from '../../models/user'; -const createPost = require('../../server/api/endpoints/posts/create'); +import User from '../../../models/user'; +const createPost = require('../../../server/api/endpoints/posts/create'); export default async ({ data }) => { const asyncBot = User.findOne({ _id: data.userId }); diff --git a/src/processor/http/unfollow.ts b/src/queue/processors/http/unfollow.ts similarity index 71% rename from src/processor/http/unfollow.ts rename to src/queue/processors/http/unfollow.ts index fbfd7b342..801a3612a 100644 --- a/src/processor/http/unfollow.ts +++ b/src/queue/processors/http/unfollow.ts @@ -1,13 +1,13 @@ -import FollowedLog from '../../models/followed-log'; -import Following from '../../models/following'; -import FollowingLog from '../../models/following-log'; -import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../models/user'; -import stream from '../../publishers/stream'; -import renderFollow from '../../remote/activitypub/renderer/follow'; -import renderUndo from '../../remote/activitypub/renderer/undo'; -import context from '../../remote/activitypub/renderer/context'; -import request from '../../remote/request'; -import Logger from '../../utils/logger'; +import FollowedLog from '../../../models/followed-log'; +import Following from '../../../models/following'; +import FollowingLog from '../../../models/following-log'; +import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; +import stream from '../../../publishers/stream'; +import renderFollow from '../../../remote/activitypub/renderer/follow'; +import renderUndo from '../../../remote/activitypub/renderer/undo'; +import context from '../../../remote/activitypub/renderer/context'; +import request from '../../../remote/request'; +import Logger from '../../../utils/logger'; export default async ({ data }) => { const following = await Following.findOne({ _id: data.id }); diff --git a/src/remote/activitypub/act/follow.ts b/src/remote/activitypub/act/follow.ts index 23fa41df8..222a257e1 100644 --- a/src/remote/activitypub/act/follow.ts +++ b/src/remote/activitypub/act/follow.ts @@ -3,7 +3,7 @@ import parseAcct from '../../../acct/parse'; import Following, { IFollowing } from '../../../models/following'; import User from '../../../models/user'; import config from '../../../config'; -import queue from '../../../queue'; +import { createHttp } from '../../../queue'; import context from '../renderer/context'; import renderAccept from '../renderer/accept'; import request from '../../request'; @@ -44,7 +44,7 @@ export default async (resolver: Resolver, actor, activity, distribute) => { followerId: actor._id, followeeId: followee._id }).then(following => new Promise((resolve, reject) => { - queue.create('http', { + createHttp({ type: 'follow', following: following._id }).save(error => { diff --git a/src/remote/activitypub/act/undo/unfollow.ts b/src/remote/activitypub/act/undo/unfollow.ts index c17e06e8a..4f15d9a3e 100644 --- a/src/remote/activitypub/act/undo/unfollow.ts +++ b/src/remote/activitypub/act/undo/unfollow.ts @@ -1,7 +1,7 @@ -import queue from '../../../../queue'; +import { createHttp } from '../../../../queue'; export default ({ $id }) => new Promise((resolve, reject) => { - queue.create('http', { type: 'unfollow', id: $id }).save(error => { + createHttp({ type: 'unfollow', id: $id }).save(error => { if (error) { reject(error); } else { diff --git a/src/remote/activitypub/delete/post.ts b/src/remote/activitypub/delete/post.ts index f6c816647..59ae8c2b9 100644 --- a/src/remote/activitypub/delete/post.ts +++ b/src/remote/activitypub/delete/post.ts @@ -1,10 +1,10 @@ import Post from '../../../models/post'; -import queue from '../../../queue'; +import { createDb } from '../../../queue'; export default async ({ $id }) => { const promisedDeletion = Post.findOneAndDelete({ _id: $id }); - await new Promise((resolve, reject) => queue.create('db', { + await new Promise((resolve, reject) => createDb({ type: 'deletePostDependents', id: $id }).delay(65536).save(error => error ? reject(error) : resolve())); diff --git a/src/remote/activitypub/resolve-person.ts b/src/remote/activitypub/resolve-person.ts index 59be65908..2cf3ad32d 100644 --- a/src/remote/activitypub/resolve-person.ts +++ b/src/remote/activitypub/resolve-person.ts @@ -1,7 +1,7 @@ import { JSDOM } from 'jsdom'; import { toUnicode } from 'punycode'; import User, { validateUsername, isValidName, isValidDescription } from '../../models/user'; -import queue from '../../queue'; +import { createHttp } from '../../queue'; import webFinger from '../webfinger'; import create from './create'; import Resolver from './resolver'; @@ -69,7 +69,7 @@ export default async (value, verifier?: string) => { }, }); - queue.create('http', { + createHttp({ type: 'performActivityPub', actor: user._id, outbox diff --git a/src/server/activitypub/inbox.ts b/src/server/activitypub/inbox.ts index 5de843385..0907823b2 100644 --- a/src/server/activitypub/inbox.ts +++ b/src/server/activitypub/inbox.ts @@ -1,7 +1,7 @@ import * as bodyParser from 'body-parser'; import * as express from 'express'; import { parseRequest } from 'http-signature'; -import queue from '../../queue'; +import { createHttp } from '../../queue'; const app = express(); @@ -22,7 +22,7 @@ app.post('/@:user/inbox', bodyParser.json({ return res.sendStatus(401); } - queue.create('http', { + createHttp({ type: 'processInbox', inbox: req.body, signature, diff --git a/src/server/api/endpoints/following/create.ts b/src/server/api/endpoints/following/create.ts index e56859521..9ccbe2017 100644 --- a/src/server/api/endpoints/following/create.ts +++ b/src/server/api/endpoints/following/create.ts @@ -4,7 +4,7 @@ import $ from 'cafy'; import User from '../../../../models/user'; import Following from '../../../../models/following'; -import queue from '../../../../queue'; +import { createHttp } from '../../../../queue'; /** * Follow a user @@ -56,7 +56,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => { followeeId: followee._id }); - queue.create('http', { type: 'follow', following: _id }).save(); + createHttp({ type: 'follow', following: _id }).save(); // Send response res(); diff --git a/src/server/api/endpoints/following/delete.ts b/src/server/api/endpoints/following/delete.ts index bf21bf0cb..0684b8750 100644 --- a/src/server/api/endpoints/following/delete.ts +++ b/src/server/api/endpoints/following/delete.ts @@ -4,7 +4,7 @@ import $ from 'cafy'; import User from '../../../../models/user'; import Following from '../../../../models/following'; -import queue from '../../../../queue'; +import { createHttp } from '../../../../queue'; /** * Unfollow a user @@ -49,7 +49,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => { return rej('already not following'); } - queue.create('http', { + createHttp({ type: 'unfollow', id: exist._id }).save(error => { diff --git a/src/server/api/service/github.ts b/src/server/api/service/github.ts index 4fd59c2a9..5fc4a92f5 100644 --- a/src/server/api/service/github.ts +++ b/src/server/api/service/github.ts @@ -3,7 +3,7 @@ import * as express from 'express'; //const crypto = require('crypto'); import User from '../../../models/user'; import config from '../../../config'; -import queue from '../../../queue'; +import { createHttp } from '../../../queue'; module.exports = async (app: express.Application) => { if (config.github_bot == null) return; @@ -42,7 +42,7 @@ module.exports = async (app: express.Application) => { const commit = event.commit; const parent = commit.parents[0]; - queue.create('http', { + createHttp({ type: 'gitHubFailureReport', userId: bot._id, parentUrl: parent.url,