Skip to content

#42 - Fix potential for futures thread pool to drain on non-terminal … #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,50 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog], [markdownlint],
and this project adheres to [Semantic Versioning].

## [0.0.7] - 2025-06-03

### Changed in 0.0.7

- Small improvements to Python snippets

## [0.0.6]

### Changed in 0.0.6

- Improved Python add_data_sources.py

## [0.0.5]

### Changed in 0.0.5

- Modified configuration examples for new szconfig and szconfigmanager pattern

## [0.0.4]

### Added to 0.0.4

- C# examples
- Updated Java examples to move declarations to the bottom

## [0.0.3]

### Added to 0.0.3

- Java examples

## [0.0.2]

### Changed in 0.0.2

- Modify Python imports to use senzing and senzing_core

### Added to 0.0.1

### Added to 0.0.2

- Couple of new examples

## [0.0.1]

### Added to 0.0.1

- Initial for V4
Expand Down
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,24 @@ You may already have installed the Senzing and created a Senzing project by foll

### Configuration

When using a bare metal install, the initialization parameters used by the Senzing Python utilities are maintained within `<project_path>/etc/G2Module.ini`.
When using a bare metal install, the initialization parameters used by the Senzing Python utilities are maintained within `<project_path>/etc/sz_engine_config.ini`.

🤔To convert an existing Senzing project G2Module.ini file to a JSON string use one of the following methods:
🤔To convert an existing Senzing project sz_engine_config.ini file to a JSON string use one of the following methods:

- [g2_module_ini_to_json.py]
- [sz_engine_config_ini_to_json.py]

- Modify the path to your projects G2Module.ini file.
- Modify the path to your projects sz_engine_config.ini file.

- [jc]

- ```console
cat <project_path>/etc/G2Module.ini | jc --ini
cat <project_path>/etc/sz_engine_config.ini.ini | jc --ini
```

- Python one liner

- ```python
python3 -c $'import configparser; ini_file_name = "<project_path>/etc/G2Module.ini";settings = {};cfgp = configparser.ConfigParser();cfgp.optionxform = str;cfgp.read(ini_file_name)\nfor section in cfgp.sections(): settings[section] = dict(cfgp.items(section))\nprint(settings)'
python3 -c $'import configparser; ini_file_name = "<project_path>/etc/sz_engine_config.ini";settings = {};cfgp = configparser.ConfigParser();cfgp.optionxform = str;cfgp.read(ini_file_name)\nfor section in cfgp.sections(): settings[section] = dict(cfgp.items(section))\nprint(settings)'
```

:pencil2: `<project_path>` in the above example should point to your project.
Expand Down Expand Up @@ -172,7 +172,7 @@ There are different sized load files within the [data] path that can be used to
[Configuration]: #configuration
[data]: resources/data/
[Docker Usage]: #docker-usage
[g2_module_ini_to_json.py]: python/initialization/g2_module_ini_to_json.py
[sz_engine_config_ini_to_json.py]: python/initialization/sz_engine_config_ini_to_json.py
[Input Load File Sizes]: #input-load-file-sizes
[Items of Note]: #items-of-note
[jc]: https://github.com/kellyjonbrazil/jc
Expand Down
9 changes: 4 additions & 5 deletions python/deleting/delete_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,13 @@ def futures_del(engine, input_file):
mock_logger("CRITICAL", err, futures[f])
raise err
else:
record = in_file.readline()
if record:
futures[executor.submit(delete_record, engine, record)] = record

success_recs += 1
if success_recs % 100 == 0:
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
print(f"Processed {success_recs:,} deletes, with {error_recs:,} errors", flush=True)
finally:
if record := in_file.readline():
futures[executor.submit(delete_record, engine, record)] = record

del futures[f]

print(f"\nSuccessfully deleted {success_recs:,} records, with" f" {error_recs:,} errors")
Expand Down
2 changes: 1 addition & 1 deletion python/deleting/delete_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def del_records_from_file(engine, input_file):
success_recs += 1

if success_recs % 100 == 0:
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
print(f"Processed {success_recs:,} deletes, with {error_recs:,} errors", flush=True)

print(f"\nSuccessfully deleted {success_recs:,} records, with {error_recs:,} errors")

Expand Down
9 changes: 4 additions & 5 deletions python/deleting/delete_with_info_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,15 @@ def futures_del(engine, input_file, output_file):
mock_logger("CRITICAL", err, futures[f])
raise err
else:
record = in_file.readline()
if record:
futures[executor.submit(delete_record, engine, record)] = record

out_file.write(f"{result}\n")

success_recs += 1
if success_recs % 100 == 0:
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
print(f"Processed {success_recs:,} deletes, with {error_recs:,} errors", flush=True)
finally:
if record := in_file.readline():
futures[executor.submit(delete_record, engine, record)] = record

del futures[f]

print(f"\nSuccessfully deleted {success_recs:,} records, with" f" {error_recs:,} errors")
Expand Down
7 changes: 3 additions & 4 deletions python/information/get_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,16 @@ def futures_add(engine, input_file):
mock_logger("CRITICAL", err, futures[f])
raise err
else:
record = file.readline()
if record:
futures[executor.submit(add_record, engine, record)] = record

success_recs += 1
if success_recs % 100 == 0:
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)

if success_recs % 200 == 0:
engine_stats(engine)
finally:
if record := file.readline():
futures[executor.submit(add_record, engine, record)] = record

del futures[f]

print(f"\nSuccessfully loaded {success_recs:,} records, with" f" {error_recs:,} errors")
Expand Down
6 changes: 3 additions & 3 deletions python/initialization/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
- Priming the Senzing engine before use loads resource intensive assets upfront. Without priming the first SDK call to the engine will appear slower than usual as it causes these assets to be loaded
- **factory_and_engines.py**
- Basic example of how to create an abstract Senzing factory and each of the available engines
- **g2_module_ini_to_json.py**
- **sz_engine_config_ini_to_json.py**
- The snippets herein utilize the `SENZING_ENGINE_CONFIGURATION_JSON` environment variable for Senzing abstract factory creation
- If you are familiar with working with a Senzing project you may be aware the same configuration data is held in the G2Module.ini file
- Example to convert G2Module.ini to a JSON string for use with `SENZING_ENGINE_CONFIGURATION_JSON`
- If you are familiar with working with a Senzing project you may be aware the same configuration data is held in the sz_engine_config.ini file
- Example to convert sz_engine_config.ini to a JSON string for use with `SENZING_ENGINE_CONFIGURATION_JSON`
- **purge_repository.py**
- **WARNING** This script will remove all data from a Senzing repository, use with caution! **WARNING**
- It will prompt first, still use with caution!
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import configparser
from pathlib import Path

INI_FILE = Path("../../resources/g2module/G2Module.ini").resolve()
INI_FILE = Path("../../resources/engine_config/sz_engine_config.ini").resolve()
settings = {}

cfgp = configparser.ConfigParser()
Expand Down
7 changes: 3 additions & 4 deletions python/loading/add_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,13 @@ def futures_add(engine, input_file):
mock_logger("CRITICAL", err, futures[f])
raise err
else:
record = file.readline()
if record:
futures[executor.submit(add_record, engine, record)] = record

success_recs += 1
if success_recs % 100 == 0:
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
finally:
if record := file.readline():
futures[executor.submit(add_record, engine, record)] = record

del futures[f]

print(f"\nSuccessfully loaded {success_recs:,} records, with" f" {error_recs:,} errors")
Expand Down
9 changes: 4 additions & 5 deletions python/loading/add_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ def consumer(engine, queue):
mock_logger("CRITICAL", err, futures[f])
raise err
else:
if not queue.empty():
record = queue.get()
futures[executor.submit(add_record, engine, record)] = record

success_recs += 1
if success_recs % 100 == 0:
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
finally:
if not queue.empty():
record = queue.get()
futures[executor.submit(add_record, engine, record)] = record

del futures[f]

print(f"\nSuccessfully loaded {success_recs:,} records, with {error_recs:,} errors")
Expand All @@ -80,6 +80,5 @@ def consumer(engine, queue):
consumer_proc.start()
producer_proc.join()
consumer_proc.join()

except SzError as err:
mock_logger("CRITICAL", err)
7 changes: 3 additions & 4 deletions python/loading/add_with_info_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ def futures_add(engine, input_file, output_file):
mock_logger("CRITICAL", err, futures[f])
raise err
else:
record = in_file.readline()
if record:
futures[executor.submit(add_record, engine, record)] = record

out_file.write(f"{result}\n")

success_recs += 1
Expand All @@ -85,6 +81,9 @@ def futures_add(engine, input_file, output_file):
if success_recs % 200 == 0:
engine_stats(engine)
finally:
if record := in_file.readline():
futures[executor.submit(add_record, engine, record)] = record

del futures[f]

print(f"\nSuccessfully loaded {success_recs:,} records, with" f" {error_recs:,} errors")
Expand Down
8 changes: 3 additions & 5 deletions python/redo/add_with_redo.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ def process_redo(engine):

print("\nStarting to process redo records...")

while 1:
while True:
try:
response = engine.get_redo_record()
if not response:
if not (response := engine.get_redo_record()):
break
engine.process_redo_record(response)

Expand All @@ -87,9 +86,8 @@ def process_redo(engine):
sz_engine = sz_factory.create_engine()
for load_file in INPUT_FILES:
add_records_from_file(sz_engine, load_file)
redo_count = sz_engine.count_redo_records()

if redo_count:
if sz_engine.count_redo_records():
process_redo(sz_engine)
else:
print("\nNo redo records to process")
Expand Down
6 changes: 2 additions & 4 deletions python/redo/redo_continuous.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ def process_redo(engine):
success_recs = 0
error_recs = 0

while 1:
while True:
try:
response = engine.get_redo_record()

if not response:
if not (response := engine.get_redo_record()):
print(
"No redo records to process, pausing for 30 seconds. Total"
f" processed {success_recs:,} . (CTRL-C to exit)..."
Expand Down
21 changes: 9 additions & 12 deletions python/redo/redo_continuous_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ def get_redo_record(engine):
def prime_redo_records(engine, quantity):
redo_records = []
for _ in range(quantity):
redo_record = get_redo_record(engine)
if redo_record:
if redo_record := get_redo_record(engine):
redo_records.append(redo_record)
return redo_records

Expand Down Expand Up @@ -61,7 +60,7 @@ def futures_redo(engine):
redo_paused = False

with concurrent.futures.ThreadPoolExecutor() as executor:
while 1:
while True:
futures = {
executor.submit(process_redo_record, engine, record): record
for record in prime_redo_records(engine, executor._max_workers)
Expand All @@ -71,7 +70,7 @@ def futures_redo(engine):
else:
break

while 1:
while True:
done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
for f in done:
try:
Expand All @@ -86,25 +85,23 @@ def futures_redo(engine):
mock_logger("CRITICAL", err, futures[f])
raise err
else:
record = get_redo_record(engine)
if record:
futures[executor.submit(process_redo_record, engine, record)] = record
else:
redo_paused = True

success_recs += 1
if success_recs % 100 == 0:
print(f"Processed {success_recs:,} redo records, with" f" {error_recs:,} errors")
finally:
if record := get_redo_record(engine):
futures[executor.submit(process_redo_record, engine, record)] = record
else:
redo_paused = True

del futures[f]

if redo_paused:
while not redo_count(engine):
redo_pause(success_recs)
redo_paused = False
while len(futures) < executor._max_workers:
record = get_redo_record(engine)
if record:
if record := get_redo_record(engine):
futures[executor.submit(process_redo_record, engine, record)] = record


Expand Down
6 changes: 2 additions & 4 deletions python/redo/redo_with_info_continuous.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ def process_redo(engine, output_file):

with open(output_file, "w", encoding="utf-8") as out_file:
try:
while 1:
redo_record = engine.get_redo_record()

if not redo_record:
while True:
if not (redo_record := engine.get_redo_record()):
redo_pause(success_recs)
continue

Expand Down
9 changes: 4 additions & 5 deletions python/searching/search_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,16 @@ def futures_search(engine, input_file):
mock_logger("CRITICAL", err, futures[f])
raise err
else:
record = in_file.readline()
if record:
futures[executor.submit(search_record, engine, record)] = record

success_recs += 1
if success_recs % 100 == 0:
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
print(f"Processed {success_recs:,} searches, with {error_recs:,} errors", flush=True)

print(f"\n------ Searched: {futures[f]}", flush=True)
print(f"\n{result}", flush=True)
finally:
if record := in_file.readline():
futures[executor.submit(search_record, engine, record)] = record

del futures[f]

print(f"\nSuccessfully searched {success_recs:,} records, with" f" {error_recs:,} errors")
Expand Down
Loading
Loading