Skip to content

Commit 9fabbc4

Browse files
committed
更改了并行方式,直接分开多个进程读取文件不同位置
1 parent 925bb92 commit 9fabbc4

File tree

2 files changed

+30
-7
lines changed

2 files changed

+30
-7
lines changed

main.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,12 @@ int main(int argc, char** argv) {
157157

158158
// 先把全部read处理完之后再充分拍kmer和kminusmer
159159
// 并行化处理read中的kmer
160-
int totalKmerNum = curread->size()-k+1;
160+
/*int totalKmerNum = curread->size()-k+1;
161161
int kMerNum4Each = totalKmerNum / world_size;
162162
int startKMer = kMerNum4Each * currank;
163-
int end = (currank + 1 == world_size)? totalKmerNum: (startKMer + kMerNum4Each);
163+
int end = (currank + 1 == world_size)? totalKmerNum: (startKMer + kMerNum4Each);*/
164+
int startKMer = 0;
165+
int end = curread->size() - getK();
164166
for (int i = startKMer; i < end; ++i) {
165167
for (int j = 0; j < k; ++j) {
166168
char tmp = (*curread)[i + j];

read_io.cpp

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,16 @@ static string path;
2020
static string *fnames;
2121
static int fileAmount = 0;
2222
static int curIndex4Fnames;
23-
static fpos_t curSeek;
23+
static off_t curSeek;
24+
static off_t stopSeek;
2425
static char buffer[BUFFER_SIZE];
2526
static FILE *infile;
2627

2728
int moveToNext() {
2829
if (NULL != infile)
2930
fclose(infile);
3031
//curSeek.__pos = 0;
32+
3133
if (curIndex4Fnames + 1 >= fileAmount) {
3234
cout << "All files are read." << endl;
3335
return 0;
@@ -38,6 +40,24 @@ int moveToNext() {
3840
cerr << "Unable to open file: " << fname << endl;
3941
return -1;
4042
}
43+
44+
fseek(infile, 0, SEEK_END);
45+
off_t totalSize = ftello(infile);
46+
stopSeek = (thisRank+1==host_num)?totalSize: totalSize / host_num * (thisRank+1);
47+
if (thisRank == 0) {
48+
curSeek = 0;
49+
fseeko(infile, curSeek, SEEK_SET);
50+
} else {
51+
curSeek = totalSize / host_num * thisRank;
52+
if (curSeek < 1) {
53+
cerr << "Error on computing file position for #" << thisRank << endl;
54+
return -1;
55+
}
56+
fseeko(infile, curSeek - 1, SEEK_SET); // 定位到属于该worker的前一个位置,以防止curSeek就是一条完整的read的开端的情况
57+
while (getc(infile) != '\n'); // 把不完整的read过滤掉
58+
curSeek = ftello(infile);
59+
}
60+
4161
return 1;
4262
}
4363

@@ -65,15 +85,16 @@ int readIOInit(int currank, int world_size, string filepath, string *filenames,
6585
int getNextRead(string *outread, size_t *readpos) {
6686
if (fileAmount < 1) return -1;
6787
// 先尝试读取,如果不能读取则转向下一个文件
68-
while (fgets(buffer, BUFFER_SIZE, infile) == NULL) {
88+
while (curSeek >= stopSeek) {
6989
int flag;
7090
while ((flag = moveToNext()) == -1) ; // 当读到的文件不能打开则马上打开下一个
7191
if (flag == 0) return -1; // 全部读完则返回-1
7292
}
73-
//*readpos = curSeek.__pos; // 读取前的位置
74-
// fgetpos(infile, &curSeek);
93+
fgets(buffer, BUFFER_SIZE, infile);
94+
*readpos = curSeek; // 读取前的位置
95+
curSeek = ftello64(infile);
7596

76-
int len = strlen(buffer) - 1;
97+
int len = strlen(buffer) - 1; // 去除fgets中的换行符
7798
*outread = string(buffer, len);
7899

79100
return 1;

0 commit comments

Comments
 (0)