|
7 | 7 | import time
|
8 | 8 |
|
9 | 9 | from shutil import rmtree
|
10 |
| -from six import raise_from, iteritems |
| 10 | +from six import raise_from, iteritems, text_type |
11 | 11 | from tempfile import mkstemp, mkdtemp
|
12 | 12 |
|
13 | 13 | from .enums import \
|
|
38 | 38 | PG_PID_FILE
|
39 | 39 |
|
40 | 40 | from .consts import \
|
41 |
| - MAX_WAL_SENDERS, \ |
| 41 | + MAX_LOGICAL_REPLICATION_WORKERS, \ |
42 | 42 | MAX_REPLICATION_SLOTS, \
|
| 43 | + MAX_WORKER_PROCESSES, \ |
| 44 | + MAX_WAL_SENDERS, \ |
43 | 45 | WAL_KEEP_SEGMENTS
|
44 | 46 |
|
45 | 47 | from .decorators import \
|
@@ -329,25 +331,27 @@ def _create_recovery_conf(self, username, slot=None):
|
329 | 331 | # Connect to master for some additional actions
|
330 | 332 | with master.connect(username=username) as con:
|
331 | 333 | # check if slot already exists
|
332 |
| - res = con.execute(""" |
| 334 | + res = con.execute( |
| 335 | + """ |
333 | 336 | select exists (
|
334 | 337 | select from pg_catalog.pg_replication_slots
|
335 | 338 | where slot_name = %s
|
336 | 339 | )
|
337 |
| - """, slot) |
| 340 | + """, slot) |
338 | 341 |
|
339 | 342 | if res[0][0]:
|
340 | 343 | raise TestgresException(
|
341 | 344 | "Slot '{}' already exists".format(slot))
|
342 | 345 |
|
343 | 346 | # TODO: we should drop this slot after replica's cleanup()
|
344 |
| - con.execute(""" |
| 347 | + con.execute( |
| 348 | + """ |
345 | 349 | select pg_catalog.pg_create_physical_replication_slot(%s)
|
346 |
| - """, slot) |
| 350 | + """, slot) |
347 | 351 |
|
348 | 352 | line += "primary_slot_name={}\n".format(slot)
|
349 | 353 |
|
350 |
| - self.append_conf(RECOVERY_CONF_FILE, line) |
| 354 | + self.append_conf(filename=RECOVERY_CONF_FILE, line=line) |
351 | 355 |
|
352 | 356 | def _maybe_start_logger(self):
|
353 | 357 | if testgres_config.use_python_logging:
|
@@ -475,65 +479,79 @@ def get_auth_method(t):
|
475 | 479 |
|
476 | 480 | # overwrite config file
|
477 | 481 | with io.open(postgres_conf, "w") as conf:
|
478 |
| - # remove old lines |
479 | 482 | conf.truncate()
|
480 | 483 |
|
481 |
| - if not fsync: |
482 |
| - conf.write(u"fsync = off\n") |
| 484 | + self.append_conf(fsync=fsync, |
| 485 | + max_worker_processes=MAX_WORKER_PROCESSES, |
| 486 | + log_statement=log_statement, |
| 487 | + listen_addresses=self.host, |
| 488 | + port=self.port) # yapf:disable |
483 | 489 |
|
484 |
| - conf.write(u"log_statement = {}\n" |
485 |
| - u"listen_addresses = '{}'\n" |
486 |
| - u"port = {}\n".format(log_statement, |
487 |
| - self.host, |
488 |
| - self.port)) # yapf: disable |
| 490 | + # common replication settings |
| 491 | + if allow_streaming or allow_logical: |
| 492 | + self.append_conf(max_replication_slots=MAX_REPLICATION_SLOTS, |
| 493 | + max_wal_senders=MAX_WAL_SENDERS) # yapf: disable |
489 | 494 |
|
490 |
| - # replication-related settings |
491 |
| - if allow_streaming: |
| 495 | + # binary replication |
| 496 | + if allow_streaming: |
| 497 | + # select a proper wal_level for PostgreSQL |
| 498 | + wal_level = 'replica' if self._pg_version >= '9.6' else 'hot_standby' |
492 | 499 |
|
493 |
| - # select a proper wal_level for PostgreSQL |
494 |
| - if self._pg_version >= '9.6': |
495 |
| - wal_level = "replica" |
496 |
| - else: |
497 |
| - wal_level = "hot_standby" |
498 |
| - |
499 |
| - conf.write(u"hot_standby = on\n" |
500 |
| - u"max_wal_senders = {}\n" |
501 |
| - u"max_replication_slots = {}\n" |
502 |
| - u"wal_keep_segments = {}\n" |
503 |
| - u"wal_level = {}\n".format(MAX_WAL_SENDERS, |
504 |
| - MAX_REPLICATION_SLOTS, |
505 |
| - WAL_KEEP_SEGMENTS, |
506 |
| - wal_level)) # yapf: disable |
507 |
| - |
508 |
| - if allow_logical: |
509 |
| - if self._pg_version < '10': |
510 |
| - raise InitNodeException( |
511 |
| - "Logical replication is only available for Postgres 10 " |
512 |
| - "and newer") |
513 |
| - conf.write(u"wal_level = logical\n") |
514 |
| - |
515 |
| - # disable UNIX sockets if asked to |
516 |
| - if not unix_sockets: |
517 |
| - conf.write(u"unix_socket_directories = ''\n") |
| 500 | + self.append_conf(hot_standby=True, |
| 501 | + wal_keep_segments=WAL_KEEP_SEGMENTS, |
| 502 | + wal_level=wal_level) # yapf: disable |
| 503 | + |
| 504 | + # logical replication |
| 505 | + if allow_logical: |
| 506 | + if self._pg_version < '10': |
| 507 | + raise InitNodeException("Logical replication is only " |
| 508 | + "available on PostgreSQL 10 and newer") |
| 509 | + |
| 510 | + self.append_conf( |
| 511 | + max_logical_replication_workers=MAX_LOGICAL_REPLICATION_WORKERS, |
| 512 | + wal_level='logical') |
| 513 | + |
| 514 | + # disable UNIX sockets if asked to |
| 515 | + if not unix_sockets: |
| 516 | + self.append_conf(unix_socket_directories='') |
518 | 517 |
|
519 | 518 | return self
|
520 | 519 |
|
521 | 520 | @method_decorator(positional_args_hack(['filename', 'line']))
|
522 |
| - def append_conf(self, line, filename=PG_CONF_FILE): |
| 521 | + def append_conf(self, line='', filename=PG_CONF_FILE, **kwargs): |
523 | 522 | """
|
524 | 523 | Append line to a config file.
|
525 | 524 |
|
526 | 525 | Args:
|
527 | 526 | line: string to be appended to config.
|
528 | 527 | filename: config file (postgresql.conf by default).
|
| 528 | + **kwargs: named config options. |
529 | 529 |
|
530 | 530 | Returns:
|
531 | 531 | This instance of :class:`.PostgresNode`.
|
| 532 | +
|
| 533 | + Examples: |
| 534 | + append_conf(fsync=False) |
| 535 | + append_conf('log_connections = yes') |
| 536 | + append_conf('postgresql.conf', 'synchronous_commit = off') |
532 | 537 | """
|
533 | 538 |
|
| 539 | + lines = [line] |
| 540 | + |
| 541 | + for option, value in iteritems(kwargs): |
| 542 | + if isinstance(value, bool): |
| 543 | + value = 'on' if value else 'off' |
| 544 | + elif not str(value).replace('.', '', 1).isdigit(): |
| 545 | + value = "'{}'".format(value) |
| 546 | + |
| 547 | + # format a new config line |
| 548 | + lines.append('{} = {}'.format(option, value)) |
| 549 | + |
534 | 550 | config_name = os.path.join(self.data_dir, filename)
|
535 | 551 | with io.open(config_name, 'a') as conf:
|
536 |
| - conf.write(u''.join([line, '\n'])) |
| 552 | + for line in lines: |
| 553 | + conf.write(text_type(line)) |
| 554 | + conf.write(text_type('\n')) |
537 | 555 |
|
538 | 556 | return self
|
539 | 557 |
|
|
0 commit comments