1+ #cython: language_level=3, boundscheck=False
2+ import multiprocessing as mp
3+ from enum import Enum
4+ import numpy as np
5+ import gi
6+ gi .require_version ('Gst' , '1.0' )
7+ from gi .repository import Gst
8+ Gst .init (None )
9+
10+ '''Konwn issues
11+
12+ * if format changes at run time system hangs
13+ '''
14+
15+ class StreamMode (Enum ):
16+ INIT_STREAM = 1
17+ SETUP_STREAM = 1
18+ READ_STREAM = 2
19+
20+
21+ class StreamCommands (Enum ):
22+ FRAME = 1
23+ ERROR = 2
24+ HEARTBEAT = 3
25+ RESOLUTION = 4
26+ STOP = 5
27+
28+
29+ class StreamCapture (mp .Process ):
30+
31+ def __init__ (self , link , stop , outQueue ):
32+ """
33+ Initialize the stream capturing process
34+ link - rstp link of stream
35+ stop - to send commands to this process
36+ outPipe - this process can send commands outside
37+ """
38+
39+ super ().__init__ ()
40+ self .streamLink = link
41+ self .stop = stop
42+ self .outQueue = outQueue
43+ self .currentState = StreamMode .INIT_STREAM
44+ self .pipeline = None
45+ self .source = None
46+ self .decode = None
47+ self .convert = None
48+ self .sink = None
49+ self .image_arr = None
50+ self .newImage = False
51+ self .frame1 = None
52+ self .frame2 = None
53+
54+
55+
56+ def gst_to_opencv (self , sample ):
57+ buf = sample .get_buffer ()
58+ caps = sample .get_caps ()
59+
60+ # Print Height, Width and Format
61+ # print(caps.get_structure(0).get_value('format'))
62+ # print(caps.get_structure(0).get_value('height'))
63+ # print(caps.get_structure(0).get_value('width'))
64+
65+ arr = np .ndarray (
66+ (caps .get_structure (0 ).get_value ('height' ),
67+ caps .get_structure (0 ).get_value ('width' ),
68+ 3 ),
69+ buffer = buf .extract_dup (0 , buf .get_size ()),
70+ dtype = np .uint8 )
71+ return arr
72+
73+ def new_buffer (self , sink , _ ):
74+ sample = sink .emit ("pull-sample" )
75+ arr = self .gst_to_opencv (sample )
76+ self .image_arr = arr
77+ self .newImage = True
78+ return Gst .FlowReturn .OK
79+
80+ def run (self ):
81+ # Create the empty pipeline
82+ self .pipeline = Gst .parse_launch (
83+ 'rtspsrc name=m_rtspsrc ! rtph264depay name=m_rtph264depay ! avdec_h264 name=m_avdech264 ! videoconvert name=m_videoconvert ! appsink name=m_appsink' )
84+
85+ # source params
86+ self .source = self .pipeline .get_by_name ('m_rtspsrc' )
87+ self .source .set_property ('latency' , 0 )
88+ self .source .set_property ('location' , self .streamLink )
89+ self .source .set_property ('protocols' , 'tcp' )
90+ self .source .set_property ('retry' , 50 )
91+ self .source .set_property ('timeout' , 50 )
92+ self .source .set_property ('tcp-timeout' , 5000000 )
93+ self .source .set_property ('drop-on-latency' , 'true' )
94+
95+ # decode params
96+ self .decode = self .pipeline .get_by_name ('m_avdech264' )
97+ self .decode .set_property ('max-threads' , 2 )
98+ self .decode .set_property ('output-corrupt' , 'false' )
99+
100+ # convert params
101+ self .convert = self .pipeline .get_by_name ('m_videoconvert' )
102+
103+ # sink params
104+ self .sink = self .pipeline .get_by_name ('m_appsink' )
105+
106+ # Maximum number of nanoseconds that a buffer can be late before it is dropped (-1 unlimited)
107+ # flags: readable, writable
108+ # Integer64. Range: -1 - 9223372036854775807 Default: -1
109+ self .sink .set_property ('max-lateness' , 500000000 )
110+
111+ # The maximum number of buffers to queue internally (0 = unlimited)
112+ # flags: readable, writable
113+ # Unsigned Integer. Range: 0 - 4294967295 Default: 0
114+ self .sink .set_property ('max-buffers' , 5 )
115+
116+ # Drop old buffers when the buffer queue is filled
117+ # flags: readable, writable
118+ # Boolean. Default: false
119+ self .sink .set_property ('drop' , 'true' )
120+
121+ # Emit new-preroll and new-sample signals
122+ # flags: readable, writable
123+ # Boolean. Default: false
124+ self .sink .set_property ('emit-signals' , True )
125+
126+ # # sink.set_property('drop', True)
127+ # # sink.set_property('sync', False)
128+
129+ # The allowed caps for the sink pad
130+ # flags: readable, writable
131+ # Caps (NULL)
132+ caps = Gst .caps_from_string (
133+ 'video/x-raw, format=(string){BGR, GRAY8}; video/x-bayer,format=(string){rggb,bggr,grbg,gbrg}' )
134+ self .sink .set_property ('caps' , caps )
135+
136+ if not self .source or not self .sink or not self .pipeline or not self .decode or not self .convert :
137+ print ("Not all elements could be created." )
138+ self .stop .set ()
139+
140+ self .sink .connect ("new-sample" , self .new_buffer , self .sink )
141+
142+ # Start playing
143+ ret = self .pipeline .set_state (Gst .State .PLAYING )
144+ if ret == Gst .StateChangeReturn .FAILURE :
145+ print ("Unable to set the pipeline to the playing state." )
146+ self .stop .set ()
147+
148+ # Wait until error or EOS
149+ bus = self .pipeline .get_bus ()
150+
151+ while True :
152+
153+ if self .stop .is_set ():
154+ print ('Stopping CAM Stream by main process' )
155+ break
156+
157+ message = bus .timed_pop_filtered (10000 , Gst .MessageType .ANY )
158+ # print "image_arr: ", image_arr
159+ if self .image_arr is not None and self .newImage is True :
160+
161+ if not self .outQueue .full ():
162+
163+ # print("\r adding to queue of size{}".format(self.outQueue.qsize()), end='\r')
164+ self .outQueue .put ((StreamCommands .FRAME , self .image_arr ), block = False )
165+
166+ self .image_arr = None
167+
168+
169+ if message :
170+ if message .type == Gst .MessageType .ERROR :
171+ err , debug = message .parse_error ()
172+ print ("Error received from element %s: %s" % (
173+ message .src .get_name (), err ))
174+ print ("Debugging information: %s" % debug )
175+ break
176+ elif message .type == Gst .MessageType .EOS :
177+ print ("End-Of-Stream reached." )
178+ break
179+ elif message .type == Gst .MessageType .STATE_CHANGED :
180+ if isinstance (message .src , Gst .Pipeline ):
181+ old_state , new_state , pending_state = message .parse_state_changed ()
182+ print ("Pipeline state changed from %s to %s." %
183+ (old_state .value_nick , new_state .value_nick ))
184+ else :
185+ print ("Unexpected message received." )
186+
187+
188+ print ('terminating cam pipe' )
189+ self .stop .set ()
190+ self .pipeline .set_state (Gst .State .NULL )
0 commit comments