Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For merge #156

Open
wants to merge 64 commits into
base: for_auto_test
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
08967d1
ADD: master loader and slave loader as well as some config variables …
yukai2014 Apr 12, 2016
a78cbad
FIX: name from 'inject' to 'ingest'
yukai2014 Apr 13, 2016
00b42f0
txn_manager first commit
frankliee Apr 16, 2016
202c782
ADD: stl_guard to implement RAII for STL container; ADD: function imp…
yukai2014 Apr 16, 2016
8ffb3fc
Merge remote-tracking branch 'remotes/origin/lizhifang-dLoad-160416' …
yukai2014 Apr 16, 2016
9c2a150
add log to txn_manager
frankliee Apr 19, 2016
9e7edd0
change path of txn_server/client_test
frankliee Apr 19, 2016
b8e03ae
rm cout
frankliee Apr 19, 2016
95605d8
for cluster test
frankliee Apr 19, 2016
6a09bdd
add txn manager and log to config and enviroment
frankliee Apr 19, 2016
941616e
complete master loader but GetMessage(); ADD: load packet; ADD: resou…
yukai2014 Apr 21, 2016
429e63f
Merge remote-tracking branch 'remotes/origin/lizhifang-dLoad-160421' …
yukai2014 Apr 21, 2016
2801b6d
add txnPartitionReadIterator
frankliee Apr 21, 2016
706393b
find a caf actor bug
frankliee Apr 21, 2016
2a5097c
refactor txn_manager
frankliee Apr 21, 2016
2ec1376
slave loader is to be finished
yukai2014 Apr 22, 2016
78fec97
basiclly implement pipeline of txn manager
frankliee Apr 24, 2016
03c13f4
finish test project
frankliee Apr 24, 2016
06e6b8e
finish slave loader but failed to compile
yukai2014 Apr 24, 2016
bf4f8d4
update txn_test
frankliee Apr 25, 2016
e6db4bd
ADD: simple test case; compile ok
yukai2014 Apr 25, 2016
23f2c02
ADD: some debug info log
yukai2014 Apr 26, 2016
bca77d3
ADD: InitConnector() in table.cpp
yukai2014 Apr 26, 2016
a4877ca
network connection ok, but master loader lack node info
yukai2014 Apr 30, 2016
b74c277
init txn-manager from catalog
frankliee May 1, 2016
5c6d5ff
Completed Loader without MQ! FIX: add bracket around macro definition…
yukai2014 May 1, 2016
e374453
ADD:AMQ consumer in multi-threading; FIX: bug in ChunkStorage.cpp; AD…
yukai2014 May 2, 2016
5315ac8
Merge remote-tracking branch 'origin/lizhifang-dLoad-160502' into Dis…
yukai2014 May 2, 2016
23c2809
Distributed Load basically works; FIX: memory leak by getSchema(); FI…
yukai2014 May 3, 2016
7dfad8a
Distributed Load in one partition Ok! FIX: use std::unordered_map rat…
yukai2014 May 5, 2016
f4ee689
Distributed Load in mutliple parttion OK! FIX: extend waiting time of…
yukai2014 May 5, 2016
2aad51b
OTPIMIZE: replace LOG(INFO) with DLOG(INFO) in loader folder for info…
yukai2014 May 5, 2016
e05330c
OPTIMIZE: support compiling to RELEASE version; ADD: PERFLOG in loader
yukai2014 May 7, 2016
17e8f38
OPTIMIZE: new GetRequesetFromMessage(); OPTIMIZE: set new broker IP f…
yukai2014 May 9, 2016
c04badd
ADD: elapsed time for 1000 transactions
yukai2014 May 9, 2016
11336e3
OPTIMIZE: doing sending packet in another thread
yukai2014 May 9, 2016
8d10e0b
OPTIMIZE: disable some debug log to get better performance;BUG: some …
yukai2014 May 10, 2016
c2f8208
FIX: bug caused by multiple txn_servers and lack of lock in reading c…
yukai2014 May 11, 2016
06a347d
ADD: performance info output
yukai2014 May 13, 2016
6c71cc3
Great performance improvment! OTPIMIZE: set remote actor as static ac…
yukai2014 May 13, 2016
ae12356
ADD: performance log
yukai2014 May 15, 2016
83557cf
Ready for measuring performance. ADD: handle work thread based on CAF…
yukai2014 May 15, 2016
40268e4
Performance test version
yukai2014 May 16, 2016
2067270
FIX: using the old partition reader
yukai2014 May 31, 2016
805faee
FIX: a bug in InitTxnManager()
yukai2014 Jun 1, 2016
33d011d
No complain from compiler; Failed to execute
yukai2014 Jun 2, 2016
365b82e
OPTIMIZE:adjust the order of initialize master_loader and master_node…
yukai2014 Jun 8, 2016
6b59d84
ADD: multi-thread slave loader; ADD: concurrency control on storage w…
yukai2014 Jun 15, 2016
e2d0ae8
ADD: comments about why current scan can't read new data
yukai2014 Jun 16, 2016
6e71ec6
FIX: bug appeared when txn_log is disable
yukai2014 Jun 18, 2016
2977d05
Merge branch 'auto_test_ingest' into ingestion_yukai
frankliee Jul 8, 2016
b1da537
rm useless txn_manager/log_client.hpp,log_client.cpp,log_server.hpp,l…
frankliee Jul 8, 2016
fd067e0
implement timestamp and txn binning for ingestion
frankliee Jul 29, 2016
ca73e30
add txn info for a select query
frankliee Aug 4, 2016
5523080
implement real-time chunk_list for ingest,
frankliee Aug 22, 2016
d6453d8
implement persist
frankliee Aug 28, 2016
01b07d1
fix bugs for ingestion and txn-scan(rt chunk list bug)
frankliee Sep 2, 2016
5db604e
fix bug for memory merge and persist(don't implement push to HDFS)
frankliee Sep 5, 2016
0934b46
fix a bug for rt-scan
frankliee Sep 11, 2016
98d14f8
fix a bug for "count (*) " miss lines caused by block tail wrong init
frankliee Sep 17, 2016
b50393d
fix bug for memory leak(data ingestion) and chunk memory alloc(level …
frankliee Oct 22, 2016
4577613
implement data persitence for ingestion
frankliee Jan 17, 2017
54ae08c
Merge branch 'ingestion_yukai' into for_merge
frankliee Feb 9, 2017
7aa2942
ADD: code comment for txn_manager
frankliee Feb 17, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Client/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ Client::query_result Client::submit(std::string command, std::string &message,

command = "#" + command;

write(m_clientFd, command.c_str(), command.length() + 1);
int bytes = write(m_clientFd, command.c_str(), command.length() + 1);
if (bytes != command.length() + 1) perror("failed to send SQL to claims");
ClientLogging::log("Client: message from server!\n");
const int maxBytes = 75536 + sizeof(int) * 2;
char *buf = new char[maxBytes];
Expand Down
7 changes: 3 additions & 4 deletions Client/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ LDADD = ../catalog/libcatalog.a \
../common/Block/libblock.a \
../common/Schema/libschema.a \
${BOOST_HOME}/stage/lib/libboost_serialization.a \
${BOOST_HOME}/stage/lib/libboost_serialization.so \
${THERON_HOME}/Lib/libtherond.a
${BOOST_HOME}/stage/lib/libboost_serialization.so

noinst_LIBRARIES=libclient.a
libclient_a_SOURCES = \
Expand All @@ -27,5 +26,5 @@ libclient_a_SOURCES = \
ClientResponse.cpp ClientResponse.h \
jsoncpp.cpp

SUBDIRS = json Test
DIST_SUBDIRS = json Test
#SUBDIRS = json Test
#DIST_SUBDIRS = json Test
56 changes: 28 additions & 28 deletions Client/Test/TestSeverClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ static int loadData() {
catalog->add_table(table_1);

////////////////////////////////////Create table
///right//////////////////////////
/// right//////////////////////////
TableDescriptor* table_2 = new TableDescriptor(
"sb",
Environment::getInstance()->getCatalog()->allocate_unique_table_id());
Expand Down Expand Up @@ -212,20 +212,20 @@ static int loadData() {
}
// partitioned by row_id
// for(unsigned
//i=0;i<table_1->getProjectoin(14)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_1->getProjectoin(14)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(0)->getProjectoin(14)->getPartitioner()->RegisterPartition(i,2);
// }
//
// // 8 partitions
// for(unsigned
//i=0;i<table_1->getProjectoin(2)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_1->getProjectoin(2)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(0)->getProjectoin(2)->getPartitioner()->RegisterPartition(i,1);
// }
//
// for(unsigned
//i=0;i<table_1->getProjectoin(3)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_1->getProjectoin(3)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(0)->getProjectoin(3)->getPartitioner()->RegisterPartition(i,3);
// }
Expand All @@ -245,100 +245,100 @@ static int loadData() {
i, 6);
}
// for(unsigned
//i=0;i<table_2->getProjectoin(2)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_2->getProjectoin(2)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(1)->getProjectoin(2)->getPartitioner()->RegisterPartition(i,1);
// }

// for(unsigned
//i=0;i<table_2->getProjectoin(3)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_2->getProjectoin(3)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(1)->getProjectoin(3)->getPartitioner()->RegisterPartition(i,3);
// }
//
// //partitioned by row_id
// for(unsigned
//i=0;i<table_2->getProjectoin(14)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_2->getProjectoin(14)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(1)->getProjectoin(14)->getPartitioner()->RegisterPartition(i,2);
// }
//
// ////////////////////////////////////////
//
// ///////////////////ONE
//MONTH/////////////////////////////////////////////////////////////
// MONTH/////////////////////////////////////////////////////////////
// //CJ
// // 4 partition
// for(unsigned
//i=0;i<table_1->getProjectoin(4)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_1->getProjectoin(4)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(0)->getProjectoin(4)->getPartitioner()->RegisterPartition(i,40);
// }
//
// for(unsigned
//i=0;i<table_1->getProjectoin(5)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_1->getProjectoin(5)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(0)->getProjectoin(5)->getPartitioner()->RegisterPartition(i,104);
// }
// //8 partitions
// for(unsigned
//i=0;i<table_1->getProjectoin(10)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_1->getProjectoin(10)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(0)->getProjectoin(10)->getPartitioner()->RegisterPartition(i,20);
// }
//
// for(unsigned
//i=0;i<table_1->getProjectoin(11)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_1->getProjectoin(11)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(0)->getProjectoin(11)->getPartitioner()->RegisterPartition(i,52);
// }
// // 18 partitions
// for(unsigned
//i=0;i<table_1->getProjectoin(6)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_1->getProjectoin(6)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(0)->getProjectoin(6)->getPartitioner()->RegisterPartition(i,10);
// }
//
// for(unsigned
//i=0;i<table_1->getProjectoin(7)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_1->getProjectoin(7)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(0)->getProjectoin(7)->getPartitioner()->RegisterPartition(i,24);
// }
//
// //SB
// // 4 partition
// for(unsigned
//i=0;i<table_2->getProjectoin(4)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_2->getProjectoin(4)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(1)->getProjectoin(4)->getPartitioner()->RegisterPartition(i,39);
// }
//
// for(unsigned
//i=0;i<table_2->getProjectoin(5)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_2->getProjectoin(5)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(1)->getProjectoin(5)->getPartitioner()->RegisterPartition(i,131);
// }
// // 8 partitions
// for(unsigned
//i=0;i<table_2->getProjectoin(10)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_2->getProjectoin(10)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(1)->getProjectoin(10)->getPartitioner()->RegisterPartition(i,20);
// }
//
// for(unsigned
//i=0;i<table_2->getProjectoin(11)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_2->getProjectoin(11)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(1)->getProjectoin(11)->getPartitioner()->RegisterPartition(i,66);
// }
// // 18 partitions
// for(unsigned
//i=0;i<table_2->getProjectoin(6)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_2->getProjectoin(6)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(1)->getProjectoin(6)->getPartitioner()->RegisterPartition(i,10);
// }
//
// for(unsigned
//i=0;i<table_2->getProjectoin(7)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_2->getProjectoin(7)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(1)->getProjectoin(7)->getPartitioner()->RegisterPartition(i,30);
// }
Expand All @@ -349,51 +349,51 @@ static int loadData() {
// //// cj////
// // 4 partitions
// for(unsigned
//i=0;i<table_1->getProjectoin(8)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_1->getProjectoin(8)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(0)->getProjectoin(8)->getPartitioner()->RegisterPartition(i,14);
// }
//
// for(unsigned
//i=0;i<table_1->getProjectoin(9)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_1->getProjectoin(9)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(0)->getProjectoin(9)->getPartitioner()->RegisterPartition(i,36);
// }
// // 8 partitions
// for(unsigned
//i=0;i<table_1->getProjectoin(12)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_1->getProjectoin(12)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(0)->getProjectoin(12)->getPartitioner()->RegisterPartition(i,7);
// }
//
// for(unsigned
//i=0;i<table_1->getProjectoin(13)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_1->getProjectoin(13)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(0)->getProjectoin(13)->getPartitioner()->RegisterPartition(i,19);
// }
//
// //// sb ////
// // 4 partitions//
// for(unsigned
//i=0;i<table_2->getProjectoin(8)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_2->getProjectoin(8)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(1)->getProjectoin(8)->getPartitioner()->RegisterPartition(i,14);
// }
//
// for(unsigned
//i=0;i<table_2->getProjectoin(9)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_2->getProjectoin(9)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(1)->getProjectoin(9)->getPartitioner()->RegisterPartition(i,131);
// }
// // 8 partitions//
// for(unsigned
//i=0;i<table_2->getProjectoin(12)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_2->getProjectoin(12)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(1)->getProjectoin(12)->getPartitioner()->RegisterPartition(i,7);
// }
//
// for(unsigned
//i=0;i<table_2->getProjectoin(13)->getPartitioner()->getNumberOfPartitions();i++){
// i=0;i<table_2->getProjectoin(13)->getPartitioner()->getNumberOfPartitions();i++){
//
// catalog->getTable(1)->getProjectoin(13)->getPartitioner()->RegisterPartition(i,23);
// }
Expand Down
54 changes: 54 additions & 0 deletions Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,23 @@ int Config::thread_pool_init_thread_num;
int Config::load_thread_num;
int Config::memory_utilization;

bool Config::is_master_loader;
std::string Config::master_loader_ip;
int Config::master_loader_port;
std::string Config::amq_url;
std::string Config::amq_topic;

bool Config::enable_txn_server;
int Config::txn_server_cores;
std::string Config::txn_server_ip;
int Config::txn_server_port;

bool Config::enable_txn_log = false;
std::string Config::txn_log_path;

int Config::master_loader_thread_num;
int Config::slave_loader_thread_num;

Config *Config::getInstance() {
if (instance_ == 0) {
instance_ = new Config();
Expand Down Expand Up @@ -152,6 +169,33 @@ void Config::initialize() {

load_thread_num = getInt("load_thread_num", sysconf(_SC_NPROCESSORS_CONF));

is_master_loader = getBoolean("is_master_loader", true);

master_loader_ip = getString("master_loader_ip", "10.11.1.193");

master_loader_port = getInt("master_loader_port", 9001);

amq_url = getString("amq_url", "58.198.176.92:61616");

amq_topic = getString("amq_topic", "claims");

// txn manager
enable_txn_server = getBoolean("txn_server", true);

txn_server_cores = getInt("txn_server_cores", 4);

txn_server_ip = getString("txn_server_ip", "127.0.0.1");

txn_server_port = getInt("txn_server_port", 9100);

// txn log
enable_txn_log = getBoolean("txn_log", true);

txn_log_path = getString("txn_log_path", ".");

master_loader_thread_num = getInt("master_loader_thread_num", 4);
slave_loader_thread_num = getInt("slave_loader_thread_num", 4);

memory_utilization = getInt("memory_utilization", 100);

#ifdef DEBUG_Config
Expand Down Expand Up @@ -212,6 +256,16 @@ void Config::print_configure() const {
std::cout << "catalog_file:" << catalog_file << std::endl;
std::cout << "codegen:" << enable_codegen << std::endl;
std::cout << "load_thread_num:" << load_thread_num << std::endl;
std::cout << "amq_url:" << amq_url << std::endl;
std::cout << "amq_topic:" << amq_topic << std::endl;

std::cout << "enable_txn_serverr:" << enable_txn_server << std::endl;
std::cout << "txn_server_cores:" << txn_server_cores << std::endl;
std::cout << "txn_server_ip:" << txn_server_ip << std::endl;
std::cout << "txn_server_port:" << txn_server_port << std::endl;

std::cout << "enable_txn_log:" << enable_txn_log << std::endl;
std::cout << "txn_log_path:" << txn_log_path << std::endl;
}

void Config::setConfigFile(std::string file_name) { config_file = file_name; }
16 changes: 16 additions & 0 deletions Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,22 @@ class Config {
static int memory_utilization;

static int load_thread_num;
static bool is_master_loader;
static std::string master_loader_ip;
static int master_loader_port;
static std::string amq_url;
static std::string amq_topic;

static bool enable_txn_server;
static int txn_server_cores;
static std::string txn_server_ip;
static int txn_server_port;

static bool enable_txn_log;
static std::string txn_log_path;

static int master_loader_thread_num;
static int slave_loader_thread_num;

private:
static Config* instance_;
Expand Down
14 changes: 7 additions & 7 deletions Debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
#define BLOCK_SIZE (64 * 1024)
#define SUCHUNK_SIZE (64 * 1024)
#define CHUNK_SIZE (64 * 1024 * 1024)
#define CHUNK_SIZE_IN_MB 64
#define HEARTBEAT_MESSAGE_LEN 64
#define REGISTER_MESSAGE_LEN 64
#define BLOCK_STATUS_MESSAGE_LEN 256
#define MATCHER_MESSAGE_FILENAME_LEN 256
#define MATCHER_MESSAGE_BMI_LEN 256
#define MATCHER_MESSAGE_PROJECT_LEN 256
#define CHUNK_SIZE_IN_MB (64)
#define HEARTBEAT_MESSAGE_LEN (64)
#define REGISTER_MESSAGE_LEN (64)
#define BLOCK_STATUS_MESSAGE_LEN (256)
#define MATCHER_MESSAGE_FILENAME_LEN (256)
#define MATCHER_MESSAGE_BMI_LEN (256)
#define MATCHER_MESSAGE_PROJECT_LEN (256)
// 分布式文件系统的主节点
#define HDFS_N "10.11.1.190"
// 磁盘目录
Expand Down
Loading