Whisper pipeline: use parallel streamer #1642

Draft: wants to merge 21 commits into base: master
Changes from 10 commits
@@ -41,3 +41,5 @@ def main():

if "__main__" == __name__:
main()

# todo: check base streamer inheritance (gil lock)
1 change: 1 addition & 0 deletions src/cpp/include/openvino/genai/streamer_base.hpp
@@ -18,6 +18,7 @@ class OPENVINO_GENAI_EXPORTS StreamerBase {
/// @brief put is called every time new token is decoded,
/// @return bool flag to indicate whether generation should be stopped, if return true generation stops
virtual bool put(int64_t token) = 0;
virtual bool put(const std::vector<int64_t>& tokens) = 0;
Collaborator:
That adds a requirement for the child classes to override one more method. I think it must have a default implementation.
IterableStreamer only overrides the single-token version:

class IterableStreamer(openvino_genai.StreamerBase):
While it passes because the bindings handle this, it seems correct to follow the C++ API in Python as well and override both versions if bool put(const std::vector<int64_t>& tokens) remains pure virtual (= 0) by default.

Contributor:
A default implementation will be suboptimal, as it does not populate m_tokens_cache, which is a field of the derived class.

Do we need to have m_tokens_cache in the interface as well? It would tell users "something" about a possible implementation.

Collaborator:
> A default implementation will be suboptimal, as it does not populate m_tokens_cache, which is a field of the derived class.

A NotImplementedException is enough.

> Do we need to have m_tokens_cache in the interface as well? It would tell users "something" about a possible implementation.

No, a child class would end up with a field that may not be used. end() already suggests that caching is possible.

Contributor (@ilya-lavrenov, Feb 5, 2025):
> A NotImplementedException is enough.

In this case such a streamer cannot work with Speculative Decoding, Prompt Look-up, or even stop strings.

It is not backward compatible either, as in the current PR we assume that the put(vector) method is available.

Collaborator:
I see. Then the need to override IterableStreamer.put() is even more important.

Contributor:
Is the final decision to add a default sub-optimal implementation of put(vector), while our streamer overrides this method with an optimal implementation?

Collaborator:
Yes. Maybe with a warning print

Collaborator:
Explain in doc-strings what the new overload is for.

Contributor (Author):
Doc-string added
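
For illustration, here is a minimal sketch of the default, sub-optimal put(vector) fallback with a warning that the thread above converges on; the warning text and the use of std::cerr are assumptions, not the code actually merged in this PR:

// Hypothetical default fallback inside StreamerBase (requires <iostream>), shown for illustration only.
// It reuses the single-token put() declared above; derived classes can override it with an optimal batch implementation.
virtual bool put(const std::vector<int64_t>& tokens) {
    std::cerr << "Warning: the default put(std::vector<int64_t>) fallback is used; "
                 "override it for an optimal implementation.\n";
    for (const int64_t token : tokens) {
        if (put(token)) {
            return true;  // the single-token put() requested to stop generation
        }
    }
    return false;  // continue generation
}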


/// @brief end is called at the end of generation. It can be used to flush cache if your own streamer has one
virtual void end() = 0;
29 changes: 22 additions & 7 deletions src/cpp/include/openvino/genai/whisper_pipeline.hpp
@@ -23,16 +23,21 @@ using RawSpeechInput = std::vector<float>;
*
* @param m_tokenizer tokenizer
*/
class OPENVINO_GENAI_EXPORTS ChunkStreamerBase : public StreamerBase {
class OPENVINO_GENAI_EXPORTS ChunkStreamerBase {
public:
/// @brief put is called every time new token is decoded,
/// @return bool flag to indicate whether generation should be stopped, if return true generation stops
virtual bool put(int64_t token) = 0;

/// @brief put is called every time new token chunk is generated,
/// @return bool flag to indicate whether generation should be stopped, if return true generation stops
virtual bool put_chunk(std::vector<int64_t> tokens) = 0;
};

// Return flag corresponds whether generation should be stopped: false means continue generation, true means stop.
using ChunkStreamerVariant =
std::variant<std::function<bool(std::string)>, std::shared_ptr<ChunkStreamerBase>, std::monostate>;
/// @brief end is called at the end of generation. It can be used to flush cache if your own streamer has one
virtual void end() = 0;

virtual ~ChunkStreamerBase() = 0;
};

struct OPENVINO_GENAI_EXPORTS WhisperRawPerfMetrics {
/** @brief Duration for each features extraction call */
@@ -151,7 +156,13 @@ class OPENVINO_GENAI_EXPORTS WhisperPipeline {
*/
WhisperDecodedResults generate(const RawSpeechInput& raw_speech_input,
OptionalWhisperGenerationConfig generation_config = std::nullopt,
ChunkStreamerVariant streamer = std::monostate());
StreamerVariant streamer = std::monostate());
// todo: double check removal version
OPENVINO_DEPRECATED("ChunkStreamerBase is deprecated. "
"Use StreamerBase instead. Support will be removed in 2026.1")
WhisperDecodedResults generate(const RawSpeechInput& raw_speech_input,
WhisperGenerationConfig generation_config,
std::shared_ptr<ChunkStreamerBase> streamer);

/**
* @brief High level generate that receives raw speech as a vector of floats and returns decoded output.
@@ -174,6 +185,10 @@ class OPENVINO_GENAI_EXPORTS WhisperPipeline {
void set_generation_config(const WhisperGenerationConfig& config);
};

OPENVINO_GENAI_EXPORTS std::pair<std::string, Any> streamer(ChunkStreamerVariant func);
// todo: double check removal version
OPENVINO_DEPRECATED("ChunkStreamerBase is deprecated. "
"Use StreamerBase instead. Support will be removed in 2026.1")
OPENVINO_GENAI_EXPORTS std::pair<std::string, Any> streamer(std::shared_ptr<ChunkStreamerBase> func);

OPENVINO_GENAI_EXPORTS std::pair<std::string, Any> generation_config(const WhisperGenerationConfig& config);
} // namespace ov::genai
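
With ChunkStreamerBase deprecated in favour of StreamerBase, a user-defined Whisper streamer can derive from StreamerBase and override both put() overloads plus end(). A minimal hypothetical example follows; MyStreamer and its printing are illustrative and not part of the PR:

// Hypothetical user-defined streamer, for illustration only.
#include <cstdint>
#include <iostream>
#include <vector>

#include "openvino/genai/streamer_base.hpp"

class MyStreamer : public ov::genai::StreamerBase {
public:
    bool put(int64_t token) override {
        std::cout << token << ' ';
        return false;  // false: continue generation
    }

    bool put(const std::vector<int64_t>& tokens) override {
        for (const int64_t token : tokens) {
            if (put(token)) {
                return true;  // stop generation if the single-token put() asks to
            }
        }
        return false;
    }

    void end() override {
        std::cout << std::endl;  // called once at the end of generation
    }
};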
82 changes: 26 additions & 56 deletions src/cpp/src/continuous_batching_impl.cpp
@@ -11,6 +11,7 @@
#include "lora_helper.hpp"
#include "cache_state_dumper.hpp"
#include "utils.hpp"
#include "threaded_streamer.hpp"

namespace {

@@ -429,19 +430,9 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
}
set_adapters(sampling_params[0].adapters);

const std::shared_ptr<StreamerBase>& streamer_ptr = std::visit(overloaded{
[](std::monostate) -> std::shared_ptr<StreamerBase> {
return nullptr;
},
[](const std::shared_ptr<StreamerBase>& streamer) {
return streamer;
},
[this](const std::function<bool(std::string)>& streamer) -> std::shared_ptr<StreamerBase> {
return std::make_unique<TextCallbackStreamer>(m_tokenizer, streamer);
}
}, streamer);
const auto streamer_ptr = std::make_shared<ThreadedStreamerWrapper>(streamer, m_tokenizer);

OPENVINO_ASSERT(streamer_ptr == nullptr || input_ids.size() == 1 && sampling_params[0].num_return_sequences == 1 &&
OPENVINO_ASSERT(!streamer_ptr->has_callback() || input_ids.size() == 1 && sampling_params[0].num_return_sequences == 1 &&
(sampling_params[0].is_greedy_decoding() || sampling_params[0].is_multinomial()),
"Currently streaming is possible only with batch size=1 and only for greedy or multinomial decoding");

@@ -452,49 +443,32 @@ }
}
auto all_requests = m_awaiting_requests; // we need to store all requests to get results from them once generation has finished

std::atomic<bool> has_active_requests = has_non_finished_requests();
GenerationHandle& generation = generations.at(0);

// create variables to make optimal thread-safe streaming
std::mutex mutex;
std::unique_lock lock(mutex);
std::condition_variable cv;

// to define streaming thread
std::shared_ptr<std::thread> t_stream_ptr = nullptr;
if (streamer_ptr) {
// define stream token lambda to use in `t_stream_ptr`
auto stream_tokens = [this, &generation, &streamer_ptr, &has_active_requests, &cv, &lock]() {
while (has_active_requests || generation->can_read()) {
// waiting for any tokens or request finishing
cv.wait(lock, [&generation, &has_active_requests]{
return generation->can_read() || !has_active_requests;
});

if (generation->can_read()) {
std::unordered_map<uint64_t, GenerationOutput> generation_outputs = generation->read();
OPENVINO_ASSERT(generation_outputs.size() <= 1);
if (!generation_outputs.empty()) {
for (const auto& generated_token_id : generation_outputs.begin()->second.generated_ids) {
if (streamer_ptr->put(generated_token_id)) {
generation->drop();
break;
}
}
}
}
};
streamer_ptr->end();
};
auto stream_tokens = [&generation, &streamer_ptr]() {
if (!generation->can_read() || !streamer_ptr->has_callback()) {
return;
}

// to define streaming thread
t_stream_ptr = std::make_shared<std::thread>([&stream_tokens] {
stream_tokens();
});
}
if (streamer_ptr->is_dropped()) {
generation->drop();
return;
}

std::unordered_map<uint64_t, GenerationOutput> generation_outputs = generation->read();
OPENVINO_ASSERT(generation_outputs.size() <= 1);
if (generation_outputs.empty()) {
return;
}

const auto tokens = generation_outputs.begin()->second.generated_ids;
streamer_ptr->put(tokens);
};

streamer_ptr->start();

std::exception_ptr thrown_exception = nullptr;
while (has_active_requests) {
while (has_non_finished_requests()) {
try {
const auto infer_start = std::chrono::steady_clock::now();
step();
@@ -510,17 +484,13 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
drop_requests(); // remove all requests from pipeline state in case of exception
thrown_exception = std::current_exception();
}
has_active_requests = has_non_finished_requests();
cv.notify_one();
stream_tokens();
if (thrown_exception) {
throw thrown_exception;
}
}

// waiting for competion of streaming
if (t_stream_ptr && t_stream_ptr->joinable()) {
t_stream_ptr->join();
}
streamer_ptr->end();

OPENVINO_ASSERT(m_requests.empty(), "Internal error: current request is supposed to be dropped within step() function as completed");

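
The generate() loop above drives streaming through the new ThreadedStreamerWrapper (declared in threaded_streamer.hpp, which is not part of this diff excerpt) via start(), put(), is_dropped(), and end(). As a rough, std-only illustration of the producer/consumer pattern such a wrapper typically implements (the streaming callback runs on a worker thread, so the generation loop is not blocked), the following sketch is an assumption about the design, not the PR's actual ThreadedStreamerWrapper:

// Std-only sketch of a threaded token streamer; illustrative only, not the PR's ThreadedStreamerWrapper.
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

class ThreadedTokenStreamer {
public:
    explicit ThreadedTokenStreamer(std::function<void(const std::vector<int64_t>&)> callback)
        : m_callback(std::move(callback)) {}

    // Spawns the worker thread that consumes queued tokens.
    void start() {
        m_worker = std::thread([this] { run(); });
    }

    // Called from the generation loop: only enqueues tokens, so generation is not blocked by the callback.
    void put(const std::vector<int64_t>& tokens) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(tokens);
        }
        m_cv.notify_one();
    }

    // Signals that no more tokens will arrive and waits for the worker to drain the queue.
    void end() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_finished = true;
        }
        m_cv.notify_one();
        if (m_worker.joinable()) {
            m_worker.join();
        }
    }

private:
    void run() {
        while (true) {
            std::vector<int64_t> tokens;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_cv.wait(lock, [this] { return !m_queue.empty() || m_finished; });
                if (m_queue.empty() && m_finished) {
                    return;  // all queued tokens have been streamed
                }
                tokens = std::move(m_queue.front());
                m_queue.pop();
            }
            m_callback(tokens);  // run the user callback outside the lock, on the worker thread
        }
    }

    std::function<void(const std::vector<int64_t>&)> m_callback;
    std::queue<std::vector<int64_t>> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::thread m_worker;
    bool m_finished = false;
};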
6 changes: 6 additions & 0 deletions src/cpp/src/perf_metrics.cpp
@@ -138,13 +138,19 @@ PerfMetrics PerfMetrics::operator+(const PerfMetrics& right) const {

// Concatenate durations, batch_sizes first token times.
auto& new_durations = res.raw_metrics.m_durations;
auto& new_inference_durations = res.raw_metrics.m_inference_durations;
auto& new_token_infer_durations = res.raw_metrics.m_token_infer_durations;
auto& new_batch_sizes = res.raw_metrics.m_batch_sizes;
auto& new_times_to_first_token = res.raw_metrics.m_times_to_first_token;
auto& right_inference_durations = right.raw_metrics.m_inference_durations;
auto& right_token_infer_durations = right.raw_metrics.m_token_infer_durations;
auto& right_durations = right.raw_metrics.m_durations;
auto& right_batch_sizes = right.raw_metrics.m_batch_sizes;
auto& right_times_to_first_token = right.raw_metrics.m_times_to_first_token;

new_durations.insert(new_durations.end(), right_durations.begin(), right_durations.end());
new_inference_durations.insert(new_inference_durations.end(), right_inference_durations.begin(), right_inference_durations.end());
new_token_infer_durations.insert(new_token_infer_durations.end(), right_token_infer_durations.begin(), right_token_infer_durations.end());
new_times_to_first_token.insert(new_times_to_first_token.end(), right_times_to_first_token.begin(), right_times_to_first_token.end());
new_batch_sizes.insert(new_batch_sizes.end(), right_batch_sizes.begin(), right_batch_sizes.end());

84 changes: 27 additions & 57 deletions src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
@@ -5,6 +5,7 @@

#include "prompt_lookup_impl.hpp"
#include "text_callback_streamer.hpp"
#include "threaded_streamer.hpp"

namespace ov::genai {
template<class... Ts> struct overloaded : Ts... {using Ts::operator()...;};
@@ -108,19 +109,9 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<ov::Ten
}
m_pipeline->set_adapters(sampling_params[0].adapters);

const std::shared_ptr<StreamerBase>& streamer_ptr = std::visit(overloaded{
[](std::monostate) -> std::shared_ptr<StreamerBase> {
return nullptr;
},
[](const std::shared_ptr<StreamerBase>& streamer) {
return streamer;
},
[this](const std::function<bool(std::string)>& streamer) -> std::shared_ptr<StreamerBase> {
return std::make_unique<TextCallbackStreamer>(m_tokenizer, streamer);
}
}, streamer);
const auto streamer_ptr = std::make_shared<ThreadedStreamerWrapper>(streamer, m_tokenizer);

OPENVINO_ASSERT(streamer_ptr == nullptr || input_ids.size() == 1 && (sampling_params[0].is_greedy_decoding() || sampling_params[0].is_multinomial()),
OPENVINO_ASSERT(!streamer_ptr->has_callback() || input_ids.size() == 1 && (sampling_params[0].is_greedy_decoding() || sampling_params[0].is_multinomial()),
"Currently streaming is possible only with batch size=1 and only for greedy or multinomial decoding");

std::vector<GenerationHandle> generations;
@@ -131,66 +122,45 @@ }
}
auto all_requests = m_pipeline->get_awaiting_requests();

std::atomic<bool> has_active_requests = has_non_finished_requests();
auto& generation = generations.at(0);

// create variables to make optimal thread-safe streaming
std::mutex mutex;
std::unique_lock lock(mutex);
std::condition_variable cv;

// to define streaming thread
std::shared_ptr<std::thread> t_stream_ptr = nullptr;
if (streamer_ptr) {
// define stream token lambda to use in `t_stream_ptr`
auto stream_tokens = [this, &generation, &streamer_ptr, &has_active_requests, &cv, &lock]() {
while (has_active_requests || generation->can_read()) {
// waiting for any tokens or request finishing
cv.wait(lock, [&generation, &has_active_requests]{
return generation->can_read() || !has_active_requests;
});

if (generation->can_read()) {
std::unordered_map<uint64_t, GenerationOutput> generation_outputs = generation->read();
OPENVINO_ASSERT(generation_outputs.size() <= 1);
if (!generation_outputs.empty()) {
for (const auto& generated_token_id : generation_outputs.begin()->second.generated_ids) {
if (streamer_ptr->put(generated_token_id)) {
generation->drop();
break;
}
}
}
}
};
streamer_ptr->end();
};

// to define streaming thread
t_stream_ptr = std::make_shared<std::thread>([&stream_tokens] {
stream_tokens();
});
}
auto stream_tokens = [&generation, &streamer_ptr]() {
if (!generation->can_read() || !streamer_ptr->has_callback()) {
return;
}

if (streamer_ptr->is_dropped()) {
generation->drop();
return;
}

std::unordered_map<uint64_t, GenerationOutput> generation_outputs = generation->read();
OPENVINO_ASSERT(generation_outputs.size() <= 1);
if (generation_outputs.empty()) {
return;
}

const auto tokens = generation_outputs.begin()->second.generated_ids;
streamer_ptr->put(tokens);
};

streamer_ptr->start();

std::exception_ptr thrown_exception = nullptr;
while (has_active_requests) {
while (has_non_finished_requests()) {
try {
step();
} catch (...) {
drop_requests(); // remove all requests from pipeline state in case of exception
thrown_exception = std::current_exception();
}
has_active_requests = has_non_finished_requests();
cv.notify_one();
stream_tokens();
if (thrown_exception) {
throw thrown_exception;
}
}

// waiting for competion of streaming
if (t_stream_ptr && t_stream_ptr->joinable()) {
t_stream_ptr->join();
}
streamer_ptr->end();

OPENVINO_ASSERT(m_pipeline->is_requests_empty(), "Internal error: current request is supposed to be dropped within step() function as completed");
