Skip to content

Commit

Permalink
Add server task timeout.
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang86 committed Jan 12, 2024
1 parent 70212f6 commit a58aa6d
Showing 1 changed file with 26 additions and 1 deletion.
27 changes: 26 additions & 1 deletion src/runtime/request_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "flexflow/parallel_ops/parallel_op.h"
// #include "flexflow/tokenizers.h"
#include <bitset>
#include <chrono>
#include <filesystem>
#include <future>
#include <iomanip>
Expand All @@ -29,6 +30,8 @@ namespace FlexFlow {
using namespace Legion;
using tokenizers::Tokenizer;

// Idle limit for the background serving loops. serve_incr_decoding() and
// serve_spec_infer() record a steady_clock timestamp when they start and
// refresh it each time a ready batch is popped from the pipeline; once more
// than this duration has elapsed since that timestamp, the loop logs a
// message and exits.
// NOTE(review): the log messages in those loops spell out "300 seconds"
// literally -- keep them in sync if this value is changed.
constexpr std::chrono::seconds TIMEOUT_DURATION(300);

LegionRuntime::Logger::Category log_req_mgr("RequestManager");

std::string LoadBytesFromFile(std::string const &path) {
Expand All @@ -46,7 +49,7 @@ std::string LoadBytesFromFile(std::string const &path) {
RequestManager::RequestManager()
: request_manager_status(INITIALIZED), verbose(false),
next_available_guid(1000000), num_processed_requests(0),
total_request_run_time(0.0f){
total_request_run_time(0.0f) {
// The following config parameters are set
// during ffmodel.compile()
// Initialize them to -1 to make sure no one
Expand Down Expand Up @@ -2366,7 +2369,17 @@ void RequestManager::serve_incr_decoding(FFModel *llm) {
std::queue<std::pair<BatchConfigFuture, InferenceResultFuture>>
batch_pipeline;
{ batch_pipeline.push(std::make_pair(last_bcf, last_irf)); }

auto last_process_time = std::chrono::steady_clock::now();
while (!is_background_server_terminated()) {
auto current_time = std::chrono::steady_clock::now();
if (current_time - last_process_time > TIMEOUT_DURATION) {
std::cout
<< "No more new request for 300 seconds. Background server exits."
<< std::endl;
break; // Exit the loop after timeout
}

if (batch_pipeline.size() >= 4) {
// Block here to avoid launching too many batches
auto const &batch = batch_pipeline.front();
Expand All @@ -2377,6 +2390,7 @@ void RequestManager::serve_incr_decoding(FFModel *llm) {
auto const &batch = batch_pipeline.front();
if (batch.second.is_ready()) {
batch_pipeline.pop();
last_process_time = std::chrono::steady_clock::now();
} else {
break;
}
Expand Down Expand Up @@ -2435,7 +2449,17 @@ void RequestManager::serve_spec_infer(FFModel *llm) {
last_tree_irf = Future::from_value<InferenceResult>(tree_ir);
}
batch_pipeline.push(std::make_pair(last_tree_bcf, last_tree_irf));

auto last_process_time = std::chrono::steady_clock::now();
while (!is_background_server_terminated()) {
auto current_time = std::chrono::steady_clock::now();
if (current_time - last_process_time > TIMEOUT_DURATION) {
std::cout
<< "No more new request for 300 seconds. Background server exits."
<< std::endl;
break; // Exit the loop after timeout
}

if (batch_pipeline.size() >= 4) {
// Block here to avoid launching too many batches
auto const &batch = batch_pipeline.front();
Expand All @@ -2446,6 +2470,7 @@ void RequestManager::serve_spec_infer(FFModel *llm) {
auto const &batch = batch_pipeline.front();
if (batch.second.is_ready()) {
batch_pipeline.pop();
last_process_time = std::chrono::steady_clock::now();
} else {
break;
}
Expand Down

0 comments on commit a58aa6d

Please sign in to comment.