Bitcoin Core  27.99.0
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2022 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
6 
7 #include <common/args.h>
8 #include <kernel/chain.h>
9 #include <kernel/mempool_entry.h>
10 #include <logging.h>
11 #include <primitives/block.h>
12 #include <primitives/transaction.h>
13 #include <validationinterface.h>
15 #include <zmq/zmqpublishnotifier.h>
16 #include <zmq/zmqutil.h>
17 
18 #include <zmq.h>
19 
20 #include <cassert>
21 #include <map>
22 #include <string>
23 #include <utility>
24 #include <vector>
25 
27 {
28 }
29 
31 {
// Releases the notifiers and the ZMQ context through Shutdown().
// NOTE(review): the signature line is absent from this listing; this is
// presumably the destructor body -- confirm against the header.
32  Shutdown();
33 }
34 
35 std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
36 {
37  std::list<const CZMQAbstractNotifier*> result;
38  for (const auto& n : notifiers) {
39  result.push_back(n.get());
40  }
41  return result;
42 }
43 
44 std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(std::vector<uint8_t>&, const CBlockIndex&)> get_block_by_index)
45 {
46  std::map<std::string, CZMQNotifierFactory> factories;
47  factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
48  factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
49  factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
50  return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
51  };
52  factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
53  factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
54 
55  std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
56  for (const auto& entry : factories)
57  {
58  std::string arg("-zmq" + entry.first);
59  const auto& factory = entry.second;
60  for (const std::string& address : gArgs.GetArgs(arg)) {
61  std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
62  notifier->SetType(entry.first);
63  notifier->SetAddress(address);
64  notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
65  notifiers.push_back(std::move(notifier));
66  }
67  }
68 
69  if (!notifiers.empty())
70  {
71  std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
72  notificationInterface->notifiers = std::move(notifiers);
73 
74  if (notificationInterface->Initialize()) {
75  return notificationInterface;
76  }
77  }
78 
79  return nullptr;
80 }
81 
82 // Called at startup to conditionally set up ZMQ socket(s).
// Logs the libzmq version, creates the context kept in pcontext and then
// initializes every entry of notifiers against it. Yields false if the context
// cannot be created or as soon as one notifier fails to initialize, and true
// otherwise.
84 {
85  int major = 0, minor = 0, patch = 0;
86  zmq_version(&major, &minor, &patch);
87  LogPrint(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
88 
89  LogPrint(BCLog::ZMQ, "Initialize notification interface\n");
// A context must not exist yet: running this a second time without an
// intervening teardown trips the assertion.
90  assert(!pcontext);
91 
92  pcontext = zmq_ctx_new();
93 
94  if (!pcontext)
95  {
96  zmqError("Unable to initialize context");
97  return false;
98  }
99 
100  for (auto& notifier : notifiers) {
101  if (notifier->Initialize(pcontext)) {
102  LogPrint(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
103  } else {
104  LogPrint(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
// The first failure aborts the loop. pcontext and any notifiers already
// initialized are left as they are on this path.
// NOTE(review): cleanup appears to be left to the shutdown code further
// down in this file -- confirm the caller triggers it.
105  return false;
106  }
107  }
108 
109  return true;
110 }
111 
112 // Called during shutdown sequence.
// Does nothing beyond logging when no context exists. Otherwise every notifier
// is shut down, the context is terminated and pcontext is reset to nullptr, so
// a repeated call skips the teardown.
114 {
115  LogPrint(BCLog::ZMQ, "Shutdown notification interface\n");
116  if (pcontext)
117  {
// Notifiers are shut down before the context is terminated.
// NOTE(review): zmq_ctx_term() is documented to block while sockets on the
// context are still open, which is presumably why this order matters.
118  for (auto& notifier : notifiers) {
119  LogPrint(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
120  notifier->Shutdown();
121  }
// The return value of zmq_ctx_term() is not inspected.
122  zmq_ctx_term(pcontext);
123 
124  pcontext = nullptr;
125  }
126 }
127 
128 namespace {
129 
130 template <typename Function>
131 void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
132 {
133  for (auto i = notifiers.begin(); i != notifiers.end(); ) {
134  CZMQAbstractNotifier* notifier = i->get();
135  if (func(notifier)) {
136  ++i;
137  } else {
138  notifier->Shutdown();
139  i = notifiers.erase(i);
140  }
141  }
142 }
143 
144 } // anonymous namespace
145 
146 void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
147 {
148  if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
149  return;
150 
151  TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
152  return notifier->NotifyBlock(pindexNew);
153  });
154 }
155 
157 {
// NOTE(review): the signature line is missing from this listing; the body
// matches the TransactionAddedToMempool handler -- confirm.
158  const CTransaction& tx = *(ptx.info.m_tx);
159 
// Each notifier first gets NotifyTransaction(); NotifyTransactionAcceptance()
// is only attempted when that call returned true (short-circuit &&). A false
// result from either one makes TryForEachAndRemoveFailed shut the notifier
// down and drop it from notifiers.
160  TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
161  return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
162  });
163 }
164 
166 {
// NOTE(review): the signature line is missing from this listing; the body
// matches the TransactionRemovedFromMempool handler -- confirm.
167  // Called for all non-block inclusion reasons
168  const CTransaction& tx = *ptx;
169 
// Only the removal notification is sent here. A notifier whose
// NotifyTransactionRemoval() returns false is shut down and dropped from
// notifiers by TryForEachAndRemoveFailed.
170  TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
171  return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
172  });
173 }
174 
175 void CZMQNotificationInterface::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
176 {
177  if (role == ChainstateRole::BACKGROUND) {
178  return;
179  }
180  for (const CTransactionRef& ptx : pblock->vtx) {
181  const CTransaction& tx = *ptx;
182  TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
183  return notifier->NotifyTransaction(tx);
184  });
185  }
186 
187  // Next we notify BlockConnect listeners for *all* blocks
188  TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
189  return notifier->NotifyBlockConnect(pindexConnected);
190  });
191 }
192 
193 void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
194 {
195  for (const CTransactionRef& ptx : pblock->vtx) {
196  const CTransaction& tx = *ptx;
197  TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
198  return notifier->NotifyTransaction(tx);
199  });
200  }
201 
202  // Next we notify BlockDisconnect listeners for *all* blocks
203  TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
204  return notifier->NotifyBlockDisconnect(pindexDisconnected);
205  });
206 }
207 
// Global owning pointer to the ZMQ notification interface. It is
// default-constructed here, so it holds no object until one is assigned to it.
208 std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
ArgsManager gArgs
Definition: args.cpp:41
std::vector< std::string > GetArgs(const std::string &strArg) const
Return a vector of strings of the given argument.
Definition: args.cpp:360
int64_t GetIntArg(const std::string &strArg, int64_t nDefault) const
Return integer argument or default value.
Definition: args.cpp:480
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: chain.h:141
The basic transaction that is broadcasted on the network and contained in blocks.
Definition: transaction.h:296
virtual void Shutdown()=0
virtual bool NotifyBlockConnect(const CBlockIndex *pindex)
static const int DEFAULT_ZMQ_SNDHWM
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
virtual bool NotifyBlock(const CBlockIndex *pindex)
virtual bool NotifyTransaction(const CTransaction &transaction)
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex)
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
static std::unique_ptr< CZMQNotificationInterface > Create(std::function< bool(std::vector< uint8_t > &, const CBlockIndex &)> get_block_by_index)
void TransactionAddedToMempool(const NewMempoolTransactionInfo &tx, uint64_t mempool_sequence) override
Notifies listeners of a transaction having been added to mempool.
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
std::list< std::unique_ptr< CZMQAbstractNotifier > > notifiers
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexDisconnected) override
Notifies listeners of a block being disconnected. Provides the block that was disconnected.
void BlockConnected(ChainstateRole role, const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
void TransactionRemovedFromMempool(const CTransactionRef &tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
Notifies listeners of a transaction leaving mempool.
std::list< const CZMQAbstractNotifier * > GetActiveNotifiers() const
ChainstateRole
This enum describes the various roles a specific Chainstate instance can take.
Definition: chain.h:25
#define LogPrint(category,...)
Definition: logging.h:263
MemPoolRemovalReason
Reason why a transaction was removed from the mempool, this is passed to the notification signal.
@ ZMQ
Definition: logging.h:46
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:423
const CTransactionRef m_tx
assert(!tx.IsCoinBase())
std::unique_ptr< CZMQNotificationInterface > g_zmq_notification_interface
void zmqError(const std::string &str)
Definition: zmqutil.cpp:13