Skip to content

Commit 16e0190

Browse files
committed
more compat
1 parent 2323906 commit 16e0190

File tree

3 files changed

+36
-26
lines changed

3 files changed

+36
-26
lines changed

README.md

+7-10
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
# py-mysql-elasticsearch-sync
2-
Simple and fast MySQL to Elasticsearch sync tool, written in Python3.
2+
Simple and fast MySQL to Elasticsearch sync tool, written in Python.
33

44
## Introduction
55
This tool helps you initialize Elasticsearch from a MySQL table dump by parsing mysqldump output, and then incrementally syncs the MySQL table to Elasticsearch by processing the MySQL binlog.
66
Also, during binlog syncing, this tool saves the binlog sync position, so that it is easy to recover after the tool has been shut down for any reason.
77

88
## Installation
99
By following these steps.
10-
##### 1. Python3
11-
This tool is written in Python3.4, so you must install Python3.4 or above first, by following [this guide](https://docs.python.org/3.4/using/index.html)
12-
##### 2. ibxml2 and libxslt
10+
11+
##### 1. libxml2 and libxslt
1312
This tool also depends on the Python lxml package, so you must install lxml's dependencies correctly: libxml2 and libxslt are required.
1413

1514
For example, in CentOS:
@@ -25,15 +24,15 @@ sudo apt-get install libxml2-dev libxslt-dev python-dev
2524
```
2625

2726
See [lxml Installation](http://lxml.de/installation.html) for more information.
28-
##### 3. mysqldump
29-
And then, mysqldump is required.
27+
##### 2. mysqldump
28+
Next, mysqldump is required (and the MySQL binlog must be enabled).
3029

3130

32-
##### 4. this tool
31+
##### 3. this tool
3332
Then install this tool
3433

3534
```
36-
pip3 install py-mysql-elasticsearch-sync
35+
pip install py-mysql-elasticsearch-sync
3736
```
3837

3938
## Configuration
@@ -61,8 +60,6 @@ to start sync, when xml sync is over, it will also start binlog sync.
6160
## Deployment
6261
We provide an upstart script to help you deploy this tool. Since we use virtualenv for requirements isolation, you must edit the script for your own environment; alternatively, you can deploy the tool in your own way.
6362

64-
6563
## TODO
6664
- [ ] MultiIndex Supporting
6765
- [ ] Multi table Supporting
68-
- [ ] Python version compat

src/__init__.py

+27-13
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
11
from __future__ import print_function, unicode_literals
22
from future.builtins import str, range
3-
try:
4-
from subprocess import DEVNULL # PY3
5-
except ImportError:
3+
import sys
4+
PY2 = sys.version_info[0] == 2
5+
6+
if PY2:
67
import os
78
DEVNULL = open(os.devnull, 'wb')
9+
else:
10+
from subprocess import DEVNULL
11+
def encode_in_py2(s):
12+
if PY2:
13+
return s.encode('utf-8')
14+
return s
15+
816
import os.path
9-
import sys
1017
import yaml
1118
import signal
1219
import requests
@@ -20,7 +27,7 @@
2027
from pymysqlreplication import BinLogStreamReader
2128
from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent
2229

23-
__version__ = '0.2.1'
30+
__version__ = '0.3.0'
2431

2532

2633
# The magic spell for removing invalid characters in xml stream.
@@ -44,7 +51,9 @@ def __init__(self):
4451
self.dump_cmd = 'mysqldump -h {host} -P {port} -u {user} --password={password} {db} {table} ' \
4552
'--default-character-set=utf8 -X'.format(**self.config['mysql'])
4653

47-
self.binlog_conf = {key: self.config['mysql'][key] for key in ['host', 'port', 'user', 'password', 'db']}
54+
self.binlog_conf = dict(
55+
[(key, self.config['mysql'][key]) for key in ['host', 'port', 'user', 'password', 'db']]
56+
)
4857

4958
self.endpoint = 'http://{host}:{port}/{index}/{type}/_bulk'.format(
5059
host=self.config['elastic']['host'],
@@ -192,8 +201,9 @@ def _formatter(self, data):
192201
try:
193202
item['doc'][field] = serializer(item['doc'][field])
194203
except ValueError as e:
195-
self.logger.error("Error occurred during format, ErrorMessage:{}, ErrorItem:{}".format(str(e),
196-
str(item)))
204+
self.logger.error("Error occurred during format, ErrorMessage:{msg}, ErrorItem:{item}".format(
205+
msg=str(e),
206+
item=str(item)))
197207
item['doc'][field] = None
198208
# print(item)
199209
yield item
@@ -204,7 +214,8 @@ def _binlog_loader(self):
204214
"""
205215
if self.log_file and self.log_pos:
206216
resume_stream = True
207-
logging.info("Resume from binlog_file: {} binlog_pos: {}".format(self.log_file, self.log_pos))
217+
logging.info("Resume from binlog_file: {file} binlog_pos: {pos}".format(file=self.log_file,
218+
pos=self.log_pos))
208219
else:
209220
resume_stream = False
210221

@@ -309,18 +320,21 @@ def _xml_parser(self, f_obj):
309320
def _save_binlog_record(self):
310321
if self.log_file and self.log_pos:
311322
with open(self.config['binlog_sync']['record_file'], 'w') as f:
312-
logging.info("Sync binlog_file: {} binlog_pos: {}".format(self.log_file, self.log_pos))
313-
yaml.dump({"log_file": self.log_file, "log_pos": self.log_pos}, f)
323+
logging.info("Sync binlog_file: {file} binlog_pos: {pos}".format(
324+
file=self.log_file,
325+
pos=self.log_pos)
326+
)
327+
yaml.safe_dump({"log_file": self.log_file, "log_pos": self.log_pos}, f, default_flow_style=False)
314328

315329
def _xml_dump_loader(self):
316330
mysqldump = subprocess.Popen(
317-
shlex.split(self.dump_cmd),
331+
shlex.split(encode_in_py2(self.dump_cmd)),
318332
stdout=subprocess.PIPE,
319333
stderr=DEVNULL,
320334
close_fds=True)
321335

322336
remove_invalid_pipe = subprocess.Popen(
323-
shlex.split(REMOVE_INVALID_PIPE),
337+
shlex.split(encode_in_py2(REMOVE_INVALID_PIPE)),
324338
stdin=mysqldump.stdout,
325339
stdout=subprocess.PIPE,
326340
stderr=DEVNULL,

upstart.conf

+2-3
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@ stop on runlevel [06]
55
respawn
66
normal exit 0
77

8-
chdir <PROJECT_ROOT>
8+
chdir <PATH_TO_CONFIG>
99

1010
script
11-
. venv/bin/activate
12-
exec python main.py config.yaml
11+
es-sync config.yaml
1312
end script

0 commit comments

Comments
 (0)