Vowpal Wabbit
parser.cc
Go to the documentation of this file.
1 /*
2 Copyright (c) by respective owners including Yahoo!, Microsoft, and
3 individual contributors. All rights reserved. Released under a BSD (revised)
4 license as described in the file LICENSE.
5  */
6 #include <sys/types.h>
7 
8 #ifndef _WIN32
9 #include <sys/mman.h>
10 #include <sys/wait.h>
11 #include <unistd.h>
12 #include <netinet/tcp.h>
13 #endif
14 
15 #include <csignal>
16 
17 #include <fstream>
18 
19 #ifdef _WIN32
20 #define NOMINMAX
21 #include <winsock2.h>
22 #include <Windows.h>
23 #include <io.h>
24 typedef int socklen_t;
25 
26 int daemon(int /*a*/, int /*b*/)
27 {
28  exit(0);
29  return 0;
30 }
31 
32 // Starting with v142 the fix in the else block no longer works due to mismatching linkage. Going forward we should just
33 // use the actual isocpp version.
34 #if _MSC_VER >= 1920
35 #define getpid _getpid
36 #else
37 int getpid() { return (int)::GetCurrentProcessId(); }
38 #endif
39 
40 #else
41 #include <netdb.h>
42 #endif
43 
44 #ifdef __FreeBSD__
45 #include <netinet/in.h>
46 #endif
47 
48 #include <cerrno>
49 #include <cstdio>
50 #include <cassert>
51 
52 #include "parse_example.h"
53 #include "cache.h"
54 #include "unique_sort.h"
55 #include "constant.h"
56 #include "vw.h"
57 #include "interactions.h"
58 #include "vw_exception.h"
59 #include "parse_example_json.h"
60 #include "parse_dispatch_loop.h"
61 #include "parse_args.h"
62 
63 using std::endl;
64 
65 // This SIGTERM flag presumably does not matter in library mode (unconfirmed).
67 
68 void handle_sigterm(int) { got_sigterm = true; }
69 
/// Decide whether the example with index `counter` belongs to the holdout (test-only) set.
///
/// @param counter         running example counter.
/// @param period          holdout period; used only when `after == 0` (must then be non-zero,
///                        since it is used as a modulus).
/// @param after           when non-zero, every example with counter > after is held out.
/// @param holdout_off     disables holdout entirely.
/// @param target_modulus  0 in the normal case, or period-1 when empty lines separate examples.
/// @return true when the example should be used for testing only.
bool is_test_only(uint32_t counter, uint32_t period, uint32_t after, bool holdout_off, uint32_t target_modulus)
{
  if (holdout_off) { return false; }

  const bool hold_out_by_position = (after != 0);
  return hold_out_by_position ? (counter > after) : (counter % period == target_modulus);
}
81 
83 {
84  finalize_source(par);
85  delete par->input;
86  par->input = new comp_io_buf;
87  delete par->output;
88  par->output = new comp_io_buf;
89 }
90 
91 uint32_t cache_numbits(io_buf* buf, int filepointer)
92 {
93  size_t v_length;
94  buf->read_file(filepointer, (char*)&v_length, sizeof(v_length));
95  if (v_length > 61)
96  THROW("cache version too long, cache file is probably invalid");
97 
98  if (v_length == 0)
99  THROW("cache version too short, cache file is probably invalid");
100 
101  std::vector<char> t(v_length);
102  buf->read_file(filepointer, t.data(), v_length);
103  VW::version_struct v_tmp(t.data());
104  if (v_tmp != VW::version)
105  {
106  // cout << "cache has possibly incompatible version, rebuilding" << endl;
107  return 0;
108  }
109 
110  char temp;
111  if (buf->read_file(filepointer, &temp, 1) < 1)
112  THROW("failed to read");
113 
114  if (temp != 'c')
115  THROW("data file is not a cache file");
116 
117  uint32_t cache_numbits;
118  if (buf->read_file(filepointer, &cache_numbits, sizeof(cache_numbits)) < (int)sizeof(cache_numbits))
119  {
120  return true;
121  }
122 
123  return cache_numbits;
124 }
125 
126 void reset_source(vw& all, size_t numbits)
127 {
128  io_buf* input = all.p->input;
129  input->current = 0;
130  if (all.p->write_cache)
131  {
132  all.p->output->flush();
133  all.p->write_cache = false;
134  all.p->output->close_file();
135  remove(all.p->output->finalname.begin());
136 
137  if (0 != rename(all.p->output->currentname.begin(), all.p->output->finalname.begin()))
138  THROW("WARN: reset_source(vw& all, size_t numbits) cannot rename: " << all.p->output->currentname << " to "
139  << all.p->output->finalname);
140 
141  while (input->num_files() > 0)
142  if (input->compressed())
143  input->close_file();
144  else
145  {
146  int fd = input->files.pop();
147  const auto& fps = all.final_prediction_sink;
148 
149  // If the current popped file is not in the list of final predictions sinks, close it.
150  if (std::find(fps.cbegin(), fps.cend(), fd) == fps.cend())
152  }
153  input->open_file(all.p->output->finalname.begin(), all.stdin_off, io_buf::READ); // pushing is merged into
154  // open_file
156  }
157  if (all.p->resettable == true)
158  {
159  if (all.daemon)
160  {
161  // wait for all predictions to be sent back to client
162  {
163  std::unique_lock<std::mutex> lock(all.p->output_lock);
164  all.p->output_done.wait(lock, [&] { return all.p->ready_parsed_examples.size() == 0; });
165  }
166 
167  // close socket, erase final prediction sink and socket
170  all.p->input->files.clear();
171 
172  sockaddr_in client_address;
173  socklen_t size = sizeof(client_address);
174  int f = (int)accept(all.p->bound_sock, (sockaddr*)&client_address, &size);
175  if (f < 0)
176  THROW("accept: " << strerror(errno));
177 
178  // note: breaking cluster parallel online learning by dropping support for id
179 
180  all.final_prediction_sink.push_back((size_t)f);
181  all.p->input->files.push_back(f);
182 
183  if (isbinary(*(all.p->input)))
184  {
187  }
188  else
189  {
191  all.print = print_result;
192  }
193  }
194  else
195  {
196  for (size_t i = 0; i < input->files.size(); i++)
197  {
198  input->reset_file(input->files[i]);
199  if (cache_numbits(input, input->files[i]) < numbits)
200  THROW("argh, a bug in caching of some sort!");
201  }
202  }
203  }
204 }
205 
207 {
208 #ifdef _WIN32
209  int f = _fileno(stdin);
210 #else
211  int f = fileno(stdin);
212 #endif
213  while (!p->input->files.empty() && p->input->files.last() == f) p->input->files.pop();
214  p->input->close_files();
215 
216  delete p->input;
217  p->input = nullptr;
218  p->output->close_files();
219  delete p->output;
220  p->output = nullptr;
221 }
222 
223 void make_write_cache(vw& all, std::string& newname, bool quiet)
224 {
225  io_buf* output = all.p->output;
226  if (output->files.size() != 0)
227  {
228  all.trace_message << "Warning: you tried to make two write caches. Only the first one will be made." << endl;
229  return;
230  }
231 
232  std::string temp = newname + std::string(".writing");
233  push_many(output->currentname, temp.c_str(), temp.length() + 1);
234 
235  int f = output->open_file(temp.c_str(), all.stdin_off, io_buf::WRITE);
236  if (f == -1)
237  {
238  all.trace_message << "can't create cache file !" << endl;
239  return;
240  }
241 
242  size_t v_length = (uint64_t)VW::version.to_string().length() + 1;
243 
244  output->write_file(f, &v_length, sizeof(v_length));
245  output->write_file(f, VW::version.to_string().c_str(), v_length);
246  output->write_file(f, "c", 1);
247  output->write_file(f, &all.num_bits, sizeof(all.num_bits));
248 
249  push_many(output->finalname, newname.c_str(), newname.length() + 1);
250  all.p->write_cache = true;
251  if (!quiet)
252  all.trace_message << "creating cache_file = " << newname << endl;
253 }
254 
255 void parse_cache(vw& all, std::vector<std::string> cache_files, bool kill_cache, bool quiet)
256 {
257  all.p->write_cache = false;
258 
259  for (auto& file : cache_files)
260  {
261  int f = -1;
262  if (!kill_cache)
263  try
264  {
265  f = all.p->input->open_file(file.c_str(), all.stdin_off, io_buf::READ);
266  }
267  catch (const std::exception&)
268  {
269  f = -1;
270  }
271  if (f == -1)
272  make_write_cache(all, file, quiet);
273  else
274  {
275  uint64_t c = cache_numbits(all.p->input, f);
276  if (c < all.num_bits)
277  {
278  if (!quiet)
279  all.trace_message << "WARNING: cache file is ignored as it's made with less bit precision than required!"
280  << endl;
281  all.p->input->close_file();
282  make_write_cache(all, file, quiet);
283  }
284  else
285  {
286  if (!quiet)
287  all.trace_message << "using cache_file = " << file.c_str() << endl;
289  if (c == all.num_bits)
290  all.p->sorted_cache = true;
291  else
292  all.p->sorted_cache = false;
293  all.p->resettable = true;
294  }
295  }
296  }
297 
298  all.parse_mask = ((uint64_t)1 << all.num_bits) - 1;
299  if (cache_files.size() == 0)
300  {
301  if (!quiet)
302  all.trace_message << "using no cache" << endl;
303  all.p->output->space.delete_v();
304  }
305 }
306 
307 // For macs
308 #ifndef MAP_ANONYMOUS
309 #define MAP_ANONYMOUS MAP_ANON
310 #endif
311 
312 void enable_sources(vw& all, bool quiet, size_t passes, input_options& input_options)
313 {
314  all.p->input->current = 0;
315  parse_cache(all, input_options.cache_files, input_options.kill_cache, quiet);
316 
317  if (all.daemon || all.active)
318  {
319 #ifdef _WIN32
320  WSAData wsaData;
321  int lastError = WSAStartup(MAKEWORD(2, 2), &wsaData);
322  if (lastError != 0)
323  THROWERRNO("WSAStartup() returned error:" << lastError);
324 #endif
325  all.p->bound_sock = (int)socket(PF_INET, SOCK_STREAM, 0);
326  if (all.p->bound_sock < 0)
327  {
328  std::stringstream msg;
329  msg << "socket: " << strerror(errno);
330  all.trace_message << msg.str() << endl;
331  THROW(msg.str().c_str());
332  }
333 
334  int on = 1;
335  if (setsockopt(all.p->bound_sock, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on)) < 0)
336  all.trace_message << "setsockopt SO_REUSEADDR: " << strerror(errno) << endl;
337 
338  // Enable TCP Keep Alive to prevent socket leaks
339  int enableTKA = 1;
340  if (setsockopt(all.p->bound_sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&enableTKA, sizeof(enableTKA)) < 0)
341  all.trace_message << "setsockopt SO_KEEPALIVE: " << strerror(errno) << endl;
342 
343  sockaddr_in address;
344  address.sin_family = AF_INET;
345  address.sin_addr.s_addr = htonl(INADDR_ANY);
346  short unsigned int port = 26542;
347  if (all.options->was_supplied("port"))
348  port = (uint16_t)input_options.port;
349  address.sin_port = htons(port);
350 
351  // attempt to bind to socket
352  if (::bind(all.p->bound_sock, (sockaddr*)&address, sizeof(address)) < 0)
353  THROWERRNO("bind");
354 
355  // listen on socket
356  if (listen(all.p->bound_sock, 1) < 0)
357  THROWERRNO("listen");
358 
359  // write port file
360  if (all.options->was_supplied("port_file"))
361  {
362  socklen_t address_size = sizeof(address);
363  if (getsockname(all.p->bound_sock, (sockaddr*)&address, &address_size) < 0)
364  {
365  all.trace_message << "getsockname: " << strerror(errno) << endl;
366  }
367  std::ofstream port_file;
368  port_file.open(input_options.port_file.c_str());
369  if (!port_file.is_open())
370  THROW("error writing port file: " << input_options.port_file);
371 
372  port_file << ntohs(address.sin_port) << endl;
373  port_file.close();
374  }
375 
376  // background process (if foreground is not set)
377  if (!input_options.foreground)
378  {
379  // FIXME switch to posix_spawn
380  if (!all.active && daemon(1, 1))
381  THROWERRNO("daemon");
382  }
383 
384  // write pid file
385  if (all.options->was_supplied("pid_file"))
386  {
387  std::ofstream pid_file;
388  pid_file.open(input_options.pid_file.c_str());
389  if (!pid_file.is_open())
390  THROW("error writing pid file");
391 
392  pid_file << getpid() << endl;
393  pid_file.close();
394  }
395 
396  if (all.daemon && !all.active)
397  {
398 #ifdef _WIN32
399  THROW("not supported on windows");
400 #else
401  fclose(stdin);
402  // weights will be shared across processes, accessible to children
403  all.weights.share(all.length());
404 
405  // learning state to be shared across children
406  shared_data* sd =
407  (shared_data*)mmap(0, sizeof(shared_data), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
408  memcpy(sd, all.sd, sizeof(shared_data));
409  free(all.sd);
410  all.sd = sd;
411 
412  // create children
413  size_t num_children = all.num_children;
414  v_array<int> children = v_init<int>();
415  children.resize(num_children);
416  for (size_t i = 0; i < num_children; i++)
417  {
418  // fork() returns pid if parent, 0 if child
419  // store fork value and run child process if child
420  if ((children[i] = fork()) == 0)
421  {
422  all.quiet |= (i > 0);
423  goto child;
424  }
425  }
426 
427  // install signal handler so we can kill children when killed
428  {
429  struct sigaction sa;
430  // specifically don't set SA_RESTART in sa.sa_flags, so that
431  // waitid will be interrupted by SIGTERM with handler installed
432  memset(&sa, 0, sizeof(sa));
433  sa.sa_handler = handle_sigterm;
434  sigaction(SIGTERM, &sa, nullptr);
435  }
436 
437  while (true)
438  {
439  // wait for child to change state; if finished, then respawn
440  int status;
441  pid_t pid = wait(&status);
442  if (got_sigterm)
443  {
444  for (size_t i = 0; i < num_children; i++) kill(children[i], SIGTERM);
445  VW::finish(all);
446  exit(0);
447  }
448  if (pid < 0)
449  continue;
450  for (size_t i = 0; i < num_children; i++)
451  if (pid == children[i])
452  {
453  if ((children[i] = fork()) == 0)
454  {
455  all.quiet |= (i > 0);
456  goto child;
457  }
458  break;
459  }
460  }
461 
462 #endif
463  }
464 
465 #ifndef _WIN32
466  child:
467 #endif
468  sockaddr_in client_address;
469  socklen_t size = sizeof(client_address);
470  all.p->max_fd = 0;
471  if (!all.quiet)
472  all.trace_message << "calling accept" << endl;
473  int f = (int)accept(all.p->bound_sock, (sockaddr*)&client_address, &size);
474  if (f < 0)
475  THROWERRNO("accept");
476 
477  all.p->label_sock = f;
478  all.print = print_result;
479 
480  all.final_prediction_sink.push_back((size_t)f);
481 
482  all.p->input->files.push_back(f);
483  all.p->max_fd = std::max(f, all.p->max_fd);
484  if (!all.quiet)
485  all.trace_message << "reading data from port " << port << endl;
486 
487  all.p->max_fd++;
488  if (all.active)
490  else
491  {
492  if (isbinary(*(all.p->input)))
493  {
496  }
497  else
498  {
500  }
501  all.p->sorted_cache = true;
502  }
503  all.p->resettable = all.p->write_cache || all.daemon;
504  }
505  else
506  {
507  if (!all.p->input->files.empty())
508  {
509  if (!quiet)
510  all.trace_message << "ignoring text input in favor of cache input" << endl;
511  }
512  else
513  {
514  std::string temp = all.data_filename;
515  if (!quiet)
516  all.trace_message << "Reading datafile = " << temp << endl;
517  try
518  {
519  all.p->input->open_file(temp.c_str(), all.stdin_off, io_buf::READ);
520  }
521  catch (std::exception const&)
522  {
523  // when trying to fix this exception, consider that an empty temp is valid if all.stdin_off is false
524  if (!temp.empty())
525  {
526  all.trace_message << "can't open '" << temp << "', sailing on!" << endl;
527  }
528  else
529  {
530  throw;
531  }
532  }
533 
534  if (input_options.json || input_options.dsjson)
535  {
536  // TODO: change to class with virtual method
537  // --invert_hash requires the audit parser version to save the extra information.
538  if (all.audit || all.hash_inv)
539  {
540  all.p->reader = &read_features_json<true>;
541  all.p->text_reader = &line_to_examples_json<true>;
542  all.p->audit = true;
543  }
544  else
545  {
546  all.p->reader = &read_features_json<false>;
547  all.p->text_reader = &line_to_examples_json<false>;
548  all.p->audit = false;
549  }
550 
551  all.p->decision_service_json = input_options.dsjson;
552  }
553  else
554  {
557  }
558 
559  all.p->resettable = all.p->write_cache;
560  }
561  }
562 
563  if (passes > 1 && !all.p->resettable)
564  THROW("need a cache file for multiple passes : try using --cache_file");
565 
566  all.p->input->count = all.p->input->files.size();
567  if (!quiet && !all.daemon)
568  all.trace_message << "num sources = " << all.p->input->files.size() << endl;
569 }
570 
572 {
573  p.done = true;
574  // in case get_example() is waiting for a fresh example, wake so it can realize there are no more.
576 }
577 
578 void set_done(vw& all)
579 {
580  all.early_terminate = true;
581  lock_done(*all.p);
582 }
583 
584 void addgrams(vw& all, size_t ngram, size_t skip_gram, features& fs, size_t initial_length, v_array<size_t>& gram_mask,
585  size_t skips)
586 {
587  if (ngram == 0 && gram_mask.last() < initial_length)
588  {
589  size_t last = initial_length - gram_mask.last();
590  for (size_t i = 0; i < last; i++)
591  {
592  uint64_t new_index = fs.indicies[i];
593  for (size_t n = 1; n < gram_mask.size(); n++)
594  new_index = new_index * quadratic_constant + fs.indicies[i + gram_mask[n]];
595 
596  fs.push_back(1., new_index);
597  if (fs.space_names.size() > 0)
598  {
599  std::string feature_name(fs.space_names[i].get()->second);
600  for (size_t n = 1; n < gram_mask.size(); n++)
601  {
602  feature_name += std::string("^");
603  feature_name += std::string(fs.space_names[i + gram_mask[n]].get()->second);
604  }
605  fs.space_names.push_back(audit_strings_ptr(new audit_strings(fs.space_names[i].get()->first, feature_name)));
606  }
607  }
608  }
609  if (ngram > 0)
610  {
611  gram_mask.push_back(gram_mask.last() + 1 + skips);
612  addgrams(all, ngram - 1, skip_gram, fs, initial_length, gram_mask, 0);
613  gram_mask.pop();
614  }
615  if (skip_gram > 0 && ngram > 0)
616  addgrams(all, ngram, skip_gram - 1, fs, initial_length, gram_mask, skips + 1);
617 }
618 
630 void generateGrams(vw& all, example*& ex)
631 {
632  for (namespace_index index : ex->indices)
633  {
634  size_t length = ex->feature_space[index].size();
635  for (size_t n = 1; n < all.ngram[index]; n++)
636  {
637  all.p->gram_mask.clear();
638  all.p->gram_mask.push_back((size_t)0);
639  addgrams(all, n, all.skips[index], ex->feature_space[index], length, all.p->gram_mask, 0);
640  }
641  }
642 }
643 
644 void end_pass_example(vw& all, example* ae)
645 {
646  all.p->lp.default_label(&ae->l);
647  ae->end_pass = true;
648  all.p->in_pass_counter = 0;
649 }
650 
651 void feature_limit(vw& all, example* ex)
652 {
653  for (namespace_index index : ex->indices)
654  if (all.limit[index] < ex->feature_space[index].size())
655  {
656  features& fs = ex->feature_space[index];
657  fs.sort(all.parse_mask);
658  unique_features(fs, all.limit[index]);
659  }
660 }
661 
662 namespace VW
663 {
665 {
666  parser* p = all->p;
667  auto ex = p->example_pool.get_object();
668  ex->in_use = true;
670  return *ex;
671 }
672 
673 void setup_examples(vw& all, v_array<example*>& examples)
674 {
675  for (example* ae : examples) setup_example(all, ae);
676 }
677 
678 void setup_example(vw& all, example* ae)
679 {
680  if (all.p->sort_features && ae->sorted == false)
682 
683  if (all.p->write_cache)
684  {
685  all.p->lp.cache_label(&ae->l, *(all.p->output));
686  cache_features(*(all.p->output), ae, all.parse_mask);
687  }
688 
689  ae->partial_prediction = 0.;
690  ae->num_features = 0;
691  ae->total_sum_feat_sq = 0;
692  ae->loss = 0.;
693 
694  ae->example_counter = (size_t)(all.p->end_parsed_examples);
696  all.p->in_pass_counter++;
697 
699  all.p->emptylines_separate_examples ? (all.holdout_period - 1) : 0);
700  ae->test_only |= all.p->lp.test_label(&ae->l);
701 
703  all.p->in_pass_counter++;
704 
705  ae->weight = all.p->lp.get_weight(&ae->l);
706 
707  if (all.ignore_some)
708  for (unsigned char* i = ae->indices.begin(); i != ae->indices.end(); i++)
709  if (all.ignore[*i])
710  {
711  // delete namespace
712  ae->feature_space[*i].clear();
713  memmove(i, i + 1, (ae->indices.end() - (i + 1)) * sizeof(*i));
714  ae->indices.end()--;
715  i--;
716  }
717 
718  if (!all.ngram_strings.empty())
719  generateGrams(all, ae);
720 
721  if (all.add_constant) // add constant feature
722  VW::add_constant_feature(all, ae);
723 
724  if (!all.limit_strings.empty())
725  feature_limit(all, ae);
726 
727  uint64_t multiplier = (uint64_t)all.wpp << all.weights.stride_shift();
728 
729  if (multiplier != 1) // make room for per-feature information.
730  for (features& fs : *ae)
731  for (auto& j : fs.indicies) j *= multiplier;
732  ae->num_features = 0;
733  ae->total_sum_feat_sq = 0;
734  for (features& fs : *ae)
735  {
736  ae->num_features += fs.size();
737  ae->total_sum_feat_sq += fs.sum_feat_sq;
738  }
739 
740  // Set the interactions for this example to the global set.
741  ae->interactions = &all.interactions;
742 
743  size_t new_features_cnt;
744  float new_features_sum_feat_sq;
745  INTERACTIONS::eval_count_of_generated_ft(all, *ae, new_features_cnt, new_features_sum_feat_sq);
746  ae->num_features += new_features_cnt;
747  ae->total_sum_feat_sq += new_features_sum_feat_sq;
748 }
749 } // namespace VW
750 
751 namespace VW
752 {
754 {
755  example* ec = &get_unused_example(&all);
756  all.p->lp.default_label(&ec->l);
757  all.p->begin_parsed_examples++;
758  ec->example_counter = (size_t)all.p->begin_parsed_examples;
759  return ec;
760 }
761 example* read_example(vw& all, char* example_line)
762 {
763  example* ret = &get_unused_example(&all);
764 
765  VW::read_line(all, ret, example_line);
766  setup_example(all, ret);
767  all.p->end_parsed_examples++;
768 
769  return ret;
770 }
771 
772 example* read_example(vw& all, std::string example_line) { return read_example(all, (char*)example_line.c_str()); }
773 
775 {
777  ec->feature_space[constant_namespace].push_back(1, constant);
778  ec->total_sum_feat_sq++;
779  ec->num_features++;
780  if (vw.audit || vw.hash_inv)
781  ec->feature_space[constant_namespace].space_names.push_back(audit_strings_ptr(new audit_strings("", "Constant")));
782 }
783 
784 void add_label(example* ec, float label, float weight, float base)
785 {
786  ec->l.simple.label = label;
787  ec->l.simple.initial = base;
788  ec->weight = weight;
789 }
790 
791 example* import_example(vw& all, const std::string& label, primitive_feature_space* features, size_t len)
792 {
793  example* ret = &get_unused_example(&all);
794  all.p->lp.default_label(&ret->l);
795 
796  if (label.length() > 0)
797  parse_example_label(all, *ret, label);
798 
799  for (size_t i = 0; i < len; i++)
800  {
801  unsigned char index = features[i].name;
802  ret->indices.push_back(index);
803  for (size_t j = 0; j < features[i].len; j++)
804  ret->feature_space[index].push_back(features[i].fs[j].x, features[i].fs[j].weight_index);
805  }
806 
807  setup_example(all, ret);
808  all.p->end_parsed_examples++;
809  return ret;
810 }
811 
813 {
814  len = ec->indices.size();
816 
817  int fs_count = 0;
818 
819  for (size_t idx = 0; idx < len; ++idx)
820  {
821  namespace_index i = ec->indices[idx];
822  fs_ptr[fs_count].name = i;
823  fs_ptr[fs_count].len = ec->feature_space[i].size();
824  fs_ptr[fs_count].fs = new feature[fs_ptr[fs_count].len];
825 
826  uint32_t stride_shift = all.weights.stride_shift();
827  int f_count = 0;
828  for (features::iterator& f : ec->feature_space[i])
829  {
830  feature t = {f.value(), f.index()};
832  fs_ptr[fs_count].fs[f_count] = t;
833  f_count++;
834  }
835  fs_count++;
836  }
837  return fs_ptr;
838 }
839 
841 {
842  for (size_t i = 0; i < len; i++) delete[] features[i].fs;
843  delete (features);
844 }
845 
846 void parse_example_label(vw& all, example& ec, std::string label)
847 {
848  v_array<substring> words = v_init<substring>();
849  char* cstr = (char*)label.c_str();
850  substring str = {cstr, cstr + label.length()};
851  tokenize(' ', str, words);
852  all.p->lp.parse_label(all.p, all.sd, &ec.l, words);
853  words.clear();
854  words.delete_v();
855 }
856 
857 void empty_example(vw& /*all*/, example& ec)
858 {
859  for (features& fs : ec) fs.clear();
860 
861  ec.indices.clear();
862  ec.tag.clear();
863  ec.sorted = false;
864  ec.end_pass = false;
865 }
866 
867 void clean_example(vw& all, example& ec, bool rewind)
868 {
869  if (rewind)
870  {
871  assert(all.p->begin_parsed_examples > 0);
872  all.p->begin_parsed_examples--;
873  }
874 
875  empty_example(all, ec);
876  assert(ec.in_use);
877  ec.in_use = false;
878  all.p->example_pool.return_object(&ec);
879 }
880 
881 void finish_example(vw& all, example& ec)
882 {
883  // only return examples to the pool that are from the pool and not externally allocated
884  if (!is_ring_example(all, &ec))
885  return;
886 
887  clean_example(all, ec, false);
888 
889  {
890  std::lock_guard<std::mutex> lock(all.p->output_lock);
891  all.p->output_done.notify_one();
892  }
893 }
894 } // namespace VW
895 
897 {
898  all.p->end_parsed_examples += examples.size();
899  for (auto example : examples)
900  {
902  }
903 }
904 
906 
907 namespace VW
908 {
910 
911 float get_topic_prediction(example* ec, size_t i) { return ec->pred.scalars[i]; }
912 
913 float get_label(example* ec) { return ec->l.simple.label; }
914 
915 float get_importance(example* ec) { return ec->weight; }
916 
917 float get_initial(example* ec) { return ec->l.simple.initial; }
918 
919 float get_prediction(example* ec) { return ec->pred.scalar; }
920 
921 float get_cost_sensitive_prediction(example* ec) { return (float)ec->pred.multiclass; }
922 
924 
925 uint32_t* get_multilabel_predictions(example* ec, size_t& len)
926 {
927  MULTILABEL::labels labels = ec->pred.multilabels;
928  len = labels.label_v.size();
929  return labels.label_v.begin();
930 }
931 
932 float get_action_score(example* ec, size_t i)
933 {
934  ACTION_SCORE::action_scores scores = ec->pred.a_s;
935 
936  if (i < scores.size())
937  {
938  return scores[i].score;
939  }
940  else
941  {
942  return 0.0;
943  }
944 }
945 
946 size_t get_action_score_length(example* ec) { return ec->pred.a_s.size(); }
947 
948 size_t get_tag_length(example* ec) { return ec->tag.size(); }
949 
950 const char* get_tag(example* ec) { return ec->tag.begin(); }
951 
952 size_t get_feature_number(example* ec) { return ec->num_features; }
953 
954 float get_confidence(example* ec) { return ec->confidence; }
955 } // namespace VW
956 
958 {
959  memset(&ex->l, 0, sizeof(polylabel));
960  ex->in_use = false;
961  ex->passthrough = nullptr;
962  ex->tag = v_init<char>();
963  ex->indices = v_init<namespace_index>();
964  memset(&ex->feature_space, 0, sizeof(ex->feature_space));
965  return ex;
966 }
967 
969 { /* no longer used */
970 }
971 
972 namespace VW
973 {
974 void start_parser(vw& all) { all.parse_thread = std::thread(main_parse_loop, &all); }
975 } // namespace VW
976 void free_parser(vw& all)
977 {
978  all.p->words.delete_v();
979  all.p->name.delete_v();
980 
981  if (!all.ngram_strings.empty())
982  all.p->gram_mask.delete_v();
983 
984  io_buf* output = all.p->output;
985  if (output != nullptr)
986  {
987  output->finalname.delete_v();
988  output->currentname.delete_v();
989  }
990 
991  while (!all.p->example_pool.empty())
992  {
993  example* temp = all.p->example_pool.get_object();
995  }
996 
997  while (all.p->ready_parsed_examples.size() != 0)
998  {
999  example* temp = all.p->ready_parsed_examples.pop();
1001  }
1002  all.p->counts.delete_v();
1003 }
1004 
1005 namespace VW
1006 {
1007 void end_parser(vw& all) { all.parse_thread.join(); }
1008 
1009 bool is_ring_example(vw& all, example* ae) { return all.p->example_pool.is_from_pool(ae); }
1010 } // namespace VW
void push(T *item)
Definition: queue.h:48
std::string port_file
Definition: parse_args.h:17
void resize(size_t length)
Definition: v_array.h:69
uint32_t holdout_after
Definition: global_data.h:502
v_array< char > tag
Definition: example.h:63
size_t length()
Definition: global_data.h:513
v_array< namespace_index > indices
std::array< uint32_t, NUM_NAMESPACES > skips
Definition: global_data.h:472
uint32_t multiclass
Definition: example.h:49
size_t example_counter
Definition: example.h:64
void set_done(vw &all)
Definition: parser.cc:578
void clean_example(vw &, example &, bool rewind)
Definition: parser.cc:867
ACTION_SCORE::action_scores a_s
Definition: example.h:47
bool is_ring_example(vw &all, example *ae)
Definition: parser.cc:1009
void parse_example_label(vw &all, example &ec, std::string label)
Definition: parser.cc:846
parameters weights
Definition: global_data.h:537
bool got_sigterm
Definition: parser.cc:66
void enable_sources(vw &all, bool quiet, size_t passes, input_options &input_options)
Definition: parser.cc:312
void unique_sort_features(uint64_t parse_mask, example *ae)
Definition: unique_sort.cc:33
bool audit
Definition: parser.h:104
void close_files()
Definition: io_buf.h:218
virtual bool compressed()
Definition: io_buf.h:214
void(* delete_prediction)(void *)
Definition: global_data.h:485
T pop()
Definition: v_array.h:58
VW::object_pool< example, example_initializer > example_pool
Definition: parser.h:66
std::array< uint32_t, NUM_NAMESPACES > ngram
Definition: global_data.h:471
uint64_t stride_shift(const stagewise_poly &poly, uint64_t idx)
void push_back(feature_value v, feature_index i)
float x
Definition: feature_group.h:27
void finalize_source(parser *p)
Definition: parser.cc:206
float get_confidence(example *ec)
Definition: parser.cc:954
void read_line(vw &all, example *ex, char *line)
size_t current
Definition: io_buf.h:66
VW::config::options_i * options
Definition: global_data.h:428
float get_importance(example *ec)
Definition: parser.cc:915
float scalar
Definition: example.h:45
v_array< substring > words
Definition: parser.h:63
void setup_example(vw &all, example *ae)
Definition: parser.cc:678
const char * get_tag(example *ec)
Definition: parser.cc:950
void(* delete_label)(void *)
Definition: label_parser.h:16
static constexpr int WRITE
Definition: io_buf.h:72
std::shared_ptr< audit_strings > audit_strings_ptr
Definition: feature_group.h:23
bool sorted_cache
Definition: parser.h:78
virtual bool close_file()
Definition: io_buf.h:204
uint32_t cache_numbits(io_buf *buf, int filepointer)
Definition: parser.cc:91
std::vector< std::string > ngram_strings
Definition: global_data.h:469
bool hash_inv
Definition: global_data.h:541
void eval_count_of_generated_ft(vw &all, example &ec, size_t &new_features_cnt, float &new_features_value)
void main_parse_loop(vw *all)
Definition: parser.cc:905
constexpr int quadratic_constant
Definition: constant.h:7
v_array< feature_index > indicies
example * get_example(parser *p)
Definition: parser.cc:909
int read_features_string(vw *all, v_array< example *> &examples)
void(* default_label)(void *)
Definition: label_parser.h:12
bool add_constant
Definition: global_data.h:496
bool sorted
Definition: example.h:78
void dealloc_example(void(*delete_label)(void *), example &ec, void(*delete_prediction)(void *))
Definition: example.cc:219
bool(* test_label)(void *)
Definition: label_parser.h:22
void addgrams(vw &all, size_t ngram, size_t skip_gram, features &fs, size_t initial_length, v_array< size_t > &gram_mask, size_t skips)
Definition: parser.cc:584
uint64_t begin_parsed_examples
Definition: parser.h:81
VW::ptr_queue< example > ready_parsed_examples
Definition: parser.h:67
void handle_sigterm(int)
Definition: parser.cc:68
v_array< int > final_prediction_sink
Definition: global_data.h:518
the core definition of a set of features.
v_array< size_t > counts
Definition: parser.h:94
void generateGrams(vw &all, example *&ex)
Definition: parser.cc:630
io_buf * input
Definition: parser.h:69
void binary_print_result(int f, float res, float weight, v_array< char >)
Definition: global_data.cc:72
float confidence
Definition: example.h:72
float partial_prediction
Definition: example.h:68
bool quiet
Definition: global_data.h:487
std::vector< std::string > limit_strings
Definition: global_data.h:473
std::vector< std::string > cache_files
Definition: parse_args.h:20
int max_fd
Definition: parser.h:98
size_t port
Definition: parse_args.h:15
float label
Definition: simple_label.h:14
virtual void reset_file(int f)
Definition: io_buf.h:136
void finish(vw &all, bool delete_all)
Definition: parse_args.cc:1823
label_data simple
Definition: example.h:28
bool holdout_set_off
Definition: global_data.h:499
void set_compressed(parser *par)
Definition: parser.cc:82
int example_is_newline(example const &ec)
Definition: example.h:104
size_t count
Definition: io_buf.h:65
v_array< substring > name
Definition: parser.h:64
void read_lines(vw *all, char *line, size_t, v_array< example *> &examples)
uint32_t num_bits
Definition: global_data.h:398
std::array< bool, NUM_NAMESPACES > ignore
Definition: global_data.h:463
T *& begin()
Definition: v_array.h:42
uint64_t weight_index
Definition: feature_group.h:28
void parse_dispatch(vw &all, dispatch_fptr dispatch)
const version_struct version(PACKAGE_VERSION)
virtual ssize_t read_file(int f, void *buf, size_t nbytes)
Definition: io_buf.h:167
static void close_file_or_socket(int f)
Definition: io_buf.cc:152
bool is_test_only(uint32_t counter, uint32_t period, uint32_t after, bool holdout_off, uint32_t target_modulus)
Definition: parser.cc:70
size_t size() const
Definition: v_array.h:68
static constexpr int READ
Definition: io_buf.h:71
std::mutex output_lock
Definition: parser.h:87
#define MAP_ANONYMOUS
Definition: parser.cc:309
parser * p
Definition: global_data.h:377
std::array< features, NUM_NAMESPACES > feature_space
std::array< uint32_t, NUM_NAMESPACES > limit
Definition: global_data.h:474
bool is_from_pool(T *obj) const
Definition: object_pool.h:190
virtual int open_file(const char *name, bool stdin_off)
Definition: io_buf.h:90
v_array< char > finalname
Definition: io_buf.h:69
bool ignore_some
Definition: global_data.h:462
int(* reader)(vw *, v_array< example *> &examples)
Definition: parser.h:70
virtual size_t num_files()
Definition: io_buf.h:165
void reset_source(vw &all, size_t numbits)
Definition: parser.cc:126
float(* get_weight)(void *)
Definition: label_parser.h:17
primitive_feature_space * export_example(vw &all, example *ec, size_t &len)
Definition: parser.cc:812
void push_many(v_array< T > &v, const T *_begin, size_t num)
Definition: v_array.h:207
example * import_example(vw &all, const std::string &label, primitive_feature_space *features, size_t len)
Definition: parser.cc:791
virtual void flush()
Definition: io_buf.h:194
void push_back(const T &new_ele)
Definition: v_array.h:107
shared_data * sd
Definition: global_data.h:375
void start_parser(vw &all)
Definition: parser.cc:974
void add_label(example *ec, float label, float weight, float base)
Definition: parser.cc:784
void parse_cache(vw &all, std::vector< std::string > cache_files, bool kill_cache, bool quiet)
Definition: parser.cc:255
v_array< int > files
Definition: io_buf.h:64
void clear()
Definition: v_array.h:88
bool active
Definition: global_data.h:489
void tokenize(char delim, substring s, ContainerT &ret, bool allow_empty=false)
vw_ostream trace_message
Definition: global_data.h:424
size_t num_features
Definition: example.h:67
virtual bool was_supplied(const std::string &key)=0
bool kill_cache
Definition: parse_args.h:23
void empty_example(vw &, example &ec)
Definition: parser.cc:857
bool sort(uint64_t parse_mask)
unsigned char namespace_index
void(* cache_label)(void *, io_buf &cache)
Definition: label_parser.h:14
void share(size_t length)
constexpr uint64_t constant
Definition: constant.h:11
void(* parse_label)(parser *, shared_data *, void *, v_array< substring > &)
Definition: label_parser.h:13
void end_parser(vw &all)
Definition: parser.cc:1007
float get_initial(example *ec)
Definition: parser.cc:917
v_array< float > & get_cost_sensitive_prediction_confidence_scores(example *ec)
Definition: parser.cc:923
size_t get_tag_length(example *ec)
Definition: parser.cc:948
void thread_dispatch(vw &all, v_array< example *> examples)
Definition: parser.cc:896
void add_constant_feature(vw &vw, example *ec)
Definition: parser.cc:774
unsigned char name
Definition: vw.h:66
v_array< char > currentname
Definition: io_buf.h:68
float get_prediction(example *ec)
Definition: parser.cc:919
std::thread parse_thread
Definition: global_data.h:378
v_array< size_t > gram_mask
Definition: parser.h:91
float initial
Definition: simple_label.h:16
bool foreground
Definition: parse_args.h:14
#define THROWERRNO(args)
Definition: vw_exception.h:167
size_t size() const
Definition: queue.h:70
Definition: io_buf.h:54
void finish_example(vw &, example &)
Definition: parser.cc:881
v_array< char > space
Definition: io_buf.h:62
uint32_t wpp
Definition: global_data.h:432
T *& end()
Definition: v_array.h:43
float get_action_score(example *ec, size_t i)
Definition: parser.cc:932
float loss
Definition: example.h:70
float weight
size_t get_feature_number(example *ec)
Definition: parser.cc:952
bool decision_service_json
Definition: parser.h:105
void end_pass_example(vw &all, example *ae)
Definition: parser.cc:644
iterator over values and indicies
void feature_limit(vw &all, example *ex)
Definition: parser.cc:651
v_array< audit_strings_ptr > space_names
polylabel l
Definition: example.h:57
MULTILABEL::labels multilabels
Definition: example.h:50
uint64_t parse_mask
Definition: global_data.h:453
node_pred * find(recall_tree &b, uint32_t cn, example &ec)
Definition: recall_tree.cc:126
void make_write_cache(vw &all, std::string &newname, bool quiet)
Definition: parser.cc:223
bool in_use
Definition: example.h:79
float total_sum_feat_sq
Definition: example.h:71
void print_result(int f, float res, v_array< char > tag, float lb, float ub)
Definition: bs.cc:136
features * passthrough
Definition: example.h:74
void cache_features(io_buf &cache, example *ae, uint64_t mask)
Definition: cache.cc:203
io_buf * output
Definition: parser.h:75
v_array< uint32_t > label_v
Definition: multilabel.h:16
void adjust_used_index(vw &)
Definition: parser.cc:968
example * read_example(vw &all, char *example_line)
Definition: parser.cc:761
uint64_t end_parsed_examples
Definition: parser.h:82
Definition: autolink.cc:11
bool empty() const
Definition: v_array.h:59
example * new_unused_example(vw &all)
Definition: parser.cc:753
int bound_sock
Definition: parser.h:97
float get_cost_sensitive_prediction(example *ec)
Definition: parser.cc:921
std::vector< std::string > interactions
Definition: global_data.h:457
void free_parser(vw &all)
Definition: parser.cc:976
uint32_t stride_shift()
void(* text_reader)(vw *, char *, size_t, v_array< example *> &)
Definition: parser.h:71
int label_sock
Definition: parser.h:96
int read_cached_features(vw *all, v_array< example *> &examples)
Definition: cache.cc:65
void unique_features(features &fs, int max)
Definition: unique_sort.cc:10
Definition: parser.h:38
bool sort_features
Definition: parser.h:77
bool audit
Definition: global_data.h:486
T last() const
Definition: v_array.h:57
bool stdin_off
Definition: global_data.h:527
polyprediction pred
Definition: example.h:60
size_t num_children
Definition: global_data.h:406
void delete_v()
Definition: v_array.h:98
std::condition_variable output_done
Definition: parser.h:88
float get_label(example *ec)
Definition: parser.cc:913
bool early_terminate
Definition: global_data.h:500
std::string pid_file
Definition: parse_args.h:16
example & get_unused_example(vw *all)
Definition: parser.cc:664
example * operator()(example *ex)
Definition: parser.cc:957
void return_object(T *obj)
Definition: object_pool.h:174
constexpr unsigned char constant_namespace
Definition: constant.h:22
bool isbinary(io_buf &i)
Definition: io_buf.cc:45
float weight
Definition: example.h:62
uint32_t * get_multilabel_predictions(example *ec, size_t &len)
Definition: parser.cc:925
uint32_t in_pass_counter
Definition: parser.h:83
bool children(log_multi &b, uint32_t &current, uint32_t &class_index, uint32_t label)
Definition: log_multi.cc:178
float get_topic_prediction(example *ec, size_t i)
Definition: parser.cc:911
bool end_pass
Definition: example.h:77
v_array< float > scalars
Definition: example.h:46
#define THROW(args)
Definition: vw_exception.h:181
constexpr uint64_t c
Definition: rand48.cc:12
bool resettable
Definition: parser.h:74
virtual ssize_t write_file(int f, const void *buf, size_t nbytes)
Definition: io_buf.h:190
bool emptylines_separate_examples
Definition: parser.h:84
size_t get_action_score_length(example *ec)
Definition: parser.cc:946
void(* print)(int, float, float, v_array< char >)
Definition: global_data.h:521
float f
Definition: cache.cc:40
void releaseFeatureSpace(primitive_feature_space *features, size_t len)
Definition: parser.cc:840
bool write_cache
Definition: parser.h:76
std::string data_filename
Definition: global_data.h:403
void set_done()
Definition: queue.h:62
bool daemon
Definition: global_data.h:405
bool empty() const
Definition: object_pool.h:186
uint32_t holdout_period
Definition: global_data.h:501
const char * to_string(prediction_type_t prediction_type)
Definition: learner.cc:12
T * pop()
Definition: queue.h:27
bool done
Definition: parser.h:90
label_parser lp
Definition: parser.h:102
void lock_done(parser &p)
Definition: parser.cc:571
void setup_examples(vw &all, v_array< example *> &examples)
Definition: parser.cc:673
bool test_only
Definition: example.h:76
std::pair< std::string, std::string > audit_strings
Definition: feature_group.h:22