Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Snowflake OTEL E2E tests to use Snowflake runs. #1849

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
65 changes: 23 additions & 42 deletions examples/experimental/otel_exporter.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
" return f\"answer: {self.nested(query)}\"\n",
"\n",
" @instrument(\n",
" attributes={f\"{SpanAttributes.UNKNOWN.base}.nested_attr1\": \"value1\"}\n",
" attributes={f\"{SpanAttributes.UNKNOWN.base}.nested_attr1\": \"query\"}\n",
" )\n",
" def nested(self, query: str) -> str:\n",
" return f\"nested: {self.nested2(query)}\"\n",
Expand Down Expand Up @@ -154,19 +154,22 @@
"outputs": [],
"source": [
"# Create TruLens instrumented app from custom app.\n",
"\n",
"from datetime import datetime\n",
"\n",
"from trulens.apps.app import TruApp\n",
"\n",
"APP_NAME = f\"{os.getlogin()}'s test app\"\n",
"APP_NAME = f\"{os.getlogin()} test app\" # TODO(this_pr): Allow for apostrophe?\n",
"APP_NAME = (\n",
" APP_NAME.upper()\n",
") # TODO(this_pr): Remove this requirement or give a better error message!\n",
"APP_VERSION = str(datetime.now())\n",
"\n",
"test_app = TestApp()\n",
"tru_app = TruApp(\n",
" test_app,\n",
" app_name=APP_NAME,\n",
" app_version=APP_VERSION,\n",
" connector=tru_session.connector,\n",
" main_method=test_app.respond_to_query,\n",
")"
]
Expand All @@ -177,23 +180,21 @@
"metadata": {},
"outputs": [],
"source": [
"# Run app directly (a few separate ways).\n",
"\n",
"tru_app.instrumented_invoke_main_method(\n",
" run_name=\"test_run\",\n",
" input_id=\"456\",\n",
" main_method_args=(\"test\",),\n",
")\n",
"# Create run.\n",
"\n",
"with tru_app.run(\"test run 2\"):\n",
" with tru_app.input(\"789\"):\n",
" test_app.respond_to_query(\"789\")\n",
"import uuid\n",
"\n",
"with tru_app:\n",
" test_app.respond_to_query(\"throw\")\n",
"from trulens.core.run import RunConfig\n",
"\n",
"# Without flushing, the spans are not guaranteed to be sent.\n",
"tru_session.force_flush()"
"run_name = str(uuid.uuid4())\n",
"run_config = RunConfig(\n",
" run_name=run_name,\n",
" dataset_name=\"My test dataframe name\",\n",
" source_type=\"DATAFRAME\",\n",
" dataset_spec={\"input\": \"custom_input\"},\n",
" label=\"label\",\n",
" description=\"desciption\",\n",
")"
]
},
{
Expand All @@ -202,34 +203,14 @@
"metadata": {},
"outputs": [],
"source": [
"# Run the app on a pandas DataFrame.\n",
"# Record and invoke.\n",
"\n",
"import pandas as pd\n",
"from trulens.core.app import App\n",
"\n",
"\n",
"def run_on_df(\n",
" tru_app: App,\n",
" run_name: str,\n",
" inputs: pd.DataFrame,\n",
" input_id_column: str,\n",
" input_column: str,\n",
"):\n",
" with tru_app.run(run_name):\n",
" for _, row in inputs.iterrows():\n",
" with tru_app.input(row[input_id_column]):\n",
" main_method = getattr(test_app, tru_app.main_method_name)\n",
" main_method(row[input_column])\n",
" tru_app.session.force_flush()\n",
"\n",
"\n",
"run_on_df(\n",
" tru_app,\n",
" \"df_run\",\n",
" pd.DataFrame({\"input_id\": [1, 2], \"input\": [\"a\", \"b\"]}),\n",
" \"input_id\",\n",
" \"input\",\n",
")"
"run = tru_app.add_run(run_config=run_config)\n",
"input_df = pd.DataFrame({\"custom_input\": [\"test\", \"789\", \"throw\"]})\n",
"run.start(input_df=input_df)\n",
"tru_session.force_flush()"
]
},
{
Expand Down
34 changes: 29 additions & 5 deletions tests/e2e/data/staged_packages_with_otel.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@
"source": [
"# Create app.\n",
"\n",
"APP_NAME = \"Greeter\"\n",
"APP_NAME = str(uuid.uuid4())\n",
"APP_NAME = (\n",
" APP_NAME.upper()\n",
") # TODO(this_pr): Remove this requirement or give a better error message!\n",
"\n",
"\n",
"class MyApp:\n",
Expand All @@ -135,6 +138,7 @@
" app,\n",
" app_name=APP_NAME,\n",
" app_version=\"v1\",\n",
" connector=tru_session.connector,\n",
" main_method=app.greet,\n",
")"
]
Expand All @@ -145,16 +149,36 @@
"metadata": {},
"outputs": [],
"source": [
"# Record and invoke.\n",
"# Create run.\n",
"\n",
"from trulens.core.run import RunConfig\n",
"\n",
"run_name = str(uuid.uuid4())\n",
"tru_recorder.instrumented_invoke_main_method(\n",
"run_config = RunConfig(\n",
" run_name=run_name,\n",
" input_id=\"42\",\n",
" main_method_args=(\"What is multi-headed attention?\",),\n",
" dataset_name=\"My test dataframe name\",\n",
" source_type=\"DATAFRAME\",\n",
" dataset_spec={\"input\": \"custom_input\"},\n",
" label=\"label\",\n",
" description=\"desciption\",\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Record and invoke.\n",
"\n",
"import pandas as pd\n",
"\n",
"run = tru_recorder.add_run(run_config=run_config)\n",
"input_df = pd.DataFrame({\"custom_input\": [\"What is multi-headed attention?\"]})\n",
"run.start(input_df=input_df)"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand Down
144 changes: 78 additions & 66 deletions tests/e2e/test_snowflake_event_table_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@
import logging
import os
import time
from typing import List, Sequence
from typing import Any, Callable, Dict, List, Tuple, Type
import uuid

import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.row import Row
from trulens.apps.app import TruApp
from trulens.apps.langchain import TruChain
from trulens.apps.llamaindex import TruLlama
from trulens.connectors import snowflake as snowflake_connector
from trulens.core.app import App
from trulens.core.run import RunConfig
from trulens.core.session import TruSession
from trulens.feedback.computer import MinimalSpanInfo
from trulens.feedback.computer import RecordGraphNode
Expand Down Expand Up @@ -76,7 +79,7 @@ def _wait_for_num_results(
expected_num_results: int,
num_retries: int = 15,
retry_cooldown_in_seconds: int = 10,
) -> Sequence:
) -> List[Row]:
for _ in range(num_retries):
results = self.run_query(q, params)
if len(results) == expected_num_results:
Expand All @@ -91,7 +94,7 @@ def _wait_for_num_results(

def _validate_results(
self, app_name: str, run_name: str, num_expected_spans: int
):
) -> List[Row]:
# Flush exporter and wait for data to be made to stage.
self._tru_session.force_flush()
# Check that there are no other tables in the schema.
Expand Down Expand Up @@ -126,93 +129,102 @@ def _validate_results(
num_expected_spans,
)

def test_tru_custom_app(self):
def _test_tru_app(
self,
app: Any,
main_method: Callable,
TruAppClass: Type[App],
dataset_spec: Dict[str, str],
input_df: pd.DataFrame,
num_expected_spans: int,
) -> Tuple[str, str, List[Row]]:
# Create app.
app = tests.unit.test_otel_tru_custom.TestApp()
tru_recorder = TruApp(
app_name = str(uuid.uuid4())
app_name = app_name.upper() # TODO(this_pr): Remove this requirement or give a better error message!
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider refactoring the enforced uppercase transformation for app_name. Either remove the requirement or raise a meaningful error instead of silently modifying the name.

tru_recorder = TruAppClass(
app,
app_name="custom app",
app_name=app_name,
app_version="v1",
main_method=app.respond_to_query,
connector=self._tru_session.connector,
main_method=main_method,
)
# Record and invoke.
# Create run.
run_name = str(uuid.uuid4())
tru_recorder.instrumented_invoke_main_method(
run_name=run_name, input_id="42", main_method_args=("Kojikun",)
run_config = RunConfig(
run_name=run_name,
description="desc",
dataset_name="My test dataframe name",
source_type="DATAFRAME",
label="label",
dataset_spec=dataset_spec,
)
# Record and invoke again.
# Record and invoke.
run = tru_recorder.add_run(run_config=run_config)
run.start(input_df=input_df)
self._tru_session.force_flush()
tru_recorder.instrumented_invoke_main_method(
run_name=run_name, input_id="21", main_method_args=("Nolan",)
)
# Validate results.
self._validate_results("custom app", run_name, 8)
return (
app_name,
run_name,
self._validate_results(app_name, run_name, num_expected_spans),
)

def test_tru_custom_app(self):
app = tests.unit.test_otel_tru_custom.TestApp()
self._test_tru_app(
app,
app.respond_to_query,
TruApp,
{"input": "custom_input"},
pd.DataFrame({"custom_input": ["Kojikun", "Nolan"]}),
8,
)

def test_tru_llama(self):
# Create app.
rag = (
app = (
tests.unit.test_otel_tru_llama.TestOtelTruLlama._create_simple_rag()
)
tru_recorder = TruLlama(
rag,
app_name="llama-index app",
app_version="v1",
main_method=rag.query,
)
# Record and invoke.
run_name = str(uuid.uuid4())
tru_recorder.instrumented_invoke_main_method(
run_name=run_name,
input_id="42",
main_method_args=("What is multi-headed attention?",),
self._test_tru_app(
app,
app.query,
TruLlama,
{"input": "custom_input"},
pd.DataFrame({"custom_input": ["What is multi-headed attention?"]}),
7,
)
# Validate results.
self._validate_results("llama-index app", run_name, 7)

def test_tru_chain(self):
# Create app.
rag = (
app = (
tests.unit.test_otel_tru_chain.TestOtelTruChain._create_simple_rag()
)
tru_recorder = TruChain(
rag,
app_name="langchain app",
app_version="v1",
main_method=rag.invoke,
)
# Record and invoke.
run_name = str(uuid.uuid4())
tru_recorder.instrumented_invoke_main_method(
run_name=run_name,
input_id="42",
main_method_args=("What is multi-headed attention?",),
self._test_tru_app(
app,
app.invoke,
TruChain,
{"input": "custom_input"},
pd.DataFrame({"custom_input": ["What is multi-headed attention?"]}),
9,
)
# Validate results.
self._validate_results("langchain app", run_name, 9)

def test_feedback_computation(self) -> None:
# Create app.
rag_chain = (
app = (
tests.unit.test_otel_tru_chain.TestOtelTruChain._create_simple_rag()
)
app_name = "Simple RAG"
tru_recorder = TruChain(
rag_chain,
app_name=app_name,
app_version="v1",
main_method=rag_chain.invoke,
)
# Record and invoke.
run_name = str(uuid.uuid4())
tru_recorder.instrumented_invoke_main_method(
run_name=run_name,
input_id="42",
ground_truth_output="Like attention but with more heads.",
main_method_args=("What is multi-headed attention?",),
app_name, run_name, events = self._test_tru_app(
app,
app.invoke,
TruChain,
{
"input": "custom_input",
"ground_truth_output": "expected_response",
},
pd.DataFrame({
"custom_input": ["What is multi-headed attention?"],
"expected_response": ["Like attention but with more heads."],
}),
9,
)
TruSession().force_flush()
# Compute feedback on record we just ingested.
events = self._validate_results(app_name, run_name, 9)
spans = _convert_events_to_MinimalSpanInfos(events)
record_root = RecordGraphNode.build_graph(spans)
_compute_feedback(
Expand Down