Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refresh cagg uses min value for dimension when start_time is NULL #7546

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .unreleased/pr_7546
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7546 Continuous aggregate refresh uses min time based on chunk metadata when start_offset is NULL and `enable_tiered_reads` GUC is false.
2 changes: 1 addition & 1 deletion src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ extern bool ts_guc_enable_cagg_reorder_groupby;
extern TSDLLEXPORT int ts_guc_cagg_max_individual_materializations;
extern bool ts_guc_enable_now_constify;
extern bool ts_guc_enable_foreign_key_propagation;
extern bool ts_guc_enable_osm_reads;
extern TSDLLEXPORT bool ts_guc_enable_osm_reads;
#if PG16_GE
extern TSDLLEXPORT bool ts_guc_enable_cagg_sort_pushdown;
#endif
Expand Down
61 changes: 61 additions & 0 deletions src/hypertable.c
Original file line number Diff line number Diff line change
Expand Up @@ -2402,6 +2402,67 @@ ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index,
return max_value;
}

/*
* Get the min value of an open dimension for the hypertable based on the dimension slice info
* Note: only takes non-tiered chunks into account.
*/
int64
ts_hypertable_get_open_dim_min_value(const Hypertable *ht, int dimension_index, bool *isnull)
{
StringInfo command;
const Dimension *dim;
int res;
bool min_isnull;
Datum mindat;
Oid timetype;

const char *query_str = " SELECT min(dimsl.range_start) FROM _timescaledb_catalog.chunk srcch \
INNER JOIN _timescaledb_catalog.hypertable ht ON ht.id = srcch.hypertable_id \
INNER JOIN _timescaledb_catalog.chunk_constraint chcons ON srcch.id = chcons.chunk_id \
INNER JOIN _timescaledb_catalog.dimension dim ON srcch.hypertable_id = dim.hypertable_id \
INNER JOIN _timescaledb_catalog.dimension_slice dimsl ON dim.id = dimsl.dimension_id \
AND chcons.dimension_slice_id = dimsl.id \
AND dimsl.id = %d and ht.id = %d AND srcch.osm_chunk = false";

dim = hyperspace_get_open_dimension(ht->space, dimension_index);

if (NULL == dim)
elog(ERROR, "invalid open dimension index %d", dimension_index);

timetype = ts_dimension_get_partition_type(dim);

/*
* Query for the oldest chunk in the hypertable.
*/
command = makeStringInfo();
appendStringInfo(command, query_str, ht->fd.id, dim->fd.id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct arguments order? From looking at the query string it seems dim->fd.id should come first.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: psprintf would do the same as StringInfo here, but would be shorter.


if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI");

res = SPI_execute(command->data, true /* read_only */, 0 /*count*/);

if (res < 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
(errmsg("could not find the minimum time value for hypertable \"%s\"",
get_rel_name(ht->main_table_relid)))));

mindat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &min_isnull);

if (isnull)
*isnull = min_isnull;

/* we fetch the int64 value from the dimension slice catalog. so read it back as int64 */
int64 min_value = min_isnull ? ts_time_get_min(timetype) : DatumGetInt64(mindat);

res = SPI_finish();
if (res != SPI_OK_FINISH)
elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(res));

return min_value;
}

bool
ts_hypertable_has_compression_table(const Hypertable *ht)
{
Expand Down
2 changes: 2 additions & 0 deletions src/hypertable.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ extern TSDLLEXPORT bool ts_hypertable_set_compress_interval(Hypertable *ht,
int64 compress_interval);
extern TSDLLEXPORT int64 ts_hypertable_get_open_dim_max_value(const Hypertable *ht,
int dimension_index, bool *isnull);
extern TSDLLEXPORT int64 ts_hypertable_get_open_dim_min_value(const Hypertable *ht,
int dimension_index, bool *isnull);

extern TSDLLEXPORT bool ts_hypertable_has_compression_table(const Hypertable *ht);
extern TSDLLEXPORT void ts_hypertable_formdata_fill(FormData_hypertable *fd, const TupleInfo *ti);
Expand Down
16 changes: 16 additions & 0 deletions src/ts_catalog/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1587,6 +1587,22 @@ ts_compute_circumscribed_bucketed_refresh_window_variable(int64 *start, int64 *e
*end = ts_time_value_to_internal(end_new, TIMESTAMPOID);
}

int64
ts_compute_circumscribed_bucketed_refresh_window_start_variable(
int64 start, const ContinuousAggsBucketFunction *bf)
{
Datum start_old, start_new;

/*
* It's OK to use TIMESTAMPOID here.
* See the comment in ts_compute_inscribed_bucketed_refresh_window_variable()
*/
start_old = ts_internal_to_time_value(start, TIMESTAMPOID);
start_new = generic_time_bucket(bf, start_old);

return ts_time_value_to_internal(start_new, TIMESTAMPOID);
}

/*
* Calculates the beginning of the next bucket.
*
Expand Down
3 changes: 3 additions & 0 deletions src/ts_catalog/continuous_agg.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,6 @@ extern TSDLLEXPORT Query *ts_continuous_agg_get_query(ContinuousAgg *cagg);

extern TSDLLEXPORT int64
ts_continuous_agg_fixed_bucket_width(const ContinuousAggsBucketFunction *bucket_function);

extern TSDLLEXPORT int64 ts_compute_circumscribed_bucketed_refresh_window_start_variable(
int64 start, const ContinuousAggsBucketFunction *bf);
107 changes: 101 additions & 6 deletions tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ static InternalTimeRange
compute_circumscribed_bucketed_refresh_window(const ContinuousAgg *cagg,
const InternalTimeRange *const refresh_window,
const ContinuousAggsBucketFunction *bucket_function);
static int64
compute_circumscribed_refresh_window_start(const ContinuousAgg *cagg,
const InternalTimeRange *const refresh_window,
const ContinuousAggsBucketFunction *bucket_function);
static void continuous_agg_refresh_init(CaggRefreshState *refresh, const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window);
static void continuous_agg_refresh_execute(const CaggRefreshState *refresh,
Expand Down Expand Up @@ -372,6 +376,57 @@ compute_circumscribed_bucketed_refresh_window(const ContinuousAgg *cagg,
return result;
}

static int64
compute_circumscribed_refresh_window_start(const ContinuousAgg *cagg,
const InternalTimeRange *const refresh_window,
const ContinuousAggsBucketFunction *bucket_function)
{
Assert(refresh_window != NULL);
Assert(bucket_function != NULL);

if (bucket_function->bucket_fixed_interval == false)
{
InternalTimeRange result = *refresh_window;
result.start =
ts_compute_circumscribed_bucketed_refresh_window_start_variable(refresh_window->start,
bucket_function);
return result.start;
}

/* Interval is fixed */
int64 bucket_width = ts_continuous_agg_fixed_bucket_width(bucket_function);
Assert(bucket_width > 0);

InternalTimeRange result = *refresh_window;
InternalTimeRange largest_bucketed_window =
get_largest_bucketed_window(refresh_window->type, bucket_width);

/* Get offset and origin for bucket function */
NullableDatum offset = INIT_NULL_DATUM;
NullableDatum origin = INIT_NULL_DATUM;
fill_bucket_offset_origin(cagg, refresh_window, &offset, &origin);

/* Defined offset and origin in one function is not supported */
Assert(offset.isnull == true || origin.isnull == true);

if (refresh_window->start <= largest_bucketed_window.start)
{
result.start = largest_bucketed_window.start;
}
else
{
/* For alignment with a bucket, which includes the start of the refresh window, we just
* need to get start of the bucket. */
result.start = ts_time_bucket_by_type_extended(bucket_width,
refresh_window->start,
refresh_window->type,
offset,
origin);
}

return result.start;
}

/*
* Initialize the refresh state for a continuous aggregate.
*
Expand Down Expand Up @@ -794,26 +849,61 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
* still take a long time and it is probably best for consistency to always
* prevent transaction blocks. */
PreventInTransactionBlock(true, REFRESH_FUNCTION_NAME);

/* No bucketing when open ended */
if (!(start_isnull && end_isnull))
/* elog(NOTICE,
"refresh window INPUT IS (%ld %ld) (start_is_null %d)",
refresh_window.start,
refresh_window.end,
start_isnull);
*/
/* No bucketing when open ended .
* Special case: if we have OSM reads disabled, use bucketed start so that
* the refresh window reflects the data that is visible to the cagg.
*/
if (!(start_isnull && end_isnull) || (start_isnull && !ts_guc_enable_osm_reads))
{
int64 bucket_mints = 0;
bool min_isnull = true;
if (start_isnull && ts_guc_enable_osm_reads == false)
{
/*set refresh window start to min(ts) of raw hypertable as tiered
* data is not visible */
int64 mints = 0;
InternalTimeRange tw = *refresh_window_arg;
const Hypertable *raw_ht = cagg_get_hypertable_or_fail(cagg->data.raw_hypertable_id);
mints = ts_hypertable_get_open_dim_min_value(raw_ht, 0, &min_isnull);
tw.start = mints;
bucket_mints =
compute_circumscribed_refresh_window_start(cagg, &tw, cagg->bucket_function);
// elog(NOTICE, "got mints=%ld bucketmints=%ld ", mints, bucket_mints);
}
if (cagg->bucket_function->bucket_fixed_interval == false)
{
refresh_window = *refresh_window_arg;
if (!min_isnull)
refresh_window.start = bucket_mints;

ts_compute_inscribed_bucketed_refresh_window_variable(&refresh_window.start,
&refresh_window.end,
cagg->bucket_function);
// elog(NOTICE, "NOT FIXED INTERVAL adjusted (%ld %ld)", refresh_window.start,
// refresh_window.end);
}
else
{
InternalTimeRange refresh_window_arg_copy = *refresh_window_arg;
if (!min_isnull)
refresh_window_arg_copy.start = bucket_mints;
int64 bucket_width = ts_continuous_agg_fixed_bucket_width(cagg->bucket_function);
Assert(bucket_width > 0);
refresh_window =
compute_inscribed_bucketed_refresh_window(cagg, refresh_window_arg, bucket_width);
compute_inscribed_bucketed_refresh_window(cagg,
(const InternalTimeRange
*) (&refresh_window_arg_copy),
bucket_width);
// elog(NOTICE, "FIXED INTERVAL orig(%ld %ld) adjusted (%ld %ld)",
// refresh_window_arg_copy.start, refresh_window_arg_copy.end, refresh_window.start,
// refresh_window.end);
}
}

if (refresh_window.start >= refresh_window.end)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
Expand Down Expand Up @@ -863,6 +953,11 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
(IS_TIMESTAMP_TYPE(refresh_window.type) &&
invalidation_threshold == ts_time_get_min(refresh_window.type)))
{
elog(NOTICE,
"upto date (start=%ld end=%ld) threshold=%ld",
refresh_window.start,
refresh_window.end,
invalidation_threshold);
emit_up_to_date_notice(cagg, callctx);

/* Restore search_path */
Expand Down
Loading
Loading