Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rotate02 #435

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion c/src/core/buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,10 @@ static void pn_buffer_rotate (pn_buffer_t *buf, size_t sz) {

static inline int pn_buffer_defrag(pn_buffer_t *buf)
{
pn_buffer_rotate(buf, buf->start);
if (pni_buffer_wrapped(buf))
pn_buffer_rotate(buf, buf->start);
else
memmove(buf->bytes, buf->bytes + buf->start, buf->size);
Comment on lines +239 to +242
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this logic is actually part of the _rotate function and should be there.

Also our code formatting has the braces explicitly present for this format of if:

if (...) {
  ...
} else {
  ...
}'''

buf->start = 0;
return 0;
}
Expand Down
1 change: 1 addition & 0 deletions c/src/core/engine-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ struct pn_delivery_t {
pn_delivery_state_t state;
pn_buffer_t *bytes;
pn_record_t *context;
uint32_t bytes_offset; // start of content remaining to send on transport
bool updated;
bool settled; // tracks whether we're in the unsettled list or not
bool work;
Expand Down
31 changes: 28 additions & 3 deletions c/src/core/engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -1627,6 +1627,7 @@ static void pn_delivery_finalize(void *object)
pn_bytes_free(delivery->tag);
delivery->tag = (pn_delivery_tag_t){0, NULL};
pn_buffer_clear(delivery->bytes);
delivery->bytes_offset = 0;
pn_record_clear(delivery->context);
delivery->settled = true;
pn_connection_t *conn = link->session->connection;
Expand Down Expand Up @@ -1728,6 +1729,7 @@ pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
delivery->tpwork_prev = NULL;
delivery->tpwork = false;
pn_buffer_clear(delivery->bytes);
delivery->bytes_offset = 0;
delivery->done = false;
delivery->aborted = false;
pn_record_clear(delivery->context);
Expand All @@ -1751,6 +1753,12 @@ pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
return delivery;
}

static uint32_t pni_delivery_buffer_size(pn_delivery_t *delivery)
{
assert(pn_buffer_size(delivery->bytes) >= delivery->bytes_offset);
return pn_buffer_size(delivery->bytes) - delivery->bytes_offset;
}

bool pn_delivery_buffered(pn_delivery_t *delivery)
{
assert(delivery);
Expand All @@ -1760,7 +1768,7 @@ bool pn_delivery_buffered(pn_delivery_t *delivery)
if (state->sent) {
return false;
} else {
return delivery->done || (pn_buffer_size(delivery->bytes) > 0);
return delivery->done || (pni_delivery_buffer_size(delivery) > 0);
}
} else {
return false;
Expand Down Expand Up @@ -2062,6 +2070,22 @@ ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n)
pn_delivery_t *current = pn_link_current(sender);
if (!current) return PN_EOS;
if (!bytes || !n) return 0;
// Performance optimization: if alternating several writes into and multiple reads
// from the bytes pn_buffer_t, only a single trim+defrag is necessary at transition
// from read to write.
if (current->bytes_offset) {
// Streaming message. Update bytes buffer accounting.
pn_buffer_trim(current->bytes, current->bytes_offset, 0);
current->bytes_offset = 0;
// Expensive defrag/rotate is here. Future calls to pn_buffer_bytes() are fast if no trim.
pn_buffer_bytes(current->bytes);
if (current->tpwork) {
// Some content was sent: bytes_offset > 0. Give other senders a turn.
pn_connection_t *connection = current->link->session->connection;
LL_REMOVE(connection, tpwork, current);
current->tpwork = false;
}
}
pn_buffer_append(current->bytes, bytes, n);
sender->session->outgoing_bytes += n;
pni_add_tpwork(current);
Expand Down Expand Up @@ -2259,7 +2283,7 @@ size_t pn_delivery_pending(pn_delivery_t *delivery)
the PN_ABORTED error return code.
*/
if (delivery->aborted) return 1;
return pn_buffer_size(delivery->bytes);
return pni_delivery_buffer_size(delivery);
}

bool pn_delivery_partial(pn_delivery_t *delivery)
Expand All @@ -2271,8 +2295,9 @@ void pn_delivery_abort(pn_delivery_t *delivery) {
if (!delivery->local.settled) { /* Can't abort a settled delivery */
delivery->aborted = true;
pn_delivery_settle(delivery);
delivery->link->session->outgoing_bytes -= pn_buffer_size(delivery->bytes);
delivery->link->session->outgoing_bytes -= pni_delivery_buffer_size(delivery);
pn_buffer_clear(delivery->bytes);
delivery->bytes_offset = 0;
}
}

Expand Down
20 changes: 14 additions & 6 deletions c/src/core/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -2209,15 +2209,22 @@ static int pni_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *d
pn_session_state_t *ssn_state = &link->session->state;
pn_link_state_t *link_state = &link->state;
bool xfr_posted = false;
uint32_t unsent = pn_buffer_size(delivery->bytes) - delivery->bytes_offset;
if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) {
if (!state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) &&
if (!state->sent && (delivery->done || unsent > 0) &&
ssn_state->remote_incoming_window > 0 && link_state->link_credit > 0) {
if (!state->init) {
state = pni_delivery_map_push(&ssn_state->outgoing, delivery);
}

// Content may be consumed in chunks via multiple calls to transport_produce().
// pn_link_send() ensures next call is always fast: buffer already in rotated state.
pn_bytes_t bytes = pn_buffer_bytes(delivery->bytes);
size_t full_size = bytes.size;
if (delivery->bytes_offset) {
// Account for previous sent data while avoiding expensive buffer rotate.
bytes.size -= delivery->bytes_offset;
bytes.start += delivery->bytes_offset;
}
size_t remaining_size = bytes.size;
int count = pni_post_amqp_transfer_frame(transport,
ssn_state->local_channel,
link_state->local_handle,
Expand All @@ -2237,10 +2244,11 @@ static int pni_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *d
ssn_state->outgoing_transfer_count += count;
ssn_state->remote_incoming_window -= count;

int sent = full_size - bytes.size;
pn_buffer_trim(delivery->bytes, sent, 0);
int sent = remaining_size - bytes.size;
delivery->bytes_offset += sent;
link->session->outgoing_bytes -= sent;
if (!pn_buffer_size(delivery->bytes) && delivery->done) {
remaining_size -= sent;
if (!remaining_size && delivery->done) {
state->sent = true;
link_state->delivery_count++;
link_state->link_credit--;
Expand Down