Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Separate resolvers and repositories #6

Merged
merged 5 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/api/APIServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { buildSchema } from 'type-graphql'
import { Container, Inject, Service } from 'typedi'
import { Config, CONFIG_TOKEN } from '../Config'
import { StreamResolver } from './StreamResolver'
import { SummaryResolver } from './SummaryResolver'

const logger = new Logger(module)

Expand All @@ -28,7 +29,7 @@ export class APIServer {

async start(): Promise<void> {
const schema = await buildSchema({
resolvers: [StreamResolver],
resolvers: [StreamResolver, SummaryResolver],
container: Container,
validate: false
})
Expand Down
12 changes: 4 additions & 8 deletions src/api/StreamResolver.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Arg, Int, Query, Resolver } from 'type-graphql'
import { Inject, Service } from 'typedi'
import { OrderBy, OrderDirection, Streams, Summary } from '../entities'
import { StreamRepository } from '../StreamRepository'
import { OrderDirection } from '../entities/OrderDirection'
import { StreamOrderBy, Streams } from '../entities/Stream'
import { StreamRepository } from '../repository/StreamRepository'

@Resolver()
@Service()
Expand All @@ -20,16 +21,11 @@ export class StreamResolver {
@Arg("ids", () => [String], { nullable: true }) ids?: string[],
@Arg("searchTerm", { nullable: true }) searchTerm?: string,
@Arg("owner", { nullable: true }) owner?: string,
@Arg("orderBy", () => OrderBy, { nullable: true }) orderBy?: OrderBy,
@Arg("orderBy", () => StreamOrderBy, { nullable: true }) orderBy?: StreamOrderBy,
@Arg("orderDirection", () => OrderDirection, { nullable: true }) orderDirection?: OrderDirection,
@Arg("pageSize", () => Int, { nullable: true }) pageSize?: number,
@Arg("cursor", { nullable: true }) cursor?: string,
): Promise<Streams> {
return this.repository.getStreams(ids, searchTerm, owner, orderBy, orderDirection, pageSize, cursor)
}

@Query(() => Summary)
async summary(): Promise<Summary> {
return this.repository.getSummary()
}
}
22 changes: 22 additions & 0 deletions src/api/SummaryResolver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Query, Resolver } from 'type-graphql'
import { Inject, Service } from 'typedi'
import { Summary } from '../entities/Summary'
import { SummaryRepository } from '../repository/SummaryRepository'

@Resolver()
@Service()
export class SummaryResolver {

private repository: SummaryRepository

constructor(
@Inject() repository: SummaryRepository
) {
this.repository = repository
}

@Query(() => Summary)
async summary(): Promise<Summary> {
return this.repository.getSummary()
}
}
14 changes: 7 additions & 7 deletions src/crawler/Crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import pLimit from 'p-limit'
import { DhtAddress, Stream, StreamCreationEvent, StreamMetadata, StreamPermission } from 'streamr-client'
import { Inject, Service } from 'typedi'
import { CONFIG_TOKEN, Config } from '../Config'
import { StreamRepository } from '../StreamRepository'
import { StreamRepository } from '../repository/StreamRepository'
import { StreamrClientFacade } from '../StreamrClientFacade'
import { collect, retry } from '../utils'
import { NetworkNodeFacade } from './NetworkNodeFacade'
Expand Down Expand Up @@ -99,7 +99,7 @@ export const crawlTopology = async (
@Service()
export class Crawler {

private readonly database: StreamRepository
private readonly streamRepository: StreamRepository
private readonly client: StreamrClientFacade
private readonly config: Config
private subscribeGate?: SubscribeGate
Expand All @@ -110,7 +110,7 @@ export class Crawler {
@Inject() client: StreamrClientFacade,
@Inject(CONFIG_TOKEN) config: Config
) {
this.database = database
this.streamRepository = database
teogeb marked this conversation as resolved.
Show resolved Hide resolved
this.client = client
this.config = config
}
Expand Down Expand Up @@ -156,7 +156,7 @@ export class Crawler {
// the graph-node dependency may not be available immediately after the service has
// been started
const contractStreams = await retry(() => collect(this.client.getAllStreams()), 'Query streams')
const databaseStreams = await this.database.getAllStreams()
const databaseStreams = await this.streamRepository.getAllStreams()
logger.info(`Streams: contract=${contractStreams.length}, database=${databaseStreams.length}`)
const sortedContractStreams = sortBy(contractStreams, getCrawlOrderComparator(databaseStreams))

Expand Down Expand Up @@ -198,7 +198,7 @@ export class Crawler {
const publisherCount = await this.client.getPublisherOrSubscriberCount(id, StreamPermission.PUBLISH)
const subscriberCount = await this.client.getPublisherOrSubscriberCount(id, StreamPermission.SUBSCRIBE)
logger.info(`Replace ${id}`)
await this.database.replaceStream({
await this.streamRepository.replaceStream({
id,
description: metadata.description ?? null,
peerCount: peerIds.size,
Expand All @@ -220,7 +220,7 @@ export class Crawler {
const removedStreamsIds = difference(databaseStreamIds, contractStreamIds)
for (const streamId of removedStreamsIds) {
logger.info(`Delete ${streamId}`)
await this.database.deleteStream(streamId)
await this.streamRepository.deleteStream(streamId)
}
}

Expand All @@ -234,7 +234,7 @@ export class Crawler {
// - assume no peers and no traffic
// - assume that no explicit permissions have been granted yet (the creator
// is the only publisher and subscriber
await this.database.replaceStream({
await this.streamRepository.replaceStream({
id: payload.streamId,
description: payload.metadata.description ?? null,
peerCount: 0,
Expand Down
10 changes: 10 additions & 0 deletions src/entities/OrderDirection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { registerEnumType } from 'type-graphql'

export enum OrderDirection {
ASC = 'ASC',
DESC = 'DESC'
}

registerEnumType(OrderDirection, {
name: 'OrderDirection'
})
24 changes: 4 additions & 20 deletions src/entities.ts → src/entities/Stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-disable indent */
import { Field, Float, Int, ObjectType, registerEnumType } from 'type-graphql'

/* eslint-disable indent */
@ObjectType()
export class Stream {
@Field()
Expand All @@ -17,7 +17,7 @@ export class Stream {
subscriberCount!: number | null
}

export enum OrderBy {
export enum StreamOrderBy {
ID = 'ID',
DESCRIPTION = 'DESCRIPTION',
PEER_COUNT = 'PEER_COUNT',
Expand All @@ -26,31 +26,15 @@ export enum OrderBy {
PUBLISHER_COUNT = 'PUBLISHER_COUNT'
}

registerEnumType(OrderBy, {
registerEnumType(StreamOrderBy, {
name: 'OrderBy'
teogeb marked this conversation as resolved.
Show resolved Hide resolved
})

export enum OrderDirection {
ASC = 'ASC',
DESC = 'DESC'
}

registerEnumType(OrderDirection, {
name: 'OrderDirection'
})

/* eslint-disable indent */
@ObjectType()
export class Streams {
@Field(() => [Stream])
items!: Stream[]
@Field(() => String, { nullable: true })
cursor!: string | null
}

@ObjectType()
export class Summary {
@Field(() => Int)
streamCount!: number
@Field(() => Float)
messagesPerSecond!: number
}
10 changes: 10 additions & 0 deletions src/entities/Summary.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Field, Float, Int, ObjectType } from 'type-graphql'

/* eslint-disable indent */
@ObjectType()
export class Summary {
@Field(() => Int)
streamCount!: number
@Field(() => Float)
messagesPerSecond!: number
}
38 changes: 38 additions & 0 deletions src/repository/ConnectionPool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { Pool, RowDataPacket, createPool } from 'mysql2/promise'
import { Inject, Service } from 'typedi'
import { CONFIG_TOKEN, Config } from '../Config'

@Service()
export class ConnectionPool {

private readonly delegatee: Pool

constructor(
@Inject(CONFIG_TOKEN) config: Config
) {
this.delegatee = createPool({
host: config.database.host,
database: config.database.name,
user: config.database.user,
password: config.database.password
})
}

// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async queryOrExecute<T extends RowDataPacket[]>(sql: string, params?: any): Promise<T> {
const connection = await this.delegatee.getConnection()
try {
const [ rows ] = await connection.query<T>(
sql,
params
)
return rows
} finally {
connection.release()
}
}

async destroy(): Promise<void> {
this.delegatee.end()
}
}
76 changes: 22 additions & 54 deletions src/StreamRepository.ts → src/repository/StreamRepository.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Logger } from '@streamr/utils'
import { Pool, RowDataPacket, createPool } from 'mysql2/promise'
import { RowDataPacket } from 'mysql2/promise'
import { Inject, Service } from 'typedi'
import { CONFIG_TOKEN, Config } from './Config'
import { StreamrClientFacade } from './StreamrClientFacade'
import { OrderBy, OrderDirection, Stream, Streams, Summary } from './entities'
import { collect } from './utils'
import { StreamrClientFacade } from '../StreamrClientFacade'
import { OrderDirection } from '../entities/OrderDirection'
import { StreamOrderBy, Stream, Streams } from '../entities/Stream'
import { collect } from '../utils'
import { ConnectionPool } from './ConnectionPool'

interface StreamRow extends RowDataPacket {
id: string
Expand All @@ -27,27 +28,22 @@ const logger = new Logger(module)
@Service()
export class StreamRepository {

private readonly connectionPool: Pool
private readonly connectionPool: ConnectionPool
private readonly client: StreamrClientFacade

constructor(
@Inject() client: StreamrClientFacade,
@Inject(CONFIG_TOKEN) config: Config
@Inject() connectionPool: ConnectionPool
) {
this.client = client
this.connectionPool = createPool({
host: config.database.host,
database: config.database.name,
user: config.database.user,
password: config.database.password
})
this.connectionPool = connectionPool
}

async getStreams(
ids?: string[],
searchTerm?: string,
owner?: string,
orderBy?: OrderBy,
orderBy?: StreamOrderBy,
orderDirection?: OrderDirection,
pageSize?: number,
cursor?: string
Expand All @@ -72,7 +68,7 @@ export class StreamRepository {
const streamIds = streams.map((s) => s.id)
params.push(streamIds)
}
const orderByExpression = StreamRepository.formOrderByExpression(orderBy ?? OrderBy.ID, orderDirection ?? OrderDirection.ASC)
const orderByExpression = StreamRepository.formOrderByExpression(orderBy ?? StreamOrderBy.ID, orderDirection ?? OrderDirection.ASC)
const sql = `
SELECT id, description, peerCount, messagesPerSecond, publisherCount, subscriberCount
FROM streams
Expand All @@ -85,27 +81,27 @@ export class StreamRepository {
// the result set or a token which references to a stateful cache).
const offset = (cursor !== undefined) ? parseInt(cursor, 10) : 0
params.push(limit, offset)
const rows = await this.queryOrExecute<StreamRow[]>(sql, params)
const rows = await this.connectionPool.queryOrExecute<StreamRow[]>(sql, params)
return {
items: rows,
cursor: (rows.length === pageSize) ? String(offset + rows.length) : null
}
}

private static formOrderByExpression(orderBy: OrderBy, orderDirection: OrderDirection) {
private static formOrderByExpression(orderBy: StreamOrderBy, orderDirection: OrderDirection) {
const getFieldName = () => {
switch (orderBy) {
case OrderBy.ID:
case StreamOrderBy.ID:
return 'id'
case OrderBy.DESCRIPTION:
case StreamOrderBy.DESCRIPTION:
return 'description'
case OrderBy.PEER_COUNT:
case StreamOrderBy.PEER_COUNT:
return 'peerCount'
case OrderBy.MESSAGES_PER_SECOND:
case StreamOrderBy.MESSAGES_PER_SECOND:
return 'messagesPerSecond'
case OrderBy.PUBLISHER_COUNT:
case StreamOrderBy.PUBLISHER_COUNT:
return 'publisherCount'
case OrderBy.SUBSCRIBER_COUNT:
case StreamOrderBy.SUBSCRIBER_COUNT:
return 'subscriberCount'
default:
throw new Error('assertion failed')
Expand All @@ -127,19 +123,8 @@ export class StreamRepository {
return `${fieldName} IS NULL ${directionSql}, ${fieldName} ${directionSql} ${stableSortSuffix}`
}

async getSummary(): Promise<Summary> {
interface SummaryRow extends RowDataPacket {
streamCount: number
messagesPerSecond: number
}
const rows = await this.queryOrExecute<SummaryRow[]>(
'SELECT count(*) as streamCount, sum(messagesPerSecond) as messagesPerSecond FROM streams'
)
return rows[0]
}

async getAllStreams(): Promise<{ id: string, crawlTimestamp: number }[]> {
const rows = await this.queryOrExecute<StreamRow[]>(
const rows = await this.connectionPool.queryOrExecute<StreamRow[]>(
'SELECT id, crawlTimestamp FROM streams'
)
return rows.map((r: StreamRow) => {
Expand All @@ -151,14 +136,14 @@ export class StreamRepository {
}

async deleteStream(id: string): Promise<void> {
await this.queryOrExecute(
await this.connectionPool.queryOrExecute(
'DELETE FROM streams WHERE id = ?',
[id]
)
}

async replaceStream(stream: Stream): Promise<void> {
await this.queryOrExecute(
await this.connectionPool.queryOrExecute(
`REPLACE INTO streams (
id, description, peerCount, messagesPerSecond, publisherCount, subscriberCount, crawlTimestamp
) VALUES (
Expand All @@ -167,21 +152,4 @@ export class StreamRepository {
[stream.id, stream.description, stream.peerCount, stream.messagesPerSecond, stream.publisherCount, stream.subscriberCount, new Date()]
)
}

private async queryOrExecute<T extends RowDataPacket[]>(sql: string, params?: any): Promise<T> {
const connection = await this.connectionPool.getConnection()
try {
const [ rows ] = await connection.query<T>(
sql,
params
)
return rows
} finally {
connection.release()
}
}

async destroy(): Promise<void> {
this.connectionPool.end()
}
}
Loading
Loading