diff --git a/.gitignore b/.gitignore
index ad06216..587ab37 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,5 @@ pg_query_state--*.sql
 cscope.out
 tags
 Dockerfile
+tmp_stress
+
diff --git a/.travis.yml b/.travis.yml
index 05f37c3..5d26b7a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -18,11 +18,11 @@ notifications:
     on_failure: always
 
 env:
-    - PG_VERSION=12 LEVEL=hardcore
+    - PG_VERSION=12 LEVEL=hardcore USE_TPCDS=1
     - PG_VERSION=12
-    - PG_VERSION=11 LEVEL=hardcore
+    - PG_VERSION=11 LEVEL=hardcore USE_TPCDS=1
    - PG_VERSION=11
-    - PG_VERSION=10 LEVEL=hardcore
+    - PG_VERSION=10 LEVEL=hardcore USE_TPCDS=1
     - PG_VERSION=10
     - PG_VERSION=9.6 LEVEL=hardcore
     - PG_VERSION=9.6
diff --git a/Dockerfile.tmpl b/Dockerfile.tmpl
index 1725de3..a9fdac2 100644
--- a/Dockerfile.tmpl
+++ b/Dockerfile.tmpl
@@ -2,12 +2,12 @@ FROM postgres:${PG_VERSION}-alpine
 
 # Install dependencies
 RUN apk add --no-cache \
-    openssl curl \
+    openssl curl git \
     perl perl-ipc-run \
     make musl-dev gcc bison flex coreutils \
     zlib-dev libedit-dev \
     clang clang-analyzer linux-headers \
-    python2 python2-dev py2-virtualenv;
+    python3 python3-dev py3-virtualenv;
 
 # Install fresh valgrind
@@ -35,4 +35,4 @@ ADD . /pg/testdir
 WORKDIR /pg/testdir
 
 USER postgres
-ENTRYPOINT LEVEL=${LEVEL} /run.sh
+ENTRYPOINT LEVEL=${LEVEL} USE_TPCDS=${USE_TPCDS} /run.sh
diff --git a/Makefile b/Makefile
index c142ed5..aecfb45 100644
--- a/Makefile
+++ b/Makefile
@@ -9,7 +9,7 @@ DATA_built = $(EXTENSION)--$(EXTVERSION).sql
 PGFILEDESC = "pg_query_state - facility to track progress of plan execution"
 
 EXTRA_CLEAN = ./isolation_output $(EXTENSION)--$(EXTVERSION).sql \
-    Dockerfile ./tests/*.pyc
+    Dockerfile ./tests/*.pyc ./tmp_stress
 
 ifdef USE_PGXS
 PG_CONFIG ?= pg_config
diff --git a/README.md b/README.md
index 34ecdb0..795f571 100644
--- a/README.md
+++ b/README.md
@@ -26,26 +26,41 @@ Add module name to the `shared_preload_libraries` parameter in `postgresql.conf`
 ```
 shared_preload_libraries = 'pg_query_state'
 ```
 It is essential to restart the PostgreSQL instance. After that, execute the following query in psql:
-```
+```sql
 CREATE EXTENSION pg_query_state;
 ```
 Done!
 
 ## Tests
-Tests using parallel sessions using python 2.7 script:
- ```
- python tests/pg_qs_test_runner.py [OPTION]...
- ```
+Tests use parallel sessions and a Python 3 compatible script:
+```shell
+python tests/pg_qs_test_runner.py [OPTION]...
+```
 *prerequisite packages*:
 * `psycopg2` version 2.6 or later
 * `PyYAML` version 3.11 or later
-
+* `progressbar2` for stress test progress reporting
+
 *options*:
 * *- -host* --- postgres server host, default value is *localhost*
 * *- -port* --- postgres server port, default value is *5432*
 * *- -database* --- database name, default value is *postgres*
 * *- -user* --- user name, default value is *postgres*
 * *- -password* --- user's password, default value is empty
+* *- -tpc-ds-setup* --- prepare the database for the TPC-DS benchmark
+* *- -tpc-ds-run* --- run only the stress test based on the TPC-DS benchmark
+
+Or run all tests in `Docker` using:
+
+```shell
+export LEVEL=hardcore
+export USE_TPCDS=1
+export PG_VERSION=12
+
+docker-compose build
+docker-compose run tests
+```
+
+There are different test levels: `hardcore`, `nightmare` (runs tests under `valgrind`) and `stress` (runs tests under `TPC-DS` load).
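+
+For example, an illustrative stress-only run against a server on the default port (these two options are the ones added by this patch; adjust connection options as needed):
+
+```shell
+python tests/pg_qs_test_runner.py --port 5432 --tpc-ds-setup
+python tests/pg_qs_test_runner.py --port 5432 --tpc-ds-run
+```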
 
 ## Function pg\_query\_state
 ```plpgsql
@@ -92,11 +107,11 @@ This parameters is set on called side before running any queries whose states are
 
 ## Examples
 Set maximum number of parallel workers on `gather` node equals `2`:
-```
+```sql
 postgres=# set max_parallel_workers_per_gather = 2;
 ```
 Assume one backend with pid = 49265 performs a simple query:
-```
+```sql
 postgres=# select pg_backend_pid();
  pg_backend_pid
 ----------------
@@ -105,7 +120,7 @@ postgres=# select pg_backend_pid();
 postgres=# select count(*) from foo join bar on foo.c1=bar.c1;
 ```
 Other backend can extract intermediate state of execution that query:
-```
+```sql
 postgres=# \x
 postgres=# select * from pg_query_state(49265);
 -[ RECORD 1 ]+-------------------------------------------------------------------------------------------------------------------------
@@ -150,11 +165,11 @@ In example above working backend spawns two parallel workers with pids `49324` a
 `Seq Scan` node has statistics on passed loops (average number of rows delivered to `Nested Loop` and number of passed loops are shown) and statistics on current loop. Other nodes has statistics only for current loop as this loop is first (`loop number` = 1).
 
 Assume first backend executes some function:
-```
+```sql
 postgres=# select n_join_foo_bar();
 ```
 Other backend can get the follow output:
-```
+```sql
 postgres=# select * from pg_query_state(49265);
 -[ RECORD 1 ]+------------------------------------------------------------------------------------------------------------------
 pid          | 49265
@@ -180,7 +195,7 @@ leader_pid   | (null)
 First row corresponds to function call, second - to query which is in the body of that function.
 
 We can get result plans in different format (e.g. `json`):
-```
+```sql
 postgres=# select * from pg_query_state(pid := 49265, format := 'json');
 -[ RECORD 1 ]+------------------------------------------------------------
 pid          | 49265
diff --git a/mk_dockerfile.sh b/mk_dockerfile.sh
index f15433c..86f72a4 100755
--- a/mk_dockerfile.sh
+++ b/mk_dockerfile.sh
@@ -1,3 +1,5 @@
+#!/usr/bin/env sh
+
 if [ -z ${PG_VERSION+x} ]; then
     echo PG_VERSION is not set!
     exit 1
@@ -7,10 +9,16 @@ if [ -z ${LEVEL+x} ]; then
     LEVEL=scan-build
 fi
 
+if [ -z ${USE_TPCDS+x} ]; then
+    USE_TPCDS=0
+fi
+
 echo PG_VERSION=${PG_VERSION}
 echo LEVEL=${LEVEL}
+echo USE_TPCDS=${USE_TPCDS}
 
 sed \
     -e 's/${PG_VERSION}/'${PG_VERSION}/g \
     -e 's/${LEVEL}/'${LEVEL}/g \
+    -e 's/${USE_TPCDS}/'${USE_TPCDS}/g \
     Dockerfile.tmpl > Dockerfile
diff --git a/run_tests.sh b/run_tests.sh
index bb7b75c..651e9a1 100755
--- a/run_tests.sh
+++ b/run_tests.sh
@@ -1,7 +1,7 @@
 #!/usr/bin/env bash
 
 #
-# Copyright (c) 2018, Postgres Professional
+# Copyright (c) 2019, Postgres Professional
 #
 # supported levels:
 #        * standard
@@ -55,7 +55,7 @@ fi
 
 # build and install PostgreSQL
 if [ "$LEVEL" = "hardcore" ] || \
-     [ "$LEVEL" = "nightmare" ]; then
+   [ "$LEVEL" = "nightmare" ]; then
     # enable Valgrind support
     sed -i.bak "s/\/* #define USE_VALGRIND *\//#define USE_VALGRIND/g" src/include/pg_config_manual.h
 
@@ -143,10 +143,14 @@ if [ -f regression.diffs ]; then cat regression.diffs; fi
 
 # run python tests
 set +x -e
-virtualenv /tmp/env && source /tmp/env/bin/activate &&
-pip install PyYAML && pip install psycopg2
+python3 -m venv /tmp/env && source /tmp/env/bin/activate &&
+pip install -r tests/requirements.txt
 set -e #exit virtualenv with error code
 python tests/pg_qs_test_runner.py --port $PGPORT
+if [[ "$USE_TPCDS" == "1" ]]; then
+    python tests/pg_qs_test_runner.py --port $PGPORT --tpc-ds-setup
+    python tests/pg_qs_test_runner.py --port $PGPORT --tpc-ds-run
+fi
 deactivate
 set -x
 
@@ -170,4 +174,4 @@ gcov *.c *.h
 set +ux
 
 # send coverage stats to Codecov
-bash <(curl -s https://codecov.io/bash)
\ No newline at end of file
+bash <(curl -s https://codecov.io/bash)
diff --git a/tests/common.py b/tests/common.py
new file mode 100644
index 0000000..3f4f9c2
--- /dev/null
+++ b/tests/common.py
@@ -0,0 +1,103 @@
+'''
+common.py
+Copyright (c) 2016-2020, Postgres Professional
+'''
+
+import psycopg2
+import psycopg2.extensions
+import select
+import time
+
+BACKEND_IS_IDLE_INFO = 'INFO: state of backend is idle\n'
+BACKEND_IS_ACTIVE_INFO = 'INFO: state of backend is active\n'
+
+def wait(conn):
+    """wait for some event on connection to postgres"""
+    while 1:
+        state = conn.poll()
+        if state == psycopg2.extensions.POLL_OK:
+            break
+        elif state == psycopg2.extensions.POLL_WRITE:
+            select.select([], [conn.fileno()], [])
+        elif state == psycopg2.extensions.POLL_READ:
+            select.select([conn.fileno()], [], [])
+        else:
+            raise psycopg2.OperationalError("poll() returned %s" % state)
+
+def n_async_connect(config, n=1):
+    """establish n asynchronous connections to the postgres with specified config"""
+
+    aconfig = config.copy()
+    aconfig['async'] = True
+
+    result = []
+    for _ in range(n):
+        conn = psycopg2.connect(**aconfig)
+        wait(conn)
+        result.append(conn)
+    return result
+
+def n_close(conns):
+    """close connections to postgres"""
+
+    for conn in conns:
+        conn.close()
+
+def pg_query_state(config, pid, verbose=False, costs=False, timing=False, \
+                   buffers=False, triggers=False, format='text'):
+    """
+    Get query state from backend with specified pid and optional parameters.
+    Return the result together with any warning, info, notice and log
+    messages ('notices') emitted by the backend.
+    """
+
+    conn = psycopg2.connect(**config)
+    curs = conn.cursor()
+
+    curs.callproc('pg_query_state', (pid, verbose, costs, timing, buffers, triggers, format))
+    result = curs.fetchall()
+    notices = conn.notices[:]
+    conn.close()
+
+    return result, notices
+
+def onetime_query_state(config, async_conn, query, args={}, num_workers=0):
+    """
+    Get intermediate state of 'query' being executed on connection 'async_conn',
+    polling pg_query_state until the backend reports progress or the query completes
+    """
+
+    acurs = async_conn.cursor()
+
+    set_guc(async_conn, 'enable_mergejoin', 'off')
+    set_guc(async_conn, 'max_parallel_workers_per_gather', num_workers)
+    acurs.execute(query)
+
+    # extract current state of query progress
+    MAX_PG_QS_RETRIES = 10
+    DELAY_BETWEEN_RETRIES = 0.1
+    pg_qs_args = {
+        'config': config,
+        'pid': async_conn.get_backend_pid()
+    }
+    for k, v in args.items():
+        pg_qs_args[k] = v
+    n_retries = 0
+    while True:
+        result, notices = pg_query_state(**pg_qs_args)
+        n_retries += 1
+        if len(result) > 0:
+            break
+        if n_retries >= MAX_PG_QS_RETRIES:
+            # pg_query_state calls don't return any result; most likely the
+            # query has already completed
+            break
+        time.sleep(DELAY_BETWEEN_RETRIES)
+    wait(async_conn)
+
+    set_guc(async_conn, 'enable_mergejoin', 'on')
+    return result, notices
+
+def set_guc(async_conn, param, value):
+    acurs = async_conn.cursor()
+    acurs.execute('set %s to %s' % (param, value))
+    wait(async_conn)
diff --git a/tests/pg_qs_test_runner.py b/tests/pg_qs_test_runner.py
index 716719e..28db807 100644
--- a/tests/pg_qs_test_runner.py
+++ b/tests/pg_qs_test_runner.py
@@ -1,13 +1,18 @@
 '''
-pg_qs_test_cases.py
-    Tests extract query state from running backend (including concurrent extracts)
-Copyright (c) 2016-2016, Postgres Professional
+pg_qs_test_runner.py
+Copyright (c) 2016-2020, Postgres Professional
 '''
 
 import argparse
-import psycopg2
+import getpass
+import os
 import sys
+
+import psycopg2
+
+sys.path.append(os.path.dirname(os.path.abspath(__file__)))
 from test_cases import *
+import tpcds
 
 class PasswordPromptAction(argparse.Action):
     def __call__(self, parser, args, values, option_string=None):
@@ -28,77 +33,97 @@ class TeardownException(Exception): pass
     'insert into bar select i, i%2=1 from generate_series(1, 500000) as i',
     'analyze foo',
     'analyze bar',
-    ]
+]
 
 teardown_cmd = [
     'drop table foo cascade',
     'drop table bar cascade',
     'drop extension pg_query_state cascade',
-    ]
+]
 
 tests = [
-    test_deadlock,
-    test_simple_query,
-    test_concurrent_access,
-    test_nested_call,
-    test_trigger,
-    test_costs,
-    test_buffers,
-    test_timing,
-    test_formats,
-    test_timing_buffers_conflicts,
-    test_insert_on_conflict,
-    ]
+    test_deadlock,
+    test_simple_query,
+    test_concurrent_access,
+    test_nested_call,
+    test_trigger,
+    test_costs,
+    test_buffers,
+    test_timing,
+    test_formats,
+    test_timing_buffers_conflicts,
+    test_insert_on_conflict,
+]
 
 def setup(con):
     ''' Creates pg_query_state extension, creates tables for tests, fills it with data '''
-    print 'setting up...'
+    print('setting up...')
     try:
         cur = con.cursor()
         for cmd in setup_cmd:
             cur.execute(cmd)
         con.commit()
         cur.close()
-    except Exception, e:
+    except Exception as e:
        raise SetupException('Setup failed: %s' % e)
-    print 'done!'
+    print('done!')
 
 def teardown(con):
     ''' Drops table and extension '''
-    print 'tearing down...'
+    print('tearing down...')
     try:
         cur = con.cursor()
         for cmd in teardown_cmd:
             cur.execute(cmd)
         con.commit()
         cur.close()
-    except Exception, e:
+    except Exception as e:
         raise TeardownException('Teardown failed: %s' % e)
-    print 'done!'
+    print('done!')
 
 def main(config):
     ''' Main test function '''
-    con = psycopg2.connect(**config)
-    setup(con)
+    conn_params = {
+        key:config.__dict__[key] for key in ('host', 'port', 'user', 'database', 'password')
+    }
+
+    if config.tpcds_setup:
+        print('Setup database for TPC-DS bench')
+        tpcds.setup_tpcds(conn_params)
+        print('Database is setup successfully')
+        return
+
+    if config.tpcds_run:
+        print('Starting stress test')
+        tpcds.run_tpcds(conn_params)
+        print('Stress finished successfully')
+        return
+
+    # run default tests
+    init_conn = psycopg2.connect(**conn_params)
+    setup(init_conn)
     for i, test in enumerate(tests):
         if test.__doc__:
             descr = test.__doc__
         else:
             descr = 'test case %d' % (i+1)
-        print ("%s..." % descr),; sys.stdout.flush()
-        test(config)
-        print 'ok!'
-
-    teardown(con)
-    con.close()
+        print(("%s..." % descr))
+        sys.stdout.flush()
+        test(conn_params)
+        print('ok!')
+    teardown(init_conn)
+    init_conn.close()
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description='Query state of running backends tests')
+    parser.add_argument('--host', default='localhost', help='postgres server host')
     parser.add_argument('--port', type=int, default=5432, help='postgres server port')
     parser.add_argument('--user', dest='user', default='postgres', help='user name')
     parser.add_argument('--database', dest='database', default='postgres', help='database name')
-    parser.add_argument('--password', dest='password', nargs=0, action=PasswordPromptAction, default='')
+    parser.add_argument('--password', dest='password', nargs=0, action=PasswordPromptAction, default='', help='password')
+    parser.add_argument('--tpc-ds-setup', dest='tpcds_setup', action='store_true', help='setup database to run TPC-DS benchmark')
+    parser.add_argument('--tpc-ds-run', dest='tpcds_run', action='store_true', help='run only stress test based on TPC-DS benchmark')
+
     args = parser.parse_args()
-    main(args.__dict__)
+    main(args)
diff --git a/tests/prepare_stress.sh b/tests/prepare_stress.sh
new file mode 100755
index 0000000..da5ae48
--- /dev/null
+++ b/tests/prepare_stress.sh
@@ -0,0 +1,29 @@
+#!/usr/bin/env sh
+
+mkdir -p tmp_stress
+cd tmp_stress
+rm -rf ./*
+
+git clone --depth 1 --single-branch --branch master https://github.com/gregrahn/tpcds-kit.git
+git clone --depth 1 --single-branch --branch master https://github.com/cwida/tpcds-result-reproduction.git
+
+cd tpcds-kit/tools
+make -s
+
+#Generate data
+./dsdgen -FORCE -VERBOSE -SCALE 1
+
+#Prepare data
+mkdir -p tables
+for i in `ls *.dat`; do
+    echo "Preparing file" $i
+    sed 's/|$//' $i > tables/$i
+done
+
+#Generate queries
+./dsqgen -DIRECTORY ../query_templates \
+    -INPUT ../query_templates/templates.lst \
+    -VERBOSE Y \
+    -QUALIFY Y \
+    -SCALE 1 \
+    -DIALECT netezza
diff --git a/tests/requirements.txt b/tests/requirements.txt
new file mode 100644
index 0000000..ff6b4f4
--- /dev/null
+++ b/tests/requirements.txt
@@ -0,0 +1,3 @@
+PyYAML
+psycopg2
+progressbar2
diff --git a/tests/test_cases.py b/tests/test_cases.py
index 175dbd1..8d7fc28 100644
--- a/tests/test_cases.py
+++ b/tests/test_cases.py
@@ -1,109 +1,23 @@
+'''
+test_cases.py
+Copyright (c) 2016-2020, Postgres Professional
+'''
+
 import json
-import psycopg2
-import psycopg2.extensions
 import re
 import select
 import time
 import xml.etree.ElementTree as ET
+
+import psycopg2
 import yaml
-from time import sleep
 
-def wait(conn):
-    """wait for some event on connection to postgres"""
-    while 1:
-        state = conn.poll()
-        if state == psycopg2.extensions.POLL_OK:
-            break
-        elif state == psycopg2.extensions.POLL_WRITE:
-            select.select([], [conn.fileno()], [])
-        elif state == psycopg2.extensions.POLL_READ:
-            select.select([conn.fileno()], [], [])
-        else:
-            raise psycopg2.OperationalError("poll() returned %s" % state)
-
-def n_async_connect(config, n=1):
-    """establish n asynchronious connections to the postgres with specified config"""
-
-    aconfig = config.copy()
-    aconfig['async'] = True
-
-    result = []
-    for _ in xrange(n):
-        conn = psycopg2.connect(**aconfig)
-        wait(conn)
-        result.append(conn)
-    return result
-
-def n_close(conns):
-    """close connections to postgres"""
-
-    for conn in conns:
-        conn.close()
-
-notices = []
-
-def debug_output(qs, qs_len, pid, query, expected):
-    something_happened = False
-    if (qs_len and len(qs) != qs_len ):
-        print "len(qs): ", len(qs), ", expected: ", qs_len
-        something_happened = True
-    if (pid and qs[0][0] != pid):
-        print "qs[0][0]: ", qs[0][0], " = ", pid
-        something_happened = True
-    if (qs[0][1] != 0):
-        print "qs[0][1]: ", qs[0][1], ", expected: 0"
-        something_happened = True
-    if (qs[0][2] != query):
-        print "qs[0][2]:\n", qs[0][2]
-        print "Expected:\n", query
-        something_happened = True
-    if (not (re.match(expected, qs[0][3]))):
-        print "qs[0][3]:\n", qs[0][3]
-        print "Expected:\n", expected
-        something_happened = True
-    if (qs[0][4] != None):
-        print "qs[0][4]: ", qs[0][4], "Expected: None"
-        something_happened = True
-    if (qs_len and len(qs) > qs_len):
-        for i in range(qs_len, len(qs)):
-            print "qs[",i,"][0]: ", qs[i][0]
-            print "qs[",i,"][1]: ", qs[i][1]
-            print "qs[",i,"][2]: ", qs[i][2]
-            print "qs[",i,"][3]: ", qs[i][3]
-            print "qs[",i,"][4]: ", qs[i][4]
-        something_happened = True
-    if (something_happened):
-        print "If test have not crashed, then it's OK"
-
-def notices_warning():
-    if (len(notices) > 0):
-        print("")
-        print("WARNING:")
-        print(notices)
-
-def pg_query_state(config, pid, verbose=False, costs=False, timing=False, \
-                    buffers=False, triggers=False, format='text'):
-    """
-    Get query state from backend with specified pid and optional parameters.
-    Save any warning, info, notice and log data in global variable 'notices'
-    """
-
-    global notices
-
-    conn = psycopg2.connect(**config)
-    curs = conn.cursor()
-    result = []
-    while not result:
-        curs.callproc('pg_query_state', (pid, verbose, costs, timing, buffers, triggers, format))
-        result = curs.fetchall()
-    notices = conn.notices[:]
-    conn.close()
-    return result
+import common
 
 def test_deadlock(config):
     """test when two backends try to extract state of each other"""
-    acon1, acon2 = n_async_connect(config, 2)
+    acon1, acon2 = common.n_async_connect(config, 2)
     acurs1 = acon1.cursor()
     acurs2 = acon2.cursor()
 
@@ -115,50 +29,20 @@ def test_deadlock(config):
         r, w, x = select.select([acon1.fileno(), acon2.fileno()], [], [], 10)
         assert (r or w or x), "Deadlock is happened under cross reading of query states"
-        wait(acon1)
-        wait(acon2)
+        common.wait(acon1)
+        common.wait(acon2)
 
         # exit from loop if one backend could read state of execution 'pg_query_state'
         # from other backend
         if acurs1.fetchone() or acurs2.fetchone():
             break
 
-    n_close((acon1, acon2))
-
-def query_state(config, async_conn, query, args={}, num_workers=0):
-    """
-    Get intermediate state of 'query' on connection 'async_conn' after number of 'steps'
-    of node executions from start of query
-    """
-
-    acurs = async_conn.cursor()
-    conn = psycopg2.connect(**config)
-    curs = conn.cursor()
-
-    set_guc(async_conn, 'enable_mergejoin', 'off')
-    set_guc(async_conn, 'max_parallel_workers_per_gather', num_workers)
-    acurs.execute(query)
-
-    # extract current state of query progress
-    pg_qs_args = {
-        'config': config,
-        'pid': async_conn.get_backend_pid()
-    }
-    for k, v in args.iteritems():
-        pg_qs_args[k] = v
-    result = pg_query_state(**pg_qs_args)
-    wait(async_conn)
-
-    set_guc(async_conn, 'pg_query_state.executor_trace', 'off')
-    set_guc(async_conn, 'enable_mergejoin', 'on')
-
-    conn.close()
-    return result
+    common.n_close((acon1, acon2))
 
 def test_simple_query(config):
     """test statistics of simple query"""
-    acon, = n_async_connect(config)
+    acon, = common.n_async_connect(config)
     query = 'select count(*) from foo join bar on foo.c1=bar.c1'
     expected = r"""Aggregate \(Current loop: actual rows=\d+, loop number=1\)
   ->  Hash Join \(Current loop: actual rows=\d+, loop number=1\)
@@ -168,47 +52,42 @@ def test_simple_query(config):
         Buckets: \d+  Batches: \d+  Memory Usage: \d+kB
         ->  Seq Scan on bar \(Current loop: actual rows=\d+, loop number=1\)"""
 
-    qs = query_state(config, acon, query)
-    debug_output(qs, 1, acon.get_backend_pid(), query, expected)
-    notices_warning()
-    #assert len(qs) == 1 #Skip this check while output of test can be different
-    assert qs[0][0] == acon.get_backend_pid() and qs[0][1] == 0 \
+    qs, _ = common.onetime_query_state(config, acon, query)
+    assert qs[0][0] == acon.get_backend_pid() and qs[0][1] == 0 \
         and qs[0][2] == query and re.match(expected, qs[0][3]) and qs[0][4] == None
-    n_close((acon,))
+    common.n_close((acon,))
 
 def test_concurrent_access(config):
     """test when two backends compete with each other to extract state from third running backend"""
-    acon1, acon2, acon3 = n_async_connect(config, 3)
+    acon1, acon2, acon3 = common.n_async_connect(config, 3)
     acurs1, acurs2, acurs3 = acon1.cursor(), acon2.cursor(), acon3.cursor()
     query = 'select count(*) from foo join bar on foo.c1=bar.c1'
 
-    set_guc(acon3, 'max_parallel_workers_per_gather', 0)
+    common.set_guc(acon3, 'max_parallel_workers_per_gather', 0)
     acurs3.execute(query)
     time.sleep(0.1)
     acurs1.callproc('pg_query_state', (acon3.get_backend_pid(),))
     acurs2.callproc('pg_query_state', (acon3.get_backend_pid(),))
-    wait(acon1)
-    wait(acon2)
-    wait(acon3)
+    common.wait(acon1)
+    common.wait(acon2)
+    common.wait(acon3)
 
     qs1, qs2 = acurs1.fetchall(), acurs2.fetchall()
-    assert len(qs1) == len(qs2) == 1 \
+    assert len(qs1) == len(qs2) == 1 \
         and qs1[0][0] == qs2[0][0] == acon3.get_backend_pid() \
         and qs1[0][1] == qs2[0][1] == 0 \
         and qs1[0][2] == qs2[0][2] == query \
        and len(qs1[0][3]) > 0 and len(qs2[0][3]) > 0 \
         and qs1[0][4] == qs2[0][4] == None
-    #assert len(notices) == 0
-    notices_warning()
 
-    n_close((acon1, acon2, acon3))
+    common.n_close((acon1, acon2, acon3))
 
 def test_nested_call(config):
     """test statistics under calling function"""
-    acon, = n_async_connect(config)
+    acon, = common.n_async_connect(config)
     util_conn = psycopg2.connect(**config)
     util_curs = util_conn.cursor()
     create_function = """
@@ -234,24 +113,24 @@ def test_nested_call(config):
     util_curs.execute(create_function)
     util_conn.commit()
 
-    qs = query_state(config, acon, call_function)
-    assert len(qs) == 2 \
+    qs, notices = common.onetime_query_state(config, acon, call_function)
+    assert len(qs) == 2 \
         and qs[0][0] == qs[1][0] == acon.get_backend_pid() \
         and qs[0][1] == 0 and qs[1][1] == 1 \
         and qs[0][2] == call_function and qs[0][3] == expected \
         and qs[1][2] == nested_query and re.match(expected_nested, qs[1][3]) \
         and qs[0][4] == qs[1][4] == None
-    assert len(notices) == 0
+    assert len(notices) == 0
 
     util_curs.execute(drop_function)
 
     util_conn.close()
-    n_close((acon,))
+    common.n_close((acon,))
 
 def test_insert_on_conflict(config):
     """test statistics on conflicting tuples under INSERT ON CONFLICT query"""
-    acon, = n_async_connect(config)
+    acon, = common.n_async_connect(config)
     util_conn = psycopg2.connect(**config)
     util_curs = util_conn.cursor()
     add_field_uniqueness = 'alter table foo add constraint unique_c1 unique(c1)'
@@ -266,30 +145,22 @@ def test_insert_on_conflict(config):
     util_curs.execute(add_field_uniqueness)
     util_conn.commit()
 
-    qs = query_state(config, acon, query)
+    qs, notices = common.onetime_query_state(config, acon, query)
 
-    debug_output(qs, 1, acon.get_backend_pid(), query, expected)
-    notices_warning()
-    #assert len(qs) == 1 \
-    assert qs[0][0] == acon.get_backend_pid() and qs[0][1] == 0 \
+    assert qs[0][0] == acon.get_backend_pid() and qs[0][1] == 0 \
         and qs[0][2] == query and re.match(expected, qs[0][3]) \
         and qs[0][4] == None
-    assert len(notices) == 0
+    assert len(notices) == 0
 
     util_curs.execute(drop_field_uniqueness)
 
     util_conn.close()
-    n_close((acon,))
-
-def set_guc(async_conn, param, value):
-    acurs = async_conn.cursor()
-    acurs.execute('set %s to %s' % (param, value))
-    wait(async_conn)
+    common.n_close((acon,))
 
 def test_trigger(config):
     """test trigger statistics"""
-    acon, = n_async_connect(config)
+    acon, = common.n_async_connect(config)
     acurs = acon.cursor()
     util_conn = psycopg2.connect(**config)
     util_curs = util_conn.cursor()
@@ -305,7 +176,7 @@ def test_trigger(config):
     create_trigger = """
         create trigger unique_foo_c1
             before insert or update of c1 on foo for row
-            execute procedure unique_c1_in_foo()"""
+            execute procedure unique_c1_in_foo()"""
     drop_temps = 'drop function unique_c1_in_foo() cascade'
     query = 'insert into foo select i, md5(random()::text) from generate_series(1, 10000) as i'
     expected_upper = r"""Insert on foo \(Current loop: actual rows=\d+, loop number=1\)
@@ -316,31 +187,27 @@ def test_trigger(config):
     util_curs.execute(create_trigger)
     util_conn.commit()
 
-    qs = query_state(config, acon, query, {'triggers': True})
-    debug_output(qs, None, acon.get_backend_pid(), query, expected_upper)
-    notices_warning()
+    qs, notices = common.onetime_query_state(config, acon, query, {'triggers': True})
     assert qs[0][0] == acon.get_backend_pid() and qs[0][1] == 0 \
         and qs[0][2] == query and re.match(expected_upper, qs[0][3]) \
         and qs[0][4] == None
-    assert len(notices) == 0
+    assert len(notices) == 0
 
-    qs = query_state(config, acon, query, {'triggers': False})
-    debug_output(qs, None, acon.get_backend_pid(), query, expected_upper)
-    notices_warning()
+    qs, notices = common.onetime_query_state(config, acon, query, {'triggers': False})
     assert qs[0][0] == acon.get_backend_pid() and qs[0][1] == 0 \
         and qs[0][2] == query and re.match(expected_upper, qs[0][3]) \
         and qs[0][4] == None
-    assert len(notices) == 0
+    assert len(notices) == 0
 
     util_curs.execute(drop_temps)
 
     util_conn.close()
-    n_close((acon,))
+    common.n_close((acon,))
 
 def test_costs(config):
     """test plan costs"""
-    acon, = n_async_connect(config)
+    acon, = common.n_async_connect(config)
     query = 'select count(*) from foo join bar on foo.c1=bar.c1'
     expected = r"""Aggregate \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=8\) \(Current loop: actual rows=0, loop number=1\)
  ->  Hash Join \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=0\) \(Current loop: actual rows=0, loop number=1\)
@@ -350,18 +217,16 @@ def test_costs(config):
         Buckets: \d+  Batches: \d+  Memory Usage: \d+kB
         ->  Seq Scan on bar \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=\d+, loop number=1\)"""
 
-    qs = query_state(config, acon, query, {'costs': True})
-    debug_output(qs, 1, None, query, expected)
-    notices_warning()
-    assert len(qs) == 1 and re.match(expected, qs[0][3])
-    assert len(notices) == 0
+    qs, notices = common.onetime_query_state(config, acon, query, {'costs': True})
+    assert len(qs) == 1 and re.match(expected, qs[0][3])
+    assert len(notices) == 0
 
-    n_close((acon,))
+    common.n_close((acon,))
 
 def test_buffers(config):
     """test buffer statistics"""
-    acon, = n_async_connect(config)
+    acon, = common.n_async_connect(config)
     query = 'select count(*) from foo join bar on foo.c1=bar.c1'
     expected = r"""Aggregate \(Current loop: actual rows=0, loop number=1\)
   ->  Hash Join \(Current loop: actual rows=0, loop number=1\)
@@ -373,20 +238,18 @@ def test_buffers(config):
         ->  Seq Scan on bar \(Current loop: actual rows=\d+, loop number=1\)
               Buffers: .*"""
 
-    set_guc(acon, 'pg_query_state.enable_buffers', 'on')
+    common.set_guc(acon, 'pg_query_state.enable_buffers', 'on')
 
-    qs = query_state(config, acon, query, {'buffers': True})
-    debug_output(qs, 1, None, query, expected)
-    notices_warning()
-    assert len(qs) == 1 and re.match(expected, qs[0][3])
-    assert len(notices) == 0
+    qs, notices = common.onetime_query_state(config, acon, query, {'buffers': True})
+    assert len(qs) == 1 and re.match(expected, qs[0][3])
+    assert len(notices) == 0
 
-    n_close((acon,))
+    common.n_close((acon,))
 
 def test_timing(config):
     """test timing statistics"""
-    acon, = n_async_connect(config)
+    acon, = common.n_async_connect(config)
     query = 'select count(*) from foo join bar on foo.c1=bar.c1'
     expected = r"""Aggregate \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\)
   ->  Hash Join \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\)
@@ -396,23 +259,21 @@ def test_timing(config):
         Buckets: \d+  Batches: \d+  Memory Usage: \d+kB
         ->  Seq Scan on bar \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=\d+, loop number=1\)"""
 
-    set_guc(acon, 'pg_query_state.enable_timing', 'on')
+    common.set_guc(acon, 'pg_query_state.enable_timing', 'on')
 
-    qs = query_state(config, acon, query, {'timing': True})
-    debug_output(qs, 1, None, query, expected)
-    notices_warning()
-    assert len(qs) == 1 and re.match(expected, qs[0][3])
-    assert len(notices) == 0
+    qs, notices = common.onetime_query_state(config, acon, query, {'timing': True})
+    assert len(qs) == 1 and re.match(expected, qs[0][3])
+    assert len(notices) == 0
 
-    n_close((acon,))
+    common.n_close((acon,))
 
 def check_plan(plan):
-    assert plan.has_key('Current loop')
+    assert 'Current loop' in plan
     cur_loop = plan['Current loop']
-    assert cur_loop.has_key('Actual Loop Number') \
-        and cur_loop.has_key('Actual Rows')
+    assert 'Actual Loop Number' in cur_loop \
+        and 'Actual Rows' in cur_loop
 
-    if not plan.has_key('Plans'):
+    if not 'Plans' in plan:
         return
 
     for subplan in plan['Plans']:
@@ -422,14 +283,14 @@ def check_xml(root):
     prefix = '{http://www.postgresql.org/2009/explain}'
     for plan in root.iter(prefix + 'Plan'):
         cur_loop = plan.find(prefix + 'Current-loop')
-        assert cur_loop != None \
+        assert cur_loop != None \
             and cur_loop.find(prefix + 'Actual-Loop-Number') != None \
            and cur_loop.find(prefix + 'Actual-Rows') != None
 
 def test_formats(config):
     """test all formats of pg_query_state output"""
-    acon, = n_async_connect(config)
+    acon, = common.n_async_connect(config)
     query = 'select count(*) from foo join bar on foo.c1=bar.c1'
     expected = r"""Aggregate \(Current loop: actual rows=0, loop number=1\)
   ->  Hash Join \(Current loop: actual rows=0, loop number=1\)
@@ -439,61 +300,59 @@ def test_formats(config):
         Buckets: \d+  Batches: \d+  Memory Usage: \d+kB
         ->  Seq Scan on bar \(Current loop: actual rows=\d+, loop number=1\)"""
 
-    qs = query_state(config, acon, query, {'format': 'text'})
-    debug_output(qs, 1, None, query, expected)
-    notices_warning()
-    assert len(qs) == 1 and re.match(expected, qs[0][3])
-    assert len(notices) == 0
+    qs, notices = common.onetime_query_state(config, acon, query, {'format': 'text'})
+    assert len(qs) == 1 and re.match(expected, qs[0][3])
+    assert len(notices) == 0
 
-    qs = query_state(config, acon, query, {'format': 'json'})
+    qs, notices = common.onetime_query_state(config, acon, query, {'format': 'json'})
     try:
         js_obj = json.loads(qs[0][3])
     except ValueError:
         assert False, 'Invalid json format'
-    assert len(qs) == 1
-    assert len(notices) == 0
+    assert len(qs) == 1
+    assert len(notices) == 0
     check_plan(js_obj['Plan'])
 
-    qs = query_state(config, acon, query, {'format': 'xml'})
-    assert len(qs) == 1
-    assert len(notices) == 0
+    qs, notices = common.onetime_query_state(config, acon, query, {'format': 'xml'})
+    assert len(qs) == 1
+    assert len(notices) == 0
     try:
         xml_root = ET.fromstring(qs[0][3])
     except:
         assert False, 'Invalid xml format'
     check_xml(xml_root)
 
-    qs = query_state(config, acon, query, {'format': 'yaml'})
+    qs, _ = common.onetime_query_state(config, acon, query, {'format': 'yaml'})
     try:
-        yaml_doc = yaml.load(qs[0][3])
+        yaml_doc = yaml.load(qs[0][3], Loader=yaml.FullLoader)
     except:
         assert False, 'Invalid yaml format'
-    assert len(qs) == 1
-    assert len(notices) == 0
+    assert len(qs) == 1
+    assert len(notices) == 0
     check_plan(yaml_doc['Plan'])
 
-    n_close((acon,))
+    common.n_close((acon,))
 
def test_timing_buffers_conflicts(config):
     """test when caller requests timing and buffers but counterpart turned off its"""
-    acon, = n_async_connect(config)
+    acon, = common.n_async_connect(config)
     query = 'select count(*) from foo join bar on foo.c1=bar.c1'
     timing_pattern = '(?:running time=\d+.\d+)|(?:actual time=\d+.\d+..\d+.\d+)'
     buffers_pattern = 'Buffers:'
 
-    qs = query_state(config, acon, query, {'timing': True, 'buffers': False})
-    assert len(qs) == 1 and not re.search(timing_pattern, qs[0][3])
+    qs, notices = common.onetime_query_state(config, acon, query, {'timing': True, 'buffers': False})
+    assert len(qs) == 1 and not re.search(timing_pattern, qs[0][3])
     assert notices == ['WARNING: timing statistics disabled\n']
 
-    qs = query_state(config, acon, query, {'timing': False, 'buffers': True})
-    assert len(qs) == 1 and not re.search(buffers_pattern, qs[0][3])
+    qs, notices = common.onetime_query_state(config, acon, query, {'timing': False, 'buffers': True})
+    assert len(qs) == 1 and not re.search(buffers_pattern, qs[0][3])
     assert notices == ['WARNING: buffers statistics disabled\n']
 
-    qs = query_state(config, acon, query, {'timing': True, 'buffers': True})
-    assert len(qs) == 1 and not re.search(timing_pattern, qs[0][3]) \
+    qs, notices = common.onetime_query_state(config, acon, query, {'timing': True, 'buffers': True})
+    assert len(qs) == 1 and not re.search(timing_pattern, qs[0][3]) \
         and not re.search(buffers_pattern, qs[0][3])
     assert len(notices) == 2 and 'WARNING: timing statistics disabled\n' in notices \
         and 'WARNING: buffers statistics disabled\n' in notices
 
-    n_close((acon,))
+    common.n_close((acon,))
diff --git a/tests/tpcds.py b/tests/tpcds.py
new file mode 100644
index 0000000..8ac7183
--- /dev/null
+++ b/tests/tpcds.py
@@ -0,0 +1,115 @@
+'''
+tpcds.py
+Copyright (c) 2016-2020, Postgres Professional
+'''
+
+import os
+import subprocess
+import time
+
+import progressbar
+import psycopg2.extensions
+
+import common
+
+class DataLoadException(Exception): pass
+class StressTestException(Exception): pass
+
+def setup_tpcds(config):
+    print('Setting up TPC-DS test...')
+    subprocess.call(['./tests/prepare_stress.sh'])
+
+    try:
+        conn = psycopg2.connect(**config)
+        cur = conn.cursor()
+
+        # Create pg_query_state extension
+        cur.execute('CREATE EXTENSION IF NOT EXISTS pg_query_state')
+
+        # Create tables
+        with open('tmp_stress/tpcds-kit/tools/tpcds.sql', 'r') as f:
+            cur.execute(f.read())
+
+        # Copy table data from files
+        for table_datafile in os.listdir('tmp_stress/tpcds-kit/tools/'):
+            if table_datafile.endswith('.dat'):
+                table_name = os.path.splitext(os.path.basename(table_datafile))[0]
+
+                print('Loading table', table_name)
+                with open('tmp_stress/tpcds-kit/tools/tables/%s' % table_datafile) as f:
+                    cur.copy_from(f, table_name, sep='|', null='')
+
+        conn.commit()
+
+    except Exception as e:
+        cur.close()
+        conn.close()
+        raise DataLoadException('Load failed: %s' % e)
+
+    print('done!')
+
+def run_tpcds(config):
+    """TPC-DS stress test"""
+
+    TPC_DS_EXCLUDE_LIST = []          # actual numbers of TPC-DS tests to exclude
+    TPC_DS_STATEMENT_TIMEOUT = 20000  # statement_timeout in ms
+
+    print('Preparing TPC-DS queries...')
+    queries = []
+    for query_file in sorted(os.listdir('tmp_stress/tpcds-result-reproduction/query_qualification/')):
+        with open('tmp_stress/tpcds-result-reproduction/query_qualification/%s' % query_file, 'r') as f:
+            queries.append(f.read())
+
+    acon, = common.n_async_connect(config)
+    pid = acon.get_backend_pid()
+
+    print('Starting TPC-DS queries...')
+    timeout_list = []
+    bar = progressbar.ProgressBar(max_value=len(queries))
+    for i, query in enumerate(queries):
+        bar.update(i + 1)
+        if i + 1 in TPC_DS_EXCLUDE_LIST:
+            continue
+        try:
+            # Set query timeout to TPC_DS_STATEMENT_TIMEOUT / 1000 seconds
+            common.set_guc(acon, 'statement_timeout', TPC_DS_STATEMENT_TIMEOUT)
+
+            # run query
+            acurs = acon.cursor()
+            acurs.execute(query)
+
+            # periodically run pg_query_state on the running backend, trying to
+            # provoke a crash of PostgreSQL
+            MAX_FIRST_GETTING_QS_RETRIES = 10
+            PG_QS_DELAY, BEFORE_GETTING_QS_DELAY = 0.1, 0.1
+            BEFORE_GETTING_QS, GETTING_QS = range(2)
+            state, n_first_getting_qs_retries = BEFORE_GETTING_QS, 0
+            while True:
+                result, notices = common.pg_query_state(config, pid)
+                # run state machine to determine the first getting of query state
+                # and query finishing
+                if state == BEFORE_GETTING_QS:
+                    if len(result) > 0 or common.BACKEND_IS_ACTIVE_INFO in notices:
+                        state = GETTING_QS
+                        continue
+                    n_first_getting_qs_retries += 1
+                    if n_first_getting_qs_retries >= MAX_FIRST_GETTING_QS_RETRIES:
+                        # pg_query_state calls don't return any result; most
+                        # likely the query has already completed
+                        break
+                    time.sleep(BEFORE_GETTING_QS_DELAY)
+                elif state == GETTING_QS:
+                    if common.BACKEND_IS_IDLE_INFO in notices:
+                        break
+                    time.sleep(PG_QS_DELAY)
+
+            # wait for real query completion
+            common.wait(acon)
+
+        except psycopg2.extensions.QueryCanceledError:
+            timeout_list.append(i + 1)
+
+    common.n_close((acon,))
+
+    if len(timeout_list) > 0:
+        print('\nThere were pg_query_state timeouts (%s ms) on queries:' % TPC_DS_STATEMENT_TIMEOUT, timeout_list)
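+
+if __name__ == '__main__':
+    # Illustrative convenience entry point, not used by the test runner:
+    # run the stress test directly, assuming a local server with default
+    # connection parameters and TPC-DS data already loaded via setup_tpcds().
+    conn_params = {
+        'host': 'localhost', 'port': 5432,
+        'database': 'postgres', 'user': 'postgres', 'password': ''
+    }
+    run_tpcds(conn_params)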