-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathread_io.cpp
127 lines (112 loc) · 3.48 KB
/
read_io.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
//
// Created by 梁俊鹏 on 2019-07-20.
//
#include <iostream>
#include <stdio.h>
#include <string.h>
#include "read_io.h"
#include "entity/read.h"
// Size in bytes of the line buffer handed to fgets() in getNextRead (terminating '\0' included).
#define BUFFER_SIZE 152
// Bytes per megabyte; used only for the progress figure printed by getNextRead.
#define MB (1024*1024)
using namespace std;
//block_queue<string> *ioqueue = new block_queue<string>;
// Rank of this worker, as passed to readIOInit (currank).
static int thisRank;
// Total number of workers, as passed to readIOInit (world_size); each file is split into this many slices.
static int host_num;
// Prefix prepended to every file name before it is opened.
static string path;
// Array of input file names supplied by the caller of readIOInit (the pointer is stored, not copied).
static string *fnames;
// Number of entries in fnames.
static int fileAmount = 0;
// Index into fnames of the file currently being read; readIOInit sets it to -1 before the first file is opened.
static int curIndex4Fnames;
// Byte offset in the current file at which the next line will be read.
static off_t curSeek;
// Byte offset at which this worker's slice of the current file ends; reading continues while curSeek < stopSeek.
static off_t stopSeek;
// Scratch buffer that receives one line per getNextRead call.
static char buffer[BUFFER_SIZE];
// Handle of the currently open input file.
static FILE *infile;
int moveToNext() {
if (NULL != infile)
fclose(infile);
//curSeek.__pos = 0;
if (curIndex4Fnames + 1 >= fileAmount) {
cout << "All files are read." << endl;
return 0;
}
string fname = path + fnames[++curIndex4Fnames];
infile = fopen(fname.c_str(), "r");
if (!infile) {
cerr << "Unable to open file: " << fname << endl;
return -1;
}
fseek(infile, 0, SEEK_END);
off_t totalSize = ftello(infile);
stopSeek = (thisRank+1==host_num)?totalSize: totalSize / host_num * (thisRank+1);
if (thisRank == 0) {
curSeek = 0;
fseeko(infile, curSeek, SEEK_SET);
} else {
curSeek = totalSize / host_num * thisRank;
if (curSeek < 1) {
cerr << "Error on computing file position for #" << thisRank << endl;
return -1;
}
fseeko(infile, curSeek - 1, SEEK_SET); // 定位到属于该worker的前一个位置,以防止curSeek就是一条完整的read的开端的情况
while (getc(infile) != '\n'); // 把不完整的read过滤掉
curSeek = ftello(infile);
}
return 1;
}
int readIOInit(int currank, int world_size, string filepath, string *filenames, int fileNum) {
thisRank = currank;
host_num = world_size;
path = filepath;
fnames = filenames;
fileAmount = fileNum;
curIndex4Fnames = -1; // 因为moveToNext需要直接增加
infile = NULL;
//curSeek.__pos = 0;
if (nullptr == filenames) {
cerr << "No input read file." << endl;
return -1;
}
while (moveToNext() == -1);
if (curIndex4Fnames > fileAmount) return -1;
if (createReadDir() == 0) {
exit(1);
}
return 0;
}
// Reads the next line of this worker's slice into *outread, advancing to the next input
// file whenever the current slice is exhausted.
//
// outread  receives the line without its trailing '\n'
// readpos  receives the byte offset in the current file at which the line started
//
// Returns 1 when a line was produced and -1 when there is nothing left to read (or when
// an argument is null). A line longer than BUFFER_SIZE - 1 bytes is returned in pieces
// over successive calls.
int getNextRead(string *outread, size_t *readpos) {
    if (fileAmount < 1) return -1;
    if (nullptr == outread || nullptr == readpos) return -1;

    for (;;) {
        // Try the current slice first; switch files once it is used up.
        while (curSeek >= stopSeek) {
            int flag;
            while ((flag = moveToNext()) == -1) {
                // a file that cannot be opened is skipped immediately
            }
            if (flag == 0) return -1; // every file has been consumed
        }
        if (NULL != infile && fgets(buffer, BUFFER_SIZE, infile) != NULL) {
            break;
        }
        // Unexpected EOF or read error before the slice end: abandon this file instead of
        // handing out whatever stale bytes are left in the buffer.
        curSeek = stopSeek;
    }

    *readpos = curSeek; // offset before the line was read
    const off_t after = ftello(infile);
    curSeek = (after < 0) ? stopSeek : after;

    // Drop the newline only when fgets actually stored one (the last line of a file or an
    // over-long line has none, and an unconditional chop would lose a real character).
    buffer[strcspn(buffer, "\n")] = '\0';
    outread->assign(buffer);

    // off_t has no portable printf length modifier, so widen explicitly.
    printf("\r%lldMB", (long long) ((stopSeek - curSeek) / MB));
    return 1;
}
// Returns the name (without the path prefix) of the input file currently selected by
// curIndex4Fnames, or an empty string when no file has been selected yet or no file
// list is installed.
string getCurrentFilename() {
    // The index is -1 right after readIOInit resets it; indexing then would read out of bounds.
    if (nullptr == fnames || curIndex4Fnames < 0 || curIndex4Fnames >= fileAmount) {
        return string();
    }
    return fnames[curIndex4Fnames];
}
pthread_t requestWriteRead(const string &read) {
pthread_t tid;
if (pthread_create(&tid, NULL, writeReadRunner, (void *)&read) == -1) {
cerr << "Error: Cannot create thread when requesting read writing." << endl;
return 0;
}
pthread_detach(tid);
return tid;
}
void *writeReadRunner(void *arg) {
string read = *((string *) arg);
if (createRead(read, nullptr) == 0) {
cerr << "Error: Cannot create file for read \"" << read << "\"" << endl;
return (void *) -1;
}
return (void *)0;
}