Skip to content

Commit

Permalink
Introduced ResumableTaskStore and ResumableTaskManager (#749)
Browse files Browse the repository at this point in the history
- Introduced `ResumableTaskStore` interface and its `level`
implementation.
- Introduced `ResumableTaskManager`.
- Refactoring to `RecordsDelete` handler.
  • Loading branch information
thehenrytsai authored Jun 11, 2024
1 parent 40567c9 commit 0311b9e
Show file tree
Hide file tree
Showing 39 changed files with 1,124 additions and 229 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ try.js
# default location for levelDB data storage in a non-browser env
MESSAGESTORE
DATASTORE
RESUMABLE-TASK-STORE
EVENTLOG
RESOLVERCACHE
# location for levelDB data storage for non-browser tests
TEST-DATASTORE
TEST-MESSAGESTORE
TEST-RESUMABLE-TASK-STORE
TEST-EVENTLOG

# default location for index specific levelDB data storage in a non-browser env
Expand Down
195 changes: 89 additions & 106 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@
"@types/sinon": "10.0.11",
"@types/uuid": "^9.0.1",
"@types/varint": "6.0.0",
"@typescript-eslint/eslint-plugin": "^7.8.0",
"@typescript-eslint/parser": "^7.8.0",
"@typescript-eslint/eslint-plugin": "^7.9.0",
"@typescript-eslint/parser": "^7.9.0",
"c8": "^8.0.0",
"chai": "4.3.6",
"chai-as-promised": "7.1.1",
Expand Down
113 changes: 113 additions & 0 deletions src/core/resumable-task-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import type { StorageController } from '../store/storage-controller.js';
import type { ManagedResumableTask, ResumableTaskStore } from '../types/resumable-task-store.js';

export enum ResumableTaskName {
RecordsDelete = 'RecordsDelete',
}

export type ResumableTask = {
name: ResumableTaskName;
data: any;
};


export class ResumableTaskManager {

/**
* The frequency at which the automatic timeout extension is requested for a resumable task.
*/
public static readonly timeoutExtensionFrequencyInSeconds = 30;

private resumableTaskBatchSize = 100;
private resumableTaskHandlers: { [key:string]: (taskData: any) => Promise<void> };

public constructor(private resumableTaskStore: ResumableTaskStore, storageController: StorageController) {
// assign resumable task handlers
this.resumableTaskHandlers = {
// NOTE: The arrow function is IMPORTANT here, else the `this` context will be lost within the invoked method.
// e.g. code within performRecordsDelete() won't know `this` refers to the `storageController` instance.
[ResumableTaskName.RecordsDelete]: async (task): Promise<void> => await storageController.performRecordsDelete(task),
};
}

/**
* Runs a new resumable task.
*/
public async run(task: ResumableTask): Promise<void> {
const timeoutInSeconds = ResumableTaskManager.timeoutExtensionFrequencyInSeconds * 2; // give ample time for extension to take place

// register the new resumable task before running it so that it can be resumed if it times out for any reason
const managedResumableTask = await this.resumableTaskStore.register(task, timeoutInSeconds);
await this.runWithAutomaticTimeoutExtension(managedResumableTask);
}

/**
* Runs a resumable task with automatic timeout extension.
* Deletes the task from the resumable task store once it is completed.
*/
private async runWithAutomaticTimeoutExtension(managedTask: ManagedResumableTask): Promise<void> {
const timeoutInSeconds = ResumableTaskManager.timeoutExtensionFrequencyInSeconds * 2; // give ample time for extension to take place

let timer!: NodeJS.Timer;
try {
// start a timer loop to keep extending the timeout of the task until it is completed
timer = setInterval(() => {
this.resumableTaskStore.extend(managedTask.id, timeoutInSeconds);
}, ResumableTaskManager.timeoutExtensionFrequencyInSeconds * 1000);

const handler = this.resumableTaskHandlers[managedTask.task.name];
await handler(managedTask.task.data);
await this.resumableTaskStore.delete(managedTask.id);
} finally {
ResumableTaskManager.clearTimeoutExtensionTimer(timer);
}
}

/**
* Removes the specified timeout extension loop timer.
* NOTE: created mainly for testing purposes so we can spy on this specific method without needing to filter out other `clearInterval` calls.
*/
public static clearTimeoutExtensionTimer(timer: NodeJS.Timer): void {
clearInterval(timer);

Check failure on line 71 in src/core/resumable-task-manager.ts

View workflow job for this annotation

GitHub Actions / build

No overload matches this call.
}

/**
* Resumes the execution of resumable tasks until all are completed successfully.
*/
public async resumeTasksAndWaitForCompletion(): Promise<void> {
while (true) {
const resumableTasks = await this.resumableTaskStore.grab(this.resumableTaskBatchSize);

if (resumableTasks === undefined || resumableTasks.length === 0) {
break;
}

// Handle this batch of tasks before grabbing the next batch.
await this.retryTasksUntilCompletion(resumableTasks);
}
}

/**
* Repeatedly retry the given tasks until all are completed successfully.
*/
private async retryTasksUntilCompletion(resumableTasks: ManagedResumableTask[]): Promise<void> {

let managedTasks = resumableTasks;
while (managedTasks.length > 0) {
const managedTasksCopy = managedTasks;
managedTasks = [];

const allTaskPromises = managedTasksCopy.map(async (managedTask) => {
try {
await this.runWithAutomaticTimeoutExtension(managedTask);
} catch (error) {
console.error('Error while running resumable task:', error);
console.error('Resumable task:', resumableTasks);
managedTasks.push(managedTask);
}
});

await Promise.all(allTaskPromises);
}
}
}
32 changes: 27 additions & 5 deletions src/dwn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { EventStream } from './types/subscriptions.js';
import type { MessageStore } from './types/message-store.js';
import type { MethodHandler } from './types/method-handler.js';
import type { Readable } from 'readable-stream';
import type { ResumableTaskStore } from './types/resumable-task-store.js';
import type { TenantGate } from './core/tenant-gate.js';
import type { UnionMessageReply } from './core/message-reply.js';
import type { EventsGetMessage, EventsGetReply, EventsQueryMessage, EventsQueryReply, EventsSubscribeMessage, EventsSubscribeMessageOptions, EventsSubscribeReply, MessageSubscriptionHandler } from './types/events-types.js';
Expand All @@ -28,6 +29,8 @@ import { RecordsQueryHandler } from './handlers/records-query.js';
import { RecordsReadHandler } from './handlers/records-read.js';
import { RecordsSubscribeHandler } from './handlers/records-subscribe.js';
import { RecordsWriteHandler } from './handlers/records-write.js';
import { ResumableTaskManager } from './core/resumable-task-manager.js';
import { StorageController } from './store/storage-controller.js';
import { DidDht, DidIon, DidKey, DidResolverCacheLevel, UniversalResolver } from '@web5/dids';
import { DwnInterfaceName, DwnMethodName } from './enums/dwn-interface-method.js';

Expand All @@ -36,18 +39,32 @@ export class Dwn {
private didResolver: DidResolver;
private messageStore: MessageStore;
private dataStore: DataStore;
private resumableTaskStore: ResumableTaskStore;
private eventLog: EventLog;
private tenantGate: TenantGate;
private eventStream?: EventStream;
private storageController: StorageController;
private resumableTaskManager: ResumableTaskManager;

private constructor(config: DwnConfig) {
this.didResolver = config.didResolver!;
this.tenantGate = config.tenantGate!;
this.eventStream = config.eventStream!;
this.messageStore = config.messageStore;
this.dataStore = config.dataStore;
this.resumableTaskStore = config.resumableTaskStore;
this.eventLog = config.eventLog;
this.eventStream = config.eventStream;
this.storageController = new StorageController({
messageStore : this.messageStore,
dataStore : this.dataStore,
eventLog : this.eventLog,
eventStream : this.eventStream
});
this.resumableTaskManager = new ResumableTaskManager(
config.resumableTaskStore,
this.storageController
);

this.methodHandlers = {
[DwnInterfaceName.Events + DwnMethodName.Get]: new EventsGetHandler(
Expand Down Expand Up @@ -81,9 +98,7 @@ export class Dwn {
[DwnInterfaceName.Records + DwnMethodName.Delete]: new RecordsDeleteHandler(
this.didResolver,
this.messageStore,
this.dataStore,
this.eventLog,
this.eventStream
this.resumableTaskManager
),
[DwnInterfaceName.Records + DwnMethodName.Query]: new RecordsQueryHandler(
this.didResolver,
Expand Down Expand Up @@ -122,21 +137,27 @@ export class Dwn {

const dwn = new Dwn(config);
await dwn.open();

return dwn;
}

private async open(): Promise<void> {
/**
* Initializes the DWN instance and opens the connection to it.
*/
public async open(): Promise<void> {
await this.messageStore.open();
await this.dataStore.open();
await this.resumableTaskStore.open();
await this.eventLog.open();
await this.eventStream?.open();

await this.resumableTaskManager.resumeTasksAndWaitForCompletion();
}

public async close(): Promise<void> {
await this.eventStream?.close();
await this.messageStore.close();
await this.dataStore.close();
await this.resumableTaskStore.close();
await this.eventLog.close();
}

Expand Down Expand Up @@ -244,4 +265,5 @@ export type DwnConfig = {
messageStore: MessageStore;
dataStore: DataStore;
eventLog: EventLog;
resumableTaskStore: ResumableTaskStore;
};
55 changes: 14 additions & 41 deletions src/handlers/records-delete.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import type { DataStore } from '../types/data-store.js';
import type { DidResolver } from '@web5/dids';
import type { EventLog } from '../types/event-log.js';
import type { EventStream } from '../types/subscriptions.js';
import type { GenericMessageReply } from '../types/message-types.js';
import type { MessageStore } from '../types//message-store.js';
import type { MethodHandler } from '../types/method-handler.js';
import type { ResumableTaskManager } from '../core/resumable-task-manager.js';
import type { RecordsDeleteMessage, RecordsWriteMessage } from '../types/records-types.js';

import { authenticate } from '../core/auth.js';
Expand All @@ -13,7 +11,7 @@ import { messageReplyFromError } from '../core/message-reply.js';
import { ProtocolAuthorization } from '../core/protocol-authorization.js';
import { RecordsDelete } from '../interfaces/records-delete.js';
import { RecordsWrite } from '../interfaces/records-write.js';
import { StorageController } from '../store/storage-controller.js';
import { ResumableTaskName } from '../core/resumable-task-manager.js';
import { DwnError, DwnErrorCode } from '../core/dwn-error.js';
import { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js';

Expand All @@ -22,9 +20,7 @@ export class RecordsDeleteHandler implements MethodHandler {
constructor(
private didResolver: DidResolver,
private messageStore: MessageStore,
private dataStore: DataStore,
private eventLog: EventLog,
private eventStream?: EventStream
private resumableTaskManager: ResumableTaskManager,
) { }

public async handle({
Expand Down Expand Up @@ -54,26 +50,19 @@ export class RecordsDeleteHandler implements MethodHandler {

// find which message is the newest, and if the incoming message is the newest
const newestExistingMessage = await Message.getNewestMessage(existingMessages);
let incomingMessageIsNewest = false;
let newestMessage;
// if incoming message is newest
if (newestExistingMessage === undefined || await Message.isNewer(message, newestExistingMessage)) {
incomingMessageIsNewest = true;
newestMessage = message;
} else { // existing message is the same age or newer than the incoming message
newestMessage = newestExistingMessage;
}

if (!incomingMessageIsNewest) {
// return Not Found if record does not exist or is already deleted
if (newestExistingMessage === undefined || newestExistingMessage.descriptor.method === DwnMethodName.Delete) {
return {
status: { code: 409, detail: 'Conflict' }
status: { code: 404, detail: 'Not Found' }
};
}

// return Not Found if record does not exist or is already deleted
if (newestExistingMessage === undefined || newestExistingMessage.descriptor.method === DwnMethodName.Delete) {
// if the incoming message is not the newest, return Conflict
const incomingDeleteIsNewest = await Message.isNewer(message, newestExistingMessage);
if (!incomingDeleteIsNewest) {
return {
status: { code: 404, detail: 'Not Found' }
status: { code: 409, detail: 'Conflict' }
};
}

Expand All @@ -89,26 +78,10 @@ export class RecordsDeleteHandler implements MethodHandler {
return messageReplyFromError(e, 401);
}

const initialWrite = await RecordsWrite.getInitialWrite(existingMessages);
const indexes = recordsDelete.constructIndexes(initialWrite);
const messageCid = await Message.getCid(message);
await this.messageStore.put(tenant, message, indexes);
await this.eventLog.append(tenant, messageCid, indexes);

// only emit if the event stream is set
if (this.eventStream !== undefined) {
this.eventStream.emit(tenant, { message, initialWrite }, indexes);
}

if (message.descriptor.prune) {
// purge/hard-delete all descendent records
await StorageController.purgeRecordDescendants(tenant, message.descriptor.recordId, this.messageStore, this.dataStore, this.eventLog);
}

// delete all existing messages that are not newest, except for the initial write
await StorageController.deleteAllOlderMessagesButKeepInitialWrite(
tenant, existingMessages, newestMessage, this.messageStore, this.dataStore, this.eventLog
);
await this.resumableTaskManager.run({
name : ResumableTaskName.RecordsDelete,
data : { tenant, message }
});

const messageReply = {
status: { code: 202, detail: 'Accepted' }
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export { ActiveTenantCheckResult, AllowAllTenantGate, TenantGate } from './core/
export { Cid } from './utils/cid.js';
export { RecordsQuery, RecordsQueryOptions } from './interfaces/records-query.js';
export { DataStore, DataStorePutResult, DataStoreGetResult } from './types/data-store.js';
export { ResumableTaskStore, ManagedResumableTask } from './types/resumable-task-store.js';
export { DataStream } from './utils/data-stream.js';
export { DateSort } from './types/records-types.js';
export { DerivedPrivateJwk, HdKey, KeyDerivationScheme } from './utils/hd-key.js';
Expand Down Expand Up @@ -53,6 +54,7 @@ export { Time } from './utils/time.js';
export { DataStoreLevel } from './store/data-store-level.js';
export { EventLogLevel } from './event-log/event-log-level.js';
export { MessageStoreLevel } from './store/message-store-level.js';
export { ResumableTaskStoreLevel } from './store/resumable-task-store-level.js';
export { EventEmitterStream } from './event-log/event-emitter-stream.js';

// test library exports
Expand Down
4 changes: 2 additions & 2 deletions src/store/index-level.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { DwnError, DwnErrorCode } from '../core/dwn-error.js';
import { FilterSelector, FilterUtility } from '../utils/filter.js';

type IndexLevelConfig = {
location?: string,
location: string,
createLevelDatabase?: typeof createLevelDatabase
};

Expand All @@ -35,7 +35,7 @@ export class IndexLevel {
};

this.db = new LevelWrapper<string>({
location : this.config.location!,
location : this.config.location,
createLevelDatabase : this.config.createLevelDatabase,
keyEncoding : 'utf8'
});
Expand Down
Loading

0 comments on commit 0311b9e

Please sign in to comment.