Skip to content

Commit 9362da7

Browse files
committed
Merge pull request bitcoin#1033 from sipa/wait
Condition variables instead of polling
2 parents 9682087 + 092631f commit 9362da7

File tree

3 files changed

+132
-122
lines changed

3 files changed

+132
-122
lines changed

src/net.cpp

+22-19
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ map<CInv, int64> mapAlreadyAskedFor;
6464
set<CNetAddr> setservAddNodeAddresses;
6565
CCriticalSection cs_setservAddNodeAddresses;
6666

67+
static CWaitableCriticalSection csOutbound;
68+
static int nOutbound = 0;
69+
static CConditionVariable condOutbound;
6770

6871

6972
unsigned short GetListenPort()
@@ -361,6 +364,8 @@ CNode* ConnectNode(CAddress addrConnect, int64 nTimeout)
361364
pnode->AddRef();
362365
CRITICAL_BLOCK(cs_vNodes)
363366
vNodes.push_back(pnode);
367+
WAITABLE_CRITICAL_BLOCK(csOutbound)
368+
nOutbound++;
364369

365370
pnode->nTimeConnected = GetTime();
366371
return pnode;
@@ -504,6 +509,15 @@ void ThreadSocketHandler2(void* parg)
504509
// remove from vNodes
505510
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
506511

512+
if (!pnode->fInbound)
513+
WAITABLE_CRITICAL_BLOCK(csOutbound)
514+
{
515+
nOutbound--;
516+
517+
// Connection slot(s) were removed, notify connection creator(s)
518+
NOTIFY(condOutbound);
519+
}
520+
507521
// close socket and cleanup
508522
pnode->CloseSocketDisconnect();
509523
pnode->Cleanup();
@@ -1172,32 +1186,20 @@ void ThreadOpenConnections2(void* parg)
11721186
int64 nStart = GetTime();
11731187
loop
11741188
{
1175-
int nOutbound = 0;
1176-
11771189
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
11781190
Sleep(500);
11791191
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
11801192
if (fShutdown)
11811193
return;
11821194

11831195
// Limit outbound connections
1184-
loop
1185-
{
1186-
nOutbound = 0;
1187-
CRITICAL_BLOCK(cs_vNodes)
1188-
BOOST_FOREACH(CNode* pnode, vNodes)
1189-
if (!pnode->fInbound)
1190-
nOutbound++;
1191-
int nMaxOutboundConnections = MAX_OUTBOUND_CONNECTIONS;
1192-
nMaxOutboundConnections = min(nMaxOutboundConnections, (int)GetArg("-maxconnections", 125));
1193-
if (nOutbound < nMaxOutboundConnections)
1194-
break;
1195-
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
1196-
Sleep(2000);
1197-
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
1198-
if (fShutdown)
1199-
return;
1200-
}
1196+
int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125));
1197+
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
1198+
WAITABLE_CRITICAL_BLOCK(csOutbound)
1199+
WAIT(condOutbound, fShutdown || nOutbound < nMaxOutbound);
1200+
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
1201+
if (fShutdown)
1202+
return;
12011203

12021204
bool fAddSeeds = false;
12031205

@@ -1646,6 +1648,7 @@ bool StopNode()
16461648
fShutdown = true;
16471649
nTransactionsUpdated++;
16481650
int64 nStart = GetTime();
1651+
NOTIFY_ALL(condOutbound);
16491652
do
16501653
{
16511654
int nThreadsRunning = 0;

src/util.cpp

+4-52
Original file line numberDiff line numberDiff line change
@@ -1183,62 +1183,14 @@ static void pop_lock()
11831183
dd_mutex.unlock();
11841184
}
11851185

1186-
void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine)
1186+
void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs)
11871187
{
1188-
push_lock(this, CLockLocation(pszName, pszFile, nLine));
1189-
#ifdef DEBUG_LOCKCONTENTION
1190-
bool result = mutex.try_lock();
1191-
if (!result)
1192-
{
1193-
printf("LOCKCONTENTION: %s\n", pszName);
1194-
printf("Locker: %s:%d\n", pszFile, nLine);
1195-
mutex.lock();
1196-
printf("Locked\n");
1197-
}
1198-
#else
1199-
mutex.lock();
1200-
#endif
1201-
}
1202-
void CCriticalSection::Leave()
1203-
{
1204-
mutex.unlock();
1205-
pop_lock();
1206-
}
1207-
bool CCriticalSection::TryEnter(const char* pszName, const char* pszFile, int nLine)
1208-
{
1209-
push_lock(this, CLockLocation(pszName, pszFile, nLine));
1210-
bool result = mutex.try_lock();
1211-
if (!result) pop_lock();
1212-
return result;
1188+
push_lock(cs, CLockLocation(pszName, pszFile, nLine));
12131189
}
12141190

1215-
#else
1216-
1217-
void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine)
1191+
void LeaveCritical()
12181192
{
1219-
#ifdef DEBUG_LOCKCONTENTION
1220-
bool result = mutex.try_lock();
1221-
if (!result)
1222-
{
1223-
printf("LOCKCONTENTION: %s\n", pszName);
1224-
printf("Locker: %s:%d\n", pszFile, nLine);
1225-
mutex.lock();
1226-
}
1227-
#else
1228-
mutex.lock();
1229-
#endif
1230-
}
1231-
1232-
void CCriticalSection::Leave()
1233-
{
1234-
mutex.unlock();
1235-
}
1236-
1237-
bool CCriticalSection::TryEnter(const char*, const char*, int)
1238-
{
1239-
bool result = mutex.try_lock();
1240-
return result;
1193+
pop_lock();
12411194
}
12421195

12431196
#endif /* DEBUG_LOCKORDER */
1244-

src/util.h

+106-51
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ typedef int pid_t; /* define for windows compatiblity */
2020

2121
#include <boost/thread.hpp>
2222
#include <boost/interprocess/sync/interprocess_recursive_mutex.hpp>
23+
#include <boost/interprocess/sync/scoped_lock.hpp>
24+
#include <boost/interprocess/sync/interprocess_condition.hpp>
25+
#include <boost/interprocess/sync/lock_options.hpp>
2326
#include <boost/date_time/gregorian/gregorian_types.hpp>
2427
#include <boost/date_time/posix_time/posix_time_types.hpp>
2528

@@ -180,82 +183,134 @@ void AddTimeData(const CNetAddr& ip, int64 nTime);
180183

181184

182185

186+
/** Wrapped boost mutex: supports recursive locking, but no waiting */
187+
typedef boost::interprocess::interprocess_recursive_mutex CCriticalSection;
183188

184-
/** Wrapper to automatically initialize mutex. */
185-
class CCriticalSection
186-
{
187-
protected:
188-
boost::interprocess::interprocess_recursive_mutex mutex;
189-
public:
190-
explicit CCriticalSection() { }
191-
~CCriticalSection() { }
192-
void Enter(const char* pszName, const char* pszFile, int nLine);
193-
void Leave();
194-
bool TryEnter(const char* pszName, const char* pszFile, int nLine);
195-
};
189+
/** Wrapped boost mutex: supports waiting but not recursive locking */
190+
typedef boost::interprocess::interprocess_mutex CWaitableCriticalSection;
196191

197-
/** RAII object that acquires mutex. Needed for exception safety. */
198-
class CCriticalBlock
199-
{
200-
protected:
201-
CCriticalSection* pcs;
192+
#ifdef DEBUG_LOCKORDER
193+
void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs);
194+
void LeaveCritical();
195+
#else
196+
void static inline EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs) {}
197+
void static inline LeaveCritical() {}
198+
#endif
202199

200+
/** Wrapper around boost::interprocess::scoped_lock */
201+
template<typename Mutex>
202+
class CMutexLock
203+
{
204+
private:
205+
boost::interprocess::scoped_lock<Mutex> lock;
203206
public:
204-
CCriticalBlock(CCriticalSection& csIn, const char* pszName, const char* pszFile, int nLine)
207+
208+
void Enter(const char* pszName, const char* pszFile, int nLine)
205209
{
206-
pcs = &csIn;
207-
pcs->Enter(pszName, pszFile, nLine);
210+
if (!lock.owns())
211+
{
212+
EnterCritical(pszName, pszFile, nLine, (void*)(lock.mutex()));
213+
#ifdef DEBUG_LOCKCONTENTION
214+
if (!lock.try_lock())
215+
{
216+
printf("LOCKCONTENTION: %s\n", pszName);
217+
printf("Locker: %s:%d\n", pszFile, nLine);
218+
}
219+
#endif
220+
lock.lock();
221+
}
208222
}
209223

210-
operator bool() const
224+
void Leave()
211225
{
212-
return true;
226+
if (lock.owns())
227+
{
228+
lock.unlock();
229+
LeaveCritical();
230+
}
213231
}
214232

215-
~CCriticalBlock()
233+
bool TryEnter(const char* pszName, const char* pszFile, int nLine)
216234
{
217-
pcs->Leave();
235+
if (!lock.owns())
236+
{
237+
EnterCritical(pszName, pszFile, nLine, (void*)(lock.mutex()));
238+
lock.try_lock();
239+
if (!lock.owns())
240+
LeaveCritical();
241+
}
242+
return lock.owns();
218243
}
219-
};
220-
221-
#define CRITICAL_BLOCK(cs) \
222-
if (CCriticalBlock criticalblock = CCriticalBlock(cs, #cs, __FILE__, __LINE__))
223-
224-
#define ENTER_CRITICAL_SECTION(cs) \
225-
(cs).Enter(#cs, __FILE__, __LINE__)
226-
227-
#define LEAVE_CRITICAL_SECTION(cs) \
228-
(cs).Leave()
229244

230-
/** RAII object that tries to acquire mutex. Needed for exception safety. */
231-
class CTryCriticalBlock
232-
{
233-
protected:
234-
CCriticalSection* pcs;
245+
CMutexLock(Mutex& mutexIn, const char* pszName, const char* pszFile, int nLine, bool fTry = false) : lock(mutexIn, boost::interprocess::defer_lock)
246+
{
247+
if (fTry)
248+
TryEnter(pszName, pszFile, nLine);
249+
else
250+
Enter(pszName, pszFile, nLine);
251+
}
235252

236-
public:
237-
CTryCriticalBlock(CCriticalSection& csIn, const char* pszName, const char* pszFile, int nLine)
253+
~CMutexLock()
238254
{
239-
pcs = (csIn.TryEnter(pszName, pszFile, nLine) ? &csIn : NULL);
255+
if (lock.owns())
256+
LeaveCritical();
240257
}
241258

242-
operator bool() const
259+
operator bool()
243260
{
244-
return Entered();
261+
return lock.owns();
245262
}
246263

247-
~CTryCriticalBlock()
264+
boost::interprocess::scoped_lock<Mutex> &GetLock()
248265
{
249-
if (pcs)
250-
{
251-
pcs->Leave();
252-
}
266+
return lock;
253267
}
254-
bool Entered() const { return pcs != NULL; }
255268
};
256269

270+
typedef CMutexLock<CCriticalSection> CCriticalBlock;
271+
typedef CMutexLock<CWaitableCriticalSection> CWaitableCriticalBlock;
272+
typedef boost::interprocess::interprocess_condition CConditionVariable;
273+
274+
/** Wait for a given condition inside a WAITABLE_CRITICAL_BLOCK */
275+
#define WAIT(name,condition) \
276+
do { while(!(condition)) { (name).wait(waitablecriticalblock.GetLock()); } } while(0)
277+
278+
/** Notify waiting threads that a condition may hold now */
279+
#define NOTIFY(name) \
280+
do { (name).notify_one(); } while(0)
281+
282+
#define NOTIFY_ALL(name) \
283+
do { (name).notify_all(); } while(0)
284+
285+
#define CRITICAL_BLOCK(cs) \
286+
for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \
287+
for (CCriticalBlock criticalblock(cs, #cs, __FILE__, __LINE__); fcriticalblockonce; fcriticalblockonce=false)
288+
289+
#define WAITABLE_CRITICAL_BLOCK(cs) \
290+
for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by WAITABLE_CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \
291+
for (CWaitableCriticalBlock waitablecriticalblock(cs, #cs, __FILE__, __LINE__); fcriticalblockonce; fcriticalblockonce=false)
292+
293+
#define ENTER_CRITICAL_SECTION(cs) \
294+
{ \
295+
EnterCritical(#cs, __FILE__, __LINE__, (void*)(&cs)); \
296+
(cs).lock(); \
297+
}
298+
299+
#define LEAVE_CRITICAL_SECTION(cs) \
300+
{ \
301+
(cs).unlock(); \
302+
LeaveCritical(); \
303+
}
304+
257305
#define TRY_CRITICAL_BLOCK(cs) \
258-
if (CTryCriticalBlock criticalblock = CTryCriticalBlock(cs, #cs, __FILE__, __LINE__))
306+
for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by TRY_CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \
307+
for (CCriticalBlock criticalblock(cs, #cs, __FILE__, __LINE__, true); fcriticalblockonce && (fcriticalblockonce = criticalblock); fcriticalblockonce=false)
308+
309+
310+
// This is exactly like std::string, but with a custom allocator.
311+
// (secure_allocator<> is defined in serialize.h)
312+
typedef std::basic_string<char, std::char_traits<char>, secure_allocator<char> > SecureString;
313+
259314

260315

261316

0 commit comments

Comments
 (0)