Whisper pipeline: use parallel streamer #1642

Draft · wants to merge 21 commits into base: master · changes from 15 commits
1 change: 1 addition & 0 deletions src/cpp/include/openvino/genai/streamer_base.hpp
@@ -18,6 +18,7 @@ class OPENVINO_GENAI_EXPORTS StreamerBase {
/// @brief put is called every time new token is decoded,
/// @return bool flag to indicate whether generation should be stopped, if return true generation stops
virtual bool put(int64_t token) = 0;
virtual bool put(const std::vector<int64_t>& tokens) = 0;
Collaborator:

That adds a requirement for child classes to override one more method. I think it must have a default implementation.
IterableStreamer only overrides the single-token version:

class IterableStreamer(openvino_genai.StreamerBase):

While it passes because the bindings handle this, it seems correct to follow the C++ API in Python as well and override both versions if bool put(const std::vector<int64_t>& tokens) remains = 0 by default.

Contributor:

A default implementation would be suboptimal, as it does not populate m_tokens_cache, which is a field of the derived class.

Do we need to have m_tokens_cache in the interface as well?
It would tell users "something" about a possible implementation.

Collaborator:

> A default implementation would be suboptimal, as it does not populate m_tokens_cache, which is a field of the derived class.

Throwing NotImplementedException is enough.

> Do we need to have m_tokens_cache in the interface as well?
> It would tell users "something" about a possible implementation.

No, a child class would end up with a field that may not be used. end() already suggests that caching is possible.

Contributor @ilya-lavrenov, Feb 5, 2025:

> Throwing NotImplementedException is enough.

In this case such a streamer cannot work with speculative decoding, prompt look-up, or even stop strings.

It's not backward compatible either, as the current PR assumes that the put(vector) method is available.

Collaborator:

I see. Then the need to override IterableStreamer.put() is even more important.

Contributor:

Is the final decision to add a default, sub-optimal implementation of put(vector), with our streamer overriding it with an optimal implementation?

Collaborator:

Yes. Maybe with a warning print.

Collaborator:

Explain in doc-strings what the new overload is for.

Contributor (Author):

Doc-string added.


/// @brief end is called at the end of generation. It can be used to flush cache if your own streamer has one
virtual void end() = 0;
30 changes: 22 additions & 8 deletions src/cpp/include/openvino/genai/whisper_pipeline.hpp
@@ -23,18 +23,23 @@ using RawSpeechInput = std::vector<float>;
*
* @param m_tokenizer tokenizer
*/
class OPENVINO_GENAI_EXPORTS ChunkStreamerBase : public StreamerBase {
class OPENVINO_GENAI_EXPORTS ChunkStreamerBase {
ilya-lavrenov marked this conversation as resolved.
public:
/// @brief put is called every time new token is decoded,
/// @return bool flag to indicate whether generation should be stopped, if return true generation stops
virtual bool put(int64_t token) = 0;

/// @brief put is called every time new token chunk is generated,
/// @return bool flag to indicate whether generation should be stopped, if return true generation stops
virtual bool put_chunk(std::vector<int64_t> tokens) = 0;
};

// Return flag corresponds whether generation should be stopped: false means continue generation, true means stop.
using ChunkStreamerVariant =
std::variant<std::function<bool(std::string)>, std::shared_ptr<ChunkStreamerBase>, std::monostate>;
/// @brief end is called at the end of generation. It can be used to flush cache if your own streamer has one
virtual void end() = 0;

virtual ~ChunkStreamerBase() = 0;
};

struct OPENVINO_GENAI_EXPORTS WhisperRawPerfMetrics {
struct WhisperRawPerfMetrics {
/** @brief Duration for each features extraction call */
std::vector<MicroSeconds> features_extraction_durations;
};
Expand Down Expand Up @@ -151,7 +156,13 @@ class OPENVINO_GENAI_EXPORTS WhisperPipeline {
*/
WhisperDecodedResults generate(const RawSpeechInput& raw_speech_input,
OptionalWhisperGenerationConfig generation_config = std::nullopt,
ChunkStreamerVariant streamer = std::monostate());
StreamerVariant streamer = std::monostate());

OPENVINO_DEPRECATED("ChunkStreamerBase is deprecated. "
"Use StreamerBase instead. Support will be removed in 2026.0")
WhisperDecodedResults generate(const RawSpeechInput& raw_speech_input,
WhisperGenerationConfig generation_config,
std::shared_ptr<ChunkStreamerBase> streamer);

/**
* @brief High level generate that receives raw speech as a vector of floats and returns decoded output.
@@ -174,6 +185,9 @@ class OPENVINO_GENAI_EXPORTS WhisperPipeline {
void set_generation_config(const WhisperGenerationConfig& config);
};

OPENVINO_GENAI_EXPORTS std::pair<std::string, Any> streamer(ChunkStreamerVariant func);
OPENVINO_DEPRECATED("ChunkStreamerBase is deprecated. "
"Use StreamerBase instead. Support will be removed in 2026.0")
OPENVINO_GENAI_EXPORTS std::pair<std::string, Any> streamer(std::shared_ptr<ChunkStreamerBase> func);

OPENVINO_GENAI_EXPORTS std::pair<std::string, Any> generation_config(const WhisperGenerationConfig& config);
} // namespace ov::genai
64 changes: 7 additions & 57 deletions src/cpp/src/continuous_batching_impl.cpp
@@ -11,6 +11,7 @@
#include "lora_helper.hpp"
#include "cache_state_dumper.hpp"
#include "utils.hpp"
#include "threaded_streamer.hpp"

namespace {

@@ -429,19 +430,9 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
}
set_adapters(sampling_params[0].adapters);

const std::shared_ptr<StreamerBase>& streamer_ptr = std::visit(overloaded{
[](std::monostate) -> std::shared_ptr<StreamerBase> {
return nullptr;
},
[](const std::shared_ptr<StreamerBase>& streamer) {
return streamer;
},
[this](const std::function<bool(std::string)>& streamer) -> std::shared_ptr<StreamerBase> {
return std::make_unique<TextCallbackStreamer>(m_tokenizer, streamer);
}
}, streamer);
const auto streamer_ptr = std::make_shared<ThreadedStreamerWrapper>(streamer, m_tokenizer);

OPENVINO_ASSERT(streamer_ptr == nullptr || input_ids.size() == 1 && sampling_params[0].num_return_sequences == 1 &&
OPENVINO_ASSERT(!streamer_ptr->has_callback() || input_ids.size() == 1 && sampling_params[0].num_return_sequences == 1 &&
(sampling_params[0].is_greedy_decoding() || sampling_params[0].is_multinomial()),
"Currently streaming is possible only with batch size=1 and only for greedy or multinomial decoding");

@@ -452,49 +443,12 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
}
auto all_requests = m_awaiting_requests; // we need to store all requests to get results from them once generation has finished

std::atomic<bool> has_active_requests = has_non_finished_requests();
GenerationHandle& generation = generations.at(0);

// create variables to make optimal thread-safe streaming
std::mutex mutex;
std::unique_lock lock(mutex);
std::condition_variable cv;

// to define streaming thread
std::shared_ptr<std::thread> t_stream_ptr = nullptr;
if (streamer_ptr) {
// define stream token lambda to use in `t_stream_ptr`
auto stream_tokens = [this, &generation, &streamer_ptr, &has_active_requests, &cv, &lock]() {
while (has_active_requests || generation->can_read()) {
// waiting for any tokens or request finishing
cv.wait(lock, [&generation, &has_active_requests]{
return generation->can_read() || !has_active_requests;
});

if (generation->can_read()) {
std::unordered_map<uint64_t, GenerationOutput> generation_outputs = generation->read();
OPENVINO_ASSERT(generation_outputs.size() <= 1);
if (!generation_outputs.empty()) {
for (const auto& generated_token_id : generation_outputs.begin()->second.generated_ids) {
if (streamer_ptr->put(generated_token_id)) {
generation->drop();
break;
}
}
}
}
};
streamer_ptr->end();
};

// to define streaming thread
t_stream_ptr = std::make_shared<std::thread>([&stream_tokens] {
stream_tokens();
});
}
streamer_ptr->start();

std::exception_ptr thrown_exception = nullptr;
while (has_active_requests) {
while (has_non_finished_requests()) {
try {
const auto infer_start = std::chrono::steady_clock::now();
step();
@@ -510,17 +464,13 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
drop_requests(); // remove all requests from pipeline state in case of exception
thrown_exception = std::current_exception();
}
has_active_requests = has_non_finished_requests();
cv.notify_one();
stream_tokens(streamer_ptr, generation);
if (thrown_exception) {
throw thrown_exception;
}
}

// waiting for competion of streaming
if (t_stream_ptr && t_stream_ptr->joinable()) {
t_stream_ptr->join();
}
streamer_ptr->end();

OPENVINO_ASSERT(m_requests.empty(), "Internal error: current request is supposed to be dropped within step() function as completed");

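The rewritten generate() loop above replaces the hand-rolled thread/condition_variable code with a ThreadedStreamerWrapper driven by start(), put() (via stream_tokens), and end(). The real wrapper lives in threaded_streamer.hpp; the sketch below reconstructs that pattern purely from the call sites in this diff, so all internals are assumptions.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Sketch: the pipeline thread pushes token chunks with put(), a worker
// thread drains them into the user callback, end() joins the worker.
class ThreadedStreamerSketch {
public:
    explicit ThreadedStreamerSketch(std::function<bool(int64_t)> callback)
        : m_callback(std::move(callback)) {}

    bool has_callback() const { return static_cast<bool>(m_callback); }
    bool is_dropped() const { return m_dropped.load(); }

    void start() {
        if (has_callback())
            m_worker = std::thread([this] { run(); });
    }

    void put(const std::vector<int64_t>& tokens) {
        if (!has_callback())
            return;
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(tokens);
        }
        m_cv.notify_one();
    }

    void end() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_finished = true;
        }
        m_cv.notify_one();
        if (m_worker.joinable())
            m_worker.join();
    }

private:
    void run() {
        std::unique_lock<std::mutex> lock(m_mutex);
        for (;;) {
            m_cv.wait(lock, [this] { return !m_queue.empty() || m_finished; });
            if (m_queue.empty() && m_finished)
                return;  // drained everything and generation is over
            std::vector<int64_t> tokens = std::move(m_queue.front());
            m_queue.pop();
            lock.unlock();  // run the user callback without holding the lock
            for (int64_t token : tokens)
                if (m_callback(token))
                    m_dropped = true;  // user asked to stop generation
            lock.lock();
        }
    }

    std::function<bool(int64_t)> m_callback;
    std::thread m_worker;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<std::vector<int64_t>> m_queue;
    bool m_finished = false;
    std::atomic<bool> m_dropped{false};
};
```

This keeps the pipeline loop simple: it only calls put() and polls is_dropped(), while blocking callback work happens off the generation thread.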
24 changes: 24 additions & 0 deletions src/cpp/src/icontinuous_batching.cpp
@@ -112,4 +112,28 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate(

return decoded;
}

void ContinuousBatchingPipeline::IContinuousBatchingPipeline::stream_tokens(
const std::shared_ptr<ThreadedStreamerWrapper>& streamer_ptr,
const GenerationHandle& handle
) {
if (!streamer_ptr->has_callback() || !handle->can_read()) {
return;
}

if (streamer_ptr->is_dropped()) {
handle->drop();
return;
}

std::unordered_map<uint64_t, GenerationOutput> generation_outputs = handle->read();
OPENVINO_ASSERT(generation_outputs.size() <= 1);
if (generation_outputs.empty()) {
return;
}

const auto tokens = generation_outputs.begin()->second.generated_ids;
streamer_ptr->put(tokens);
}

}
3 changes: 3 additions & 0 deletions src/cpp/src/icontinuous_batching.hpp
@@ -9,6 +9,7 @@
#include "sampler.hpp"
#include "model_runner.hpp"
#include "scheduler.hpp"
#include "threaded_streamer.hpp"

namespace ov::genai {

@@ -46,6 +47,8 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline {
// to access m_load_time_ms
friend class ContinuousBatchingPipeline;

void stream_tokens(const std::shared_ptr<ThreadedStreamerWrapper>& streamer_ptr, const GenerationHandle& handle);

public:
GenerationConfig get_config() const;
PipelineMetrics get_metrics() const;
6 changes: 6 additions & 0 deletions src/cpp/src/perf_metrics.cpp
@@ -138,13 +138,19 @@ PerfMetrics PerfMetrics::operator+(const PerfMetrics& right) const {

// Concatenate durations, batch_sizes first token times.
auto& new_durations = res.raw_metrics.m_durations;
auto& new_inference_durations = res.raw_metrics.m_inference_durations;
auto& new_token_infer_durations = res.raw_metrics.m_token_infer_durations;
auto& new_batch_sizes = res.raw_metrics.m_batch_sizes;
auto& new_times_to_first_token = res.raw_metrics.m_times_to_first_token;
auto& right_inference_durations = right.raw_metrics.m_inference_durations;
auto& right_token_infer_durations = right.raw_metrics.m_token_infer_durations;
auto& right_durations = right.raw_metrics.m_durations;
auto& right_batch_sizes = right.raw_metrics.m_batch_sizes;
auto& right_times_to_first_token = right.raw_metrics.m_times_to_first_token;

new_durations.insert(new_durations.end(), right_durations.begin(), right_durations.end());
new_inference_durations.insert(new_inference_durations.end(), right_inference_durations.begin(), right_inference_durations.end());
new_token_infer_durations.insert(new_token_infer_durations.end(), right_token_infer_durations.begin(), right_token_infer_durations.end());
new_times_to_first_token.insert(new_times_to_first_token.end(), right_times_to_first_token.begin(), right_times_to_first_token.end());
new_batch_sizes.insert(new_batch_sizes.end(), right_batch_sizes.begin(), right_batch_sizes.end());

64 changes: 7 additions & 57 deletions src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
@@ -5,6 +5,7 @@

#include "prompt_lookup_impl.hpp"
#include "text_callback_streamer.hpp"
#include "threaded_streamer.hpp"

namespace ov::genai {
template<class... Ts> struct overloaded : Ts... {using Ts::operator()...;};
@@ -108,19 +109,9 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<ov::Ten
}
m_pipeline->set_adapters(sampling_params[0].adapters);

const std::shared_ptr<StreamerBase>& streamer_ptr = std::visit(overloaded{
[](std::monostate) -> std::shared_ptr<StreamerBase> {
return nullptr;
},
[](const std::shared_ptr<StreamerBase>& streamer) {
return streamer;
},
[this](const std::function<bool(std::string)>& streamer) -> std::shared_ptr<StreamerBase> {
return std::make_unique<TextCallbackStreamer>(m_tokenizer, streamer);
}
}, streamer);
const auto streamer_ptr = std::make_shared<ThreadedStreamerWrapper>(streamer, m_tokenizer);

OPENVINO_ASSERT(streamer_ptr == nullptr || input_ids.size() == 1 && (sampling_params[0].is_greedy_decoding() || sampling_params[0].is_multinomial()),
OPENVINO_ASSERT(!streamer_ptr->has_callback() || input_ids.size() == 1 && (sampling_params[0].is_greedy_decoding() || sampling_params[0].is_multinomial()),
"Currently streaming is possible only with batch size=1 and only for greedy or multinomial decoding");

std::vector<GenerationHandle> generations;
@@ -131,66 +122,25 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<ov::Ten
}
auto all_requests = m_pipeline->get_awaiting_requests();

std::atomic<bool> has_active_requests = has_non_finished_requests();
auto& generation = generations.at(0);

// create variables to make optimal thread-safe streaming
std::mutex mutex;
std::unique_lock lock(mutex);
std::condition_variable cv;

// to define streaming thread
std::shared_ptr<std::thread> t_stream_ptr = nullptr;
if (streamer_ptr) {
// define stream token lambda to use in `t_stream_ptr`
auto stream_tokens = [this, &generation, &streamer_ptr, &has_active_requests, &cv, &lock]() {
while (has_active_requests || generation->can_read()) {
// waiting for any tokens or request finishing
cv.wait(lock, [&generation, &has_active_requests]{
return generation->can_read() || !has_active_requests;
});

if (generation->can_read()) {
std::unordered_map<uint64_t, GenerationOutput> generation_outputs = generation->read();
OPENVINO_ASSERT(generation_outputs.size() <= 1);
if (!generation_outputs.empty()) {
for (const auto& generated_token_id : generation_outputs.begin()->second.generated_ids) {
if (streamer_ptr->put(generated_token_id)) {
generation->drop();
break;
}
}
}
}
};
streamer_ptr->end();
};

// to define streaming thread
t_stream_ptr = std::make_shared<std::thread>([&stream_tokens] {
stream_tokens();
});
}
streamer_ptr->start();

std::exception_ptr thrown_exception = nullptr;
while (has_active_requests) {
while (has_non_finished_requests()) {
try {
step();
} catch (...) {
drop_requests(); // remove all requests from pipeline state in case of exception
thrown_exception = std::current_exception();
}
has_active_requests = has_non_finished_requests();
cv.notify_one();
stream_tokens(streamer_ptr, generation);
if (thrown_exception) {
throw thrown_exception;
}
}

// waiting for competion of streaming
if (t_stream_ptr && t_stream_ptr->joinable()) {
t_stream_ptr->join();
}
streamer_ptr->end();

OPENVINO_ASSERT(m_pipeline->is_requests_empty(), "Internal error: current request is supposed to be dropped within step() function as completed");
