Vowpal Wabbit
io_buf.h
Go to the documentation of this file.
1 /*
2 Copyright (c) by respective owners including Yahoo!, Microsoft, and
3 individual contributors. All rights reserved. Released under a BSD
4 license as described in the file LICENSE.
5  */
6 #pragma once
7 #ifndef _WIN32
8 #include <sys/types.h>
9 #include <unistd.h>
10 #endif
11 
12 #include <cstdio>
13 #include <fcntl.h>
14 #include "v_array.h"
15 #include <iostream>
16 #include <sstream>
17 #include <cerrno>
18 #include <stdexcept>
19 #include "hash.h"
20 #include "vw_exception.h"
21 #include "vw_validate.h"
22 
23 #ifndef O_LARGEFILE // for OSX
24 #define O_LARGEFILE 0
25 #endif
26 
27 #ifdef _WIN32
28 #define NOMINMAX
29 #define ssize_t int64_t
30 #include <io.h>
31 #include <sys/stat.h>
32 #endif
33 
34 /* The i/o buffer can be conceptualized as an array below:
35 ** _______________________________________________________________________________________
36 ** |__________|__________|__________|__________|__________|__________|__________|__________| **
37 ** space.begin space.head space.end space.endarray **
38 **
39 ** space.begin = the beginning of the loaded values in the buffer
40 ** space.head = the end of the last-read point in the buffer
41 ** space.end = the end of the loaded values from file
42 ** space.endarray = the end of the allocated space for the array
43 **
44 ** The values are ordered so that:
45 ** space.begin <= space.head <= space.end <= space.endarray
46 **
47 ** Initially space.begin == space.head since no values have been read.
48 **
49 ** The interval [space.head, space.end] may be shifted down to space.begin
50 ** if the requested number of bytes to be read is larger than the interval size.
51 ** This is done to avoid reallocating arrays as much as possible.
52 */
53 
54 class io_buf
55 {
56  // used to check-sum i/o files for corruption detection
58  uint32_t _hash;
59  static constexpr size_t INITIAL_BUFF_SIZE = 1 << 16;
60 
61  public:
62  v_array<char> space; // space.begin = beginning of loaded values. space.end = end of read or written values from/to
63  // the buffer.
65  size_t count; // maximum number of file descriptors.
66  size_t current; // file descriptor currently being used.
67  char* head;
70 
71  static constexpr int READ = 1;
72  static constexpr int WRITE = 2;
73 
74  void verify_hash(bool verify)
75  {
76  _verify_hash = verify;
77  // reset the hash so that the io_buf can be re-used for loading
78  // as it is done for Reload()
79  if (!verify)
80  _hash = 0;
81  }
82 
83  uint32_t hash()
84  {
85  if (!_verify_hash)
86  THROW("HASH WAS NOT CALCULATED");
87  return _hash;
88  }
89 
90  virtual int open_file(const char* name, bool stdin_off) { return open_file(name, stdin_off, READ); }
91 
92  virtual int open_file(const char* name, bool stdin_off, int flag = READ)
93  {
94  int ret = -1;
95  switch (flag)
96  {
97  case READ:
98  if (*name != '\0')
99  {
100 #ifdef _WIN32
101  // _O_SEQUENTIAL hints to OS that we'll be reading sequentially, so cache aggressively.
102  _sopen_s(&ret, name, _O_RDONLY | _O_BINARY | _O_SEQUENTIAL, _SH_DENYWR, 0);
103 #else
104  ret = open(name, O_RDONLY | O_LARGEFILE);
105 #endif
106  }
107  else if (!stdin_off)
108 #ifdef _WIN32
109  ret = _fileno(stdin);
110 #else
111  ret = fileno(stdin);
112 #endif
113  if (ret != -1)
114  files.push_back(ret);
115  break;
116 
117  case WRITE:
118 #ifdef _WIN32
119  _sopen_s(&ret, name, _O_CREAT | _O_WRONLY | _O_BINARY | _O_TRUNC, _SH_DENYWR, _S_IREAD | _S_IWRITE);
120 #else
121  ret = open(name, O_CREAT | O_WRONLY | O_LARGEFILE | O_TRUNC, 0666);
122 #endif
123  if (ret != -1)
124  files.push_back(ret);
125  break;
126 
127  default:
128  std::cerr << "Unknown file operation. Something other than READ/WRITE specified" << std::endl;
129  ret = -1;
130  }
131  if (ret == -1 && *name != '\0')
132  THROWERRNO("can't open: " << name);
133  return ret;
134  }
135 
136  virtual void reset_file(int f)
137  {
138 #ifdef _WIN32
139  _lseek(f, 0, SEEK_SET);
140 #else
141  lseek(f, 0, SEEK_SET);
142 #endif
143  space.end() = space.begin();
144  head = space.begin();
145  }
146 
147  io_buf() : _verify_hash{false}, _hash{0}, count{0}, current{0}
148  {
149  space = v_init<char>();
150  files = v_init<int>();
151  currentname = v_init<char>();
152  finalname = v_init<char>();
153  space.resize(INITIAL_BUFF_SIZE);
154  head = space.begin();
155  }
156 
157  virtual ~io_buf()
158  {
159  files.delete_v();
160  space.delete_v();
161  }
162 
163  void set(char* p) { head = p; }
164 
165  virtual size_t num_files() { return files.size(); }
166 
167  virtual ssize_t read_file(int f, void* buf, size_t nbytes) { return read_file_or_socket(f, buf, nbytes); }
168 
169  static ssize_t read_file_or_socket(int f, void* buf, size_t nbytes);
170 
171  ssize_t fill(int f)
172  { // if the loaded values have reached the allocated space
173  if (space.end_array - space.end() == 0)
174  { // reallocate to twice as much space
175  size_t head_loc = head - space.begin();
176  space.resize(2 * (space.end_array - space.begin()));
177  head = space.begin() + head_loc;
178  }
179  // read more bytes from file up to the remaining allocated space
180  ssize_t num_read = read_file(f, space.end(), space.end_array - space.end());
181  if (num_read >= 0)
182  { // if some bytes were actually loaded, update the end of loaded values
183  space.end() = space.end() + num_read;
184  return num_read;
185  }
186  else
187  return 0;
188  }
189 
190  virtual ssize_t write_file(int f, const void* buf, size_t nbytes) { return write_file_or_socket(f, buf, nbytes); }
191 
192  static ssize_t write_file_or_socket(int f, const void* buf, size_t nbytes);
193 
194  virtual void flush()
195  {
196  if (!files.empty())
197  {
198  if (write_file(files[0], space.begin(), head - space.begin()) != (int)(head - space.begin()))
199  std::cerr << "error, failed to write example\n";
200  head = space.begin();
201  }
202  }
203 
204  virtual bool close_file()
205  {
206  if (!files.empty())
207  {
208  close_file_or_socket(files.pop());
209  return true;
210  }
211  return false;
212  }
213 
214  virtual bool compressed() { return false; }
215 
216  static void close_file_or_socket(int f);
217 
218  void close_files()
219  {
220  while (close_file())
221  ;
222  }
223 
224  static bool is_socket(int f);
225 
226  void buf_write(char*& pointer, size_t n);
227  size_t buf_read(char*& pointer, size_t n);
228 
229  // if read_message is null, just read it in. Otherwise do a comparison and barf on read_message.
230  size_t bin_read_fixed(char* data, size_t len, const char* read_message)
231  {
232  if (len > 0)
233  {
234  char* p;
235  // if the model is corrupt the number of bytes can be less then specified (as there isn't enought data available
236  // in the file)
237  len = buf_read(p, len);
238 
239  // compute hash for check-sum
240  if (_verify_hash)
241  _hash = (uint32_t)uniform_hash(p, len, _hash);
242 
243  if (*read_message == '\0')
244  memcpy(data, p, len);
245  else if (memcmp(data, p, len) != 0)
246  THROW(read_message);
247  return len;
248  }
249  return 0;
250  }
251 
252  size_t bin_write_fixed(const char* data, size_t len)
253  {
254  if (len > 0)
255  {
256  char* p;
257  buf_write(p, len);
258 
259  memcpy(p, data, len);
260 
261  // compute hash for check-sum
262  if (_verify_hash)
263  _hash = (uint32_t)uniform_hash(p, len, _hash);
264  }
265  return len;
266  }
267 };
268 
// Out-of-line helpers; only the prototypes are visible here (the listing places both
// definitions in io_buf.cc).
// NOTE(review): isbinary presumably reports whether the buffered input is in binary rather than
// text form; confirm against the definition.
269 bool isbinary(io_buf& i);
// NOTE(review): readto presumably advances through the buffer up to `terminal`, leaves `pointer`
// at the start of the consumed span and returns its length; confirm against the definition.
270 size_t readto(io_buf& i, char*& pointer, char terminal);
271 
272 inline size_t bin_read(io_buf& i, char* data, size_t len, const char* read_message)
273 {
274  uint32_t obj_len;
275  size_t ret = i.bin_read_fixed((char*)&obj_len, sizeof(obj_len), "");
276  if (obj_len > len || ret < sizeof(uint32_t))
277  THROW("bad model format!");
278 
279  ret += i.bin_read_fixed(data, obj_len, read_message);
280 
281  return ret;
282 }
283 
284 inline size_t bin_write(io_buf& o, const char* data, uint32_t len)
285 {
286  o.bin_write_fixed((char*)&len, sizeof(len));
287  o.bin_write_fixed(data, len);
288  return (len + sizeof(len));
289 }
290 
291 inline size_t bin_text_write(io_buf& io, char* data, size_t len, std::stringstream& msg, bool text)
292 {
293  if (text)
294  {
295  size_t temp = io.bin_write_fixed(msg.str().c_str(), msg.str().size());
296  msg.str("");
297  return temp;
298  }
299  else
300  return bin_write(io, data, (uint32_t)len);
301 }
302 
303 // a unified function for read(in binary), write(in binary), and write(in text)
304 inline size_t bin_text_read_write(
305  io_buf& io, char* data, size_t len, const char* read_message, bool read, std::stringstream& msg, bool text)
306 {
307  if (read)
308  return bin_read(io, data, len, read_message);
309  else
310  return bin_text_write(io, data, len, msg, text);
311 }
312 
313 inline size_t bin_text_write_fixed(io_buf& io, char* data, size_t len, std::stringstream& msg, bool text)
314 {
315  if (text)
316  {
317  size_t temp = io.bin_write_fixed(msg.str().c_str(), msg.str().size());
318  msg.str("");
319  return temp;
320  }
321  else
322  return io.bin_write_fixed(data, len);
323 }
324 
325 // a unified function for read(in binary), write(in binary), and write(in text)
327  io_buf& io, char* data, size_t len, const char* read_message, bool read, std::stringstream& msg, bool text)
328 {
329  if (read)
330  return io.bin_read_fixed(data, len, read_message);
331  else
332  return bin_text_write_fixed(io, data, len, msg, text);
333 }
334 
336  io_buf& io, char* data, size_t len, const char* read_message, bool read, std::stringstream& msg, bool text)
337 {
338  size_t nbytes = bin_text_read_write_fixed(io, data, len, read_message, read, msg, text);
339  if (read && len > 0) // only validate bytes read/write if expected length > 0
340  {
341  if (nbytes == 0)
342  {
343  THROW("Unexpected end of file encountered.");
344  }
345  }
346  return nbytes;
347 }
348 
// writeit(what, str): appends "<str> = <what> " to `msg`, then reads or writes the raw bytes of
// `what` through bin_text_read_write_fixed.
// The expansion uses `model_file`, `read`, `msg` and `text` from the scope of the call site.
// NOTE(review): the expansion already ends in ';' after `while (0)`, so `writeit(a, "a");` yields
// an extra empty statement and the macro cannot be the unbraced body of an `if` that has an `else`.
349 #define writeit(what, str) \
350  do \
351  { \
352  msg << str << " = " << what << " "; \
353  bin_text_read_write_fixed(model_file, (char*)&what, sizeof(what), "", read, msg, text); \
354  } while (0);
355 
// writeitvar(what, str, mywhat): first declares a local `mywhat` initialised from `what`, then
// behaves like writeit on that local. The declaration sits outside the do/while block, so
// `mywhat` remains visible to the code that follows the macro invocation.
// The same trailing-';' caveat as for writeit applies.
356 #define writeitvar(what, str, mywhat) \
357  auto mywhat = (what); \
358  do \
359  { \
360  msg << str << " = " << mywhat << " "; \
361  bin_text_read_write_fixed(model_file, (char*)&mywhat, sizeof(mywhat), "", read, msg, text); \
362  } while (0);
#define O_LARGEFILE
Definition: io_buf.h:24
void resize(size_t length)
Definition: v_array.h:69
void close_files()
Definition: io_buf.h:218
virtual bool compressed()
Definition: io_buf.h:214
T pop()
Definition: v_array.h:58
size_t current
Definition: io_buf.h:66
static ssize_t write_file_or_socket(int f, const void *buf, size_t nbytes)
Definition: io_buf.cc:140
static constexpr int WRITE
Definition: io_buf.h:72
virtual bool close_file()
Definition: io_buf.h:204
size_t readto(io_buf &i, char *&pointer, char terminal)
Definition: io_buf.cc:58
static constexpr size_t INITIAL_BUFF_SIZE
Definition: io_buf.h:59
size_t bin_text_read_write_fixed_validated(io_buf &io, char *data, size_t len, const char *read_message, bool read, std::stringstream &msg, bool text)
Definition: io_buf.h:335
VW_STD14_CONSTEXPR uint64_t uniform_hash(const void *key, size_t len, uint64_t seed)
Definition: hash.h:67
size_t bin_text_read_write(io_buf &io, char *data, size_t len, const char *read_message, bool read, std::stringstream &msg, bool text)
Definition: io_buf.h:304
static bool is_socket(int f)
Definition: io_buf.cc:116
virtual void reset_file(int f)
Definition: io_buf.h:136
virtual int open_file(const char *name, bool stdin_off, int flag=READ)
Definition: io_buf.h:92
size_t count
Definition: io_buf.h:65
T *& begin()
Definition: v_array.h:42
virtual ssize_t read_file(int f, void *buf, size_t nbytes)
Definition: io_buf.h:167
static void close_file_or_socket(int f)
Definition: io_buf.cc:152
size_t size() const
Definition: v_array.h:68
static constexpr int READ
Definition: io_buf.h:71
uint32_t hash()
Definition: io_buf.h:83
virtual int open_file(const char *name, bool stdin_off)
Definition: io_buf.h:90
v_array< char > finalname
Definition: io_buf.h:69
char * head
Definition: io_buf.h:67
size_t bin_text_write_fixed(io_buf &io, char *data, size_t len, std::stringstream &msg, bool text)
Definition: io_buf.h:313
ssize_t fill(int f)
Definition: io_buf.h:171
virtual size_t num_files()
Definition: io_buf.h:165
size_t bin_read_fixed(char *data, size_t len, const char *read_message)
Definition: io_buf.h:230
size_t bin_text_write(io_buf &io, char *data, size_t len, std::stringstream &msg, bool text)
Definition: io_buf.h:291
size_t bin_read(io_buf &i, char *data, size_t len, const char *read_message)
Definition: io_buf.h:272
virtual void flush()
Definition: io_buf.h:194
void push_back(const T &new_ele)
Definition: v_array.h:107
v_array< int > files
Definition: io_buf.h:64
virtual ~io_buf()
Definition: io_buf.h:157
v_array< char > currentname
Definition: io_buf.h:68
void verify_hash(bool verify)
Definition: io_buf.h:74
#define THROWERRNO(args)
Definition: vw_exception.h:167
Definition: io_buf.h:54
static ssize_t read_file_or_socket(int f, void *buf, size_t nbytes)
Definition: io_buf.cc:128
v_array< char > space
Definition: io_buf.h:62
T *& end()
Definition: v_array.h:43
io_buf()
Definition: io_buf.h:147
void buf_write(char *&pointer, size_t n)
Definition: io_buf.cc:94
bool isbinary(io_buf &i)
Definition: io_buf.cc:45
size_t bin_write(io_buf &o, const char *data, uint32_t len)
Definition: io_buf.h:284
size_t bin_write_fixed(const char *data, size_t len)
Definition: io_buf.h:252
bool empty() const
Definition: v_array.h:59
bool _verify_hash
Definition: io_buf.h:57
void delete_v()
Definition: v_array.h:98
uint32_t _hash
Definition: io_buf.h:58
size_t bin_text_read_write_fixed(io_buf &io, char *data, size_t len, const char *read_message, bool read, std::stringstream &msg, bool text)
Definition: io_buf.h:326
#define THROW(args)
Definition: vw_exception.h:181
virtual ssize_t write_file(int f, const void *buf, size_t nbytes)
Definition: io_buf.h:190
float f
Definition: cache.cc:40
T * end_array
Definition: v_array.h:38
size_t buf_read(char *&pointer, size_t n)
Definition: io_buf.cc:12