Chains Streaming, fixes BT-10339 #1261
…ngs in repo and cleanup code gen. Try bytes iterator, works local
0659db2 to 5a026f9
e75d8c1 to b14aa3b
Looks good! Just a few minor comments.
This looks great! One implementation question -- what's the story for how this could be consumed by the end user?
if __name__ == "__main__":
    with chains.run_local():
        chain = Consumer()
        result = asyncio.run(chain.run_remote())
If the final chain streams its output, what would be an easy way of consuming that?
You have 3 choices:
- raw strings: works directly.
- raw bytes: works directly.
- structured/typed pydantic models: you create stream_reader with the same model definitions client side.
For the last one, we can discuss ways to distribute that implementation. It depends only on pydantic and builtins, so you wouldn't need to install the whole truss package, you just need that source file. Or we could even generate a "client".
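To illustrate the first two choices, here is a minimal sketch of consuming a raw string stream. It does not call the chains API: `stream_text` is a hypothetical stand-in for a streaming endpoint, and `collect` is an illustrative helper. It only assumes the endpoint behaves like an `AsyncIterator[str]`; the same loop would work for `bytes` chunks.

```python
import asyncio
from typing import AsyncIterator


async def stream_text() -> AsyncIterator[str]:
    # Hypothetical stand-in for a streaming endpoint; not part of the PR.
    for chunk in ("Hello", ", ", "world"):
        await asyncio.sleep(0)  # hand control back to the event loop between chunks
        yield chunk


async def collect(stream: AsyncIterator[str]) -> str:
    # Chunks could be processed one at a time as they arrive;
    # this sketch simply joins them into one string.
    parts = []
    async for chunk in stream:
        parts.append(chunk)
    return "".join(parts)


if __name__ == "__main__":
    print(asyncio.run(collect(stream_text())))
```

No reader object is involved here because raw chunks need no framing; only the structured case needs matching model definitions on both sides.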
Sounds good
        return struct.pack(">BI", delimiter.value, length)

    def _serialize(self, obj: pydantic.BaseModel, delimiter: _Delimiter) -> bytes:
        data_bytes = obj.model_dump_json().encode()
Just to be clear: if someone is sending back binary data (say, audio), they don't have to use this, right? They would just stream the bytes directly?
I'm not sure how a pydantic model would serialize a field that contained binary data.
If you want to return raw binary data, you don't need any of these streamers; you can stream the bytes directly.
🚀 What
Support raw string/byte streaming and streaming of structured (pydantic) data with separate item and optional header/footer types.
💻 How
- … AsyncIterator[bytes|str] endpoints. Enforce that generator functions have this annotation and vice versa.
- … generator to streaming.
- … DockerTrussService.
- … StreamWriter and StreamReader, linked by StreamTypes to consistently deserialize and serialize structured data. Some fun typing to make this work variadically...
🔬 Testing
- … streaming.py.
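The rule that generator functions must carry the streaming annotation "and vice versa" could be checked roughly as follows. This is only a sketch of the idea, not the validation code in this PR: `validate_endpoint` and `_is_streaming_annotation` are hypothetical names, and it assumes the accepted annotations are exactly `AsyncIterator[bytes]` and `AsyncIterator[str]`.

```python
import inspect
import typing
from collections.abc import AsyncIterator as AbcAsyncIterator
from typing import AsyncIterator, Callable


def _is_streaming_annotation(annotation: object) -> bool:
    # True only for AsyncIterator[bytes] or AsyncIterator[str] (assumed rule).
    return (
        typing.get_origin(annotation) is AbcAsyncIterator
        and typing.get_args(annotation) in ((bytes,), (str,))
    )


def validate_endpoint(func: Callable) -> None:
    # An async generator must be annotated as streaming, and a streaming
    # annotation must belong to an async generator.
    returns = typing.get_type_hints(func).get("return")
    is_generator = inspect.isasyncgenfunction(func)
    if is_generator != _is_streaming_annotation(returns):
        raise TypeError(
            f"{func.__name__}: async generators and AsyncIterator[bytes|str] "
            "return annotations must be used together."
        )


async def good() -> AsyncIterator[str]:
    yield "chunk"


validate_endpoint(good)  # passes silently
```

Checking both directions catches the two likely mistakes: a generator with a missing or wrong return annotation, and a regular coroutine that claims to stream.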