Skip to content

Commit f342eed

Browse files
committed
done
1 parent e2ca219 commit f342eed

6 files changed

Lines changed: 153 additions & 11 deletions

File tree

.rgignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
steps

co_async/epoll_loop.hpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ struct EpollFilePromise : Promise<EpollEventMask> {
3030
};
3131

3232
struct EpollLoop {
33-
inline void addListener(EpollFilePromise &promise, int ctl);
33+
inline bool addListener(EpollFilePromise &promise, int ctl);
3434
inline void removeListener(int fileNo);
3535
inline bool run(std::optional<std::chrono::system_clock::duration> timeout =
3636
std::nullopt);
@@ -58,7 +58,10 @@ struct EpollFileAwaiter {
5858
void await_suspend(std::coroutine_handle<EpollFilePromise> coroutine) {
5959
auto &promise = coroutine.promise();
6060
promise.mAwaiter = this;
61-
mLoop.addListener(promise, mCtlCode);
61+
if (!mLoop.addListener(promise, mCtlCode)) {
62+
promise.mAwaiter = nullptr;
63+
coroutine.resume();
64+
}
6265
}
6366

6467
EpollEventMask await_resume() const noexcept {
@@ -73,18 +76,21 @@ struct EpollFileAwaiter {
7376
};
7477

7578
EpollFilePromise::~EpollFilePromise() {
76-
if (mAwaiter) [[likely]] {
79+
if (mAwaiter) {
7780
mAwaiter->mLoop.removeListener(mAwaiter->mFileNo);
7881
}
7982
}
8083

81-
void EpollLoop::addListener(EpollFilePromise &promise, int ctl) {
84+
bool EpollLoop::addListener(EpollFilePromise &promise, int ctl) {
8285
struct epoll_event event;
8386
event.events = promise.mAwaiter->mEvents;
8487
event.data.ptr = &promise;
85-
checkError(epoll_ctl(mEpoll, ctl, promise.mAwaiter->mFileNo, &event));
88+
int res = epoll_ctl(mEpoll, ctl, promise.mAwaiter->mFileNo, &event);
89+
if (res == -1)
90+
return false;
8691
if (ctl == EPOLL_CTL_ADD)
8792
++mCount;
93+
return true;
8894
}
8995

9096
void EpollLoop::removeListener(int fileNo) {

co_async/error_handling.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ auto checkError(auto res) {
3939
return res;
4040
}
4141

42-
auto checkErrorExceptBlock(auto res, int blockres = 0, int blockerr = EWOULDBLOCK) {
42+
auto checkErrorNonBlock(auto res, int blockres = 0, int blockerr = EWOULDBLOCK) {
4343
if (res == -1) {
4444
if (errno != blockerr) [[unlikely]] {
4545
throw std::system_error(errno, std::system_category());

co_async/filesystem.hpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,14 @@ namespace co_async {
1919

2020
enum class OpenMode : int {
2121
Read = O_RDONLY,
22-
Write = O_WRONLY | O_CREAT,
22+
Write = O_WRONLY | O_TRUNC | O_CREAT,
2323
ReadWrite = O_RDWR | O_CREAT,
2424
Append = O_WRONLY | O_APPEND | O_CREAT,
25-
ReadAppend = O_RDWR | O_APPEND | O_CREAT,
2625
};
2726

2827
inline Task<AsyncFile> open_fs_file(EpollLoop &loop, std::filesystem::path path, OpenMode mode, mode_t access = 0644) {
2928
int oflags = (int)mode;
30-
oflags |= O_NONBLOCK;
29+
/* oflags |= O_NONBLOCK; */ // TODO: nonblockfsfilesviaiouring!
3130
int res = checkError(open(path.c_str(), oflags, access));
3231
AsyncFile file(res);
3332
co_return file;

main.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ Task<> amain() {
109109

110110
HTTPRequest request{
111111
.method = "GET",
112-
.uri = "/api/tts?text=小彭老师真好",
112+
.uri = "/api/tts?text=小彭老师对我们同学真好呀!又要做OpenGL课件又要做协程库,我们必须一键三连奖励他&volume=0.5&quality=low",
113113
.headers =
114114
{
115115
{"host", "142857.red:8080"},
@@ -122,8 +122,9 @@ Task<> amain() {
122122

123123
HTTPResponse response;
124124
co_await response.read_from(sock);
125+
debug(), (int)response.body.size();
125126

126-
FileStream file(loop, co_await open_fs_file(loop, "/tmp/output.wav", OpenMode::Write));
127+
FileOStream file(loop, co_await open_fs_file(loop, "/tmp/output.wav", OpenMode::Write));
127128
co_await file.puts(response.body);
128129
co_await file.flush();
129130
}

steps/step22.cpp

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
#include <co_async/socket.hpp>
2+
#include <co_async/debug.hpp>
3+
#include <co_async/task.hpp>
4+
#include <co_async/generator.hpp>
5+
#include <co_async/timer_loop.hpp>
6+
#include <co_async/epoll_loop.hpp>
7+
#include <co_async/async_loop.hpp>
8+
#include <co_async/when_any.hpp>
9+
#include <co_async/when_all.hpp>
10+
#include <co_async/limit_timeout.hpp>
11+
#include <co_async/and_then.hpp>
12+
#include <co_async/stdio.hpp>
13+
#include <co_async/socket.hpp>
14+
#include <co_async/filesystem.hpp>
15+
#include <co_async/stream.hpp>
16+
#include <co_async/simple_map.hpp>
17+
#include <memory>
18+
#include <string>
19+
#include <tuple>
20+
#include <vector>
21+
#include <optional>
22+
#include <algorithm>
23+
24+
using namespace std::literals;
25+
using namespace co_async;
26+
27+
AsyncLoop loop;
28+
29+
struct HTTPHeaders : SimpleMap<std::string, std::string> {
30+
using SimpleMap<std::string, std::string>::SimpleMap;
31+
};
32+
33+
struct HTTPRequest {
34+
std::string method;
35+
std::string uri;
36+
HTTPHeaders headers;
37+
std::string body;
38+
39+
Task<> write_into(auto &sock) {
40+
co_await sock.puts(method);
41+
co_await sock.putchar(' ');
42+
co_await sock.puts(uri);
43+
co_await sock.puts(" HTTP/1.1\r\n"sv);
44+
for (auto const &[k, v]: headers) {
45+
co_await sock.puts(k);
46+
co_await sock.puts(": "sv);
47+
co_await sock.puts(v);
48+
co_await sock.puts("\r\n"sv);
49+
}
50+
if (body.empty()) {
51+
co_await sock.puts("\r\n"sv);
52+
} else {
53+
co_await sock.puts("content-length: "sv);
54+
co_await sock.puts(std::to_string(body.size()));
55+
co_await sock.puts("\r\n"sv);
56+
co_await sock.puts(body);
57+
}
58+
}
59+
60+
auto repr() const {
61+
return std::make_tuple(method, uri, headers, body);
62+
}
63+
};
64+
65+
struct HTTPResponse {
66+
int status;
67+
HTTPHeaders headers;
68+
std::string body;
69+
70+
Task<> read_from(auto &sock) {
71+
auto line = co_await sock.getline("\r\n"sv);
72+
if (line.size() <= 9 || line.substr(0, 9) != "HTTP/1.1 "sv)
73+
[[unlikely]] {
74+
throw std::invalid_argument("invalid http response");
75+
}
76+
status = std::stoi(line.substr(9));
77+
while (true) {
78+
auto line = co_await sock.getline("\r\n"sv);
79+
if (line.empty()) {
80+
break;
81+
}
82+
auto pos = line.find(':');
83+
if (pos == line.npos || pos == line.size() - 1 || line[pos + 1] != ' ')
84+
[[unlikely]] {
85+
throw std::invalid_argument("invalid http response");
86+
}
87+
auto key = line.substr(0, pos);
88+
for (auto &c: key) {
89+
if (c >= 'A' && c <= 'Z') {
90+
c += 'a' - 'A';
91+
}
92+
}
93+
headers.insert_or_assign(std::move(key), line.substr(pos + 2));
94+
}
95+
if (auto p = headers.at("content-length"sv)) [[likely]] {
96+
auto len = std::stoi(*p);
97+
body = co_await sock.getn(len);
98+
}
99+
}
100+
101+
auto repr() const {
102+
return std::make_tuple(status, headers, body);
103+
}
104+
};
105+
106+
Task<> amain() {
107+
auto addr = socket_address(ip_address("142857.red"), 8080);
108+
FileStream sock(loop, co_await create_tcp_client(loop, addr));
109+
110+
HTTPRequest request{
111+
.method = "GET",
112+
.uri = "/api/tts?text=小彭老师对我们同学真好呀!又要做OpenGL课件又要做协程库,我们必须一键三连奖励他&volume=0.5&quality=low",
113+
.headers =
114+
{
115+
{"host", "142857.red:8080"},
116+
{"user-agent", "co_async"},
117+
{"connection", "keep-alive"},
118+
},
119+
};
120+
co_await request.write_into(sock);
121+
co_await sock.flush();
122+
123+
HTTPResponse response;
124+
co_await response.read_from(sock);
125+
debug(), (int)response.body.size();
126+
127+
FileOStream file(loop, co_await open_fs_file(loop, "/tmp/output.wav", OpenMode::Write));
128+
co_await file.puts(response.body);
129+
co_await file.flush();
130+
}
131+
132+
int main() {
133+
run_task(loop, amain());
134+
return 0;
135+
}

0 commit comments

Comments
 (0)