Skip to content

Commit

Permalink
Merge pull request #4 from streamr-dev/use-connection-pool
Browse files Browse the repository at this point in the history
Use connection pooling for database connections. This change is needed in the future when database transactions are used. This may also improve query performance.
  • Loading branch information
teogeb authored Feb 19, 2024
2 parents 02d28b3 + 134cdd0 commit 120af25
Showing 1 changed file with 26 additions and 21 deletions.
47 changes: 26 additions & 21 deletions src/StreamRepository.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Logger } from '@streamr/utils'
import { Connection, createConnection, RowDataPacket } from 'mysql2/promise'
import { Pool, RowDataPacket, createPool } from 'mysql2/promise'
import { Inject, Service } from 'typedi'
import { Config, CONFIG_TOKEN } from './Config'
import { OrderBy, OrderDirection, Stream, Streams, Summary } from './entities'
import { CONFIG_TOKEN, Config } from './Config'
import { StreamrClientFacade } from './StreamrClientFacade'
import { OrderBy, OrderDirection, Stream, Streams, Summary } from './entities'
import { collect } from './utils'

interface StreamRow extends RowDataPacket {
Expand All @@ -27,15 +27,15 @@ const logger = new Logger(module)
@Service()
export class StreamRepository {

private readonly connection: Promise<Connection>
private readonly client: StreamrClientFacade
private readonly connectionPool: Pool
private readonly client: StreamrClientFacade

constructor(
@Inject() client: StreamrClientFacade,
@Inject(CONFIG_TOKEN) config: Config
) {
this.client = client
this.connection = createConnection({
this.connectionPool = createPool({
host: config.database.host,
database: config.database.name,
user: config.database.user,
Expand Down Expand Up @@ -85,11 +85,7 @@ export class StreamRepository {
// the result set or a token which references to a stateful cache).
const offset = (cursor !== undefined) ? parseInt(cursor, 10) : 0
params.push(limit, offset)
const connection = await this.connection
const [ rows ] = await connection.query<StreamRow[]>(
sql,
params
)
const rows = await this.queryOrExecute<StreamRow[]>(sql, params)
return {
items: rows,
cursor: (rows.length === pageSize) ? String(offset + rows.length) : null
Expand Down Expand Up @@ -136,19 +132,17 @@ export class StreamRepository {
streamCount: number
messagesPerSecond: number
}
const connection = await this.connection
const [ rows ] = await connection.query<SummaryRow[]>(
const rows = await this.queryOrExecute<SummaryRow[]>(
'SELECT count(*) as streamCount, sum(messagesPerSecond) as messagesPerSecond FROM streams'
)
return rows[0]
}

async getAllStreams(): Promise<{ id: string, crawlTimestamp: number }[]> {
const connection = await this.connection
const [ rows ] = await connection.query<StreamRow[]>(
const rows = await this.queryOrExecute<StreamRow[]>(
'SELECT id, crawlTimestamp FROM streams'
)
return rows.map((r) => {
return rows.map((r: StreamRow) => {
return {
id: r.id,
crawlTimestamp: Date.parse(r.crawlTimestamp)
Expand All @@ -157,16 +151,14 @@ export class StreamRepository {
}

async deleteStream(id: string): Promise<void> {
const connection = await this.connection
await connection.query(
await this.queryOrExecute(
'DELETE FROM streams WHERE id = ?',
[id]
)
}

async replaceStream(stream: Stream): Promise<void> {
const connection = await this.connection
await connection.query(
await this.queryOrExecute(
`REPLACE INTO streams (
id, description, peerCount, messagesPerSecond, publisherCount, subscriberCount, crawlTimestamp
) VALUES (
Expand All @@ -176,7 +168,20 @@ export class StreamRepository {
)
}

private async queryOrExecute<T extends RowDataPacket[]>(sql: string, params?: any): Promise<T> {
const connection = await this.connectionPool.getConnection()
try {
const [ rows ] = await connection.query<T>(
sql,
params
)
return rows
} finally {
connection.release()
}
}

async destroy(): Promise<void> {
(await this.connection).destroy()
this.connectionPool.end()
}
}

0 comments on commit 120af25

Please sign in to comment.