Skip to content

Commit

Permalink
Merge pull request #237 from thalassemia/mongo-test
Browse files Browse the repository at this point in the history
Add tests for MongoDB and fix bugs
  • Loading branch information
eagmon authored Feb 21, 2024
2 parents 58f7a84 + 12a52f2 commit f92aa65
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 28 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ jobs:
strategy:
matrix:
python-version: ['3.9', '3.10', '3.11']
services:
mongodb:
image: mongo:6.0.13
ports:
- 27017:27017
options: >-
--health-cmd mongosh
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v2
- name: Setup Python ${{ matrix.python-version }}
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## v1.6.3
* (#237) Add tests for MongoDB emitter and fix related bugs

## v1.6.2
* (#236) Add data key to breakdown documents if missing

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from setuptools import setup


VERSION = '1.6.2'
VERSION = '1.6.3'


if __name__ == '__main__':
Expand Down
29 changes: 22 additions & 7 deletions vivarium/core/emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ def write_emit(self, table: Any, emit_data: Dict[str, Any]) -> None:
d['assembly_id'] = assembly_id
d['experiment_id'] = experiment_id
d.setdefault('data', {})
if time:
if time is not None:
d['data']['time'] = time
table.insert_one(d)

Expand Down Expand Up @@ -469,7 +469,7 @@ def delete_experiment_from_database(
else:
query = {'experiment_id': experiment_id}
db.history.delete_many(query, hint=HISTORY_INDEXES[1])
db.configuration.delete_many(query)
db.configuration.delete_many({'experiment_id': experiment_id})


def assemble_data(data: list) -> dict:
Expand Down Expand Up @@ -731,22 +731,37 @@ def data_from_database(


def data_to_database(
        data: Dict[float, dict], environment_config: Any, db: Any) -> None:
    """Insert data into MongoDB.

    Args:
        data: Dictionary mapping time (as a string, for JSON/BSON key
            compatibility) to subdictionaries. Subdictionaries must have an
            ``experiment_id`` key for compatibility with retrieval functions
            like :py:func:`~.data_from_database`.
        environment_config: Dictionary that will be written to the
            ``configuration`` collection. Must have an ``experiment_id`` key.
        db: MongoDB database (e.g. from :py:func:`~.get_experiment_database`)
    """
    history_collection = db.history
    reshaped_data = []
    for time, timepoint_data in data.items():
        # Since time is the dictionary key, it has to be a string for
        # JSON/BSON compatibility. But now that we're uploading it, we
        # want it to be a float for fast searching.
        reshaped_entry = {}
        for key, val in timepoint_data.items():
            # Drop '_id' (would collide with MongoDB's auto-generated IDs)
            # and any stale top-level 'time' key.
            if key not in ('_id', 'time'):
                reshaped_entry[key] = val
        # All other functions expect the `time` key under the `data` key.
        # Copy the 'data' subdictionary so the caller's input is not mutated.
        reshaped_entry['data'] = dict(reshaped_entry.get('data', {}))
        reshaped_entry['data']['time'] = float(time)
        # Add required assembly ID if it does not exist
        reshaped_entry.setdefault('assembly_id', str(uuid.uuid4()))
        reshaped_data.append(reshaped_entry)
    # pymongo's insert_many raises InvalidOperation on an empty list, so
    # skip the write entirely when there is nothing to insert.
    if reshaped_data:
        history_collection.insert_many(reshaped_data)

    config_collection = db.configuration
    config_collection.insert_one(environment_config)


Expand Down
2 changes: 1 addition & 1 deletion vivarium/experiments/hierarchy_composite_division.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def test_hierarchy_update() -> None:
'Protein': {'X': 50 * units.mg / units.mL}}}}

# make a txtl composite, embedded under an 'agents' store
txtl_composer = AgentDivision({})
txtl_composer = AgentDivision({'growth': {'default_growth_noise': 0}})
txtl_composite1 = txtl_composer.generate(
{'agent_id': agent_id},
path=('agents', agent_id))
Expand Down
125 changes: 106 additions & 19 deletions vivarium/experiments/large_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,40 @@
import uuid
import random

import numpy as np

from vivarium.core.engine import Engine
from vivarium.core.process import Process
from vivarium.core.composer import Composer
from vivarium.core.emitter import (
get_experiment_database,
data_from_database,
data_to_database,
delete_experiment_from_database)


class ManyParametersProcess(Process):
    """Process with many random parameters and an array-valued port.

    Used to stress-test the database emitter: the random parameters inflate
    the configuration emit and the array port inflates history emits.
    """
    defaults = {
        'number_of_parameters': 100,
        'arr_size': 1}

    def __init__(self, parameters=None):
        # First init merges the caller's config with the class defaults so
        # 'number_of_parameters' is always available, even when parameters
        # is None or omits it (indexing the raw argument would crash).
        super().__init__(parameters)
        # make a bunch of parameters
        random_parameters = {
            key: random.random()
            for key in range(self.parameters['number_of_parameters'])}
        # Re-init with the random parameters folded in so they are part of
        # the configuration emit.
        super().__init__({**self.parameters, **random_parameters})

    def ports_schema(self):
        # Emit a (possibly large) random array for each port.
        return {'port': {'_default': np.random.random(
            self.parameters['arr_size']), '_emit': True}}

    def next_update(self, timestep, states):
        # Adds 1 to every element of the port array each timestep.
        return {'port': 1}


class ManyParametersComposite(Composer):
defaults = {
'number_of_processes': 10,
'number_of_parameters': 100}
'number_of_parameters': 100,
'arr_size': 1}
def __init__(self, config=None):
super().__init__(config)
self.process_ids = [
Expand All @@ -43,23 +48,24 @@ def generate_processes(self, config):
return {
process_id: ManyParametersProcess({
'name': process_id,
'number_of_parameters': self.config['number_of_parameters']})
'number_of_parameters': self.config['number_of_parameters'],
'arr_size': self.config['arr_size']})
for process_id in self.process_ids}
def generate_topology(self, config):
return {
process_id: {'port': ('store', process_id,)}
for process_id in self.process_ids}


def run_large_initial_emit():
def test_large_emits():
"""
This experiment runs a large experiment to test the database emitter.
This requires MongoDB to be configured and running.
"""

config = {
'number_of_processes': 1000,
'number_of_parameters': 1000}
'number_of_processes': 10,
'number_of_parameters': 100000,
'arr_size': 150000}

composer = ManyParametersComposite(config)
composite = composer.generate()
Expand All @@ -68,6 +74,7 @@ def run_large_initial_emit():
'experiment_name': 'large database experiment',
'experiment_id': f'large_{str(uuid.uuid4())}',
'emitter': 'database',
'emit_processes': True
}

experiment = Engine(**{
Expand All @@ -76,25 +83,105 @@ def run_large_initial_emit():
**settings})

# run the experiment
experiment.update(10)
experiment.update(1)

# retrieve the data with data_from_database
experiment_id = experiment.experiment_id

# retrieve the data from emitter
data = experiment.emitter.get_data()
assert list(data.keys()) == [
0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
assert list(data.keys()) == [0.0, 1.0]

# retrieve the data directly from database
db = get_experiment_database()
# check that configuration emit was split into sub-documents
config_cursor = db.configuration.find({'experiment_id': experiment_id})
config_raw = list(config_cursor)
assert len(config_raw) > 1
# check that sim emit was split into sub-documents (2 timesteps)
history_cursor = db.history.find({'experiment_id': experiment_id})
history_raw = list(history_cursor)
assert len(history_raw) > 2

# retrieve the data directly from database
data, experiment_config = data_from_database(experiment_id, db)
assert 'processes' in experiment_config
assert 0.0 in data

# check values of emitted data
processes = experiment_config['processes']
assert len(processes) == config['number_of_processes']
for proc in processes:
np.testing.assert_allclose(np.array(data[1.0]['store'][proc]),
np.array(data[0.0]['store'][proc]) + 1)

# delete the experiment
delete_experiment_from_database(experiment_id)


def test_query_db():
    """Exercise the query features of the MongoDB retrieval API."""
    composite = ManyParametersComposite().generate()
    sim = Engine(
        processes=composite['processes'],
        topology=composite['topology'],
        experiment_name='large database experiment',
        experiment_id=f'large_{str(uuid.uuid4())}',
        emitter='database')
    sim.update(10)

    db = get_experiment_database()
    exp_id = sim.experiment_id

    # Path queries: only the requested stores should come back.
    paths = [('store', 'process_0'), ('store', 'process_9')]
    data, _ = data_from_database(exp_id, db, paths)
    assert data[0.0]['store'].keys() == {'process_0', 'process_9'}

    # Per-path post-processing functions are applied to retrieved values.
    data, _ = data_from_database(
        exp_id, db, paths,
        func_dict={('store', 'process_0'): lambda x: 3})
    for timepoint in data.values():
        assert timepoint['store']['process_0'] == 3

    # Time-window filtering is inclusive at both endpoints.
    data, _ = data_from_database(exp_id, db, start_time=1, end_time=5)
    assert data.keys() == {1.0, 2.0, 3.0, 4.0, 5.0}

    # Parallel retrieval must agree with single-process retrieval.
    parallel, _ = data_from_database(exp_id, db, cpus=2)
    serial, _ = data_from_database(exp_id, db)
    assert parallel == serial
    delete_experiment_from_database(exp_id, cpus=2)


def test_data_to_database():
    """Write emits with ``data_to_database`` and verify the round trip."""
    exp_id = 'manual_insert'
    data = {}
    for time_str, store_value in (('0.0', 1), ('1.0', 2), ('2.0', [3.5])):
        data[time_str] = {'data': {'store': store_value},
                          'experiment_id': exp_id}
    config = {'experiment_id': exp_id, 'data': {'store': 1}}

    db = get_experiment_database()
    data_to_database(data, config, db)

    fetched_data, fetched_config = data_from_database(exp_id, db)
    expected = {float(t): entry['data'] for t, entry in data.items()}
    assert fetched_data == expected
    assert fetched_config == config['data']

    # Clean up both collections.
    delete_experiment_from_database(exp_id)
    db.configuration.delete_many({'experiment_id': exp_id})


if __name__ == '__main__':
    # Run the MongoDB integration tests directly. Requires a running
    # MongoDB instance (see the pytest workflow's mongo service).
    # Note: the old entry point `run_large_initial_emit` was renamed to
    # `test_large_emits`; calling the old name would raise NameError.
    test_large_emits()
    test_query_db()
    test_data_to_database()

0 comments on commit f92aa65

Please sign in to comment.