Vowpal Wabbit
Public Member Functions | Public Attributes | Private Attributes | List of all members
AllReduceSync Class Reference

#include <allreduce.h>

Public Member Functions

 AllReduceSync (const size_t total)
 
 ~AllReduceSync ()
 
void waitForSynchronization ()
 

Public Attributes

void ** buffers
 

Private Attributes

std::mutex * m_mutex
 
std::condition_variable * m_cv
 
size_t m_total
 
uint32_t m_count
 
bool m_run
 

Detailed Description

Definition at line 98 of file allreduce.h.

Constructor & Destructor Documentation

◆ AllReduceSync()

AllReduceSync::AllReduceSync ( const size_t  total)

Definition at line 12 of file allreduce_threads.cc.

References buffers, m_cv, and m_mutex.

12  : m_total(total), m_count(0), m_run(true)
13 {
14  m_mutex = new std::mutex;
15  m_cv = new std::condition_variable;
16  buffers = new void*[total];
17 }
uint32_t m_count
Definition: allreduce.h:108
std::mutex * m_mutex
Definition: allreduce.h:101
void ** buffers
Definition: allreduce.h:120
size_t m_total
Definition: allreduce.h:105
std::condition_variable * m_cv
Definition: allreduce.h:102

◆ ~AllReduceSync()

AllReduceSync::~AllReduceSync ( )

Definition at line 19 of file allreduce_threads.cc.

References buffers, m_cv, and m_mutex.

20 {
21  delete m_mutex;
22  delete m_cv;
23  delete[] buffers;
24 }
std::mutex * m_mutex
Definition: allreduce.h:101
void ** buffers
Definition: allreduce.h:120
std::condition_variable * m_cv
Definition: allreduce.h:102

Member Function Documentation

◆ waitForSynchronization()

void AllReduceSync::waitForSynchronization ( )

Definition at line 26 of file allreduce_threads.cc.

References m_count, m_cv, m_mutex, m_run, and m_total.

Referenced by AllReduceThreads::all_reduce().

27 {
28  std::unique_lock<std::mutex> l(*m_mutex);
29  m_count++;
30 
31  if (m_count >= m_total)
32  {
33  assert(m_count == m_total);
34 
35  m_cv->notify_all();
36 
37  // order of m_count before or after notify_all doesn't matter
38  // since the lock is still hold at this point in time.
39  m_count = 0;
40 
41  // flip for the next run
42  m_run = !m_run;
43  }
44  else
45  {
46  bool current_run = m_run;
47  // this predicate cannot depend on m_count, as somebody can race ahead and m_count++
48  // FYI just wait can spuriously wake-up
49  m_cv->wait(l, [this, current_run] { return m_run != current_run; });
50  }
51 }
uint32_t m_count
Definition: allreduce.h:108
std::mutex * m_mutex
Definition: allreduce.h:101
size_t m_total
Definition: allreduce.h:105
std::condition_variable * m_cv
Definition: allreduce.h:102

Member Data Documentation

◆ buffers

void** AllReduceSync::buffers

Definition at line 120 of file allreduce.h.

Referenced by AllReduceThreads::all_reduce(), AllReduceSync(), and ~AllReduceSync().

◆ m_count

uint32_t AllReduceSync::m_count
private

Definition at line 108 of file allreduce.h.

Referenced by waitForSynchronization().

◆ m_cv

std::condition_variable* AllReduceSync::m_cv
private

Definition at line 102 of file allreduce.h.

Referenced by AllReduceSync(), waitForSynchronization(), and ~AllReduceSync().

◆ m_mutex

std::mutex* AllReduceSync::m_mutex
private

Definition at line 101 of file allreduce.h.

Referenced by AllReduceSync(), waitForSynchronization(), and ~AllReduceSync().

◆ m_run

bool AllReduceSync::m_run
private

Definition at line 111 of file allreduce.h.

Referenced by waitForSynchronization().

◆ m_total

size_t AllReduceSync::m_total
private

Definition at line 105 of file allreduce.h.

Referenced by waitForSynchronization().


The documentation for this class was generated from the following files: