Skip to content

Commit

Permalink
feat(query-orchestrator): Use real queueId in events for processing
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed Oct 28, 2023
1 parent 7b1a0c0 commit f11c755
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 25 deletions.
7 changes: 4 additions & 3 deletions packages/cubejs-base-driver/src/queue-driver.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ export interface QueryKeyHash extends String {
__type: 'QueryKeyHash'
}

export type GetActiveAndToProcessResponse = [active: string[], toProcess: string[]];
export type QueryKeysTuple = [keyHash: QueryKeyHash, queueId: QueueId | null /** Cube Store supports real QueueId */];
export type GetActiveAndToProcessResponse = [active: QueryKeysTuple[], toProcess: QueryKeysTuple[]];
export type AddToQueueResponse = [added: number, queueId: QueueId | null, queueSize: number, addedToQueueTime: number];
export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record<string, QueryDef>];
export type RetrieveForProcessingSuccess = [
Expand Down Expand Up @@ -73,8 +74,8 @@ export interface QueueDriverConnectionInterface {
*/
addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: number, queryHandler: string, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise<AddToQueueResponse>;
// Return query keys which was sorted by priority and time
getToProcessQueries(): Promise<string[]>;
getActiveQueries(): Promise<string[]>;
getToProcessQueries(): Promise<QueryKeyHash[]>;
getActiveQueries(): Promise<QueryKeyHash[]>;
getQueryDef(hash: QueryKeyHash, queueId: QueueId | null): Promise<QueryDef | null>;
// Queries which was added to queue, but was not processed and not needed
getOrphanedQueries(): Promise<string[]>;
Expand Down
22 changes: 14 additions & 8 deletions packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
QueryKey,
QueryKeyHash,
ProcessingId,
QueueId,
QueueId, GetActiveAndToProcessResponse, QueryKeysTuple,
} from '@cubejs-backend/base-driver';
import { getProcessUid } from '@cubejs-backend/shared';

Expand Down Expand Up @@ -106,33 +106,39 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
// nothing to do
}

public async getActiveQueries(): Promise<string[]> {
public async getActiveQueries(): Promise<QueryKeyHash[]> {
const rows = await this.driver.query('QUEUE ACTIVE ?', [
this.options.redisQueuePrefix
]);
return rows.map((row) => row.id);
}

public async getToProcessQueries(): Promise<string[]> {
public async getToProcessQueries(): Promise<QueryKeyHash[]> {
const rows = await this.driver.query('QUEUE PENDING ?', [
this.options.redisQueuePrefix
]);
return rows.map((row) => row.id);
}

public async getActiveAndToProcess(): Promise<[active: string[], toProcess: string[]]> {
public async getActiveAndToProcess(): Promise<GetActiveAndToProcessResponse> {
const rows = await this.driver.query('QUEUE LIST ?', [
this.options.redisQueuePrefix
]);
if (rows.length) {
const active: string[] = [];
const toProcess: string[] = [];
const active: QueryKeysTuple[] = [];
const toProcess: QueryKeysTuple[] = [];

for (const row of rows) {
if (row.status === 'active') {
active.push(row.id);
active.push([
row.id,
row.queue_id ? parseInt(row.queue_id, 10) : null,
]);
} else {
toProcess.push(row.id);
toProcess.push([
row.id,
row.queue_id ? parseInt(row.queue_id, 10) : null,
]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,17 @@ export class LocalQueueDriverConnection {
return stalled.concat(orphaned);
}

/**
* @returns {Promise<GetActiveAndToProcessResponse>}
*/
async getActiveAndToProcess() {
return [this.queueArray(this.active), this.queueArray(this.toProcess)];
const active = this.queueArray(this.active);
const toProcess = this.queueArray(this.toProcess);

return [
active.map((queryKeyHash) => [queryKeyHash, null]),
toProcess.map((queryKeyHash) => [queryKeyHash, null])
];
}

getResultPromise(resultListKey) {
Expand Down Expand Up @@ -247,12 +256,14 @@ export class LocalQueueDriverConnection {
retrieveForProcessing(queryKey, processingId) {
const key = this.redisHash(queryKey);
let lockAcquired = false;

if (!this.processingLocks[key]) {
this.processingLocks[key] = processingId;
lockAcquired = true;
} else {
return null;
}

let added = 0;
if (Object.keys(this.active).length < this.concurrency && !this.active[key]) {
this.active[key] = { key, order: processingId };
Expand Down
26 changes: 17 additions & 9 deletions packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import R from 'ramda';
import { EventEmitter } from 'events';
import { getEnv, getProcessUid } from '@cubejs-backend/shared';
import { QueueDriverInterface, QueryKey, QueryKeyHash } from '@cubejs-backend/base-driver';
import { QueueDriverInterface, QueryKey, QueryKeyHash, QueueId } from '@cubejs-backend/base-driver';
import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver';

import { TimeoutError } from './TimeoutError';
Expand Down Expand Up @@ -79,9 +79,9 @@ export class QueryQueue {

/**
* @protected
* @type {function(string): Promise<void>}
* @type {function(QueryKeyHash, QueueId | null): Promise<void>}
*/
this.sendProcessMessageFn = options.sendProcessMessageFn || ((queryKey) => { this.processQuery(queryKey); });
this.sendProcessMessageFn = options.sendProcessMessageFn || ((queryKey, queryId) => { this.processQuery(queryKey, queryId); });

/**
* @protected
Expand Down Expand Up @@ -527,9 +527,9 @@ export class QueryQueue {

await Promise.all(
R.pipe(
R.filter(p => {
if (active.indexOf(p) === -1) {
const subKeys = p.split('@');
R.filter(([queryKey, _queueId]) => {
if (active.indexOf(queryKey) === -1) {
const subKeys = queryKey.split('@');
if (subKeys.length === 1) {
// common queries
return true;
Expand Down Expand Up @@ -635,6 +635,7 @@ export class QueryQueue {
* Execute query without adding it to the queue.
*
* @param {*} query
* @param {QueueId} queueId
* @returns {Promise<{ result: undefined | Object, error: string | undefined }>}
*/
async processQuerySkipQueue(query, queueId) {
Expand Down Expand Up @@ -705,13 +706,13 @@ export class QueryQueue {
* of the logic related with the queues updates, heartbeat, etc.
*
* @param {QueryKeyHash} queryKeyHashed
* @param {QueueId | null} queueId Real queue id, only for Cube Store
* @return {Promise<{ result: undefined | Object, error: string | undefined }>}
*/
async processQuery(queryKeyHashed) {
async processQuery(queryKeyHashed, queueId) {
const queueConnection = await this.queueDriver.createConnection();

let insertedCount;
let queueId;
let activeKeys;
let queueSize;
let query;
Expand All @@ -722,7 +723,14 @@ export class QueryQueue {
const retrieveResult = await queueConnection.retrieveForProcessing(queryKeyHashed, processingId);

if (retrieveResult) {
[insertedCount, queueId, activeKeys, queueSize, query, processingLockAcquired] = retrieveResult;
let retrieveQueueId;

[insertedCount, retrieveQueueId, activeKeys, queueSize, query, processingLockAcquired] = retrieveResult;

// Backward compatibility for old Cube Store
if (retrieveQueueId) {
queueId = retrieveQueueId;
}
}

const activated = activeKeys && activeKeys.indexOf(queryKeyHashed) !== -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ export class RedisQueueDriverConnection implements QueueDriverConnectionInterfac
const active = await this.getActiveQueries();
const toProcess = await this.getToProcessQueries();

return [active, toProcess];
return [
active.map((queryKeyHash) => [queryKeyHash, null]),
toProcess.map((queryKeyHash) => [queryKeyHash, null])
];
}

public async addToQueue(
Expand Down Expand Up @@ -142,11 +145,11 @@ export class RedisQueueDriverConnection implements QueueDriverConnectionInterfac
}

public getToProcessQueries() {
return this.redisClient.zrangeAsync([this.toProcessRedisKey(), 0, -1]);
return this.redisClient.zrangeAsync([this.toProcessRedisKey(), 0, -1]) as Promise<QueryKeyHash[]>;
}

public getActiveQueries() {
return this.redisClient.zrangeAsync([this.activeRedisKey(), 0, -1]);
return this.redisClient.zrangeAsync([this.activeRedisKey(), 0, -1]) as Promise<QueryKeyHash[]>;
}

public async getQueryAndRemove(queryKey: QueryKeyHash): Promise<[QueryDef]> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
test('removed before reconciled', async () => {
const query: QueryKey = ['select * from', []];
const key = queue.redisHash(query);
await queue.processQuery(key);
await queue.processQuery(key, null);
const result = await queue.executeInQueue('foo', key, query);
expect(result).toBe('select * from bar');
});
Expand Down

0 comments on commit f11c755

Please sign in to comment.