Bitcoin Core  0.18.99
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1 // Copyright (c) 2012-2018 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef BITCOIN_CHECKQUEUE_H
6 #define BITCOIN_CHECKQUEUE_H
7 
8 #include <sync.h>
9 
10 #include <algorithm>
11 #include <vector>
12 
13 #include <boost/thread/condition_variable.hpp>
14 #include <boost/thread/mutex.hpp>
15 
// Forward declaration of the controller class that is defined after CCheckQueue.
template <typename T>
class CCheckQueueControl;

29 template <typename T>
31 {
32 private:
34  boost::mutex mutex;
35 
37  boost::condition_variable condWorker;
38 
40  boost::condition_variable condMaster;
41 
44  std::vector<T> queue;
45 
47  int nIdle;
48 
50  int nTotal;
51 
53  bool fAllOk;
54 
60  unsigned int nTodo;
61 
63  unsigned int nBatchSize;
64 
66  bool Loop(bool fMaster = false)
67  {
68  boost::condition_variable& cond = fMaster ? condMaster : condWorker;
69  std::vector<T> vChecks;
70  vChecks.reserve(nBatchSize);
71  unsigned int nNow = 0;
72  bool fOk = true;
73  do {
74  {
75  boost::unique_lock<boost::mutex> lock(mutex);
76  // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
77  if (nNow) {
78  fAllOk &= fOk;
79  nTodo -= nNow;
80  if (nTodo == 0 && !fMaster)
81  // We processed the last element; inform the master it can exit and return the result
82  condMaster.notify_one();
83  } else {
84  // first iteration
85  nTotal++;
86  }
87  // logically, the do loop starts here
88  while (queue.empty()) {
89  if (fMaster && nTodo == 0) {
90  nTotal--;
91  bool fRet = fAllOk;
92  // reset the status for new work later
93  if (fMaster)
94  fAllOk = true;
95  // return the current status
96  return fRet;
97  }
98  nIdle++;
99  cond.wait(lock); // wait
100  nIdle--;
101  }
102  // Decide how many work units to process now.
103  // * Do not try to do everything at once, but aim for increasingly smaller batches so
104  // all workers finish approximately simultaneously.
105  // * Try to account for idle jobs which will instantly start helping.
106  // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
107  nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
108  vChecks.resize(nNow);
109  for (unsigned int i = 0; i < nNow; i++) {
110  // We want the lock on the mutex to be as short as possible, so swap jobs from the global
111  // queue to the local batch vector instead of copying.
112  vChecks[i].swap(queue.back());
113  queue.pop_back();
114  }
115  // Check whether we need to do work at all
116  fOk = fAllOk;
117  }
118  // execute work
119  for (T& check : vChecks)
120  if (fOk)
121  fOk = check();
122  vChecks.clear();
123  } while (true);
124  }
125 
126 public:
128  boost::mutex ControlMutex;
129 
131  explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {}
132 
134  void Thread()
135  {
136  Loop();
137  }
138 
140  bool Wait()
141  {
142  return Loop(true);
143  }
144 
146  void Add(std::vector<T>& vChecks)
147  {
148  boost::unique_lock<boost::mutex> lock(mutex);
149  for (T& check : vChecks) {
150  queue.push_back(T());
151  check.swap(queue.back());
152  }
153  nTodo += vChecks.size();
154  if (vChecks.size() == 1)
155  condWorker.notify_one();
156  else if (vChecks.size() > 1)
157  condWorker.notify_all();
158  }
159 
161  {
162  }
163 
164 };
165 
170 template <typename T>
171 class CCheckQueueControl
172 {
173 private:
175  bool fDone;
176 
177 public:
178  CCheckQueueControl() = delete;
179  CCheckQueueControl(const CCheckQueueControl&) = delete;
180  CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
181  explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
182  {
183  // passed queue is supposed to be unused, or nullptr
184  if (pqueue != nullptr) {
186  }
187  }
188 
189  bool Wait()
190  {
191  if (pqueue == nullptr)
192  return true;
193  bool fRet = pqueue->Wait();
194  fDone = true;
195  return fRet;
196  }
197 
198  void Add(std::vector<T>& vChecks)
199  {
200  if (pqueue != nullptr)
201  pqueue->Add(vChecks);
202  }
203 
205  {
206  if (!fDone)
207  Wait();
208  if (pqueue != nullptr) {
210  }
211  }
212 };
213 
214 #endif // BITCOIN_CHECKQUEUE_H
void Add(std::vector< T > &vChecks)
Definition: checkqueue.h:198
boost::condition_variable condWorker
Worker threads block on this when out of work.
Definition: checkqueue.h:37
boost::mutex mutex
Mutex to protect the inner state.
Definition: checkqueue.h:34
boost::condition_variable condMaster
Master thread blocks on this when out of work.
Definition: checkqueue.h:40
bool Loop(bool fMaster=false)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:66
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:181
void Thread()
Worker thread.
Definition: checkqueue.h:134
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before continuing.
Definition: checkqueue.h:17
CCheckQueue(unsigned int nBatchSizeIn)
Create a new check queue.
Definition: checkqueue.h:131
std::vector< T > queue
The queue of elements to be processed.
Definition: checkqueue.h:44
bool fAllOk
The temporary evaluation result.
Definition: checkqueue.h:53
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:195
int nTotal
The total number of workers (including the master).
Definition: checkqueue.h:50
Queue for verifications that have to be performed.
Definition: checkqueue.h:30
CCheckQueue< T > *const pqueue
Definition: checkqueue.h:174
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:189
bool Wait()
Wait until execution finishes, and return whether all evaluations were successful.
Definition: checkqueue.h:140
int nIdle
The number of workers (including the master) that are idle.
Definition: checkqueue.h:47
unsigned int nTodo
Number of verifications that haven't completed yet.
Definition: checkqueue.h:60
void Add(std::vector< T > &vChecks)
Add a batch of checks to the queue.
Definition: checkqueue.h:146
unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:63
boost::mutex ControlMutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:128