Skip to content

Commit

Permalink
Issue 229.dockter.1 (#230)
Browse files Browse the repository at this point in the history
* #229 Savepoint

* #229 Savepoint
  • Loading branch information
docktermj authored Oct 18, 2022
1 parent 2268b32 commit 247ba59
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 16 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

-

## [1.8.2] - 2022-10-18

### Changed in 1.8.2

- Removed support for `SENZING_DEFAULT_ENTITY_TYPE`
- When only one record is sent per message, the message is a single JSON object, not a JSON list containing one object

## [1.8.1] - 2022-09-28

Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
ARG BASE_IMAGE=debian:11.5-slim@sha256:b46fc4e6813f6cbd9f3f6322c72ab974cc0e75a72ca02730a8861e98999875c7
FROM ${BASE_IMAGE}

ENV REFRESHED_AT=2022-10-11
ENV REFRESHED_AT=2022-10-18

LABEL Name="senzing/stream-producer" \
Maintainer="[email protected]" \
Version="1.8.1"
Version="1.8.2"

HEALTHCHECK CMD ["/app/healthcheck.sh"]

Expand Down
56 changes: 42 additions & 14 deletions stream-producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
# Metadata.

__all__ = []
__version__ = "1.8.1" # See https://www.python.org/dev/peps/pep-0396/
__version__ = "1.8.2" # See https://www.python.org/dev/peps/pep-0396/
__date__ = '2020-07-07'
__updated__ = '2022-09-28'
__updated__ = '2022-10-18'

# See https://github.com/Senzing/knowledge-base/blob/main/lists/senzing-product-ids.md
SENZING_PRODUCT_ID = "5014"
Expand Down Expand Up @@ -1650,7 +1650,10 @@ def __init__(self, config=None, *args, **kwargs):
self.kafka_topic = config.get('kafka_topic')
self.record_monitor = config.get("record_monitor")
self.number_of_records_per_print = config.get("records_per_message")
self.message_buffer = '['
if self.number_of_records_per_print > 1:
self.message_buffer = '['
else:
self.message_buffer = ''
self.num_messages = 0

# default Kafka max message size, but save some space for kafka to use, just in case
Expand Down Expand Up @@ -1722,7 +1725,9 @@ def print(self, message):
self.num_messages += 1

try:
if self.num_messages == self.number_of_records_per_print:
if self.number_of_records_per_print == 1:
self.send_message_buffer()
elif self.num_messages == self.number_of_records_per_print:
self.send_message_buffer()

except BufferError as err:
Expand Down Expand Up @@ -1754,13 +1759,17 @@ def close(self):
self.kafka_producer.flush()

def send_message_buffer(self):
self.message_buffer += ']'
if self.number_of_records_per_print > 1:
self.message_buffer += ']'
self.kafka_producer.produce(
self.kafka_topic,
self.message_buffer,
on_delivery=self.on_kafka_delivery
)
self.message_buffer = '['
if self.number_of_records_per_print > 1:
self.message_buffer = '['
else:
self.message_buffer = ''
self.num_messages = 0

# -----------------------------------------------------------------------------
Expand All @@ -1785,7 +1794,10 @@ def __init__(self, config=None, *args, **kwargs):
self.rabbitmq_routing_key = config.get("rabbitmq_routing_key")
self.record_monitor = config.get("record_monitor")
self.number_of_records_per_print = config.get("records_per_message")
self.message_buffer = '['
if self.number_of_records_per_print > 1:
self.message_buffer = '['
else:
self.message_buffer = ''
self.num_messages = 0

# default RabbitMQ max message size, but save some space for Rabbit to use, just in case
Expand Down Expand Up @@ -1872,7 +1884,9 @@ def print(self, message):
# Send the buffered message to RabbitMQ once enough records have accumulated.

try:
if self.num_messages == self.number_of_records_per_print:
if self.number_of_records_per_print == 1:
self.send_message_buffer()
elif self.num_messages == self.number_of_records_per_print:
self.send_message_buffer()

except Exception as err:
Expand All @@ -1893,7 +1907,8 @@ def close(self):
self.connection.close()

def send_message_buffer(self):
self.message_buffer += ']'
if self.number_of_records_per_print > 1:
self.message_buffer += ']'

sent = False

Expand All @@ -1910,7 +1925,10 @@ def send_message_buffer(self):
except pika.exceptions.NackError:
time.sleep(1)

self.message_buffer = '['
if self.number_of_records_per_print > 1:
self.message_buffer = '['
else:
self.message_buffer = ''
self.num_messages = 0

# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -1948,7 +1966,10 @@ def __init__(self, *args, **kwargs):
self.record_monitor = config.get("record_monitor")
self.sqs_delay_seconds = config.get("sqs_delay_seconds")
self.number_of_records_per_print = config.get("records_per_message")
self.message_buffer = '['
if self.number_of_records_per_print > 1:
self.message_buffer = '['
else:
self.message_buffer = ''
self.num_messages = 0
self.record_identifier = config.get("record_identifier")

Expand Down Expand Up @@ -2007,7 +2028,10 @@ def print(self, message):
self.message_buffer += message
self.num_messages += 1

if self.num_messages == self.number_of_records_per_print:

if self.number_of_records_per_print == 1:
self.send_message_buffer()
elif self.num_messages == self.number_of_records_per_print:
self.send_message_buffer()

if self.counter % self.record_monitor == 0:
Expand All @@ -2019,14 +2043,18 @@ def close(self):
self.send_message_buffer()

def send_message_buffer(self):
self.message_buffer += ']'
if self.number_of_records_per_print > 1:
self.message_buffer += ']'
self.sqs.send_message(
QueueUrl=self.queue_url,
DelaySeconds=self.sqs_delay_seconds,
MessageAttributes={},
MessageBody=(self.message_buffer),
)
self.message_buffer = '['
if self.number_of_records_per_print > 1:
self.message_buffer = '['
else:
self.message_buffer = ''
self.num_messages = 0

# -----------------------------------------------------------------------------
Expand Down

0 comments on commit 247ba59

Please sign in to comment.