Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update windows kafka client. #1466

Merged
merged 1 commit into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
349 changes: 152 additions & 197 deletions src/client/WFKafkaClient.cc

Large diffs are not rendered by default.

160 changes: 43 additions & 117 deletions src/factory/KafkaTaskImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
limitations under the License.

Authors: Wang Zhulei ([email protected])
Xie Han ([email protected])
*/

#include <assert.h>
Expand Down Expand Up @@ -45,16 +46,15 @@ static KafkaCgroup __create_cgroup(const KafkaCgroup *c)

/**********Client**********/

class __ComplexKafkaTask : public WFComplexClientTask<KafkaRequest, KafkaResponse,
struct __ComplexKafkaTaskCtx>
class __ComplexKafkaTask : public WFComplexClientTask<KafkaRequest, KafkaResponse, int>
{
public:
__ComplexKafkaTask(int retry_max, __kafka_callback_t&& callback) :
WFComplexClientTask(retry_max, std::move(callback))
{
is_user_request_ = true;
is_redirect_ = false;
ctx_.kafka_error = 0;
ctx_ = 0;
}

protected:
Expand Down Expand Up @@ -117,7 +117,6 @@ class __ComplexKafkaTask : public WFComplexClientTask<KafkaRequest, KafkaRespons
virtual int keep_alive_timeout();
virtual int first_timeout();
bool has_next();
bool check_redirect();
bool process_produce();
bool process_fetch();
bool process_metadata();
Expand Down Expand Up @@ -310,19 +309,17 @@ CommMessageIn *__ComplexKafkaTask::message_in()

bool __ComplexKafkaTask::init_success()
{
TransportType type = TT_TCP;
if (uri_.scheme)
enum TransportType type;

if (uri_.scheme && strcasecmp(uri_.scheme, "kafka") == 0)
type = TT_TCP;
else if (uri_.scheme && strcasecmp(uri_.scheme, "kafkas") == 0)
type = TT_TCP_SSL;
else
{
if (strcasecmp(uri_.scheme, "kafka") == 0)
type = TT_TCP;
//else if (uri_.scheme && strcasecmp(uri_.scheme, "kafkas") == 0)
// type = TT_TCP_SSL;
else
{
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_URI_SCHEME_INVALID;
return false;
}
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_URI_SCHEME_INVALID;
return false;
}

std::string username, password, sasl, client;
Expand Down Expand Up @@ -367,7 +364,6 @@ bool __ComplexKafkaTask::init_success()
}

this->WFComplexClientTask::set_transport_type(type);

return true;
}

Expand Down Expand Up @@ -409,52 +405,11 @@ int __ComplexKafkaTask::first_timeout()
return ret + KAFKA_ROUNDTRIP_TIMEOUT;
}

bool __ComplexKafkaTask::check_redirect()
{
struct sockaddr_storage addr;
socklen_t addrlen = sizeof addr;
const struct sockaddr *paddr = (const struct sockaddr *)&addr;
KafkaBroker *coordinator = this->get_req()->get_cgroup()->get_coordinator();

//always success
this->get_peer_addr((struct sockaddr *)&addr, &addrlen);
if (!coordinator->is_equal(paddr, addrlen))
{
if (coordinator->is_to_addr())
{
const struct sockaddr *addr_coord;
socklen_t addrlen_coord;

coordinator->get_broker_addr(&addr_coord, &addrlen_coord);
set_redirect(TT_TCP, addr_coord, addrlen_coord,
this->WFComplexClientTask::info_);
}
else
{
std::string url = "kafka://";
url += user_info_ + "@";
url += coordinator->get_host();
url += ":" + std::to_string(coordinator->get_port());

ParsedURI uri;
URIParser::parse(url, uri);
set_redirect(std::move(uri));
}

return true;
}
else
{
this->init(TT_TCP, paddr, addrlen, this->WFComplexClientTask::info_);
return false;
}
}

bool __ComplexKafkaTask::process_find_coordinator()
{
KafkaCgroup *cgroup = this->get_resp()->get_cgroup();
ctx_.kafka_error = cgroup->get_error();
if (ctx_.kafka_error)
ctx_ = cgroup->get_error();
if (ctx_)
{
this->error = WFT_ERR_KAFKA_CGROUP_FAILED;
this->state = WFT_STATE_TASK_ERROR;
Expand All @@ -463,25 +418,25 @@ bool __ComplexKafkaTask::process_find_coordinator()
else
{
this->get_req()->set_cgroup(*cgroup);
is_redirect_ = check_redirect();
KafkaBroker *coordinator = cgroup->get_coordinator();
std::string url(uri_.scheme);
url += "://";
url += user_info_ + "@";
url += coordinator->get_host();
url += ":" + std::to_string(coordinator->get_port());

ParsedURI uri;
URIParser::parse(url, uri);
set_redirect(std::move(uri));
this->get_req()->set_api_type(Kafka_JoinGroup);
is_redirect_ = true;
return true;
}
}

bool __ComplexKafkaTask::process_join_group()
{
KafkaResponse *msg = this->get_resp();
if (!msg->get_cgroup()->get_coordinator()->is_to_addr())
{
struct sockaddr_storage addr;
socklen_t addrlen = sizeof addr;
const struct sockaddr *paddr = (const struct sockaddr *)&addr;
this->get_peer_addr((struct sockaddr *)&addr, &addrlen);
msg->get_cgroup()->get_coordinator()->set_broker_addr(paddr, addrlen);
msg->get_cgroup()->get_coordinator()->set_to_addr(1);
}

switch(msg->get_cgroup()->get_error())
{
case KAFKA_MEMBER_ID_REQUIRED:
Expand All @@ -498,7 +453,7 @@ bool __ComplexKafkaTask::process_join_group()
break;

default:
ctx_.kafka_error = msg->get_cgroup()->get_error();
ctx_ = msg->get_cgroup()->get_error();
this->error = WFT_ERR_KAFKA_CGROUP_FAILED;
this->state = WFT_STATE_TASK_ERROR;
return false;
Expand All @@ -509,8 +464,8 @@ bool __ComplexKafkaTask::process_join_group()

bool __ComplexKafkaTask::process_sync_group()
{
ctx_.kafka_error = this->get_resp()->get_cgroup()->get_error();
if (ctx_.kafka_error)
ctx_ = this->get_resp()->get_cgroup()->get_error();
if (ctx_)
{
this->error = WFT_ERR_KAFKA_CGROUP_FAILED;
this->state = WFT_STATE_TASK_ERROR;
Expand Down Expand Up @@ -538,7 +493,7 @@ bool __ComplexKafkaTask::process_metadata()
case 0:
break;
default:
ctx_.kafka_error = meta->get_error();
ctx_ = meta->get_error();
this->error = WFT_ERR_KAFKA_META_FAILED;
this->state = WFT_STATE_TASK_ERROR;
return false;
Expand Down Expand Up @@ -598,7 +553,7 @@ bool __ComplexKafkaTask::process_fetch()
case KAFKA_OFFSET_OUT_OF_RANGE:
break;
default:
ctx_.kafka_error = toppar->get_error();
ctx_ = toppar->get_error();
this->error = WFT_ERR_KAFKA_FETCH_FAILED;
this->state = WFT_STATE_TASK_ERROR;
return false;
Expand Down Expand Up @@ -658,8 +613,8 @@ bool __ComplexKafkaTask::process_produce()

bool __ComplexKafkaTask::process_sasl_handshake()
{
ctx_.kafka_error = this->get_resp()->get_broker()->get_error();
if (ctx_.kafka_error)
ctx_ = this->get_resp()->get_broker()->get_error();
if (ctx_)
{
this->error = WFT_ERR_KAFKA_SASL_DISALLOWED;
this->state = WFT_STATE_TASK_ERROR;
Expand All @@ -670,8 +625,8 @@ bool __ComplexKafkaTask::process_sasl_handshake()

bool __ComplexKafkaTask::process_sasl_authenticate()
{
ctx_.kafka_error = this->get_resp()->get_broker()->get_error();
if (ctx_.kafka_error)
ctx_ = this->get_resp()->get_broker()->get_error();
if (ctx_)
{
this->error = WFT_ERR_KAFKA_SASL_DISALLOWED;
this->state = WFT_STATE_TASK_ERROR;
Expand All @@ -681,18 +636,6 @@ bool __ComplexKafkaTask::process_sasl_authenticate()

bool __ComplexKafkaTask::has_next()
{
struct sockaddr_storage addr;
socklen_t addrlen = sizeof addr;
//always success
this->get_peer_addr((struct sockaddr *)&addr, &addrlen);

const struct sockaddr *paddr = (const struct sockaddr *)&addr;
if (!this->get_resp()->get_broker()->is_to_addr())
{
this->get_resp()->get_broker()->set_broker_addr(paddr, addrlen);
this->get_resp()->get_broker()->set_to_addr(1);
}

switch (this->get_resp()->get_api_type())
{
case Kafka_Produce:
Expand All @@ -718,8 +661,8 @@ bool __ComplexKafkaTask::has_next()
case Kafka_LeaveGroup:
case Kafka_DescribeGroups:
case Kafka_Heartbeat:
ctx_.kafka_error = this->get_resp()->get_cgroup()->get_error();
if (ctx_.kafka_error)
ctx_ = this->get_resp()->get_cgroup()->get_error();
if (ctx_)
{
this->error = WFT_ERR_KAFKA_CGROUP_FAILED;
this->state = WFT_STATE_TASK_ERROR;
Expand Down Expand Up @@ -752,10 +695,8 @@ bool __ComplexKafkaTask::finish_once()
{
this->get_req()->clear_buf();
is_redirect_ = false;
return true;
}

if (this->state == WFT_STATE_SUCCESS)
else if (this->state == WFT_STATE_SUCCESS)
{
if (!is_user_request_)
{
Expand All @@ -772,14 +713,11 @@ bool __ComplexKafkaTask::finish_once()
}
else
{
this->disable_retry();
this->get_resp()->set_api_type(this->get_req()->get_api_type());
this->get_resp()->set_api_version(this->get_req()->get_api_version());
}

if (ctx_.cb)
ctx_.cb(this);

is_user_request_ = true;
return true;
}

Expand Down Expand Up @@ -809,31 +747,19 @@ __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const ParsedURI& uri,
return task;
}

__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const struct sockaddr *addr,
socklen_t addrlen,
const std::string& info,
int retry_max,
__kafka_callback_t callback)
{
auto *task = new __ComplexKafkaTask(retry_max, std::move(callback));

task->init(TT_TCP, addr, addrlen, info);
task->set_keep_alive(KAFKA_KEEPALIVE_DEFAULT);
return task;
}

__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const char *host,
__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(enum TransportType type,
const char *host,
unsigned short port,
const std::string& info,
int retry_max,
__kafka_callback_t callback)
{
auto *task = new __ComplexKafkaTask(retry_max, std::move(callback));

std::string url = "kafka://";
std::string url = (type == TT_TCP_SSL ? "kafkas://" : "kafka://");

if (!info.empty())
url += info;
url += info + "@";

url += host;
url += ":" + std::to_string(port);
Expand Down
16 changes: 2 additions & 14 deletions src/factory/KafkaTaskImpl.inl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
Authors: Wang Zhulei ([email protected])
*/

#include <set>
#include "WFTaskFactory.h"
#include "KafkaMessage.h"

Expand All @@ -40,22 +39,11 @@ public:
int retry_max,
__kafka_callback_t callback);

static __WFKafkaTask *create_kafka_task(const struct sockaddr *addr,
socklen_t addrlen,
const std::string& info,
int retry_max,
__kafka_callback_t callback);

static __WFKafkaTask *create_kafka_task(const char *host,
static __WFKafkaTask *create_kafka_task(enum TransportType type,
const char *host,
unsigned short port,
const std::string& info,
int retry_max,
__kafka_callback_t callback);
};

struct __ComplexKafkaTaskCtx
{
int kafka_error;
__kafka_callback_t cb;
};

4 changes: 2 additions & 2 deletions src/kernel/Communicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ int Communicator::send_message_async(struct iovec vectors[], int cnt,
return 1;
}

#define ENCODE_IOV_MAX 8192
#define ENCODE_IOV_MAX 2048

int Communicator::send_message(struct CommConnEntry *entry)
{
Expand Down Expand Up @@ -1421,7 +1421,7 @@ int Communicator::create_poller(size_t poller_threads)
if ((ssize_t)params.max_open_files < 0)
return -1;

this->msgqueue = msgqueue_create(4096, sizeof (struct poller_result));
this->msgqueue = msgqueue_create(16 * 1024, sizeof (struct poller_result));
if (this->msgqueue)
{
params.context = this->msgqueue;
Expand Down
9 changes: 6 additions & 3 deletions src/kernel/Executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,16 @@ void Executor::executor_thread_routine(void *context)
ExecQueue *queue = (ExecQueue *)context;
struct ExecSessionEntry *entry;
ExecSession *session;
int empty;

pthread_mutex_lock(&queue->mutex);
entry = list_entry(queue->session_list.next, struct ExecSessionEntry, list);
pthread_mutex_lock(&queue->mutex);
list_del(&entry->list);
empty = list_empty(&queue->session_list);
pthread_mutex_unlock(&queue->mutex);

session = entry->session;
if (!list_empty(&queue->session_list))
if (!empty)
{
struct thrdpool_task task = {
.routine = Executor::executor_thread_routine,
Expand All @@ -94,7 +98,6 @@ void Executor::executor_thread_routine(void *context)
else
free(entry);

pthread_mutex_unlock(&queue->mutex);
session->execute();
session->handle(ES_STATE_FINISHED, 0);
}
Expand Down
Loading
Loading