diff --git a/.eslintrc.js b/.eslintrc.js index 99c52f7..48ff1e5 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -18,5 +18,6 @@ module.exports = { '@typescript-eslint/explicit-module-boundary-types': 'off', '@typescript-eslint/no-explicit-any': 'off', '@typescript-eslint/no-floating-promises': 'error', + '@typescript-eslint/explicit-member-accessibility': 'error', }, } diff --git a/src/app.module.ts b/src/app.module.ts index 70fc133..afaf626 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -17,6 +17,8 @@ import { OCRModule } from './ocr/ocr.module' import { QueueModule } from './queue/queue.module' import cacheConfig from './config/cache.config' import redisStore = require('cache-manager-ioredis') +import ocrConfig from './config/ocr.config' +import queueConfig from './config/queue.config' @Module({ imports: [ @@ -25,7 +27,15 @@ import redisStore = require('cache-manager-ioredis') }), ConfigModule.forRoot({ isGlobal: true, - load: [meilisearchConfig, botConfig, httpConfig, authConfig, cacheConfig], + load: [ + meilisearchConfig, + botConfig, + httpConfig, + authConfig, + cacheConfig, + ocrConfig, + queueConfig, + ], }), CacheModule.registerAsync({ isGlobal: true, diff --git a/src/bot/bot.service.ts b/src/bot/bot.service.ts index f3cce29..dba0eab 100644 --- a/src/bot/bot.service.ts +++ b/src/bot/bot.service.ts @@ -8,6 +8,7 @@ import { PhotoSize, Update } from '@grammyjs/types' import Debug = require('debug') import fetch from 'node-fetch' import createHttpsProxyAgent = require('https-proxy-agent') +import { IndexService } from 'src/search/index.service' const debug = Debug('app:bot:bot.service') @@ -19,12 +20,13 @@ export class BotService { private updateToken: string private agent: any - constructor( + public constructor( @Inject(botConfig.KEY) botCfg: ConfigType, @Inject(httpConfig.KEY) httpCfg: ConfigType, private search: MeiliSearchService, + private index: IndexService, ) { this.useWebhook = botCfg.webhook this.baseUrl = `${httpCfg.baseUrl}${httpCfg.globalPrefix}` @@ -55,7 +57,7 @@ export class BotService { this.bot.command('search', this.botOnSearchCommand) } - async start() { + public async start() { if (this.useWebhook) { await this.bot.init() return this.setWebhookUrl() @@ -105,7 +107,7 @@ export class BotService { return } - await this.search.queueMessage({ + await this.index.queueMessage({ id: `${chatId}__${msg.message_id}`, messageId: msg.message_id, chatId, diff --git a/src/bot/webhook.controller.ts b/src/bot/webhook.controller.ts index 0eefdda..d77be4b 100644 --- a/src/bot/webhook.controller.ts +++ b/src/bot/webhook.controller.ts @@ -10,7 +10,7 @@ import { BotService } from './bot.service' @Controller('bot/webhook') export class WebhookController { - constructor(private botService: BotService) {} + public constructor(private botService: BotService) {} @Post(':updateToken/update') public async update( diff --git a/src/main.ts b/src/main.ts index 655e845..ee7c159 100644 --- a/src/main.ts +++ b/src/main.ts @@ -9,6 +9,7 @@ import Debug from 'debug' import httpConfig from './config/http.config' import { ConfigType } from '@nestjs/config' import { BotService } from './bot/bot.service' +import { IndexService } from './search/index.service' const debug = Debug('app:main') @@ -50,7 +51,10 @@ async function bootstrap() { debug('migrating search') const search = app.get(MeiliSearchService) await search.migrate() - await search.recoverFromCache() + + debug('recovering index') + const index = app.get(IndexService) + await index.recoverFromCache() } debug('enable shutdown hooks') diff --git a/src/queue/queue.module.ts b/src/queue/queue.module.ts index 1584858..8efaba5 100644 --- a/src/queue/queue.module.ts +++ b/src/queue/queue.module.ts @@ -8,6 +8,8 @@ import { QueueService } from './queue.service' @Module({ providers: [ + BullQueueService, + MemoryQueueService, { provide: QueueService, useFactory: ( diff --git a/src/search/index.service.ts b/src/search/index.service.ts new file mode 100644 index 0000000..e1640bc --- /dev/null +++ b/src/search/index.service.ts @@ -0,0 +1,85 @@ +import { + CACHE_MANAGER, + Inject, + Injectable, + OnModuleDestroy, +} from '@nestjs/common' +import { MeiliSearchService, MessageIndex } from './meili-search.service' +import Debug from 'debug' +import { Cache } from 'cache-manager' + +const debug = Debug('app:search:index') + +const MESSAGES_QUEUE_KEY = 'messages' +const INSERT_BATCH = 100 +const INSERT_TIMEOUT = 60 * 1000 + +@Injectable() +export class IndexService implements OnModuleDestroy { + private messagesQueue: MessageIndex[] + private queueTimer: any + + public constructor( + @Inject(CACHE_MANAGER) private cache: Cache, + private search: MeiliSearchService, + ) { + this.messagesQueue = [] + } + + public async recoverFromCache() { + const queue = await this.cache.get(MESSAGES_QUEUE_KEY) + if (queue && Array.isArray(queue)) { + this.messagesQueue = queue.concat(this.messagesQueue) + } + if (this.messagesQueue.length > 0) { + debug(`${this.messagesQueue.length} items recovered from cache`) + await this.importAllQueued() + } + } + + public async writeToCache() { + debug(`writing cache (${this.messagesQueue.length} items)`) + await this.cache.set(MESSAGES_QUEUE_KEY, this.messagesQueue, { ttl: 0 }) + } + + public async importAllQueued() { + if (this.messagesQueue.length < 1) { + return + } + debug('importing all queued message') + const queue = this.messagesQueue + this.messagesQueue = [] + try { + await this.search.importMessages(queue) + } catch (e) { + this.messagesQueue = queue.concat(this.messagesQueue) + throw e + } + await this.writeToCache() + } + + public queueMessage(message: MessageIndex) { + debug('adding message to queue') + this.messagesQueue.push(message) + + this.writeToCache().catch(console.error) + + this.queueTimer && clearTimeout(this.queueTimer) + + if (this.messagesQueue.length >= INSERT_BATCH) { + debug('message batch reached') + this.importAllQueued().catch(console.error) + } else { + this.queueTimer = setTimeout(() => { + debug('insert timeout reached') + this.importAllQueued().catch(console.error) + }, INSERT_TIMEOUT) + } + } + + public async onModuleDestroy() { + debug('app exiting, writing queue to cache') + // await this.writeToCache() + await this.importAllQueued() + } +} diff --git a/src/search/meili-search.service.ts b/src/search/meili-search.service.ts index f3cd661..23bf41a 100644 --- a/src/search/meili-search.service.ts +++ b/src/search/meili-search.service.ts @@ -7,7 +7,6 @@ import { import { ConfigType } from '@nestjs/config' import meilisearchConfig from '../config/meilisearch.config' import { Index, MeiliSearch, Settings } from 'meilisearch' -import { Cache } from 'cache-manager' import Debug from 'debug' import deepEqual = require('deep-equal') @@ -26,70 +25,28 @@ export type MessageIndex = { timestamp: number } -const MESSAGES_QUEUE_KEY = 'messages' -const INSERT_BATCH = 100 -const INSERT_TIMEOUT = 60 * 1000 - @Injectable() export class MeiliSearchService implements OnModuleDestroy { private client: MeiliSearch private indexPrefix: string private messagesIndex: Index - private messagesQueue: MessageIndex[] - private queueTimer: any - constructor( + public constructor( @Inject(meilisearchConfig.KEY) msConfig: ConfigType, - @Inject(CACHE_MANAGER) private cache: Cache, ) { this.client = new MeiliSearch(msConfig) this.indexPrefix = msConfig.indexPrefix this.messagesIndex = this.client.index( `${this.indexPrefix}messages`, ) - this.messagesQueue = [] - } - - async onModuleDestroy() { - debug('app exiting, writing queue to cache') - // await this.writeToCache() - await this.importAllQueued() } - async recoverFromCache() { - const queue = await this.cache.get(MESSAGES_QUEUE_KEY) - if (queue && Array.isArray(queue)) { - this.messagesQueue = queue.concat(this.messagesQueue) - } - if (this.messagesQueue.length > 0) { - debug(`${this.messagesQueue.length} items recovered from cache`) - await this.importAllQueued() - } + public onModuleDestroy() { + throw new Error('Method not implemented.') } - async writeToCache() { - debug(`writing cache (${this.messagesQueue.length} items)`) - await this.cache.set(MESSAGES_QUEUE_KEY, this.messagesQueue, { ttl: 0 }) - } - - async importAllQueued() { - if (this.messagesQueue.length < 1) { - return - } - debug('importing all queued message') - const queue = this.messagesQueue - this.messagesQueue = [] - try { - await this.importMessages(queue) - } catch (e) { - this.messagesQueue = queue.concat(this.messagesQueue) - throw e - } - await this.writeToCache() - } - - async migrate(): Promise { + public async migrate(): Promise { const settings: Settings = { searchableAttributes: ['text'], filterableAttributes: ['chatId', 'fromId', 'timestamp'], @@ -127,30 +84,11 @@ export class MeiliSearchService implements OnModuleDestroy { } } - queueMessage(message: MessageIndex) { - debug('adding message to queue') - this.messagesQueue.push(message) - - this.writeToCache().catch(console.error) - - this.queueTimer && clearTimeout(this.queueTimer) - - if (this.messagesQueue.length >= INSERT_BATCH) { - debug('message batch reached') - this.importAllQueued().catch(console.error) - } else { - this.queueTimer = setTimeout(() => { - debug('insert timeout reached') - this.importAllQueued().catch(console.error) - }, INSERT_TIMEOUT) - } - } - - async importMessages(messages: MessageIndex[]): Promise { + public async importMessages(messages: MessageIndex[]): Promise { await this.messagesIndex.addDocuments(messages) } - async search(query: string, chatId: string, fromId?: number) { + public async search(query: string, chatId: string, fromId?: number) { const result = await this.messagesIndex.search(query, { filter: [ `chatId = ${chatId}`, @@ -160,7 +98,7 @@ export class MeiliSearchService implements OnModuleDestroy { return result } - getMessagesIndex() { + public getMessagesIndex() { return this.messagesIndex } } diff --git a/src/search/search.module.ts b/src/search/search.module.ts index 94a0f1a..8ee03c2 100644 --- a/src/search/search.module.ts +++ b/src/search/search.module.ts @@ -2,11 +2,12 @@ import { Module } from '@nestjs/common' import { TokenModule } from 'src/token/token.module' import { MeiliSearchService } from './meili-search.service' import { SearchController } from './search.controller' +import { IndexService } from './index.service' @Module({ imports: [TokenModule], - providers: [MeiliSearchService], - exports: [MeiliSearchService], + providers: [MeiliSearchService, IndexService], + exports: [MeiliSearchService, IndexService], controllers: [SearchController], }) export class SearchModule {}