Skip to content

Commit 349894e

Browse files
committed
Added Ethernet client server example code.
1 parent f112ed7 commit 349894e

File tree

8 files changed

+378
-0
lines changed

8 files changed

+378
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#!/bin/bash
2+
# this statement checks if there is an instance of the EtherSenseServer running
3+
if [[ ! `ps -eaf | grep "python EtherSenseServer.py" | grep -v grep` ]]; then
4+
# if not, EtherSenseServer is started with the PYTHONPATH set due to cron not passing Env
5+
PYTHONPATH=$HOME/.local/lib/python2.7/site-packages python EtherSenseServer.py
6+
fi
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
#!/usr/bin/python
2+
import pyrealsense2 as rs
3+
import sys, getopt
4+
import asyncore
5+
import numpy as np
6+
import pickle
7+
import socket
8+
import struct
9+
import cv2
10+
11+
12+
print('Number of arguments:', len(sys.argv), 'arguments.')
13+
print('Argument List:', str(sys.argv))
14+
mc_ip_address = '224.0.0.1'
15+
local_ip_address = '192.168.0.1'
16+
port = 1024
17+
chunk_size = 4096
18+
19+
def main(argv):
20+
multi_cast_message(mc_ip_address, port, 'EtherSensePing')
21+
22+
23+
#UDP client for each camera server
24+
class ImageClient(asyncore.dispatcher):
25+
def __init__(self, server, source):
26+
asyncore.dispatcher.__init__(self, server)
27+
self.address = server.getsockname()[0]
28+
self.port = source[1]
29+
self.buffer = bytearray()
30+
self.windowName = self.port
31+
# open cv window which is unique to the port
32+
cv2.namedWindow("window"+str(self.windowName))
33+
self.remainingBytes = 0
34+
self.frame_id = 0
35+
36+
def handle_read(self):
37+
if self.remainingBytes == 0:
38+
# get the expected frame size
39+
self.frame_length = struct.unpack('<I', self.recv(4))[0]
40+
# get the timestamp of the current frame
41+
self.timestamp = struct.unpack('<d', self.recv(8))
42+
self.remainingBytes = self.frame_length
43+
44+
# request the frame data until the frame is completely in buffer
45+
data = self.recv(self.remainingBytes)
46+
self.buffer += data
47+
self.remainingBytes -= len(data)
48+
# once the frame is fully recived, process/display it
49+
if len(self.buffer) == self.frame_length:
50+
self.handle_frame()
51+
52+
def handle_frame(self):
53+
# convert the frame from string to numerical data
54+
imdata = pickle.loads(self.buffer)
55+
bigDepth = cv2.resize(imdata, (0,0), fx=2, fy=2, interpolation=cv2.INTER_NEAREST)
56+
cv2.putText(bigDepth, str(self.timestamp), (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (65536), 2, cv2.LINE_AA)
57+
cv2.imshow("window"+str(self.windowName), bigDepth)
58+
cv2.waitKey(1)
59+
self.buffer = bytearray()
60+
self.frame_id += 1
61+
def readable(self):
62+
return True
63+
64+
65+
class EtherSenseClient(asyncore.dispatcher):
66+
def __init__(self):
67+
asyncore.dispatcher.__init__(self)
68+
self.server_address = ('', 1024)
69+
# create a socket for TCP connection between the client and server
70+
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
71+
self.socket.settimeout(5)
72+
73+
self.bind(self.server_address)
74+
self.listen(10)
75+
76+
def writable(self):
77+
return False # don't want write notifies
78+
79+
def readable(self):
80+
return True
81+
82+
def handle_connect(self):
83+
print("connection recvied")
84+
85+
def handle_accept(self):
86+
pair = self.accept()
87+
#print(self.recv(10))
88+
if pair is not None:
89+
sock, addr = pair
90+
print ('Incoming connection from %s' % repr(addr))
91+
# when a connection is attempted, delegate image receival to the ImageClient
92+
handler = ImageClient(sock, addr)
93+
94+
def multi_cast_message(ip_address, port, message):
95+
# send the multicast message
96+
multicast_group = (ip_address, port)
97+
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
98+
connections = {}
99+
try:
100+
# Send data to the multicast group
101+
print('sending "%s"' % message + str(multicast_group))
102+
sent = sock.sendto(message.encode(), multicast_group)
103+
104+
# defer waiting for a response using Asyncore
105+
client = EtherSenseClient()
106+
asyncore.loop()
107+
108+
# Look for responses from all recipients
109+
110+
except socket.timeout:
111+
print('timed out, no more responses')
112+
finally:
113+
print(sys.stderr, 'closing socket')
114+
sock.close()
115+
116+
if __name__ == '__main__':
117+
main(sys.argv[1:])
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
#!/usr/bin/python
2+
import pyrealsense2 as rs
3+
import sys, getopt
4+
import asyncore
5+
import numpy as np
6+
import pickle
7+
import socket
8+
import struct
9+
import cv2
10+
11+
12+
print('Number of arguments:', len(sys.argv), 'arguments.')
13+
print('Argument List:', str(sys.argv))
14+
mc_ip_address = '224.0.0.1'
15+
port = 1024
16+
chunk_size = 4096
17+
#rs.log_to_console(rs.log_severity.debug)
18+
19+
def getDepthAndTimestamp(pipeline, depth_filter):
20+
frames = pipeline.wait_for_frames()
21+
# take owner ship of the frame for further processing
22+
frames.keep()
23+
depth = frames.get_depth_frame()
24+
if depth:
25+
depth2 = depth_filter.process(depth)
26+
# take owner ship of the frame for further processing
27+
depth2.keep()
28+
# represent the frame as a numpy array
29+
depthData = depth2.as_frame().get_data()
30+
depthMat = np.asanyarray(depthData)
31+
ts = frames.get_timestamp()
32+
return depthMat, ts
33+
else:
34+
return None, None
35+
def openPipeline():
36+
cfg = rs.config()
37+
cfg.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
38+
pipeline = rs.pipeline()
39+
pipeline_profile = pipeline.start(cfg)
40+
sensor = pipeline_profile.get_device().first_depth_sensor()
41+
return pipeline
42+
43+
class DevNullHandler(asyncore.dispatcher_with_send):
44+
45+
def handle_read(self):
46+
print(self.recv(1024))
47+
48+
def handle_close(self):
49+
self.close()
50+
51+
52+
class EtherSenseServer(asyncore.dispatcher):
53+
def __init__(self, address):
54+
asyncore.dispatcher.__init__(self)
55+
print("Launching Realsense Camera Server")
56+
try:
57+
self.pipeline = openPipeline()
58+
except:
59+
print("Unexpected error: ", sys.exc_info()[1])
60+
sys.exit(1)
61+
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
62+
print('sending acknowledgement to', address)
63+
64+
# reduce the resolution of the depth image using post processing
65+
self.decimate_filter = rs.decimation_filter()
66+
self.decimate_filter.set_option(rs.option.filter_magnitude, 2)
67+
self.frame_data = ''
68+
self.connect((address[0], 1024))
69+
self.packet_id = 0
70+
71+
def handle_connect(self):
72+
print("connection received")
73+
74+
def writable(self):
75+
return True
76+
77+
def update_frame(self):
78+
depth, timestamp = getDepthAndTimestamp(self.pipeline, self.decimate_filter)
79+
if depth is not None:
80+
# convert the depth image to a string for broadcast
81+
data = pickle.dumps(depth)
82+
# capture the lenght of the data portion of the message
83+
length = struct.pack('<I', len(data))
84+
# include the current timestamp for the frame
85+
ts = struct.pack('<d', timestamp)
86+
# for the message for transmission
87+
self.frame_data = ''.join([length, ts, data])
88+
89+
def handle_write(self):
90+
# first time the handle_write is called
91+
if not hasattr(self, 'frame_data'):
92+
self.update_frame()
93+
# the frame has been sent in it entirety so get the latest frame
94+
if len(self.frame_data) == 0:
95+
self.update_frame()
96+
else:
97+
# send the remainder of the frame_data until there is no data remaining for transmition
98+
remaining_size = self.send(self.frame_data)
99+
self.frame_data = self.frame_data[remaining_size:]
100+
101+
102+
def handle_close(self):
103+
self.close()
104+
105+
106+
class MulticastServer(asyncore.dispatcher):
107+
def __init__(self, host = mc_ip_address, port=1024):
108+
asyncore.dispatcher.__init__(self)
109+
server_address = ('', port)
110+
self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
111+
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
112+
self.bind(server_address)
113+
114+
def handle_read(self):
115+
data, addr = self.socket.recvfrom(42)
116+
print('Recived Multicast message %s bytes from %s' % (data, addr))
117+
# Once the server recives the multicast signal, open the frame server
118+
EtherSenseServer(addr)
119+
print(sys.stderr, data)
120+
121+
def writable(self):
122+
return False # don't want write notifies
123+
124+
def handle_close(self):
125+
self.close()
126+
127+
def handle_accept(self):
128+
channel, addr = self.accept()
129+
print('received %s bytes from %s' % (data, addr))
130+
131+
132+
def main(argv):
133+
# initalise the multicast receiver
134+
server = MulticastServer()
135+
# hand over excicution flow to asyncore
136+
asyncore.loop()
137+
138+
if __name__ == '__main__':
139+
main(sys.argv[1:])
140+
Loading
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# EtherSense
2+
Ethernet client and server for RealSense using python's Asyncore.
3+
4+
## Prerequisites
5+
Installation and Setup of Server:
6+
These steps assume a fresh install of Ubuntu 18.04 on an UpBoard but has also been tested on an Intel NUC.
7+
8+
```
9+
sudo apt-get update; sudo apt-get upgrade;
10+
11+
sudo apt-get install python
12+
13+
sudo apt-get install python-pip
14+
15+
sudo apt-get install git
16+
```
17+
18+
Clone the repo then run:
19+
20+
```
21+
sudo python setup.py
22+
```
23+
24+
This will first install the pip dependencies, followed by the creation of cronjobs in the /etc/crontab file that maintains an instance of the Server running whenever the device is powered.
25+
26+
## Overview
27+
Mulicast broadcast is used to establish connections to servers that are present on the network.
28+
Once a server receives a request for connection from a client, Asyncore is used to establish a TCP connection for each server.
29+
Frames are collected from the camera using librealsense pipeline. It is then resized and send in smaller chucks as to conform with TCP.
30+
31+
### UpBoard PoE
32+
Below shows use of a PoE switch and PoE breakout devices(avalible from online retailers) powering each dedicated UpBoard:
33+
This configuration should allow for a number of RealSense cameras to be connected over distances greater then 30m
34+
![Example Image](https://github.com/krejov100/EtherSense/blob/master/UpBoardSwitch.JPG)
35+
The 5 RealSense cameras are connected to each UpBoard using the provided USB3 cables.
36+
37+
### Client Window
38+
Below shows the result of having connected to five cameras over the local network:
39+
![Example Image](https://github.com/krejov100/EtherSense/blob/master/MultiCameraEthernet.png)
40+
The window titles indicate the port which the frames are being received over.
41+
42+
## Error Logging
43+
Errors are piped to a log file stored in /tmp/error.log as part of the command that is setup in /etc/crontab
44+
45+
## NOTES
46+
47+
### Power Considerations
48+
The UpBoards require a 5v 4Amp power supply. When using PoE breakout adaptors I have found some stability issues, for example the device kernel can crash when the HDMI port is connected. As such I recommend running the UpBoard as a headless server when using PoE.
49+
50+
### Network bandwidth
51+
It is currently very easy to saturate the bandwidth of the Ethernet connection I have tested 5 servers connected to the same client without issue beyond limited framerate:
52+
53+
cfg.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
54+
55+
self.decimate_filter.set_option(rs.option.filter_magnitude, 2)
56+
57+
There are a number of strategies that can be used to increase this bandwidth but are left to the user for brevity and the specific tradeoff for your application, these include:
58+
59+
Transmitting frames using UDP and allowing for frame drop, this requires implementation of packet ordering.
60+
61+
Reducing the depth channel to 8bit.
62+
63+
Reducing the resolution further.
64+
65+
The addition of compression, either frame wise or better still temporal.
66+
67+
Local recording of the depth data into a buffer, with asynchronous frame transfer.
68+
69+
## TroubleShooting Tips
70+
71+
I first of all suggest installing and configuring openssh-server on each of the UpBoards allowing remote connection from the client machine.
72+
73+
Check that the UpBoards are avalible on the local network using "nmap -sP 192.168.2.*"
74+
75+
Check that the server is running on the UpBoard using "ps -eaf | grep "python EtherSenseServer.py"
76+
77+
Finally check the log file at /tmp/error.log
78+
79+
There might still be some conditions where the Server is running but not in a state to transmit, help in narrowing these cases would be much appreciated.
80+
Loading
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# install the dependancies from pip
2+
try:
3+
from pip import main as pipmain
4+
except:
5+
from pip._internal import main as pipmain
6+
pipmain(['install', 'numpy'])
7+
pipmain(['install', 'python-crontab'])
8+
pipmain(['install', 'opencv-python'])
9+
pipmain(['install', 'pyrealsense2'])
10+
pipmain(['install', 'cron_descriptor'])
11+
12+
# using python-crontab, setup a job that is ran on the minute to check if the server is running.
13+
# https://pypi.org/project/python-crontab/
14+
from crontab import CronTab
15+
#using the system crontab file as sudo is required for librealsense aquisition
16+
system_cron = CronTab(tabfile='/etc/crontab', user=False)
17+
#requires the shell enviroment as ./AlwaysRunningServer.bash
18+
system_cron.env['SHELL'] = '/bin/bash'
19+
job = system_cron.new(command="cd /home/$(ls /home/)/EtherSense; ./AlwaysRunningServer.bash >> /tmp/error.log 2>&1", user='root')
20+
job.every_reboot()
21+
i = 0
22+
#this while loop means that the are entrys in the cron file for each 5 sec interval
23+
while i < 60:
24+
#the cron job is cd to the EtherSense dir then run AlwaysRunningServer.bash, logging to /temp/error.log
25+
#have to use this $(ls /home/) to find home dir, assuming no other user spaces.
26+
job = system_cron.new(command="sleep %i; cd /home/$(ls /home/)/EtherSense; ./AlwaysRunningServer.bash >> /tmp/error.log 2>&1"%i, user='root')
27+
28+
# this line sets the frequance of server checking, stars for all time slices means every minute
29+
job.setall('* * * * *')
30+
system_cron.write()
31+
i += 5
32+
print('cron job set to run ' + job.description())
33+
34+

wrappers/python/examples/readme.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ These Examples demonstrate how to use the python wrapper of the SDK.
1515
8. [T265 Basic](./t265_example.py) - Demonstrates how to retrieve pose data from a T265
1616
9. [T265 Coordinates](./t265_rpy.py) - This example shows how to change coordinate systems of a T265 pose
1717
10. [T265 Stereo](./t265_stereo.py) - This example shows how to use T265 intrinsics and extrinsics in OpenCV to asynchronously compute depth maps from T265 fisheye images on the host.
18+
11. [Realsense over Ethernet](./ethernet_client_server/README.md) - This example shows how to stream depth data from RealSense depth cameras over ethernet.
1819

1920
## Pointcloud Visualization
2021

0 commit comments

Comments
 (0)