forked from mdhiggins/sickbeard_mp4_automator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_utils.py
150 lines (130 loc) · 4.95 KB
/
_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import logging
import traceback
import os
import sys
import time
import datetime
import string
import unicodedata
import Levenshtein
from mutagen.mp4 import MP4
from logging.config import fileConfig
# Configure logging at import time from the 'logging.ini' file that sits next
# to this module. The 'logfilename' default handed to the ini parser points at
# 'info.log' in the same directory, with backslashes replaced by forward
# slashes.
# NOTE(review): the slash replacement is presumably there so Windows paths
# survive ini-file interpolation -- confirm against logging.ini.
fileConfig(os.path.join(os.path.dirname(__file__), 'logging.ini'), defaults={'logfilename': os.path.join(os.path.dirname(__file__), 'info.log').replace("\\", "/")})
# Module-level logger used by the helpers defined in this file.
log = logging.getLogger(__name__)
def filename_clean(filename, real_clean=False):
    """Reduce *filename* to plain ASCII letters and digits.

    The value is converted to text, decomposed with NFKD so accented
    characters split into a base letter plus combining marks, stripped of
    everything that is not ASCII, and finally filtered down to
    ``string.ascii_letters`` and ``string.digits``.

    :param filename: value to clean; it is converted to text first.
    :param real_clean: accepted for backward compatibility. Both settings
        have always used the same whitelist, so it has no effect.
    :return: a native ``str`` containing only ASCII letters and digits.
    """
    valid_chars = frozenset(string.ascii_letters + string.digits)

    # ``unicode`` only exists on Python 2; fall back to ``str`` on Python 3.
    try:
        text_type = unicode
    except NameError:
        text_type = str

    normalized = unicodedata.normalize('NFKD', text_type(filename))
    # Decode back to text so iteration yields characters on Python 3 too;
    # iterating the encoded bytes there would yield integers.
    ascii_text = normalized.encode('ascii', 'ignore').decode('ascii')
    # str() keeps the return type a native string on Python 2 as before.
    return str(''.join(c for c in ascii_text if c in valid_chars))
def levenshtein_distance(compare_base, compare_to):
    """Return the Levenshtein distance between the ``str()`` forms of both arguments."""
    base_text = str(compare_base)
    other_text = str(compare_to)
    return Levenshtein.distance(base_text, other_text)
def is_number(s):
    """Return True when ``float(s)`` succeeds, False otherwise.

    :param s: any value; strings, numbers, ``None`` and containers are all
        accepted.
    :return: ``True`` if *s* can be converted with ``float``, else ``False``.
    """
    try:
        float(s)
    except (TypeError, ValueError):
        # TypeError covers non-string, non-numeric input such as None or a
        # list, which would otherwise escape this predicate as an exception.
        return False
    return True
class executionLocker():
    """File-based run lock kept as ``run.lock`` next to this module.

    The lock is a plain file; its modification time records when it was last
    acquired or renewed. ``islocked`` treats a lock older than ``timeout``
    seconds as stale and removes it.
    """

    def __init__(self, timeout=360 * 60):
        """Store the lock file path and the staleness timeout.

        :param timeout: seconds after the last lock/renew at which the lock
            is considered stale. Defaults to 360 minutes.
        """
        self.lockfile = os.path.join(os.path.dirname(__file__), 'run.lock')
        self.timeout = timeout

    def lock(self):
        """Create the lock file, or refresh its mtime if it already exists.

        :return: ``True`` on success, ``False`` if the file could not be
            created or touched.
        """
        try:
            with open(self.lockfile, 'a'):
                os.utime(self.lockfile, None)
        except Exception:
            log.debug("!!! FAILED TO ACQUIRE LOCK !!!")
            return False
        # Report success only once the file has actually been touched.
        log.debug("--- LOCK FILE CREATED ---")
        return True

    def renew(self):
        """Refresh the lock file's mtime to push its expiry forward.

        :return: ``True`` on success, ``False`` otherwise.
        """
        try:
            os.utime(self.lockfile, None)
        except Exception:
            log.warning("!!! FAILED TO RENEW LOCK !!!")
            return False
        return True

    def unlock(self):
        """Delete the lock file.

        :return: ``True`` if the file was removed, ``False`` otherwise.
        """
        try:
            os.remove(self.lockfile)
        except Exception:
            log.debug("!!! FAILED TO RELEASE LOCK !!!")
            return False
        # Report success only once the file has actually been removed.
        log.debug("--- LOCK FILE REMOVED ---")
        return True

    def islocked(self):
        """Tell whether a non-expired lock file exists.

        A missing lock file means "not locked". A lock older than
        ``self.timeout`` seconds is removed and also reported as not locked.
        Any unexpected error is logged and reported as locked, so callers
        fail safe.

        :return: ``True`` if locked (or the check failed), else ``False``.
        """
        import errno

        try:
            try:
                lockmt = os.path.getmtime(self.lockfile)
            except OSError as e:
                if e.errno == errno.ENOENT:
                    return False
                raise
            lockexp = lockmt + self.timeout
            if lockexp < time.time():
                log.debug("!!! RELEASING DEADLOCK !!!")
                self.unlock()
                return False
            # Only announce a valid lock after the expiry check has passed.
            log.info("Found valid lock: %s, expires: %s" % (datetime.datetime.fromtimestamp(lockmt).strftime('%Y-%m-%d %H:%M:%S'), datetime.datetime.fromtimestamp(lockexp).strftime('%Y-%m-%d %H:%M:%S')))
            return True
        except Exception:
            log.exception("!!! FAILED CHECKING FOR LOCK !!!")
            return True
class LoggingAdapter(logging.LoggerAdapter):
    """Logger adapter that adds call-depth-indented and stack-dump debug helpers."""

    @staticmethod
    def indent():
        """Return an indentation level derived from the current call stack.

        Counts the stack frames whose source text does not start with
        ``if `` or ``if(`` and subtracts 4.

        :return: the adjusted frame count as an ``int`` (may be negative on
            a very shallow stack).
        """
        depth = 0
        for frame in traceback.extract_stack():
            # frame[3] is the source text of the frame; guard against None,
            # which is reported when the source line cannot be read.
            prefix = (frame[3] or '')[:3]
            if prefix not in ('if ', 'if('):
                depth += 1
        return depth - 4  # Remove logging infrastructure frames

    def process(self, msg, kwargs):
        """Return *msg* and *kwargs* unchanged."""
        return msg, kwargs

    def debugI(self, msg, *args, **kwargs):
        """Log *msg* at DEBUG level, prefixed with the call depth and indented by it.

        :param msg: log message, may contain %-style placeholders for *args*.
        """
        msg, kwargs = self.process(msg, kwargs)
        indents = self.indent()
        msg = '({j}) {i}{m}'.format(j="%2s" % indents, i=' ' * indents, m=msg)
        self.logger.debug(msg, *args, **kwargs)

    def debugTrace(self, msg, *args, **kwargs):
        """Log *msg* followed by one DEBUG record per frame of the current stack.

        :param msg: header message, may contain %-style placeholders for *args*.
        """
        self.debug("-- STACK TRACE DUMP FOLLOWS -- %s" % msg, *args, **kwargs)
        for frame in traceback.extract_stack():
            # *args belong to the header message only; passing them here would
            # make logging try to %-format the frame text with them.
            self.logger.debug(frame, **kwargs)

    @staticmethod
    def getLogger(name=None, kwargs=None):
        """Build a ``LoggingAdapter`` around the named logger.

        :param name: logger name. When omitted, it is taken from the file name
            (without extension) of the calling frame, falling back to
            ``"FIX_MY_NAME"`` if that lookup fails.
        :param kwargs: dict handed to the adapter as its ``extra`` mapping.
            A fresh dict is created when omitted so adapters never share one.
        :return: a new ``LoggingAdapter``.
        """
        if kwargs is None:
            kwargs = {}
        if name is None:
            try:
                name = (os.path.split(traceback.extract_stack()[-2][0])[1]).split('.')[0]
            except Exception:
                name = "FIX_MY_NAME"
        log.debug("*** Creating LoggingAdapter for %s ***" % name)
        return LoggingAdapter(logging.getLogger(name), kwargs)
class metadata_stamper:
    """Helpers for writing the encoder tag into MP4 metadata."""

    @staticmethod
    def stamp_encoder(mp4Path=None, video=None, stamp='untagged', save=True,
                      retries=3, retry_delay=5):
        """Set the ``\\xa9too`` tag to ``"meks-ffmpeg <stamp>"``.

        :param mp4Path: path of the file to open with ``MP4`` when *video* is
            not supplied.
        :param video: an already opened tag object; takes precedence over
            *mp4Path*.
        :param stamp: text appended after ``meks-ffmpeg`` in the tag value.
        :param save: when true, call ``save()`` on the tag object and return
            ``True``; when false, return the modified tag object unsaved.
        :param retries: number of ``save()`` attempts before giving up.
        :param retry_delay: seconds to wait between failed attempts.
        :return: ``True`` after a successful save, or the tag object when
            *save* is false.
        :raises IOError: if neither *mp4Path* nor *video* is given, or if
            every save attempt fails.
        """
        if video is not None:
            video_tag = video
        elif mp4Path is not None:
            video_tag = MP4(mp4Path)
        else:
            raise IOError("stamp_encoder requires either mp4Path or video")

        video_tag['\xa9too'] = "meks-ffmpeg %s" % stamp
        if not save:
            return video_tag

        for attempt in range(1, retries + 1):
            try:
                video_tag.save()
                return True
            except IOError:
                # Wait only when another attempt follows; sleeping after the
                # last failure would just delay the error.
                if attempt < retries:
                    time.sleep(retry_delay)
        raise IOError("failed to save encoder tag after %d attempts" % retries)