Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziomello committed Mar 3, 2025
1 parent 39afe7e commit b6d3952
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 179 deletions.
64 changes: 0 additions & 64 deletions src/hypertable.c
Original file line number Diff line number Diff line change
Expand Up @@ -2328,70 +2328,6 @@ ts_hypertable_create_compressed(Oid table_relid, int32 hypertable_id)
return true;
}

// /*
// * Get the min value of an open dimension.
// */
// int64
// ts_hypertable_get_open_dim_min_value(const Hypertable *ht, int dimension_index, bool *isnull)
// {
// StringInfo command;
// const Dimension *dim;
// int res;
// bool min_isnull;
// Datum mindat;
// Oid timetype;

// dim = hyperspace_get_open_dimension(ht->space, dimension_index);

// if (NULL == dim)
// elog(ERROR, "invalid open dimension index %d", dimension_index);

// timetype = ts_dimension_get_partition_type(dim);

// /*
// * Query for the first bucket in the materialized hypertable.
// * Since this might be run as part of a parallel operation
// * we cannot use SET search_path here to lock down the
// * search_path and instead have to fully schema-qualify
// * everything.
// */
// command = makeStringInfo();
// appendStringInfo(command,
// "SELECT pg_catalog.min(%s) FROM %s.%s",
// quote_identifier(NameStr(dim->fd.column_name)),
// quote_identifier(NameStr(ht->fd.schema_name)),
// quote_identifier(NameStr(ht->fd.table_name)));

// if (SPI_connect() != SPI_OK_CONNECT)
// elog(ERROR, "could not connect to SPI");

// res = SPI_execute(command->data, true /* read_only */, 0 /*count*/);

// if (res < 0)
// ereport(ERROR,
// (errcode(ERRCODE_INTERNAL_ERROR),
// (errmsg("could not find the minimum time value for hypertable \"%s\"",
// get_rel_name(ht->main_table_relid)))));

// Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == timetype,
// "partition types for result (%d) and dimension (%d) do not match",
// SPI_gettypeid(SPI_tuptable->tupdesc, 1),
// ts_dimension_get_partition_type(dim));
// mindat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &min_isnull);

// if (isnull)
// *isnull = min_isnull;

// int64 min_value =
// min_isnull ? ts_time_get_min(timetype) : ts_time_value_to_internal(mindat, timetype);

// res = SPI_finish();
// if (res != SPI_OK_FINISH)
// elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(res));

// return min_value;
// }

/*
* Get the min value of an open dimension for the hypertable based on the dimension slice info
* Note: only takes non-tiered chunks into account.
Expand Down
1 change: 1 addition & 0 deletions tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
PGC_S_SESSION);
}

/* TODO: split it into ranges */
continuous_agg_refresh_internal(policy_data.cagg,
&policy_data.refresh_window,
CAGG_REFRESH_POLICY,
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/continuous_aggs/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ cagg_compute_inscribed_bucketed_refresh_window(const ContinuousAgg *cagg,
const int64 bucket_width);
InternalTimeRange cagg_compute_circumscribed_bucketed_refresh_window(
const ContinuousAgg *cagg, const InternalTimeRange *const refresh_window,
const ContinuousAggsBucketFunction *bucket_function);
const ContinuousAggsBucketFunction *bucket_function);
151 changes: 37 additions & 114 deletions tsl/src/continuous_aggs/materialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -745,24 +745,6 @@ update_watermark(MaterializationContext *context)
}
}

// static void
// log_refresh_window(int elevel, const ContinuousAgg *cagg, const TimeRange *refresh_window,
// const char *msg)
// {
// Oid outfuncid = InvalidOid;
// bool isvarlena;

// getTypeOutputInfo(refresh_window->type, &outfuncid, &isvarlena);
// Assert(!isvarlena);

// elog(elevel,
// "%s \"%s\" in window [ %s, %s ]",
// msg,
// NameStr(cagg->data.user_view_name),
// DatumGetCString(OidFunctionCall1(outfuncid, refresh_window->start)),
// DatumGetCString(OidFunctionCall1(outfuncid, refresh_window->end)));
// }

static void
log_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
const char *msg)
Expand Down Expand Up @@ -792,18 +774,23 @@ log_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRang
}

static List *
make_refresh_window_list(MaterializationContext *context, int64 bucket_width, int32 range_factor)
create_materialization_refresh_window_list(MaterializationContext *context, int64 bucket_width,
int32 range_factor)
{
List *refresh_window_list = NIL;
InternalTimeRange refresh_window = context->internal_materialization_range;
Oid bucket_function = context->cagg->bucket_function->bucket_function;
FuncInfo *func_info = ts_func_cache_get_bucketing_func(bucket_function);
Ensure(func_info != NULL, "unable to get bucket function for Oid %d", bucket_function);
FuncInfo *func_info =
ts_func_cache_get_bucketing_func(context->cagg->bucket_function->bucket_function);
Ensure(func_info != NULL,
"unable to get bucket function for Oid %d",
context->cagg->bucket_function->bucket_function);

/* Do not produce batches for CAggs using deprecated time_bucket_ng function */
if (func_info->origin == ORIGIN_TIMESCALE_EXPERIMENTAL || range_factor == 0)
if (func_info->origin == ORIGIN_TIMESCALE_EXPERIMENTAL || range_factor == 0/* ||
ContinuousAggIsHierarchical(context->cagg)*/)
{
refresh_window_list = lappend(refresh_window_list, &refresh_window);
refresh_window_list =
lappend(refresh_window_list, &context->internal_materialization_range);
return refresh_window_list;
}

Expand All @@ -820,14 +807,9 @@ make_refresh_window_list(MaterializationContext *context, int64 bucket_width, in
min_bucket_start =
ts_time_bucket_by_type(bucket_width, min_bucket_start, refresh_window.type);
}
// else
// {
// min_bucket_start = ts_time_get_nobegin(refresh_window.type);
// }

/* If refresh window range start is NULL then get the first bucket from the original hypertable
*/
// if (TS_TIME_IS_MIN(refresh_window.start, refresh_window.type))
if (refresh_window.start == min_bucket_start ||
TS_TIME_IS_NOBEGIN(refresh_window.start, refresh_window.type))
{
Expand All @@ -843,76 +825,28 @@ make_refresh_window_list(MaterializationContext *context, int64 bucket_width, in
return refresh_window_list;
}

// refresh_window.start = int64_min(refresh_window.start,
// context->internal_materialization_range.start);
log_refresh_window(INFO, context->cagg, &refresh_window, "min");

if (!isnull)
{
if (refresh_window.start >= refresh_window.end)
refresh_window.end = refresh_window.start + 1;

// range = ts_compute_inscribed_refresh_window(context->cagg, &range);
refresh_window =
cagg_compute_circumscribed_bucketed_refresh_window(context->cagg,
&refresh_window,
context->cagg->bucket_function);

// if (context->cagg->bucket_function->bucket_fixed_interval == false)
// {
// ts_compute_inscribed_bucketed_refresh_window_variable(&refresh_window.start,
// &refresh_window.end,
// context->cagg->bucket_function);
// }
// else
// {
// refresh_window = cagg_compute_inscribed_bucketed_refresh_window(context->cagg,
// &refresh_window,
// bucket_width);
// }

// Datum start_old, start_new;

// start_old = ts_internal_to_time_value(refresh_window.start, refresh_window.type);
// start_new = generic_time_bucket(context->cagg->bucket_function, start_old);
// refresh_window.start = ts_time_value_to_internal(start_new, refresh_window.type);

// /* If there's no MIN data then produre only one range */
// if (TS_TIME_IS_MIN(refresh_window.start, refresh_window.type) || isnull)
// {
// refresh_window_list = lappend(refresh_window_list,
// &context->internal_materialization_range); return refresh_window_list;
// }

refresh_window.end = context->internal_materialization_range.end;
log_refresh_window(INFO, context->cagg, &refresh_window, "bucket");
}

// refresh_window.start = int64_max(refresh_window.start,
// context->internal_materialization_range.start); refresh_window.end =
// int64_max(refresh_window.end, context->internal_materialization_range.end); if
// (refresh_window.start > refresh_window.end) refresh_window.end = refresh_window.start;

// /* If there's no MIN data then produre only one range */
// if (TS_TIME_IS_MIN(refresh_window.start, refresh_window.type) || isnull)
// {
// refresh_window_list = lappend(refresh_window_list,
// &context->internal_materialization_range); return refresh_window_list;
// }
refresh_window.end = context->internal_materialization_range.end;
log_refresh_window(INFO, context->cagg, &refresh_window, "bucket");
}

// elog(DEBUG1,
// "TIME_IS_MIN %d, TIME_IS_MAX %d",
// TS_TIME_IS_MIN(start, context->cagg->partition_type),
// TS_TIME_IS_MAX(end, context->cagg->partition_type));

// refresh_window_list = lappend(refresh_window_list, &refresh_window);
// log_refresh_window(DEBUG1, context->cagg, &context->materialization_range, "splitting");
// return refresh_window_list;

int64 estimated_batches =
(refresh_window.end - refresh_window.start) / (bucket_width * range_factor);
if (estimated_batches > ts_guc_cagg_max_individual_materializations)
int64 refresh_size = refresh_window.end - refresh_window.start;
int64 batch_size = (bucket_width * range_factor);
int64 estimated_batches = refresh_size / batch_size;
if (estimated_batches > ts_guc_cagg_max_individual_materializations ||
refresh_size <= batch_size)
{
// elog(INFO, "Fallback to single refresh window: %ld", estimated_batches);
refresh_window_list =
Expand All @@ -923,7 +857,6 @@ make_refresh_window_list(MaterializationContext *context, int64 bucket_width, in
log_refresh_window(INFO, context->cagg, &refresh_window, "before produce ranges");

const Dimension *time_dim;
// Hypertable *ht = cagg_get_hypertable_or_fail(context->cagg->data.raw_hypertable_id);
time_dim = hyperspace_get_open_dimension(ht->space, 0);

const char *query_str = " \
Expand Down Expand Up @@ -953,25 +886,16 @@ make_refresh_window_list(MaterializationContext *context, int64 bucket_width, in
OPERATOR(pg_catalog.&&) \
pg_catalog.int8range(chunk_ranges.start, chunk_ranges.end) \
);";
/*
JOIN chunk_ranges ON \
pg_catalog.int8range(refresh_start, refresh_start + $7) \
OPERATOR(pg_catalog.&&) \
pg_catalog.int8range(chunk_ranges.start, chunk_ranges.end);"; */

int res;
Oid types[] = { INT4OID, INT4OID, INT8OID, INT8OID, INT8OID };
Datum values[] = { Int32GetDatum(ht->fd.id),
Int32GetDatum(time_dim->fd.id),
Int64GetDatum(bucket_width * range_factor),
Int64GetDatum(batch_size),
Int64GetDatum(refresh_window.start),
Int64GetDatum(refresh_window.end) };
char nulls[] = { false, false, false, false, false };

// elog(INFO, "%s: %s", __func__, query_str);
res = SPI_execute_with_args(query_str,
5,
types,
Expand All @@ -988,28 +912,32 @@ make_refresh_window_list(MaterializationContext *context, int64 bucket_width, in
bool isnull;
InternalTimeRange *range = palloc(sizeof(InternalTimeRange));
range->start = SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1, &isnull);

/* When dropping chunks we need to align the start of the first range to cover dropped
* chunks if they exist */
if (i == 0)
range->start = int64_min(range->start, refresh_window.start);

range->end = SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 2, &isnull);
range->type = refresh_window.type;
refresh_window_list = lappend(refresh_window_list, range);

log_refresh_window(INFO, context->cagg, range, "range refresh");
}
// while (start < end)
// {
// InternalTimeRange *new_range = palloc(sizeof(InternalTimeRange));
// new_range->start = start;
// new_range->end = start + (bucket_width * range_factor) + 1;
// new_range->type = context->materialization_range.type;
// ranges = lappend(ranges, new_range);
// start = new_range->end;

// const TimeRange range = internal_time_range_to_time_range(*new_range);
// log_refresh_window(LOG, context->cagg, &range, "splitting");
// }

return refresh_window_list;
}

/* MERGE statement is available starting on PG15 and we'll support it only in the new
* format of CAggs and for non-compressed hypertables */
static inline bool
should_use_merge(MaterializationContext *context)
{
return (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 &&
ContinuousAggIsFinalized(context->cagg) &&
!TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(context->mat_ht));
}

static void
execute_materializations(MaterializationContext *context)
{
Expand All @@ -1021,7 +949,7 @@ execute_materializations(MaterializationContext *context)
Assert(bucket_width > 0);

ListCell *lc;
List *ranges = make_refresh_window_list(context, bucket_width, 2000);
List *ranges = create_materialization_refresh_window_list(context, bucket_width, 2000);

foreach (lc, ranges)
{
Expand All @@ -1037,14 +965,9 @@ execute_materializations(MaterializationContext *context)
NULL);
context->internal_materialization_range.end = range->end;

// log_refresh_window(LOG, context->cagg, &context->materialization_range,
// "refreshing");

/* MERGE statement is available starting on PG15 and we'll support it only in the new
* format of CAggs and for non-compressed hypertables */
if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 &&
ContinuousAggIsFinalized(context->cagg) &&
!TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(context->mat_ht))
if (should_use_merge(context))
{
/* Fallback to INSERT materializations if there are no rows to change on it */
if (execute_materialization_plan(context, PLAN_TYPE_EXISTS) == 0)
Expand Down

0 comments on commit b6d3952

Please sign in to comment.