Bitcoin Core  27.99.0
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1 // Copyright (c) 2012-2022 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef BITCOIN_CHECKQUEUE_H
6 #define BITCOIN_CHECKQUEUE_H
7 
8 #include <sync.h>
9 #include <tinyformat.h>
10 #include <util/threadnames.h>
11 
12 #include <algorithm>
13 #include <iterator>
14 #include <vector>
15 
26 template <typename T>
28 {
29 private:
32 
34  std::condition_variable m_worker_cv; //!< Worker threads block on this when out of work.
35 
37  std::condition_variable m_master_cv; //!< Master thread blocks on this when out of work.
38 
41  std::vector<T> queue GUARDED_BY(m_mutex); //!< The queue of elements to be processed.
42 
44  int nIdle GUARDED_BY(m_mutex){0}; //!< The number of workers (including the master) that are idle.
45 
47  int nTotal GUARDED_BY(m_mutex){0}; //!< The total number of workers (including the master).
48 
50  bool fAllOk GUARDED_BY(m_mutex){true}; //!< The temporary evaluation result.
51 
57  unsigned int nTodo GUARDED_BY(m_mutex){0}; //!< Number of verifications that haven't completed yet (raised in Add(), lowered in Loop() once a batch has run).
58 
60  const unsigned int nBatchSize; //!< The maximum number of elements to be processed in one batch.
61 
62  std::vector<std::thread> m_worker_threads; //!< Worker threads started by the constructor; each one runs Loop(false).
63  bool m_request_stop GUARDED_BY(m_mutex){false}; //!< Once set, Loop() stops waiting for work and returns false.
64 
66  bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
67  {
68  std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
69  std::vector<T> vChecks;
70  vChecks.reserve(nBatchSize);
71  unsigned int nNow = 0;
72  bool fOk = true;
73  do {
74  {
75  WAIT_LOCK(m_mutex, lock);
76  // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
77  if (nNow) {
78  fAllOk &= fOk;
79  nTodo -= nNow;
80  if (nTodo == 0 && !fMaster)
81  // We processed the last element; inform the master it can exit and return the result
82  m_master_cv.notify_one();
83  } else {
84  // first iteration
85  nTotal++;
86  }
87  // logically, the do loop starts here
88  while (queue.empty() && !m_request_stop) {
89  if (fMaster && nTodo == 0) {
90  nTotal--;
91  bool fRet = fAllOk;
92  // reset the status for new work later
93  fAllOk = true;
94  // return the current status
95  return fRet;
96  }
97  nIdle++;
98  cond.wait(lock); // wait
99  nIdle--;
100  }
101  if (m_request_stop) {
102  return false;
103  }
104 
105  // Decide how many work units to process now.
106  // * Do not try to do everything at once, but aim for increasingly smaller batches so
107  // all workers finish approximately simultaneously.
108  // * Try to account for idle jobs which will instantly start helping.
109  // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
110  nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
111  auto start_it = queue.end() - nNow;
112  vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
113  queue.erase(start_it, queue.end());
114  // Check whether we need to do work at all
115  fOk = fAllOk;
116  }
117  // execute work
118  for (T& check : vChecks)
119  if (fOk)
120  fOk = check();
121  vChecks.clear();
122  } while (true);
123  }
124 
125 public:
128 
130  explicit CCheckQueue(unsigned int batch_size, int worker_threads_num)
131  : nBatchSize(batch_size)
132  {
133  m_worker_threads.reserve(worker_threads_num);
134  for (int n = 0; n < worker_threads_num; ++n) {
135  m_worker_threads.emplace_back([this, n]() {
136  util::ThreadRename(strprintf("scriptch.%i", n));
137  Loop(false /* worker thread */);
138  });
139  }
140  }
141 
142  // Since this class manages its own resources, which is a thread
143  // pool `m_worker_threads`, copy and move operations are not appropriate.
144  CCheckQueue(const CCheckQueue&) = delete;
145  CCheckQueue& operator=(const CCheckQueue&) = delete;
148 
151  {
152  return Loop(true /* master thread */);
153  }
154 
156  void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
157  {
158  if (vChecks.empty()) {
159  return;
160  }
161 
162  {
163  LOCK(m_mutex);
164  queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
165  nTodo += vChecks.size();
166  }
167 
168  if (vChecks.size() == 1) {
169  m_worker_cv.notify_one();
170  } else {
171  m_worker_cv.notify_all();
172  }
173  }
174 
176  {
177  WITH_LOCK(m_mutex, m_request_stop = true);
178  m_worker_cv.notify_all();
179  for (std::thread& t : m_worker_threads) {
180  t.join();
181  }
182  }
183 
184  bool HasThreads() const { return !m_worker_threads.empty(); }
185 };
186 
191 template <typename T>
193 {
194 private:
196  bool fDone;
197 
198 public:
199  CCheckQueueControl() = delete;
202  explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
203  {
204  // passed queue is supposed to be unused, or nullptr
205  if (pqueue != nullptr) {
206  ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
207  }
208  }
209 
210  bool Wait()
211  {
212  if (pqueue == nullptr)
213  return true;
214  bool fRet = pqueue->Wait();
215  fDone = true;
216  return fRet;
217  }
218 
219  void Add(std::vector<T>&& vChecks)
220  {
221  if (pqueue != nullptr) {
222  pqueue->Add(std::move(vChecks));
223  }
224  }
225 
227  {
228  if (!fDone)
229  Wait();
230  if (pqueue != nullptr) {
231  LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
232  }
233  }
234 };
235 
236 #endif // BITCOIN_CHECKQUEUE_H
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before continuing.
Definition: checkqueue.h:193
CCheckQueue< T > *const pqueue
Definition: checkqueue.h:195
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:202
CCheckQueueControl()=delete
CCheckQueueControl & operator=(const CCheckQueueControl &)=delete
void Add(std::vector< T > &&vChecks)
Definition: checkqueue.h:219
CCheckQueueControl(const CCheckQueueControl &)=delete
Queue for verifications that have to be performed.
Definition: checkqueue.h:28
const unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:60
std::vector< T > queue GUARDED_BY(m_mutex)
The queue of elements to be processed.
std::condition_variable m_master_cv
Master thread blocks on this when out of work.
Definition: checkqueue.h:37
bool fAllOk GUARDED_BY(m_mutex)
The temporary evaluation result.
Definition: checkqueue.h:50
bool m_request_stop GUARDED_BY(m_mutex)
Definition: checkqueue.h:63
bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:66
CCheckQueue(CCheckQueue &&)=delete
int nTotal GUARDED_BY(m_mutex)
The total number of workers (including the master).
Definition: checkqueue.h:47
std::vector< std::thread > m_worker_threads
Definition: checkqueue.h:62
Mutex m_control_mutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:127
CCheckQueue & operator=(const CCheckQueue &)=delete
CCheckQueue & operator=(CCheckQueue &&)=delete
std::condition_variable m_worker_cv
Worker threads block on this when out of work.
Definition: checkqueue.h:34
bool HasThreads() const
Definition: checkqueue.h:184
Mutex m_mutex
Mutex to protect the inner state.
Definition: checkqueue.h:31
bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Wait until execution finishes, and return whether all evaluations were successful.
Definition: checkqueue.h:150
CCheckQueue(unsigned int batch_size, int worker_threads_num)
Create a new check queue.
Definition: checkqueue.h:130
void Add(std::vector< T > &&vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Add a batch of checks to the queue.
Definition: checkqueue.h:156
int nIdle GUARDED_BY(m_mutex)
The number of workers (including the master) that are idle.
Definition: checkqueue.h:44
CCheckQueue(const CCheckQueue &)=delete
unsigned int nTodo GUARDED_BY(m_mutex)
Number of verifications that haven't completed yet.
Definition: checkqueue.h:57
void ThreadRename(std::string &&)
Rename a thread both in terms of an internal (in-memory) name as well as its system thread name.
Definition: threadnames.cpp:59
#define WAIT_LOCK(cs, name)
Definition: sync.h:262
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:264
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:270
#define LOCK(cs)
Definition: sync.h:257
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:301
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:49
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for details).
Definition: tinyformat.h:1162