Skip to content

Commit

Permalink
[alpha-unito#71] solved performance issues
Browse files Browse the repository at this point in the history
  • Loading branch information
ARMartinelli committed Feb 26, 2019
1 parent 453b9cc commit 703642e
Show file tree
Hide file tree
Showing 17 changed files with 20 additions and 28 deletions.
2 changes: 1 addition & 1 deletion examples/stock-market/seq_stock_tweets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class TweetProcessor {
}
}

void operator()(std::string tweet) {
void operator()(const std::string tweet) {
std::string stock;
unsigned len;
if (filter(tweet, stock, len)) {
Expand Down
3 changes: 1 addition & 2 deletions include/pico/Operators/FlatMap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ class FlatMapBase : public UnaryOperator<In, Out> {
* Creates a new FlatMap operator by defining its kernel function.
*/
FlatMapBase(std::function<void(In &, FlatMapCollector<Out> &)> flatmapf_,
unsigned par = def_par()) {
flatmapf = flatmapf_;
unsigned par = def_par()) : flatmapf(flatmapf_) {
this->set_input_degree(1);
this->set_output_degree(1);
this->stype(StructureType::BAG, true);
Expand Down
4 changes: 1 addition & 3 deletions include/pico/Operators/FoldReduce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ class FoldReduce : public UnaryOperator<In, Out> {
*/
FoldReduce(std::function<void(const In&, State&)> foldf_,
std::function<void(const State&, State&)> reducef_, //
unsigned par = def_par()) {
foldf = foldf_;
reducef = reducef_;
unsigned par = def_par()) : foldf(foldf_), reducef(reducef_) {
this->set_input_degree(1);
this->set_output_degree(1);
this->stype(StructureType::BAG, true);
Expand Down
2 changes: 1 addition & 1 deletion include/pico/Operators/InOut/ReadFromFile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ReadFromFile : public InputOperator<std::string> {
* Creates a new ReadFromFile operator,
* yielding an unordered bounded collection.
*/
ReadFromFile(std::string fname_, unsigned par = def_par())
ReadFromFile(const std::string fname_, unsigned par = def_par())
: InputOperator<std::string>(StructureType::BAG), fname(fname_) {
this->pardeg(par);
}
Expand Down
2 changes: 1 addition & 1 deletion include/pico/Operators/InOut/ReadFromHDFS.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class ReadFromHDFS : public InputOperator<std::string> {
*
* Creates a new ReadFromHDFS.
*/
ReadFromHDFS(std::string fname_)
ReadFromHDFS(const td::string fname_)
: InputOperator<std::string>(StructureType::BAG), fname(fname_) {}

/**
Expand Down
3 changes: 1 addition & 2 deletions include/pico/Operators/InOut/ReadFromSocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ class ReadFromSocket : public InputOperator<std::string> {
* operating on each token of the stream, delimited by the delimiter value.
*/
ReadFromSocket(std::string server_, int port_, char delimiter_)
: InputOperator<std::string>(StructureType::STREAM) {
server_name = server_;
: InputOperator<std::string>(StructureType::STREAM), server_name(server_) {
port = port_;
delimiter = delimiter_;
}
Expand Down
4 changes: 2 additions & 2 deletions include/pico/Operators/InOut/WriteToDisk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class WriteToDisk : public OutputOperator<In> {
*
* Creates a new WriteToDisk operator by defining its kernel function.
*/
WriteToDisk(std::string fname_, std::function<std::string(In)> func_)
WriteToDisk(const std::string fname_, std::function<std::string(In)> func_)
: OutputOperator<In>(StructureType::BAG),
fname(fname_), //
usr_func(true),
Expand All @@ -70,7 +70,7 @@ class WriteToDisk : public OutputOperator<In> {
*
* Creates a new WriteToDisk writing by ostream.
*/
WriteToDisk(std::string fname_)
WriteToDisk(const std::string fname_)
: OutputOperator<In>(StructureType::BAG), fname(fname_) {}

/**
Expand Down
3 changes: 1 addition & 2 deletions include/pico/Operators/InOut/WriteToStdOut.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ class WriteToStdOut : public OutputOperator<In> {
* Creates a new WriteToStdOut operator by defining its kernel function.
*/
WriteToStdOut(std::function<std::string(In)> func_)
: OutputOperator<In>(StructureType::STREAM) {
: OutputOperator<In>(StructureType::STREAM), func(func_) {
usr_func = true;
func = func_;
}

/**
Expand Down
3 changes: 1 addition & 2 deletions include/pico/Operators/JoinFlatMapByKey.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ class JoinFlatMapByKey : public BinaryOperator<In1, In2, Out> {
*/
JoinFlatMapByKey(
std::function<void(In1 &, In2 &, FlatMapCollector<Out> &)> kernel_,
unsigned par = def_par()) {
kernel = kernel_;
unsigned par = def_par()) : kernel(kernel_) {
this->set_input_degree(2);
this->set_output_degree(1);
this->stype(StructureType::BAG, true);
Expand Down
3 changes: 1 addition & 2 deletions include/pico/Operators/Map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ class MapBase : public UnaryOperator<In, Out> {
*
* Creates a new Map operator by defining its kernel function.
*/
MapBase(std::function<Out(In &)> mapf_, unsigned par = def_par()) {
mapf = mapf_;
MapBase(std::function<Out(In &)> mapf_, unsigned par = def_par()) : mapf(mapf_) {
this->set_input_degree(1);
this->set_output_degree(1);
this->stype(StructureType::BAG, true);
Expand Down
3 changes: 1 addition & 2 deletions include/pico/Operators/ReduceByKey.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ class ReduceByKey : public UnaryOperator<In, In> {
* \ingroup op-api
* ReduceByKey copy Constructor
*/
ReduceByKey(const ReduceByKey& copy) : UnaryOperator<In, In>(copy) {
reducef = copy.reducef;
ReduceByKey(const ReduceByKey& copy) : UnaryOperator<In, In>(copy), reducef(copy.reducef) {
win = copy.win ? copy.win->clone() : nullptr;
}

Expand Down
2 changes: 1 addition & 1 deletion include/pico/Pipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ class Pipe {
* Encodes the semantic graph into a dot file.
* @param filename dot file
*/
void to_dotfile(std::string filename) {
void to_dotfile(const std::string filename) {
#ifdef DEBUG
std::cerr << "[PIPE] Writing semantic graph as dot\n";
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class getline_textfile : public base_filter {
typedef pico::Microbatch<pico::Token<std::string>> mb_t;

public:
getline_textfile(std::string fname_) : file(fname_) {
getline_textfile(const std::string fname_) : file(fname_) {
assert(file.is_open());
}

Expand Down Expand Up @@ -213,7 +213,7 @@ class ReadFromFileFFNode_par : public NonOrderingFarm {
// using Worker = read_textfile;

public:
ReadFromFileFFNode_par(int parallelism, std::string fname_) : fname(fname_) {
ReadFromFileFFNode_par(int parallelism, const std::string fname_) : fname(fname_) {
std::vector<ff_node *> workers;
for (int i = 0; i < parallelism; ++i) workers.push_back(new Worker(fname));
auto e = new Partitioner(*this, fname, parallelism);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
template <typename In>
class WriteToDiskFFNode : public base_filter {
public:
WriteToDiskFFNode(std::string fname, std::function<std::string(In)> kernel_)
WriteToDiskFFNode(const std::string fname, std::function<std::string(In)> kernel_)
: wkernel(kernel_), outfile(fname) {
if (!outfile.is_open()) {
std::cerr << "Unable to open output file\n";
Expand All @@ -61,7 +61,7 @@ class WriteToDiskFFNode : public base_filter {
template <typename In>
class WriteToDiskFFNode_ostream : public base_filter {
public:
WriteToDiskFFNode_ostream(std::string fname) : outfile(fname) {
WriteToDiskFFNode_ostream(const std::string fname) : outfile(fname) {
if (!outfile.is_open()) {
std::cerr << "Unable to open output file\n";
assert(false);
Expand Down
2 changes: 1 addition & 1 deletion tests/include/common_functions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
template <typename KV>
/* parse test output into char-int pairs */
static std::unordered_map<char, std::unordered_multiset<int>> result_fltmapjoin(
std::string output_file) {
const std::string output_file) {
std::unordered_map<char, std::unordered_multiset<int>> observed;
auto output_pairs_str = read_lines(output_file);
for (auto pair : output_pairs_str) {
Expand Down
2 changes: 1 addition & 1 deletion tests/iteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ void seq_flatmap_join(const KvMultiMap& partitions_1,
}
}

KvMultiMap seq_Iter_flatmap_join(KvMultiMap original_partitions, int num_iter) {
KvMultiMap seq_Iter_flatmap_join(const KvMultiMap original_partitions, int num_iter) {
KvMultiMap res;
KvMultiMap& ptr_res = res;
KvMultiMap helper = original_partitions;
Expand Down
2 changes: 1 addition & 1 deletion tests/run_stream_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
##

echo "streaming tests"
cat testdata/pairs.txt | nc -l -p 4000 -q 0 &
nc -l 4000+ < testdata/pairs.txt &
./stream_tests

0 comments on commit 703642e

Please sign in to comment.