From 5b1ee94592a3e4a50b16d0bbd5aaeef76f23f70e Mon Sep 17 00:00:00 2001 From: Giovanni Carlo Marasco Date: Fri, 7 Jun 2019 00:49:46 -0500 Subject: [PATCH 1/2] Adds stream pixels functionality and utility methods and constants required. Adds example file to show how to use new functionality. Todo, unit tests. --- .gitignore | 1 + communitysdk/retailpixelkit.py | 51 +++++++++++++++++++++++++++---- example_pixel_kit_stream_pixel.py | 40 ++++++++++++++++++++++++ 3 files changed, 86 insertions(+), 6 deletions(-) create mode 100644 example_pixel_kit_stream_pixel.py diff --git a/.gitignore b/.gitignore index d08b0f5..bb0aee7 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ __pycache__ .DS_Store .pytest_cache .coverage +.idea \ No newline at end of file diff --git a/communitysdk/retailpixelkit.py b/communitysdk/retailpixelkit.py index bb2d74e..339878f 100644 --- a/communitysdk/retailpixelkit.py +++ b/communitysdk/retailpixelkit.py @@ -1,6 +1,14 @@ from .serialdevice import SerialDevice from base64 import b64encode + +COLUMNS = 16 +ROWS = 8 +PIXEL_COUNT = COLUMNS * ROWS +OFF = '#000000' +BLANK_SCREEN = [OFF] * PIXEL_COUNT + + class RetailPixelKitSerial(SerialDevice): def __init__(self, path): super().__init__(path) @@ -33,18 +41,49 @@ def hex_to_base64(self, hex_colors): return result.decode() def stream_frame(self, frame): - ''' + """ Just send to serial, don't wait until `rpc-response` - ''' - if len(frame) != 128: - raise Exception('Frame must contain 128 values') - encodedFrame = self.hex_to_base64(frame) + :param frame: + :return: None + """ + if len(frame) != PIXEL_COUNT: + exc = 'Frame must contain {} values'.format(PIXEL_COUNT) + raise Exception(exc) + + encoded_frame = self.hex_to_base64(frame) method = 'lightboard:on' - params = [{ 'map': encodedFrame }] + params = [{'map': encoded_frame}] request_obj = self.get_request_object(method, params) request_str = self.get_request_string(request_obj) self.conn_send(request_str) + @classmethod + def coord_to_index(cls, x, y): + """ + Convert an x, y coordinate to an index in a frame list + :param x: + :param y: + :return: int + """ + return (y * COLUMNS + (x - COLUMNS)) - 1 + + @classmethod + def pixels_to_frame(cls, pixels): + """ + Create a frame list of hex colors + :param pixels: list tuple (x, y, color) + :return: list + """ + frame = BLANK_SCREEN.copy() + for x, y, color in pixels: + index = cls.coord_to_index(x, y) + frame[index] = color + return frame + + def stream_pixels(self, pixels): + frame = RetailPixelKitSerial.pixels_to_frame(pixels) + self.stream_frame(frame) + def get_battery_status(self): return self.rpc_request('battery-status', []) diff --git a/example_pixel_kit_stream_pixel.py b/example_pixel_kit_stream_pixel.py new file mode 100644 index 0000000..c0d94fe --- /dev/null +++ b/example_pixel_kit_stream_pixel.py @@ -0,0 +1,40 @@ +""" +This example will stream a single color pixel to Pixel Kit. +""" +from communitysdk import list_connected_devices +from communitysdk import RetailPixelKitSerial as PixelKit +from communitysdk.retailpixelkit import COLUMNS, ROWS +from random import randint +from time import sleep + +devices = list_connected_devices() +pk_filter = filter(lambda device: isinstance(device, PixelKit), devices) +pk = next(pk_filter, None) # Get first Pixel Kit + + +def get_random_hex()->str: + random_number = randint(0, 16777215) # number of possible colors + hex_number = str(hex(random_number)) + hex_number = '#{}'.format(hex_number[2:]) + return hex_number + + +def get_random_pixel()->tuple: + x = randint(1, COLUMNS) + y = randint(1, ROWS) + color = get_random_hex() + return x, y, color + + +if pk is not None: + """ + A pixel is a tuple with an x and y coordinate and a hexadecimal color + prefixed with `#`. + We'll create a random color and coordinate to stream to the Pixel Kit. + """ + while True: + pixels = [get_random_pixel()] + pk.stream_pixels(pixels) + sleep(0.1) +else: + print('No Pixel Kit was found :(') From b0d0803dbd32f296cfd8bbbfa07eb78b16322d27 Mon Sep 17 00:00:00 2001 From: Giovanni Carlo Marasco Date: Sun, 9 Jun 2019 01:25:23 -0500 Subject: [PATCH 2/2] Adds unit tests --- example_pixel_kit_stream_pixel.py | 4 +-- test/test_retailpixelkitserial.py | 60 +++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/example_pixel_kit_stream_pixel.py b/example_pixel_kit_stream_pixel.py index c0d94fe..7b3a03a 100644 --- a/example_pixel_kit_stream_pixel.py +++ b/example_pixel_kit_stream_pixel.py @@ -12,14 +12,14 @@ pk = next(pk_filter, None) # Get first Pixel Kit -def get_random_hex()->str: +def get_random_hex() -> str: random_number = randint(0, 16777215) # number of possible colors hex_number = str(hex(random_number)) hex_number = '#{}'.format(hex_number[2:]) return hex_number -def get_random_pixel()->tuple: +def get_random_pixel() -> tuple: x = randint(1, COLUMNS) y = randint(1, ROWS) color = get_random_hex() diff --git a/test/test_retailpixelkitserial.py b/test/test_retailpixelkitserial.py index 05bda90..bb62b0c 100644 --- a/test/test_retailpixelkitserial.py +++ b/test/test_retailpixelkitserial.py @@ -174,3 +174,63 @@ def test_connect_to_wifi_wrong(rpk): patch.object(rpk, 'rpc_request'): rpk.connect_to_wifi(ssid, value) assert "`password` must be a string" in str(err.value) + + +def test_coord_to_index(): + """ + Should return the correct index in the pixel array from x and y coordinates + """ + assert PixelKit.coord_to_index(1, 1) == 0 + assert PixelKit.coord_to_index(16, 8) == 127 + + +@pytest.fixture +def pixel1() -> tuple: + return 1, 1, '#ffffff' + + +@pytest.fixture +def pixel2() -> tuple: + return 16, 8, '#2b2b2b' + + +class TestPixelsToFrame: + from communitysdk.retailpixelkit import BLANK_SCREEN + + def test_empty_list(self): + """ + Should return a blank screen list when the pixels argument is an empty list + """ + assert PixelKit.pixels_to_frame([]) == self.BLANK_SCREEN + + def test_pixels_to_frame(self, pixel1, pixel2): + """ + Should return a list with the pixels at the correct indices + """ + expected_frame = self.BLANK_SCREEN.copy() + color1 = pixel1[2] + color2 = pixel2[2] + expected_frame[0] = color1 + expected_frame[127] = color2 + assert PixelKit.pixels_to_frame([pixel1, pixel2]) == expected_frame + + +class TestStreamPixels: + @pytest.fixture + def pixels(self, pixel1, pixel2) -> list: + return [pixel1, pixel2] + + @pytest.fixture + def frame(self, pixels) -> list: + return PixelKit.pixels_to_frame(pixels) + + def test_calls_pixels_to_frame(self, rpk, pixels, frame): + with patch.object(PixelKit, 'pixels_to_frame', return_value=frame) as pixels_to_frame, \ + patch.object(rpk, 'stream_frame'): + rpk.stream_pixels(pixels) + pixels_to_frame.assert_called_with(pixels) + + def test_calls_stream_frame(self, rpk, pixels, frame): + with patch.object(rpk, 'stream_frame') as stream_frame: + rpk.stream_pixels(pixels) + stream_frame.assert_called_with(frame)