Whisper pipeline: use parallel streamer #1642
base: master
Conversation
src/cpp/src/threaded_streamer.hpp
Outdated
std::shared_ptr<std::thread> m_worker_thread = nullptr;
std::mutex m_mutex;
std::condition_variable m_cv;
std::queue<std::variant<int64_t, std::vector<int64_t>>> m_queue;
Why not SynchronizedQueue from openvino.genai/src/cpp/src/synchronized_queue.hpp?
There was a deadlock on squeue.pull. It happens on the streamer.end call:
- the streamer thread waits for a new token
- the main thread calls streamer.end, so the streamer thread should be stopped, but there is no API to gracefully unlock squeue

One way to unlock squeue is to push a dummy token and handle an is_stopped flag. I thought that may not be very clean.

I guess the trick with a dummy token and the sync queue can simplify the implementation. Do you want me to implement that?
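The dummy-token trick described above can be sketched like this. This is a minimal standalone queue for illustration only, not the repo's actual SynchronizedQueue; the names BlockingQueue, STOP_SENTINEL, and run_streamer are illustrative:

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <queue>
#include <vector>

// Minimal blocking queue, illustrating the idea only (not the repo's SynchronizedQueue).
class BlockingQueue {
    std::queue<int64_t> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cv;

public:
    void push(int64_t v) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(v);
        }
        m_cv.notify_one();
    }

    int64_t pull() {  // blocks until a value is available
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return !m_queue.empty(); });
        int64_t v = m_queue.front();
        m_queue.pop();
        return v;
    }
};

constexpr int64_t STOP_SENTINEL = -1;  // dummy token used only to unblock pull()

// Streamer-thread loop: consumes tokens until the sentinel arrives,
// so end() can push STOP_SENTINEL instead of deadlocking on a blocked pull().
std::vector<int64_t> run_streamer(BlockingQueue& q) {
    std::vector<int64_t> consumed;
    for (;;) {
        int64_t token = q.pull();
        if (token == STOP_SENTINEL)
            break;  // graceful shutdown
        consumed.push_back(token);
    }
    return consumed;
}
```

In the pipeline, run_streamer would run on the worker thread while end() pushes the sentinel from the main thread; the consumer then has to filter the dummy token out, which is the part the comment above calls "not very clean".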
Looks like we handled it via openvino.genai/src/cpp/src/sequence_group.hpp (line 655 in a0852d0):
push_empty_outputs();
Reimplemented with SynchronizedQueue
@@ -18,6 +18,7 @@ class OPENVINO_GENAI_EXPORTS StreamerBase {
    /// @brief put is called every time new token is decoded,
    /// @return bool flag to indicate whether generation should be stopped, if return true generation stops
    virtual bool put(int64_t token) = 0;
    virtual bool put(const std::vector<int64_t>& tokens) = 0;
That adds a requirement for child classes to override one more method. I think it must have a default implementation. IterableStreamer only overrides the single-token version:

class IterableStreamer(openvino_genai.StreamerBase):

bool put(const std::vector<int64_t>& tokens) remains pure virtual (= 0) by default.
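One possible shape for such a default, sketched below. This is not the actual StreamerBase; the class names are illustrative, and it just delegates to the single-token put() so that child classes overriding only that version keep working:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Sketch of the discussed default overload, not the real StreamerBase.
class StreamerSketch {
public:
    // Child classes must still override the single-token version.
    virtual bool put(int64_t token) = 0;

    // Hypothetical default overload: delegate token by token.
    // Suboptimal (no batched handling), but backward compatible.
    virtual bool put(const std::vector<int64_t>& tokens) {
        for (int64_t token : tokens)
            if (put(token))
                return true;  // propagate the "stop generation" request
        return false;
    }

    virtual ~StreamerSketch() = default;
};

// Minimal child overriding only the single-token version, like IterableStreamer.
class CountingStreamer : public StreamerSketch {
public:
    using StreamerSketch::put;  // keep the vector overload visible despite name hiding
    int count = 0;
    bool put(int64_t) override {
        ++count;
        return false;
    }
};
```

Note the `using StreamerSketch::put;` in the child: without it, C++ name hiding would make the inherited vector overload unreachable through the derived type.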
A default implementation will be suboptimal, as it does not populate m_tokens_cache, which is a field of the derived class.
Do we need to have m_tokens_cache in the interface as well? It would tell users "something" about a possible implementation.
> default implementation will be suboptimal as it does not populate m_tokens_cache, which is a field of derived class.

A NotImplementedException is enough.

> Do we need to have m_tokens_cache in interface as well? It will tell users "something" about possible implementation

No, a child class would end up with a field that may not be used. end() already suggests that caching is possible.
> A NotImplementedException is enough.

In this case such a streamer cannot work with Spec Decoding, Prompt Look-up, or even stop strings.
It's not backward compatible either, as the current PR assumes that put(vector) is available.
I see. Then the need to override IterableStreamer.put() is even more important.
Is the final decision to add a default sub-optimal implementation of put(vector), while our streamer overrides this method with an optimal implementation?
Yes, maybe with a warning print.
@@ -18,6 +18,7 @@ class OPENVINO_GENAI_EXPORTS StreamerBase {
    /// @brief put is called every time new token is decoded,
    /// @return bool flag to indicate whether generation should be stopped, if return true generation stops
    virtual bool put(int64_t token) = 0;
    virtual bool put(const std::vector<int64_t>& tokens) = 0;
Explain in doc-strings what the new overload is for.
Doc-string added
src/cpp/src/whisper/streamer.hpp
Outdated
@@ -3,21 +3,35 @@
#pragma once

#include <condition_variable>
#include <queue>
#include <thread>
These headers seem to be redundant
removed
src/python/py_openvino_genai.cpp
Outdated
if (auto _token = std::get_if<int64_t>(&token)) {
    return self.put(*_token);
} else {
    auto tokens = std::get_if<std::vector<int64_t>>(&token);
Suggested change:
- auto tokens = std::get_if<std::vector<int64_t>>(&token);
+ auto& tokens = std::get<std::vector<int64_t>>(token);
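For reference, the difference between the two accessors, shown on a standalone example rather than the pipeline code (the TokenVariant alias and token_count helper are illustrative): std::get_if takes a pointer to the variant and returns a pointer (nullptr on alternative mismatch), while std::get takes the variant itself and returns a reference (throwing std::bad_variant_access on mismatch).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <variant>
#include <vector>

using TokenVariant = std::variant<int64_t, std::vector<int64_t>>;

// Counts how many tokens the variant holds, using both accessors.
std::size_t token_count(const TokenVariant& token) {
    // std::get_if: pointer in, pointer out; nullptr if the alternative doesn't match.
    if (std::get_if<int64_t>(&token) != nullptr) {
        return 1;
    }
    // The alternative is known here, so std::get's reference access is safe
    // (it would throw std::bad_variant_access otherwise).
    const auto& tokens = std::get<std::vector<int64_t>>(token);
    return tokens.size();
}
```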
applied
}

std::lock_guard<std::mutex> lock(m_mutex);
return m_dropped;
m_dropped lives its own life. It could have a separate lock. Pushing the idea further, you should simply wrap m_dropped with atomic so no lock is needed at all.
Switched to atomic
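The atomic version can look roughly like this (an illustrative fragment, not the PR's exact code; the DropFlag name is made up): the flag becomes std::atomic<bool>, so cross-thread reads and writes need no mutex.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Sketch: a drop/stop flag shared between the main and streamer threads.
class DropFlag {
    std::atomic<bool> m_dropped{false};

public:
    void drop() { m_dropped.store(true); }           // called from one thread
    bool is_dropped() const { return m_dropped.load(); }  // read from another
};
```

The default sequentially consistent ordering is the safe choice here; a relaxed ordering would also work for a pure flag, but that is a micro-optimization.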
TextCallbackStreamer(const Tokenizer& tokenizer, std::function<bool(std::string)> callback);

std::function<bool(std::string)> on_finalized_subword_callback = [](std::string words)->bool { return false; };
bool put(int64_t token) override;
Should it override put(vector) to have an optimal implementation?
TextCallbackStreamer cannot work with put(std::vector<int64_t>) properly at the moment. The trick with inserting a chunk of tokens into the streamer's cache doesn't look valid to me, due to the n_delay_tokens feature: https://github.com/openvinotoolkit/openvino.genai/blob/master/src/cpp/src/text_callback_streamer.cpp#L42.
I propose to fix it in the next PRs.
const WhisperGenerationConfig& generation_config,
const std::shared_ptr<ChunkStreamerBase>& streamer,
const py::kwargs& kwargs) -> py::typing::Union<ov::genai::WhisperDecodedResults> {
    StreamerVariant _streamer = std::make_shared<ov::genai::ChunkToBaseStreamerAdapter>(streamer);
Maybe we can add a deprecation warning using PyErr_WarnEx(PyExc_DeprecationWarning, ...)?
Yes, will do
@ilya-lavrenov @Wovchena @sbalandi It seems that adding the overload [...]. I want to split this PR into:
This is what I noticed before the decision to split the PR reached me.
assert streamer_instance.text == result.texts[0]

config = genai_pipe.get_generation_config()
config.return_timestamps = True
Parametrize the test with different instances of streamer. return_timestamps is also a good candidate to be parametrized, but I guess the intention was to also test different ways of setting it. A function can be defined like:
def foo(val): foo.stored_val = [] if not hasattr(foo, 'stored_val') else foo.stored_val; foo.stored_val.append(val); print(foo.stored_val)
ON_CALL(*this, end()).WillByDefault([this]() {
    if (should_sleep) {
        std::this_thread::sleep_for(m_sleep_for);
Should end() really sleep? heavy_callback_test doesn't check anything after end().
Is it possible to parametrize MockStreamerBase's constructor with a lambda taking const std::vector<int64_t>& tokens? In that case you could define put(int64_t token) as return put({token});. Every test would provide its own lambda which sleeps or not, drops or not. This would let you reduce the number of members, and the behavior description would be local to every test.
Should the tests also verify that no value is missed? A lambda would help with that as well.
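The suggested refactoring could look roughly like this. A sketch without GoogleMock and with an illustrative name (LambdaStreamer, not the actual MockStreamerBase): the constructor takes a single lambda over a token vector, put(int64_t) forwards to it, and each test's lambda can record every value to verify nothing is missed.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Sketch of a test streamer parametrized by one lambda (illustrative, not MockStreamerBase).
class LambdaStreamer {
    std::function<bool(const std::vector<int64_t>&)> m_on_tokens;

public:
    explicit LambdaStreamer(std::function<bool(const std::vector<int64_t>&)> on_tokens)
        : m_on_tokens(std::move(on_tokens)) {}

    // The single-token put() just forwards to the vector version,
    // so each test describes all behavior in one place.
    bool put(int64_t token) { return put(std::vector<int64_t>{token}); }

    bool put(const std::vector<int64_t>& tokens) { return m_on_tokens(tokens); }
};
```

A test that sleeps, drops, or records tokens then only differs in the lambda it passes, which keeps the behavior description local to that test.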
- bool put(const std::vector<int64_t> tokens) to StreamerBase.
- StreamerBase ctor. Deprecate ChunkStreamerBase ctor.
- ThreadedStreamerWrapper. Utilize it in CB pipelines.

Ticket: 160606