Vowpal Wabbit
Public Member Functions | Private Member Functions | Private Attributes | List of all members
AllReduceSockets Class Reference

#include <allreduce.h>

Inheritance diagram for AllReduceSockets:
AllReduce

Public Member Functions

 AllReduceSockets (std::string pspan_server, const int pport, const size_t punique_id, size_t ptotal, const size_t pnode, bool pquiet)
 
virtual ~AllReduceSockets ()=default
 
template<class T, void (*f)(T &, const T &)>
void all_reduce (T *buffer, const size_t n)
 
- Public Member Functions inherited from AllReduce
 AllReduce (size_t ptotal, const size_t pnode, bool pquiet=false)
 
virtual ~AllReduce ()=default
 

Private Member Functions

void all_reduce_init ()
 
template<class T >
void pass_up (char *buffer, size_t left_read_pos, size_t right_read_pos, size_t &parent_sent_pos)
 
template<class T, void (*f)(T &, const T &)>
void reduce (char *buffer, const size_t n)
 
void pass_down (char *buffer, const size_t parent_read_pos, size_t &children_sent_pos)
 
void broadcast (char *buffer, const size_t n)
 
socket_t sock_connect (const uint32_t ip, const int port)
 
socket_t getsock ()
 

Private Attributes

node_socks socks
 
std::string span_server
 
int port
 
size_t unique_id
 

Additional Inherited Members

- Public Attributes inherited from AllReduce
const size_t total
 
const size_t node
 
bool quiet
 

Detailed Description

Definition at line 179 of file allreduce.h.

Constructor & Destructor Documentation

◆ AllReduceSockets()

AllReduceSockets::AllReduceSockets ( std::string  pspan_server,
const int  pport,
const size_t  punique_id,
size_t  ptotal,
const size_t  pnode,
bool  pquiet 
)
inline

Definition at line 290 of file allreduce.h.

292  : AllReduce(ptotal, pnode, pquiet), span_server(pspan_server), port(pport), unique_id(punique_id)
293  {
294  }
AllReduce(size_t ptotal, const size_t pnode, bool pquiet=false)
Definition: allreduce.h:84
std::string span_server
Definition: allreduce.h:183
size_t unique_id
Definition: allreduce.h:185

◆ ~AllReduceSockets()

virtual AllReduceSockets::~AllReduceSockets ( )
virtual, default

Member Function Documentation

◆ all_reduce()

template<class T, void (*f)(T &, const T &)>
void AllReduceSockets::all_reduce ( T *  buffer,
const size_t  n 
)
inline

Definition at line 299 of file allreduce.h.

References node_socks::current_master.

300  {
302  all_reduce_init();
303  reduce<T, f>((char*)buffer, n * sizeof(T));
304  broadcast((char*)buffer, n * sizeof(T));
305  }
node_socks socks
Definition: allreduce.h:182
std::string current_master
Definition: allreduce.h:53
std::string span_server
Definition: allreduce.h:183
void broadcast(char *buffer, const size_t n)

◆ all_reduce_init()

void AllReduceSockets::all_reduce_init ( )
private

Definition at line 113 of file allreduce_sockets.cc.

References node_socks::children, CLOSESOCK, node_socks::current_master, getsock(), node_socks::parent, port, AllReduce::quiet, sock_connect(), socks, span_server, THROW, THROWERRNO, AllReduce::total, and unique_id.

114 {
115 #ifdef _WIN32
116  WSAData wsaData;
117  int lastError = WSAStartup(MAKEWORD(2, 2), &wsaData);
118  if (lastError != 0)
119  THROWERRNO("WSAStartup() returned error:" << lastError);
120 #endif
121 
122  struct hostent* master = gethostbyname(span_server.c_str());
123 
124  if (master == nullptr)
125  THROWERRNO("gethostbyname(" << span_server << ")");
126 
128 
129  uint32_t master_ip = *((uint32_t*)master->h_addr);
130 
131  socket_t master_sock = sock_connect(master_ip, htons(port));
132  if (send(master_sock, (const char*)&unique_id, sizeof(unique_id), 0) < (int)sizeof(unique_id))
133  {
134  THROW("write unique_id=" << unique_id << " to span server failed");
135  }
136  else
137  {
138  if (!quiet)
139  cerr << "wrote unique_id=" << unique_id << endl;
140  }
141  if (send(master_sock, (const char*)&total, sizeof(total), 0) < (int)sizeof(total))
142  {
143  THROW("write total=" << total << " to span server failed");
144  }
145  else
146  {
147  if (!quiet)
148  cerr << "wrote total=" << total << endl;
149  }
150  if (send(master_sock, (char*)&node, sizeof(node), 0) < (int)sizeof(node))
151  {
152  THROW("write node=" << node << " to span server failed");
153  }
154  else
155  {
156  if (!quiet)
157  cerr << "wrote node=" << node << endl;
158  }
159  int ok;
160  if (recv(master_sock, (char*)&ok, sizeof(ok), 0) < (int)sizeof(ok))
161  {
162  THROW("read ok from span server failed");
163  }
164  else
165  {
166  if (!quiet)
167  cerr << "read ok=" << ok << endl;
168  }
169  if (!ok)
170  THROW("mapper already connected");
171 
172  uint16_t kid_count;
173  uint16_t parent_port;
174  uint32_t parent_ip;
175 
176  if (recv(master_sock, (char*)&kid_count, sizeof(kid_count), 0) < (int)sizeof(kid_count))
177  {
178  THROW("read kid_count from span server failed");
179  }
180  else
181  {
182  if (!quiet)
183  cerr << "read kid_count=" << kid_count << endl;
184  }
185 
186  socket_t sock = -1;
187  short unsigned int netport = htons(26544);
188  if (kid_count > 0)
189  {
190  sock = getsock();
191  sockaddr_in address;
192  address.sin_family = AF_INET;
193  address.sin_addr.s_addr = htonl(INADDR_ANY);
194  address.sin_port = netport;
195 
196  bool listening = false;
197  while (!listening)
198  {
199  if (::bind(sock, (sockaddr*)&address, sizeof(address)) < 0)
200  {
201 #ifdef _WIN32
202  if (WSAGetLastError() == WSAEADDRINUSE)
203 #else
204  if (errno == EADDRINUSE)
205 #endif
206  {
207  netport = htons(ntohs(netport) + 1);
208  address.sin_port = netport;
209  }
210  else
211  THROWERRNO("bind");
212  }
213  else
214  {
215  if (listen(sock, kid_count) < 0)
216  {
217  if (!quiet)
218  cerr << "listen: " << strerror(errno) << endl;
219  CLOSESOCK(sock);
220  sock = getsock();
221  }
222  else
223  {
224  listening = true;
225  }
226  }
227  }
228  }
229 
230  if (send(master_sock, (const char*)&netport, sizeof(netport), 0) < (int)sizeof(netport))
231  THROW("write netport failed!");
232 
233  if (recv(master_sock, (char*)&parent_ip, sizeof(parent_ip), 0) < (int)sizeof(parent_ip))
234  {
235  THROW("read parent_ip failed!");
236  }
237  else
238  {
239  char dotted_quad[INET_ADDRSTRLEN];
240  if (nullptr == inet_ntop(AF_INET, (char*)&parent_ip, dotted_quad, INET_ADDRSTRLEN))
241  {
242  if (!quiet)
243  cerr << "read parent_ip=" << parent_ip << "(inet_ntop: " << strerror(errno) << ")" << endl;
244  }
245  else
246  {
247  if (!quiet)
248  cerr << "read parent_ip=" << dotted_quad << endl;
249  }
250  }
251  if (recv(master_sock, (char*)&parent_port, sizeof(parent_port), 0) < (int)sizeof(parent_port))
252  {
253  THROW("read parent_port failed!");
254  }
255  else
256  {
257  if (!quiet)
258  cerr << "read parent_port=" << parent_port << endl;
259  }
260 
261  CLOSESOCK(master_sock);
262 
263  if (parent_ip != (uint32_t)-1)
264  {
265  socks.parent = sock_connect(parent_ip, parent_port);
266  }
267  else
268  socks.parent = -1;
269 
270  socks.children[0] = -1;
271  socks.children[1] = -1;
272  for (int i = 0; i < kid_count; i++)
273  {
274  sockaddr_in child_address;
275  socklen_t size = sizeof(child_address);
276  socket_t f = accept(sock, (sockaddr*)&child_address, &size);
277  if (f < 0)
278  THROWERRNO("accept");
279 
280  // char hostname[NI_MAXHOST];
281  // char servInfo[NI_MAXSERV];
282  // getnameinfo((sockaddr *) &child_address, sizeof(sockaddr), hostname, NI_MAXHOST, servInfo, NI_MAXSERV,
283  // NI_NUMERICSERV); cerr << "connected to " << hostname << ':' << ntohs(port) << endl;
284  socks.children[i] = f;
285  }
286 
287  if (kid_count > 0)
288  CLOSESOCK(sock);
289 }
socket_t children[2]
Definition: allreduce.h:55
node_socks socks
Definition: allreduce.h:182
bool quiet
Definition: allreduce.h:82
const size_t total
Definition: allreduce.h:80
socket_t parent
Definition: allreduce.h:54
#define CLOSESOCK
Definition: allreduce.h:43
std::string current_master
Definition: allreduce.h:53
#define THROWERRNO(args)
Definition: vw_exception.h:167
std::string span_server
Definition: allreduce.h:183
int socket_t
Definition: allreduce.h:42
socket_t sock_connect(const uint32_t ip, const int port)
size_t unique_id
Definition: allreduce.h:185
#define THROW(args)
Definition: vw_exception.h:181
float f
Definition: cache.cc:40

◆ broadcast()

void AllReduceSockets::broadcast ( char *  buffer,
const size_t  n 
)
private

Definition at line 313 of file allreduce_sockets.cc.

References ar_buf_size, node_socks::children, node_socks::parent, pass_down(), socks, and THROW.

314 {
315  size_t parent_read_pos = 0; // First unread float from parent
316  size_t children_sent_pos = 0; // First unsent float to children
317  // parent_sent_pos <= left_read_pos
318  // parent_sent_pos <= right_read_pos
319 
320  if (socks.parent == -1)
321  {
322  parent_read_pos = n;
323  }
324  if (socks.children[0] == -1 && socks.children[1] == -1)
325  children_sent_pos = n;
326 
327  while (parent_read_pos < n || children_sent_pos < n)
328  {
329  pass_down(buffer, parent_read_pos, children_sent_pos);
330  if (parent_read_pos >= n && children_sent_pos >= n)
331  break;
332 
333  if (socks.parent != -1)
334  {
335  // there is data to be read from the parent
336  if (parent_read_pos == n)
337  THROW("I think parent has no data to send but he thinks he has");
338 
339  size_t count = std::min(ar_buf_size, n - parent_read_pos);
340  int read_size = recv(socks.parent, buffer + parent_read_pos, (int)count, 0);
341  if (read_size == -1)
342  {
343  THROW(" recv from parent: " << strerror(errno));
344  }
345  parent_read_pos += read_size;
346  }
347  }
348 }
constexpr size_t ar_buf_size
Definition: allreduce.h:49
socket_t children[2]
Definition: allreduce.h:55
node_socks socks
Definition: allreduce.h:182
socket_t parent
Definition: allreduce.h:54
void pass_down(char *buffer, const size_t parent_read_pos, size_t &children_sent_pos)
#define THROW(args)
Definition: vw_exception.h:181

◆ getsock()

socket_t AllReduceSockets::getsock ( )
private

Definition at line 85 of file allreduce_sockets.cc.

References AllReduce::quiet, and THROWERRNO.

Referenced by all_reduce_init().

86 {
87  socket_t sock = socket(PF_INET, SOCK_STREAM, 0);
88  if (sock < 0)
89  THROWERRNO("socket");
90 
91  // SO_REUSEADDR will allow port rebinding on Windows, causing multiple instances
92  // of VW on the same machine to potentially contact the wrong tree node.
93 #ifndef _WIN32
94  int on = 1;
95  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on)) < 0)
96  {
97  if (!quiet)
98  cerr << "setsockopt SO_REUSEADDR: " << strerror(errno) << endl;
99  }
100 #endif
101 
102  // Enable TCP Keep Alive to prevent socket leaks
103  int enableTKA = 1;
104  if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&enableTKA, sizeof(enableTKA)) < 0)
105  {
106  if (!quiet)
107  cerr << "setsockopt SO_KEEPALIVE: " << strerror(errno) << endl;
108  }
109 
110  return sock;
111 }
bool quiet
Definition: allreduce.h:82
#define THROWERRNO(args)
Definition: vw_exception.h:167
int socket_t
Definition: allreduce.h:42

◆ pass_down()

void AllReduceSockets::pass_down ( char *  buffer,
const size_t  parent_read_pos,
size_t &  children_sent_pos 
)
private

Definition at line 291 of file allreduce_sockets.cc.

References ar_buf_size, node_socks::children, socks, and THROW.

Referenced by broadcast().

292 {
293  size_t my_bufsize = std::min(ar_buf_size, (parent_read_pos - children_sent_pos));
294 
295  if (my_bufsize > 0)
296  {
297  // going to pass up this chunk of data to the children
298  if (socks.children[0] != -1 &&
299  send(socks.children[0], buffer + children_sent_pos, (int)my_bufsize, 0) < (int)my_bufsize)
300  {
301  THROW("Write to left child failed");
302  }
303  if (socks.children[1] != -1 &&
304  send(socks.children[1], buffer + children_sent_pos, (int)my_bufsize, 0) < (int)my_bufsize)
305  {
306  THROW("Write to right child failed");
307  }
308 
309  children_sent_pos += my_bufsize;
310  }
311 }
constexpr size_t ar_buf_size
Definition: allreduce.h:49
socket_t children[2]
Definition: allreduce.h:55
node_socks socks
Definition: allreduce.h:182
#define THROW(args)
Definition: vw_exception.h:181

◆ pass_up()

template<class T >
void AllReduceSockets::pass_up ( char *  buffer,
size_t  left_read_pos,
size_t  right_read_pos,
size_t &  parent_sent_pos 
)
inline, private

Definition at line 190 of file allreduce.h.

References ar_buf_size, node_socks::parent, and THROW.

191  {
192  size_t my_bufsize =
193  std::min(ar_buf_size, std::min(left_read_pos, right_read_pos) / sizeof(T) * sizeof(T) - parent_sent_pos);
194 
195  if (my_bufsize > 0)
196  { // going to pass up this chunk of data to the parent
197  int write_size = send(socks.parent, buffer + parent_sent_pos, (int)my_bufsize, 0);
198  if (write_size < 0)
199  THROW("Write to parent failed " << my_bufsize << " " << write_size << " " << parent_sent_pos << " "
200  << left_read_pos << " " << right_read_pos);
201 
202  parent_sent_pos += write_size;
203  }
204  }
constexpr size_t ar_buf_size
Definition: allreduce.h:49
node_socks socks
Definition: allreduce.h:182
socket_t parent
Definition: allreduce.h:54
#define THROW(args)
Definition: vw_exception.h:181

◆ reduce()

template<class T, void (*f)(T &, const T &)>
void AllReduceSockets::reduce ( char *  buffer,
const size_t  n 
)
inline, private

Definition at line 207 of file allreduce.h.

References ar_buf_size, node_socks::children, node_socks::parent, THROW, and THROWERRNO.

208  {
209  fd_set fds;
210  FD_ZERO(&fds);
211  if (socks.children[0] != -1)
212  FD_SET(socks.children[0], &fds);
213  if (socks.children[1] != -1)
214  FD_SET(socks.children[1], &fds);
215 
216  socket_t max_fd = std::max(socks.children[0], socks.children[1]) + 1;
217  size_t child_read_pos[2] = {0, 0}; // First unread float from left and right children
218  int child_unprocessed[2] = {0, 0}; // The number of bytes sent by the child but not yet added to the buffer
219  char child_read_buf[2][ar_buf_size + sizeof(T) - 1];
220  size_t parent_sent_pos = 0; // First unsent float to parent
221  // parent_sent_pos <= left_read_pos
222  // parent_sent_pos <= right_read_pos
223 
224  if (socks.children[0] == -1)
225  {
226  child_read_pos[0] = n;
227  }
228  if (socks.children[1] == -1)
229  {
230  child_read_pos[1] = n;
231  }
232 
233  while (parent_sent_pos < n || child_read_pos[0] < n || child_read_pos[1] < n)
234  {
235  if (socks.parent != -1)
236  pass_up<T>(buffer, child_read_pos[0], child_read_pos[1], parent_sent_pos);
237 
238  if (parent_sent_pos >= n && child_read_pos[0] >= n && child_read_pos[1] >= n)
239  break;
240 
241  if (child_read_pos[0] < n || child_read_pos[1] < n)
242  {
243  if (max_fd > 0 && select((int)max_fd, &fds, nullptr, nullptr, nullptr) == -1)
244  THROWERRNO("select");
245 
246  for (int i = 0; i < 2; i++)
247  {
248  if (socks.children[i] != -1 && FD_ISSET(socks.children[i], &fds))
249  { // there is data to be left from left child
250  if (child_read_pos[i] == n)
251  THROW("I think child has no data to send but he thinks he has "
252  << FD_ISSET(socks.children[0], &fds) << " " << FD_ISSET(socks.children[1], &fds));
253 
254  size_t count = std::min(ar_buf_size, n - child_read_pos[i]);
255  int read_size = recv(socks.children[i], &child_read_buf[i][child_unprocessed[i]], (int)count, 0);
256  if (read_size == -1)
257  THROWERRNO("recv from child");
258 
259  addbufs<T, f>((T*)buffer + child_read_pos[i] / sizeof(T), (T*)child_read_buf[i],
260  (child_read_pos[i] + read_size) / sizeof(T) - child_read_pos[i] / sizeof(T));
261 
262  child_read_pos[i] += read_size;
263  int old_unprocessed = child_unprocessed[i];
264  child_unprocessed[i] = child_read_pos[i] % (int)sizeof(T);
265  for (int j = 0; j < child_unprocessed[i]; j++)
266  {
267  child_read_buf[i][j] =
268  child_read_buf[i][((old_unprocessed + read_size) / (int)sizeof(T)) * sizeof(T) + j];
269  }
270 
271  if (child_read_pos[i] == n) // Done reading parent
272  FD_CLR(socks.children[i], &fds);
273  }
274  else if (socks.children[i] != -1 && child_read_pos[i] != n)
275  FD_SET(socks.children[i], &fds);
276  }
277  }
278  if (socks.parent == -1 && child_read_pos[0] == n && child_read_pos[1] == n)
279  parent_sent_pos = n;
280  }
281  }
constexpr size_t ar_buf_size
Definition: allreduce.h:49
socket_t children[2]
Definition: allreduce.h:55
node_socks socks
Definition: allreduce.h:182
socket_t parent
Definition: allreduce.h:54
#define THROWERRNO(args)
Definition: vw_exception.h:167
int socket_t
Definition: allreduce.h:42
#define THROW(args)
Definition: vw_exception.h:181

◆ sock_connect()

socket_t AllReduceSockets::sock_connect ( const uint32_t  ip,
const int  port 
)
private

Definition at line 36 of file allreduce_sockets.cc.

References port, AllReduce::quiet, THROW, and THROWERRNO.

Referenced by all_reduce_init().

37 {
38  socket_t sock = socket(PF_INET, SOCK_STREAM, 0);
39  if (sock == -1)
40  THROWERRNO("socket");
41 
42  sockaddr_in far_end;
43  far_end.sin_family = AF_INET;
44  far_end.sin_port = port;
45 
46  far_end.sin_addr = *(in_addr*)&ip;
47  memset(&far_end.sin_zero, '\0', 8);
48 
49  {
50  char dotted_quad[INET_ADDRSTRLEN];
51  if (nullptr == inet_ntop(AF_INET, &(far_end.sin_addr), dotted_quad, INET_ADDRSTRLEN))
52  THROWERRNO("inet_ntop");
53 
54  char hostname[NI_MAXHOST];
55  char servInfo[NI_MAXSERV];
56  if (getnameinfo((sockaddr*)&far_end, sizeof(sockaddr), hostname, NI_MAXHOST, servInfo, NI_MAXSERV, NI_NUMERICSERV))
57  THROWERRNO("getnameinfo(" << dotted_quad << ")");
58 
59  if (!quiet)
60  cerr << "connecting to " << dotted_quad << " = " << hostname << ':' << ntohs(port) << endl;
61  }
62 
63  size_t count = 0;
64  int ret;
65  while ((ret = connect(sock, (sockaddr*)&far_end, sizeof(far_end))) == -1 && count < 100)
66  {
67  count++;
68  std::stringstream msg;
69  if (!quiet)
70  {
71  msg << "connect attempt " << count << " failed: " << strerror(errno);
72  cerr << msg.str() << endl;
73  }
74 #ifdef _WIN32
75  Sleep(1);
76 #else
77  sleep(1);
78 #endif
79  }
80  if (ret == -1)
81  THROW("cannot connect");
82  return sock;
83 }
bool quiet
Definition: allreduce.h:82
#define THROWERRNO(args)
Definition: vw_exception.h:167
int socket_t
Definition: allreduce.h:42
#define THROW(args)
Definition: vw_exception.h:181

Member Data Documentation

◆ port

int AllReduceSockets::port
private

Definition at line 184 of file allreduce.h.

Referenced by all_reduce_init(), and sock_connect().

◆ socks

node_socks AllReduceSockets::socks
private

Definition at line 182 of file allreduce.h.

Referenced by all_reduce_init(), broadcast(), and pass_down().

◆ span_server

std::string AllReduceSockets::span_server
private

Definition at line 183 of file allreduce.h.

Referenced by all_reduce_init().

◆ unique_id

size_t AllReduceSockets::unique_id
private

Definition at line 185 of file allreduce.h.

Referenced by all_reduce_init().


The documentation for this class was generated from the following files: