Skip to content

Commit 0a21541

Browse files
committed
Initial copy over from source repository
1 parent 0cb946f commit 0a21541

File tree

2 files changed

+149
-0
lines changed

2 files changed

+149
-0
lines changed

cobhan/__init__.py

Whitespace-only changes.

cobhan/cobhan.py

+149
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
from io import UnsupportedOperation
2+
import pathlib
3+
import platform
4+
import os
5+
import json
6+
7+
from cffi import FFI
8+
9+
10+
class Cobhan:
11+
def __init__(self):
12+
self.__ffi = FFI()
13+
self.__sizeof_int32 = self.__ffi.sizeof("int32_t")
14+
self.__sizeof_header = self.__sizeof_int32 * 2
15+
self.__minimum_allocation = 1024
16+
self.__int32_zero_bytes = int(0).to_bytes(
17+
self.__sizeof_int32, byteorder="little", signed=True
18+
)
19+
20+
def _load_library(self, library_path, library_name, cdefines):
21+
self.__ffi.cdef(cdefines)
22+
23+
system = platform.system()
24+
need_chdir = 0
25+
if system == "Linux":
26+
if pathlib.Path("/lib").match("libc.musl*"):
27+
os_ext = "-musl.so"
28+
need_chdir = 1
29+
else:
30+
os_path = ".so"
31+
elif system == "Darwin":
32+
os_ext = ".dylib"
33+
elif system == "Windows":
34+
os_ext = ".dll"
35+
else:
36+
raise UnsupportedOperation("Unsupported operating system")
37+
38+
machine = platform.machine()
39+
if machine == "x86_64" or machine == "AMD64":
40+
arch_part = "-x64"
41+
elif machine == "arm64":
42+
arch_part = "-arm64"
43+
else:
44+
raise UnsupportedOperation("Unsupported CPU")
45+
46+
# Get absolute library path
47+
resolved_library_path = pathlib.Path(
48+
os.path.join(library_path, os_path, cpu_arch)
49+
).resolve()
50+
51+
# Build library path with file name
52+
library_file_path = os.path.join(
53+
str(resolved_library_path), f"{library_name}{arch_part}{os_ext}"
54+
)
55+
56+
if need_chdir:
57+
old_dir = os.getcwd()
58+
os.chdir(library_path)
59+
60+
self._lib = self.__ffi.dlopen(library_file_path)
61+
62+
if need_chdir:
63+
os.chdir(old_dir)
64+
65+
def _load_library_direct(self, library_file_path, cdefines):
66+
self.__ffi.cdef(cdefines)
67+
self._lib = self.__ffi.dlopen(library_file_path)
68+
69+
def to_json_buf(self, obj):
70+
return self.str_to_buf(json.dumps(obj))
71+
72+
def from_json_buf(self, buf):
73+
return json.loads(self.buf_to_str(buf))
74+
75+
def set_header(self, buf, length):
76+
self.__ffi.memmove(
77+
buf[0 : self.__sizeof_int32],
78+
length.to_bytes(self.__sizeof_int32, byteorder="little", signed=True),
79+
self.__sizeof_int32,
80+
)
81+
self.__ffi.memmove(
82+
buf[self.__sizeof_int32 : self.__sizeof_int32 * 2],
83+
self.__int32_zero_bytes,
84+
self.__sizeof_int32,
85+
)
86+
87+
def set_payload(self, buf, payload, length):
88+
self.set_header(buf, length)
89+
self.__ffi.memmove(
90+
buf[self.__sizeof_header : self.__sizeof_header + length - 1],
91+
payload,
92+
length,
93+
)
94+
95+
def bytearray_to_buf(self, payload):
96+
length = len(payload)
97+
buf = self.allocate_buf(length)
98+
self.set_payload(buf, payload, length)
99+
return buf
100+
101+
def str_to_buf(self, string):
102+
encoded_bytes = string.encode("utf8")
103+
length = len(encoded_bytes)
104+
buf = self.allocate_buf(length)
105+
self.set_payload(buf, encoded_bytes, length)
106+
return buf
107+
108+
def allocate_buf(self, buffer_len):
109+
length = int(buffer_len)
110+
length = max(length, self.__minimum_allocation)
111+
buf = self.__ffi.new(f"char[{self.__sizeof_header + length}]")
112+
self.set_header(buf, length)
113+
return buf
114+
115+
def buf_to_str(self, buf):
116+
length_buf = self.__ffi.unpack(buf, self.__sizeof_int32)
117+
length = int.from_bytes(length_buf, byteorder="little", signed=True)
118+
if length < 0:
119+
return self.temp_to_str(buf, length)
120+
encoded_bytes = self.__ffi.unpack(
121+
buf[self.__sizeof_header : self.__sizeof_header + length], length
122+
)
123+
return encoded_bytes.decode("utf8")
124+
125+
def buf_to_bytearray(self, buf):
126+
length_buf = self.__ffi.unpack(buf, self.__sizeof_int32)
127+
length = int.from_bytes(length_buf, byteorder="little", signed=True)
128+
if length < 0:
129+
return self.temp_to_bytearray(buf, length)
130+
payload = bytearray(length)
131+
self.__ffi.memmove(
132+
payload, buf[self.__sizeof_header : self.__sizeof_header + length], length
133+
)
134+
return payload
135+
136+
def temp_to_str(self, buf, length):
137+
encoded_bytes = self.temp_to_bytearray(buf, length)
138+
return encoded_bytes.decode("utf8")
139+
140+
def temp_to_bytearray(self, buf, length):
141+
length = 0 - length
142+
encoded_bytes = self.__ffi.unpack(
143+
buf[self.__sizeof_header : self.__sizeof_header + length], length
144+
)
145+
file_name = encoded_bytes.decode("utf8")
146+
with open(file_name, "rb") as binaryfile:
147+
payload = bytearray(binaryfile.read())
148+
os.remove(file_name)
149+
return payload

0 commit comments

Comments
 (0)