-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathmotiondetect.py
executable file
·274 lines (257 loc) · 9.45 KB
/
motiondetect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
#!/usr/bin/python
import cv2 as cv
from datetime import datetime
import time
import os
import sqlite3
class dataBase():
    """Namespace for the SQLite helpers used by the motion detector.

    Both helpers are static methods, so they can be called either on the
    class (``dataBase.create_connection(path)``) or on an instance.
    """

    @staticmethod
    def create_connection(db_file):
        """Open a SQLite connection to *db_file* and print the engine version.

        Args:
            db_file: Path of the database file (``":memory:"`` also works).

        Returns:
            The open ``sqlite3.Connection``, or ``None`` when the connection
            could not be established. Errors are printed, not raised.
        """
        sqliteConnection = None
        try:
            sqliteConnection = sqlite3.connect(db_file)
            print("Database created and Successfully Connected to SQLite")
            cursor = sqliteConnection.cursor()
            try:
                cursor.execute("select sqlite_version();")
                record = cursor.fetchall()
            finally:
                # release the cursor even when the version query fails
                cursor.close()
            print("SQLite Database Version is: ", record)
        except sqlite3.Error as error:
            print("Error while connecting to sqlite", error)
        return sqliteConnection

    @staticmethod
    def GetSettingsFromDB(connection):
        """Return every value stored in ``userconfig_storage`` as a flat list.

        All rows are read and their columns are concatenated in row order;
        the table is expected to hold a single row of user settings.

        Args:
            connection: An open ``sqlite3.Connection``.

        Returns:
            A list with the column values of every row (empty if the table
            has no rows).

        Raises:
            sqlite3.Error: If the query fails (e.g. the table is missing).
        """
        cur = connection.cursor()
        try:
            cur.execute('SELECT * FROM userconfig_storage')
            rows = cur.fetchall()
        finally:
            cur.close()
        # flatten [(a, b, ...), ...] into [a, b, ...]
        return [attribute for row in rows for attribute in row]
class MotionDetect():
    """Detect motion on a video capture device, display it and record it.

    Each frame is compared against a running weighted average of the
    previous frames. Regions that differ enough are treated as motion,
    optionally boxed on the displayed frame and, when recording is enabled,
    written to ``.avi`` files of at most ``maxFrames`` frames each.
    """

    # Smallest contour area (as reported by cv.contourArea) counted as motion.
    MIN_CONTOUR_AREA = 5000

    def __init__(self, source=0):
        """Open the capture device and set the default configuration.

        Args:
            source: Value handed to ``cv.VideoCapture`` (defaults to 0).
        """
        self.capture = cv.VideoCapture(source)
        self.codec = cv.VideoWriter_fourcc(*'XVID')
        self.date_time = None
        self.BoxColor = (0, 0, 255)
        self.searchTextColor = (0, 0, 255)
        self.motionTextColor = (0, 0, 255)
        self.fps = 30
        self.scale = 1
        self.numFrames = 0
        self.maxFrames = 100
        self.searchText = [
            "searching   ",
            "searching.  ",
            "searching.. ",
            "searching..."]
        self.motionText = [
            "Motion   ",
            "Motion!  ",
            "Motion!! ",
            "Motion!!!"]
        self.avg = None
        self.out = None
        self.fileName = None
        self.filePath = None
        self.detected = False
        self.record = False
        self.flip = False
        self.mirror = False
        self.notify = False
        self.showBox = True
        self.maxNumberVidoes = 10
        self.timeToLive = None  # TODO: time-to-live is not implemented yet
        self.showText = True

    def cleanUp(self):
        """Release the video writer (if any), the capture and the windows."""
        if self.out is not None:
            self.out.release()
            self.out = None
        self.capture.release()
        cv.destroyAllWindows()
        # NOTE(review): kept from the original; presumably gives HighGUI a
        # chance to process the window-close events - confirm.
        cv.waitKey(0)

    def updateSettings(self, row):
        """Apply the user settings read from the database.

        Args:
            row: Flat list of column values. Indexes 1-5 are used as record,
                showText, maxNumberVidoes, timeToLive and maxFrames; index 0
                is ignored (presumably the primary key - confirm against the
                schema). Rows with fewer than six values are ignored so a
                missing or partial row keeps the current settings.
        """
        if len(row) < 6:
            return
        self.record = row[1]
        self.showText = row[2]
        self.maxNumberVidoes = row[3]
        self.timeToLive = row[4]
        self.maxFrames = row[5]

    def setRecording(self, fileName, frame):
        """Create a video writer sized to *frame*.

        Args:
            fileName: Base name (no extension) of the file to create under
                ``osCam/videos/``.
            frame: Image array whose shape gives the output dimensions.

        Returns:
            Tuple ``(fileName, filePath, writer)`` where *writer* is the new
            ``cv.VideoWriter``.
        """
        filePath = "osCam/videos/{}.avi".format(fileName)
        # array shape is (rows, cols[, channels]); the writer wants
        # (width, height), i.e. (cols, rows)
        height, width = frame.shape[:2]
        writer = cv.VideoWriter(filePath, self.codec, self.fps, (width, height))
        return fileName, filePath, writer

    def rescaleFrame(self, frame):
        """Return *frame* resized by ``self.scale`` in both dimensions."""
        width = int(frame.shape[1] * self.scale)
        height = int(frame.shape[0] * self.scale)
        return cv.resize(frame, (width, height), interpolation=cv.INTER_AREA)

    def _startRecording(self, frame):
        """Open a new timestamp-named recording and store it on the instance."""
        date_time = datetime.now().strftime("%Y_%m_%d_%H:%M:%S")
        # Unpack explicitly: the writer must end up in self.out, not the
        # whole (fileName, filePath, writer) tuple.
        (self.fileName,
         self.filePath,
         self.out) = self.setRecording(date_time, frame)

    def actions(self, frame):
        """Record *frame* when motion is detected and recording is enabled.

        Once more than ``maxFrames`` frames have been written, the current
        file is closed, ``send_email.py`` is invoked (only when ``notify``
        is set) and a new recording file is started.
        """
        if not (self.detected and self.record):
            return
        if self.out is None:
            self._startRecording(frame)
        self.numFrames += 1
        self.out.write(frame)
        if self.numFrames > self.maxFrames:
            # remember the count before resetting so the notification
            # reports the real number of recorded frames
            recordedFrames = self.numFrames
            self.numFrames = 0
            self.out.release()
            camID = 1
            self.fileName += '.avi'
            # only send email if recording and notifications are turned on
            if self.notify and self.record:
                os.system('python send_email.py {} {} {} {}'.format(
                    self.fileName,
                    self.filePath,
                    recordedFrames,
                    camID
                    )
                )
            self._startRecording(frame)

    def imgProcess(self, frame, avg):
        """Find the contours of the regions that changed in *frame*.

        The frame is converted to grayscale and blurred to reduce noise,
        the running weighted average (the background model) is updated, and
        the absolute difference between that average and the grayscale frame
        is thresholded and dilated before contours are extracted.

        Args:
            frame: BGR image to analyse.
            avg: Running average from the previous call, or ``None`` on the
                first call.

        Returns:
            Tuple ``(avg, cnts)``: the updated running average and the
            contours found.

        Raises:
            SystemExit: If ``cv.findContours`` returns an unexpected number
                of values.
        """
        gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        gray = cv.GaussianBlur(gray, (7, 7), 0)
        # first call: seed the background model with the current frame
        if avg is None:
            avg = gray.copy().astype("float")
        cv.accumulateWeighted(gray, avg, 0.5)
        # difference between the background model and the current frame
        frameDelta = cv.absdiff(gray, cv.convertScaleAbs(avg))
        thresh = cv.threshold(frameDelta, 5, 255, cv.THRESH_BINARY)[1]
        # dilate the threshold image to fill in holes
        dil = cv.dilate(thresh, None, iterations=2)
        found = cv.findContours(
            dil.copy(),
            cv.RETR_EXTERNAL,
            cv.CHAIN_APPROX_SIMPLE
        )
        # findContours returns 2 or 3 values depending on the OpenCV
        # version; the contour list sits at a different index in each case
        if len(found) == 2:
            cnts = found[0]
        elif len(found) == 3:
            cnts = found[1]
        else:
            print("something went wrong, exiting program :(")
            raise SystemExit(1)
        return avg, cnts

    def setStatusColor(self):
        """Return the status text colour for the current detection state."""
        return self.motionTextColor if self.detected else self.searchTextColor

    def flipFrame(self, frame):
        """Return *frame* flipped with flip code 0 when ``self.flip`` is set."""
        return cv.flip(frame, 0) if self.flip else frame

    def mirrorFrame(self, frame):
        """Return *frame* flipped with flip code 1 when ``self.mirror`` is set."""
        return cv.flip(frame, 1) if self.mirror else frame

    def setShowText(self, frame, text):
        """Draw the status text and the current date/time on *frame*.

        Nothing is drawn when ``self.showText`` is falsy. The frame is
        returned in both cases.
        """
        if self.showText:
            status_color = self.setStatusColor()
            cv.putText(
                frame,
                "Status: {}".format(text),
                (15, 15),
                cv.FONT_HERSHEY_SIMPLEX,
                .5,
                status_color,
                1
            )
            date_time = datetime.now()
            cv.putText(
                frame,
                "{}".format(date_time.strftime("%d/%m/%Y, %H:%M:%S")),
                (450, 15),
                cv.FONT_HERSHEY_SIMPLEX,
                .5,
                status_color,
                1
            )
        return frame

    def _markMotion(self, frame, cnts):
        """Set ``self.detected`` for this frame and box the moving regions.

        Returns:
            ``True`` when at least one contour is larger than
            ``MIN_CONTOUR_AREA``.
        """
        # reset every frame so one past detection does not stay latched
        self.detected = False
        for c in cnts:
            if cv.contourArea(c) > self.MIN_CONTOUR_AREA:
                self.detected = True
                if self.showBox:
                    (x, y, w, h) = cv.boundingRect(c)
                    cv.rectangle(
                        frame,
                        (x, y),
                        (x + w, y + h),
                        self.BoxColor,
                        2
                    )
        return self.detected

    def Detect(self):
        """Run the capture / detect / display / record loop.

        Settings are re-read from ``osCam/db.sqlite3`` on every iteration.
        The loop ends when 'd' is pressed in the video window. The database
        connection, the capture device and any open recording are released
        even if the loop raises.
        """
        database = r"osCam/db.sqlite3"
        connection = dataBase.create_connection(database)
        try:
            # the first frame is read and discarded before the loop starts
            self.capture.read()
            # loop and capture video until 'd' is pressed
            while not (cv.waitKey(28) & 0xFF == ord('d')):
                # create_connection returns None on failure; keep the
                # current settings in that case
                if connection is not None:
                    with connection:
                        row = dataBase.GetSettingsFromDB(connection)
                    self.updateSettings(row)
                text = self.searchText[int(time.time()) % 4]
                isTrue, frame = self.capture.read()
                if not isTrue:
                    # no frame available: nothing to flip, analyse or show
                    continue
                frame = self.flipFrame(frame)
                frame = self.mirrorFrame(frame)
                self.avg, cnts = self.imgProcess(frame, self.avg)
                if self._markMotion(frame, cnts):
                    text = self.motionText[int(time.time()) % 4]
                frame = self.setShowText(frame, text)
                cv.imshow('Video', frame)
                self.actions(frame)
        finally:
            if connection is not None:
                connection.close()
            self.cleanUp()
if __name__ == "__main__":
    # Script entry point: build the detector and run its capture loop
    # until the user quits.
    detector = MotionDetect()
    detector.Detect()