Skip to content

Commit

Permalink
refactor: seperate index and search
Browse files Browse the repository at this point in the history
  • Loading branch information
oott123 committed Oct 24, 2021
1 parent 14b1910 commit feecd7d
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 77 deletions.
1 change: 1 addition & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ module.exports = {
'@typescript-eslint/explicit-module-boundary-types': 'off',
'@typescript-eslint/no-explicit-any': 'off',
'@typescript-eslint/no-floating-promises': 'error',
'@typescript-eslint/explicit-member-accessibility': 'error',
},
}
12 changes: 11 additions & 1 deletion src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { OCRModule } from './ocr/ocr.module'
import { QueueModule } from './queue/queue.module'
import cacheConfig from './config/cache.config'
import redisStore = require('cache-manager-ioredis')
import ocrConfig from './config/ocr.config'
import queueConfig from './config/queue.config'

@Module({
imports: [
Expand All @@ -25,7 +27,15 @@ import redisStore = require('cache-manager-ioredis')
}),
ConfigModule.forRoot({
isGlobal: true,
load: [meilisearchConfig, botConfig, httpConfig, authConfig, cacheConfig],
load: [
meilisearchConfig,
botConfig,
httpConfig,
authConfig,
cacheConfig,
ocrConfig,
queueConfig,
],
}),
CacheModule.registerAsync({
isGlobal: true,
Expand Down
8 changes: 5 additions & 3 deletions src/bot/bot.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { PhotoSize, Update } from '@grammyjs/types'
import Debug = require('debug')
import fetch from 'node-fetch'
import createHttpsProxyAgent = require('https-proxy-agent')
import { IndexService } from 'src/search/index.service'

const debug = Debug('app:bot:bot.service')

Expand All @@ -19,12 +20,13 @@ export class BotService {
private updateToken: string
private agent: any

constructor(
public constructor(
@Inject(botConfig.KEY)
botCfg: ConfigType<typeof botConfig>,
@Inject(httpConfig.KEY)
httpCfg: ConfigType<typeof httpConfig>,
private search: MeiliSearchService,
private index: IndexService,
) {
this.useWebhook = botCfg.webhook
this.baseUrl = `${httpCfg.baseUrl}${httpCfg.globalPrefix}`
Expand Down Expand Up @@ -55,7 +57,7 @@ export class BotService {
this.bot.command('search', this.botOnSearchCommand)
}

async start() {
public async start() {
if (this.useWebhook) {
await this.bot.init()
return this.setWebhookUrl()
Expand Down Expand Up @@ -105,7 +107,7 @@ export class BotService {
return
}

await this.search.queueMessage({
await this.index.queueMessage({
id: `${chatId}__${msg.message_id}`,
messageId: msg.message_id,
chatId,
Expand Down
2 changes: 1 addition & 1 deletion src/bot/webhook.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { BotService } from './bot.service'

@Controller('bot/webhook')
export class WebhookController {
constructor(private botService: BotService) {}
public constructor(private botService: BotService) {}

@Post(':updateToken/update')
public async update(
Expand Down
6 changes: 5 additions & 1 deletion src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import Debug from 'debug'
import httpConfig from './config/http.config'
import { ConfigType } from '@nestjs/config'
import { BotService } from './bot/bot.service'
import { IndexService } from './search/index.service'

const debug = Debug('app:main')

Expand Down Expand Up @@ -50,7 +51,10 @@ async function bootstrap() {
debug('migrating search')
const search = app.get(MeiliSearchService)
await search.migrate()
await search.recoverFromCache()

debug('recovering index')
const index = app.get(IndexService)
await index.recoverFromCache()
}

debug('enable shutdown hooks')
Expand Down
2 changes: 2 additions & 0 deletions src/queue/queue.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { QueueService } from './queue.service'

@Module({
providers: [
BullQueueService,
MemoryQueueService,
{
provide: QueueService,
useFactory: (
Expand Down
85 changes: 85 additions & 0 deletions src/search/index.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import {
CACHE_MANAGER,
Inject,
Injectable,
OnModuleDestroy,
} from '@nestjs/common'
import { MeiliSearchService, MessageIndex } from './meili-search.service'
import Debug from 'debug'
import { Cache } from 'cache-manager'

const debug = Debug('app:search:index')

const MESSAGES_QUEUE_KEY = 'messages'
const INSERT_BATCH = 100
const INSERT_TIMEOUT = 60 * 1000

@Injectable()
export class IndexService implements OnModuleDestroy {
private messagesQueue: MessageIndex[]
private queueTimer: any

public constructor(
@Inject(CACHE_MANAGER) private cache: Cache,
private search: MeiliSearchService,
) {
this.messagesQueue = []
}

public async recoverFromCache() {
const queue = await this.cache.get<MessageIndex[]>(MESSAGES_QUEUE_KEY)
if (queue && Array.isArray(queue)) {
this.messagesQueue = queue.concat(this.messagesQueue)
}
if (this.messagesQueue.length > 0) {
debug(`${this.messagesQueue.length} items recovered from cache`)
await this.importAllQueued()
}
}

public async writeToCache() {
debug(`writing cache (${this.messagesQueue.length} items)`)
await this.cache.set(MESSAGES_QUEUE_KEY, this.messagesQueue, { ttl: 0 })
}

public async importAllQueued() {
if (this.messagesQueue.length < 1) {
return
}
debug('importing all queued message')
const queue = this.messagesQueue
this.messagesQueue = []
try {
await this.search.importMessages(queue)
} catch (e) {
this.messagesQueue = queue.concat(this.messagesQueue)
throw e
}
await this.writeToCache()
}

public queueMessage(message: MessageIndex) {
debug('adding message to queue')
this.messagesQueue.push(message)

this.writeToCache().catch(console.error)

this.queueTimer && clearTimeout(this.queueTimer)

if (this.messagesQueue.length >= INSERT_BATCH) {
debug('message batch reached')
this.importAllQueued().catch(console.error)
} else {
this.queueTimer = setTimeout(() => {
debug('insert timeout reached')
this.importAllQueued().catch(console.error)
}, INSERT_TIMEOUT)
}
}

public async onModuleDestroy() {
debug('app exiting, writing queue to cache')
// await this.writeToCache()
await this.importAllQueued()
}
}
76 changes: 7 additions & 69 deletions src/search/meili-search.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
import { ConfigType } from '@nestjs/config'
import meilisearchConfig from '../config/meilisearch.config'
import { Index, MeiliSearch, Settings } from 'meilisearch'
import { Cache } from 'cache-manager'
import Debug from 'debug'
import deepEqual = require('deep-equal')

Expand All @@ -26,70 +25,28 @@ export type MessageIndex = {
timestamp: number
}

const MESSAGES_QUEUE_KEY = 'messages'
const INSERT_BATCH = 100
const INSERT_TIMEOUT = 60 * 1000

@Injectable()
export class MeiliSearchService implements OnModuleDestroy {
private client: MeiliSearch
private indexPrefix: string
private messagesIndex: Index<MessageIndex>
private messagesQueue: MessageIndex[]
private queueTimer: any

constructor(
public constructor(
@Inject(meilisearchConfig.KEY)
msConfig: ConfigType<typeof meilisearchConfig>,
@Inject(CACHE_MANAGER) private cache: Cache,
) {
this.client = new MeiliSearch(msConfig)
this.indexPrefix = msConfig.indexPrefix
this.messagesIndex = this.client.index<MessageIndex>(
`${this.indexPrefix}messages`,
)
this.messagesQueue = []
}

async onModuleDestroy() {
debug('app exiting, writing queue to cache')
// await this.writeToCache()
await this.importAllQueued()
}

async recoverFromCache() {
const queue = await this.cache.get<MessageIndex[]>(MESSAGES_QUEUE_KEY)
if (queue && Array.isArray(queue)) {
this.messagesQueue = queue.concat(this.messagesQueue)
}
if (this.messagesQueue.length > 0) {
debug(`${this.messagesQueue.length} items recovered from cache`)
await this.importAllQueued()
}
public onModuleDestroy() {
throw new Error('Method not implemented.')
}

async writeToCache() {
debug(`writing cache (${this.messagesQueue.length} items)`)
await this.cache.set(MESSAGES_QUEUE_KEY, this.messagesQueue, { ttl: 0 })
}

async importAllQueued() {
if (this.messagesQueue.length < 1) {
return
}
debug('importing all queued message')
const queue = this.messagesQueue
this.messagesQueue = []
try {
await this.importMessages(queue)
} catch (e) {
this.messagesQueue = queue.concat(this.messagesQueue)
throw e
}
await this.writeToCache()
}

async migrate(): Promise<void> {
public async migrate(): Promise<void> {
const settings: Settings = {
searchableAttributes: ['text'],
filterableAttributes: ['chatId', 'fromId', 'timestamp'],
Expand Down Expand Up @@ -127,30 +84,11 @@ export class MeiliSearchService implements OnModuleDestroy {
}
}

queueMessage(message: MessageIndex) {
debug('adding message to queue')
this.messagesQueue.push(message)

this.writeToCache().catch(console.error)

this.queueTimer && clearTimeout(this.queueTimer)

if (this.messagesQueue.length >= INSERT_BATCH) {
debug('message batch reached')
this.importAllQueued().catch(console.error)
} else {
this.queueTimer = setTimeout(() => {
debug('insert timeout reached')
this.importAllQueued().catch(console.error)
}, INSERT_TIMEOUT)
}
}

async importMessages(messages: MessageIndex[]): Promise<void> {
public async importMessages(messages: MessageIndex[]): Promise<void> {
await this.messagesIndex.addDocuments(messages)
}

async search(query: string, chatId: string, fromId?: number) {
public async search(query: string, chatId: string, fromId?: number) {
const result = await this.messagesIndex.search<MessageIndex>(query, {
filter: [
`chatId = ${chatId}`,
Expand All @@ -160,7 +98,7 @@ export class MeiliSearchService implements OnModuleDestroy {
return result
}

getMessagesIndex() {
public getMessagesIndex() {
return this.messagesIndex
}
}
5 changes: 3 additions & 2 deletions src/search/search.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ import { Module } from '@nestjs/common'
import { TokenModule } from 'src/token/token.module'
import { MeiliSearchService } from './meili-search.service'
import { SearchController } from './search.controller'
import { IndexService } from './index.service'

@Module({
imports: [TokenModule],
providers: [MeiliSearchService],
exports: [MeiliSearchService],
providers: [MeiliSearchService, IndexService],
exports: [MeiliSearchService, IndexService],
controllers: [SearchController],
})
export class SearchModule {}

0 comments on commit feecd7d

Please sign in to comment.