From 703642ec9bf5c186f18cf7a7d97aa62dbfe57d37 Mon Sep 17 00:00:00 2001 From: Alberto Riccardo Martinelli Date: Mon, 25 Feb 2019 14:07:59 +0100 Subject: [PATCH] [#71] solved performance issues --- examples/stock-market/seq_stock_tweets.cpp | 2 +- include/pico/Operators/FlatMap.hpp | 3 +-- include/pico/Operators/FoldReduce.hpp | 4 +--- include/pico/Operators/InOut/ReadFromFile.hpp | 2 +- include/pico/Operators/InOut/ReadFromHDFS.hpp | 2 +- include/pico/Operators/InOut/ReadFromSocket.hpp | 3 +-- include/pico/Operators/InOut/WriteToDisk.hpp | 4 ++-- include/pico/Operators/InOut/WriteToStdOut.hpp | 3 +-- include/pico/Operators/JoinFlatMapByKey.hpp | 3 +-- include/pico/Operators/Map.hpp | 3 +-- include/pico/Operators/ReduceByKey.hpp | 3 +-- include/pico/Pipe.hpp | 2 +- .../OperatorsFFNodes/InOut/ReadFromFileFFNode.hpp | 4 ++-- .../OperatorsFFNodes/InOut/WriteToDiskFFNode.hpp | 4 ++-- tests/include/common_functions.hpp | 2 +- tests/iteration.cpp | 2 +- tests/run_stream_tests.sh | 2 +- 17 files changed, 20 insertions(+), 28 deletions(-) diff --git a/examples/stock-market/seq_stock_tweets.cpp b/examples/stock-market/seq_stock_tweets.cpp index cc981852..30474a54 100644 --- a/examples/stock-market/seq_stock_tweets.cpp +++ b/examples/stock-market/seq_stock_tweets.cpp @@ -48,7 +48,7 @@ class TweetProcessor { } } - void operator()(std::string tweet) { + void operator()(const std::string tweet) { std::string stock; unsigned len; if (filter(tweet, stock, len)) { diff --git a/include/pico/Operators/FlatMap.hpp b/include/pico/Operators/FlatMap.hpp index 30e1a635..4a936a93 100644 --- a/include/pico/Operators/FlatMap.hpp +++ b/include/pico/Operators/FlatMap.hpp @@ -61,8 +61,7 @@ class FlatMapBase : public UnaryOperator { * Creates a new FlatMap operator by defining its kernel function. 
*/ FlatMapBase(std::function &)> flatmapf_, - unsigned par = def_par()) { - flatmapf = flatmapf_; + unsigned par = def_par()) : flatmapf(flatmapf_) { this->set_input_degree(1); this->set_output_degree(1); this->stype(StructureType::BAG, true); diff --git a/include/pico/Operators/FoldReduce.hpp b/include/pico/Operators/FoldReduce.hpp index 899ca1ba..37e23072 100644 --- a/include/pico/Operators/FoldReduce.hpp +++ b/include/pico/Operators/FoldReduce.hpp @@ -40,9 +40,7 @@ class FoldReduce : public UnaryOperator { */ FoldReduce(std::function foldf_, std::function reducef_, // - unsigned par = def_par()) { - foldf = foldf_; - reducef = reducef_; + unsigned par = def_par()) : foldf(foldf_), reducef(reducef_) { this->set_input_degree(1); this->set_output_degree(1); this->stype(StructureType::BAG, true); diff --git a/include/pico/Operators/InOut/ReadFromFile.hpp b/include/pico/Operators/InOut/ReadFromFile.hpp index 1f8357f6..e6546163 100644 --- a/include/pico/Operators/InOut/ReadFromFile.hpp +++ b/include/pico/Operators/InOut/ReadFromFile.hpp @@ -55,7 +55,7 @@ class ReadFromFile : public InputOperator { * Creates a new ReadFromFile operator, * yielding an unordered bounded collection. */ - ReadFromFile(std::string fname_, unsigned par = def_par()) + ReadFromFile(const std::string fname_, unsigned par = def_par()) : InputOperator(StructureType::BAG), fname(fname_) { this->pardeg(par); } diff --git a/include/pico/Operators/InOut/ReadFromHDFS.hpp b/include/pico/Operators/InOut/ReadFromHDFS.hpp index eae2a2c8..5b524390 100644 --- a/include/pico/Operators/InOut/ReadFromHDFS.hpp +++ b/include/pico/Operators/InOut/ReadFromHDFS.hpp @@ -53,7 +53,7 @@ class ReadFromHDFS : public InputOperator { * * Creates a new ReadFromHDFS. 
*/ - ReadFromHDFS(std::string fname_) + ReadFromHDFS(const std::string fname_) : InputOperator(StructureType::BAG), fname(fname_) {} /** diff --git a/include/pico/Operators/InOut/ReadFromSocket.hpp b/include/pico/Operators/InOut/ReadFromSocket.hpp index 11e61515..96ffad30 100644 --- a/include/pico/Operators/InOut/ReadFromSocket.hpp +++ b/include/pico/Operators/InOut/ReadFromSocket.hpp @@ -53,8 +53,7 @@ class ReadFromSocket : public InputOperator { * operating on each token of the stream, delimited by the delimiter value. */ ReadFromSocket(std::string server_, int port_, char delimiter_) - : InputOperator(StructureType::STREAM) { - server_name = server_; + : InputOperator(StructureType::STREAM), server_name(server_) { port = port_; delimiter = delimiter_; } diff --git a/include/pico/Operators/InOut/WriteToDisk.hpp b/include/pico/Operators/InOut/WriteToDisk.hpp index a585ad76..7d9ac74a 100644 --- a/include/pico/Operators/InOut/WriteToDisk.hpp +++ b/include/pico/Operators/InOut/WriteToDisk.hpp @@ -57,7 +57,7 @@ class WriteToDisk : public OutputOperator { * * Creates a new WriteToDisk operator by defining its kernel function. */ - WriteToDisk(std::string fname_, std::function func_) + WriteToDisk(const std::string fname_, std::function func_) : OutputOperator(StructureType::BAG), fname(fname_), // usr_func(true), @@ -70,7 +70,7 @@ class WriteToDisk : public OutputOperator { * * Creates a new WriteToDisk writing by ostream. */ - WriteToDisk(std::string fname_) + WriteToDisk(const std::string fname_) : OutputOperator(StructureType::BAG), fname(fname_) {} /** diff --git a/include/pico/Operators/InOut/WriteToStdOut.hpp b/include/pico/Operators/InOut/WriteToStdOut.hpp index 95a33dfe..6bfc989b 100644 --- a/include/pico/Operators/InOut/WriteToStdOut.hpp +++ b/include/pico/Operators/InOut/WriteToStdOut.hpp @@ -57,9 +57,8 @@ class WriteToStdOut : public OutputOperator { * Creates a new WriteToStdOut operator by defining its kernel function. 
*/ WriteToStdOut(std::function func_) - : OutputOperator(StructureType::STREAM) { + : OutputOperator(StructureType::STREAM), func(func_) { usr_func = true; - func = func_; } /** diff --git a/include/pico/Operators/JoinFlatMapByKey.hpp b/include/pico/Operators/JoinFlatMapByKey.hpp index ab23060d..0bdfa57d 100644 --- a/include/pico/Operators/JoinFlatMapByKey.hpp +++ b/include/pico/Operators/JoinFlatMapByKey.hpp @@ -46,8 +46,7 @@ class JoinFlatMapByKey : public BinaryOperator { */ JoinFlatMapByKey( std::function &)> kernel_, - unsigned par = def_par()) { - kernel = kernel_; + unsigned par = def_par()) : kernel(kernel_) { this->set_input_degree(2); this->set_output_degree(1); this->stype(StructureType::BAG, true); diff --git a/include/pico/Operators/Map.hpp b/include/pico/Operators/Map.hpp index 0ff201f8..32b1d086 100644 --- a/include/pico/Operators/Map.hpp +++ b/include/pico/Operators/Map.hpp @@ -59,8 +59,7 @@ class MapBase : public UnaryOperator { * * Creates a new Map operator by defining its kernel function. */ - MapBase(std::function mapf_, unsigned par = def_par()) { - mapf = mapf_; + MapBase(std::function mapf_, unsigned par = def_par()) : mapf(mapf_) { this->set_input_degree(1); this->set_output_degree(1); this->stype(StructureType::BAG, true); diff --git a/include/pico/Operators/ReduceByKey.hpp b/include/pico/Operators/ReduceByKey.hpp index 58f6a6b5..eb0a1150 100644 --- a/include/pico/Operators/ReduceByKey.hpp +++ b/include/pico/Operators/ReduceByKey.hpp @@ -65,8 +65,7 @@ class ReduceByKey : public UnaryOperator { * \ingroup op-api * ReduceByKey copy Constructor */ - ReduceByKey(const ReduceByKey& copy) : UnaryOperator(copy) { - reducef = copy.reducef; + ReduceByKey(const ReduceByKey& copy) : UnaryOperator(copy), reducef(copy.reducef) { win = copy.win ? 
copy.win->clone() : nullptr; } diff --git a/include/pico/Pipe.hpp b/include/pico/Pipe.hpp index 7a42805a..9cd798d4 100644 --- a/include/pico/Pipe.hpp +++ b/include/pico/Pipe.hpp @@ -531,7 +531,7 @@ class Pipe { * Encodes the semantic graph into a dot file. * @param filename dot file */ - void to_dotfile(std::string filename) { + void to_dotfile(const std::string filename) { #ifdef DEBUG std::cerr << "[PIPE] Writing semantic graph as dot\n"; #endif diff --git a/include/pico/ff_implementation/OperatorsFFNodes/InOut/ReadFromFileFFNode.hpp b/include/pico/ff_implementation/OperatorsFFNodes/InOut/ReadFromFileFFNode.hpp index f869dcd5..72527bea 100644 --- a/include/pico/ff_implementation/OperatorsFFNodes/InOut/ReadFromFileFFNode.hpp +++ b/include/pico/ff_implementation/OperatorsFFNodes/InOut/ReadFromFileFFNode.hpp @@ -69,7 +69,7 @@ class getline_textfile : public base_filter { typedef pico::Microbatch> mb_t; public: - getline_textfile(std::string fname_) : file(fname_) { + getline_textfile(const std::string fname_) : file(fname_) { assert(file.is_open()); } @@ -213,7 +213,7 @@ class ReadFromFileFFNode_par : public NonOrderingFarm { // using Worker = read_textfile; public: - ReadFromFileFFNode_par(int parallelism, std::string fname_) : fname(fname_) { + ReadFromFileFFNode_par(int parallelism, const std::string fname_) : fname(fname_) { std::vector workers; for (int i = 0; i < parallelism; ++i) workers.push_back(new Worker(fname)); auto e = new Partitioner(*this, fname, parallelism); diff --git a/include/pico/ff_implementation/OperatorsFFNodes/InOut/WriteToDiskFFNode.hpp b/include/pico/ff_implementation/OperatorsFFNodes/InOut/WriteToDiskFFNode.hpp index 4ccb1590..2c6827e3 100644 --- a/include/pico/ff_implementation/OperatorsFFNodes/InOut/WriteToDiskFFNode.hpp +++ b/include/pico/ff_implementation/OperatorsFFNodes/InOut/WriteToDiskFFNode.hpp @@ -36,7 +36,7 @@ template class WriteToDiskFFNode : public base_filter { public: - WriteToDiskFFNode(std::string fname, std::function 
kernel_) + WriteToDiskFFNode(const std::string fname, std::function kernel_) : wkernel(kernel_), outfile(fname) { if (!outfile.is_open()) { std::cerr << "Unable to open output file\n"; @@ -61,7 +61,7 @@ class WriteToDiskFFNode : public base_filter { template class WriteToDiskFFNode_ostream : public base_filter { public: - WriteToDiskFFNode_ostream(std::string fname) : outfile(fname) { + WriteToDiskFFNode_ostream(const std::string fname) : outfile(fname) { if (!outfile.is_open()) { std::cerr << "Unable to open output file\n"; assert(false); diff --git a/tests/include/common_functions.hpp b/tests/include/common_functions.hpp index 96dd7bed..112e89c0 100644 --- a/tests/include/common_functions.hpp +++ b/tests/include/common_functions.hpp @@ -28,7 +28,7 @@ template /* parse test output into char-int pairs */ static std::unordered_map> result_fltmapjoin( - std::string output_file) { + const std::string output_file) { std::unordered_map> observed; auto output_pairs_str = read_lines(output_file); for (auto pair : output_pairs_str) { diff --git a/tests/iteration.cpp b/tests/iteration.cpp index 7ee3b1e0..9c5688e0 100644 --- a/tests/iteration.cpp +++ b/tests/iteration.cpp @@ -159,7 +159,7 @@ void seq_flatmap_join(const KvMultiMap& partitions_1, } } -KvMultiMap seq_Iter_flatmap_join(KvMultiMap original_partitions, int num_iter) { +KvMultiMap seq_Iter_flatmap_join(const KvMultiMap original_partitions, int num_iter) { KvMultiMap res; KvMultiMap& ptr_res = res; KvMultiMap helper = original_partitions; diff --git a/tests/run_stream_tests.sh b/tests/run_stream_tests.sh index 6e7af733..42df293c 100755 --- a/tests/run_stream_tests.sh +++ b/tests/run_stream_tests.sh @@ -20,5 +20,5 @@ ## echo "streaming tests" -cat testdata/pairs.txt | nc -l -p 4000 -q 0 & +nc -l 4000 < testdata/pairs.txt & ./stream_tests