From feecd7d1f12ef93d7b80d4af051f1e54554d917c Mon Sep 17 00:00:00 2001
From: oott123 <git@public.oott123.com>
Date: Sun, 24 Oct 2021 11:22:39 +0800
Subject: [PATCH] refactor: seperate index and search

---
 .eslintrc.js                       |  1 +
 src/app.module.ts                  | 12 ++++-
 src/bot/bot.service.ts             |  8 +--
 src/bot/webhook.controller.ts      |  2 +-
 src/main.ts                        |  6 ++-
 src/queue/queue.module.ts          |  2 +
 src/search/index.service.ts        | 85 ++++++++++++++++++++++++++++++
 src/search/meili-search.service.ts | 76 +++-----------------------
 src/search/search.module.ts        |  5 +-
 9 files changed, 120 insertions(+), 77 deletions(-)
 create mode 100644 src/search/index.service.ts

diff --git a/.eslintrc.js b/.eslintrc.js
index 99c52f7..48ff1e5 100644
--- a/.eslintrc.js
+++ b/.eslintrc.js
@@ -18,5 +18,6 @@ module.exports = {
     '@typescript-eslint/explicit-module-boundary-types': 'off',
     '@typescript-eslint/no-explicit-any': 'off',
     '@typescript-eslint/no-floating-promises': 'error',
+    '@typescript-eslint/explicit-member-accessibility': 'error',
   },
 }
diff --git a/src/app.module.ts b/src/app.module.ts
index 70fc133..afaf626 100644
--- a/src/app.module.ts
+++ b/src/app.module.ts
@@ -17,6 +17,8 @@ import { OCRModule } from './ocr/ocr.module'
 import { QueueModule } from './queue/queue.module'
 import cacheConfig from './config/cache.config'
 import redisStore = require('cache-manager-ioredis')
+import ocrConfig from './config/ocr.config'
+import queueConfig from './config/queue.config'
 
 @Module({
   imports: [
@@ -25,7 +27,15 @@ import redisStore = require('cache-manager-ioredis')
     }),
     ConfigModule.forRoot({
       isGlobal: true,
-      load: [meilisearchConfig, botConfig, httpConfig, authConfig, cacheConfig],
+      load: [
+        meilisearchConfig,
+        botConfig,
+        httpConfig,
+        authConfig,
+        cacheConfig,
+        ocrConfig,
+        queueConfig,
+      ],
     }),
     CacheModule.registerAsync({
       isGlobal: true,
diff --git a/src/bot/bot.service.ts b/src/bot/bot.service.ts
index f3cce29..dba0eab 100644
--- a/src/bot/bot.service.ts
+++ b/src/bot/bot.service.ts
@@ -8,6 +8,7 @@ import { PhotoSize, Update } from '@grammyjs/types'
 import Debug = require('debug')
 import fetch from 'node-fetch'
 import createHttpsProxyAgent = require('https-proxy-agent')
+import { IndexService } from 'src/search/index.service'
 
 const debug = Debug('app:bot:bot.service')
 
@@ -19,12 +20,13 @@ export class BotService {
   private updateToken: string
   private agent: any
 
-  constructor(
+  public constructor(
     @Inject(botConfig.KEY)
     botCfg: ConfigType<typeof botConfig>,
     @Inject(httpConfig.KEY)
     httpCfg: ConfigType<typeof httpConfig>,
     private search: MeiliSearchService,
+    private index: IndexService,
   ) {
     this.useWebhook = botCfg.webhook
     this.baseUrl = `${httpCfg.baseUrl}${httpCfg.globalPrefix}`
@@ -55,7 +57,7 @@ export class BotService {
     this.bot.command('search', this.botOnSearchCommand)
   }
 
-  async start() {
+  public async start() {
     if (this.useWebhook) {
       await this.bot.init()
       return this.setWebhookUrl()
@@ -105,7 +107,7 @@ export class BotService {
       return
     }
 
-    await this.search.queueMessage({
+    await this.index.queueMessage({
       id: `${chatId}__${msg.message_id}`,
       messageId: msg.message_id,
       chatId,
diff --git a/src/bot/webhook.controller.ts b/src/bot/webhook.controller.ts
index 0eefdda..d77be4b 100644
--- a/src/bot/webhook.controller.ts
+++ b/src/bot/webhook.controller.ts
@@ -10,7 +10,7 @@ import { BotService } from './bot.service'
 
 @Controller('bot/webhook')
 export class WebhookController {
-  constructor(private botService: BotService) {}
+  public constructor(private botService: BotService) {}
 
   @Post(':updateToken/update')
   public async update(
diff --git a/src/main.ts b/src/main.ts
index 655e845..ee7c159 100644
--- a/src/main.ts
+++ b/src/main.ts
@@ -9,6 +9,7 @@ import Debug from 'debug'
 import httpConfig from './config/http.config'
 import { ConfigType } from '@nestjs/config'
 import { BotService } from './bot/bot.service'
+import { IndexService } from './search/index.service'
 
 const debug = Debug('app:main')
 
@@ -50,7 +51,10 @@ async function bootstrap() {
     debug('migrating search')
     const search = app.get(MeiliSearchService)
     await search.migrate()
-    await search.recoverFromCache()
+
+    debug('recovering index')
+    const index = app.get(IndexService)
+    await index.recoverFromCache()
   }
 
   debug('enable shutdown hooks')
diff --git a/src/queue/queue.module.ts b/src/queue/queue.module.ts
index 1584858..8efaba5 100644
--- a/src/queue/queue.module.ts
+++ b/src/queue/queue.module.ts
@@ -8,6 +8,8 @@ import { QueueService } from './queue.service'
 
 @Module({
   providers: [
+    BullQueueService,
+    MemoryQueueService,
     {
       provide: QueueService,
       useFactory: (
diff --git a/src/search/index.service.ts b/src/search/index.service.ts
new file mode 100644
index 0000000..e1640bc
--- /dev/null
+++ b/src/search/index.service.ts
@@ -0,0 +1,85 @@
+import {
+  CACHE_MANAGER,
+  Inject,
+  Injectable,
+  OnModuleDestroy,
+} from '@nestjs/common'
+import { MeiliSearchService, MessageIndex } from './meili-search.service'
+import Debug from 'debug'
+import { Cache } from 'cache-manager'
+
+const debug = Debug('app:search:index')
+
+const MESSAGES_QUEUE_KEY = 'messages'
+const INSERT_BATCH = 100
+const INSERT_TIMEOUT = 60 * 1000
+
+@Injectable()
+export class IndexService implements OnModuleDestroy {
+  private messagesQueue: MessageIndex[]
+  private queueTimer: any
+
+  public constructor(
+    @Inject(CACHE_MANAGER) private cache: Cache,
+    private search: MeiliSearchService,
+  ) {
+    this.messagesQueue = []
+  }
+
+  public async recoverFromCache() {
+    const queue = await this.cache.get<MessageIndex[]>(MESSAGES_QUEUE_KEY)
+    if (queue && Array.isArray(queue)) {
+      this.messagesQueue = queue.concat(this.messagesQueue)
+    }
+    if (this.messagesQueue.length > 0) {
+      debug(`${this.messagesQueue.length} items recovered from cache`)
+      await this.importAllQueued()
+    }
+  }
+
+  public async writeToCache() {
+    debug(`writing cache (${this.messagesQueue.length} items)`)
+    await this.cache.set(MESSAGES_QUEUE_KEY, this.messagesQueue, { ttl: 0 })
+  }
+
+  public async importAllQueued() {
+    if (this.messagesQueue.length < 1) {
+      return
+    }
+    debug('importing all queued message')
+    const queue = this.messagesQueue
+    this.messagesQueue = []
+    try {
+      await this.search.importMessages(queue)
+    } catch (e) {
+      this.messagesQueue = queue.concat(this.messagesQueue)
+      throw e
+    }
+    await this.writeToCache()
+  }
+
+  public queueMessage(message: MessageIndex) {
+    debug('adding message to queue')
+    this.messagesQueue.push(message)
+
+    this.writeToCache().catch(console.error)
+
+    this.queueTimer && clearTimeout(this.queueTimer)
+
+    if (this.messagesQueue.length >= INSERT_BATCH) {
+      debug('message batch reached')
+      this.importAllQueued().catch(console.error)
+    } else {
+      this.queueTimer = setTimeout(() => {
+        debug('insert timeout reached')
+        this.importAllQueued().catch(console.error)
+      }, INSERT_TIMEOUT)
+    }
+  }
+
+  public async onModuleDestroy() {
+    debug('app exiting, writing queue to cache')
+    // await this.writeToCache()
+    await this.importAllQueued()
+  }
+}
diff --git a/src/search/meili-search.service.ts b/src/search/meili-search.service.ts
index f3cd661..23bf41a 100644
--- a/src/search/meili-search.service.ts
+++ b/src/search/meili-search.service.ts
@@ -7,7 +7,6 @@ import {
 import { ConfigType } from '@nestjs/config'
 import meilisearchConfig from '../config/meilisearch.config'
 import { Index, MeiliSearch, Settings } from 'meilisearch'
-import { Cache } from 'cache-manager'
 import Debug from 'debug'
 import deepEqual = require('deep-equal')
 
@@ -26,70 +25,28 @@ export type MessageIndex = {
   timestamp: number
 }
 
-const MESSAGES_QUEUE_KEY = 'messages'
-const INSERT_BATCH = 100
-const INSERT_TIMEOUT = 60 * 1000
-
 @Injectable()
 export class MeiliSearchService implements OnModuleDestroy {
   private client: MeiliSearch
   private indexPrefix: string
   private messagesIndex: Index<MessageIndex>
-  private messagesQueue: MessageIndex[]
-  private queueTimer: any
 
-  constructor(
+  public constructor(
     @Inject(meilisearchConfig.KEY)
     msConfig: ConfigType<typeof meilisearchConfig>,
-    @Inject(CACHE_MANAGER) private cache: Cache,
   ) {
     this.client = new MeiliSearch(msConfig)
     this.indexPrefix = msConfig.indexPrefix
     this.messagesIndex = this.client.index<MessageIndex>(
       `${this.indexPrefix}messages`,
     )
-    this.messagesQueue = []
-  }
-
-  async onModuleDestroy() {
-    debug('app exiting, writing queue to cache')
-    // await this.writeToCache()
-    await this.importAllQueued()
   }
 
-  async recoverFromCache() {
-    const queue = await this.cache.get<MessageIndex[]>(MESSAGES_QUEUE_KEY)
-    if (queue && Array.isArray(queue)) {
-      this.messagesQueue = queue.concat(this.messagesQueue)
-    }
-    if (this.messagesQueue.length > 0) {
-      debug(`${this.messagesQueue.length} items recovered from cache`)
-      await this.importAllQueued()
-    }
+  public onModuleDestroy() {
+    throw new Error('Method not implemented.')
   }
 
-  async writeToCache() {
-    debug(`writing cache (${this.messagesQueue.length} items)`)
-    await this.cache.set(MESSAGES_QUEUE_KEY, this.messagesQueue, { ttl: 0 })
-  }
-
-  async importAllQueued() {
-    if (this.messagesQueue.length < 1) {
-      return
-    }
-    debug('importing all queued message')
-    const queue = this.messagesQueue
-    this.messagesQueue = []
-    try {
-      await this.importMessages(queue)
-    } catch (e) {
-      this.messagesQueue = queue.concat(this.messagesQueue)
-      throw e
-    }
-    await this.writeToCache()
-  }
-
-  async migrate(): Promise<void> {
+  public async migrate(): Promise<void> {
     const settings: Settings = {
       searchableAttributes: ['text'],
       filterableAttributes: ['chatId', 'fromId', 'timestamp'],
@@ -127,30 +84,11 @@ export class MeiliSearchService implements OnModuleDestroy {
     }
   }
 
-  queueMessage(message: MessageIndex) {
-    debug('adding message to queue')
-    this.messagesQueue.push(message)
-
-    this.writeToCache().catch(console.error)
-
-    this.queueTimer && clearTimeout(this.queueTimer)
-
-    if (this.messagesQueue.length >= INSERT_BATCH) {
-      debug('message batch reached')
-      this.importAllQueued().catch(console.error)
-    } else {
-      this.queueTimer = setTimeout(() => {
-        debug('insert timeout reached')
-        this.importAllQueued().catch(console.error)
-      }, INSERT_TIMEOUT)
-    }
-  }
-
-  async importMessages(messages: MessageIndex[]): Promise<void> {
+  public async importMessages(messages: MessageIndex[]): Promise<void> {
     await this.messagesIndex.addDocuments(messages)
   }
 
-  async search(query: string, chatId: string, fromId?: number) {
+  public async search(query: string, chatId: string, fromId?: number) {
     const result = await this.messagesIndex.search<MessageIndex>(query, {
       filter: [
         `chatId = ${chatId}`,
@@ -160,7 +98,7 @@ export class MeiliSearchService implements OnModuleDestroy {
     return result
   }
 
-  getMessagesIndex() {
+  public getMessagesIndex() {
     return this.messagesIndex
   }
 }
diff --git a/src/search/search.module.ts b/src/search/search.module.ts
index 94a0f1a..8ee03c2 100644
--- a/src/search/search.module.ts
+++ b/src/search/search.module.ts
@@ -2,11 +2,12 @@ import { Module } from '@nestjs/common'
 import { TokenModule } from 'src/token/token.module'
 import { MeiliSearchService } from './meili-search.service'
 import { SearchController } from './search.controller'
+import { IndexService } from './index.service'
 
 @Module({
   imports: [TokenModule],
-  providers: [MeiliSearchService],
-  exports: [MeiliSearchService],
+  providers: [MeiliSearchService, IndexService],
+  exports: [MeiliSearchService, IndexService],
   controllers: [SearchController],
 })
 export class SearchModule {}