Skip to content

Commit

Permalink
Add support for iterator & streaming
Browse files Browse the repository at this point in the history
Fix issue where Streaming responses wouldn't return a result to the
client
  • Loading branch information
flenter committed Sep 16, 2024
1 parent a0e138d commit d9a96de
Show file tree
Hide file tree
Showing 12 changed files with 430 additions and 81 deletions.
52 changes: 52 additions & 0 deletions packages/client-library-otel/sample/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Hono } from "hono";
import { stream } from "hono/streaming";
import { instrument, measure } from "../src";

const app = new Hono();
Expand Down Expand Up @@ -48,4 +49,55 @@ app.get("/error", async () => {
await delayedError();
});

// This is an async generator function (and so returns an async iterator)
const generateRelaxedWelcome = measure("relaxedWelcome", async function* () {
await sleep(500);
yield "hello! ";
await sleep(500);
yield "Hono ";
await sleep(500);
yield "is ";
await sleep(500);
yield "awesome";
});

app.get("/stream", async (c) => {
c.header("Content-Type", "text/plain");
return stream(c, async (stream) => {
const result = generateRelaxedWelcome();

for await (const content of result) {
await stream.write(content);
}
});
});

const fibonacci = measure(
"fibonacci",
function* (arg: number): Generator<number> {
let a = 1;
let b = 1;
for (let i = 0; i < arg; i++) {
yield a;
[a, b] = [b, a + b];
}
},
);
// Example usage:
app.get("/fibonacci/:count", (c) => {
const count = Number.parseInt(c.req.param("count"), 10);

const result = fibonacci(count);
const values = Array.from(result);

return c.text(`Fibonacci sequence (${count} numbers): ${values.join(", ")}`);
});

app.get("/quick", async (c) => {
c.header("Content-Type", "text/plain");
return stream(c, async (stream) => {
stream.write("ok");
});
});

export default instrument(app);
38 changes: 29 additions & 9 deletions packages/client-library-otel/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
patchFetch,
patchWaitUntil,
} from "./patch";
import { PromiseStore } from "./promiseStore";
import { propagateFpxTraceId } from "./propagation";
import { isRouteInspectorRequest, respondWithRoutes } from "./routes";
import type { HonoLikeApp, HonoLikeEnv, HonoLikeFetch } from "./types";
Expand All @@ -30,6 +31,7 @@ import {
getRequestAttributes,
getResponseAttributes,
getRootRequestAttributes,
isPromise,
} from "./utils";

/**
Expand Down Expand Up @@ -148,10 +150,10 @@ export function instrument(app: HonoLikeApp, config?: FpxConfigOptions) {
endpoint,
});

const promiseStore = new PromiseStore();
// Enable tracing for waitUntil
const patched = executionContext && patchWaitUntil(executionContext);
const promises = patched?.promises ?? [];
const proxyExecutionCtx = patched?.proxyContext ?? executionContext;
const proxyExecutionCtx =
executionContext && patchWaitUntil(executionContext, promiseStore);

const activeContext = propagateFpxTraceId(request);

Expand Down Expand Up @@ -206,11 +208,29 @@ export function instrument(app: HonoLikeApp, config?: FpxConfigOptions) {
};
span.setAttributes(requestAttributes);
},
onSuccess: async (span, response) => {
const attributes = await getResponseAttributes(
(await response).clone(),
);
span.setAttributes(attributes);
endSpanManually: true,
onSuccess: async (span, responsePromise) => {
span.addEvent("first-response");

const response = isPromise(responsePromise)
? await responsePromise
: responsePromise;

const attributesResponse = response.clone();
const newResponse = response;

const updateSpan = async (response: Response) => {
const attributes = await getResponseAttributes(response);
span.setAttributes(attributes);
console.log(
"attributes",
attributes["fpx.http.response.body"],
);
span.end();
};

promiseStore.add(updateSpan(attributesResponse));
return newResponse;
},
checkResult: async (result) => {
const r = await result;
Expand All @@ -231,7 +251,7 @@ export function instrument(app: HonoLikeApp, config?: FpxConfigOptions) {
// Make sure all promises are resolved before sending data to the server
if (proxyExecutionCtx) {
proxyExecutionCtx.waitUntil(
Promise.allSettled(promises).finally(() => {
promiseStore.allSettled().finally(() => {
return provider.forceFlush();
}),
);
Expand Down
Loading

0 comments on commit d9a96de

Please sign in to comment.