
Whisper pipeline: use parallel streamer #1642

Draft: wants to merge 21 commits into master
Conversation

@as-suvorov (Contributor) commented Jan 29, 2025:

  • Add bool put(const std::vector<int64_t>& tokens) to StreamerBase.
  • Add a StreamerBase constructor to the Whisper pipeline; deprecate the ChunkStreamerBase constructor.
  • Add parallel streaming for Whisper using async/wait.
  • Add ThreadedStreamerWrapper and use it in the continuous batching (CB) pipelines.

Ticket: 160606
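For illustration, the core idea behind ThreadedStreamerWrapper could look roughly like the sketch below. This is an assumption-laden outline, not the PR's actual implementation: it reuses only the member names quoted later in this conversation (worker thread, mutex, condition variable, token queue), assumes the streamer_base.hpp header path, and ignores the bool stop flag that put() returns.

#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

#include "openvino/genai/streamer_base.hpp"  // header path assumed

// Sketch: the generation thread queues tokens while a worker thread drains
// the queue and calls the wrapped streamer, so a slow callback does not
// block decoding.
class ThreadedStreamerSketch {
public:
    explicit ThreadedStreamerSketch(std::shared_ptr<ov::genai::StreamerBase> streamer)
        : m_streamer{std::move(streamer)} {
        // Start the worker only after all members are constructed.
        m_worker_thread = std::make_shared<std::thread>([this] { worker(); });
    }

    void put(int64_t token) {
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            m_queue.push(token);
        }
        m_cv.notify_one();
    }

    void end() {
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            m_stopped = true;  // graceful shutdown, no dummy token needed
        }
        m_cv.notify_one();
        if (m_worker_thread->joinable())
            m_worker_thread->join();
        m_streamer->end();
    }

private:
    void worker() {
        while (true) {
            std::unique_lock<std::mutex> lock{m_mutex};
            m_cv.wait(lock, [this] { return !m_queue.empty() || m_stopped; });
            if (m_queue.empty())
                return;  // stopped and fully drained
            int64_t token = m_queue.front();
            m_queue.pop();
            lock.unlock();
            m_streamer->put(token);  // stop flag ignored in this sketch
        }
    }

    std::shared_ptr<ov::genai::StreamerBase> m_streamer;
    std::shared_ptr<std::thread> m_worker_thread;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<int64_t> m_queue;
    bool m_stopped = false;
};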

std::shared_ptr<std::thread> m_worker_thread = nullptr;
std::mutex m_mutex;
std::condition_variable m_cv;
std::queue<std::variant<int64_t, std::vector<int64_t>>> m_queue;
Contributor:

Why not SynchronizedQueue from openvino.genai/src/cpp/src/synchronized_queue.hpp?

Contributor Author:

There was a deadlock on squeue.pull. It happens on the streamer.end call:

  1. The streamer thread waits for a new token.
  2. The main thread calls streamer.end, so the streamer thread should be stopped, but there is no API to gracefully unblock the squeue.

One way to unblock the squeue is to push a dummy token and handle an is_stopped flag. I thought that might not be very clean.

I guess the trick with a dummy token and the sync queue could simplify the implementation. Do you want me to implement that?
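For illustration only, the dummy-token approach described above might look roughly like this fragment (member names are assumptions; SynchronizedQueue's push/pull follow the usage in this thread; tokens still queued when end() is called would be dropped by this naive version):

// end() pushes a sentinel so the blocking pull() returns, and the worker
// thread checks the stop flag before streaming.
void end() {
    m_stopped.store(true);
    m_squeue.push(0);  // dummy token, never streamed
    if (m_worker_thread->joinable())
        m_worker_thread->join();
}

void worker() {
    while (true) {
        int64_t token = m_squeue.pull();  // blocks until a push()
        if (m_stopped.load())
            return;  // sentinel consumed, exit gracefully
        m_streamer->put(token);
    }
}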

Contributor:

Looks like we handled it via:

push_empty_outputs();

Contributor Author:

Reimplemented with SynchronizedQueue

@as-suvorov as-suvorov requested a review from Wovchena February 4, 2025 13:27
@as-suvorov as-suvorov marked this pull request as ready for review February 4, 2025 13:27
@@ -18,6 +18,7 @@ class OPENVINO_GENAI_EXPORTS StreamerBase {
/// @brief put is called every time new token is decoded,
/// @return bool flag to indicate whether generation should be stopped, if return true generation stops
virtual bool put(int64_t token) = 0;
virtual bool put(const std::vector<int64_t>& tokens) = 0;
Collaborator:

That adds a requirement for child classes to override one more method. I think it must have a default implementation.
IterableStreamer only overrides the single-token version:

class IterableStreamer(openvino_genai.StreamerBase):

While this passes because the bindings handle it, it seems correct for Python to follow the C++ API and override both versions if bool put(const std::vector<int64_t>& tokens) remains pure virtual (= 0) by default.

Contributor:

A default implementation will be suboptimal, as it does not populate m_tokens_cache, which is a field of the derived class.

Do we need to have m_tokens_cache in the interface as well? It would tell users "something" about a possible implementation.

Collaborator:

> A default implementation will be suboptimal, as it does not populate m_tokens_cache, which is a field of the derived class.

A NotImplementedException is enough.

> Do we need to have m_tokens_cache in the interface as well? It would tell users "something" about a possible implementation.

No, a child class would end up with a field which may not be used. end() already suggests that caching is possible.

@ilya-lavrenov (Contributor) commented Feb 5, 2025:

> A NotImplementedException is enough.

In that case such a streamer cannot work with speculative decoding, prompt lookup, or even stop strings.

It's not backward compatible either, as in the current PR we assume that the put(vector) method is available.

Collaborator:

I see. Then the need to override IterableStreamer.put() is even more important.

Contributor:

So is the final decision to add a default sub-optimal implementation of put(vector), while our streamers override this method with an optimal implementation?

Collaborator:

Yes. Maybe with a warning print.
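A default implementation along those lines might look like the sketch below (the warning is left as a comment since the project's logging facility is not shown in this thread):

// Possible default: sub-optimal per-token fallback, as agreed above.
virtual bool put(const std::vector<int64_t>& tokens) {
    // A warning print could go here to nudge implementers to override this.
    for (int64_t token : tokens) {
        if (put(token)) {
            return true;  // the callback requested to stop generation
        }
    }
    return false;
}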

@@ -18,6 +18,7 @@ class OPENVINO_GENAI_EXPORTS StreamerBase {
/// @brief put is called every time new token is decoded,
/// @return bool flag to indicate whether generation should be stopped, if return true generation stops
virtual bool put(int64_t token) = 0;
virtual bool put(const std::vector<int64_t>& tokens) = 0;
Collaborator:

Explain in doc-strings what the new overload is for.

Contributor Author:

Doc-string added

@@ -3,21 +3,35 @@

#pragma once

#include <condition_variable>
#include <queue>
#include <thread>
Collaborator:

These headers seem to be redundant

Contributor Author:

removed

if (auto _token = std::get_if<int64_t>(&token)) {
return self.put(*_token);
} else {
auto tokens = std::get_if<std::vector<int64_t>>(&token);
Collaborator:

Suggested change:
-    auto tokens = std::get_if<std::vector<int64_t>>(&token);
+    auto tokens = std::get<std::vector<int64_t>>(&token);

Contributor Author:

applied
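Worth noting: std::get takes the variant by reference while std::get_if takes a pointer, so the applied version presumably dereferences the variant itself, e.g.:

// std::get returns a reference and throws std::bad_variant_access on a
// type mismatch, so no pointer/null check is needed:
const auto& tokens = std::get<std::vector<int64_t>>(token);
return self.put(tokens);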

}

std::lock_guard<std::mutex> lock(m_mutex);
return m_dropped;
Collaborator:

m_dropped lives its own life; it could have a separate lock. Pushing the idea further, you could simply wrap m_dropped in an atomic so no lock is needed at all.

Contributor Author:

Switched to atomic
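For reference, with an atomic flag the locked read above reduces to something like this fragment (not the PR's exact code):

// declared as: std::atomic<bool> m_dropped{false};  // requires <atomic>
return m_dropped.load();  // no mutex needed for this flag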

TextCallbackStreamer(const Tokenizer& tokenizer, std::function<bool(std::string)> callback);

std::function<bool(std::string)> on_finalized_subword_callback = [](std::string words)->bool { return false; };
bool put(int64_t token) override;
Contributor:

Should it override put(vector) to have an optimal implementation?

Contributor Author:

TextCallbackStreamer cannot handle put(std::vector<int64_t>) properly at the moment: inserting a chunk of tokens into the streamer's cache doesn't look valid to me because of the n_delay_tokens feature: https://github.com/openvinotoolkit/openvino.genai/blob/master/src/cpp/src/text_callback_streamer.cpp#L42.
I propose to fix this in follow-up PRs.

const WhisperGenerationConfig& generation_config,
const std::shared_ptr<ChunkStreamerBase>& streamer,
const py::kwargs& kwargs) -> py::typing::Union<ov::genai::WhisperDecodedResults> {
StreamerVariant _streamer = std::make_shared<ov::genai::ChunkToBaseStreamerAdapter>(streamer);
Contributor:

Maybe we can add a deprecation warning using PyErr_WarnEx(PyExc_DeprecationWarning, ...)?

Contributor Author:

Yes, will do

@as-suvorov (Contributor Author) commented Feb 7, 2025:

@ilya-lavrenov @Wovchena @sbalandi It seems that adding the put(const std::vector<int64_t>& tokens) overload will introduce a breaking change for the Python API. That's because method overloading is not possible in Python, as I understand it: a later method definition with the same name overwrites the previous one. With such a change, the Python API would have to use a union argument type, def put(token: int | list[int]) -> bool (#1642 (comment)), which is a breaking change.
It looks like @sbalandi's PR #1476 should merge first, and then we can introduce def write(token: int | list[int]) with no breaking changes.

I want to split this PR into:

  1. Use Whisper parallel streaming with async/wait.
  2. Add ThreadedStreamer for CB pipelines (with all the small fixes from this PR).
  3. Add a write(std::vector<int64_t>& tokens) method to StreamerBase (based on "Add a choice of how to end streaming from callback: STOP or CANCEL" #1476).

@as-suvorov as-suvorov marked this pull request as draft February 7, 2025 13:26
@Wovchena (Collaborator) left a comment:

This is what I noticed before the decision to split the PR reached me.

assert streamer_instance.text == result.texts[0]

config = genai_pipe.get_generation_config()
config.return_timestamps = True
Collaborator:

Parametrize the test with different instances of streamer. return_timestamps is also a good candidate to be parametrized, but I guess the intention was to also test different ways of setting it.

A function can be defined like:

def foo(val):
    foo.stored_val = [] if not hasattr(foo, 'stored_val') else foo.stored_val
    foo.stored_val.append(val)
    print(foo.stored_val)


ON_CALL(*this, end()).WillByDefault([this]() {
if (should_sleep) {
std::this_thread::sleep_for(m_sleep_for);
Collaborator:

Should end() really sleep? heavy_callback_test doesn't check anything after end().

Is it possible to parametrize MockStreamerBase's constructor with a lambda taking const std::vector<int64_t>& tokens? In that case you could define put(int64_t token) as return put({token});. Every test would provide its own lambda, which sleeps or not, drops or not. This would let you reduce the number of members, and the behavior description would be local to every test.

Should the tests also verify that no value is missed? A lambda would help with that as well.
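A lambda-parametrized test streamer in the spirit of this suggestion could look roughly like this sketch (StreamerBase's interface is assumed from the diff above; the class name is hypothetical):

#include <cstdint>
#include <functional>
#include <vector>

// Each test supplies its own behavior (sleep, drop, record) via the lambda.
struct LambdaStreamer : public ov::genai::StreamerBase {
    explicit LambdaStreamer(std::function<bool(const std::vector<int64_t>&)> on_tokens)
        : m_on_tokens{std::move(on_tokens)} {}

    bool put(int64_t token) override {
        return put(std::vector<int64_t>{token});  // route through the vector overload
    }

    bool put(const std::vector<int64_t>& tokens) override {
        return m_on_tokens(tokens);
    }

    void end() override {}

    std::function<bool(const std::vector<int64_t>&)> m_on_tokens;
};

// Example: a test that records every token to verify none is missed.
std::vector<int64_t> seen;
LambdaStreamer streamer{[&seen](const std::vector<int64_t>& tokens) {
    seen.insert(seen.end(), tokens.begin(), tokens.end());
    return false;  // do not stop generation
}};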
