Vowpal Wabbit
accumulate.cc
/*
Copyright (c) by respective owners including Yahoo!, Microsoft, and
individual contributors. All rights reserved. Released under a BSD (revised)
license as described in the file LICENSE.
 */
/*
This implements the allreduce function of MPI. Code primarily by
Alekh Agarwal and John Langford, with help from Olivier Chapelle.
*/

#include <iostream>
#include <sys/timeb.h>
#include <cmath>
#include <stdint.h>
#include "global_data.h"
#include "vw_allreduce.h"

void add_float(float& c1, const float& c2) { c1 += c2; }

void accumulate(vw& all, parameters& weights, size_t offset)
{
  uint64_t length = UINT64_ONE << all.num_bits;  // This is the size of the gradient
  float* local_grad = new float[length];

  // Gather the requested slot (offset) of every strided weight into a contiguous buffer.
  if (weights.sparse)
    for (uint64_t i = 0; i < length; i++)
      local_grad[i] = (&(weights.sparse_weights[i << weights.sparse_weights.stride_shift()]))[offset];
  else
    for (uint64_t i = 0; i < length; i++)
      local_grad[i] = (&(weights.dense_weights[i << weights.dense_weights.stride_shift()]))[offset];

  all_reduce<float, add_float>(all, local_grad, length);  // TODO: modify to not use first()

  // Write the summed values back into the same slot of each weight.
  if (weights.sparse)
    for (uint64_t i = 0; i < length; i++)
      (&(weights.sparse_weights[i << weights.sparse_weights.stride_shift()]))[offset] = local_grad[i];
  else
    for (uint64_t i = 0; i < length; i++)
      (&(weights.dense_weights[i << weights.dense_weights.stride_shift()]))[offset] = local_grad[i];

  delete[] local_grad;
}
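
A minimal usage sketch for accumulate(): sum one slot of every weight across all nodes before a synchronized update. The helper name and the assumption that the model is reachable as all.weights (a member of vw declared in global_data.h, not referenced in this file) are illustrative only.

// Illustrative sketch (assumes all.weights from global_data.h): sum slot 0 of
// each strided weight across the cluster; a no-op outside an allreduce run.
void sync_slot_zero(vw& all)
{
  if (all.all_reduce != nullptr)
    accumulate(all, all.weights, 0);  // 0 = slot within each strided weight (example value)
}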

float accumulate_scalar(vw& all, float local_sum)
{
  float temp = local_sum;
  all_reduce<float, add_float>(all, &temp, 1);
  return temp;
}
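
accumulate_scalar() sums a single float across nodes, so a cluster-wide mean is one division away. A small sketch, reusing all.all_reduce->total (the node count, as used by accumulate_avg below):

// Illustrative sketch: turn a per-node sum into a cluster-wide mean.
float cluster_mean(vw& all, float local_sum)
{
  float global_sum = accumulate_scalar(all, local_sum);
  return global_sum / (float)all.all_reduce->total;  // total = number of nodes
}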

void accumulate_avg(vw& all, parameters& weights, size_t offset)
{
  uint64_t length = UINT64_ONE << all.num_bits;  // This is the size of the gradient
  float numnodes = (float)all.all_reduce->total;
  float* local_grad = new float[length];

  // Gather the requested slot (offset) of every strided weight into a contiguous buffer.
  if (weights.sparse)
    for (uint64_t i = 0; i < length; i++)
      local_grad[i] = (&(weights.sparse_weights[i << weights.sparse_weights.stride_shift()]))[offset];
  else
    for (uint64_t i = 0; i < length; i++)
      local_grad[i] = (&(weights.dense_weights[i << weights.dense_weights.stride_shift()]))[offset];

  all_reduce<float, add_float>(all, local_grad, length);  // TODO: modify to not use first()

  // Write back the sum divided by the node count, i.e. the mean over nodes.
  if (weights.sparse)
    for (uint64_t i = 0; i < length; i++)
      (&(weights.sparse_weights[i << weights.sparse_weights.stride_shift()]))[offset] = local_grad[i] / numnodes;
  else
    for (uint64_t i = 0; i < length; i++)
      (&(weights.dense_weights[i << weights.dense_weights.stride_shift()]))[offset] = local_grad[i] / numnodes;

  delete[] local_grad;
}
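
accumulate_avg() differs from accumulate() only in the final division by numnodes: with three nodes holding 1.0, 2.0 and 3.0 in the chosen slot, accumulate() would leave the sum 6.0 on every node, while accumulate_avg() leaves the mean 2.0.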

// Largest element of arr.
float max_elem(float* arr, int length)
{
  float max = arr[0];
  for (int i = 1; i < length; i++)
    if (arr[i] > max)
      max = arr[i];
  return max;
}

// Minimum over elements, except that candidates after arr[0] must exceed 0.001
// (near-zero entries are skipped).
float min_elem(float* arr, int length)
{
  float min = arr[0];
  for (int i = 1; i < length; i++)
    if (arr[i] < min && arr[i] > 0.001)
      min = arr[i];
  return min;
}
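
A small illustration of the two helpers, with arbitrary values; min_elem() skips candidates at or below 0.001, so near-zero entries after the first element never win:

// float a[] = {3.f, 2.f, 0.0005f, 0.5f};
// max_elem(a, 4) == 3.f
// min_elem(a, 4) == 0.5f   (0.0005f fails the > 0.001 test)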

template <class T>
void do_weighting(vw& all, uint64_t length, float* local_weights, T& weights)
{
  for (uint64_t i = 0; i < length; i++)
  {
    float* weight = &weights[i << weights.stride_shift()];
    if (local_weights[i] > 0)
    {
      float ratio = weight[1] / local_weights[i];
      local_weights[i] = weight[0] * ratio;
      weight[0] *= ratio;
      weight[1] *= ratio;  // A crude max
      if (all.normalized_idx > 0)
        weight[all.normalized_idx] *= ratio;  // A crude max
    }
    else
    {
      local_weights[i] = 0;
      *weight = 0;
    }
  }
}
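
In accumulate_weighted_avg() below, weight[1] is this node's accumulated adaptive value and local_weights[i] arrives here holding the sum of those values over all nodes. Rescaling weight[0] by ratio = weight[1] / local_weights[i] means the subsequent all_reduce of the raw parameters yields an average weighted by the adaptive values rather than a plain sum. A two-node sketch with made-up numbers:

// node 1: weight[0] = 1.0, weight[1] = 3.0
// node 2: weight[0] = 2.0, weight[1] = 1.0
// After the first all_reduce: local_weights[i] = 3.0 + 1.0 = 4.0 on both nodes.
// do_weighting rescales: node 1 contributes 1.0 * (3.0 / 4.0) = 0.75,
//                        node 2 contributes 2.0 * (1.0 / 4.0) = 0.50,
// so the second all_reduce leaves (1.0 * 3.0 + 2.0 * 1.0) / 4.0 = 1.25,
// the adaptive-weighted average of the two parameters.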

void accumulate_weighted_avg(vw& all, parameters& weights)
{
  if (!weights.adaptive)
  {
    all.trace_message << "Weighted averaging is implemented only for adaptive gradient, use accumulate_avg instead\n";
    return;
  }

  uint64_t length = UINT64_ONE << all.num_bits;  // This is the number of parameters
  float* local_weights = new float[length];

  if (weights.sparse)
    for (uint64_t i = 0; i < length; i++)
      local_weights[i] = (&(weights.sparse_weights[i << weights.sparse_weights.stride_shift()]))[1];
  else
    for (uint64_t i = 0; i < length; i++)
      local_weights[i] = (&(weights.dense_weights[i << weights.dense_weights.stride_shift()]))[1];

  // First compute weights for averaging
  all_reduce<float, add_float>(all, local_weights, length);

  if (weights.sparse)
    do_weighting(all, length, local_weights, weights.sparse_weights);
  else
    do_weighting(all, length, local_weights, weights.dense_weights);

  if (weights.sparse)
    std::cout << "sparse parameters not supported with parallel computation!" << std::endl;
  else
    all_reduce<float, add_float>(
        all, weights.dense_weights.first(), ((size_t)length) * (1ull << weights.stride_shift()));
  delete[] local_weights;
}
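
A hedged end-of-pass sketch tying the pieces together, assuming the model is reachable as all.weights and that the caller chooses between weighted and plain averaging; the function name and control flow are illustrative, not the driver logic VW actually uses:

// Illustrative sketch: synchronize the model at the end of a pass.
void end_pass_sync(vw& all)
{
  if (all.all_reduce == nullptr)
    return;                                     // single-node run: nothing to reduce
  if (all.weights.adaptive)
    accumulate_weighted_avg(all, all.weights);  // weight each node by its adaptive sums
  else
    accumulate_avg(all, all.weights, 0);        // plain mean of slot 0 across nodes
}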