Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve UX of using timeouts #254

Merged
merged 4 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 39 additions & 23 deletions src/connection/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ export function connectToWeaviateCloud(
grpcHost = `grpc-${url.hostname}`;
}

const { authCredentials: auth, headers, ...rest } = options || {};

return clientMaker({
connectionParams: {
http: {
Expand All @@ -99,8 +101,9 @@ export function connectToWeaviateCloud(
port: 443,
},
},
auth: options?.authCredentials,
headers: addWeaviateEmbeddingServiceHeaders(clusterURL, options),
auth,
headers: addWeaviateEmbeddingServiceHeaders(clusterURL, auth, headers),
...rest,
}).catch((e) => {
throw new WeaviateStartUpError(`Weaviate failed to startup with message: ${e.message}`);
});
Expand All @@ -110,21 +113,22 @@ export function connectToLocal(
clientMaker: (params: ClientParams) => Promise<WeaviateClient>,
options?: ConnectToLocalOptions
): Promise<WeaviateClient> {
const { host, port, grpcPort, authCredentials: auth, ...rest } = options || {};
return clientMaker({
connectionParams: {
http: {
secure: false,
host: options?.host || 'localhost',
port: options?.port || 8080,
host: host || 'localhost',
port: port || 8080,
},
grpc: {
secure: false,
host: options?.host || 'localhost',
port: options?.grpcPort || 50051,
host: host || 'localhost',
port: grpcPort || 50051,
},
},
auth: options?.authCredentials,
headers: options?.headers,
auth,
...rest,
}).catch((e) => {
throw new WeaviateStartUpError(`Weaviate failed to startup with message: ${e.message}`);
});
Expand All @@ -134,37 +138,49 @@ export function connectToCustom(
clientMaker: (params: ClientParams) => Promise<WeaviateClient>,
options?: ConnectToCustomOptions
): Promise<WeaviateClient> {
const {
httpHost,
httpPath,
httpPort,
httpSecure,
grpcHost,
grpcPort,
grpcSecure,
authCredentials: auth,
...rest
} = options || {};
return clientMaker({
connectionParams: {
http: {
secure: options?.httpSecure || false,
host: options?.httpHost || 'localhost',
path: options?.httpPath || '',
port: options?.httpPort || 8080,
secure: httpSecure || false,
host: httpHost || 'localhost',
path: httpPath || '',
port: httpPort || 8080,
},
grpc: {
secure: options?.grpcSecure || false,
host: options?.grpcHost || 'localhost',
port: options?.grpcPort || 50051,
secure: grpcSecure || false,
host: grpcHost || 'localhost',
port: grpcPort || 50051,
},
},
auth: options?.authCredentials,
headers: options?.headers,
proxies: options?.proxies,
auth,
...rest,
}).catch((e) => {
throw new WeaviateStartUpError(`Weaviate failed to startup with message: ${e.message}`);
});
}

function addWeaviateEmbeddingServiceHeaders(clusterURL: string, options?: ConnectToWeaviateCloudOptions) {
const creds = options?.authCredentials;

function addWeaviateEmbeddingServiceHeaders(
clusterURL: string,
creds?: AuthCredentials,
headers?: Record<string, string>
) {
if (!isApiKey(creds)) {
return options?.headers;
return headers;
}

return {
...options?.headers,
...headers,
'X-Weaviate-Api-Key': mapApiKey(creds).apiKey,
'X-Weaviate-Cluster-Url': clusterURL,
};
Expand Down
130 changes: 130 additions & 0 deletions src/connection/unit.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import express from 'express';
import { Server as HttpServer } from 'http';

import { testServer } from '../../test/server.js';
import {
ApiKey,
Expand All @@ -7,6 +10,23 @@ import {
} from './auth.js';
import Connection from './index.js';

import { createServer, Server as GrpcServer } from 'nice-grpc';
import {
HealthCheckRequest,
HealthCheckResponse,
HealthCheckResponse_ServingStatus,
HealthDefinition,
HealthServiceImplementation,
} from '../proto/google/health/v1/health';
import { TenantsGetReply } from '../proto/v1/tenants';
import { WeaviateDefinition, WeaviateServiceImplementation } from '../proto/v1/weaviate';

import { WeaviateRequestTimeoutError } from '../errors.js';
import weaviate, { Collection, WeaviateClient } from '../index';
import { BatchObjectsReply } from '../proto/v1/batch.js';
import { BatchDeleteReply } from '../proto/v1/batch_delete.js';
import { SearchReply } from '../proto/v1/search_get.js';

describe('mock server auth tests', () => {
const server = testServer();
describe('OIDC auth flows', () => {
Expand Down Expand Up @@ -197,3 +217,113 @@ describe('mock server auth tests', () => {
return server.close();
});
});

const COLLECTION_NAME = 'TestCollectionTimeouts';

const makeRestApp = (version: string) => {
const httpApp = express();
httpApp.get(`/v1/schema/${COLLECTION_NAME}`, (req, res) =>
new Promise((r) => setTimeout(r, 2000)).then(() => res.send({ class: COLLECTION_NAME }))
);
httpApp.get('/v1/meta', (req, res) => res.send({ version }));
return httpApp;
};

const makeGrpcApp = () => {
const weaviateMockImpl: WeaviateServiceImplementation = {
tenantsGet: (): Promise<TenantsGetReply> =>
new Promise((r) => {
setTimeout(r, 2000);
}).then(() => {
return {
took: 5000,
tenants: [],
};
}),
search: (): Promise<SearchReply> =>
new Promise((r) => {
setTimeout(r, 2000);
}).then(() => {
return {
results: [],
took: 5000,
groupByResults: [],
};
}),
batchDelete: (): Promise<BatchDeleteReply> =>
new Promise((r) => {
setTimeout(r, 2000);
}).then(() => {
return {
took: 5000,
status: 'SUCCESS',
failed: 0,
matches: 0,
successful: 0,
objects: [],
};
}),
batchObjects: (): Promise<BatchObjectsReply> =>
new Promise((r) => {
setTimeout(r, 2000);
}).then(() => {
return {
took: 5000,
errors: [],
};
}),
};
const healthMockImpl: HealthServiceImplementation = {
check: (request: HealthCheckRequest): Promise<HealthCheckResponse> =>
Promise.resolve(HealthCheckResponse.create({ status: HealthCheckResponse_ServingStatus.SERVING })),
watch: jest.fn(),
};

const grpcApp = createServer();
grpcApp.add(WeaviateDefinition, weaviateMockImpl);
grpcApp.add(HealthDefinition, healthMockImpl);

return grpcApp;
};

const makeMockServers = async (weaviateVersion: string, httpPort: number, grpcAddress: string) => {
const rest = makeRestApp(weaviateVersion);
const grpc = makeGrpcApp();
const server = await rest.listen(httpPort);
await grpc.listen(grpcAddress);
return { rest: server, grpc, express };
};

describe('Mock testing of timeout behaviour', () => {
let servers: {
rest: HttpServer;
grpc: GrpcServer;
};
let client: WeaviateClient;
let collection: Collection;

beforeAll(async () => {
servers = await makeMockServers('1.28.2', 8954, 'localhost:8955');
client = await weaviate.connectToLocal({ port: 8954, grpcPort: 8955, timeout: { query: 1, insert: 1 } });
collection = client.collections.get(COLLECTION_NAME);
});

it('should timeout when calling REST GET v1/schema', () =>
expect(collection.config.get()).rejects.toThrow(WeaviateRequestTimeoutError));

it('should timeout when calling gRPC TenantsGet', () =>
expect(collection.tenants.get()).rejects.toThrow(WeaviateRequestTimeoutError));

it('should timeout when calling gRPC Search', () =>
expect(collection.query.fetchObjects()).rejects.toThrow(WeaviateRequestTimeoutError));

it('should timeout when calling gRPC BatchObjects', () =>
expect(collection.data.insertMany([{ thing: 'what' }])).rejects.toThrow(WeaviateRequestTimeoutError));

it('should timeout when calling gRPC BatchDelete', () =>
expect(collection.data.deleteMany(collection.filter.byId().equal('123' as any))).rejects.toThrow(
WeaviateRequestTimeoutError
));

afterAll(() => Promise.all([servers.rest.close(), servers.grpc.shutdown()]));
});
9 changes: 9 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ export class WeaviateDeleteManyError extends WeaviateError {
}
}

/**
* Is thrown if a gRPC tenants get to Weaviate fails in any way.
*/
export class WeaviateTenantsGetError extends WeaviateError {
constructor(message: string) {
super(`Tenants get failed with message: ${message}`);
}
}

/**
* Is thrown if a gRPC batch query to Weaviate fails in any way.
*/
Expand Down
11 changes: 1 addition & 10 deletions src/grpc/base.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import { isAbortError } from 'abort-controller-x';
import { ConsistencyLevel } from '../data/index.js';

import { Metadata } from 'nice-grpc';
import { RetryOptions } from 'nice-grpc-client-middleware-retry';
import { WeaviateRequestTimeoutError } from '../errors.js';
import { ConsistencyLevel as ConsistencyLevelGRPC } from '../proto/v1/base.js';
import { WeaviateClient } from '../proto/v1/weaviate.js';

Expand Down Expand Up @@ -47,13 +45,6 @@ export default class Base {
protected sendWithTimeout = <T>(send: (signal: AbortSignal) => Promise<T>): Promise<T> => {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), this.timeout * 1000);
return send(controller.signal)
.catch((error) => {
if (isAbortError(error)) {
throw new WeaviateRequestTimeoutError(`timed out after ${this.timeout}ms`);
}
throw error;
})
.finally(() => clearTimeout(timeoutId));
return send(controller.signal).finally(() => clearTimeout(timeoutId));
};
}
8 changes: 8 additions & 0 deletions src/grpc/batcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import {
WeaviateBatchError,
WeaviateDeleteManyError,
WeaviateInsufficientPermissionsError,
WeaviateRequestTimeoutError,
} from '../errors.js';
import { Filters } from '../proto/v1/base.js';
import { BatchDeleteReply, BatchDeleteRequest } from '../proto/v1/batch_delete.js';
import Base from './base.js';

import { isAbortError } from 'abort-controller-x';
import { retryOptions } from './retry.js';

export interface Batch {
Expand Down Expand Up @@ -65,6 +67,9 @@ export default class Batcher extends Base implements Batch {
if (err instanceof ServerError && err.code === Status.PERMISSION_DENIED) {
throw new WeaviateInsufficientPermissionsError(7, err.message);
}
if (isAbortError(err)) {
throw new WeaviateRequestTimeoutError(`timed out after ${this.timeout}ms`);
}
throw new WeaviateDeleteManyError(err.message);
});
}
Expand All @@ -87,6 +92,9 @@ export default class Batcher extends Base implements Batch {
if (err instanceof ServerError && err.code === Status.PERMISSION_DENIED) {
throw new WeaviateInsufficientPermissionsError(7, err.message);
}
if (isAbortError(err)) {
throw new WeaviateRequestTimeoutError(`timed out after ${this.timeout}ms`);
}
throw new WeaviateBatchError(err.message);
})
);
Expand Down
10 changes: 9 additions & 1 deletion src/grpc/searcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ import {
} from '../proto/v1/search_get.js';
import { WeaviateClient } from '../proto/v1/weaviate.js';

import { isAbortError } from 'abort-controller-x';
import { RetryOptions } from 'nice-grpc-client-middleware-retry';
import { WeaviateInsufficientPermissionsError, WeaviateQueryError } from '../errors.js';
import {
WeaviateInsufficientPermissionsError,
WeaviateQueryError,
WeaviateRequestTimeoutError,
} from '../errors.js';
import { GenerativeSearch } from '../proto/v1/generative.js';
import Base from './base.js';
import { retryOptions } from './retry.js';
Expand Down Expand Up @@ -160,6 +165,9 @@ export default class Searcher extends Base implements Search {
if (err instanceof ServerError && err.code === Status.PERMISSION_DENIED) {
throw new WeaviateInsufficientPermissionsError(7, err.message);
}
if (isAbortError(err)) {
throw new WeaviateRequestTimeoutError(`timed out after ${this.timeout}ms`);
}
throw new WeaviateQueryError(err.message, 'gRPC');
})
);
Expand Down
12 changes: 10 additions & 2 deletions src/grpc/tenantsManager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { isAbortError } from 'abort-controller-x';
import { Metadata, ServerError, Status } from 'nice-grpc';
import { RetryOptions } from 'nice-grpc-client-middleware-retry';
import { WeaviateDeleteManyError, WeaviateInsufficientPermissionsError } from '../errors.js';
import {
WeaviateInsufficientPermissionsError,
WeaviateRequestTimeoutError,
WeaviateTenantsGetError,
} from '../errors.js';
import { TenantsGetReply, TenantsGetRequest } from '../proto/v1/tenants.js';
import { WeaviateClient } from '../proto/v1/weaviate.js';
import Base from './base.js';
Expand Down Expand Up @@ -45,7 +50,10 @@ export default class TenantsManager extends Base implements Tenants {
if (err instanceof ServerError && err.code === Status.PERMISSION_DENIED) {
throw new WeaviateInsufficientPermissionsError(7, err.message);
}
throw new WeaviateDeleteManyError(err.message);
if (isAbortError(err)) {
throw new WeaviateRequestTimeoutError(`timed out after ${this.timeout}ms`);
}
throw new WeaviateTenantsGetError(err.message);
})
);
}
Expand Down
Loading