Skip to content

Commit

Permalink
Support vectorized aggregation on Hypercore TAM
Browse files Browse the repository at this point in the history
Add support for vectorized aggregation over Hypercore TAM. This
includes some refactoring of the VectorAgg node in order to plan
vectorized aggregation on top of ColumnarScans.

Currently, only ColumnarScan can run below VectorAgg, because it is
doing qual filtering. In theory, a SeqScan reading from Hypercore TAM
should also work because it would produce Arrow slots. However, a
SeqScan doesn't do vectorized filtering, which is currently assumed to
be done before the VectorAgg node.

In ColumnarScan, it is necessary to turn off projection when VectorAgg is
used. Otherwise, it would project the arrow slot into a virtual slot,
thus losing the vector data. Ideally, a projection should never be
planned to begin with, but this isn't possible since VectorAgg relies
on replacing existing non-vectorized Agg plans added by PostgreSQL.
  • Loading branch information
erimatnor committed Feb 4, 2025
1 parent 4ddf002 commit 486e2b1
Show file tree
Hide file tree
Showing 10 changed files with 767 additions and 55 deletions.
8 changes: 7 additions & 1 deletion tsl/src/hypercore/arrow_tts.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "arrow_cache.h"
#include "compression/arrow_c_data_interface.h"
#include "debug_assert.h"
#include "nodes/decompress_chunk/compressed_batch.h"

#include <limits.h>

Expand Down Expand Up @@ -88,6 +89,10 @@ typedef struct ArrowTupleTableSlot
const uint64 *arrow_qual_result; /* Bitmap with result of qual
* filtering over arrow_array. NULL if
* no filtering has been applied. */

/* Struct to hold values for one column. Necessary for compatibility with
* vector aggs. */
struct CompressedColumnValues ccvalues;
} ArrowTupleTableSlot;

extern const TupleTableSlotOps TTSOpsArrowTuple;
Expand Down Expand Up @@ -402,8 +407,9 @@ arrow_slot_per_segment_memory_context(const TupleTableSlot *slot)
return aslot->per_segment_mcxt;
}

extern bool is_compressed_col(const TupleDesc tupdesc, AttrNumber attno);
extern const ArrowArray *arrow_slot_get_array(TupleTableSlot *slot, AttrNumber attno);

extern bool is_compressed_col(const TupleDesc tupdesc, AttrNumber attno);
extern void arrow_slot_set_referenced_attrs(TupleTableSlot *slot, Bitmapset *attrs);
extern void arrow_slot_set_index_attrs(TupleTableSlot *slot, Bitmapset *attrs);

Expand Down
118 changes: 110 additions & 8 deletions tsl/src/nodes/vector_agg/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,25 @@
#include <nodes/extensible.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <nodes/pg_list.h>
#include <optimizer/optimizer.h>

#include "nodes/vector_agg/exec.h"

#include "compression/arrow_c_data_interface.h"
#include "hypercore/arrow_tts.h"
#include "hypercore/vector_quals.h"
#include "nodes/columnar_scan/columnar_scan.h"
#include "nodes/decompress_chunk/compressed_batch.h"
#include "nodes/decompress_chunk/exec.h"
#include "nodes/decompress_chunk/vector_quals.h"
#include "nodes/vector_agg.h"
#include "nodes/vector_agg/plan.h"
#include "nodes/vector_agg/vector_slot.h"

static int
get_input_offset(const CustomScanState *state, const Var *var)
get_input_offset_decompress_chunk(const DecompressChunkState *decompress_state, const Var *var)
{
const DecompressChunkState *decompress_state = (DecompressChunkState *) state;
const DecompressContext *dcontext = &decompress_state->decompress_context;

/*
Expand Down Expand Up @@ -57,15 +61,47 @@ get_input_offset(const CustomScanState *state, const Var *var)
return index;
}

/*
* Given a Var reference, get the offset of the corresponding attribute in the
* input tuple.
*
* For a node returning arrow slots, this is just the attribute number in the
* Var. But if the node is DecompressChunk, it is necessary to translate
* between the compressed and non-compressed columns.
*/
static int
get_value_bytes(const CustomScanState *state, int input_offset)
get_input_offset(const CustomScanState *state, const Var *var)
{
if (TTS_IS_ARROWTUPLE(state->ss.ss_ScanTupleSlot))
return AttrNumberGetAttrOffset(var->varattno);

return get_input_offset_decompress_chunk((const DecompressChunkState *) state, var);
}

static int
get_value_bytes_decompress_chunk(const DecompressChunkState *decompress_state, int input_offset)
{
const DecompressChunkState *decompress_state = (DecompressChunkState *) state;
const DecompressContext *dcontext = &decompress_state->decompress_context;
const CompressionColumnDescription *desc = &dcontext->compressed_chunk_columns[input_offset];
return desc->value_bytes;
}

/*
* Get the type length of the value referenced by the input offset.
*
* For a node returning arrow slots, the type length can be read directly from
* the scanned relation's tuple descriptor. For DecompressChunk, the input
* offset references the compressed relation.
*/
static int
get_value_bytes(const CustomScanState *state, int input_offset)
{
if (TTS_IS_ARROWTUPLE(state->ss.ss_ScanTupleSlot))
return RelationGetDescr(state->ss.ss_currentRelation)->attrs[input_offset].attlen;

Check warning on line 100 in tsl/src/nodes/vector_agg/exec.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/nodes/vector_agg/exec.c#L100

Added line #L100 was not covered by tests

return get_value_bytes_decompress_chunk((const DecompressChunkState *) state, input_offset);
}

static void
vector_agg_begin(CustomScanState *node, EState *estate, int eflags)
{
Expand Down Expand Up @@ -310,6 +346,33 @@ compressed_batch_get_next_slot(VectorAggState *vector_agg_state)
return &batch_state->decompressed_scan_slot_data.base;
}

/*
* Get the next slot to aggregate for a arrow tuple table slot.
*
* Implements "get next slot" on top of ColumnarScan (or any node producing
* ArrowTupleTableSlots). It just reads the slot from the child node.
*/
static TupleTableSlot *
arrow_get_next_slot(VectorAggState *vector_agg_state)

Check warning on line 356 in tsl/src/nodes/vector_agg/exec.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/nodes/vector_agg/exec.c#L356

Added line #L356 was not covered by tests
{
TupleTableSlot *slot = ExecProcNode(linitial(vector_agg_state->custom.custom_ps));

Check warning on line 358 in tsl/src/nodes/vector_agg/exec.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/nodes/vector_agg/exec.c#L358

Added line #L358 was not covered by tests

if (TupIsNull(slot))
{
/* The input has ended. */
vector_agg_state->input_ended = true;
return NULL;

Check warning on line 364 in tsl/src/nodes/vector_agg/exec.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/nodes/vector_agg/exec.c#L363-L364

Added lines #L363 - L364 were not covered by tests
}

Assert(TTS_IS_ARROWTUPLE(slot));

/* Filtering should have happened in the scan node below so the slot
* should not be consumed here. */
Assert(!arrow_slot_is_consumed(slot));

return slot;

Check warning on line 373 in tsl/src/nodes/vector_agg/exec.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/nodes/vector_agg/exec.c#L373

Added line #L373 was not covered by tests
}

/*
* Initialize vector quals for a compressed batch.
*
Expand Down Expand Up @@ -339,6 +402,18 @@ compressed_batch_init_vector_quals(VectorAggState *agg_state, VectorAggDef *agg_
return &agg_state->vqual_state.vqstate;
}

/*
* Initialize FILTER vector quals for an arrow tuple slot.
*
* Used to implement vectorized aggregate function filter clause.
*/
static VectorQualState *
arrow_init_vector_quals(VectorAggState *agg_state, VectorAggDef *agg_def, TupleTableSlot *slot)

Check warning on line 411 in tsl/src/nodes/vector_agg/exec.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/nodes/vector_agg/exec.c#L411

Added line #L411 was not covered by tests
{
vector_qual_state_init(&agg_state->vqual_state.vqstate, agg_def->filter_clauses, slot);
return &agg_state->vqual_state.vqstate;

Check warning on line 414 in tsl/src/nodes/vector_agg/exec.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/nodes/vector_agg/exec.c#L413-L414

Added lines #L413 - L414 were not covered by tests
}

static TupleTableSlot *
vector_agg_exec(CustomScanState *node)
{
Expand Down Expand Up @@ -423,6 +498,11 @@ vector_agg_exec(CustomScanState *node)
* Finally, pass the compressed batch to the grouping policy.
*/
grouping->gp_add_batch(grouping, slot);

/* The entire arrow array should be aggregated, so mark it is consumed
* so that we get the next array (or end) in the next iteration of the
* loop. */
vector_slot_mark_consumed(slot);
}

/*
Expand Down Expand Up @@ -479,20 +559,42 @@ Node *
vector_agg_state_create(CustomScan *cscan)
{
VectorAggState *state = (VectorAggState *) newNode(sizeof(VectorAggState), T_CustomScanState);
CustomScan *childscan = castNode(CustomScan, linitial(cscan->custom_plans));

state->custom.methods = &exec_methods;

/*
* Initialize VectorAggState to process vector slots from different
* subnodes. Currently, only compressed batches are supported, but arrow
* slots will be supported as well.
* subnodes.
*
* VectorAgg supports two child nodes: ColumnarScan (producing arrow tuple
* table slots) and DecompressChunk (producing compressed batches).
*
* When the child is ColumnarScan, VectorAgg expects Arrow slots that
* carry arrow arrays. ColumnarScan performs standard qual filtering and
* vectorized qual filtering prior to handing the slot up to VectorAgg.
*
* When the child is DecompressChunk, VectorAgg doesn't read the slot from
* the child node. Instead, it bypasses DecompressChunk and reads
* compressed tuples directly from the grandchild. It therefore needs to
* handle batch decompression and vectorized qual filtering itself, in its
* own "get next slot" implementation.
*
* The vector qual init functions are needed to implement vectorized
* aggregate function FILTER clauses for arrow tuple table slots and
* compressed batches, respectively.
*/
state->get_next_slot = compressed_batch_get_next_slot;
state->init_vector_quals = compressed_batch_init_vector_quals;
if (is_columnar_scan(childscan))
{
state->get_next_slot = arrow_get_next_slot;
state->init_vector_quals = arrow_init_vector_quals;

Check warning on line 590 in tsl/src/nodes/vector_agg/exec.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/nodes/vector_agg/exec.c#L589-L590

Added lines #L589 - L590 were not covered by tests
}
else
{
Assert(strcmp(childscan->methods->CustomName, "DecompressChunk") == 0);
state->get_next_slot = compressed_batch_get_next_slot;
state->init_vector_quals = compressed_batch_init_vector_quals;
}

return (Node *) state;
}
Loading

0 comments on commit 486e2b1

Please sign in to comment.