Skip to content

Commit a79ed1d

Browse files
committed
wip async read and write
1 parent c545908 commit a79ed1d

File tree

6 files changed

+675
-35
lines changed

6 files changed

+675
-35
lines changed

external/b64.mojo

+432
Large diffs are not rendered by default.

lightbug_http/io/bytes.mojo

+104
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,110 @@
11
from python import PythonObject
2+
from base64 import b64encode
3+
from memory.unsafe import bitcast
24

5+
alias ByteDType = DType.int8
36
alias Bytes = DynamicVector[Int8]
7+
alias Byte = Int8
8+
9+
10+
fn to_bytes(string: String) -> Bytes:
11+
return b64encode(string)._buffer
12+
13+
14+
fn to_bytes[type: DType, nelts: Int = 1](simd: SIMD[type, nelts]) -> Bytes:
15+
let simd_bytes = bitcast[ByteDType, nelts * sizeof[type](), type, nelts](simd)
16+
17+
var bytes = Bytes(nelts * sizeof[type]())
18+
19+
@unroll
20+
for i in range(nelts * sizeof[type]()):
21+
bytes.append(simd_bytes[i])
22+
23+
return bytes
24+
25+
26+
fn to_string(bytes: Bytes) -> String:
27+
return b64decode(bytes)
28+
29+
30+
fn rstrip_unsafe(content: String, chars: String = " ") -> String:
31+
var strip_pos: Int = len(content)
32+
for i in range(len(content)):
33+
let c = content[len(content) - i - 1 : len(content) - i]
34+
if chars.find(c) == -1:
35+
strip_pos = len(content) - i
36+
break
37+
38+
return content[:strip_pos]
39+
40+
41+
# Temporary until stdlib base64 decode is implemented
42+
fn b64decode(s: String) -> String:
43+
alias base64: String = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
44+
45+
let padding = s.count("=")
46+
47+
let s_strip = rstrip_unsafe(s, "=")
48+
49+
# base64 decode
50+
var binary_string: String = ""
51+
for i in range(len(s_strip)):
52+
let index: Byte = base64.find(s_strip[i : i + 1])
53+
binary_string += byte_to_binary_string(index)
54+
55+
if padding:
56+
binary_string = binary_string[: -padding * 2]
57+
58+
var decoded_string: String = ""
59+
for i in range(0, len(binary_string), 8):
60+
let byte = binary_string[i : i + 8]
61+
decoded_string += chr(binary_string_to_byte(byte).to_int())
62+
63+
return decoded_string
64+
65+
66+
fn byte_to_binary_string(byte: Byte) -> String:
67+
var binary_string: String = ""
68+
for i in range(8):
69+
let bit = (byte >> i) & 1
70+
binary_string += String(bit)
71+
72+
# find significant bits
73+
var significant_binary_string: String = ""
74+
var found_significant_bit: Bool = False
75+
for i in range(len(binary_string)):
76+
let bit = binary_string[len(binary_string) - i - 1 : len(binary_string) - i]
77+
if bit == "1":
78+
found_significant_bit = True
79+
if found_significant_bit:
80+
significant_binary_string += bit
81+
82+
# left pad to 6 bits if less than 6 bits
83+
if len(significant_binary_string) < 6:
84+
let padding = 6 - len(significant_binary_string)
85+
for i in range(padding):
86+
significant_binary_string = "0" + significant_binary_string
87+
88+
return significant_binary_string
89+
90+
91+
fn binary_string_to_byte(binary_string: String) -> Byte:
92+
var total = 0
93+
let length = len(binary_string)
94+
for i in range(length):
95+
# Get the value at the current position (0 or 1)
96+
let bit = binary_string[length - 1 - i]
97+
98+
let bit_value: Int
99+
if bit == "1":
100+
bit_value = 1
101+
else:
102+
bit_value = 0
103+
104+
# Add to the total, considering its position (2^i)
105+
total += bit_value * (2**i)
106+
107+
return total
4108

5109

6110
@value

lightbug_http/sys/net.mojo

+33-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ from lightbug_http.net import (
99
default_tcp_keep_alive,
1010
)
1111
from lightbug_http.strings import NetworkType
12-
from lightbug_http.io.bytes import Bytes
12+
from lightbug_http.io.bytes import Bytes, to_string, to_bytes, b64decode
1313
from lightbug_http.io.sync import Duration
1414
from external.libc import (
1515
c_void,
@@ -24,6 +24,7 @@ from external.libc import (
2424
SOCK_STREAM,
2525
SOL_SOCKET,
2626
SO_REUSEADDR,
27+
O_NONBLOCK,
2728
SHUT_RDWR,
2829
htons,
2930
inet_pton,
@@ -38,6 +39,9 @@ from external.libc import (
3839
shutdown,
3940
close,
4041
)
42+
from external.b64 import encode as b64_encode
43+
44+
# from external.b64 import decode as b64_decode
4145

4246

4347
@value
@@ -162,12 +166,40 @@ struct SysConnection(Connection):
162166
buf = bytes_str._buffer
163167
return bytes_recv
164168

169+
async fn read_async(self, inout buf: Bytes) raises -> Int:
170+
@parameter
171+
async fn task() -> Int:
172+
try:
173+
_ = self.read(buf)
174+
return buf[0].__int__()
175+
except e:
176+
print("Failed to read from connection: " + e.__str__())
177+
return -1
178+
179+
let routine: Coroutine[Int] = task()
180+
return await routine
181+
165182
fn write(self, buf: Bytes) raises -> Int:
166183
let msg = String(buf)
167184
if send(self.fd, to_char_ptr(msg).bitcast[c_void](), len(msg), 0) == -1:
168185
print("Failed to send response")
169186
return len(buf)
170187

188+
async fn write_async(self, buf: Bytes) raises -> Int:
189+
print("write_async " + b64decode(buf))
190+
191+
@parameter
192+
async fn task() -> Int:
193+
try:
194+
let write_len = self.write(buf)
195+
return write_len
196+
except e:
197+
print("Failed to write to connection: " + e.__str__())
198+
return -1
199+
200+
let routine: Coroutine[Int] = task()
201+
return await routine
202+
171203
fn close(self) raises:
172204
_ = shutdown(self.fd, SHUT_RDWR)
173205
let close_status = close(self.fd)

lightbug_http/sys/server.mojo

+105-32
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ from lightbug_http.header import RequestHeader
66
from lightbug_http.sys.net import SysListener, SysConnection, SysNet
77
from lightbug_http.service import HTTPService
88
from lightbug_http.io.sync import Duration
9-
from lightbug_http.io.bytes import Bytes
9+
from lightbug_http.io.bytes import Bytes, to_bytes, to_string
1010
from lightbug_http.error import ErrorHandler
1111
from lightbug_http.strings import next_line, NetworkType
12+
from external.b64 import encode as b64_encode
1213

1314

1415
struct SysServer:
@@ -55,40 +56,112 @@ struct SysServer:
5556
self.serve(listener, handler)
5657

5758
fn serve[T: HTTPService](inout self, ln: SysListener, handler: T) raises -> None:
58-
# let max_worker_count = self.get_concurrency()
59-
# TODO: logic for non-blocking read and write here, see for example https://github.com/valyala/fasthttp/blob/9ba16466dfd5d83e2e6a005576ee0d8e127457e2/server.go#L1789
60-
6159
self.ln = ln
6260

63-
while True:
64-
let conn = self.ln.accept[SysConnection]()
61+
async fn handle_connection(conn: SysConnection, handler: T) -> None:
6562
var buf = Bytes()
66-
let read_len = conn.read(buf)
67-
let first_line_and_headers = next_line(buf)
68-
let request_line = first_line_and_headers.first_line
69-
let rest_of_headers = first_line_and_headers.rest
70-
71-
var uri = URI(request_line)
7263
try:
73-
uri.parse()
74-
except:
75-
conn.close()
76-
raise Error("Failed to parse request line")
77-
78-
var header = RequestHeader(buf)
64+
let read_len = await conn.read_async(buf)
65+
except e:
66+
try:
67+
conn.close()
68+
except e:
69+
print("Failed to close connection")
70+
print("Failed to read from connection")
7971
try:
80-
header.parse()
81-
except:
82-
conn.close()
83-
raise Error("Failed to parse request header")
84-
85-
let res = handler.func(
86-
HTTPRequest(
87-
uri,
88-
buf,
89-
header,
72+
let first_line_and_headers = next_line(buf)
73+
let request_line = first_line_and_headers.first_line
74+
let rest_of_headers = first_line_and_headers.rest
75+
76+
var uri = URI(request_line)
77+
try:
78+
uri.parse()
79+
except:
80+
try:
81+
conn.close()
82+
except e:
83+
print("Failed to close connection")
84+
print("Failed to parse request line")
85+
86+
var header = RequestHeader(buf)
87+
try:
88+
header.parse()
89+
except:
90+
try:
91+
conn.close()
92+
except e:
93+
print("Failed to close connection")
94+
print("Failed to parse request header")
95+
96+
let res = handler.func(
97+
HTTPRequest(
98+
uri,
99+
buf,
100+
header,
101+
)
90102
)
91-
)
92-
let res_encoded = encode(res)
93-
_ = conn.write(res_encoded)
94-
conn.close()
103+
var res_encoded: String = encode(res)
104+
try:
105+
let write_len = await conn.write_async(res_encoded._buffer)
106+
print(write_len)
107+
except e:
108+
print("Ooph! " + e.__str__())
109+
try:
110+
conn.close()
111+
except e:
112+
print("Failed to close connection")
113+
print("Failed to read from connection")
114+
try:
115+
conn.close()
116+
except e:
117+
print("Failed to close connection")
118+
except e:
119+
print("Failed to parse request line")
120+
try:
121+
conn.close()
122+
except e:
123+
print("Failed to close connection")
124+
125+
while True:
126+
let conn = self.ln.accept[SysConnection]()
127+
let coroutine: Coroutine[NoneType] = handle_connection(conn, handler)
128+
_ = coroutine() # Execute the coroutine synchronously
129+
130+
# fn serve[T: HTTPService](inout self, ln: SysListener, handler: T) raises -> None:
131+
# # let max_worker_count = self.get_concurrency()
132+
# # TODO: logic for non-blocking read and write here, see for example https://github.com/valyala/fasthttp/blob/9ba16466dfd5d83e2e6a005576ee0d8e127457e2/server.go#L1789
133+
134+
# self.ln = ln
135+
136+
# while True:
137+
# let conn = self.ln.accept[SysConnection]()
138+
# var buf = Bytes()
139+
# let read_len = conn.read(buf)
140+
# let first_line_and_headers = next_line(buf)
141+
# let request_line = first_line_and_headers.first_line
142+
# let rest_of_headers = first_line_and_headers.rest
143+
144+
# var uri = URI(request_line)
145+
# try:
146+
# uri.parse()
147+
# except:
148+
# conn.close()
149+
# raise Error("Failed to parse request line")
150+
151+
# var header = RequestHeader(buf)
152+
# try:
153+
# header.parse()
154+
# except:
155+
# conn.close()
156+
# raise Error("Failed to parse request header")
157+
158+
# let res = handler.func(
159+
# HTTPRequest(
160+
# uri,
161+
# buf,
162+
# header,
163+
# )
164+
# )
165+
# let res_encoded = encode(res)
166+
# _ = conn.write(res_encoded)
167+
# conn.close()

lightbug_http/tests/utils.mojo

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ from lightbug_http.client import Client
1111

1212
alias default_server_host = "localhost"
1313
alias default_server_port = 8080
14-
alias default_server_conn_string = "http://" + default_server_host + ":" + default_server_port.__str__()
14+
alias default_server_conn_string = String("http://localhost:8080")
1515

1616
alias getRequest = String(
1717
"GET /foobar?baz HTTP/1.1\r\nHost: google.com\r\nUser-Agent: aaa/bbb/ccc/ddd/eee"

lightbug_http/uri.mojo

-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ from lightbug_http.strings import (
88
)
99

1010

11-
# TODO: convenience type, not currently used properly but will be helpful in the future
1211
@value
1312
struct URI:
1413
var __path_original: Bytes

0 commit comments

Comments
 (0)