Skip to content
This repository has been archived by the owner on Aug 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #44 from SciPhi-AI/Nolan/ImproveStreaming
Browse files Browse the repository at this point in the history
Improve streaming support
  • Loading branch information
NolanTrem authored Jul 22, 2024
2 parents 5aec51c + 857bdc5 commit 12b0b35
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 69 deletions.
5 changes: 5 additions & 0 deletions examples/hello_r2r.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ async function main() {
{ path: "examples/data/raskolnikov.txt", name: "raskolnikov.txt" },
];

const EMAIL = "[email protected]";
const PASSWORD = "change_me_immediately";
console.log("Logging in...");
await client.login(EMAIL, PASSWORD);

console.log("Ingesting file...");
const ingestResult = await client.ingestFiles(files, {
metadatas: [{ title: "raskolnikov.txt" }],
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "r2r-js",
"version": "1.2.7",
"version": "1.2.8",
"description": "",
"main": "dist/index.js",
"browser": "dist/index.browser.js",
Expand Down
67 changes: 21 additions & 46 deletions src/r2rClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import axios, {
} from "axios";
import FormData from "form-data";
import { URLSearchParams } from "url";
import { Readable } from "stream";

let fs: any;
if (typeof window === "undefined") {
Expand Down Expand Up @@ -92,6 +93,12 @@ export class r2rClient {
initializeTelemetry();
}

private isReadable(obj: any): obj is Readable {
return (
obj && typeof obj.on === "function" && typeof obj.read === "function"
);
}

private async _makeRequest<T = any>(
method: Method,
endpoint: string,
Expand Down Expand Up @@ -133,54 +140,22 @@ export class r2rClient {
config.headers.Authorization = `Bearer ${this.accessToken}`;
}

if (options.responseType === "stream") {
return new Promise<T>((resolve, reject) => {
let buffer = new Uint8Array(0);
const stream = new ReadableStream({
start: (controller) => {
this.axiosInstance
.request({
...config,
responseType: "arraybuffer",
onDownloadProgress: (progressEvent) => {
const chunk = progressEvent.event.currentTarget.response;
const newBuffer = new Uint8Array(
buffer.length + chunk.byteLength,
);
newBuffer.set(buffer);
newBuffer.set(new Uint8Array(chunk), buffer.length);
buffer = newBuffer;

try {
controller.enqueue(new Uint8Array(chunk));
} catch (error) {
console.warn("Stream closed, unable to enqueue more data");
}
},
})
.then((response) => {})
.catch((error) => {
controller.error(error);
reject(error);
})
.finally(() => {
if (buffer.length > 0) {
try {
controller.enqueue(buffer);
} catch (error) {
console.warn("Unable to enqueue final data");
}
}
controller.close();
resolve(stream as any as T);
});
try {
const response = await this.axiosInstance.request(config);

if (options.responseType === "stream") {
const stream = response.data as unknown as Readable;
return new ReadableStream({
start(controller) {
stream.on("data", (chunk: Buffer) => {
controller.enqueue(new Uint8Array(chunk));
});
stream.on("end", () => controller.close());
stream.on("error", (err: Error) => controller.error(err));
},
});
});
}
}) as unknown as T;
}

try {
const response = await this.axiosInstance.request<T>(config);
return options.returnFullResponse
? (response as any as T)
: response.data;
Expand Down
21 changes: 0 additions & 21 deletions testr2r.js

This file was deleted.

0 comments on commit 12b0b35

Please sign in to comment.