From b7d9d1620161a728e34ab20d8ea99160b3eb4196 Mon Sep 17 00:00:00 2001 From: okayurisotto <47853651+okayurisotto@users.noreply.github.com> Date: Wed, 28 Feb 2024 15:34:58 +0900 Subject: [PATCH] =?UTF-8?q?refactor(backend):=20=E3=83=8E=E3=83=BC?= =?UTF-8?q?=E3=83=88=E3=81=AE=E3=82=A8=E3=82=AF=E3=82=B9=E3=83=9D=E3=83=BC?= =?UTF-8?q?=E3=83=88=E5=87=A6=E7=90=86=E3=81=A7Streams=20API=E3=82=92?= =?UTF-8?q?=E4=BD=BF=E3=81=86=E3=82=88=E3=81=86=E3=81=AB=20(#13465)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * refactor(backend): ノートのエクスポート処理でStreams APIを使うように * fixup! refactor(backend): ノートのエクスポート処理でStreams APIを使うように `await`忘れにより、ジョブがすぐに完了したことになり削除されてしまっていた。 それによって、`NoteStream`内での`updateProgress`メソッドの呼び出しで、`Missing key for job`のエラーが発生することがあった。 --------- Co-authored-by: syuilo <4439005+syuilo@users.noreply.github.com> --- packages/backend/src/misc/FileWriterStream.ts | 31 ++++ packages/backend/src/misc/JsonArrayStream.ts | 30 ++++ .../processors/ExportNotesProcessorService.ts | 164 +++++++++--------- 3 files changed, 146 insertions(+), 79 deletions(-) create mode 100644 packages/backend/src/misc/FileWriterStream.ts create mode 100644 packages/backend/src/misc/JsonArrayStream.ts diff --git a/packages/backend/src/misc/FileWriterStream.ts b/packages/backend/src/misc/FileWriterStream.ts new file mode 100644 index 000000000..828851df0 --- /dev/null +++ b/packages/backend/src/misc/FileWriterStream.ts @@ -0,0 +1,31 @@ +import * as fs from 'node:fs/promises'; +import type { PathLike } from 'node:fs'; + +/** + * `fs.createWriteStream()`相当のことを行う`WritableStream` (Web標準) + */ +export class FileWriterStream extends WritableStream { + constructor(path: PathLike) { + let file: fs.FileHandle | null = null; + + super({ + start: async () => { + file = await fs.open(path, 'a'); + }, + write: async (chunk, controller) => { + if (file === null) { + controller.error(); + throw new Error(); + } + + await file.write(chunk); + }, + close: async () => { + await file?.close(); + }, + abort: async () => { + await file?.close(); + }, + }); + } +} diff --git a/packages/backend/src/misc/JsonArrayStream.ts b/packages/backend/src/misc/JsonArrayStream.ts new file mode 100644 index 000000000..ad35bb3a7 --- /dev/null +++ b/packages/backend/src/misc/JsonArrayStream.ts @@ -0,0 +1,30 @@ +import { TransformStream } from 'node:stream/web'; + +/** + * ストリームに流れてきた各データについて`JSON.stringify()`した上で、それらを一つの配列にまとめる + */ +export class JsonArrayStream extends TransformStream { + constructor() { + /** 最初の要素かどうかを変数に記録 */ + let isFirst = true; + + super({ + start(controller) { + controller.enqueue('['); + }, + flush(controller) { + controller.enqueue(']'); + }, + transform(chunk, controller) { + if (isFirst) { + isFirst = false; + } else { + // 妥当なJSON配列にするためには最初以外の要素の前に`,`を挿入しなければならない + controller.enqueue(',\n'); + } + + controller.enqueue(JSON.stringify(chunk)); + }, + }); + } +} diff --git a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts index f2ae0ce4b..c7611012d 100644 --- a/packages/backend/src/queue/processors/ExportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ExportNotesProcessorService.ts @@ -3,7 +3,7 @@ * SPDX-License-Identifier: AGPL-3.0-only */ -import * as fs from 'node:fs'; +import { ReadableStream, TextEncoderStream } from 'node:stream/web'; import { Inject, Injectable } from '@nestjs/common'; import { MoreThan } from 'typeorm'; import { format as dateFormat } from 'date-fns'; @@ -18,10 +18,82 @@ import { bindThis } from '@/decorators.js'; import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js'; import { Packed } from '@/misc/json-schema.js'; import { IdService } from '@/core/IdService.js'; +import { JsonArrayStream } from '@/misc/JsonArrayStream.js'; +import { FileWriterStream } from '@/misc/FileWriterStream.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; import type { DbJobDataWithUser } from '../types.js'; +class NoteStream extends ReadableStream> { + constructor( + job: Bull.Job, + notesRepository: NotesRepository, + pollsRepository: PollsRepository, + driveFileEntityService: DriveFileEntityService, + idService: IdService, + userId: string, + ) { + let exportedNotesCount = 0; + let cursor: MiNote['id'] | null = null; + + const serialize = ( + note: MiNote, + poll: MiPoll | null, + files: Packed<'DriveFile'>[], + ): Record => { + return { + id: note.id, + text: note.text, + createdAt: idService.parse(note.id).date.toISOString(), + fileIds: note.fileIds, + files: files, + replyId: note.replyId, + renoteId: note.renoteId, + poll: poll, + cw: note.cw, + visibility: note.visibility, + visibleUserIds: note.visibleUserIds, + localOnly: note.localOnly, + reactionAcceptance: note.reactionAcceptance, + }; + }; + + super({ + async pull(controller): Promise { + const notes = await notesRepository.find({ + where: { + userId, + ...(cursor !== null ? { id: MoreThan(cursor) } : {}), + }, + take: 100, // 100件ずつ取得 + order: { id: 1 }, + }); + + if (notes.length === 0) { + job.updateProgress(100); + controller.close(); + } + + cursor = notes.at(-1)?.id ?? null; + + for (const note of notes) { + const poll = note.hasPoll + ? await pollsRepository.findOneByOrFail({ noteId: note.id }) // N+1 + : null; + const files = await driveFileEntityService.packManyByIds(note.fileIds); // N+1 + const content = serialize(note, poll, files); + + controller.enqueue(content); + exportedNotesCount++; + } + + const total = await notesRepository.countBy({ userId }); + job.updateProgress(exportedNotesCount / total); + }, + }); + } +} + @Injectable() export class ExportNotesProcessorService { private logger: Logger; @@ -59,67 +131,19 @@ export class ExportNotesProcessorService { this.logger.info(`Temp file is ${path}`); try { - const stream = fs.createWriteStream(path, { flags: 'a' }); + // メモリが足りなくならないようにストリームで処理する + await new NoteStream( + job, + this.notesRepository, + this.pollsRepository, + this.driveFileEntityService, + this.idService, + user.id, + ) + .pipeThrough(new JsonArrayStream()) + .pipeThrough(new TextEncoderStream()) + .pipeTo(new FileWriterStream(path)); - const write = (text: string): Promise => { - return new Promise((res, rej) => { - stream.write(text, err => { - if (err) { - this.logger.error(err); - rej(err); - } else { - res(); - } - }); - }); - }; - - await write('['); - - let exportedNotesCount = 0; - let cursor: MiNote['id'] | null = null; - - while (true) { - const notes = await this.notesRepository.find({ - where: { - userId: user.id, - ...(cursor ? { id: MoreThan(cursor) } : {}), - }, - take: 100, - order: { - id: 1, - }, - }) as MiNote[]; - - if (notes.length === 0) { - job.updateProgress(100); - break; - } - - cursor = notes.at(-1)?.id ?? null; - - for (const note of notes) { - let poll: MiPoll | undefined; - if (note.hasPoll) { - poll = await this.pollsRepository.findOneByOrFail({ noteId: note.id }); - } - const files = await this.driveFileEntityService.packManyByIds(note.fileIds); - const content = JSON.stringify(this.serialize(note, poll, files)); - const isFirst = exportedNotesCount === 0; - await write(isFirst ? content : ',\n' + content); - exportedNotesCount++; - } - - const total = await this.notesRepository.countBy({ - userId: user.id, - }); - - job.updateProgress(exportedNotesCount / total); - } - - await write(']'); - - stream.end(); this.logger.succ(`Exported to: ${path}`); const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json'; @@ -130,22 +154,4 @@ export class ExportNotesProcessorService { cleanup(); } } - - private serialize(note: MiNote, poll: MiPoll | null = null, files: Packed<'DriveFile'>[]): Record { - return { - id: note.id, - text: note.text, - createdAt: this.idService.parse(note.id).date.toISOString(), - fileIds: note.fileIds, - files: files, - replyId: note.replyId, - renoteId: note.renoteId, - poll: poll, - cw: note.cw, - visibility: note.visibility, - visibleUserIds: note.visibleUserIds, - localOnly: note.localOnly, - reactionAcceptance: note.reactionAcceptance, - }; - } }