Vowpal Wabbit
Namespaces | Macros | Functions | Variables
parser.cc File Reference
#include <sys/types.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <csignal>
#include <fstream>
#include <netdb.h>
#include <cerrno>
#include <cstdio>
#include <cassert>
#include "parse_example.h"
#include "cache.h"
#include "unique_sort.h"
#include "constant.h"
#include "vw.h"
#include "interactions.h"
#include "vw_exception.h"
#include "parse_example_json.h"
#include "parse_dispatch_loop.h"
#include "parse_args.h"

Go to the source code of this file.

Namespaces

 VW
 

Macros

#define MAP_ANONYMOUS   MAP_ANON
 

Functions

void handle_sigterm (int)
 
bool is_test_only (uint32_t counter, uint32_t period, uint32_t after, bool holdout_off, uint32_t target_modulus)
 
void set_compressed (parser *par)
 
uint32_t cache_numbits (io_buf *buf, int filepointer)
 
void reset_source (vw &all, size_t numbits)
 
void finalize_source (parser *p)
 
void make_write_cache (vw &all, std::string &newname, bool quiet)
 
void parse_cache (vw &all, std::vector< std::string > cache_files, bool kill_cache, bool quiet)
 
void enable_sources (vw &all, bool quiet, size_t passes, input_options &input_options)
 
void lock_done (parser &p)
 
void set_done (vw &all)
 
void addgrams (vw &all, size_t ngram, size_t skip_gram, features &fs, size_t initial_length, v_array< size_t > &gram_mask, size_t skips)
 
void generateGrams (vw &all, example *&ex)
 
void end_pass_example (vw &all, example *ae)
 
void feature_limit (vw &all, example *ex)
 
example * VW::get_unused_example (vw *all)
 
void VW::setup_examples (vw &all, v_array< example *> &examples)
 
void VW::setup_example (vw &all, example *ae)
 
example * VW::new_unused_example (vw &all)
 
example * VW::read_example (vw &all, char *example_line)
 
example * VW::read_example (vw &all, std::string example_line)
 
void VW::add_constant_feature (vw &vw, example *ec)
 
void VW::add_label (example *ec, float label, float weight, float base)
 
example * VW::import_example (vw &all, const std::string &label, primitive_feature_space *features, size_t len)
 
primitive_feature_space * VW::export_example (vw &all, example *ec, size_t &len)
 
void VW::releaseFeatureSpace (primitive_feature_space *features, size_t len)
 
void VW::parse_example_label (vw &all, example &ec, std::string label)
 
void VW::empty_example (vw &, example &ec)
 
void VW::clean_example (vw &, example &, bool rewind)
 
void VW::finish_example (vw &, example &)
 
void thread_dispatch (vw &all, v_array< example *> examples)
 
void main_parse_loop (vw *all)
 
example * VW::get_example (parser *p)
 
float VW::get_topic_prediction (example *ec, size_t i)
 
float VW::get_label (example *ec)
 
float VW::get_importance (example *ec)
 
float VW::get_initial (example *ec)
 
float VW::get_prediction (example *ec)
 
float VW::get_cost_sensitive_prediction (example *ec)
 
v_array< float > & VW::get_cost_sensitive_prediction_confidence_scores (example *ec)
 
uint32_t * VW::get_multilabel_predictions (example *ec, size_t &len)
 
float VW::get_action_score (example *ec, size_t i)
 
size_t VW::get_action_score_length (example *ec)
 
size_t VW::get_tag_length (example *ec)
 
const char * VW::get_tag (example *ec)
 
size_t VW::get_feature_number (example *ec)
 
float VW::get_confidence (example *ec)
 
void adjust_used_index (vw &)
 
void VW::start_parser (vw &all)
 
void free_parser (vw &all)
 
void VW::end_parser (vw &all)
 
bool VW::is_ring_example (vw &all, example *ae)
 

Variables

bool got_sigterm
 

Macro Definition Documentation

◆ MAP_ANONYMOUS

#define MAP_ANONYMOUS   MAP_ANON

Definition at line 309 of file parser.cc.

Referenced by enable_sources(), and dense_parameters::share().

Function Documentation

◆ addgrams()

void addgrams ( vw &  all,
size_t  ngram,
size_t  skip_gram,
features &  fs,
size_t  initial_length,
v_array< size_t > &  gram_mask,
size_t  skips 
)

Definition at line 584 of file parser.cc.

References features::indicies, v_array< T >::last(), v_array< T >::pop(), v_array< T >::push_back(), features::push_back(), quadratic_constant, v_array< T >::size(), and features::space_names.

Referenced by generateGrams().

// addgrams: recursively enumerates gram offset patterns (held in gram_mask) and appends one
// combined feature to fs for every position at which a complete pattern fits.
//   ngram          - number of elements still to be added to the pattern in gram_mask
//   skip_gram      - skip budget remaining for the rest of the pattern (a total across all
//                    gaps, not per gap: it is passed down unchanged after an element is added)
//   initial_length - number of features fs held before any grams were appended; gram
//                    components are always read from fs.indicies[0, initial_length)
//   gram_mask      - offsets of the pattern's elements relative to its first element
//                    (generateGrams starts it as {0}); pushed/popped around each recursive call
//   skips          - skips taken since the previous element was placed
586 {
// Base case: the pattern is complete. Emit it at every start position i for which the
// pattern's last offset still lands inside the original (pre-gram) features.
587  if (ngram == 0 && gram_mask.last() < initial_length)
588  {
589  size_t last = initial_length - gram_mask.last();
590  for (size_t i = 0; i < last; i++)
591  {
// Combined index: fold the components as h = h * quadratic_constant + h_next.
592  uint64_t new_index = fs.indicies[i];
593  for (size_t n = 1; n < gram_mask.size(); n++)
594  new_index = new_index * quadratic_constant + fs.indicies[i + gram_mask[n]];
595 
// Gram features are appended with value 1. Appending grows fs, but only indices below
// initial_length are read, so freshly appended grams never become components.
596  fs.push_back(1., new_index);
// When audit names are present, record a name "a^b^..." joined from the component names;
// .first is copied from the pattern's first element (presumably its namespace - confirm).
597  if (fs.space_names.size() > 0)
598  {
599  std::string feature_name(fs.space_names[i].get()->second);
600  for (size_t n = 1; n < gram_mask.size(); n++)
601  {
602  feature_name += std::string("^");
603  feature_name += std::string(fs.space_names[i + gram_mask[n]].get()->second);
604  }
605  fs.space_names.push_back(audit_strings_ptr(new audit_strings(fs.space_names[i].get()->first, feature_name)));
606  }
607  }
608  }
// Extend the pattern: place the next element `skips` positions after the previous one,
// then recurse with skip counting restarted at 0 for the element after it.
609  if (ngram > 0)
610  {
611  gram_mask.push_back(gram_mask.last() + 1 + skips);
612  addgrams(all, ngram - 1, skip_gram, fs, initial_length, gram_mask, 0);
613  gram_mask.pop();
614  }
// Alternatively, spend one unit of the skip budget before placing the next element.
615  if (skip_gram > 0 && ngram > 0)
616  addgrams(all, ngram, skip_gram - 1, fs, initial_length, gram_mask, skips + 1);
617 }
T pop()
Definition: v_array.h:58
void push_back(feature_value v, feature_index i)
std::shared_ptr< audit_strings > audit_strings_ptr
Definition: feature_group.h:23
constexpr int quadratic_constant
Definition: constant.h:7
v_array< feature_index > indicies
void addgrams(vw &all, size_t ngram, size_t skip_gram, features &fs, size_t initial_length, v_array< size_t > &gram_mask, size_t skips)
Definition: parser.cc:584
size_t size() const
Definition: v_array.h:68
void push_back(const T &new_ele)
Definition: v_array.h:107
v_array< audit_strings_ptr > space_names
T last() const
Definition: v_array.h:57
std::pair< std::string, std::string > audit_strings
Definition: feature_group.h:22

◆ adjust_used_index()

void adjust_used_index ( vw & )

Definition at line 968 of file parser.cc.

Referenced by VW_Finish_Passes().

969 { /* no longer used */
970 }

◆ cache_numbits()

uint32_t cache_numbits ( io_buf *  buf,
int  filepointer 
)

Definition at line 91 of file parser.cc.

References io_buf::read_file(), THROW, and VW::version().

Referenced by parse_cache(), and reset_source().

92 {
93  size_t v_length;
94  buf->read_file(filepointer, (char*)&v_length, sizeof(v_length));
95  if (v_length > 61)
96  THROW("cache version too long, cache file is probably invalid");
97 
98  if (v_length == 0)
99  THROW("cache version too short, cache file is probably invalid");
100 
101  std::vector<char> t(v_length);
102  buf->read_file(filepointer, t.data(), v_length);
103  VW::version_struct v_tmp(t.data());
104  if (v_tmp != VW::version)
105  {
106  // cout << "cache has possibly incompatible version, rebuilding" << endl;
107  return 0;
108  }
109 
110  char temp;
111  if (buf->read_file(filepointer, &temp, 1) < 1)
112  THROW("failed to read");
113 
114  if (temp != 'c')
115  THROW("data file is not a cache file");
116 
117  uint32_t cache_numbits;
118  if (buf->read_file(filepointer, &cache_numbits, sizeof(cache_numbits)) < (int)sizeof(cache_numbits))
119  {
120  return true;
121  }
122 
123  return cache_numbits;
124 }
uint32_t cache_numbits(io_buf *buf, int filepointer)
Definition: parser.cc:91
const version_struct version(PACKAGE_VERSION)
virtual ssize_t read_file(int f, void *buf, size_t nbytes)
Definition: io_buf.h:167
#define THROW(args)
Definition: vw_exception.h:181

◆ enable_sources()

void enable_sources ( vw &  all,
bool  quiet,
size_t  passes,
input_options &  input_options 
)

Definition at line 312 of file parser.cc.

References vw::active, parser::audit, vw::audit, binary_print_result(), parser::bound_sock, input_options::cache_files, children(), io_buf::count, io_buf::current, vw::daemon, vw::data_filename, parser::decision_service_json, input_options::dsjson, v_array< T >::empty(), f, io_buf::files, vw::final_prediction_sink, VW::finish(), input_options::foreground, handle_sigterm(), vw::hash_inv, parser::input, isbinary(), input_options::json, input_options::kill_cache, parser::label_sock, vw::length(), MAP_ANONYMOUS, parser::max_fd, vw::num_children, io_buf::open_file(), vw::options, vw::p, parse_cache(), input_options::pid_file, input_options::port, input_options::port_file, vw::print, print_result(), v_array< T >::push_back(), vw::quiet, io_buf::READ, read_cached_features(), read_features_string(), VW::read_lines(), parser::reader, parser::resettable, v_array< T >::resize(), vw::sd, parameters::share(), v_array< T >::size(), parser::sorted_cache, vw::stdin_off, parser::text_reader, THROW, THROWERRNO, vw::trace_message, VW::config::options_i::was_supplied(), vw::weights, and parser::write_cache.

Referenced by parse_sources().

// enable_sources: decides where examples are read from and configures the parser for it.
//  - Resets the input position and calls parse_cache() to handle the --cache_file list.
//  - Daemon/active mode (all.daemon || all.active):
//      * opens a TCP socket bound to INADDR_ANY on port 26542 (or --port), then listens;
//      * optionally writes --port_file and --pid_file;
//      * calls daemon(1, 1) unless --foreground is set or the run is active;
//      * in daemon (non-active) mode, forks all.num_children workers that share the weights
//        and an mmap'ed shared_data with the parent. The parent stays in a wait/respawn loop
//        until SIGTERM.
//    Each worker (or the single process when not forking) accepts one connection and reads
//    examples from it.
//  - Otherwise: if the input already has files open (the message says a cache from
//    parse_cache()), text input is ignored; else all.data_filename is opened and either the
//    JSON/DSJSON or the text reader is selected.
// Throws (THROW/THROWERRNO) on socket/bind/listen/daemon/accept failures, on unwritable
// port/pid files, and when passes > 1 but the source is not resettable.
// NOTE(review): the socket path checks all.quiet (forked workers with i > 0 are silenced),
// while the file path checks the `quiet` parameter.
313 {
314  all.p->input->current = 0;
315  parse_cache(all, input_options.cache_files, input_options.kill_cache, quiet);
316 
317  if (all.daemon || all.active)
318  {
319 #ifdef _WIN32
320  WSAData wsaData;
321  int lastError = WSAStartup(MAKEWORD(2, 2), &wsaData);
322  if (lastError != 0)
323  THROWERRNO("WSAStartup() returned error:" << lastError);
324 #endif
325  all.p->bound_sock = (int)socket(PF_INET, SOCK_STREAM, 0);
326  if (all.p->bound_sock < 0)
327  {
328  std::stringstream msg;
329  msg << "socket: " << strerror(errno);
330  all.trace_message << msg.str() << endl;
331  THROW(msg.str().c_str());
332  }
333 
// setsockopt failures are only logged; startup continues without the option.
334  int on = 1;
335  if (setsockopt(all.p->bound_sock, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on)) < 0)
336  all.trace_message << "setsockopt SO_REUSEADDR: " << strerror(errno) << endl;
337 
338  // Enable TCP Keep Alive to prevent socket leaks
339  int enableTKA = 1;
340  if (setsockopt(all.p->bound_sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&enableTKA, sizeof(enableTKA)) < 0)
341  all.trace_message << "setsockopt SO_KEEPALIVE: " << strerror(errno) << endl;
342 
// Listen on all interfaces; the default port is 26542 unless --port was supplied.
343  sockaddr_in address;
344  address.sin_family = AF_INET;
345  address.sin_addr.s_addr = htonl(INADDR_ANY);
346  short unsigned int port = 26542;
347  if (all.options->was_supplied("port"))
348  port = (uint16_t)input_options.port;
349  address.sin_port = htons(port);
350 
351  // attempt to bind to socket
352  if (::bind(all.p->bound_sock, (sockaddr*)&address, sizeof(address)) < 0)
353  THROWERRNO("bind");
354 
355  // listen on socket
356  if (listen(all.p->bound_sock, 1) < 0)
357  THROWERRNO("listen");
358 
// getsockname() reports the port actually bound, which is what gets written to the port file.
359  // write port file
360  if (all.options->was_supplied("port_file"))
361  {
362  socklen_t address_size = sizeof(address);
363  if (getsockname(all.p->bound_sock, (sockaddr*)&address, &address_size) < 0)
364  {
365  all.trace_message << "getsockname: " << strerror(errno) << endl;
366  }
367  std::ofstream port_file;
368  port_file.open(input_options.port_file.c_str());
369  if (!port_file.is_open())
370  THROW("error writing port file: " << input_options.port_file);
371 
372  port_file << ntohs(address.sin_port) << endl;
373  port_file.close();
374  }
375 
376  // background process (if foreground is not set)
377  if (!input_options.foreground)
378  {
379  // FIXME switch to posix_spawn
380  if (!all.active && daemon(1, 1))
381  THROWERRNO("daemon");
382  }
383 
// Written after daemon(), so the file records the backgrounded process's pid.
384  // write pid file
385  if (all.options->was_supplied("pid_file"))
386  {
387  std::ofstream pid_file;
388  pid_file.open(input_options.pid_file.c_str());
389  if (!pid_file.is_open())
390  THROW("error writing pid file");
391 
392  pid_file << getpid() << endl;
393  pid_file.close();
394  }
395 
// Daemon mode (not active): pre-fork worker processes; the parent never leaves this block.
396  if (all.daemon && !all.active)
397  {
398 #ifdef _WIN32
399  THROW("not supported on windows");
400 #else
401  fclose(stdin);
402  // weights will be shared across processes, accessible to children
403  all.weights.share(all.length());
404 
// Move all.sd into a MAP_SHARED anonymous mapping so that parent and workers see one copy.
// NOTE(review): mmap's result is not checked against MAP_FAILED before the memcpy below.
405  // learning state to be shared across children
406  shared_data* sd =
407  (shared_data*)mmap(0, sizeof(shared_data), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
408  memcpy(sd, all.sd, sizeof(shared_data));
409  free(all.sd);
410  all.sd = sd;
411 
// NOTE(review): a failed fork() (-1) is not handled. The -1 is stored in children[i], and
// the SIGTERM path below would then call kill(-1, SIGTERM), which POSIX defines as
// signalling every process the caller is permitted to signal.
412  // create children
413  size_t num_children = all.num_children;
414  v_array<int> children = v_init<int>();
415  children.resize(num_children);
416  for (size_t i = 0; i < num_children; i++)
417  {
418  // fork() returns pid if parent, 0 if child
419  // store fork value and run child process if child
420  if ((children[i] = fork()) == 0)
421  {
422  all.quiet |= (i > 0);
423  goto child;
424  }
425  }
426 
// handle_sigterm only sets got_sigterm; the wait loop below acts on it.
427  // install signal handler so we can kill children when killed
428  {
429  struct sigaction sa;
430  // specifically don't set SA_RESTART in sa.sa_flags, so that
431  // waitid will be interrupted by SIGTERM with handler installed
432  memset(&sa, 0, sizeof(sa));
433  sa.sa_handler = handle_sigterm;
434  sigaction(SIGTERM, &sa, nullptr);
435  }
436 
// Parent supervision loop: terminate all workers and exit on SIGTERM; otherwise re-fork
// whichever worker changed state.
437  while (true)
438  {
439  // wait for child to change state; if finished, then respawn
440  int status;
441  pid_t pid = wait(&status);
442  if (got_sigterm)
443  {
444  for (size_t i = 0; i < num_children; i++) kill(children[i], SIGTERM);
445  VW::finish(all);
446  exit(0);
447  }
448  if (pid < 0)
449  continue;
450  for (size_t i = 0; i < num_children; i++)
451  if (pid == children[i])
452  {
453  if ((children[i] = fork()) == 0)
454  {
455  all.quiet |= (i > 0);
456  goto child;
457  }
458  break;
459  }
460  }
461 
462 #endif
463  }
464 
// Forked workers jump here; a non-forking (active) process falls through to here.
// Each accepts a single client connection and uses it as both input and prediction sink.
465 #ifndef _WIN32
466  child:
467 #endif
468  sockaddr_in client_address;
469  socklen_t size = sizeof(client_address);
470  all.p->max_fd = 0;
471  if (!all.quiet)
472  all.trace_message << "calling accept" << endl;
473  int f = (int)accept(all.p->bound_sock, (sockaddr*)&client_address, &size);
474  if (f < 0)
475  THROWERRNO("accept");
476 
// Predictions are written back through print_result to the accepted socket, which is
// pushed onto final_prediction_sink.
477  all.p->label_sock = f;
478  all.print = print_result;
479 
480  all.final_prediction_sink.push_back((size_t)f);
481 
482  all.p->input->files.push_back(f);
483  all.p->max_fd = std::max(f, all.p->max_fd);
484  if (!all.quiet)
485  all.trace_message << "reading data from port " << port << endl;
486 
487  all.p->max_fd++;
488  if (all.active)
// NOTE(review): listing line 489 (the statement run when all.active) is missing from this
// rendering; presumably it selects the reader. Confirm against parser.cc.
490  else
491  {
492  if (isbinary(*(all.p->input)))
493  {
// NOTE(review): listing lines 494-495 (binary-input branch) are missing from this
// rendering; the reference list mentions read_cached_features and binary_print_result.
// Confirm against parser.cc.
496  }
497  else
498  {
// NOTE(review): listing line 499 (text-input branch) is missing from this rendering;
// presumably it sets a text reader such as read_features_string. Confirm.
500  }
501  all.p->sorted_cache = true;
502  }
503  all.p->resettable = all.p->write_cache || all.daemon;
504  }
505  else
506  {
507  if (!all.p->input->files.empty())
508  {
509  if (!quiet)
510  all.trace_message << "ignoring text input in favor of cache input" << endl;
511  }
512  else
513  {
514  std::string temp = all.data_filename;
515  if (!quiet)
516  all.trace_message << "Reading datafile = " << temp << endl;
// A failure to open a non-empty filename is only logged ("sailing on"); with an empty
// name the exception is rethrown.
517  try
518  {
519  all.p->input->open_file(temp.c_str(), all.stdin_off, io_buf::READ);
520  }
521  catch (std::exception const&)
522  {
523  // when trying to fix this exception, consider that an empty temp is valid if all.stdin_off is false
524  if (!temp.empty())
525  {
526  all.trace_message << "can't open '" << temp << "', sailing on!" << endl;
527  }
528  else
529  {
530  throw;
531  }
532  }
533 
534  if (input_options.json || input_options.dsjson)
535  {
536  // TODO: change to class with virtual method
537  // --invert_hash requires the audit parser version to save the extra information.
538  if (all.audit || all.hash_inv)
539  {
540  all.p->reader = &read_features_json<true>;
541  all.p->text_reader = &line_to_examples_json<true>;
542  all.p->audit = true;
543  }
544  else
545  {
546  all.p->reader = &read_features_json<false>;
547  all.p->text_reader = &line_to_examples_json<false>;
548  all.p->audit = false;
549  }
550 
551  all.p->decision_service_json = input_options.dsjson;
552  }
553  else
554  {
// NOTE(review): listing lines 555-556 (plain-text reader setup) are missing from this
// rendering; the reference list mentions read_features_string and VW::read_lines.
// Confirm against parser.cc.
557  }
558 
559  all.p->resettable = all.p->write_cache;
560  }
561  }
562 
// Multiple passes require re-reading the input, which is only allowed for a resettable
// source (a cache being read/written, or daemon mode).
563  if (passes > 1 && !all.p->resettable)
564  THROW("need a cache file for multiple passes : try using --cache_file");
565 
566  all.p->input->count = all.p->input->files.size();
567  if (!quiet && !all.daemon)
568  all.trace_message << "num sources = " << all.p->input->files.size() << endl;
569 }
std::string port_file
Definition: parse_args.h:17
void resize(size_t length)
Definition: v_array.h:69
size_t length()
Definition: global_data.h:513
parameters weights
Definition: global_data.h:537
bool got_sigterm
Definition: parser.cc:66
bool audit
Definition: parser.h:104
size_t current
Definition: io_buf.h:66
VW::config::options_i * options
Definition: global_data.h:428
bool sorted_cache
Definition: parser.h:78
bool hash_inv
Definition: global_data.h:541
int read_features_string(vw *all, v_array< example *> &examples)
void handle_sigterm(int)
Definition: parser.cc:68
v_array< int > final_prediction_sink
Definition: global_data.h:518
io_buf * input
Definition: parser.h:69
void binary_print_result(int f, float res, float weight, v_array< char >)
Definition: global_data.cc:72
bool quiet
Definition: global_data.h:487
std::vector< std::string > cache_files
Definition: parse_args.h:20
int max_fd
Definition: parser.h:98
size_t port
Definition: parse_args.h:15
void finish(vw &all, bool delete_all)
Definition: parse_args.cc:1823
size_t count
Definition: io_buf.h:65
void read_lines(vw *all, char *line, size_t, v_array< example *> &examples)
size_t size() const
Definition: v_array.h:68
static constexpr int READ
Definition: io_buf.h:71
#define MAP_ANONYMOUS
Definition: parser.cc:309
parser * p
Definition: global_data.h:377
virtual int open_file(const char *name, bool stdin_off)
Definition: io_buf.h:90
int(* reader)(vw *, v_array< example *> &examples)
Definition: parser.h:70
void push_back(const T &new_ele)
Definition: v_array.h:107
shared_data * sd
Definition: global_data.h:375
void parse_cache(vw &all, std::vector< std::string > cache_files, bool kill_cache, bool quiet)
Definition: parser.cc:255
v_array< int > files
Definition: io_buf.h:64
bool active
Definition: global_data.h:489
vw_ostream trace_message
Definition: global_data.h:424
virtual bool was_supplied(const std::string &key)=0
bool kill_cache
Definition: parse_args.h:23
void share(size_t length)
bool foreground
Definition: parse_args.h:14
#define THROWERRNO(args)
Definition: vw_exception.h:167
bool decision_service_json
Definition: parser.h:105
void print_result(int f, float res, v_array< char > tag, float lb, float ub)
Definition: bs.cc:136
bool empty() const
Definition: v_array.h:59
int bound_sock
Definition: parser.h:97
void(* text_reader)(vw *, char *, size_t, v_array< example *> &)
Definition: parser.h:71
int label_sock
Definition: parser.h:96
int read_cached_features(vw *all, v_array< example *> &examples)
Definition: cache.cc:65
bool audit
Definition: global_data.h:486
bool stdin_off
Definition: global_data.h:527
size_t num_children
Definition: global_data.h:406
std::string pid_file
Definition: parse_args.h:16
bool isbinary(io_buf &i)
Definition: io_buf.cc:45
bool children(log_multi &b, uint32_t &current, uint32_t &class_index, uint32_t label)
Definition: log_multi.cc:178
#define THROW(args)
Definition: vw_exception.h:181
bool resettable
Definition: parser.h:74
void(* print)(int, float, float, v_array< char >)
Definition: global_data.h:521
float f
Definition: cache.cc:40
bool write_cache
Definition: parser.h:76
std::string data_filename
Definition: global_data.h:403
bool daemon
Definition: global_data.h:405

◆ end_pass_example()

void end_pass_example ( vw &  all,
example *  ae 
)

Definition at line 644 of file parser.cc.

References label_parser::default_label, example::end_pass, parser::in_pass_counter, example::l, parser::lp, and vw::p.

645 {
646  all.p->lp.default_label(&ae->l);
647  ae->end_pass = true;
648  all.p->in_pass_counter = 0;
649 }
void(* default_label)(void *)
Definition: label_parser.h:12
parser * p
Definition: global_data.h:377
polylabel l
Definition: example.h:57
uint32_t in_pass_counter
Definition: parser.h:83
bool end_pass
Definition: example.h:77
label_parser lp
Definition: parser.h:102

◆ feature_limit()

void feature_limit ( vw &  all,
example *  ex 
)

Definition at line 651 of file parser.cc.

References example_predict::feature_space, example_predict::indices, vw::limit, vw::parse_mask, features::sort(), and unique_features().

Referenced by VW::setup_example().

652 {
653  for (namespace_index index : ex->indices)
654  if (all.limit[index] < ex->feature_space[index].size())
655  {
656  features& fs = ex->feature_space[index];
657  fs.sort(all.parse_mask);
658  unique_features(fs, all.limit[index]);
659  }
660 }
v_array< namespace_index > indices
the core definition of a set of features.
std::array< features, NUM_NAMESPACES > feature_space
std::array< uint32_t, NUM_NAMESPACES > limit
Definition: global_data.h:474
bool sort(uint64_t parse_mask)
unsigned char namespace_index
uint64_t parse_mask
Definition: global_data.h:453
void unique_features(features &fs, int max)
Definition: unique_sort.cc:10

◆ finalize_source()

void finalize_source ( parser *  p)

Definition at line 206 of file parser.cc.

References io_buf::close_files(), v_array< T >::empty(), f, io_buf::files, parser::input, v_array< T >::last(), parser::output, and v_array< T >::pop().

Referenced by VW::finish(), and set_compressed().

207 {
208 #ifdef _WIN32
209  int f = _fileno(stdin);
210 #else
211  int f = fileno(stdin);
212 #endif
213  while (!p->input->files.empty() && p->input->files.last() == f) p->input->files.pop();
214  p->input->close_files();
215 
216  delete p->input;
217  p->input = nullptr;
218  p->output->close_files();
219  delete p->output;
220  p->output = nullptr;
221 }
void close_files()
Definition: io_buf.h:218
T pop()
Definition: v_array.h:58
io_buf * input
Definition: parser.h:69
v_array< int > files
Definition: io_buf.h:64
io_buf * output
Definition: parser.h:75
bool empty() const
Definition: v_array.h:59
T last() const
Definition: v_array.h:57
float f
Definition: cache.cc:40

◆ free_parser()

void free_parser ( vw &  all)

Definition at line 976 of file parser.cc.

References parser::counts, io_buf::currentname, VW::dealloc_example(), label_parser::delete_label, vw::delete_prediction, v_array< T >::delete_v(), VW::object_pool< T, TInitializer, TCleanup >::empty(), parser::example_pool, io_buf::finalname, VW::object_pool< T, TInitializer, TCleanup >::get_object(), parser::gram_mask, parser::lp, parser::name, vw::ngram_strings, parser::output, vw::p, VW::ptr_queue< T >::pop(), parser::ready_parsed_examples, VW::ptr_queue< T >::size(), and parser::words.

Referenced by VW::finish().

// free_parser: releases memory owned by the parser:
//  - the scratch v_arrays words, name and counts;
//  - gram_mask, only when n-grams were configured (all.ngram_strings non-empty);
//  - the output io_buf's finalname/currentname buffers, when an output io_buf exists;
//  - every example still held by the example pool or the ready-parsed queue.
977 {
978  all.p->words.delete_v();
979  all.p->name.delete_v();
980 
981  if (!all.ngram_strings.empty())
982  all.p->gram_mask.delete_v();
983 
984  io_buf* output = all.p->output;
985  if (output != nullptr)
986  {
987  output->finalname.delete_v();
988  output->currentname.delete_v();
989  }
990 
991  while (!all.p->example_pool.empty())
992  {
993  example* temp = all.p->example_pool.get_object();
// NOTE(review): listing line 994 is missing from this rendering; presumably it deallocates
// `temp` (the reference list mentions VW::dealloc_example, delete_label, delete_prediction).
// Confirm against parser.cc.
995  }
996 
997  while (all.p->ready_parsed_examples.size() != 0)
998  {
999  example* temp = all.p->ready_parsed_examples.pop();
// NOTE(review): listing line 1000 is missing from this rendering; presumably it deallocates
// `temp` as well. Confirm against parser.cc.
1001  }
1002  all.p->counts.delete_v();
1003 }
void(* delete_prediction)(void *)
Definition: global_data.h:485
VW::object_pool< example, example_initializer > example_pool
Definition: parser.h:66
v_array< substring > words
Definition: parser.h:63
void(* delete_label)(void *)
Definition: label_parser.h:16
std::vector< std::string > ngram_strings
Definition: global_data.h:469
void dealloc_example(void(*delete_label)(void *), example &ec, void(*delete_prediction)(void *))
Definition: example.cc:219
VW::ptr_queue< example > ready_parsed_examples
Definition: parser.h:67
v_array< size_t > counts
Definition: parser.h:94
v_array< substring > name
Definition: parser.h:64
parser * p
Definition: global_data.h:377
v_array< char > finalname
Definition: io_buf.h:69
v_array< char > currentname
Definition: io_buf.h:68
v_array< size_t > gram_mask
Definition: parser.h:91
size_t size() const
Definition: queue.h:70
Definition: io_buf.h:54
io_buf * output
Definition: parser.h:75
void delete_v()
Definition: v_array.h:98
bool empty() const
Definition: object_pool.h:186
T * pop()
Definition: queue.h:27
label_parser lp
Definition: parser.h:102

◆ generateGrams()

void generateGrams ( vw &  all,
example *&  ex 
)

This function adds k-skip-n-grams to the feature vector. A k-skip-n-gram is a sequence of n features in which at most k features in total are skipped between consecutive members. For example, for the feature vector a, b, c, d, e, f the 2-skip-2-grams are ab, ac, ad, bc, bd, be, cd, ce, cf, de, df, ef, and the 1-skip-3-grams are abc, abd, acd, bcd, bce, bde, cde, cdf, cef, def. Note that for an n-gram setting, the (n-1)-grams, (n-2)-grams, ..., 2-grams are also appended. All grams are appended to the feature vector. The hash of a gram is computed as h(a, b) = h(a)*X + h(b), where X is the constant quadratic_constant (defined in constant.h).

Definition at line 630 of file parser.cc.

References addgrams(), v_array< T >::clear(), example_predict::feature_space, parser::gram_mask, example_predict::indices, vw::ngram, vw::p, v_array< T >::push_back(), and vw::skips.

Referenced by VW::setup_example().

631 {
632  for (namespace_index index : ex->indices)
633  {
634  size_t length = ex->feature_space[index].size();
635  for (size_t n = 1; n < all.ngram[index]; n++)
636  {
637  all.p->gram_mask.clear();
638  all.p->gram_mask.push_back((size_t)0);
639  addgrams(all, n, all.skips[index], ex->feature_space[index], length, all.p->gram_mask, 0);
640  }
641  }
642 }
v_array< namespace_index > indices
std::array< uint32_t, NUM_NAMESPACES > skips
Definition: global_data.h:472
std::array< uint32_t, NUM_NAMESPACES > ngram
Definition: global_data.h:471
void addgrams(vw &all, size_t ngram, size_t skip_gram, features &fs, size_t initial_length, v_array< size_t > &gram_mask, size_t skips)
Definition: parser.cc:584
parser * p
Definition: global_data.h:377
std::array< features, NUM_NAMESPACES > feature_space
void push_back(const T &new_ele)
Definition: v_array.h:107
void clear()
Definition: v_array.h:88
unsigned char namespace_index
v_array< size_t > gram_mask
Definition: parser.h:91

◆ handle_sigterm()

void handle_sigterm ( int  )

Definition at line 68 of file parser.cc.

Referenced by enable_sources().

68 { got_sigterm = true; }
bool got_sigterm
Definition: parser.cc:66

◆ is_test_only()

bool is_test_only ( uint32_t  counter,
uint32_t  period,
uint32_t  after,
bool  holdout_off,
uint32_t  target_modulus 
)

Definition at line 70 of file parser.cc.

Referenced by VW::setup_example().

/// Decides whether the example at position @p counter is held out (used only for testing).
///
/// @param counter         position/counter of the example being considered.
/// @param period          when @p after == 0, one example in every @p period is held out.
/// @param after           when non-zero, every example whose counter exceeds this value is
///                        held out, and @p period is ignored.
/// @param holdout_off     disables holdout entirely.
/// @param target_modulus  value of counter % period that marks a held-out example.
/// @return true if the example should be held out.
bool is_test_only(uint32_t counter, uint32_t period, uint32_t after, bool holdout_off, uint32_t target_modulus)
{
  if (holdout_off)
    return false;
  if (after != 0)  // hold out by position
    return counter > after;
  // Hold out by period. A zero period would make `counter % period` a modulo by zero
  // (undefined behaviour, typically SIGFPE), so treat it as "no periodic holdout".
  if (period == 0)
    return false;
  return counter % period == target_modulus;
}

◆ lock_done()

void lock_done ( parser &  p)

Definition at line 571 of file parser.cc.

References parser::done, parser::ready_parsed_examples, and VW::ptr_queue< T >::set_done().

Referenced by parse_dispatch(), and set_done().

// lock_done: marks the parser as finished (p.done = true) so that consumers waiting for
// examples can learn that no more will arrive.
572 {
573  p.done = true;
574  // in case get_example() is waiting for a fresh example, wake so it can realize there are no more.
// NOTE(review): listing line 575 is missing from this rendering; presumably it is
// p.ready_parsed_examples.set_done() (listed in References). Confirm against parser.cc.
576 }
VW::ptr_queue< example > ready_parsed_examples
Definition: parser.h:67
void set_done()
Definition: queue.h:62
bool done
Definition: parser.h:90

◆ main_parse_loop()

void main_parse_loop ( vw *  all)

Definition at line 905 of file parser.cc.

References parse_dispatch(), and thread_dispatch().

Referenced by VW::start_parser().

void parse_dispatch(vw &all, dispatch_fptr dispatch)
void thread_dispatch(vw &all, v_array< example *> examples)
Definition: parser.cc:896

◆ make_write_cache()

void make_write_cache ( vw &  all,
std::string &  newname,
bool  quiet 
)

Definition at line 223 of file parser.cc.

References io_buf::currentname, f, io_buf::files, io_buf::finalname, vw::num_bits, io_buf::open_file(), parser::output, vw::p, push_many(), v_array< T >::size(), vw::stdin_off, prediction_type::to_string(), vw::trace_message, VW::version(), io_buf::WRITE, parser::write_cache, and io_buf::write_file().

Referenced by parse_cache().

224 {
225  io_buf* output = all.p->output;
226  if (output->files.size() != 0)
227  {
228  all.trace_message << "Warning: you tried to make two write caches. Only the first one will be made." << endl;
229  return;
230  }
231 
232  std::string temp = newname + std::string(".writing");
233  push_many(output->currentname, temp.c_str(), temp.length() + 1);
234 
235  int f = output->open_file(temp.c_str(), all.stdin_off, io_buf::WRITE);
236  if (f == -1)
237  {
238  all.trace_message << "can't create cache file !" << endl;
239  return;
240  }
241 
242  size_t v_length = (uint64_t)VW::version.to_string().length() + 1;
243 
244  output->write_file(f, &v_length, sizeof(v_length));
245  output->write_file(f, VW::version.to_string().c_str(), v_length);
246  output->write_file(f, "c", 1);
247  output->write_file(f, &all.num_bits, sizeof(all.num_bits));
248 
249  push_many(output->finalname, newname.c_str(), newname.length() + 1);
250  all.p->write_cache = true;
251  if (!quiet)
252  all.trace_message << "creating cache_file = " << newname << endl;
253 }
static constexpr int WRITE
Definition: io_buf.h:72
uint32_t num_bits
Definition: global_data.h:398
const version_struct version(PACKAGE_VERSION)
size_t size() const
Definition: v_array.h:68
parser * p
Definition: global_data.h:377
virtual int open_file(const char *name, bool stdin_off)
Definition: io_buf.h:90
v_array< char > finalname
Definition: io_buf.h:69
void push_many(v_array< T > &v, const T *_begin, size_t num)
Definition: v_array.h:207
v_array< int > files
Definition: io_buf.h:64
vw_ostream trace_message
Definition: global_data.h:424
v_array< char > currentname
Definition: io_buf.h:68
Definition: io_buf.h:54
io_buf * output
Definition: parser.h:75
bool stdin_off
Definition: global_data.h:527
virtual ssize_t write_file(int f, const void *buf, size_t nbytes)
Definition: io_buf.h:190
float f
Definition: cache.cc:40
bool write_cache
Definition: parser.h:76
const char * to_string(prediction_type_t prediction_type)
Definition: learner.cc:12

◆ parse_cache()

void parse_cache ( vw &  all,
std::vector< std::string >  cache_files,
bool  kill_cache,
bool  quiet 
)

Definition at line 255 of file parser.cc.

References c, cache_numbits(), io_buf::close_file(), v_array< T >::delete_v(), f, parser::input, make_write_cache(), vw::num_bits, io_buf::open_file(), parser::output, vw::p, vw::parse_mask, io_buf::READ, read_cached_features(), parser::reader, parser::resettable, parser::sorted_cache, io_buf::space, vw::stdin_off, vw::trace_message, and parser::write_cache.

Referenced by enable_sources().

// parse_cache: handles the --cache_file list.
//  - For each cache file: unless kill_cache is set, try to open it for reading.
//      * If it cannot be opened, or kill_cache is set, start writing a new cache via
//        make_write_cache().
//      * Otherwise read its header with cache_numbits(). A cache with fewer bits than
//        all.num_bits is closed and rebuilt. Any other cache is used, marked sorted only when
//        the bit counts match exactly, and makes the source resettable.
//  - Finally sets all.parse_mask to the low all.num_bits bits. When no cache files were given,
//    it also frees the output io_buf's space buffer.
256 {
257  all.p->write_cache = false;
258 
259  for (auto& file : cache_files)
260  {
261  int f = -1;
262  if (!kill_cache)
263  try
264  {
265  f = all.p->input->open_file(file.c_str(), all.stdin_off, io_buf::READ);
266  }
267  catch (const std::exception&)
268  {
269  f = -1;
270  }
271  if (f == -1)
272  make_write_cache(all, file, quiet);
273  else
274  {
// NOTE(review): cache_numbits() also returns 0 for a cache written by a different VW
// version, so that case also takes the branch below and gets the "less bit precision" warning.
275  uint64_t c = cache_numbits(all.p->input, f);
276  if (c < all.num_bits)
277  {
278  if (!quiet)
279  all.trace_message << "WARNING: cache file is ignored as it's made with less bit precision than required!"
280  << endl;
281  all.p->input->close_file();
282  make_write_cache(all, file, quiet);
283  }
284  else
285  {
286  if (!quiet)
287  all.trace_message << "using cache_file = " << file.c_str() << endl;
// NOTE(review): listing line 288 is missing from this rendering; presumably it sets
// all.p->reader to read_cached_features (both listed in References). Confirm against parser.cc.
289  if (c == all.num_bits)
290  all.p->sorted_cache = true;
291  else
292  all.p->sorted_cache = false;
293  all.p->resettable = true;
294  }
295  }
296  }
297 
298  all.parse_mask = ((uint64_t)1 << all.num_bits) - 1;
299  if (cache_files.size() == 0)
300  {
301  if (!quiet)
302  all.trace_message << "using no cache" << endl;
303  all.p->output->space.delete_v();
304  }
305 }
bool sorted_cache
Definition: parser.h:78
virtual bool close_file()
Definition: io_buf.h:204
uint32_t cache_numbits(io_buf *buf, int filepointer)
Definition: parser.cc:91
io_buf * input
Definition: parser.h:69
uint32_t num_bits
Definition: global_data.h:398
static constexpr int READ
Definition: io_buf.h:71
parser * p
Definition: global_data.h:377
virtual int open_file(const char *name, bool stdin_off)
Definition: io_buf.h:90
int(* reader)(vw *, v_array< example *> &examples)
Definition: parser.h:70
vw_ostream trace_message
Definition: global_data.h:424
v_array< char > space
Definition: io_buf.h:62
uint64_t parse_mask
Definition: global_data.h:453
void make_write_cache(vw &all, std::string &newname, bool quiet)
Definition: parser.cc:223
io_buf * output
Definition: parser.h:75
int read_cached_features(vw *all, v_array< example *> &examples)
Definition: cache.cc:65
bool stdin_off
Definition: global_data.h:527
void delete_v()
Definition: v_array.h:98
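constexpr uint64_t c
Definition: rand48.cc:12 (spurious cross-reference: the `c` used in parse_cache is the local `uint64_t c` declared at line 275, not this constant)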
bool resettable
Definition: parser.h:74
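float f
Definition: cache.cc:40 (spurious cross-reference: the `f` used in parse_cache is the local `int f` declared at line 261, not this variable)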
bool write_cache
Definition: parser.h:76

◆ reset_source()

void reset_source ( vw & all,
size_t  numbits 
)

Definition at line 126 of file parser.cc.

References v_array< T >::begin(), binary_print_result(), parser::bound_sock, cache_numbits(), v_array< T >::clear(), io_buf::close_file(), io_buf::close_file_or_socket(), io_buf::compressed(), io_buf::current, io_buf::currentname, vw::daemon, f, io_buf::files, vw::final_prediction_sink, io_buf::finalname, recall_tree_ns::find(), io_buf::flush(), parser::input, isbinary(), io_buf::num_files(), io_buf::open_file(), parser::output, parser::output_done, parser::output_lock, vw::p, v_array< T >::pop(), vw::print, print_result(), v_array< T >::push_back(), io_buf::READ, read_cached_features(), read_features_string(), parser::reader, parser::ready_parsed_examples, io_buf::reset_file(), parser::resettable, v_array< T >::size(), VW::ptr_queue< T >::size(), vw::stdin_off, THROW, and parser::write_cache.

Referenced by parse_dispatch().

// reset_source listing: rewinds the parser's input so another pass can run.
// 1. If a cache was being written this pass, it is flushed and closed. It is
//    then renamed from output->currentname to output->finalname, the previous
//    inputs are closed, and finalname is reopened as the new input.
// 2. If the source is resettable:
//    - in daemon mode, it waits until ready_parsed_examples is drained and then
//      accepts a new client connection on bound_sock;
//    - otherwise, every input file is rewound and its cache bit count
//      re-checked against numbits.
// Throws (via THROW) if the rename fails, accept() fails, or a cache header
// reports fewer than numbits bits.
127 {
128  io_buf* input = all.p->input;
129  input->current = 0;
130  if (all.p->write_cache)
131  {
132  all.p->output->flush();
133  all.p->write_cache = false;
134  all.p->output->close_file();
// The old finalname is removed first (result ignored). This is presumably so
// that rename() cannot fail on platforms where it refuses to overwrite an
// existing target.
135  remove(all.p->output->finalname.begin());
136 
// NOTE(review): the message is prefixed "WARN:" but THROW aborts the reset;
// the prefix understates the severity.
137  if (0 != rename(all.p->output->currentname.begin(), all.p->output->finalname.begin()))
138  THROW("WARN: reset_source(vw& all, size_t numbits) cannot rename: " << all.p->output->currentname << " to "
139  << all.p->output->finalname);
140 
// Drain all current inputs. Compressed buffers are closed via close_file().
// Plain descriptors are popped, and are closed only when they are not also a
// final-prediction sink.
141  while (input->num_files() > 0)
142  if (input->compressed())
143  input->close_file();
144  else
145  {
146  int fd = input->files.pop();
147  const auto& fps = all.final_prediction_sink;
148 
149  // If the current popped file is not in the list of final predictions sinks, close it.
150  if (std::find(fps.cbegin(), fps.cend(), fd) == fps.cend())
// NOTE(review): source line 151 (the body of this if) is missing from this
// rendering. Per the comment on line 149 and the References list
// (io_buf::close_file_or_socket()), it presumably closes fd — confirm.
152  }
153  input->open_file(all.p->output->finalname.begin(), all.stdin_off, io_buf::READ); // pushing is merged into
154  // open_file
// NOTE(review): source line 155 is missing from this rendering. The References
// list (parser::reader, read_cached_features()) suggests it switches the reader
// to the cache that was just reopened — confirm.
156  }
157  if (all.p->resettable == true)
158  {
159  if (all.daemon)
160  {
161  // wait for all predictions to be sent back to client
162  {
163  std::unique_lock<std::mutex> lock(all.p->output_lock);
164  all.p->output_done.wait(lock, [&] { return all.p->ready_parsed_examples.size() == 0; });
165  }
166 
167  // close socket, erase final prediction sink and socket
// NOTE(review): source lines 168-169 are missing from this rendering; only the
// files.clear() below is visible. The References list includes
// io_buf::close_file_or_socket() — confirm what these lines close/erase.
170  all.p->input->files.clear();
171 
// Waits for the next client to connect on the bound listening socket.
172  sockaddr_in client_address;
173  socklen_t size = sizeof(client_address);
174  int f = (int)accept(all.p->bound_sock, (sockaddr*)&client_address, &size);
175  if (f < 0)
176  THROW("accept: " << strerror(errno));
177 
178  // note: breaking cluster parallel online learning by dropping support for id
179 
// The new connection is both the prediction sink and the only input file.
// NOTE(review): final_prediction_sink is a v_array<int>, so the (size_t) cast
// is immediately narrowed back to int and is redundant.
180  all.final_prediction_sink.push_back((size_t)f);
181  all.p->input->files.push_back(f);
182 
183  if (isbinary(*(all.p->input)))
184  {
// NOTE(review): source lines 185-186 are missing from this rendering. The
// References list (read_cached_features(), binary_print_result()) suggests
// they select the binary reader and result printer — confirm.
187  }
188  else
189  {
// NOTE(review): source line 190 is missing from this rendering. The References
// list (read_features_string()) suggests it selects the text reader — confirm.
191  all.print = print_result;
192  }
193  }
194  else
195  {
// Non-daemon: rewind every input and re-read its cache header. A header with
// fewer than numbits bits indicates an internal inconsistency.
196  for (size_t i = 0; i < input->files.size(); i++)
197  {
198  input->reset_file(input->files[i]);
199  if (cache_numbits(input, input->files[i]) < numbits)
200  THROW("argh, a bug in caching of some sort!");
201  }
202  }
203  }
204 }
virtual bool compressed()
Definition: io_buf.h:214
T pop()
Definition: v_array.h:58
size_t current
Definition: io_buf.h:66
virtual bool close_file()
Definition: io_buf.h:204
uint32_t cache_numbits(io_buf *buf, int filepointer)
Definition: parser.cc:91
int read_features_string(vw *all, v_array< example *> &examples)
VW::ptr_queue< example > ready_parsed_examples
Definition: parser.h:67
v_array< int > final_prediction_sink
Definition: global_data.h:518
io_buf * input
Definition: parser.h:69
void binary_print_result(int f, float res, float weight, v_array< char >)
Definition: global_data.cc:72
virtual void reset_file(int f)
Definition: io_buf.h:136
T *& begin()
Definition: v_array.h:42
static void close_file_or_socket(int f)
Definition: io_buf.cc:152
size_t size() const
Definition: v_array.h:68
static constexpr int READ
Definition: io_buf.h:71
std::mutex output_lock
Definition: parser.h:87
parser * p
Definition: global_data.h:377
virtual int open_file(const char *name, bool stdin_off)
Definition: io_buf.h:90
v_array< char > finalname
Definition: io_buf.h:69
int(* reader)(vw *, v_array< example *> &examples)
Definition: parser.h:70
virtual size_t num_files()
Definition: io_buf.h:165
virtual void flush()
Definition: io_buf.h:194
void push_back(const T &new_ele)
Definition: v_array.h:107
v_array< int > files
Definition: io_buf.h:64
void clear()
Definition: v_array.h:88
v_array< char > currentname
Definition: io_buf.h:68
size_t size() const
Definition: queue.h:70
Definition: io_buf.h:54
node_pred * find(recall_tree &b, uint32_t cn, example &ec)
Definition: recall_tree.cc:126 (spurious cross-reference: reset_source calls std::find at line 150, not recall_tree_ns::find)
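void print_result(int f, float res, v_array< char > tag, float lb, float ub)
Definition: bs.cc:136 (likely a mis-link: this five-parameter overload does not match the type of all.print, void(*)(int, float, float, v_array< char >), which line 191 assigns to; the intended function presumably has the same signature as binary_print_result in global_data.cc)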
io_buf * output
Definition: parser.h:75
int bound_sock
Definition: parser.h:97
int read_cached_features(vw *all, v_array< example *> &examples)
Definition: cache.cc:65
bool stdin_off
Definition: global_data.h:527
std::condition_variable output_done
Definition: parser.h:88
bool isbinary(io_buf &i)
Definition: io_buf.cc:45
#define THROW(args)
Definition: vw_exception.h:181
bool resettable
Definition: parser.h:74
void(* print)(int, float, float, v_array< char >)
Definition: global_data.h:521
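float f
Definition: cache.cc:40 (spurious cross-reference: the `f` used in reset_source is the local `int f` returned by accept() at line 174, not this variable)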
bool write_cache
Definition: parser.h:76
bool daemon
Definition: global_data.h:405

◆ set_compressed()

void set_compressed ( parser * par)

Definition at line 82 of file parser.cc.

References finalize_source(), parser::input, and parser::output.

Referenced by parse_source().

// set_compressed listing: switches the parser to compressed I/O. It calls
// finalize_source(par) on the current sources, then replaces both
// par->input and par->output with freshly allocated comp_io_buf objects.
// The old buffers are released with raw delete, so the parser owns them.
// NOTE(review): this is safe only if finalize_source() either leaves the
// input/output pointers valid or nulls them after deleting them; a delete
// without nulling would make lines 85/87 double deletes — confirm against
// finalize_source (parser.cc:206).
83 {
84  finalize_source(par);
85  delete par->input;
86  par->input = new comp_io_buf;
87  delete par->output;
88  par->output = new comp_io_buf;
89 }
void finalize_source(parser *p)
Definition: parser.cc:206
io_buf * input
Definition: parser.h:69
io_buf * output
Definition: parser.h:75

◆ set_done()

void set_done ( vw & all)

Definition at line 578 of file parser.cc.

References vw::early_terminate, lock_done(), and vw::p.

Referenced by GD::end_pass(), end_pass(), and finish_example().

// set_done listing: requests termination by setting all.early_terminate and
// then calling lock_done() on the parser (defined at parser.cc:571). The flag
// is set before lock_done() runs, presumably so that anything woken by
// lock_done() already sees it — confirm.
579 {
580  all.early_terminate = true;
581  lock_done(*all.p);
582 }
parser * p
Definition: global_data.h:377
bool early_terminate
Definition: global_data.h:500
void lock_done(parser &p)
Definition: parser.cc:571

◆ thread_dispatch()

void thread_dispatch ( vw & all,
v_array< example *>  examples 
)

Definition at line 896 of file parser.cc.

References parser::end_parsed_examples, vw::p, VW::ptr_queue< T >::push(), parser::ready_parsed_examples, and v_array< T >::size().

Referenced by main_parse_loop().

// thread_dispatch listing: hands a batch of parsed examples to the consumer
// side. It first adds the batch size to end_parsed_examples, then iterates
// over the batch.
// NOTE(review): source line 901 (the loop body) is missing from this
// rendering. The References list (VW::ptr_queue<T>::push(),
// parser::ready_parsed_examples) suggests it pushes each example onto
// all.p->ready_parsed_examples — confirm against parser.cc.
897 {
898  all.p->end_parsed_examples += examples.size();
899  for (auto example : examples)
900  {
902  }
903 }
void push(T *item)
Definition: queue.h:48
VW::ptr_queue< example > ready_parsed_examples
Definition: parser.h:67
size_t size() const
Definition: v_array.h:68
parser * p
Definition: global_data.h:377
uint64_t end_parsed_examples
Definition: parser.h:82

Variable Documentation

◆ got_sigterm

bool got_sigterm

Definition at line 66 of file parser.cc.