Skip to content

Commit

Permalink
get circumscribed start
Browse files Browse the repository at this point in the history
  • Loading branch information
gayyappan committed Dec 19, 2024
1 parent 6ff123e commit 0de7b82
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 3 deletions.
16 changes: 16 additions & 0 deletions src/ts_catalog/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1587,6 +1587,22 @@ ts_compute_circumscribed_bucketed_refresh_window_variable(int64 *start, int64 *e
*end = ts_time_value_to_internal(end_new, TIMESTAMPOID);
}

int64
ts_compute_circumscribed_bucketed_refresh_window_start_variable(
int64 start, const ContinuousAggsBucketFunction *bf)
{
Datum start_old, start_new;

/*
* It's OK to use TIMESTAMPOID here.
* See the comment in ts_compute_inscribed_bucketed_refresh_window_variable()
*/
start_old = ts_internal_to_time_value(start, TIMESTAMPOID);
start_new = generic_time_bucket(bf, start_old);

return ts_time_value_to_internal(start_new, TIMESTAMPOID);
}

/*
* Calculates the beginning of the next bucket.
*
Expand Down
3 changes: 3 additions & 0 deletions src/ts_catalog/continuous_agg.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,6 @@ extern TSDLLEXPORT Query *ts_continuous_agg_get_query(ContinuousAgg *cagg);

extern TSDLLEXPORT int64
ts_continuous_agg_fixed_bucket_width(const ContinuousAggsBucketFunction *bucket_function);

extern TSDLLEXPORT int64 ts_compute_circumscribed_bucketed_refresh_window_start_variable(
int64 start, const ContinuousAggsBucketFunction *bf);
85 changes: 82 additions & 3 deletions tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,56 @@ compute_circumscribed_bucketed_refresh_window(const ContinuousAgg *cagg,
return result;
}

static int64
compute_circumscribed_refresh_window_start(const ContinuousAgg *cagg, const InternalTimeRange *const refresh_window,
const ContinuousAggsBucketFunction *bucket_function)
{
Assert(refresh_window != NULL);
Assert(bucket_function != NULL);

if (bucket_function->bucket_fixed_interval == false)
{
InternalTimeRange result = *refresh_window;
result.start =
ts_compute_circumscribed_bucketed_refresh_window_start_variable(refresh_window->start,
bucket_function);
return result.start;
}

/* Interval is fixed */
int64 bucket_width = ts_continuous_agg_fixed_bucket_width(bucket_function);
Assert(bucket_width > 0);

InternalTimeRange result = *refresh_window;
InternalTimeRange largest_bucketed_window =
get_largest_bucketed_window(refresh_window->type, bucket_width);

/* Get offset and origin for bucket function */
NullableDatum offset = INIT_NULL_DATUM;
NullableDatum origin = INIT_NULL_DATUM;
fill_bucket_offset_origin(cagg, refresh_window, &offset, &origin);

/* Defined offset and origin in one function is not supported */
Assert(offset.isnull == true || origin.isnull == true);

if (refresh_window->start <= largest_bucketed_window.start)
{
result.start = largest_bucketed_window.start;
}
else
{
/* For alignment with a bucket, which includes the start of the refresh window, we just
* need to get start of the bucket. */
result.start = ts_time_bucket_by_type_extended(bucket_width,
refresh_window->start,
refresh_window->type,
offset,
origin);
}

return result.start;
}

/*
* Initialize the refresh state for a continuous aggregate.
*
Expand Down Expand Up @@ -790,42 +840,66 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
* still take a long time and it is probably best for consistency to always
* prevent transaction blocks. */
PreventInTransactionBlock(true, REFRESH_FUNCTION_NAME);
elog(NOTICE,
"HERE refrech window INPUT IS (%ld %ld) (start_is_null %d)",
refresh_window.start,
refresh_window.end,
start_isnull);

/* No bucketing when open ended */
if (!(start_isnull && end_isnull) || (start_isnull && !ts_guc_enable_osm_reads))
{
int64 mints = 0;
int64 mints = 0, bucket_mints = 0;
bool min_isnull = true;
if (start_isnull && ts_guc_enable_osm_reads == false)
{
/*set refresh window start to min(ts) of raw hypertable as tiered
* data is not visible */
InternalTimeRange tw = *refresh_window_arg;
const Hypertable *raw_ht = cagg_get_hypertable_or_fail(cagg->data.raw_hypertable_id);
mints = ts_hypertable_get_open_dim_min_value(raw_ht, 0, &min_isnull);
tw.start = mints;
bucket_mints = compute_circumscribed_refresh_window_start(cagg, &tw, cagg->bucket_function);
elog(NOTICE, "got mints=%ld bucketmints=%ld ", mints, bucket_mints);
}
if (cagg->bucket_function->bucket_fixed_interval == false)
{
elog(NOTICE,
"NOT FIXED INTERVAL original (%ld %ld)",
refresh_window.start,
refresh_window.end);
if (!min_isnull)
refresh_window.start = mints;
refresh_window.start = bucket_mints;

ts_compute_inscribed_bucketed_refresh_window_variable(&refresh_window.start,
&refresh_window.end,
cagg->bucket_function);
elog(NOTICE,
"NOT FIXED INTERVAL adjusted (%ld %ld)",
refresh_window.start,
refresh_window.end);
}
else
{
InternalTimeRange refresh_window_arg_copy = *refresh_window_arg;
if (!min_isnull)
refresh_window_arg_copy.start = mints;
refresh_window_arg_copy.start = bucket_mints;
int64 bucket_width = ts_continuous_agg_fixed_bucket_width(cagg->bucket_function);
Assert(bucket_width > 0);
refresh_window =
compute_inscribed_bucketed_refresh_window(cagg,
(const InternalTimeRange
*) (&refresh_window_arg_copy),
bucket_width);
elog(NOTICE,
"FIXED INTERVAL orig(%ld %ld) adjusted (%ld %ld)",
refresh_window_arg_copy.start,
refresh_window_arg_copy.end,
refresh_window.start,
refresh_window.end);
}
}
elog(NOTICE, "HERE refrech window (%ld %ld)", refresh_window.start, refresh_window.end);
if (refresh_window.start >= refresh_window.end)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
Expand Down Expand Up @@ -875,6 +949,11 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
(IS_TIMESTAMP_TYPE(refresh_window.type) &&
invalidation_threshold == ts_time_get_min(refresh_window.type)))
{
elog(NOTICE,
"upto date (start=%ld end=%ld) threshold=%ld",
refresh_window.start,
refresh_window.end,
invalidation_threshold);
emit_up_to_date_notice(cagg, callctx);

/* Restore search_path */
Expand Down

0 comments on commit 0de7b82

Please sign in to comment.