From 32c008d0087eae2a2b32f050fd2ae126a3e2c732 Mon Sep 17 00:00:00 2001 From: Akihiko Odaki Date: Mon, 2 Apr 2018 20:16:13 +0900 Subject: [PATCH] Deliver posts to remote followers --- src/post/distribute.ts | 138 ++++++++++------------- src/processor/http/deliver-post.ts | 94 +++++++++++++++ src/processor/http/follow.ts | 46 +------- src/processor/http/index.ts | 2 + src/remote/activitypub/create.ts | 3 +- src/remote/request.ts | 35 ++++++ src/server/api/endpoints/posts/create.ts | 7 +- 7 files changed, 195 insertions(+), 130 deletions(-) create mode 100644 src/processor/http/deliver-post.ts create mode 100644 src/remote/request.ts diff --git a/src/post/distribute.ts b/src/post/distribute.ts index 4def2c27f..49c6eb22d 100644 --- a/src/post/distribute.ts +++ b/src/post/distribute.ts @@ -1,16 +1,15 @@ -import Channel from '../models/channel'; import Mute from '../models/mute'; -import Following from '../models/following'; -import Post from '../models/post'; +import Post, { pack } from '../models/post'; import Watching from '../models/post-watching'; -import ChannelWatching from '../models/channel-watching'; import User from '../models/user'; -import stream, { publishChannelStream } from '../publishers/stream'; +import stream from '../publishers/stream'; import notify from '../publishers/notify'; import pushSw from '../publishers/push-sw'; +import queue from '../queue'; import watch from './watch'; export default async (user, mentions, post) => { + const promisedPostObj = pack(post); const promises = [ User.update({ _id: user._id }, { // Increment my posts count @@ -22,66 +21,33 @@ export default async (user, mentions, post) => { latestPost: post._id } }), + new Promise((resolve, reject) => queue.create('http', { + type: 'deliverPost', + id: post._id, + }).save(error => error ? reject(error) : resolve())), ] as Array>; - function addMention(mentionee, reason) { + function addMention(promisedMentionee, reason) { // Publish event - if (!user._id.equals(mentionee)) { - promises.push(Mute.find({ - muterId: mentionee, - deletedAt: { $exists: false } - }).then(mentioneeMutes => { + promises.push(promisedMentionee.then(mentionee => { + if (user._id.equals(mentionee)) { + return Promise.resolve(); + } + + return Promise.all([ + promisedPostObj, + Mute.find({ + muterId: mentionee, + deletedAt: { $exists: false } + }) + ]).then(([postObj, mentioneeMutes]) => { const mentioneesMutedUserIds = mentioneeMutes.map(m => m.muteeId.toString()); if (mentioneesMutedUserIds.indexOf(user._id.toString()) == -1) { - stream(mentionee, reason, post); - pushSw(mentionee, reason, post); + stream(mentionee, reason, postObj); + pushSw(mentionee, reason, postObj); } - })); - } - } - - // タイムラインへの投稿 - if (!post.channelId) { - // Publish event to myself's stream - stream(user._id, 'post', post); - - // Fetch all followers - const followers = await Following - .find({ - followeeId: user._id - }, { - followerId: true, - _id: false }); - - // Publish event to followers stream - followers.forEach(following => - stream(following.followerId, 'post', post)); - } - - // チャンネルへの投稿 - if (post.channelId) { - // Increment channel index(posts count) - promises.push(Channel.update({ _id: post.channelId }, { - $inc: { - index: 1 - } })); - - // Publish event to channel - publishChannelStream(post.channelId, 'post', post); - - // Get channel watchers - const watches = await ChannelWatching.find({ - channelId: post.channelId, - // 削除されたドキュメントは除く - deletedAt: { $exists: false } - }); - - // チャンネルの視聴者(のタイムライン)に配信 - watches.forEach(w => { - stream(w.userId, 'post', post); - }); } // If has in reply to post @@ -95,8 +61,10 @@ export default async (user, mentions, post) => { }), // 自分自身へのリプライでない限りは通知を作成 - notify(post.reply.userId, user._id, 'reply', { - postId: post._id + promisedPostObj.then(({ reply }) => { + return notify(reply.userId, user._id, 'reply', { + postId: post._id + }); }), // Fetch watchers @@ -121,11 +89,13 @@ export default async (user, mentions, post) => { ); // Add mention - addMention(post.reply.userId, 'reply'); + addMention(promisedPostObj.then(({ reply }) => reply.userId), 'reply'); // この投稿をWatchする if (user.account.settings.autoWatch !== false) { - promises.push(watch(user._id, post.reply)); + promises.push(promisedPostObj.then(({ reply }) => { + return watch(user._id, reply); + })); } } @@ -134,10 +104,17 @@ export default async (user, mentions, post) => { const type = post.text ? 'quote' : 'repost'; promises.push( - // Notify - notify(post.repost.userId, user._id, type, { - postId: post._id - }), + promisedPostObj.then(({ repost }) => Promise.all([ + // Notify + notify(repost.userId, user._id, type, { + postId: post._id + }), + + // この投稿をWatchする + // TODO: ユーザーが「Repostしたときに自動でWatchする」設定を + // オフにしていた場合はしない + watch(user._id, repost) + ])), // Fetch watchers Watching @@ -157,23 +134,20 @@ export default async (user, mentions, post) => { postId: post._id }); }); - }), - - // この投稿をWatchする - // TODO: ユーザーが「Repostしたときに自動でWatchする」設定を - // オフにしていた場合はしない - watch(user._id, post.repost) + }) ); // If it is quote repost if (post.text) { // Add mention - addMention(post.repost.userId, 'quote'); + addMention(promisedPostObj.then(({ repost }) => repost.userId), 'quote'); } else { - // Publish event - if (!user._id.equals(post.repost.userId)) { - stream(post.repost.userId, 'repost', post); - } + promises.push(promisedPostObj.then(postObj => { + // Publish event + if (!user._id.equals(postObj.repost.userId)) { + stream(postObj.repost.userId, 'repost', postObj); + } + })); } // 今までで同じ投稿をRepostしているか @@ -196,10 +170,10 @@ export default async (user, mentions, post) => { } // Resolve all mentions - await Promise.all(mentions.map(async mention => { + await promisedPostObj.then(({ reply, repost }) => Promise.all(mentions.map(async mention => { // 既に言及されたユーザーに対する返信や引用repostの場合も無視 - if (post.reply && post.reply.userId.equals(mention)) return; - if (post.repost && post.repost.userId.equals(mention)) return; + if (reply && reply.userId.equals(mention)) return; + if (repost && repost.userId.equals(mention)) return; // Add mention addMention(mention, 'mention'); @@ -208,7 +182,9 @@ export default async (user, mentions, post) => { await notify(mention, user._id, 'mention', { postId: post._id }); - })); + }))); - return Promise.all(promises); + await Promise.all(promises); + + return promisedPostObj; }; diff --git a/src/processor/http/deliver-post.ts b/src/processor/http/deliver-post.ts new file mode 100644 index 000000000..83ac8281f --- /dev/null +++ b/src/processor/http/deliver-post.ts @@ -0,0 +1,94 @@ +import Channel from '../../models/channel'; +import Following from '../../models/following'; +import ChannelWatching from '../../models/channel-watching'; +import Post, { pack } from '../../models/post'; +import User, { isLocalUser } from '../../models/user'; +import stream, { publishChannelStream } from '../../publishers/stream'; +import context from '../../remote/activitypub/renderer/context'; +import renderNote from '../../remote/activitypub/renderer/note'; +import request from '../../remote/request'; + +export default ({ data }) => Post.findOne({ _id: data.id }).then(post => { + const promisedPostObj = pack(post); + const promises = []; + + // タイムラインへの投稿 + if (!post.channelId) { + promises.push( + // Publish event to myself's stream + promisedPostObj.then(postObj => { + stream(post.userId, 'post', postObj); + }), + + Promise.all([ + User.findOne({ _id: post.userId }), + + // Fetch all followers + Following.aggregate([ + { + $lookup: { + from: 'users', + localField: 'followerId', + foreignField: '_id', + as: 'follower' + } + }, + { + $match: { + followeeId: post.userId + } + } + ], { + _id: false + }) + ]).then(([user, followers]) => Promise.all(followers.map(following => { + if (isLocalUser(following.follower)) { + // Publish event to followers stream + return promisedPostObj.then(postObj => { + stream(following.followerId, 'post', postObj); + }); + } + + return renderNote(user, post).then(rendered => { + rendered['@context'] = context; + return request(user, following.follower[0].account.inbox, rendered); + }); + }))) + ); + } + + // チャンネルへの投稿 + if (post.channelId) { + promises.push( + // Increment channel index(posts count) + Channel.update({ _id: post.channelId }, { + $inc: { + index: 1 + } + }), + + // Publish event to channel + promisedPostObj.then(postObj => { + publishChannelStream(post.channelId, 'post', postObj); + }), + + Promise.all([ + promisedPostObj, + + // Get channel watchers + ChannelWatching.find({ + channelId: post.channelId, + // 削除されたドキュメントは除く + deletedAt: { $exists: false } + }) + ]).then(([postObj, watches]) => { + // チャンネルの視聴者(のタイムライン)に配信 + watches.forEach(w => { + stream(w.userId, 'post', postObj); + }); + }) + ); + } + + return Promise.all(promises); +}); diff --git a/src/processor/http/follow.ts b/src/processor/http/follow.ts index 6b2a39d51..8bf890efb 100644 --- a/src/processor/http/follow.ts +++ b/src/processor/http/follow.ts @@ -1,6 +1,3 @@ -import { request } from 'https'; -import { sign } from 'http-signature'; -import { URL } from 'url'; import User, { isLocalUser, pack as packUser } from '../../models/user'; import Following from '../../models/following'; import FollowingLog from '../../models/following-log'; @@ -9,7 +6,7 @@ import event from '../../publishers/stream'; import notify from '../../publishers/notify'; import context from '../../remote/activitypub/renderer/context'; import render from '../../remote/activitypub/renderer/follow'; -import config from '../../config'; +import request from '../../remote/request'; export default ({ data }) => Following.findOne({ _id: data.following }).then(({ followerId, followeeId }) => { const promisedFollower = User.findOne({ _id: followerId }); @@ -60,45 +57,10 @@ export default ({ data }) => Following.findOne({ _id: data.following }).then(({ followeeEvent = packUser(follower, followee) .then(packed => event(followee._id, 'followed', packed)); } else if (isLocalUser(follower)) { - followeeEvent = new Promise((resolve, reject) => { - const { - protocol, - hostname, - port, - pathname, - search - } = new URL(followee.account.inbox); + const rendered = render(follower, followee); + rendered['@context'] = context; - const req = request({ - protocol, - hostname, - port, - method: 'POST', - path: pathname + search, - }, res => { - res.on('close', () => { - if (res.statusCode >= 200 && res.statusCode < 300) { - resolve(); - } else { - reject(res); - } - }); - - res.on('data', () => {}); - res.on('error', reject); - }); - - sign(req, { - authorizationHeaderName: 'Signature', - key: follower.account.keypair, - keyId: `acct:${follower.username}@${config.host}` - }); - - const rendered = render(follower, followee); - rendered['@context'] = context; - - req.end(JSON.stringify(rendered)); - }); + followeeEvent = request(follower, followee.account.inbox, rendered); } return Promise.all([followerEvent, followeeEvent]); diff --git a/src/processor/http/index.ts b/src/processor/http/index.ts index b3161cb99..0301b472c 100644 --- a/src/processor/http/index.ts +++ b/src/processor/http/index.ts @@ -1,9 +1,11 @@ +import deliverPost from './deliver-post'; import follow from './follow'; import performActivityPub from './perform-activitypub'; import processInbox from './process-inbox'; import reportGitHubFailure from './report-github-failure'; const handlers = { + deliverPost, follow, performActivityPub, processInbox, diff --git a/src/remote/activitypub/create.ts b/src/remote/activitypub/create.ts index dd3f7b022..f70f56b79 100644 --- a/src/remote/activitypub/create.ts +++ b/src/remote/activitypub/create.ts @@ -1,6 +1,5 @@ import { JSDOM } from 'jsdom'; import config from '../../config'; -import { pack as packPost } from '../../models/post'; import RemoteUserObject, { IRemoteUserObject } from '../../models/remote-user-object'; import { IRemoteUser } from '../../models/user'; import uploadFromUrl from '../../drive/upload-from-url'; @@ -69,7 +68,7 @@ class Creator { const promises = []; if (this.distribute) { - promises.push(distributePost(this.actor, inserted.mentions, packPost(inserted))); + promises.push(distributePost(this.actor, inserted.mentions, inserted)); } // Register to search database diff --git a/src/remote/request.ts b/src/remote/request.ts new file mode 100644 index 000000000..72262cbf6 --- /dev/null +++ b/src/remote/request.ts @@ -0,0 +1,35 @@ +import { request } from 'https'; +import { sign } from 'http-signature'; +import { URL } from 'url'; +import config from '../config'; + +export default ({ account, username }, url, object) => new Promise((resolve, reject) => { + const { protocol, hostname, port, pathname, search } = new URL(url); + + const req = request({ + protocol, + hostname, + port, + method: 'POST', + path: pathname + search, + }, res => { + res.on('end', () => { + if (res.statusCode >= 200 && res.statusCode < 300) { + resolve(); + } else { + reject(res); + } + }); + + res.on('data', () => {}); + res.on('error', reject); + }); + + sign(req, { + authorizationHeaderName: 'Signature', + key: account.keypair, + keyId: `acct:${username}@${config.host}` + }); + + req.end(JSON.stringify(object)); +}); diff --git a/src/server/api/endpoints/posts/create.ts b/src/server/api/endpoints/posts/create.ts index b633494a3..ccd617545 100644 --- a/src/server/api/endpoints/posts/create.ts +++ b/src/server/api/endpoints/posts/create.ts @@ -7,7 +7,7 @@ import renderAcct from '../../../../acct/render'; import config from '../../../../config'; import html from '../../../../text/html'; import parse from '../../../../text/parse'; -import Post, { IPost, isValidText, isValidCw, pack } from '../../../../models/post'; +import Post, { IPost, isValidText, isValidCw } from '../../../../models/post'; import { ILocalUser } from '../../../../models/user'; import Channel, { IChannel } from '../../../../models/channel'; import DriveFile from '../../../../models/drive-file'; @@ -283,16 +283,13 @@ module.exports = (params, user: ILocalUser, app) => new Promise(async (res, rej) geo }, reply, repost, atMentions); - // Serialize - const postObj = await pack(post); + const postObj = await distribute(user, post.mentions, post); // Reponse res({ createdPost: postObj }); - distribute(user, post.mentions, postObj); - // Register to search database if (post.text && config.elasticsearch.enable) { const es = require('../../../db/elasticsearch');