-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathutils.py
107 lines (81 loc) · 2.81 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import asyncio
import io
import mimetypes
import os
import random
import re
import tempfile
import typing
from contextlib import contextmanager
import magic
from PIL import Image as pil_image
# Pool of emoji that random_emoji() picks from.
emoji = [
    '😼',
    '😺',
    '😸',
    '😹',
    '😻',
    '🙀',
    '😿',
    '😾',
    '😩',
    '🙈',
    '🙉',
    '🙊',
    '😳',
]
def find_first_url(string: str) -> typing.Optional[str]:
    """Return the first http(s) URL found in ``string``.

    A URL is taken to be ``http://`` or ``https://`` followed by every
    character up to the next whitespace, so trailing punctuation that is
    glued to the link is kept as part of the result.

    Args:
        string: Text to search.

    Returns:
        The first matching URL, or ``None`` when the text contains none.
    """
    # re.search stops at the first hit instead of collecting every URL in the
    # text just to discard all but the first one.
    match = re.search(r'(https?://[^\s]+)', string)
    return match.group(1) if match else None
def guess_extension_from_buffer(buffer: io.BytesIO) -> str:
    """Guess a file extension from the leading bytes of ``buffer``.

    Up to 2048 bytes are read from the buffer's current position and handed to
    ``magic.from_buffer`` to obtain a MIME type, which ``mimetypes`` then maps
    to an extension. The buffer is rewound to offset 0 before returning.

    Args:
        buffer: Binary buffer holding the content to inspect.

    Returns:
        The guessed extension, or ``'.mp4'`` when no extension could be
        determined for the detected MIME type.
    """
    head = buffer.read(2048)
    mime_type = magic.from_buffer(head, mime=True)
    guessed = mimetypes.guess_extension(mime_type)
    # Leave the buffer positioned at the start for the next reader.
    buffer.seek(0)
    if guessed:
        return guessed
    return '.mp4'
async def resize(buffer: io.BytesIO, extension: str = 'mp4') -> io.BytesIO:
    """Re-encode a video with ffmpeg at half its original width and height.

    The content of ``buffer`` (from its current position) is written to a
    temporary file, ffmpeg is run on it with the libx264 video codec and
    CRF 28, and the produced file is read back into memory. Both temporary
    files are deleted when the function returns.

    Args:
        buffer: Binary buffer holding the source video.
        extension: File extension used for the temporary input and output
            files, with or without the leading dot (``'mp4'`` and ``'.mp4'``
            are equivalent).

    Returns:
        A new ``io.BytesIO`` with the re-encoded video, positioned at offset 0.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
    """
    # tempfile appends the suffix verbatim and ffmpeg chooses the output
    # container from the file extension, so the suffix needs its leading dot.
    suffix = extension
    if suffix and not suffix.startswith('.'):
        suffix = f'.{suffix}'

    with (
        tempfile.NamedTemporaryFile(suffix=suffix) as input_tmp,
        tempfile.NamedTemporaryFile(suffix=suffix) as output_tmp,
    ):
        input_tmp.write(buffer.read())
        # ffmpeg opens the file by name; without a flush the data may still be
        # sitting in Python's write buffer and ffmpeg would see a short file.
        input_tmp.flush()

        # One argv entry per token: no shell is involved, so nothing needs
        # quoting and the file names cannot be reinterpreted by a shell.
        command = [
            'ffmpeg',
            '-y',  # the output temp file already exists; overwrite it
            '-i', input_tmp.name,
            # halve each dimension while keeping it even
            '-vf', 'scale=trunc(iw/4)*2:trunc(ih/4)*2',
            '-vcodec', 'libx264',
            '-crf', '28',
            output_tmp.name,
        ]
        ffmpeg_proc = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await ffmpeg_proc.communicate()
        if ffmpeg_proc.returncode != 0:
            details = stderr.decode(errors='replace').strip()
            raise RuntimeError(f'ffmpeg exited with code {ffmpeg_proc.returncode}: {details}')

        # ffmpeg wrote to the same path this handle has open, so the handle
        # (still at offset 0) reads the freshly encoded data.
        return io.BytesIO(output_tmp.read())
def random_emoji() -> str:
    """Return one entry picked at random from the module-level ``emoji`` list."""
    picked = random.choice(emoji)
    return picked
def combine_images(
image_fps: typing.List[str | io.BytesIO],
gap: int = 10,
quality: int = 85,
max_images: int = 3,
) -> io.BytesIO:
images = [pil_image.open(path) for path in image_fps[:max_images]]
widths, heights = zip(*(im.size for im in images))
new_image = pil_image.new('RGBA', (sum(widths), max(heights)))
offset = 0
for image in images:
new_image.paste(image, (offset, 0))
offset += image.size[0] + gap
image_bytes = io.BytesIO()
new_image.save(image_bytes, format='PNG', quality=quality, optimize=True)
image_bytes.seek(0)
return image_bytes
def resize_image(buffer: io.BytesIO, factor: float = 0.75) -> io.BytesIO:
    """Scale an image by ``factor`` and return it PNG-encoded.

    Args:
        buffer: Binary buffer holding the source image.
        factor: Multiplier applied to both width and height. A value of
            exactly ``1.0`` short-circuits and returns ``buffer`` itself,
            untouched and not re-encoded.

    Returns:
        ``buffer`` when ``factor`` is 1.0; otherwise a new buffer with the
        resized image saved as PNG, positioned at offset 0.

    Raises:
        ValueError: If ``factor`` is zero or negative.
    """
    if factor == 1.0:
        return buffer
    if factor <= 0:
        raise ValueError(f'factor must be positive, got {factor!r}')

    image = pil_image.open(buffer)
    width, height = image.size
    # int() truncates, so a small image times a small factor could collapse
    # to 0 px, which Pillow cannot resize to; keep at least one pixel per side.
    new_size = (max(1, int(width * factor)), max(1, int(height * factor)))
    new_image = image.resize(new_size, pil_image.Resampling.NEAREST)

    image_bytes = io.BytesIO()
    new_image.save(image_bytes, format='PNG', optimize=True)
    image_bytes.seek(0)
    return image_bytes
@contextmanager
def temp_open(path: str, mode: str = 'rb'):
    """Open ``path`` for the duration of a ``with`` block, then delete it.

    The file is closed and removed from disk when the block exits, whether it
    finishes normally or raises. If opening the file fails, nothing is
    removed.

    Args:
        path: Path of the file to open and afterwards delete.
        mode: Mode string passed to ``open``.

    Yields:
        The open file object.
    """
    f = open(path, mode)  # pylint: disable=unspecified-encoding
    try:
        # Closing through ``with`` inside the try block means the removal
        # below still runs if close() itself fails (e.g. a flush error).
        with f:
            yield f
    finally:
        try:
            os.remove(path)
        except FileNotFoundError:
            # Already removed by the caller; the file is gone either way.
            pass