11#!/usr/bin/env python
22
3- import os , sys , getopt , time , cgi , urlparse , signal , daemon
4- import threading , mimetypes , json , gevent
53
6- from geventwebsocket import WebSocketServer , WebSocketApplication , Resource
74from collections import OrderedDict
85from datetime import datetime
9- from time import sleep
10- from string import hexdigits
11- from hashlib import sha256
12- from struct import pack , unpack , unpack_from
6+ from time import time , sleep
7+ import threading , mimetypes , json
8+ import os , sys , getopt , cgi , signal , daemon
9+
10+ from geventwebsocket import WebSocketServer , WebSocketApplication , Resource
1311
14- from sqlchain .version import *
15- from sqlchain .dbpool import *
16- from sqlchain .util import *
12+ from sqlchain .version import version , P2SH_FLAG , BECH32_FLAG
13+ from sqlchain .rpc import do_RPC
14+ from sqlchain .bci import isTxAddrs
15+ from sqlchain .dbpool import DBPool
16+ from sqlchain .util import dotdict , loadcfg , savecfg , drop2user , getssl , log , logts
1717
18- __builtins__ .sqc = dotdict () # container for super globals
18+ __builtins__ .sqc = dotdict () # container for super globals
1919
2020sqc .cfg = { 'log' :sys .argv [0 ]+ '.log' , 'listen' :'localhost:8085' , 'www' :'www' , 'block' :0 ,
21- 'pool' :4 , 'dbinfo-ts' :datetime .now ().strftime ('%s' ),
21+ 'pool' :4 , 'dbinfo-ts' :datetime .now ().strftime ('%s' ),
2222 'dbinfo' :- 1 , 'path' :'/var/data/sqlchain' }
2323
24- sqc .sync = threading .Condition ();
25- sqc .sync_id = 0
26- dbwrk = None
27- syncd = None
24+ sqc .server = {}
25+ sqc .clients = {} # active websockets we publish to
26+ sqc .syncTxs ,sqc .lastBlk = [],{} # current sync data shared for every sync/subscription
27+ sqc .sync = threading .Condition ()
28+ sqc .sync_id = 0
2829
2930def do_Root (env , send_resp ):
3031 try :
@@ -33,63 +34,59 @@ def do_Root(env, send_resp):
3334 if path == '/' : # the /rpc api is mirrored here as form params
3435 form = cgi .FieldStorage (fp = env ['wsgi.input' ], environ = env , keep_blank_values = True )
3536 env ['PATH_INFO' ] = "/rpc/%s/%s" % (form ['method' ].value , "/" .join (form .getlist ('params' )))
36- return lib . rpc . do_RPC (env , send_resp )
37+ return do_RPC (env , send_resp )
3738 elif sqc .cfg ['www' ]: # GET static website files, if path configured
3839 path = '/main.html' if path in ['' , '/' ] else path
3940 if os .path .isfile (sqc .cfg ['www' ]+ path ):
4041 _ ,ext = os .path .splitext (path )
4142 filesize = str (os .path .getsize (sqc .cfg ['www' ]+ path ))
4243 with open (sqc .cfg ['www' ]+ path ) as fd :
43- send_resp ('200 OK' , [('Content-Type' , mimetypes .types_map [ext ]), ('Content-Length' , filesize ),
44- ('Expires' , datetime .utcfromtimestamp (time . time ()+ 3600 ).strftime ("%a, %d %b %Y %H:%M:%S %ZGMT" ))])
44+ send_resp ('200 OK' , [('Content-Type' , mimetypes .types_map [ext ]), ('Content-Length' , filesize ),
45+ ('Expires' , datetime .utcfromtimestamp (time ()+ 3600 ).strftime ("%a, %d %b %Y %H:%M:%S %ZGMT" ))])
4546 return [ fd .read () ]
4647 send_resp ('404 - File Not Found: %s' % path , [("Content-Type" , "text/html" )], sys .exc_info ())
47- if not sqc .cfg ['www' ]:
48+ if not sqc .cfg ['www' ]:
4849 return []
4950 with open (sqc .cfg ['www' ]+ '/404.html' ) as fd :
5051 return [ fd .read () ]
5152 except IOError :
5253 pass
5354
54- clients = {} # active websockets we publish to
55- sqc .syncTxs ,lastBlk = [],{} # current sync data shared for every sync/subscription
56-
5755class BCIWebSocket (WebSocketApplication ):
58- global clients
5956 remote = None
60- def on_open (self ):
57+ def on_open (self , * args , ** kwargs ):
6158 self .remote = self .ws .environ ['REMOTE_ADDR' ]
6259 logts ("WS Client connected from %s" % self .remote )
63- clients [self .ws .handler .active_client ] = { 'subs' :[], 'addrs' :set () }
64-
65- def on_message (self , msg ):
60+ sqc . clients [self .ws .handler .active_client ] = { 'subs' :[], 'addrs' :set () }
61+
62+ def on_message (self , msg , * args , ** kwargs ): # pylint:disable=arguments-differ
6663 if msg :
6764 msg = json .loads (msg )
6865 if msg ['op' ] in [ 'blocks_sub' , 'unconfirmed_sub' ]:
69- clients [self .ws .handler .active_client ]['subs' ].append (msg ['op' ])
66+ sqc . clients [self .ws .handler .active_client ]['subs' ].append (msg ['op' ])
7067 if msg ['op' ] == 'addr_sub' and 'addr' in msg :
71- clients [self .ws .handler .active_client ]['addrs' ].add (msg ['addr' ])
68+ sqc . clients [self .ws .handler .active_client ]['addrs' ].add (msg ['addr' ])
7269 if msg ['op' ] == 'ping_block' :
73- self .ws .send ({ 'op' : 'block' , 'x' : lastBlk })
70+ self .ws .send ({ 'op' : 'block' , 'x' : sqc . lastBlk })
7471 if msg ['op' ] == 'ping_tx' :
75- if 'lasttx' in clients [self .ws .handler .active_client ]:
76- self .ws .send (json .dumps ({ 'op' : 'utx' , 'x' : clients [self .ws .handler .active_client ]['lasttx' ] }))
72+ if 'lasttx' in sqc . clients [self .ws .handler .active_client ]:
73+ self .ws .send (json .dumps ({ 'op' : 'utx' , 'x' : sqc . clients [self .ws .handler .active_client ]['lasttx' ] }))
7774
78- def on_close (self , reason ):
79- logts ("WS Client disconnected %s %s" % (self .remote , reason ))
80- del clients [self .ws .handler .active_client ]
75+ def on_close (self , * args , ** kwargs ):
76+ logts ("WS Client disconnected %s %s" % (self .remote , '' . join ( args ) ))
77+ del sqc . clients [self .ws .handler .active_client ]
8178
8279# monitor mempool, block, orphan changes - publish to websocket subscriptions, notify waiting sync connections
8380def syncMonitor ():
84- global done , dbwrk
8581 with sqc .dbpool .get ().cursor () as cur :
8682 cur .execute ("select greatest(ifnull(m,0),ifnull(o,0)) from (select max(sync_id) as m from mempool) m,(select max(sync_id) as o from orphans) o;" )
8783 sqc .sync_id = cur .fetchone ()[0 ]
8884 cur .execute ("select ifnull(max(id),0) from blocks;" )
8985 sqc .cfg ['block' ] = cur .fetchone ()[0 ]
9086 if sqc .cfg ['dbinfo' ] == 0 :
91- dbwrk = threading .Thread (target = mkDBInfo ).start ()
92- while not done .isSet ():
87+ sqc .dbwrk = threading .Thread (target = mkDBInfo )
88+ sqc .dbwrk .start ()
89+ while not sqc .done .isSet ():
9390 with sqc .dbpool .get ().cursor () as cur :
9491 txs = []
9592 cur .execute ("select hash from mempool m, trxs t where m.sync_id > %s and t.id=m.id;" , (sqc .sync_id ,))
@@ -101,42 +98,40 @@ def syncMonitor():
10198 new_orphans = cur .fetchone ()[0 ] > 0
10299 cur .execute ("select max(id) from blocks;" )
103100 block = cur .fetchone ()[0 ]
104- cur .execute ("replace into info (class,`key`,value) values('info','ws-clients',%s),('info','connections',%s);" , (len (clients ), len (server .pool ) if server .pool else 0 ))
101+ cur .execute ("replace into info (class,`key`,value) values('info','ws-clients',%s),('info','connections',%s);" , (len (sqc . clients ), len (sqc . server .pool ) if sqc . server .pool else 0 ))
105102 if block != sqc .cfg ['block' ] or new_orphans or len (txs ) > 0 :
106103 do_Sync (block )
107104 if sqc .cfg ['dbinfo' ] > 0 and (datetime .now () - datetime .fromtimestamp (int (sqc .cfg ['dbinfo-ts' ]))).total_seconds () > sqc .cfg ['dbinfo' ]* 60 :
108- dbwrk = threading .Thread (target = mkDBInfo )
109- dbwrk .start ()
110- sleep (sqc .cfg ['sync' ] if 'sync' in sqc .cfg else 5 )
111- if dbwrk :
112- dbwrk .join ()
113-
105+ sqc . dbwrk = threading .Thread (target = mkDBInfo )
106+ sqc . dbwrk .start ()
107+ sleep (sqc .cfg ['sync' ] if 'sync' in sqc .cfg else 5 )
108+ if sqc . dbwrk :
109+ sqc . dbwrk .join ()
110+
114111def do_Sync (block ):
115- global lastBlk
116112 if block != sqc .cfg ['block' ]:
117113 sqc .cfg ['block' ] = min (block , sqc .cfg ['block' ]+ 1 )
118114 with sqc .dbpool .get ().cursor () as cur :
119- lastBlk = sqlchain .bci .bciBlockWS (cur , block )
120- for client in server .clients .values ():
121- if 'blocks_sub' in clients [client ]['subs' ]:
122- client .ws .send (json .dumps ({ 'op' : 'block' , 'x' : lastBlk }))
115+ sqc . lastBlk = sqlchain .bci .bciBlockWS (cur , block )
116+ for client in sqc . server .clients .values ():
117+ if 'blocks_sub' in sqc . clients [client ]['subs' ]:
118+ client .ws .send (json .dumps ({ 'op' : 'block' , 'x' : sqc . lastBlk }))
123119 sqc .sync_id += 1
124120 with sqc .sync :
125121 sqc .sync .notifyAll ()
126122 if len (sqc .syncTxs ) > 0 :
127- for client in server .clients .values ():
123+ for client in sqc . server .clients .values ():
128124 for tx in sqc .syncTxs :
129- if 'unconfirmed_sub' in clients [client ]['subs' ] or (clients [client ]['addrs' ] and lib . bci . isTxAddrs (tx , clients [client ]['addrs' ])):
125+ if 'unconfirmed_sub' in sqc . clients [client ]['subs' ] or (sqc . clients [client ]['addrs' ] and isTxAddrs (tx , sqc . clients [client ]['addrs' ])):
130126 client .ws .send (json .dumps ({ 'op' : 'utx' , 'x' : tx }))
131- clients [client ]['lasttx' ] = tx
127+ sqc . clients [client ]['lasttx' ] = tx
132128
133129def mkDBInfo ():
134- global dbwrk
135130 with sqc .dbpool .get ().cursor () as cur :
136131 logts ("Updating server db info" )
137132 sqc .cfg ['dbinfo-ts' ] = datetime .now ().strftime ('%s' )
138133 savecfg (sqc .cfg )
139- sqlchain .insight .apiStatus (cur , 'db' )
134+ sqlchain .insight .apiStatus (cur , 'db' )
140135 cur .execute ("select count(*) from address where (id & %s = %s);" , (P2SH_FLAG ,P2SH_FLAG ))
141136 cur .execute ("replace into info (class,`key`,value) values('db','address:p2sh',%s);" , (cur .fetchone ()[0 ], ))
142137 cur .execute ("select count(*) from address where (id & %s = %s);" , (BECH32_FLAG ,BECH32_FLAG ))
@@ -153,11 +148,11 @@ def mkDBInfo():
153148 cur .execute ("replace into info (class,`key`,value) values('db','outputs:unspent',%s);" , (cur .fetchone ()[0 ], ))
154149 cur .execute ("replace into info (class,`key`,value) values('db','all:updated',now());" )
155150 logts ("DB info update complete" )
156- dbwrk = None
151+ sqc . dbwrk = None
157152
158- def options ():
159- try :
160- opts ,args = getopt .getopt (sys .argv [1 :], "hvb:p:c:d:l:w:h:p:r:u:i:" ,
153+ def options (cfg ): # pylint:disable=too-many-branches
154+ try :
155+ opts ,_ = getopt .getopt (sys .argv [1 :], "hvb:p:c:d:l:w:h:p:r:u:i:" ,
161156 ["help" , "version" , "debug" , "db=" , "log=" , "www=" , "listen=" , "path=" , "rpc=" , "user=" , "dbinfo=" , "defaults" ])
162157 except getopt .GetoptError :
163158 usage ()
@@ -167,27 +162,27 @@ def options():
167162 elif opt in ("-v" , "--version" ):
168163 sys .exit (sys .argv [0 ]+ ': ' + version )
169164 elif opt in ("-d" , "--db" ):
170- sqc . cfg ['db' ] = arg
165+ cfg ['db' ] = arg
171166 elif opt in ("-l" , "--log" ):
172- sqc . cfg ['log' ] = arg
167+ cfg ['log' ] = arg
173168 elif opt in ("-w" , "--www" ):
174- sqc . cfg ['www' ] = arg
169+ cfg ['www' ] = arg
175170 elif opt in ("-p" , "--path" ):
176- sqc . cfg ['path' ] = arg
171+ cfg ['path' ] = arg
177172 elif opt in ("-h" , "--listen" ):
178- sqc . cfg ['listen' ] = arg
173+ cfg ['listen' ] = arg
179174 elif opt in ("-r" , "--rpc" ):
180- sqc . cfg ['rpc' ] = arg
175+ cfg ['rpc' ] = arg
181176 elif opt in ("-u" , "--user" ):
182- sqc . cfg ['user' ] = arg
177+ cfg ['user' ] = arg
183178 elif opt in ("-i" ,"--dbinfo" ):
184- sqc . cfg ['dbinfo' ] = int (arg )
185- elif opt in ( "--defaults" ) :
186- savecfg (sqc . cfg )
179+ cfg ['dbinfo' ] = int (arg )
180+ elif opt in "--defaults" :
181+ savecfg (cfg )
187182 sys .exit ("%s updated" % (sys .argv [0 ]+ '.cfg' ))
188- elif opt in ( "--debug" ) :
189- sqc . cfg ['debug' ] = True
190-
183+ elif opt in "--debug" :
184+ cfg ['debug' ] = True
185+
191186def usage ():
192187 print """Usage: {0} [options...][cfg file]\n Command options are:\n -h,--help\t Show this help info\n -v,--version\t Show version info
193188--debug\t \t Run in foreground with logging to console
@@ -196,32 +191,29 @@ def usage():
196191-p,--path\t Set path for blob and header data files (/var/data/sqlchain)
197192-h,--listen\t Set host:port for web server\n -w,--www\t Web server root directory\n -u,--user\t Set user to run as
198193-d,--db \t Set mysql db connection, "host:user:pwd:dbname"\n -l,--log\t Set log file path
199- -r,--rpc\t Set rpc connection, "http://user:pwd@host:port"
194+ -r,--rpc\t Set rpc connection, "http://user:pwd@host:port"
200195-i,--dbinfo\t Set db info update period in minutes (default=180, 0=at start, -1=never) """ .format (sys .argv [0 ])
201- sys .exit (2 )
202-
196+ sys .exit (2 )
197+
203198def sigterm_handler (_signo , _stack_frame ):
204- global syncd , done
205199 logts ("Shutting down." )
206- done .set ()
207- if syncd :
208- syncd .join ()
200+ sqc . done .set ()
201+ if sqc . syncd :
202+ sqc . syncd .join ()
209203 if not sqc .cfg ['debug' ]:
210204 os .unlink (sqc .cfg ['pid' ] if 'pid' in sqc .cfg else sys .argv [0 ]+ '.pid' )
211205 sys .exit (0 )
212-
206+
213207def sighup_handler (_signo , _stack_frame ):
214- logpath = sqc .cfg ['log' ] if 'log' in sqc .cfg else sys .argv [0 ]+ '.log'
208+ path = sqc .cfg ['log' ] if 'log' in sqc .cfg else sys .argv [0 ]+ '.log'
215209 sys .stdout .close ()
216- sys .stdout = open (logpath ,'a' )
210+ sys .stdout = open (path ,'a' )
217211 sys .stderr .close ()
218- sys .stderr = open (logpath ,'a' )
212+ sys .stderr = open (path ,'a' )
219213 logts ("SIGHUP Log reopened" )
220-
214+
221215def run ():
222- global server , syncd , done
223-
224- done = threading .Event ()
216+ sqc .done = threading .Event ()
225217 sqc .dbpool = DBPool (sqc .cfg ['db' ].split (':' ), sqc .cfg ['pool' ], 'MySQLdb' )
226218
227219 mimetypes .init ()
@@ -231,29 +223,29 @@ def run():
231223
232224 logts ("Starting on %s" % sqc .cfg ['listen' ])
233225 host ,port = sqc .cfg ['listen' ].split (':' )
234- server = WebSocketServer ((host , int (port )), APIs , spawn = 10000 , ** getssl (sqc .cfg ))
235- server .start ()
226+ sqc . server = WebSocketServer ((host , int (port )), APIs , spawn = 10000 , ** getssl (sqc .cfg ))
227+ sqc . server .start ()
236228
237229 if 'sync' not in sqc .cfg or sqc .cfg ['sync' ] > 0 :
238230 log ("Sync monitoring at %d second intervals" % (sqc .cfg ['sync' ] if 'sync' in sqc .cfg else 5 ,))
239- syncd = threading .Thread (target = syncMonitor )
240- syncd .daemon = True
241- syncd .start ()
231+ sqc . syncd = threading .Thread (target = syncMonitor )
232+ sqc . syncd .daemon = True
233+ sqc . syncd .start ()
242234 else :
243235 log ("Sync monitor disabled" )
244-
236+
245237 drop2user (sqc .cfg , chown = True )
246238
247- server .serve_forever ()
248-
239+ sqc . server .serve_forever ()
240+
249241if __name__ == '__main__' :
250-
242+
251243 loadcfg (sqc .cfg )
252- options ()
244+ options (sqc . cfg )
253245
254246 import sqlchain .insight , sqlchain .bci , sqlchain .rpc
255-
256- APIs = Resource (OrderedDict ((
247+
248+ APIs = Resource (OrderedDict ((
257249 ('/api' , sqlchain .insight .do_API ),
258250 ('/bci' , sqlchain .bci .do_BCI ),
259251 ('/rpc' , sqlchain .rpc .do_RPC ),
@@ -266,17 +258,8 @@ if __name__ == '__main__':
266258 else :
267259 logpath = sqc .cfg ['log' ] if 'log' in sqc .cfg else sys .argv [0 ]+ '.log'
268260 pidpath = sqc .cfg ['pid' ] if 'pid' in sqc .cfg else sys .argv [0 ]+ '.pid'
269- with daemon .DaemonContext (working_directory = '.' , umask = 0002 , stdout = open (logpath ,'a' ), stderr = open (logpath ,'a' ),
261+ with daemon .DaemonContext (working_directory = '.' , umask = 0002 , stdout = open (logpath ,'a' ), stderr = open (logpath ,'a' ),
270262 signal_map = {signal .SIGTERM :sigterm_handler , signal .SIGHUP :sighup_handler } ):
271- with file (pidpath ,'w' ) as f :
263+ with file (pidpath ,'w' ) as f :
272264 f .write (str (os .getpid ()))
273265 run ()
274-
275-
276-
277-
278-
279-
280-
281-
282-
0 commit comments