-
Notifications
You must be signed in to change notification settings - Fork 72
/
Copy pathzmqFlatbuffersTest.cpp
184 lines (145 loc) · 5.16 KB
/
zmqFlatbuffersTest.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
// Library includes
#include <string>
#include <ostream>
#include <iostream>
#include <memory>
#include <boost/program_options/options_description.hpp>
#include <boost/program_options/parsers.hpp>
#include <boost/program_options/variables_map.hpp>
#include <boost/tokenizer.hpp>
#include <boost/math/special_functions/binomial.hpp>
#include <boost/filesystem.hpp>
#include <cmath>
#include <thread>
#include <Eigen/Core>
#include <Eigen/Dense>
#include <Eigen/SVD>
#include <Eigen/Eigenvalues>
#include <Eigen/Geometry>
#include "grl/flatbuffer/Geometry_generated.h"
#include "grl/flatbuffer/VrepControlPoint_generated.h"
#include "grl/flatbuffer/VrepPath_generated.h"
#include "grl/AzmqFlatbuffer.hpp"
namespace po = boost::program_options;
/// @todo what does this help accomplish?
struct monitor_handler {
#if defined BOOST_MSVC
#pragma pack(push, 1)
struct event_t
{
uint16_t e;
uint32_t i;
};
#pragma pack(pop)
#else
struct event_t
{
uint16_t e;
uint32_t i;
} __attribute__((packed));
#endif
azmq::socket socket_;
std::string role_;
std::vector<event_t> events_;
monitor_handler(boost::asio::io_service & ios, azmq::socket& s, std::string role)
: socket_(s.monitor(ios, ZMQ_EVENT_ALL))
, role_(std::move(role))
{ }
void start()
{
socket_.async_receive([this](boost::system::error_code const& ec,
azmq::message & msg, size_t) {
if (ec)
return;
event_t event;
msg.buffer_copy(boost::asio::buffer(&event, sizeof(event)));
events_.push_back(event);
socket_.flush();
start();
});
}
void cancel()
{
socket_.cancel();
}
};
/// Send messages between a client and server synchronously.
///
/// @see overview of zmq socket types https://sachabarbs.wordpress.com/2014/08/21/zeromq-2-the-socket-types-2/
/// @see bounce is based on https://github.com/zeromq/azmq/blob/master/test/socket/main.cpp
/// @see flatbuffers https://google.github.io/flatbuffers/md__cpp_usage.html
void bounce(azmq::socket & server, azmq::socket & client, bool send_only = true) {
flatbuffers::FlatBufferBuilder fbb;
std::array<uint8_t, 512> buf;
for (int x = 0; x<100; ++x) {
/////////////////////////
// Client sends to server
grl::flatbuffer::Vector3d rv(x,0,0);
auto controlPoint = grl::flatbuffer::CreateVrepControlPoint(fbb,&rv);
grl::flatbuffer::FinishVrepControlPointBuffer(fbb, controlPoint);
client.send(boost::asio::buffer(fbb.GetBufferPointer(), fbb.GetSize()));
std::cout << "sent: " << rv.x() << "\n";
//////////////////////////////
// Server receives from client
if(! send_only) {
auto size = server.receive(boost::asio::buffer(buf));
auto verifier = flatbuffers::Verifier(buf.begin(),buf.size());
auto bufOK = grl::flatbuffer::VerifyVrepControlPointBuffer(verifier);
if(size == fbb.GetSize() && bufOK){
const grl::flatbuffer::VrepControlPoint* VCPin = grl::flatbuffer::GetVrepControlPoint(buf.begin());
std::cout << "received: " << VCPin->position()->x() << "\n";
} else {
std::cout << "wrong size or failed verification. size: "<< size <<" bufOk: " <<bufOK << "\n";
}
}
fbb.Clear();
}
}
void RunSocketMonitor() {
boost::asio::io_service ios;
boost::asio::io_service ios_m;
using socket_ptr = std::unique_ptr<azmq::socket>;
socket_ptr client(new azmq::socket(ios, ZMQ_DEALER));
socket_ptr server(new azmq::socket(ios, ZMQ_DEALER));
monitor_handler client_monitor(ios_m, *client, "client");
monitor_handler server_monitor(ios_m, *server, "server");
client_monitor.start();
server_monitor.start();
std::thread t([&] {
ios_m.run();
});
server->bind("tcp://127.0.0.1:9998");
client->connect("tcp://127.0.0.1:9998");
bounce(*client, *server);
// On Windows monitored sockets must be closed before their monitors,
// otherwise ZMQ crashes or deadlocks during the context termination.
// ZMQ's bug?
client.reset();
server.reset();
std::this_thread::sleep_for(std::chrono::seconds(1));
ios_m.stop();
t.join();
// BOOST_REQUIRE(client_monitor.events_.size() == 3);
// CHECK(client_monitor.events_[0].e == ZMQ_EVENT_CONNECT_DELAYED);
// CHECK(client_monitor.events_[1].e == ZMQ_EVENT_CONNECTED);
// CHECK(client_monitor.events_[2].e == ZMQ_EVENT_MONITOR_STOPPED);
//
// REQUIRE(server_monitor.events_.size() == 4);
// CHECK(server_monitor.events_[0].e == ZMQ_EVENT_LISTENING);
// CHECK(server_monitor.events_[1].e == ZMQ_EVENT_ACCEPTED);
// CHECK(server_monitor.events_[2].e == ZMQ_EVENT_CLOSED);
// CHECK(server_monitor.events_[3].e == ZMQ_EVENT_MONITOR_STOPPED);
}
/**************************************************************************/
/**
* @brief Main function
*
* @param argc Number of input arguments
* @param argv Pointer to input arguments
*
* @return int
*/
int main(int argc,char**argv) {
RunSocketMonitor();
return 0;
}