26 #include <arpa/inet.h> 28 #include <sys/timeb.h> 38 socket_t sock = socket(PF_INET, SOCK_STREAM, 0);
43 far_end.sin_family = AF_INET;
44 far_end.sin_port =
port;
46 far_end.sin_addr = *(in_addr*)&ip;
47 memset(&far_end.sin_zero,
'\0', 8);
50 char dotted_quad[INET_ADDRSTRLEN];
51 if (
nullptr == inet_ntop(AF_INET, &(far_end.sin_addr), dotted_quad, INET_ADDRSTRLEN))
54 char hostname[NI_MAXHOST];
55 char servInfo[NI_MAXSERV];
56 if (getnameinfo((sockaddr*)&far_end,
sizeof(sockaddr), hostname, NI_MAXHOST, servInfo, NI_MAXSERV, NI_NUMERICSERV))
57 THROWERRNO(
"getnameinfo(" << dotted_quad <<
")");
60 cerr <<
"connecting to " << dotted_quad <<
" = " << hostname <<
':' << ntohs(port) << endl;
65 while ((ret = connect(sock, (sockaddr*)&far_end,
sizeof(far_end))) == -1 && count < 100)
68 std::stringstream msg;
71 msg <<
"connect attempt " << count <<
" failed: " << strerror(errno);
72 cerr << msg.str() << endl;
81 THROW(
"cannot connect");
87 socket_t sock = socket(PF_INET, SOCK_STREAM, 0);
95 if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (
char*)&on,
sizeof(on)) < 0)
98 cerr <<
"setsockopt SO_REUSEADDR: " << strerror(errno) << endl;
104 if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (
char*)&enableTKA,
sizeof(enableTKA)) < 0)
107 cerr <<
"setsockopt SO_KEEPALIVE: " << strerror(errno) << endl;
117 int lastError = WSAStartup(MAKEWORD(2, 2), &wsaData);
119 THROWERRNO(
"WSAStartup() returned error:" << lastError);
122 struct hostent* master = gethostbyname(
span_server.c_str());
124 if (master ==
nullptr)
129 uint32_t master_ip = *((uint32_t*)master->h_addr);
132 if (send(master_sock, (
const char*)&
unique_id,
sizeof(unique_id), 0) < (
int)
sizeof(unique_id))
134 THROW(
"write unique_id=" << unique_id <<
" to span server failed");
139 cerr <<
"wrote unique_id=" << unique_id << endl;
141 if (send(master_sock, (
const char*)&
total,
sizeof(total), 0) < (
int)
sizeof(total))
143 THROW(
"write total=" << total <<
" to span server failed");
148 cerr <<
"wrote total=" << total << endl;
150 if (send(master_sock, (
char*)&
node,
sizeof(node), 0) < (
int)
sizeof(node))
152 THROW(
"write node=" << node <<
" to span server failed");
157 cerr <<
"wrote node=" << node << endl;
160 if (recv(master_sock, (
char*)&ok,
sizeof(ok), 0) < (
int)
sizeof(ok))
162 THROW(
"read ok from span server failed");
167 cerr <<
"read ok=" << ok << endl;
170 THROW(
"mapper already connected");
173 uint16_t parent_port;
176 if (recv(master_sock, (
char*)&kid_count,
sizeof(kid_count), 0) < (
int)
sizeof(kid_count))
178 THROW(
"read kid_count from span server failed");
183 cerr <<
"read kid_count=" << kid_count << endl;
187 short unsigned int netport = htons(26544);
192 address.sin_family = AF_INET;
193 address.sin_addr.s_addr = htonl(INADDR_ANY);
194 address.sin_port = netport;
196 bool listening =
false;
199 if (::bind(sock, (sockaddr*)&address,
sizeof(address)) < 0)
202 if (WSAGetLastError() == WSAEADDRINUSE)
204 if (errno == EADDRINUSE)
207 netport = htons(ntohs(netport) + 1);
208 address.sin_port = netport;
215 if (listen(sock, kid_count) < 0)
218 cerr <<
"listen: " << strerror(errno) << endl;
230 if (send(master_sock, (
const char*)&netport,
sizeof(netport), 0) < (
int)
sizeof(netport))
231 THROW(
"write netport failed!");
233 if (recv(master_sock, (
char*)&parent_ip,
sizeof(parent_ip), 0) < (
int)
sizeof(parent_ip))
235 THROW(
"read parent_ip failed!");
239 char dotted_quad[INET_ADDRSTRLEN];
240 if (
nullptr == inet_ntop(AF_INET, (
char*)&parent_ip, dotted_quad, INET_ADDRSTRLEN))
243 cerr <<
"read parent_ip=" << parent_ip <<
"(inet_ntop: " << strerror(errno) <<
")" << endl;
248 cerr <<
"read parent_ip=" << dotted_quad << endl;
251 if (recv(master_sock, (
char*)&parent_port,
sizeof(parent_port), 0) < (
int)
sizeof(parent_port))
253 THROW(
"read parent_port failed!");
258 cerr <<
"read parent_port=" << parent_port << endl;
263 if (parent_ip != (uint32_t)-1)
272 for (
int i = 0; i < kid_count; i++)
274 sockaddr_in child_address;
275 socklen_t size =
sizeof(child_address);
276 socket_t f = accept(sock, (sockaddr*)&child_address, &size);
293 size_t my_bufsize = std::min(
ar_buf_size, (parent_read_pos - children_sent_pos));
299 send(
socks.
children[0], buffer + children_sent_pos, (
int)my_bufsize, 0) < (
int)my_bufsize)
301 THROW(
"Write to left child failed");
304 send(
socks.
children[1], buffer + children_sent_pos, (
int)my_bufsize, 0) < (
int)my_bufsize)
306 THROW(
"Write to right child failed");
309 children_sent_pos += my_bufsize;
315 size_t parent_read_pos = 0;
316 size_t children_sent_pos = 0;
325 children_sent_pos = n;
327 while (parent_read_pos < n || children_sent_pos < n)
329 pass_down(buffer, parent_read_pos, children_sent_pos);
330 if (parent_read_pos >= n && children_sent_pos >= n)
336 if (parent_read_pos == n)
337 THROW(
"I think parent has no data to send but he thinks he has");
339 size_t count = std::min(
ar_buf_size, n - parent_read_pos);
340 int read_size = recv(
socks.
parent, buffer + parent_read_pos, (
int)count, 0);
343 THROW(
" recv from parent: " << strerror(errno));
345 parent_read_pos += read_size;
constexpr size_t ar_buf_size
std::string current_master
void pass_down(char *buffer, const size_t parent_read_pos, size_t &children_sent_pos)
socket_t sock_connect(const uint32_t ip, const int port)
void broadcast(char *buffer, const size_t n)