Skip to content

Commit

Permalink
QUIC - RTT handling code, improved packet expiry
Browse files Browse the repository at this point in the history
  • Loading branch information
nbridge-jump committed Jan 22, 2025
1 parent 5c2707e commit 132d916
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 51 deletions.
174 changes: 133 additions & 41 deletions src/waltz/quic/fd_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@
#define MAP_QUERY_OPT 1
#include "../../util/tmpl/fd_map_dynamic.c"

/* FD_QUIC_PING_ENABLE
/* FD_QUIC_KEEP_ALIVE
*
* This compile time option specifies whether the server should use
* QUIC PING frames to keep connections alive
*
* Set to 1 to keep connections alive
* Set to 0 to allow connections to close on idle */
# define FD_QUIC_PING_ENABLE 0
# define FD_QUIC_KEEP_ALIVE 0

/* FD_QUIC_MAX_STREAMS_ALWAYS */
/* Defines whether a MAX_STREAMS frame is sent even if it was just */
Expand All @@ -56,8 +56,6 @@
/* the most recent value */
# define FD_QUIC_MAX_STREAMS_ALWAYS 0



/* Construction API ***************************************************/

FD_QUIC_API FD_FN_CONST ulong
Expand Down Expand Up @@ -2168,7 +2166,6 @@ fd_quic_handle_v1_one_rtt( fd_quic_t * quic,

/* update last activity */
conn->last_activity = fd_quic_get_state( quic )->now;
conn->flags &= ~( FD_QUIC_CONN_FLAGS_PING_SENT | FD_QUIC_CONN_FLAGS_PING );

/* update expected packet number */
conn->exp_pkt_number[2] = fd_ulong_max( conn->exp_pkt_number[2], pkt_number+1UL );
Expand Down Expand Up @@ -2290,6 +2287,10 @@ fd_quic_process_quic_packet_v1( fd_quic_t * quic,
int ack_type = fd_quic_lazy_ack_pkt( quic, conn, pkt );
quic->metrics.ack_tx[ ack_type ]++;

if( pkt->rtt_ack_time ) {
fd_quic_sample_rtt( conn, (long)pkt->rtt_ack_time, (long)pkt->rtt_ack_delay );
}

/* return bytes consumed */
return (ulong)( cur_ptr - orig_ptr );
}
Expand All @@ -2316,7 +2317,9 @@ fd_quic_process_packet( fd_quic_t * quic,

fd_quic_pkt_t pkt = { .datagram_sz = (uint)data_sz };

pkt.rcv_time = state->now;
pkt.rcv_time = state->now;
pkt.rtt_pkt_number = 0;
pkt.rtt_ack_time = 0;

/* parse ip, udp */

Expand Down Expand Up @@ -2719,6 +2722,24 @@ fd_quic_tls_cb_peer_params( void * context,
conn->idle_timeout = fd_ulong_min( (ulong)(1e6) * peer_tp->max_idle_timeout, conn->idle_timeout );
}

/* set ack_delay_exponent so we can properly interpret peer's ack_delays
if unspecified, the value is 3 */
ulong peer_ack_delay_exponent = fd_ulong_if(
peer_tp->ack_delay_exponent_present,
peer_tp->ack_delay_exponent,
3UL );

float tick_per_us = (float)conn->quic->config.tick_per_us;
conn->rtt->peer_ack_delay_scale = (float)( 1UL << peer_ack_delay_exponent ) * tick_per_us;

/* peer max ack delay in microseconds
peer_tp->max_ack_delay is milliseconds */
float peer_max_ack_delay_us = (float)fd_ulong_if(
peer_tp->max_ack_delay_present,
peer_tp->max_ack_delay * 1000UL,
25000UL );
conn->rtt->peer_max_ack_delay_ticks = peer_max_ack_delay_us * tick_per_us;

conn->transport_params_set = 1;
}

Expand Down Expand Up @@ -2862,7 +2883,7 @@ fd_quic_svc_poll( fd_quic_t * quic,
conn->state = FD_QUIC_CONN_STATE_DEAD;
quic->metrics.conn_timeout_cnt++;
}
} else if( FD_QUIC_PING_ENABLE ) {
} else if( FD_QUIC_KEEP_ALIVE ) {
/* send PING */
if( !( conn->flags & FD_QUIC_CONN_FLAGS_PING ) ) {
conn->flags |= FD_QUIC_CONN_FLAGS_PING;
Expand All @@ -2872,6 +2893,15 @@ fd_quic_svc_poll( fd_quic_t * quic,
}
}

if( now > conn->last_ack + (ulong)conn->rtt->rtt_period_ticks ) {
/* send PING */
if( !( conn->flags & ( FD_QUIC_CONN_FLAGS_PING | FD_QUIC_CONN_FLAGS_PING_SENT ) )
&& conn->state == FD_QUIC_CONN_STATE_ACTIVE ) {
conn->flags |= FD_QUIC_CONN_FLAGS_PING;
conn->upd_pkt_number = FD_QUIC_PKT_NUM_PENDING; /* update to be sent in next packet */
}
}

if( FD_UNLIKELY( conn->state == FD_QUIC_CONN_STATE_DEAD ) ) {
fd_quic_cb_conn_final( quic, conn ); /* inform user before freeing */
fd_quic_conn_free( quic, conn );
Expand Down Expand Up @@ -3133,7 +3163,7 @@ fd_quic_gen_close_frame( fd_quic_conn_t * conn,

/* update packet meta */
pkt_meta->flags |= FD_QUIC_PKT_META_FLAGS_CLOSE;
pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, now + 3u * conn->rtt );
pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, fd_quic_calc_expiry( conn, now ) );
return frame_sz;
}

Expand Down Expand Up @@ -3209,7 +3239,7 @@ fd_quic_gen_handshake_frames( fd_quic_conn_t * conn,
pkt_meta->flags |= FD_QUIC_PKT_META_FLAGS_HS_DATA;
pkt_meta->range.offset_lo = offset_lo;
pkt_meta->range.offset_hi = offset_hi;
pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, now + 3u * conn->rtt );
pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, fd_quic_calc_expiry( conn, now ) );
}

return payload_ptr;
Expand All @@ -3227,7 +3257,7 @@ fd_quic_gen_handshake_done_frame( fd_quic_conn_t * conn,
if( FD_UNLIKELY( payload_ptr >= payload_end ) ) return 0UL;
/* send handshake done frame */
pkt_meta->flags |= FD_QUIC_PKT_META_FLAGS_HS_DONE;
pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, now + 3u * conn->rtt );
pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, fd_quic_calc_expiry( conn, now ) );
payload_ptr[0] = 0x1E;
return 1UL;
}
Expand Down Expand Up @@ -3256,7 +3286,7 @@ fd_quic_gen_max_data_frame( fd_quic_conn_t * conn,
/* set flag on pkt meta */
if( pkt_meta->var_sz < FD_QUIC_PKT_META_VAR_MAX ) {
pkt_meta->flags |= FD_QUIC_PKT_META_FLAGS_MAX_DATA;
pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, now + 3u * conn->rtt );
pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, fd_quic_calc_expiry( conn, now ) );
pkt_meta->var[pkt_meta->var_sz].key =
(fd_quic_pkt_meta_key_t){
.type = FD_QUIC_PKT_META_TYPE_OTHER,
Expand Down Expand Up @@ -3301,7 +3331,7 @@ fd_quic_gen_max_streams_frame( fd_quic_conn_t * conn,

if( pkt_meta->var_sz < FD_QUIC_PKT_META_VAR_MAX ) {
pkt_meta->flags |= FD_QUIC_PKT_META_FLAGS_MAX_STREAMS_UNIDIR;
pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, now + 3u * conn->rtt );
pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, fd_quic_calc_expiry( conn, now ) );
pkt_meta->var[pkt_meta->var_sz].key = (fd_quic_pkt_meta_key_t){
.type = FD_QUIC_PKT_META_TYPE_OTHER,
.flags = FD_QUIC_CONN_FLAGS_MAX_STREAMS_UNIDIR
Expand All @@ -3315,10 +3345,12 @@ fd_quic_gen_max_streams_frame( fd_quic_conn_t * conn,
}

static ulong
fd_quic_gen_ping_frame( fd_quic_conn_t * conn,
uchar * payload_ptr,
uchar * payload_end,
ulong pkt_number ) {
fd_quic_gen_ping_frame( fd_quic_conn_t * conn,
uchar * payload_ptr,
uchar * payload_end,
fd_quic_pkt_meta_t * pkt_meta,
ulong pkt_number,
ulong now ) {

if( ~conn->flags & FD_QUIC_CONN_FLAGS_PING ) return 0UL;
if( conn->flags & FD_QUIC_CONN_FLAGS_PING_SENT ) return 0UL;
Expand All @@ -3329,8 +3361,14 @@ fd_quic_gen_ping_frame( fd_quic_conn_t * conn,
&ping );
if( FD_UNLIKELY( frame_sz==FD_QUIC_ENCODE_FAIL ) ) return 0UL;
conn->flags |= FD_QUIC_CONN_FLAGS_PING_SENT;
conn->flags &= ~FD_QUIC_CONN_FLAGS_PING;

conn->upd_pkt_number = pkt_number;

/* update packet metadata */
pkt_meta->flags |= FD_QUIC_PKT_META_FLAGS_PING;
pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, fd_quic_calc_expiry( conn, now ) );

return frame_sz;
}

Expand Down Expand Up @@ -3414,8 +3452,7 @@ fd_quic_gen_stream_frames( fd_quic_conn_t * conn,

/* Packet metadata for potential retransmits */
pkt_meta->flags |= FD_QUIC_PKT_META_FLAGS_STREAM;
/* FIXME don't hardcode RTT */
pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, now + 3u * conn->rtt );
pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, fd_quic_calc_expiry( conn, now ) );
pkt_meta->var[pkt_meta->var_sz].key =
FD_QUIC_PKT_META_KEY( FD_QUIC_PKT_META_TYPE_STREAM_DATA, 0, cur_stream->stream_id );
pkt_meta->var[pkt_meta->var_sz].range.offset_lo = stream_off;
Expand Down Expand Up @@ -3466,7 +3503,7 @@ fd_quic_gen_frames( fd_quic_conn_t * conn,
if( conn->upd_pkt_number >= pkt_number ) {
payload_ptr += fd_quic_gen_max_data_frame ( conn, payload_ptr, payload_end, pkt_meta, pkt_number, now );
payload_ptr += fd_quic_gen_max_streams_frame( conn, payload_ptr, payload_end, pkt_meta, pkt_number, now );
payload_ptr += fd_quic_gen_ping_frame ( conn, payload_ptr, payload_end, pkt_number );
payload_ptr += fd_quic_gen_ping_frame ( conn, payload_ptr, payload_end, pkt_meta, pkt_number, now );
}
if( FD_LIKELY( !conn->tls_hs ) ) {
payload_ptr = fd_quic_gen_stream_frames( conn, payload_ptr, payload_end, pkt_meta, pkt_number, now );
Expand Down Expand Up @@ -3560,6 +3597,9 @@ fd_quic_conn_tx( fd_quic_t * quic,
/* initialize expiry */
pkt_meta->expiry = now + conn->idle_timeout - (ulong)50e3;

/* initialize tx_time */
pkt_meta->tx_time = now;

/* remaining in datagram */
/* invariant: tx_buf >= tx_ptr */
ulong datagram_rem = tx_max_datagram_sz - (ulong)( conn->tx_ptr - conn->tx_buf );
Expand Down Expand Up @@ -4057,9 +4097,9 @@ fd_quic_conn_free( fd_quic_t * quic,
quic->metrics.conn_active_cnt--;

/* clear keys */
fd_memset( &conn->secrets, 0, sizeof(fd_quic_crypto_secrets_t) );
fd_memset( conn->keys, 0, sizeof( conn->keys ) );
fd_memset( conn->new_keys, 0, sizeof( conn->new_keys ) );
memset( &conn->secrets, 0, sizeof(fd_quic_crypto_secrets_t) );
memset( conn->keys, 0, sizeof( conn->keys ) );
memset( conn->new_keys, 0, sizeof( conn->new_keys ) );
}

fd_quic_conn_t *
Expand Down Expand Up @@ -4228,7 +4268,7 @@ fd_quic_conn_create( fd_quic_t * quic,
config->net.listen_udp_port,
state->next_ephem_udp_port )
};
fd_memset( &conn->peer[0], 0, sizeof( conn->peer ) );
memset( &conn->peer[0], 0, sizeof( conn->peer ) );
conn->conn_gen++;
conn->token_len = 0;

Expand Down Expand Up @@ -4274,17 +4314,16 @@ fd_quic_conn_create( fd_quic_t * quic,
MUST increase the packet number by at least 1
rfc9002: s3
It is permitted for some packet numbers to never be used, leaving intentional gaps. */
fd_memset( conn->exp_pkt_number, 0, sizeof( conn->exp_pkt_number ) );
fd_memset( conn->last_pkt_number, 0, sizeof( conn->last_pkt_number ) );
fd_memset( conn->pkt_number, 0, sizeof( conn->pkt_number ) );
memset( conn->exp_pkt_number, 0, sizeof( conn->exp_pkt_number ) );
memset( conn->last_pkt_number, 0, sizeof( conn->last_pkt_number ) );
memset( conn->pkt_number, 0, sizeof( conn->pkt_number ) );

/* TODO lots of fd_memset calls that should really be builtin memset */
fd_memset( conn->hs_sent_bytes, 0, sizeof( conn->hs_sent_bytes ) );
fd_memset( conn->hs_ackd_bytes, 0, sizeof( conn->hs_ackd_bytes ) );
memset( conn->hs_sent_bytes, 0, sizeof( conn->hs_sent_bytes ) );
memset( conn->hs_ackd_bytes, 0, sizeof( conn->hs_ackd_bytes ) );

fd_memset( &conn->secrets, 0, sizeof( conn->secrets ) );
fd_memset( &conn->keys, 0, sizeof( conn->keys ) );
fd_memset( &conn->new_keys, 0, sizeof( conn->new_keys ) );
memset( &conn->secrets, 0, sizeof( conn->secrets ) );
memset( &conn->keys, 0, sizeof( conn->keys ) );
memset( &conn->new_keys, 0, sizeof( conn->new_keys ) );
/* suites initialized above */

conn->key_phase = 0;
Expand Down Expand Up @@ -4320,7 +4359,20 @@ fd_quic_conn_create( fd_quic_t * quic,
conn->tx_tot_data = 0;

/* initial rtt */
conn->rtt = (ulong)500e6;
/* overridden when acks start returning */
fd_quic_conn_rtt_t * rtt = conn->rtt;

ulong peer_ack_delay_exponent = 3UL; /* by spec, default is 3 */
rtt->peer_ack_delay_scale = (float)( 1UL << peer_ack_delay_exponent )
* (float)quic->config.tick_per_us;
rtt->peer_max_ack_delay_ticks = 0.0f; /* starts at zero, since peers respond immediately to */
/* INITIAL and HANDSHAKE */
/* updated when we get transport parameters */
rtt->smoothed_rtt = FD_QUIC_INITIAL_RTT_US * (float)quic->config.tick_per_us;
rtt->latest_rtt = FD_QUIC_INITIAL_RTT_US * (float)quic->config.tick_per_us;
rtt->min_rtt = FD_QUIC_INITIAL_RTT_US * (float)quic->config.tick_per_us;
rtt->var_rtt = FD_QUIC_INITIAL_RTT_US * (float)quic->config.tick_per_us * 0.5f;
rtt->rtt_period_ticks = FD_QUIC_RTT_PERIOD_US * (float)quic->config.tick_per_us;

/* highest peer encryption level */
conn->peer_enc_level = 0;
Expand All @@ -4329,8 +4381,8 @@ fd_quic_conn_create( fd_quic_t * quic,
conn->idle_timeout = config->idle_timeout;
conn->last_activity = state->now;

fd_memset( conn->exp_pkt_number, 0, sizeof( conn->exp_pkt_number ) );
fd_memset( conn->last_pkt_number, 0, sizeof( conn->last_pkt_number ) );
memset( conn->exp_pkt_number, 0, sizeof( conn->exp_pkt_number ) );
memset( conn->last_pkt_number, 0, sizeof( conn->last_pkt_number ) );

/* update metrics */
quic->metrics.conn_active_cnt++;
Expand Down Expand Up @@ -4578,6 +4630,11 @@ fd_quic_pkt_meta_retry( fd_quic_t * quic,
conn->flags &= ~FD_QUIC_CONN_FLAGS_CLOSE_SENT;
conn->upd_pkt_number = FD_QUIC_PKT_NUM_PENDING;
}
if( flags & FD_QUIC_PKT_META_FLAGS_PING ) {
conn->flags = ( conn->flags & ~FD_QUIC_CONN_FLAGS_PING_SENT )
| FD_QUIC_CONN_FLAGS_PING;
conn->upd_pkt_number = FD_QUIC_PKT_NUM_PENDING;
}

/* reschedule to ensure the data gets processed */
fd_quic_svc_schedule1( conn, FD_QUIC_SVC_INSTANT );
Expand Down Expand Up @@ -4606,6 +4663,10 @@ fd_quic_reclaim_pkt_meta( fd_quic_conn_t * conn,
uint flags = pkt_meta->flags;
fd_quic_range_t range = pkt_meta->range;

if( flags & FD_QUIC_PKT_META_FLAGS_PING ) {
conn->flags &= ~( FD_QUIC_CONN_FLAGS_PING | FD_QUIC_CONN_FLAGS_PING_SENT );
}

if( flags & FD_QUIC_PKT_META_FLAGS_HS_DATA ) {
/* Note that tls_hs could already be freed */
/* is this ack'ing the next consecutive bytes?
Expand Down Expand Up @@ -4866,13 +4927,19 @@ fd_quic_process_lost( fd_quic_conn_t * conn, uint enc_level, ulong cnt ) {
/* process ack range
applies to pkt_number in [largest_ack - ack_range, largest_ack] */
void
fd_quic_process_ack_range( fd_quic_conn_t * conn,
uint enc_level,
ulong largest_ack,
ulong ack_range ) {
fd_quic_process_ack_range( fd_quic_conn_t * conn,
fd_quic_frame_ctx_t * context,
uint enc_level,
ulong largest_ack,
ulong ack_range,
int is_largest,
ulong now,
ulong ack_delay ) {
/* FIXME: This would benefit from algorithmic improvements */
/* FIXME: Close connection if peer ACKed a higher packet number than we sent */

fd_quic_pkt_t * pkt = context->pkt;

/* inclusive range */
ulong hi = largest_ack;
ulong lo = largest_ack - ack_range;
Expand All @@ -4897,6 +4964,14 @@ fd_quic_process_ack_range( fd_quic_conn_t * conn,

/* packet number is in range, so reclaim the resources */
if( pkt_meta->pkt_number <= hi ) {

/* note: rtt_pkt_number is zero when unused, so using >= for the test */
if( is_largest && pkt_meta->pkt_number == hi && hi >= pkt->rtt_pkt_number ) {
pkt->rtt_pkt_number = hi;
pkt->rtt_ack_time = now - pkt_meta->tx_time; /* in ticks */
pkt->rtt_ack_delay = ack_delay; /* in peer units */
}

fd_quic_reclaim_pkt_meta( conn,
pkt_meta,
enc_level );
Expand Down Expand Up @@ -4933,12 +5008,22 @@ fd_quic_handle_ack_frame(
return FD_QUIC_PARSE_FAIL;
}

fd_quic_state_t * state = fd_quic_get_state( context->quic );
conn->last_ack = state->now;

/* track lowest packet acked */
ulong low_ack_pkt_number = data->largest_ack - data->first_ack_range;

/* process ack range
applies to pkt_number in [largest_ack - first_ack_range, largest_ack] */
fd_quic_process_ack_range( conn, enc_level, data->largest_ack, data->first_ack_range );
fd_quic_process_ack_range( conn,
context,
enc_level,
data->largest_ack,
data->first_ack_range,
1 /* is_largest */,
state->now,
data->ack_delay );

uchar const * p_str = p;
uchar const * p_end = p + p_sz;
Expand Down Expand Up @@ -4995,7 +5080,14 @@ fd_quic_handle_ack_frame(
low_ack_pkt_number = fd_ulong_min( low_ack_pkt_number, lo_pkt_number );

/* process ack range */
fd_quic_process_ack_range( conn, enc_level, cur_pkt_number - skip, length );
fd_quic_process_ack_range( conn,
context,
enc_level,
cur_pkt_number - skip,
length,
0 /* is_largest */,
state->now,
0 /* ack_delay not used here */ );

/* Find the next lowest processed and acknowledged packet number
This should get us to the next lowest processed and acknowledged packet
Expand Down
Loading

0 comments on commit 132d916

Please sign in to comment.