Skip to content

Commit 46222bb

Browse files
committed
新增blockingqueue的类以及把测试代码解耦到新的模块中,用于进程交流的函数抽离出来到一个新的模块中
1 parent 437ecad commit 46222bb

File tree

6 files changed

+560
-1
lines changed

6 files changed

+560
-1
lines changed

CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ message(STATUS "Run: ${MPIEXEC} ${MPIEXEC_NUMPROC_FLAG} ${MPIEXEC_MAX_NUMPROCS}
55

66
set(CMAKE_CXX_STANDARD 11)
77

8-
add_executable(RandomStringAssembly main.cpp entity/k_mer.cpp entity/k_mer.h entity/k_minus_mer.cpp entity/k_minus_mer.h entity/idset.h entity/read.cpp entity/read.h entity/idset.cpp)
8+
add_executable(RandomStringAssembly main.cpp entity/k_mer.cpp entity/k_mer.h entity/k_minus_mer.cpp entity/k_minus_mer.h entity/idset.h entity/read.cpp entity/read.h entity/idset.cpp test.cpp test.h blocking_queue.h communicator.cpp communicator.h)
99
target_link_libraries(RandomStringAssembly PUBLIC MPI::MPI_CXX)

blocking_queue.h

+209
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
/********************************************
2+
function: thread safe blocking queue.
3+
author: liuyi
4+
date: 2014.11.13
5+
version: 2.0
6+
********************************************/
7+
8+
#ifndef BLOCK_QUEUE_H
9+
#define BLOCK_QUEUE_H
10+
11+
#include <iostream>
12+
#include <stdlib.h>
13+
#include <pthread.h>
14+
#include <sys/time.h>
15+
using namespace std;
16+
17+
template<class T>
18+
class block_queue
19+
{
20+
public:
21+
block_queue(int max_size = 1000)
22+
{
23+
if(max_size <= 0)
24+
{
25+
exit(-1);
26+
}
27+
28+
m_max_size = max_size;
29+
m_array = new T[max_size];
30+
m_size = 0;
31+
m_front = -1;
32+
m_back = -1;
33+
34+
m_mutex = new pthread_mutex_t;
35+
m_cond = new pthread_cond_t;
36+
pthread_mutex_init(m_mutex, NULL);
37+
pthread_cond_init(m_cond, NULL);
38+
}
39+
40+
void clear()
41+
{
42+
pthread_mutex_lock(m_mutex);
43+
m_size = 0;
44+
m_front = -1;
45+
m_back = -1;
46+
pthread_mutex_unlock(m_mutex);
47+
}
48+
49+
~block_queue()
50+
{
51+
pthread_mutex_lock(m_mutex);
52+
if(m_array != NULL)
53+
delete m_array;
54+
pthread_mutex_unlock(m_mutex);
55+
56+
pthread_mutex_destroy(m_mutex);
57+
pthread_cond_destroy(m_cond);
58+
59+
delete m_mutex;
60+
delete m_cond;
61+
}
62+
63+
bool full()const
64+
{
65+
pthread_mutex_lock(m_mutex);
66+
if(m_size >= m_max_size)
67+
{
68+
pthread_mutex_unlock(m_mutex);
69+
return true;
70+
}
71+
pthread_mutex_unlock(m_mutex);
72+
return false;
73+
}
74+
75+
bool empty()const
76+
{
77+
pthread_mutex_lock(m_mutex);
78+
if(0 == m_size)
79+
{
80+
pthread_mutex_unlock(m_mutex);
81+
return true;
82+
}
83+
pthread_mutex_unlock(m_mutex);
84+
return false;
85+
}
86+
87+
bool front(T& value)const
88+
{
89+
pthread_mutex_lock(m_mutex);
90+
if(0 == m_size)
91+
{
92+
pthread_mutex_unlock(m_mutex);
93+
return false;
94+
}
95+
value = m_array[m_front];
96+
pthread_mutex_unlock(m_mutex);
97+
return true;
98+
}
99+
100+
bool back(T& value)const
101+
{
102+
pthread_mutex_lock(m_mutex);
103+
if(0 == m_size)
104+
{
105+
pthread_mutex_unlock(m_mutex);
106+
return false;
107+
}
108+
value = m_array[m_back];
109+
pthread_mutex_unlock(m_mutex);
110+
return true;
111+
}
112+
113+
int size()const
114+
{
115+
int tmp = 0;
116+
pthread_mutex_lock(m_mutex);
117+
tmp = m_size;
118+
pthread_mutex_unlock(m_mutex);
119+
return tmp;
120+
}
121+
122+
int max_size()const
123+
{
124+
int tmp = 0;
125+
pthread_mutex_lock(m_mutex);
126+
tmp = m_max_size;
127+
pthread_mutex_unlock(m_mutex);
128+
return tmp;
129+
}
130+
131+
bool push(const T& item)
132+
{
133+
pthread_mutex_lock(m_mutex);
134+
if(m_size >= m_max_size)
135+
{
136+
pthread_cond_broadcast(m_cond);
137+
pthread_mutex_unlock(m_mutex);
138+
return false;
139+
}
140+
141+
m_back = (m_back + 1) % m_max_size;
142+
m_array[m_back] = item;
143+
144+
m_size++;
145+
pthread_cond_broadcast(m_cond);
146+
pthread_mutex_unlock(m_mutex);
147+
148+
return true;
149+
}
150+
151+
bool pop(T& item)
152+
{
153+
pthread_mutex_lock(m_mutex);
154+
while(m_size <= 0)
155+
{
156+
if(0 != pthread_cond_wait(m_cond, m_mutex))
157+
{
158+
pthread_mutex_unlock(m_mutex);
159+
return false;
160+
}
161+
}
162+
163+
m_front = (m_front + 1) % m_max_size;
164+
item = m_array[m_front];
165+
m_size--;
166+
pthread_mutex_unlock(m_mutex);
167+
return true;
168+
}
169+
170+
bool pop(T& item, int ms_timeout)
171+
{
172+
struct timespec t = {0,0};
173+
struct timeval now = {0,0};
174+
gettimeofday(&now, NULL);
175+
pthread_mutex_lock(m_mutex);
176+
if(m_size <= 0)
177+
{
178+
t.tv_sec = now.tv_sec + ms_timeout/1000;
179+
t.tv_nsec = (ms_timeout % 1000)*1000;
180+
if(0 != pthread_cond_timedwait(m_cond, m_mutex, &t))
181+
{
182+
pthread_mutex_unlock(m_mutex);
183+
return false;
184+
}
185+
}
186+
187+
if(m_size <= 0)
188+
{
189+
pthread_mutex_unlock(m_mutex);
190+
return false;
191+
}
192+
193+
m_front = (m_front + 1) % m_max_size;
194+
item = m_array[m_front];m_size--;
195+
pthread_mutex_unlock(m_mutex);
196+
return true;
197+
}
198+
199+
private:
200+
pthread_mutex_t *m_mutex;
201+
pthread_cond_t *m_cond;
202+
T *m_array;
203+
int m_size;
204+
int m_max_size;
205+
int m_front;
206+
int m_back;
207+
};
208+
209+
#endif

communicator.cpp

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
//
2+
// Created by erianliang on 19-7-19.
3+
//
4+
5+
#include "communicator.h"
6+
#include "blocking_queue.h"
7+
8+
const block_queue<char *> *blockQueue = new block_queue<char *>(1024*1024);
9+
10+
void requestRankToStoreRead(int currank, int tarrank, const char *filename, size_t pos) {
11+
12+
}
13+
14+
void requestOtherRanksToStoreEdge(int currank, int world_size, const char *value, EdgeId edgeId, ReadId readId,
15+
KMERPOS_t kmerpos) {
16+
17+
}
18+
19+
void
20+
requestOtherRanksToStoreVertex(int currank, int world_size, VertexId vertexId, EdgeId edgeId, VertexMode_t vertexMode) {
21+
22+
}
23+
24+
void requestOtherRanksToStoreTangle(int currank, int world_size, VertexId tangleId) {
25+
26+
}
27+
28+
int processRecvRead(const char *filename, size_t pos) {
29+
return 0;
30+
}
31+
32+
int processRecvEdge(EdgeList *edgeList, char *value, ReadId readId, KMERPOS_t kmerpos) {
33+
return 0;
34+
}
35+
36+
int processRecvVertex(VertexList *vertexList, VertexId vertexId, EdgeId edgeId, VertexMode_t vertexMode) {
37+
return 0;
38+
}
39+
40+
void senderRunner() {
41+
42+
}
43+
44+
void receiverRunner(VertexList *vertexList, EdgeList *edgeList, SetOfID *tangleList) {
45+
46+
}

communicator.h

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
//
2+
// Created by erianliang on 19-7-19.
3+
//
4+
5+
#ifndef RANDOMSTRINGASSEMBLY_COMMUNICATOR_H
6+
#define RANDOMSTRINGASSEMBLY_COMMUNICATOR_H
7+
8+
#include "entity/idset.h"
9+
#include "entity/read.h"
10+
#include "entity/k_minus_mer.h"
11+
#include "entity/k_mer.h"
12+
13+
void requestRankToStoreRead(int currank, int tarrank, const char *filename, size_t pos);
14+
15+
void requestOtherRanksToStoreEdge(int currank, int world_size,
16+
const char *value, EdgeId edgeId, ReadId readId, KMERPOS_t kmerpos);
17+
18+
void requestOtherRanksToStoreVertex(int currank, int world_size,
19+
VertexId vertexId, EdgeId edgeId, VertexMode_t vertexMode);
20+
21+
void requestOtherRanksToStoreTangle(int currank, int world_size, VertexId tangleId);
22+
23+
int processRecvRead(const char *filename, size_t pos);
24+
25+
int processRecvEdge(EdgeList *edgeList, char *value, ReadId readId, KMERPOS_t kmerpos);
26+
27+
// TODO: 记得tangle 判断
28+
int processRecvVertex(VertexList *vertexList, VertexId vertexId, EdgeId edgeId, VertexMode_t vertexMode);
29+
30+
void senderRunner();
31+
32+
void receiverRunner(VertexList *vertexList, EdgeList *edgeList, SetOfID *tangleList);
33+
34+
#endif //RANDOMSTRINGASSEMBLY_COMMUNICATOR_H

0 commit comments

Comments
 (0)