Skip to content

Commit

Permalink
feat: adding queue
Browse files Browse the repository at this point in the history
  • Loading branch information
oott123 committed Oct 24, 2021
1 parent ee084bd commit 14b1910
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 20 deletions.
17 changes: 4 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,11 @@ curl \

### OCR 识别文字(TBD)

如果启用 OCR 队列,那么你需要部署一个 RabbitMQ 实例,并配置第三方识别服务。识别流程如下:

```mermaid
sequenceDiagram
autonumber
Bot实例->>+OCR实例: 通过 OCR 队列发送图片
OCR实例->>+OCR服务: 识别图片
OCR服务->>-OCR实例: 返回结果
OCR实例->>-Bot实例: 通过入库队列发送识别结果
activate Bot实例
Bot实例->>-MeiliSearch: 入库
```
如果启用 OCR 队列,那么 Redis 是必须的(可以和缓存共用一个实例),并配置第三方识别服务。识别流程如下:

[![](https://mermaid.ink/img/eyJjb2RlIjoic2VxdWVuY2VEaWFncmFtXG4gIGF1dG9udW1iZXJcbiAgQm905a6e5L6LLT4-K09DUuWunuS-izog6YCa6L-HIE9DUiDpmJ_liJflj5HpgIHlm77niYdcbiAgT0NS5a6e5L6LLT4-K09DUuacjeWKoTog6K-G5Yir5Zu-54mHXG4gIE9DUuacjeWKoS0-Pi1PQ1Llrp7kvos6IOi_lOWbnue7k-aenFxuICBPQ1Llrp7kvostPj4tQm905a6e5L6LOiDpgJrov4flhaXlupPpmJ_liJflj5HpgIHor4bliKvnu5PmnpxcbiAgYWN0aXZhdGUgQm905a6e5L6LXG4gIEJvdOWunuS-iy0-Pi1NZWlsaVNlYXJjaDog5YWl5bqTIiwibWVybWFpZCI6eyJ0aGVtZSI6ImRlZmF1bHQifSwidXBkYXRlRWRpdG9yIjp0cnVlLCJhdXRvU3luYyI6dHJ1ZSwidXBkYXRlRGlhZ3JhbSI6dHJ1ZX0)](https://mermaid.live/edit/#eyJjb2RlIjoic2VxdWVuY2VEaWFncmFtXG4gIGF1dG9udW1iZXJcbiAgQm905a6e5L6LLT4-K09DUuWunuS-izog6YCa6L-HIE9DUiDpmJ_liJflj5HpgIHlm77niYdcbiAgT0NS5a6e5L6LLT4-K09DUuacjeWKoTog6K-G5Yir5Zu-54mHXG4gIE9DUuacjeWKoS0-Pi1PQ1Llrp7kvos6IOi_lOWbnue7k-aenFxuICBPQ1Llrp7kvostPj4tQm905a6e5L6LOiDpgJrov4flhaXlupPpmJ_liJflj5HpgIHor4bliKvnu5PmnpxcbiAgYWN0aXZhdGUgQm905a6e5L6LXG4gIEJvdOWunuS-iy0-Pi1NZWlsaVNlYXJjaDog5YWl5bqTIiwibWVybWFpZCI6IntcbiAgXCJ0aGVtZVwiOiBcImRlZmF1bHRcIlxufSIsInVwZGF0ZUVkaXRvciI6dHJ1ZSwiYXV0b1N5bmMiOnRydWUsInVwZGF0ZURpYWdyYW0iOnRydWV9)

识别和入库可以在不同的角色实例上完成:图 0 片下载和文本入库将在 Bot 实例上完成,OCR 实例仅需访问 OCR 服务即可。
识别和入库可以在不同的角色实例上完成:图片下载和文本入库将在 Bot 实例上完成,OCR 实例仅需访问 OCR 服务即可。

这样的设计使得维护者可以设计离线式的集中识别(例如使用抢占式实例运行识别服务,队列清空后关机),降低识别成本。

Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"@nestjs/platform-express": "^8.0.0",
"@nestjs/platform-fastify": "^8.0.11",
"@nestjs/serve-static": "^2.2.2",
"bull": "^3.29.3",
"cache-manager": "^3.4.4",
"cache-manager-ioredis": "^2.1.0",
"debug": "^4.3.2",
Expand All @@ -47,6 +48,7 @@
"@nestjs/cli": "^8.0.0",
"@nestjs/schematics": "^8.0.0",
"@nestjs/testing": "^8.0.0",
"@types/bull": "^3.15.5",
"@types/cache-manager": "^3.4.2",
"@types/debug": "^4.1.7",
"@types/deep-equal": "^1.0.1",
Expand Down
8 changes: 7 additions & 1 deletion src/config/queue.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,11 @@ import { registerAs } from '@nestjs/config'

export default registerAs('queue', () => ({
enable: process.env.QUEUE_ENABLE === 'true',
amqpUrl: process.env.QUEUE_AMQP_URL || 'amqp://guest@localhost',
redis: {
host: process.env.QUEUE_REDIS_HOST || 'localhost',
port: Number(process.env.QUEUE_REDIS_PORT || 6379),
password: process.env.QUEUE_REDIS_PASSWORD,
db: Number(process.env.QUEUE_REDIS_DB || 0),
keyPrefix: process.env.QUEUE_REDIS_KEY_PREFIX || '',
},
}))
79 changes: 79 additions & 0 deletions src/queue/bull-queue.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common'
import { ConfigType } from '@nestjs/config'
import queueConfig from '../config/queue.config'
import Queue = require('bull')
import Debug from 'debug'
import { QueueMeta, QueueProcessor, QueueTypes } from './meta.types'
import { QueueService } from './queue.service'

const debug = Debug('app:queue:bull')

@Injectable()
export class BullQueueService implements OnModuleDestroy, QueueService {
private readonly queueMap: Map<QueueTypes, Queue.Queue<QueueMeta>>

constructor(
@Inject(queueConfig.KEY) queueCfg: ConfigType<typeof queueConfig>,
) {
this.queueMap = new Map()
for (const key of ['ocr', 'message'] as const) {
this.setQueue(
key,
new Queue(key, {
redis: queueCfg.redis,
prefix: queueCfg.redis.keyPrefix,
}),
)
}
}

private setQueue<T extends QueueTypes>(
key: T,
queue: Queue.Queue<QueueMeta<T>>,
) {
this.queueMap.set(key, queue)
}

private getQueue<T extends QueueTypes>(key: T): Queue.Queue<QueueMeta<T>> {
if (!this.queueMap.has(key)) {
throw new Error(`queue ${key} not found`)
}
return this.queueMap.get(key) as Queue.Queue<QueueMeta<T>>
}

public async onModuleDestroy() {
for (const key of this.queueMap.keys()) {
await this.getQueue(key).close()
}
}

public async process<T extends QueueTypes>(
queue: T,
handler: QueueProcessor<T>,
concurrency = 1,
) {
debug(`setup process handler for queue ${queue}`, handler)
await this.getQueue(queue).process(concurrency, (job, done) => {
const debug = Debug(`app:queue:bull:${queue}:${job.id}`)
debug(`running`, job.data)
handler(job.data).then(
() => {
debug('finished')
done()
},
(err) => {
debug('error', err)
done(err)
},
)
})
}

public async add<T extends QueueTypes>(
queue: T,
data: QueueMeta<T>,
): Promise<void> {
debug(`adding job to queue ${queue}`)
await this.getQueue(queue).add(data)
}
}
34 changes: 34 additions & 0 deletions src/queue/memory-queue.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { Injectable } from '@nestjs/common'
import { Job } from 'bull'
import {
OCRMeta,
MessageMeta,
QueueProcessor,
QueueMeta,
QueueTypes,
} from './meta.types'
import { QueueService } from './queue.service'

@Injectable()
export class MemoryQueueService implements QueueService {
private processors = new Map<QueueTypes, QueueProcessor<any>>()

public async process<T extends keyof { ocr: OCRMeta; message: MessageMeta }>(
queue: T,
handler: QueueProcessor<T>,
concurrency: number,
): Promise<void> {
this.processors.set(queue, handler)
}

public async add<T extends keyof { ocr: OCRMeta; message: MessageMeta }>(
queue: T,
data: QueueMeta<T>,
): Promise<void> {
const processor = this.processors.get(queue)
if (!processor) {
throw new Error(`Queue ${queue} processor not found`)
}
await processor(data)
}
}
23 changes: 23 additions & 0 deletions src/queue/meta.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import type { MessageIndex } from '../search/meili-search.service'

export type OCRMeta = {
image: Buffer
message: Omit<MessageIndex, 'text'>
}

export type MessageMeta = {
message: MessageIndex
}

type QueueMetaMap = {
ocr: OCRMeta
message: MessageMeta
}

export type QueueTypes = keyof QueueMetaMap

export type QueueMeta<T extends QueueTypes = QueueTypes> = QueueMetaMap[T]

export type QueueProcessor<T extends QueueTypes = QueueTypes> = (
meta: QueueMeta<T>,
) => Promise<void>
27 changes: 25 additions & 2 deletions src/queue/queue.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,27 @@
import { Module } from '@nestjs/common';
import { Module } from '@nestjs/common'
import { ConfigType } from '@nestjs/config'
import { ModuleRef } from '@nestjs/core'
import queueConfig from 'src/config/queue.config'
import { BullQueueService } from './bull-queue.service'
import { MemoryQueueService } from './memory-queue.service'
import { QueueService } from './queue.service'

@Module({})
@Module({
providers: [
{
provide: QueueService,
useFactory: (
moduleRef: ModuleRef,
queueCfg: ConfigType<typeof queueConfig>,
) => {
if (queueCfg.enable) {
return moduleRef.get(BullQueueService)
} else {
return moduleRef.get(MemoryQueueService)
}
},
inject: [ModuleRef, queueConfig.KEY],
},
],
})
export class QueueModule {}
14 changes: 14 additions & 0 deletions src/queue/queue.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { QueueMeta, QueueProcessor, QueueTypes } from './meta.types'

export abstract class QueueService {
public abstract process<T extends QueueTypes>(
queue: T,
handler: QueueProcessor<T>,
concurrency: number,
): Promise<void>

public abstract add<T extends QueueTypes>(
queue: T,
data: QueueMeta<T>,
): Promise<void>
}
Loading

0 comments on commit 14b1910

Please sign in to comment.