Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions src/pytroll_watchers/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,16 @@ def __setitem__(self, key, value):
"""Set the *value* corresponding to *key*."""
if key not in self._data:
self._data[key] = value
Timer(self._ttl, self._data.pop, (key, None)).start()
timer = Timer(self._ttl, self._expire, (key,))
timer.daemon = True
timer.start()

def _expire(self, key):
self._data.pop(key, None)

def __contains__(self, key):
"""Check if key is already present."""
try:
_ = self[key]
return True
except KeyError:
return False
return key in self._data


def running_selector(selector_config, subscriber_config):
Expand Down Expand Up @@ -128,14 +129,17 @@ def _data_messages(subscriber_config):

with closing(subscriber):
for msg in subscriber.recv():
if msg.type != "file":
if msg.type not in ["file", "dataset"]:
continue
yield msg


def unique_key(msg):
"""Identify the content of the message with a unique key."""
return msg.data["uid"]
try:
return msg.data["uid"]
except KeyError:
return tuple(sorted(map(lambda x: x["uid"], msg.data["dataset"])))


def _run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config):
Expand Down
51 changes: 51 additions & 0 deletions tests/test_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,57 @@ def test_run_selector_on_single_file_messages(tmp_path):
assert published_messages[1] == msg3


def test_run_selector_on_multi_file_messages(tmp_path):
"""Test running the selector on multi file messages."""
uid = "IVCDB_j02_d20240419_t1114110_e1115356_b07465_c20240419113435035578_cspp_dev.h5"
sdr_file = tmp_path / "sdr" / uid
create_data_file(sdr_file)

uid2 = "IVCDB_j01_d20240419_t1114110_e1115356_b07465_c20240419113435035578_cspp_dev.h5"
sdr_file2 = tmp_path / "sdr" / uid2
create_data_file(sdr_file2)


msg1 = ('pytroll://segment/viirs/l1b/ dataset a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 '
'application/json {"sensor": "viirs", '
'"dataset": [{'
f'"uid": "{uid}", "uri": "file://{str(sdr_file)}", "path": "{str(sdr_file)}", '
'"filesystem": {"cls": "fsspec.implementations.local.LocalFileSystem", "protocol": "file", "args": []}}]}')

msg2 = ('pytroll://segment/viirs/l1b/ dataset a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 '
'application/json {"sensor": "viirs", '
'"dataset": [{'
f'"uid": "{uid}", "uri": "ssh://someplace.pytroll.org:/{str(sdr_file)}", "path": "{str(sdr_file)}", '
'"filesystem": {"cls": "fsspec.implementations.ssh.SFTPFileSystem", "protocol": "ssh", "args": []}}]}')

msg3 = ('pytroll://segment/viirs/l1b/ dataset a001673@c22519.ad.smhi.se 2024-04-19T11:35:00.487388 v1.01 '
'application/json {"sensor": "viirs", '
'"dataset": [{'
f'"uid": "{uid2}", "uri": "ssh://someplace.pytroll.org:/{str(sdr_file2)}", "path": "{str(sdr_file2)}", '
'"filesystem": {"cls": "fsspec.implementations.ssh.SFTPFileSystem", "protocol": "ssh", "args": []}}]}')

messages = [Message.decode(msg1), Message.decode(msg2), Message.decode(msg3)]

pipe_in_address = "ipc://" + str(tmp_path / "in.ipc")
pipe_out_address = "ipc://" + str(tmp_path / "out.ipc")
subscriber_config = dict(addresses=[pipe_in_address],
nameserver=False,
port=3000)

publisher_config = dict(address=pipe_out_address,
port=1999,
nameservers=False)

selector_config = dict(ttl=1)

with patched_subscriber_recv(messages):
with patched_publisher() as published_messages:
_run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config)
assert len(published_messages) == 2
assert published_messages[0] == msg1
assert published_messages[1] == msg3


def test_ttldict():
"""Test the TTLDict."""
ttl = 0.1
Expand Down
Loading