Skip to content

Commit 9f8c1ac

Browse files
authored
Merge pull request #3 from haklein/wip-certabo-module
move certabo code into separate module
2 parents fae60d4 + e60cfdf commit 9f8c1ac

File tree

6 files changed

+281
-374
lines changed

6 files changed

+281
-374
lines changed

certabo-lichess.py

+18-224
Original file line numberDiff line numberDiff line change
@@ -4,40 +4,21 @@
44
# Copyright (c) 2020 by Harald Klein <[email protected]> - All rights reserved
55
#
66

7-
from __future__ import print_function
8-
from __future__ import division
9-
import importlib
10-
import re
117
import sys
128
import time
139
import logging
1410
import logging.handlers
1511
import traceback
1612
import os
1713
import argparse
18-
import subprocess
19-
import time as tt
2014
import threading
21-
import queue
22-
import serial
23-
import fcntl
24-
25-
import serial.tools.list_ports
26-
27-
import berserk
28-
29-
from socket import *
30-
from select import *
3115

3216
import chess.pgn
3317
import chess
3418

35-
from random import shuffle
36-
37-
import codes
38-
39-
from utils import port2number, port2udp, find_port, get_engine_list, get_book_list, coords_in
40-
from constants import CERTABO_SAVE_PATH, CERTABO_DATA_PATH, MAX_DEPTH_DEFAULT
19+
import berserk
20+
import certabo
21+
from certabo.certabo import CERTABO_DATA_PATH as CERTABO_DATA_PATH
4122

4223
parser = argparse.ArgumentParser()
4324
parser.add_argument("--port")
@@ -50,7 +31,6 @@
5031
portname = 'auto'
5132
if args.port is not None:
5233
portname = args.port
53-
port = port2number(portname)
5434

5535
calibrate = False
5636
if args.calibrate:
@@ -83,197 +63,11 @@ def my_excepthook(excType, excValue, traceback, logger=logger):
8363

8464
logging.info("certabo-lichess.py startup")
8565

86-
for d in (CERTABO_SAVE_PATH, CERTABO_DATA_PATH):
87-
try:
88-
os.makedirs(d)
89-
except OSError:
90-
pass
91-
92-
class serialreader(threading.Thread):
93-
def __init__ (self, handler, device='auto'):
94-
threading.Thread.__init__(self)
95-
self.device = device
96-
self.connected = False
97-
self.handler = handler
98-
self.serial_out = queue.Queue()
99-
100-
def send_led(self, message: bytes):
101-
self.serial_out.put(message)
102-
103-
def run(self):
104-
while True:
105-
if not self.connected:
106-
try:
107-
if self.device == 'auto':
108-
logging.info(f'Auto-detecting serial port')
109-
serialport = find_port()
110-
else:
111-
serialport = self.device
112-
if serialport is None:
113-
logging.info(f'No port found, retrying')
114-
time.sleep(1)
115-
continue
116-
logging.info(f'Opening serial port {serialport}')
117-
uart = serial.Serial(serialport, 38400, timeout=2.5) # 0-COM1, 1-COM2 / speed /
118-
if os.name == 'posix':
119-
logging.debug(f'Attempting to lock {serialport}')
120-
fcntl.flock(uart.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
121-
logging.debug(f'Flushing input on {serialport}')
122-
uart.flushInput()
123-
uart.write(b'U\xaaU\xaaU\xaaU\xaa')
124-
time.sleep(1)
125-
uart.write(b'\xaaU\xaaU\xaaU\xaaU')
126-
time.sleep(1)
127-
uart.write(b'\x00\x00\x00\x00\x00\x00\x00\x00')
128-
self.connected = True
129-
except Exception as e:
130-
logging.info(f'ERROR: Cannot open serial port {serialport}: {str(e)}')
131-
self.connected = False
132-
time.sleep(0.1)
133-
else:
134-
try:
135-
while uart.inWaiting():
136-
# logging.debug(f'serial data pending')
137-
raw_message = uart.readline()
138-
try:
139-
message = raw_message.decode("ascii")[1: -3]
140-
#if DEBUG:
141-
# print(len(message.split(" ")), "numbers")
142-
if len(message.split(" ")) == 320: # 64*5
143-
self.handler(message)
144-
message = ""
145-
except Exception as e:
146-
logging.info(f'Exception during message decode: {str(e)}')
147-
time.sleep(0.001)
148-
if not self.serial_out.empty():
149-
data = self.serial_out.get()
150-
self.serial_out.task_done()
151-
# logging.debug(f'Sending to serial: {data}')
152-
uart.write(data)
153-
except Exception as e:
154-
logging.info(f'Exception during serial communication: {str(e)}')
155-
self.connected = False
156-
157-
158-
class Certabo():
159-
def __init__(self, port='auto', calibrate=False, **kwargs):
160-
super().__init__(**kwargs)
161-
self.portname = port
162-
self.calibration = calibrate
163-
self.new_setup = False
164-
self.rotate180 = False
165-
self.color = chess.WHITE
166-
self.starting_position = chess.STARTING_FEN
167-
self.chessboard = chess.Board(chess.STARTING_FEN)
168-
self.board_state_usb = ""
169-
self.mystate = "init"
170-
self.reference = ""
171-
172-
# internal values for CERTABO board
173-
self.calibration_samples_counter = 0
174-
self.calibration_samples = []
175-
self.usb_data_history_depth = 3
176-
self.usb_data_history = list(range(self.usb_data_history_depth))
177-
self.usb_data_history_filled = False
178-
self.usb_data_history_i = 0
179-
self.move_detect_tries = 0
180-
self.move_detect_max_tries = 3
181-
182-
# try to load calibration data (mapping of RFID chip IDs to pieces)
183-
codes.load_calibration(None)
184-
185-
# spawn a serial thread and pass our data handler
186-
self.serialthread = serialreader(self.handle_usb_data, self.portname)
187-
self.serialthread.daemon = True
188-
self.serialthread.start()
189-
190-
def has_user_move(self):
191-
try:
192-
moves = codes.get_moves(self.chessboard, self.board_state_usb)
193-
return moves
194-
except:
195-
return []
196-
197-
def get_reference(self):
198-
return self.reference
199-
200-
def set_reference(self, reference):
201-
self.reference = reference
202-
203-
def get_color(self):
204-
return self.color
205-
206-
def set_color(self, color):
207-
self.color = color
208-
209-
def set_state(self, state):
210-
self.mystate = state
211-
212-
def get_state(self):
213-
return self.mystate
214-
215-
def new_game(self):
216-
self.chessboard = chess.Board()
217-
self.mystate = "init"
218-
219-
def set_board_from_fen(self, fen):
220-
self.chessboard = chess.Board(fen)
221-
222-
def send_leds(self, message:bytes=(0).to_bytes(8,byteorder='big',signed=False)):
223-
# logging.info(f'sending LED: {message}')
224-
self.serialthread.send_led(message)
225-
226-
def diff_leds(self):
227-
s1 = self.chessboard.board_fen()
228-
s2 = self.board_state_usb.split(" ")[0]
229-
if (s1 != s2):
230-
diffmap = codes.diff2squareset(s1, s2)
231-
# logging.debug(f'Difference on Squares:\n{diffmap}')
232-
self.send_leds(codes.squareset2ledbytes(diffmap))
233-
else:
234-
self.send_leds()
235-
236-
def handle_usb_data(self, data):
237-
usb_data = list(map(int, data.split(" ")))
238-
if self.calibration == True:
239-
self.calibrate_from_usb_data(usb_data)
240-
else:
241-
if self.usb_data_history_i >= self.usb_data_history_depth:
242-
self.usb_data_history_filled = True
243-
self.usb_data_history_i = 0
244-
245-
self.usb_data_history[self.usb_data_history_i] = list(usb_data)[:]
246-
self.usb_data_history_i += 1
247-
if self.usb_data_history_filled:
248-
self.usb_data_processed = codes.statistic_processing(self.usb_data_history, False)
249-
if self.usb_data_processed != []:
250-
test_state = codes.usb_data_to_FEN(self.usb_data_processed, self.rotate180)
251-
if test_state != "":
252-
self.board_state_usb = test_state
253-
# logging.info(f'info string FEN {test_state}')
254-
self.diff_leds()
255-
256-
def calibrate_from_usb_data(self, usb_data):
257-
self.calibration_samples.append(usb_data)
258-
logging.info(" adding new calibration sample")
259-
self.calibration_samples_counter += 1
260-
if self.calibration_samples_counter >= 15:
261-
logging.info( "------- we have collected enough samples for averaging ----")
262-
usb_data = codes.statistic_processing_for_calibration( self.calibration_samples, False)
263-
codes.calibration(usb_data, self.new_setup, None)
264-
self.calibration = False
265-
logging.info('calibration ok')
266-
self.send_leds()
267-
elif self.calibration_samples_counter %2:
268-
self.send_leds(b'\xff\xff\x00\x00\x00\x00\xff\xff')
269-
else:
270-
self.send_leds()
271-
27266
class Game(threading.Thread):
273-
def __init__(self, client, certabo, game_id, **kwargs):
67+
def __init__(self, client, mycertabo, game_id, **kwargs):
27468
super().__init__(**kwargs)
27569
self.game_id = game_id
276-
self.certabo = certabo
70+
self.certabo = mycertabo
27771
self.client = client
27872
self.stream = client.board.stream_game_state(game_id)
27973
self.current_state = next(self.stream)
@@ -313,7 +107,7 @@ def handle_chat_line(self, chat_line):
313107

314108

315109
def main():
316-
certabo = Certabo(port=portname, calibrate=calibrate)
110+
mycertabo = certabo.certabo.Certabo(port=portname, calibrate=calibrate)
317111

318112
try:
319113
with open('./lichess.token') as f:
@@ -347,9 +141,9 @@ def main():
347141
def setup_new_gameid(gameId):
348142
for game in client.games.get_ongoing():
349143
if game['gameId'] == gameId:
350-
certabo.new_game()
351-
certabo.set_reference(game['gameId'])
352-
logging.info(f'setup_new_gameid() found gameId: {certabo.get_reference()}')
144+
mycertabo.new_game()
145+
mycertabo.set_reference(game['gameId'])
146+
logging.info(f'setup_new_gameid() found gameId: {mycertabo.get_reference()}')
353147
tmp_chessboard = chess.Board()
354148
# unfortunately this is not a complete FEN. So we can only determine position and who's turn it is for an already ongoing game, but have no idea about castling
355149
# rights and en passant. But that's the best we can do for now, and on the next state update we'll get all moves and can replay them to get a complete board state
@@ -358,14 +152,14 @@ def setup_new_gameid(gameId):
358152
tmp_chessboard.turn = chess.BLACK
359153
else:
360154
tmp_chessboard.turn = chess.WHITE
361-
certabo.set_board_from_fen(tmp_chessboard.fen())
155+
mycertabo.set_board_from_fen(tmp_chessboard.fen())
362156
logging.info(f'final FEN: {tmp_chessboard.fen()}')
363157
if game['color'] == 'black':
364-
certabo.set_color(chess.BLACK)
158+
mycertabo.set_color(chess.BLACK)
365159
else:
366-
certabo.set_color(chess.WHITE)
160+
mycertabo.set_color(chess.WHITE)
367161
if game['isMyTurn']:
368-
certabo.set_state('myturn')
162+
mycertabo.set_state('myturn')
369163

370164
while True:
371165
try:
@@ -379,7 +173,7 @@ def setup_new_gameid(gameId):
379173
logging.info(f"game start received: {game_data['id']}")
380174

381175
try:
382-
game = Game(client, certabo, game_data['id'])
176+
game = Game(client, mycertabo, game_data['id'])
383177
game.daemon = True
384178
game.start()
385179
except berserk.exceptions.ResponseError as e:
@@ -389,11 +183,11 @@ def setup_new_gameid(gameId):
389183
continue
390184

391185
setup_new_gameid(game_data['id'])
392-
if certabo.get_state() == 'myturn':
393-
certabo.set_state('init')
394-
while certabo.has_user_move() == []:
186+
if mycertabo.get_state() == 'myturn':
187+
mycertabo.set_state('init')
188+
while mycertabo.has_user_move() == []:
395189
time.sleep(0.1)
396-
client.board.make_move(certabo.get_reference(), certabo.has_user_move()[0])
190+
client.board.make_move(mycertabo.get_reference(), mycertabo.has_user_move()[0])
397191

398192
except berserk.exceptions.ResponseError as e:
399193
print(f'ERROR: Invalid server response: {e}')

0 commit comments

Comments
 (0)