New IR (#38)
* reformatting in repl

stage

overflow

* solve an OOM

* do not allow copy on relation btree

* fix aggregation column order

w

ci ground truth

* fix parallel aggregation

deduplicate in relation btree

clean code

use the same string hash function in C++ and Python

delete extra output

fix docker

w

* fix IO size problem

tmp

try removing selection in agg

w

* allow wildcard

interpreter aggregation break

ww

try fix agg col selection

fix ci

debugging

* fix a divide 0 error

w

remove dirty file from other branch

when the head has 0 arity, is a count of 1 always generated?

* add max/min

ww

fix legacy code

add an example

w

* fix builtin = with 0-arity

try fix

merge testcase

fix truth

w

w

add DeALS to compare

* refactor API

remove dead code

* change CI time

add more examples

* add some aggregation join support in backend

* fix a divide 0 error

* remove dirty file from other branch

* fix builtin = with 0-arity

try fix

* New aggregators design

Aggregators specify the number of their input and output columns. They no longer specify the arity of the aggregated relation.
Aggregation is handled similarly to joins now.
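A hedged sketch of what that contract might look like (the names here are illustrative, not the backend's actual API): an aggregator declares how many input columns it consumes and how many output columns it emits, plus a per-group fold and a merge for partial results, so the runtime can schedule it like a join.

```cpp
#include <cstdint>
#include <vector>

// Illustrative sketch of the new aggregator contract: input/output column
// counts are declared by the aggregator itself, not tied to the arity of
// the aggregated relation.
struct aggregator {
    int input_columns;   // columns consumed per joined tuple
    int output_columns;  // columns appended to each output tuple
    uint64_t (*local_func)(const std::vector<uint64_t> &group);  // fold one group
    uint64_t (*reduce_func)(uint64_t, uint64_t);  // merge partial results
};

inline uint64_t sum_local(const std::vector<uint64_t> &group) {
    uint64_t acc = 0;
    for (uint64_t v : group)
        acc += v;
    return acc;
}

inline uint64_t sum_reduce(uint64_t a, uint64_t b) { return a + b; }

// A "sum" aggregator: one input column in, one output column out.
inline aggregator make_sum_aggregator() {
    return aggregator{1, 1, sum_local, sum_reduce};
}
```

Because the aggregator is just a pair of fold/merge functions plus column counts, partial results computed per bucket can be merged the same way join outputs are combined.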

stage change

* implement new aggregation API

* fix nullptr in range test

* separate examples into another repo

* adjust example folder

* add more ci time

* reorganize code

* add negative number support

* update base

* w

* before merge aggregation

* add output ir

* in progress: test parsing

* add negation and aggregation back

* comment old compiler CI

* compat with icc

* add time output

fix

w

w

* add bypass

w

stage

stage

stage

resolve all bb

Co-authored-by: Kristopher Micinski <[email protected]>

stage

stage

stage

stage

stage

test ex_simple

fix switch

w

stage

stage

stage

w

fix

* switch list before

w

add v3

* better print

fix

* extend string hash

limit dump

fix

slightly better

clean and fix pure number string in fact

fix ci

fix string again...

* fix string printing

* fixed entry point on docker (#43)

* Preprocess slog files using mzpp (runslog only)

* clean file

---------

Co-authored-by: Kristopher Micinski <[email protected]>
Co-authored-by: Yihao Sun <[email protected]>
Co-authored-by: Yihao Sun <[email protected]>
Co-authored-by: Arash Sahebolamri <[email protected]>
Co-authored-by: Yihao Sun <[email protected]>
Co-authored-by: Yihao Sun <[email protected]>
Co-authored-by: Michael Gathara <[email protected]>
8 people authored Jan 24, 2024
1 parent 4174d23 commit 8af0e6f
Showing 110 changed files with 7,738 additions and 210,569 deletions.
10 changes: 10 additions & 0 deletions .clang-format
@@ -0,0 +1,10 @@
BasedOnStyle: LLVM
UseTab: Never
IndentWidth: 4
TabWidth: 4
AllowShortIfStatementsOnASingleLine: false
IndentCaseLabels: false
ColumnLimit: 0
AccessModifierOffset: -4
NamespaceIndentation: All
FixNamespaceComments: false
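As a quick illustration of the style this config produces (example code only, not from this repo): 4-space indents with no tabs, case labels flush with the `switch` (`IndentCaseLabels: false`), and short `if` statements always broken onto separate lines.

```cpp
#include <cassert>

// Formatted per the .clang-format above: IndentWidth 4, UseTab Never,
// case labels not indented, short ifs never collapsed onto one line.
inline int sign_class(int x) {
    if (x < 0) {
        return -1;
    }
    switch (x) {
    case 0:
        return 0;
    default:
        return 1;
    }
}
```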
18 changes: 9 additions & 9 deletions .github/workflows/run-tests.yml
@@ -7,7 +7,7 @@ on:
jobs:
build:
runs-on: ubuntu-latest
timeout-minutes: 25
timeout-minutes: 60
steps:

- name: Cache Docker layers
@@ -43,17 +43,17 @@ jobs:
# - name: Build docker image
# run: docker build -t slog-ci-test .

- name: Compiler tests
run: docker run --rm --entrypoint racket slog-ci-test /slog/compiler/unit-tests/tests.rkt -c --run-slow-tests
# - name: Compiler tests
# run: docker run --rm --entrypoint racket slog-ci-test /slog/compiler/unit-tests/tests.rkt -c --run-slow-tests

- name: Compiler tests with --merge-builtins
run: docker run --rm --entrypoint racket slog-ci-test /slog/compiler/unit-tests/tests.rkt -c --merge-builtins --run-slow-tests
# - name: Compiler tests with --merge-builtins
# run: docker run --rm --entrypoint racket slog-ci-test /slog/compiler/unit-tests/tests.rkt -c --merge-builtins --run-slow-tests

- name: Compiler unit tests
run: docker run --rm --entrypoint raco slog-ci-test test /slog/compiler/src/generic-utils.rkt
# - name: Compiler unit tests
# run: docker run --rm --entrypoint raco slog-ci-test test /slog/compiler/src/generic-utils.rkt

- name: C++ compilation tests
run: docker run --rm --entrypoint bash slog-ci-test /slog/compiler/unit-tests/cpp_compilation_tests.sh
# - name: C++ compilation tests
# run: docker run --rm --entrypoint bash slog-ci-test /slog/compiler/unit-tests/cpp_compilation_tests.sh

# - name: Compile hashes
# run: docker run --rm --entrypoint bash slog-ci-test /slog/slog/tests/run_test.sh compile_hashes
11 changes: 10 additions & 1 deletion .gitignore
@@ -2,6 +2,8 @@
!/data/README.MD
/data/binaries/*
!/data/binaries/CMakeLists.txt
.vscode/c_cpp_properties.json
.vscode/settings.json
__pycache__
parser.out
parsertab.py
@@ -23,4 +25,11 @@ temp-out/
test-input
souffle-out
compiler/test/
.DS_Store
nonsense/

# evaluation
*.scm
syntax-edb
cfa-expt-*
.DS_Store

6 changes: 6 additions & 0 deletions .gitmodules
@@ -0,0 +1,6 @@
[submodule "examples/examples/datalog-examples"]
path = examples/examples/datalog-examples
url = [email protected]:harp-lab/rosetta-slog.git
[submodule "examples/datalog-example"]
path = examples/datalog-example
url = [email protected]:harp-lab/rosetta-slog.git
5 changes: 3 additions & 2 deletions Dockerfile
@@ -6,7 +6,7 @@ RUN wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | apt-key add -
RUN add-apt-repository ppa:plt/racket
RUN apt-get update && apt-get install -y clang-format clang-tidy clang-tools clang clangd libc++-dev libc++1 libc++abi-dev \
libc++abi1 libclang-dev libclang1 liblldb-dev libomp-dev libomp5 lld lldb \
llvm-dev llvm-runtime llvm python3-clang mcpp cmake racket build-essential openmpi-bin libopenmpi-dev z3 \
llvm-dev llvm-runtime llvm python3-clang mcpp cmake racket build-essential mpich libmpich-dev z3 \
git python3-pip sqlite3 ninja-build valgrind apt-utils libssl-dev vim valgrind apt-utils
RUN raco setup --doc-index --force-user-docs
RUN raco pkg install --batch --deps search-auto binaryio graph rparallel pmap csv-reading
@@ -17,12 +17,13 @@ ENV OMPI_ALLOW_RUN_AS_ROOT_CONFIRM=1
ENV OMPI_MCA_btl_vader_single_copy_mechanism=none
ENV CC=mpicc
ENV CXX=mpicxx
ENV MPI_BIN_HOME=/usr/bin

COPY . /slog

# build backend
RUN cd /slog/backend ; rm -rf build ; \
cmake --no-warn-unused-cli -DCMAKE_EXPORT_COMPILE_COMMANDS:BOOL=TRUE -DCMAKE_BUILD_TYPE:STRING=RelWithDebInfo -DCMAKE_C_COMPILER:FILEPATH=/usr/bin/clang -H/slog/backend -B/slog/backend/build -G Ninja ; \
cmake --no-warn-unused-cli -DCMAKE_EXPORT_COMPILE_COMMANDS:BOOL=TRUE -DCMAKE_BUILD_TYPE:STRING=RelWithDebInfo -H/slog/backend -B/slog/backend/build -G Ninja ; \
cmake --build /slog/backend/build --config Release --target all -j --

WORKDIR /slog
93 changes: 57 additions & 36 deletions backend/CMakeLists.txt
@@ -1,39 +1,58 @@
cmake_minimum_required (VERSION 3.9)
cmake_minimum_required(VERSION 3.9)

project (BTree)
project(BTree)

add_compile_options(--std=c++17 -lstdc++fs -Wno-strict-aliasing -Werror=class-memaccess -fpermissive)

find_package(MPI REQUIRED)


option(PROFILE "Adding Profiling to each RA operation on rank 0" ON)

if(PROFILE)
add_definitions(-DPROFILE)
endif()

# find_package(OpenMP)
# if (OPENMP_FOUND)
# set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OpenMP_C_FLAGS}")
# set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OpenMP_CXX_FLAGS}")
# set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${OpenMP_EXE_LINKER_FLAGS}")
# set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OpenMP_C_FLAGS}")
# set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OpenMP_CXX_FLAGS}")
# set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${OpenMP_EXE_LINKER_FLAGS}")
# endif()

set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -std=c++17 -lstdc++fs -Werror=class-memaccess -fpermissive")
set (source_dir "${PROJECT_SOURCE_DIR}/src")
set (tests_dir "${PROJECT_SOURCE_DIR}/tests")
set (data_dir "${PROJECT_SOURCE_DIR}/data")
set (utility_dir "${PROJECT_SOURCE_DIR}/utility")

file (GLOB source_files_parallel_RA "${source_dir}/parallel_RA_inc.h" "${source_dir}/log/logger.cpp" "${source_dir}/hash/hash.h" "${source_dir}/buffer/vector_buffer.cpp" "${source_dir}/comm/comm.cpp" "${source_dir}/relation/shmap_relation_exp.cpp" "${source_dir}/comm/all_to_allv_comm.cpp" "${source_dir}/IO/parallel_io.cpp" "${source_dir}/RA/parallel_join.cpp" "${source_dir}/RA/parallel_agg.cpp" "${source_dir}/comm/intra_bucket_comm.cpp" "${source_dir}/RA/parallel_copy.cpp" "${source_dir}/RA/parallel_copy_filter.cpp" "${source_dir}/RA/parallel_copy_generate.cpp" "${source_dir}/RA/parallel_RA.h" "${source_dir}/RA/parallel_acopy.cpp" "${source_dir}/relation/balanced_hash_relation.cpp" "${source_dir}/relation/relation_load_balancer.cpp" "${source_dir}/RAM/RA_tasks.cpp" "${source_dir}/RAM/RA_intra_bucket.cpp" "${source_dir}/RAM/RA_buffer_manager.cpp" "${source_dir}/RAM/RA_local_join.cpp" "${source_dir}/RAM/RA_local_inserts.cpp" "${source_dir}/RAM/RA_fixed_point_loop.cpp" "${source_dir}/lie/lie.cpp")
file (GLOB source_files_ata "${tests_dir}/all_to_all_benchmark.cpp")
file (GLOB source_files_tc "${tests_dir}/transitive_closure.cpp")
#file (GLOB source_files_builtin "${tests_dir}/builtin.cpp")
#file (GLOB source_files_prim_32_3 "${tests_dir}/prim-32-3.cpp")
file (GLOB source_files_fact "${tests_dir}/fact.cpp")
file (GLOB source_files_out "${tests_dir}/out.cpp")
file (GLOB source_files_wc110_4 "${tests_dir}/worstcase-110-terms-4-m.cpp")
file (GLOB source_files_wc110_5 "${tests_dir}/worstcase-110-terms-5-m.cpp")
file (GLOB source_files_wc110_6 "${tests_dir}/worstcase-110-terms-6-m.cpp")
file (GLOB source_files_wc110_7 "${tests_dir}/worstcase-110-terms-7-m.cpp")
file (GLOB source_files_wc110_8 "${tests_dir}/worstcase-110-terms-8-m.cpp")
file (GLOB source_files_wc110_9 "${tests_dir}/worstcase-110-terms-9-m.cpp")
file (GLOB source_files_wc110_10 "${tests_dir}/worstcase-110-terms-10-m.cpp")
file (GLOB source_files_fv "${tests_dir}/freevars.cpp")
file (GLOB source_files_kcfa "${tests_dir}/kcfa-tiny.cpp")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -std=c++17 -lstdc++fs -Werror=class-memaccess -fpermissive")
set(source_dir "${PROJECT_SOURCE_DIR}/src")
set(tests_dir "${PROJECT_SOURCE_DIR}/tests")
set(data_dir "${PROJECT_SOURCE_DIR}/data")
set(utility_dir "${PROJECT_SOURCE_DIR}/utility")

file(GLOB source_files_parallel_RA
"${source_dir}/parallel_RA_inc.h" "${source_dir}/log/logger.cpp" "${source_dir}/hash/hash.h"
"${source_dir}/buffer/vector_buffer.cpp" "${source_dir}/comm/comm.cpp" "${source_dir}/relation/shmap_relation_exp.cpp"
"${source_dir}/comm/all_to_allv_comm.cpp" "${source_dir}/IO/parallel_io.cpp" "${source_dir}/RA/parallel_join.cpp"
"${source_dir}/RA/parallel_agg.cpp" "${source_dir}/comm/intra_bucket_comm.cpp" "${source_dir}/RA/parallel_copy.cpp"
"${source_dir}/RA/parallel_copy_filter.cpp" "${source_dir}/RA/parallel_copy_generate.cpp" "${source_dir}/RA/parallel_RA.h"
"${source_dir}/RA/parallel_acopy.cpp" "${source_dir}/relation/balanced_hash_relation.cpp" "${source_dir}/relation/relation_load_balancer.cpp"
"${source_dir}/RAM/RA_tasks.cpp" "${source_dir}/RAM/RA_intra_bucket.cpp" "${source_dir}/RAM/RA_buffer_manager.cpp"
"${source_dir}/RAM/RA_local_join.cpp" "${source_dir}/RAM/RA_local_inserts.cpp" "${source_dir}/RAM/RA_fixed_point_loop.cpp"
"${source_dir}/ast.h" "${source_dir}/builtin.h" "${source_dir}/builtin.cpp"
"${source_dir}/lie/lie.cpp")
file(GLOB source_files_ata "${tests_dir}/all_to_all_benchmark.cpp")
file(GLOB source_files_tc "${tests_dir}/transitive_closure.cpp")

# file (GLOB source_files_builtin "${tests_dir}/builtin.cpp")
# file (GLOB source_files_prim_32_3 "${tests_dir}/prim-32-3.cpp")
file(GLOB source_files_fact "${tests_dir}/fact.cpp")
file(GLOB source_files_out "${tests_dir}/out.cpp")
file(GLOB source_files_wc110_4 "${tests_dir}/worstcase-110-terms-4-m.cpp")
file(GLOB source_files_wc110_5 "${tests_dir}/worstcase-110-terms-5-m.cpp")
file(GLOB source_files_wc110_6 "${tests_dir}/worstcase-110-terms-6-m.cpp")
file(GLOB source_files_wc110_7 "${tests_dir}/worstcase-110-terms-7-m.cpp")
file(GLOB source_files_wc110_8 "${tests_dir}/worstcase-110-terms-8-m.cpp")
file(GLOB source_files_wc110_9 "${tests_dir}/worstcase-110-terms-9-m.cpp")
file(GLOB source_files_wc110_10 "${tests_dir}/worstcase-110-terms-10-m.cpp")
file(GLOB source_files_fv "${tests_dir}/freevars.cpp")
file(GLOB source_files_kcfa "${tests_dir}/kcfa-tiny.cpp")
file(GLOB source_files_slogc "${source_dir}/slog.cpp")

link_libraries(stdc++fs)

@@ -79,17 +98,16 @@ ADD_LIBRARY(parallel_RA "${source_files_parallel_RA}")
# INCLUDE_DIRECTORIES(${MPI_INCLUDE_PATH})
# TARGET_LINK_LIBRARIES(out parallel_RA ${MPI_LIBRARIES})

#add_executable(prim_32_3 ${source_files_prim_32_3})
#INCLUDE_DIRECTORIES(${MPI_INCLUDE_PATH})
#TARGET_LINK_LIBRARIES(prim_32_3 parallel_RA ${MPI_LIBRARIES})

# add_executable(prim_32_3 ${source_files_prim_32_3})
# INCLUDE_DIRECTORIES(${MPI_INCLUDE_PATH})
# TARGET_LINK_LIBRARIES(prim_32_3 parallel_RA ${MPI_LIBRARIES})
add_executable(TC ${source_files_tc})
INCLUDE_DIRECTORIES(${MPI_INCLUDE_PATH})
TARGET_LINK_LIBRARIES(TC parallel_RA ${MPI_LIBRARIES})

#add_executable(builtin ${source_files_builtin})
#INCLUDE_DIRECTORIES(${MPI_INCLUDE_PATH})
#TARGET_LINK_LIBRARIES(builtin parallel_RA ${MPI_LIBRARIES})
# add_executable(builtin ${source_files_builtin})
# INCLUDE_DIRECTORIES(${MPI_INCLUDE_PATH})
# TARGET_LINK_LIBRARIES(builtin parallel_RA ${MPI_LIBRARIES})

# add_executable(freevars ${source_files_fv})
# INCLUDE_DIRECTORIES(${MPI_INCLUDE_PATH})
@@ -98,9 +116,12 @@ TARGET_LINK_LIBRARIES(TC parallel_RA ${MPI_LIBRARIES})
# add_executable(kcfa ${source_files_kcfa})
# INCLUDE_DIRECTORIES(${MPI_INCLUDE_PATH})
# TARGET_LINK_LIBRARIES(kcfa parallel_RA ${MPI_LIBRARIES})
INCLUDE_DIRECTORIES(${MPI_INCLUDE_PATH})

add_executable(bin_parser "${utility_dir}/binary_parser.cpp")

add_executable(tsv_to_bin "${utility_dir}/tsv_to_bin.cpp")
INCLUDE_DIRECTORIES(${MPI_INCLUDE_PATH})
TARGET_LINK_LIBRARIES(tsv_to_bin parallel_RA ${MPI_LIBRARIES})

add_executable(slog "${source_dir}/slog.cpp")
TARGET_LINK_LIBRARIES(slog parallel_RA ${MPI_LIBRARIES})
17 changes: 6 additions & 11 deletions backend/src/IO/parallel_io.cpp
@@ -128,9 +128,9 @@ void parallel_io::parallel_read_input_relation_from_file_to_local_buffer(u32 ari
//std::cout << "222222 Filename " << file_name << std::endl;
uintmax_t size_data_file = 0;
// if input file not exists, we assume input relation is empty size
if(std::filesystem::exists(file_name))
if(fs::exists(file_name))
{
size_data_file = std::filesystem::file_size(file_name);
size_data_file = fs::file_size(file_name);
if (size_data_file % (8 * (arity + 1)) != 0)
{
std::cout << "Input file :" << file_name << " "
@@ -149,16 +149,11 @@
global_row_count = size_data_file / (8 * (arity + 1));
col_count = arity + 1;


/* Broadcast the total number of rows and column to all processes */
MPI_Bcast(&global_row_count, 1, MPI_INT, 0, lcomm);
MPI_Bcast(&col_count, 1, MPI_INT, 0, lcomm);

#if 1

/* Read all data in parallel */
uint64_t read_offset;
read_offset = (int)ceil((float)global_row_count / nprocs) * rank;
read_offset = (uint64_t)ceil((float)global_row_count / nprocs) * rank;

if (read_offset > (uint64_t)global_row_count)
{
@@ -172,7 +167,7 @@
entry_count = global_row_count - read_offset;
}
else {
entry_count = (int) ceil((float)global_row_count / nprocs);
entry_count = (u64) ceil((float)global_row_count / nprocs);
}
}
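The chunking logic above can be sketched in isolation (simplified, with illustrative names; not the project's exact API): each rank reads a ceil(rows / nprocs)-sized chunk starting at rank * chunk, the final rank's chunk is clamped to what remains, and keeping the arithmetic in `uint64_t` avoids the overflow the old `(int)` cast could hit on large inputs.

```cpp
#include <cmath>
#include <cstdint>

// Sketch of per-rank input partitioning: fixed-size chunks with a
// clamped tail, computed entirely in 64-bit unsigned arithmetic.
inline void partition_rows(uint64_t global_row_count, int nprocs, int rank,
                           uint64_t &read_offset, uint64_t &entry_count) {
    uint64_t chunk =
        (uint64_t)std::ceil((double)global_row_count / nprocs);
    read_offset = chunk * (uint64_t)rank;
    if (read_offset >= global_row_count) {
        entry_count = 0;  // this rank has nothing to read
    } else if (read_offset + chunk > global_row_count) {
        entry_count = global_row_count - read_offset;  // short last chunk
    } else {
        entry_count = chunk;
    }
}
```

For 10 rows over 4 ranks this assigns offsets 0, 3, 6, 9 with counts 3, 3, 3, 1, covering every row exactly once.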

@@ -261,15 +256,15 @@ void parallel_io::buffer_data_to_hash_buffer_col(u32 arity, u32 join_column_coun
process_data_vector_size = process_data_vector_size + (col_count);
}


/* Transmit the packaged data process_data_vector to all processes */
all_to_all_comm(process_data_vector, process_data_vector_size, process_size, &hash_buffer_size, &hash_buffer, comm);

//u32 g_hash_buffer_size = 0;
//MPI_Allreduce(&hash_buffer_size, &g_hash_buffer_size, 1, MPI_INT, MPI_SUM, comm);
//if (rank == 0)
// std::cout << "After Comm " << fname << " " << g_hash_buffer_size/((arity+1)) << std::endl;

// for (int i = 0; i < nprocs; i++)
// process_data_vector[i].vector_buffer_free();
/* Free the data buffer after all to all */
free (process_data_vector);

2 changes: 1 addition & 1 deletion backend/src/RA/parallel_RA.h
@@ -76,5 +76,5 @@ class parallel_RA


/// JOIN, COPY, ACOPY
u32 get_RA_type() {return RA_type;}
int get_RA_type() {return RA_type;}
};
87 changes: 85 additions & 2 deletions backend/src/RA/parallel_agg.cpp
@@ -1,7 +1,8 @@

#include "../parallel_RA_inc.h"
#include <array>
#include <cassert>
#include <iostream>
#include <ostream>

bool parallel_join_negate::local_negation(
int threshold, int* offset, int join_order, u32 buckets,
@@ -29,7 +30,7 @@
for (u32 bucket_id = 0; bucket_id < buckets; bucket_id++)
{
input1[bucket_id].as_all_to_allv_right_outer_join_buffer(
// negated_target,
join_negation_target_table->get_full() + mcomm.get_rank(),
input0_buffer, input0_buffer_size, input0_buffer_width, offset,
join_buffer,
counter, buckets, output_sub_bucket_count,
@@ -62,3 +63,85 @@ void parallel_join_negate::local_copy(
}
}
}

void parallel_join_aggregate::local_aggregate(
u32 buckets, int *offset,
int input0_buffer_size, u64 *input0_buffer,
all_to_allv_buffer &agg_buffer, int ra_counter
) {

relation* input = this->join_aggregate_input_table;
relation* target = this->join_aggregate_target_table;
relation* output = this->join_aggregate_output_table;
int input0_buffer_width = target->get_arity() + 1;

u32* output_sub_bucket_count = output->get_sub_bucket_per_bucket_count();
u32** output_sub_bucket_rank = output->get_sub_bucket_rank();
int real_join_count = output->get_join_column_count() - 1;
agg_buffer.width[ra_counter] = real_join_count + 1;

shmap_relation* agg_target;
if (*(target->get_sub_bucket_per_bucket_count()) == 1) {
agg_target = target->get_full() + mcomm.get_rank();
} else {
agg_target = new shmap_relation(target->get_arity()+1, false);
for (int k1 = *offset; k1 < input0_buffer_size; k1 = k1 + input0_buffer_width)
{
agg_target->insert_tuple_from_array(input0_buffer+k1, target->get_arity()+1);
}
}

btree::btree_map<std::vector<u64>, u64, shmap_relation::t_comparator> res_map;
for (int bucket=0; bucket < buckets; bucket ++) {
for (auto tuple: input->get_full()[bucket]) {
std::vector<u64> data_v(tuple.begin(), tuple.begin()+target->get_join_column_count());
// std::cout << "On rank " << mcomm.get_rank() << " bucket " << *(target->get_sub_bucket_per_bucket_count()) << std::endl;
auto joined_range = agg_target->prefix_range(data_v);
auto agg_data = local_func(joined_range);
if (*(target->get_sub_bucket_per_bucket_count()) != 1 &&
res_map.find(data_v) != res_map.end()) {
// std::cout << "reduce"
res_map[data_v] = reduce_func(res_map[data_v], agg_data);
} else {
res_map[data_v] = agg_data;
}
}
}

for (int bucket=0; bucket < buckets; bucket ++) {
for (auto input_tuple: input->get_full()[bucket]) {
std::vector<u64> joined_input_tuple(input_tuple.begin(), input_tuple.begin()+input->get_join_column_count());
auto agg_res = res_map[joined_input_tuple];
std::vector<u64> tuple(reorder_mapping.size(), 0);
int reorder_agg_index = input->get_arity() + 1;
for (int j = 0; j < reorder_mapping.size(); j++) {
if (reorder_mapping[j] == reorder_agg_index) {
tuple[j] = agg_res;
} else {
tuple[j] = input_tuple[reorder_mapping[j]];
}
}

uint64_t bucket_id = tuple_hash(tuple.data(), output->get_join_column_count()) % buckets;
uint64_t sub_bucket_id = 0;
if (input->get_is_canonical() == false && output->get_arity() != 0 && output->get_arity() >= real_join_count) {

sub_bucket_id = tuple_hash(tuple.data()+real_join_count, output->get_arity()-real_join_count) % output_sub_bucket_count[bucket_id];
}
int index = output_sub_bucket_rank[bucket_id][sub_bucket_id];
// std::cout << "index : " << index << std::endl;
agg_buffer.local_compute_output_size_rel[ra_counter] = agg_buffer.local_compute_output_size_rel[ra_counter] + agg_buffer.width[ra_counter];
agg_buffer.local_compute_output_size_total = agg_buffer.local_compute_output_size_total+agg_buffer.width[ra_counter];
agg_buffer.local_compute_output_size_flat[index*agg_buffer.ra_count+ra_counter] = agg_buffer.local_compute_output_size_flat[index*agg_buffer.ra_count + ra_counter] + agg_buffer.width[ra_counter];
agg_buffer.local_compute_output_count_flat[index*agg_buffer.ra_count+ra_counter]++;
agg_buffer.local_compute_output_size[ra_counter][index] = agg_buffer.local_compute_output_size[ra_counter][index]+agg_buffer.width[ra_counter];
agg_buffer.cumulative_tuple_process_map[index] = agg_buffer.cumulative_tuple_process_map[index] + agg_buffer.width[ra_counter];
agg_buffer.local_compute_output[ra_counter][index].vector_buffer_append((const unsigned char*)tuple.data(), sizeof(u64)*agg_buffer.width[ra_counter]);
}
}
if (*(target->get_sub_bucket_per_bucket_count()) != 1) {
agg_target->remove_tuple();
delete agg_target;
}
res_map.clear();
}
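The grouping pattern in `local_aggregate` can be sketched minimally (names simplified; this is not the backend's exact interface): tuples are keyed by their join-column prefix, and a reduce function merges the aggregated column of tuples that share a key, mirroring how `res_map` accumulates partial results above.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

// Illustrative sketch: group tuples by their join prefix and merge the
// aggregated (last) column of each group with reduce_func.
using tuple_t = std::vector<uint64_t>;

inline std::map<tuple_t, uint64_t>
aggregate_by_prefix(const std::vector<tuple_t> &tuples, size_t join_cols,
                    uint64_t (*reduce_func)(uint64_t, uint64_t)) {
    std::map<tuple_t, uint64_t> res_map;
    for (const tuple_t &t : tuples) {
        tuple_t key(t.begin(), t.begin() + join_cols);  // join prefix
        uint64_t val = t.back();                        // aggregated column
        auto it = res_map.find(key);
        if (it == res_map.end())
            res_map.emplace(key, val);                  // first tuple in group
        else
            it->second = reduce_func(it->second, val);  // merge partials
    }
    return res_map;
}
```

In the real operator the per-group value comes from `local_func` over a `prefix_range` of the target relation, and reduction is only needed when the target is spread over multiple sub-buckets; the map-keyed-by-prefix shape is the same.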
