Skip to content

Commit 7c8c22d

Browse files
committed
解决了内存溢出的问题:阻塞队列没有实现push函数的阻塞功能
1 parent c4af527 commit 7c8c22d

File tree

7 files changed

+108
-51
lines changed

7 files changed

+108
-51
lines changed

blocking_queue.h

+27-13
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ class block_queue
3232
m_back = -1;
3333

3434
m_mutex = new pthread_mutex_t;
35-
m_cond = new pthread_cond_t;
35+
m_rcond = new pthread_cond_t;
36+
m_wcond = new pthread_cond_t;
3637
pthread_mutex_init(m_mutex, NULL);
37-
pthread_cond_init(m_cond, NULL);
38+
pthread_cond_init(m_rcond, NULL);
39+
pthread_cond_init(m_wcond, NULL);
3840
}
3941

4042
void clear()
@@ -54,10 +56,11 @@ class block_queue
5456
pthread_mutex_unlock(m_mutex);
5557

5658
pthread_mutex_destroy(m_mutex);
57-
pthread_cond_destroy(m_cond);
59+
pthread_cond_destroy(m_rcond);
60+
pthread_cond_destroy(m_wcond);
5861

5962
delete m_mutex;
60-
delete m_cond;
63+
delete m_rcond;
6164
}
6265

6366
bool full()const
@@ -131,29 +134,34 @@ class block_queue
131134
bool push(const T& item)
132135
{
133136
pthread_mutex_lock(m_mutex);
134-
if(m_size >= m_max_size)
137+
const bool was_empty = (m_size == 0);
138+
while (m_size >= m_max_size)
135139
{
136-
pthread_cond_broadcast(m_cond);
137-
pthread_mutex_unlock(m_mutex);
138-
return false;
140+
if (0 != pthread_cond_wait(m_wcond, m_mutex)) {
141+
pthread_cond_broadcast(m_rcond);
142+
pthread_mutex_unlock(m_mutex);
143+
return false;
144+
}
139145
}
140146

141147
m_back = (m_back + 1) % m_max_size;
142148
m_array[m_back] = item;
143149

144150
m_size++;
145-
pthread_cond_broadcast(m_cond);
151+
pthread_cond_broadcast(m_rcond);
146152
pthread_mutex_unlock(m_mutex);
147-
153+
// 唤醒push
154+
if (was_empty) pthread_cond_broadcast(m_rcond);
148155
return true;
149156
}
150157

151158
bool pop(T *item)
152159
{
153160
pthread_mutex_lock(m_mutex);
161+
const bool was_full = (m_size == m_max_size);
154162
while(m_size <= 0)
155163
{
156-
if(0 != pthread_cond_wait(m_cond, m_mutex))
164+
if(0 != pthread_cond_wait(m_rcond, m_mutex))
157165
{
158166
pthread_mutex_unlock(m_mutex);
159167
return false;
@@ -164,6 +172,8 @@ class block_queue
164172
*item = m_array[m_front];
165173
m_size--;
166174
pthread_mutex_unlock(m_mutex);
175+
// 唤醒push
176+
if (was_full) pthread_cond_broadcast(m_wcond);
167177
return true;
168178
}
169179

@@ -173,11 +183,12 @@ class block_queue
173183
struct timeval now = {0,0};
174184
gettimeofday(&now, NULL);
175185
pthread_mutex_lock(m_mutex);
186+
const bool was_full = (m_size == m_max_size);
176187
if(m_size <= 0)
177188
{
178189
t.tv_sec = now.tv_sec + ms_timeout/1000;
179190
t.tv_nsec = (ms_timeout % 1000)*1000;
180-
if(0 != pthread_cond_timedwait(m_cond, m_mutex, &t))
191+
if(0 != pthread_cond_timedwait(m_rcond, m_mutex, &t))
181192
{
182193
pthread_mutex_unlock(m_mutex);
183194
return false;
@@ -193,12 +204,15 @@ class block_queue
193204
m_front = (m_front + 1) % m_max_size;
194205
*item = m_array[m_front];m_size--;
195206
pthread_mutex_unlock(m_mutex);
207+
// 唤醒push
208+
if (was_full) pthread_cond_broadcast(m_wcond);
196209
return true;
197210
}
198211

199212
private:
200213
pthread_mutex_t *m_mutex;
201-
pthread_cond_t *m_cond;
214+
pthread_cond_t *m_rcond;
215+
pthread_cond_t *m_wcond;
202216
T *m_array;
203217
int m_size;
204218
int m_max_size;

communicator.cpp

+18-14
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
using namespace std;
1818

19-
block_queue<Item *> *blockQueue = new block_queue<Item *>(1024);
19+
block_queue<Item *> *blockQueue = new block_queue<Item *>(1024*1024);
2020

2121
#define MAIN_THREAD_RUNNING 1
2222
#define MAIN_THREAD_WAITING 0
@@ -41,7 +41,7 @@ void mainThreadTellFinished(int thisRank, int world_size) {
4141
itemStop->tarRank = i;
4242
itemStop->pack = buf;
4343
if (nullptr == blockQueue) cerr << "Cannot create blocking queue." << endl;
44-
blockQueue->push(itemStop);
44+
if (!blockQueue->push(itemStop)) cerr << "Cannot push stop signal." << endl;
4545
itemStop = nullptr;
4646
buf = nullptr;
4747
packSizeStop = 0;
@@ -95,7 +95,7 @@ void requestOtherRanksToStoreEdge(int currank, int world_size, const char *value
9595
int packSize = 0;
9696
int strLen = getK();
9797
int op = EDGE_STORE_OP;
98-
int tarrank = edgeId % world_size;
98+
size_t tarrank = idBelongTo(world_size, edgeId);
9999
if (tarrank == currank) {
100100
cerr << "Putting a edge belonging to this host to other hosts." << endl;
101101
return;
@@ -110,19 +110,19 @@ void requestOtherRanksToStoreEdge(int currank, int world_size, const char *value
110110
// DEBUG
111111
//MPI_Send(&packSize, 1, MPI_INT, tarrank, TAG(currank, tarrank), MPI_COMM_WORLD);
112112
//MPI_Send(buf, packSize, MPI_PACKED, tarrank, TAG(currank, tarrank), MPI_COMM_WORLD);
113-
item->tarRank = tarrank;
113+
item->tarRank = (int) tarrank;
114114
item->pack = buf;
115115
item->packSize = packSize;
116116
if (nullptr == blockQueue) cerr << "Cannot create blocking queue." << endl;
117-
blockQueue->push(item);
117+
if (!blockQueue->push(item)) cerr << "Cannot request new edge" << endl;
118118
}
119119

120120
void
121121
requestOtherRanksToStoreVertex(int currank, int world_size, VertexId vertexId, EdgeId edgeId, VertexMode_t vertexMode) {
122122
char *buf = new char[BUFFER_SIZE];
123123
int packSize = 0;
124124
int op = VERTEX_STORE_OP;
125-
int tarrank = vertexId % world_size;
125+
size_t tarrank = idBelongTo(world_size, vertexId);
126126
if (tarrank == currank) {
127127
cerr << "Putting a vertex belonging to this host to other hosts." << endl;
128128
return;
@@ -137,11 +137,11 @@ requestOtherRanksToStoreVertex(int currank, int world_size, VertexId vertexId, E
137137
//MPI_Send(&packSize, 1, MPI_INT, tarrank, TAG(currank, tarrank), MPI_COMM_WORLD);
138138
//MPI_Send(buf, packSize, MPI_PACKED, tarrank, TAG(currank, tarrank), MPI_COMM_WORLD);
139139

140-
item->tarRank = tarrank;
140+
item->tarRank = (int) tarrank;
141141
item->pack = buf;
142142
item->packSize = packSize;
143143
if (nullptr == blockQueue) cerr << "Cannot create blocking queue." << endl;
144-
blockQueue->push(item);
144+
if (!blockQueue->push(item)) cerr << "Cannot request new vertex" << endl;
145145
}
146146

147147
// 该方法负责char *指针的delete
@@ -160,10 +160,10 @@ int processRecvRead(const char *filename, size_t pos) {
160160
// 该方法负责char *指针的delete
161161
int processRecvEdge(EdgeList *edgeList, char *value, ReadId readId, KMERPOS_t kmerpos) {
162162
string *strValue = new string(value, 0, getK()-1);
163-
VertexId sourceVertex = getId(*strValue);
163+
VertexId sourceVertex = getId(strValue->c_str());
164164
delete strValue;
165165
strValue = new string(value, 1, getK()-1);
166-
VertexId sinkVertex = getId(*strValue);
166+
VertexId sinkVertex = getId(strValue->c_str());
167167
delete strValue;
168168
int flag = addNewEdge(edgeList, value, sourceVertex, sinkVertex, readId, kmerpos);
169169
delete[] value;
@@ -348,7 +348,9 @@ void *receiverRunner(void *args) {
348348
int sendMsgSize = 0;
349349

350350
if (vertexList->count(vertexId) == 0) {
351+
vertexList->at(vertexId);
351352
cerr << "Cannot query the vertex #" << vertexId << endl;
353+
cerr << thisRank << endl;
352354
queryStatus = FAILED_QUERY;
353355
} else {
354356
// vertex存在时
@@ -361,13 +363,15 @@ void *receiverRunner(void *args) {
361363
// 并行环境下不能使用tangleList->count(vertexId) != 0来判断是否为tangle
362364
queryStatus = TANGLE_QUERY;
363365
Vertex *thisVertex = vertexList->at(vertexId);
364-
edgeId = *thisVertex->outKMer->begin();
366+
/*edgeId = *thisVertex->outKMer->begin();
365367
// 删除vertex的一个出度
366-
removeOutEdge(vertexList, vertexId, edgeId);
368+
removeOutEdge(vertexList, vertexId, edgeId);*/
369+
getAndRemoveOutEdge(vertexList, vertexId, &edgeId); // 保护并行环境下的删除节点
367370
}else {
368-
edgeId = *thisVertex->outKMer->begin();
371+
/*edgeId = *thisVertex->outKMer->begin();
369372
// 删除vertex的一个出度
370-
removeOutEdge(vertexList, vertexId, edgeId);
373+
removeOutEdge(vertexList, vertexId, edgeId);*/
374+
getAndRemoveOutEdge(vertexList, vertexId, &edgeId); // 保护并行环境下的删除节点
371375
queryStatus = SUCCESSFUL_QUERY;
372376
}
373377
}

entity/idset.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,8 @@ int setIntersectTo(SetOfID *a, const SetOfID &b) {
6363
delete r;
6464
return a->size();
6565
}
66+
67+
68+
size_t idBelongTo(int world_size, size_t id) {
69+
return id % (size_t) world_size;
70+
}

entity/idset.h

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ inline size_t getId(const char *value) {
1818
return std::hash<std::string>()(value);
1919
}
2020

21+
size_t idBelongTo(int world_size, size_t id);
22+
2123
//typedef std::unordered_set<size_t> SetOfID;
2224
typedef size_t ReadId;
2325
typedef size_t VertexId;

entity/k_minus_mer.cpp

+26-1
Original file line numberDiff line numberDiff line change
@@ -224,15 +224,40 @@ int removeOutEdge(const VertexList *vList, const VertexId vId, const EdgeId eId)
224224
}
225225
v->outKMer->erase_item(eId);
226226
v->outDegree--;
227-
pthread_mutex_unlock(&kMinusMutex);
227+
228228
if (v->outKMer->size() != v->outDegree) {
229229
cerr << "Error occurs when removing out edge #"<< eId << "from vertex #" << vId << ".\n";
230230
v->outDegree++;
231231
return 0;
232232
}
233+
pthread_mutex_unlock(&kMinusMutex);
233234
return 1;
234235
}
235236

237+
int getAndRemoveOutEdge(const VertexList *vList, VertexId vId, EdgeId *eId) {
238+
if (vList->find(vId) == vList->end()) {
239+
cerr << "Error occurs when deleting an out edge of an non-exist vertex #" << vId << ".\n";
240+
return 0;
241+
}
242+
Vertex *v = vList->at(vId);
243+
pthread_mutex_lock(&kMinusMutex);
244+
EdgeId nextOutEdgeToDelete = *v->outKMer->begin();
245+
if (v->outKMer->count(nextOutEdgeToDelete) == 0) {
246+
cerr << "Error occurs when deleting an non-exist out edge #" << eId<< " of a vertex #" << vId << ".\n";
247+
return 0;
248+
}
249+
v->outKMer->erase_item(nextOutEdgeToDelete);
250+
v->outDegree--;
251+
*eId = nextOutEdgeToDelete;
252+
if (v->outKMer->size() != v->outDegree) {
253+
cerr << "Error occurs when removing out edge #"<< eId << "from vertex #" << vId << ".\n";
254+
v->outDegree++;
255+
return 0;
256+
}
257+
pthread_mutex_unlock(&kMinusMutex);
258+
return 0;
259+
}
260+
236261
int freeVertex(Vertex *pVertex) {
237262
delete pVertex;
238263
return 1;

entity/k_minus_mer.h

+2
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,7 @@ int addOutEdge(const VertexList *vList, VertexId vId, EdgeId eId);
5757

5858
int removeOutEdge(const VertexList *vList, VertexId vId, EdgeId eId);
5959

60+
int getAndRemoveOutEdge(const VertexList *vList, VertexId, EdgeId *eId);
61+
6062
int freeVertex(Vertex *pVertex);
6163
#endif //RANDOMSTRINGASSEMBLY_K_MINUS_MER_H

0 commit comments

Comments
 (0)