Vowpal Wabbit
Classes | Functions
parser.h File Reference
#include "io_buf.h"
#include "parse_primitives.h"
#include "example.h"
#include "future_compat.h"
#include <mutex>
#include <condition_variable>
#include <memory>
#include "queue.h"
#include "object_pool.h"

Go to the source code of this file.

Classes

struct  example_initializer
 
struct  parser
 

Functions

void enable_sources (vw &all, bool quiet, size_t passes, input_options &input_options)
 
void adjust_used_index (vw &all)
 
void lock_done (parser &p)
 
void set_done (vw &all)
 
void reset_source (vw &all, size_t numbits)
 
void finalize_source (parser *source)
 
void set_compressed (parser *par)
 
void free_parser (vw &all)
 

Function Documentation

◆ adjust_used_index()

void adjust_used_index ( vw & all)

Definition at line 968 of file parser.cc.

Referenced by VW_Finish_Passes().

969 { /* no longer used */
970 }

◆ enable_sources()

void enable_sources ( vw & all,
bool  quiet,
size_t  passes,
input_options & input_options 
)

Definition at line 312 of file parser.cc.

References vw::active, parser::audit, vw::audit, binary_print_result(), parser::bound_sock, input_options::cache_files, children(), io_buf::count, io_buf::current, vw::daemon, vw::data_filename, parser::decision_service_json, input_options::dsjson, v_array< T >::empty(), f, io_buf::files, vw::final_prediction_sink, VW::finish(), input_options::foreground, handle_sigterm(), vw::hash_inv, parser::input, isbinary(), input_options::json, input_options::kill_cache, parser::label_sock, vw::length(), MAP_ANONYMOUS, parser::max_fd, vw::num_children, io_buf::open_file(), vw::options, vw::p, parse_cache(), input_options::pid_file, input_options::port, input_options::port_file, vw::print, print_result(), v_array< T >::push_back(), vw::quiet, io_buf::READ, read_cached_features(), read_features_string(), VW::read_lines(), parser::reader, parser::resettable, v_array< T >::resize(), vw::sd, parameters::share(), v_array< T >::size(), parser::sorted_cache, vw::stdin_off, parser::text_reader, THROW, THROWERRNO, vw::trace_message, VW::config::options_i::was_supplied(), vw::weights, and parser::write_cache.

Referenced by parse_sources().

313 {
314  all.p->input->current = 0;
315  parse_cache(all, input_options.cache_files, input_options.kill_cache, quiet);
316 
317  if (all.daemon || all.active)
318  {
319 #ifdef _WIN32
320  WSAData wsaData;
321  int lastError = WSAStartup(MAKEWORD(2, 2), &wsaData);
322  if (lastError != 0)
323  THROWERRNO("WSAStartup() returned error:" << lastError);
324 #endif
325  all.p->bound_sock = (int)socket(PF_INET, SOCK_STREAM, 0);
326  if (all.p->bound_sock < 0)
327  {
328  std::stringstream msg;
329  msg << "socket: " << strerror(errno);
330  all.trace_message << msg.str() << endl;
331  THROW(msg.str().c_str());
332  }
333 
334  int on = 1;
335  if (setsockopt(all.p->bound_sock, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on)) < 0)
336  all.trace_message << "setsockopt SO_REUSEADDR: " << strerror(errno) << endl;
337 
338  // Enable TCP Keep Alive to prevent socket leaks
339  int enableTKA = 1;
340  if (setsockopt(all.p->bound_sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&enableTKA, sizeof(enableTKA)) < 0)
341  all.trace_message << "setsockopt SO_KEEPALIVE: " << strerror(errno) << endl;
342 
343  sockaddr_in address;
344  address.sin_family = AF_INET;
345  address.sin_addr.s_addr = htonl(INADDR_ANY);
346  short unsigned int port = 26542;
347  if (all.options->was_supplied("port"))
348  port = (uint16_t)input_options.port;
349  address.sin_port = htons(port);
350 
351  // attempt to bind to socket
352  if (::bind(all.p->bound_sock, (sockaddr*)&address, sizeof(address)) < 0)
353  THROWERRNO("bind");
354 
355  // listen on socket
356  if (listen(all.p->bound_sock, 1) < 0)
357  THROWERRNO("listen");
358 
359  // write port file
360  if (all.options->was_supplied("port_file"))
361  {
362  socklen_t address_size = sizeof(address);
363  if (getsockname(all.p->bound_sock, (sockaddr*)&address, &address_size) < 0)
364  {
365  all.trace_message << "getsockname: " << strerror(errno) << endl;
366  }
367  std::ofstream port_file;
368  port_file.open(input_options.port_file.c_str());
369  if (!port_file.is_open())
370  THROW("error writing port file: " << input_options.port_file);
371 
372  port_file << ntohs(address.sin_port) << endl;
373  port_file.close();
374  }
375 
376  // background process (if foreground is not set)
377  if (!input_options.foreground)
378  {
379  // FIXME switch to posix_spawn
380  if (!all.active && daemon(1, 1))
381  THROWERRNO("daemon");
382  }
383 
384  // write pid file
385  if (all.options->was_supplied("pid_file"))
386  {
387  std::ofstream pid_file;
388  pid_file.open(input_options.pid_file.c_str());
389  if (!pid_file.is_open())
390  THROW("error writing pid file");
391 
392  pid_file << getpid() << endl;
393  pid_file.close();
394  }
395 
396  if (all.daemon && !all.active)
397  {
398 #ifdef _WIN32
399  THROW("not supported on windows");
400 #else
401  fclose(stdin);
402  // weights will be shared across processes, accessible to children
403  all.weights.share(all.length());
404 
405  // learning state to be shared across children
406  shared_data* sd =
407  (shared_data*)mmap(0, sizeof(shared_data), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
408  memcpy(sd, all.sd, sizeof(shared_data));
409  free(all.sd);
410  all.sd = sd;
411 
412  // create children
413  size_t num_children = all.num_children;
414  v_array<int> children = v_init<int>();
415  children.resize(num_children);
416  for (size_t i = 0; i < num_children; i++)
417  {
418  // fork() returns pid if parent, 0 if child
419  // store fork value and run child process if child
420  if ((children[i] = fork()) == 0)
421  {
422  all.quiet |= (i > 0);
423  goto child;
424  }
425  }
426 
427  // install signal handler so we can kill children when killed
428  {
429  struct sigaction sa;
430  // specifically don't set SA_RESTART in sa.sa_flags, so that
431  // waitid will be interrupted by SIGTERM with handler installed
432  memset(&sa, 0, sizeof(sa));
433  sa.sa_handler = handle_sigterm;
434  sigaction(SIGTERM, &sa, nullptr);
435  }
436 
437  while (true)
438  {
439  // wait for child to change state; if finished, then respawn
440  int status;
441  pid_t pid = wait(&status);
442  if (got_sigterm)
443  {
444  for (size_t i = 0; i < num_children; i++) kill(children[i], SIGTERM);
445  VW::finish(all);
446  exit(0);
447  }
448  if (pid < 0)
449  continue;
450  for (size_t i = 0; i < num_children; i++)
451  if (pid == children[i])
452  {
453  if ((children[i] = fork()) == 0)
454  {
455  all.quiet |= (i > 0);
456  goto child;
457  }
458  break;
459  }
460  }
461 
462 #endif
463  }
464 
465 #ifndef _WIN32
466  child:
467 #endif
468  sockaddr_in client_address;
469  socklen_t size = sizeof(client_address);
470  all.p->max_fd = 0;
471  if (!all.quiet)
472  all.trace_message << "calling accept" << endl;
473  int f = (int)accept(all.p->bound_sock, (sockaddr*)&client_address, &size);
474  if (f < 0)
475  THROWERRNO("accept");
476 
477  all.p->label_sock = f;
478  all.print = print_result;
479 
480  all.final_prediction_sink.push_back((size_t)f);
481 
482  all.p->input->files.push_back(f);
483  all.p->max_fd = std::max(f, all.p->max_fd);
484  if (!all.quiet)
485  all.trace_message << "reading data from port " << port << endl;
486 
487  all.p->max_fd++;
488  if (all.active)
489  all.p->reader = read_features_string;
490  else
491  {
492  if (isbinary(*(all.p->input)))
493  {
494  all.p->reader = read_cached_features;
495  all.print = binary_print_result;
496  }
497  else
498  {
499  all.p->reader = read_features_string;
500  }
501  all.p->sorted_cache = true;
502  }
503  all.p->resettable = all.p->write_cache || all.daemon;
504  }
505  else
506  {
507  if (!all.p->input->files.empty())
508  {
509  if (!quiet)
510  all.trace_message << "ignoring text input in favor of cache input" << endl;
511  }
512  else
513  {
514  std::string temp = all.data_filename;
515  if (!quiet)
516  all.trace_message << "Reading datafile = " << temp << endl;
517  try
518  {
519  all.p->input->open_file(temp.c_str(), all.stdin_off, io_buf::READ);
520  }
521  catch (std::exception const&)
522  {
523  // when trying to fix this exception, consider that an empty temp is valid if all.stdin_off is false
524  if (!temp.empty())
525  {
526  all.trace_message << "can't open '" << temp << "', sailing on!" << endl;
527  }
528  else
529  {
530  throw;
531  }
532  }
533 
534  if (input_options.json || input_options.dsjson)
535  {
536  // TODO: change to class with virtual method
537  // --invert_hash requires the audit parser version to save the extra information.
538  if (all.audit || all.hash_inv)
539  {
540  all.p->reader = &read_features_json<true>;
541  all.p->text_reader = &line_to_examples_json<true>;
542  all.p->audit = true;
543  }
544  else
545  {
546  all.p->reader = &read_features_json<false>;
547  all.p->text_reader = &line_to_examples_json<false>;
548  all.p->audit = false;
549  }
550 
551  all.p->decision_service_json = input_options.dsjson;
552  }
553  else
554  {
555  all.p->reader = read_features_string;
556  all.p->text_reader = VW::read_lines;
557  }
558 
559  all.p->resettable = all.p->write_cache;
560  }
561  }
562 
563  if (passes > 1 && !all.p->resettable)
564  THROW("need a cache file for multiple passes : try using --cache_file");
565 
566  all.p->input->count = all.p->input->files.size();
567  if (!quiet && !all.daemon)
568  all.trace_message << "num sources = " << all.p->input->files.size() << endl;
569 }
std::string port_file
Definition: parse_args.h:17
void resize(size_t length)
Definition: v_array.h:69
size_t length()
Definition: global_data.h:513
parameters weights
Definition: global_data.h:537
bool got_sigterm
Definition: parser.cc:66
bool audit
Definition: parser.h:104
size_t current
Definition: io_buf.h:66
VW::config::options_i * options
Definition: global_data.h:428
bool sorted_cache
Definition: parser.h:78
bool hash_inv
Definition: global_data.h:541
int read_features_string(vw *all, v_array< example *> &examples)
void handle_sigterm(int)
Definition: parser.cc:68
v_array< int > final_prediction_sink
Definition: global_data.h:518
io_buf * input
Definition: parser.h:69
void binary_print_result(int f, float res, float weight, v_array< char >)
Definition: global_data.cc:72
bool quiet
Definition: global_data.h:487
std::vector< std::string > cache_files
Definition: parse_args.h:20
int max_fd
Definition: parser.h:98
size_t port
Definition: parse_args.h:15
void finish(vw &all, bool delete_all)
Definition: parse_args.cc:1823
size_t count
Definition: io_buf.h:65
void read_lines(vw *all, char *line, size_t, v_array< example *> &examples)
size_t size() const
Definition: v_array.h:68
static constexpr int READ
Definition: io_buf.h:71
#define MAP_ANONYMOUS
Definition: parser.cc:309
parser * p
Definition: global_data.h:377
virtual int open_file(const char *name, bool stdin_off)
Definition: io_buf.h:90
int(* reader)(vw *, v_array< example *> &examples)
Definition: parser.h:70
void push_back(const T &new_ele)
Definition: v_array.h:107
shared_data * sd
Definition: global_data.h:375
void parse_cache(vw &all, std::vector< std::string > cache_files, bool kill_cache, bool quiet)
Definition: parser.cc:255
v_array< int > files
Definition: io_buf.h:64
bool active
Definition: global_data.h:489
vw_ostream trace_message
Definition: global_data.h:424
virtual bool was_supplied(const std::string &key)=0
bool kill_cache
Definition: parse_args.h:23
void share(size_t length)
bool foreground
Definition: parse_args.h:14
#define THROWERRNO(args)
Definition: vw_exception.h:167
bool decision_service_json
Definition: parser.h:105
void print_result(int f, float res, v_array< char > tag, float lb, float ub)
Definition: bs.cc:136
bool empty() const
Definition: v_array.h:59
int bound_sock
Definition: parser.h:97
void(* text_reader)(vw *, char *, size_t, v_array< example *> &)
Definition: parser.h:71
int label_sock
Definition: parser.h:96
int read_cached_features(vw *all, v_array< example *> &examples)
Definition: cache.cc:65
bool audit
Definition: global_data.h:486
bool stdin_off
Definition: global_data.h:527
size_t num_children
Definition: global_data.h:406
std::string pid_file
Definition: parse_args.h:16
bool isbinary(io_buf &i)
Definition: io_buf.cc:45
bool children(log_multi &b, uint32_t &current, uint32_t &class_index, uint32_t label)
Definition: log_multi.cc:178
#define THROW(args)
Definition: vw_exception.h:181
bool resettable
Definition: parser.h:74
void(* print)(int, float, float, v_array< char >)
Definition: global_data.h:521
float f
Definition: cache.cc:40
bool write_cache
Definition: parser.h:76
std::string data_filename
Definition: global_data.h:403
bool daemon
Definition: global_data.h:405

◆ finalize_source()

void finalize_source ( parser * source)

Definition at line 206 of file parser.cc.

References io_buf::close_files(), v_array< T >::empty(), f, io_buf::files, parser::input, v_array< T >::last(), parser::output, and v_array< T >::pop().

Referenced by VW::finish(), and set_compressed().

207 {
208 #ifdef _WIN32
209  int f = _fileno(stdin);
210 #else
211  int f = fileno(stdin);
212 #endif
213  while (!p->input->files.empty() && p->input->files.last() == f) p->input->files.pop();
214  p->input->close_files();
215 
216  delete p->input;
217  p->input = nullptr;
218  p->output->close_files();
219  delete p->output;
220  p->output = nullptr;
221 }
float f
Definition: cache.cc:40

◆ free_parser()

void free_parser ( vw & all)

Definition at line 976 of file parser.cc.

References parser::counts, io_buf::currentname, VW::dealloc_example(), label_parser::delete_label, vw::delete_prediction, v_array< T >::delete_v(), VW::object_pool< T, TInitializer, TCleanup >::empty(), parser::example_pool, io_buf::finalname, VW::object_pool< T, TInitializer, TCleanup >::get_object(), parser::gram_mask, parser::lp, parser::name, vw::ngram_strings, parser::output, vw::p, VW::ptr_queue< T >::pop(), parser::ready_parsed_examples, VW::ptr_queue< T >::size(), and parser::words.

Referenced by VW::finish().

977 {
978  all.p->words.delete_v();
979  all.p->name.delete_v();
980 
981  if (!all.ngram_strings.empty())
982  all.p->gram_mask.delete_v();
983 
984  io_buf* output = all.p->output;
985  if (output != nullptr)
986  {
987  output->finalname.delete_v();
988  output->currentname.delete_v();
989  }
990 
991  while (!all.p->example_pool.empty())
992  {
993  example* temp = all.p->example_pool.get_object();
994  VW::dealloc_example(all.p->lp.delete_label, *temp, all.delete_prediction);
995  }
996 
997  while (all.p->ready_parsed_examples.size() != 0)
998  {
999  example* temp = all.p->ready_parsed_examples.pop();
1000  VW::dealloc_example(all.p->lp.delete_label, *temp, all.delete_prediction);
1001  }
1002  all.p->counts.delete_v();
1003 }
void(* delete_prediction)(void *)
Definition: global_data.h:485
VW::object_pool< example, example_initializer > example_pool
Definition: parser.h:66
v_array< substring > words
Definition: parser.h:63
void(* delete_label)(void *)
Definition: label_parser.h:16
std::vector< std::string > ngram_strings
Definition: global_data.h:469
void dealloc_example(void(*delete_label)(void *), example &ec, void(*delete_prediction)(void *))
Definition: example.cc:219
VW::ptr_queue< example > ready_parsed_examples
Definition: parser.h:67
v_array< size_t > counts
Definition: parser.h:94
v_array< substring > name
Definition: parser.h:64
parser * p
Definition: global_data.h:377
v_array< char > finalname
Definition: io_buf.h:69
v_array< char > currentname
Definition: io_buf.h:68
v_array< size_t > gram_mask
Definition: parser.h:91
size_t size() const
Definition: queue.h:70
Definition: io_buf.h:54
io_buf * output
Definition: parser.h:75
void delete_v()
Definition: v_array.h:98
bool empty() const
Definition: object_pool.h:186
T * pop()
Definition: queue.h:27
label_parser lp
Definition: parser.h:102

◆ lock_done()

void lock_done ( parser & p)

Definition at line 571 of file parser.cc.

References parser::done, parser::ready_parsed_examples, and VW::ptr_queue< T >::set_done().

Referenced by parse_dispatch(), and set_done().

572 {
573  p.done = true;
574  // in case get_example() is waiting for a fresh example, wake so it can realize there are no more.
575  p.ready_parsed_examples.set_done();
576 }
VW::ptr_queue< example > ready_parsed_examples
Definition: parser.h:67
void set_done()
Definition: queue.h:62
bool done
Definition: parser.h:90

◆ reset_source()

void reset_source ( vw & all,
size_t  numbits 
)

Definition at line 126 of file parser.cc.

References v_array< T >::begin(), binary_print_result(), parser::bound_sock, cache_numbits(), v_array< T >::clear(), io_buf::close_file(), io_buf::close_file_or_socket(), io_buf::compressed(), io_buf::current, io_buf::currentname, vw::daemon, f, io_buf::files, vw::final_prediction_sink, io_buf::finalname, recall_tree_ns::find(), io_buf::flush(), parser::input, isbinary(), io_buf::num_files(), io_buf::open_file(), parser::output, parser::output_done, parser::output_lock, vw::p, v_array< T >::pop(), vw::print, print_result(), v_array< T >::push_back(), io_buf::READ, read_cached_features(), read_features_string(), parser::reader, parser::ready_parsed_examples, io_buf::reset_file(), parser::resettable, v_array< T >::size(), VW::ptr_queue< T >::size(), vw::stdin_off, THROW, and parser::write_cache.

Referenced by parse_dispatch().

127 {
128  io_buf* input = all.p->input;
129  input->current = 0;
130  if (all.p->write_cache)
131  {
132  all.p->output->flush();
133  all.p->write_cache = false;
134  all.p->output->close_file();
135  remove(all.p->output->finalname.begin());
136 
137  if (0 != rename(all.p->output->currentname.begin(), all.p->output->finalname.begin()))
138  THROW("WARN: reset_source(vw& all, size_t numbits) cannot rename: " << all.p->output->currentname << " to "
139  << all.p->output->finalname);
140 
141  while (input->num_files() > 0)
142  if (input->compressed())
143  input->close_file();
144  else
145  {
146  int fd = input->files.pop();
147  const auto& fps = all.final_prediction_sink;
148 
149  // If the current popped file is not in the list of final predictions sinks, close it.
150  if (std::find(fps.cbegin(), fps.cend(), fd) == fps.cend())
151  io_buf::close_file_or_socket(fd);
152  }
153  input->open_file(all.p->output->finalname.begin(), all.stdin_off, io_buf::READ); // pushing is merged into
154  // open_file
155  all.p->reader = read_cached_features;
156  }
157  if (all.p->resettable == true)
158  {
159  if (all.daemon)
160  {
161  // wait for all predictions to be sent back to client
162  {
163  std::unique_lock<std::mutex> lock(all.p->output_lock);
164  all.p->output_done.wait(lock, [&] { return all.p->ready_parsed_examples.size() == 0; });
165  }
166 
167  // close socket, erase final prediction sink and socket
168  io_buf::close_file_or_socket(all.p->input->files[0]);
169  all.final_prediction_sink.clear();
170  all.p->input->files.clear();
171 
172  sockaddr_in client_address;
173  socklen_t size = sizeof(client_address);
174  int f = (int)accept(all.p->bound_sock, (sockaddr*)&client_address, &size);
175  if (f < 0)
176  THROW("accept: " << strerror(errno));
177 
178  // note: breaking cluster parallel online learning by dropping support for id
179 
180  all.final_prediction_sink.push_back((size_t)f);
181  all.p->input->files.push_back(f);
182 
183  if (isbinary(*(all.p->input)))
184  {
185  all.p->reader = read_cached_features;
186  all.print = binary_print_result;
187  }
188  else
189  {
190  all.p->reader = read_features_string;
191  all.print = print_result;
192  }
193  }
194  else
195  {
196  for (size_t i = 0; i < input->files.size(); i++)
197  {
198  input->reset_file(input->files[i]);
199  if (cache_numbits(input, input->files[i]) < numbits)
200  THROW("argh, a bug in caching of some sort!");
201  }
202  }
203  }
204 }
virtual bool compressed()
Definition: io_buf.h:214
T pop()
Definition: v_array.h:58
size_t current
Definition: io_buf.h:66
virtual bool close_file()
Definition: io_buf.h:204
uint32_t cache_numbits(io_buf *buf, int filepointer)
Definition: parser.cc:91
int read_features_string(vw *all, v_array< example *> &examples)
VW::ptr_queue< example > ready_parsed_examples
Definition: parser.h:67
v_array< int > final_prediction_sink
Definition: global_data.h:518
io_buf * input
Definition: parser.h:69
void binary_print_result(int f, float res, float weight, v_array< char >)
Definition: global_data.cc:72
virtual void reset_file(int f)
Definition: io_buf.h:136
T *& begin()
Definition: v_array.h:42
static void close_file_or_socket(int f)
Definition: io_buf.cc:152
size_t size() const
Definition: v_array.h:68
static constexpr int READ
Definition: io_buf.h:71
std::mutex output_lock
Definition: parser.h:87
parser * p
Definition: global_data.h:377
virtual int open_file(const char *name, bool stdin_off)
Definition: io_buf.h:90
v_array< char > finalname
Definition: io_buf.h:69
int(* reader)(vw *, v_array< example *> &examples)
Definition: parser.h:70
virtual size_t num_files()
Definition: io_buf.h:165
virtual void flush()
Definition: io_buf.h:194
void push_back(const T &new_ele)
Definition: v_array.h:107
v_array< int > files
Definition: io_buf.h:64
void clear()
Definition: v_array.h:88
v_array< char > currentname
Definition: io_buf.h:68
size_t size() const
Definition: queue.h:70
Definition: io_buf.h:54
node_pred * find(recall_tree &b, uint32_t cn, example &ec)
Definition: recall_tree.cc:126
void print_result(int f, float res, v_array< char > tag, float lb, float ub)
Definition: bs.cc:136
io_buf * output
Definition: parser.h:75
int bound_sock
Definition: parser.h:97
int read_cached_features(vw *all, v_array< example *> &examples)
Definition: cache.cc:65
bool stdin_off
Definition: global_data.h:527
std::condition_variable output_done
Definition: parser.h:88
bool isbinary(io_buf &i)
Definition: io_buf.cc:45
#define THROW(args)
Definition: vw_exception.h:181
bool resettable
Definition: parser.h:74
void(* print)(int, float, float, v_array< char >)
Definition: global_data.h:521
float f
Definition: cache.cc:40
bool write_cache
Definition: parser.h:76
bool daemon
Definition: global_data.h:405

◆ set_compressed()

void set_compressed ( parser * par)

Definition at line 82 of file parser.cc.

References finalize_source(), parser::input, and parser::output.

Referenced by parse_source().

83 {
84  finalize_source(par);
85  delete par->input;
86  par->input = new comp_io_buf;
87  delete par->output;
88  par->output = new comp_io_buf;
89 }
void finalize_source(parser *p)
Definition: parser.cc:206
io_buf * input
Definition: parser.h:69
io_buf * output
Definition: parser.h:75

◆ set_done()

void set_done ( vw & all)

Definition at line 578 of file parser.cc.

References vw::early_terminate, lock_done(), and vw::p.

Referenced by GD::end_pass(), end_pass(), and finish_example().

579 {
580  all.early_terminate = true;
581  lock_done(*all.p);
582 }
parser * p
Definition: global_data.h:377
bool early_terminate
Definition: global_data.h:500
void lock_done(parser &p)
Definition: parser.cc:571