Add perf test for nested field #1394

Merged
50 changes: 50 additions & 0 deletions benchmarks/perf-tool/README.md
@@ -270,6 +270,26 @@ Ingests a dataset of multiple context types into the cluster.
| ----------- | ----------- | ----------- |
| took | Total time to ingest the dataset into the index. | ms |

#### ingest_nested_field

Ingests a dataset with a nested field into the cluster.

##### Parameters

| Parameter Name | Description | Default |
| ----------- | ----------- | ----------- |
| index_name | Name of index to ingest into | No default |
| field_name | Name of field to ingest into | No default |
| dataset_path | Path to data-set | No default |
| attributes_dataset_name | Name of dataset with additional attributes inside the main dataset | No default |
| attribute_spec | Definition of attributes; format is: [{ name: [name_val], type: [type_val] }]. Order is important and must match the order of the attributes columns in the dataset file. It must contain { name: 'parent_id', type: 'int' } | No default |

##### Metrics

| Metric Name | Description | Unit |
| ----------- | ----------- | ----------- |
| took | Total time to ingest the dataset into the index. | ms |
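
Each parent document groups all of its child vectors under a `nested_field` array (the nested path is hard-coded to `nested_field` by this step), with `parent_id` used as the document id. A minimal sketch of one resulting bulk action/body pair; the field name `target_field` and the `color` attribute are illustrative only, and the exact action-header shape is an assumption:

```python
# Hypothetical bulk pair for one parent document; 'nested_field' is the nested
# path used by the step, all other names are illustrative.
action = {'index': {'_index': 'target_index', '_id': 3}}  # _id taken from parent_id
body = {
    'nested_field': [
        {'target_field': [0.12, 0.45, 0.78], 'color': 'red'},   # child vector + attribute
        {'target_field': [0.91, 0.33, 0.27], 'color': 'blue'},  # same parent, second child
    ]
}
```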

#### query

Runs a set of queries against an index.
@@ -330,6 +350,36 @@ Runs a set of queries with filter against an index.
| recall@R | ratio of top R results from the ground truth neighbors that are in the K results returned by the plugin | float 0.0-1.0 |
| recall@K | ratio of results returned that were ground truth nearest neighbors | float 0.0-1.0 |


#### query_nested_field

Runs a set of nested-field queries against an index.

##### Parameters

| Parameter Name | Description | Default |
| ----------- | ----------- | ----------- |
| k | Number of neighbors to return on search | 100 |
| r | r value in Recall@R | 1 |
| index_name | Name of index to search | No default |
| field_name | Name of field to search | No default |
| calculate_recall | Whether to calculate recall values | False |
| dataset_format | Format the dataset is in. Currently hdf5 and bigann are supported. The hdf5 file must be organized in the same way that ann-benchmarks organizes theirs. | 'hdf5' |
| dataset_path | Path to dataset | No default |
| neighbors_format | Format the neighbors dataset is in. Currently hdf5 and bigann are supported. The hdf5 file must be organized in the same way that ann-benchmarks organizes theirs. | 'hdf5' |
| neighbors_path | Path to neighbors dataset | No default |
| neighbors_dataset | Name of the ground truth neighbors dataset inside the neighbors file | No default |
| query_count | Number of queries to create from the dataset | Size of the dataset |

##### Metrics

| Metric Name | Description | Unit |
| ----------- | ----------- | ----------- |
| took | Took times returned per query aggregated as total, p50, p90 and p99 (when applicable) | ms |
| memory_kb | Native memory k-NN is using at the end of the query workload | KB |
| recall@R | ratio of top R results from the ground truth neighbors that are in the K results returned by the plugin | float 0.0-1.0 |
| recall@K | ratio of results returned that were ground truth nearest neighbors | float 0.0-1.0 |
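
For reference, each search this step issues is a nested k-NN query; a sketch of the request body, mirroring `QueryNestedFieldStep.get_body` added in this PR (`nested_field` is the hard-coded nested path):

```python
# Query body built per test vector; k and field_name come from the step parameters.
def nested_query_body(vec, k, field_name):
    return {
        'size': k,
        'query': {
            'nested': {
                'path': 'nested_field',
                'query': {
                    'knn': {
                        'nested_field.' + field_name: {'vector': vec, 'k': k}
                    }
                }
            }
        }
    }
```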

#### get_stats

Gets the index stats.
25 changes: 11 additions & 14 deletions benchmarks/perf-tool/add-parent-doc-id-to-dataset.py
@@ -152,18 +152,18 @@ def run(self, source_path, target_path) -> None:
        cpus = multiprocessing.cpu_count()
        total_clients = min(8, cpus)  # 1 # 10
        hdf5Data_train = HDF5DataSet(target_path, "train")
-       train_vectors = hdf5Data_train.read(0, 1000000)
+       train_vectors = hdf5Data_train.read(0, hdf5Data_train.size())
        hdf5Data_train.close()
        print(f'Train vector size: {len(train_vectors)}')

        hdf5Data_test = HDF5DataSet(target_path, "test")
-       total_queries = 10000  # 10000
+       total_queries = hdf5Data_test.size()  # 10000
        dis = [] * total_queries

        for i in range(total_queries):
            dis.insert(i, [])

-       queries_per_client = int(total_queries / total_clients)
+       queries_per_client = int(total_queries / total_clients + 0.5)
        if queries_per_client == 0:
            queries_per_client = total_queries

@@ -176,7 +176,7 @@ def run(self, source_path, target_path) -> None:
            if start_index + queries_per_client <= total_queries:
                end_index = int(start_index + queries_per_client)
            else:
-               end_index = total_queries - start_index
+               end_index = total_queries

            print(f'Start Index: {start_index}, end Index: {end_index}')
            print(f'client is : {client}')
@@ -199,7 +199,6 @@ def run(self, source_path, target_path) -> None:
        i = 0
        for d in calculatedDis:
            if d:
-               print("Dis is not null")
                dis[i] = d
                j = j + 1
            i = i + 1
@@ -238,14 +237,12 @@ def queryTask(train_vectors, test_vectors, startIndex, endIndex, process_number,
    i = startIndex
    for test in test_vectors:
        distances = []
-       parent_ids = {}
        values = {}
        for value in train_vectors:
-           parent_ids[value.id] = value.parent_id
            values[value.id] = value
            distances.append({
                "dis": calculateL2Distance(test.vector, value.vector),
-               "id": value.id
+               "id": value.parent_id
            })

        distances.sort(key=lambda vector: vector['dis'])
@@ -258,15 +255,15 @@ def queryTask(train_vectors, test_vectors, startIndex, endIndex, process_number,
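        # The "id" stored in distances now holds the parent_id, so each
        # top-1000 list below keeps at most one entry per parent document.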
        for sub_i in range(len(distances)):
            id = distances[sub_i]['id']
            # Check if the number has been seen before
-           if len(nested) < 1000 and parent_ids[id] not in seen_set_nested:
+           if len(nested) < 1000 and id not in seen_set_nested:
                # If not seen before, mark it as seen
-               seen_set_nested.add(parent_ids[id])
+               seen_set_nested.add(id)
                nested.append(distances[sub_i])
-           if len(restricted) < 1000 and parent_ids[id] not in seen_set_restricted and values[id].apply_restricted_filter():
-               seen_set_restricted.add(parent_ids[id])
+           if len(restricted) < 1000 and id not in seen_set_restricted and values[id].apply_restricted_filter():
+               seen_set_restricted.add(id)
                restricted.append(distances[sub_i])
-           if len(relaxed) < 1000 and parent_ids[id] not in seen_set_relaxed and values[id].apply_relaxed_filter():
-               seen_set_relaxed.add(parent_ids[id])
+           if len(relaxed) < 1000 and id not in seen_set_relaxed and values[id].apply_relaxed_filter():
+               seen_set_relaxed.add(id)
                relaxed.append(distances[sub_i])

        all_distances[i]['nested'] = nested
Binary file added benchmarks/perf-tool/dataset/data-nested.hdf5
6 changes: 5 additions & 1 deletion benchmarks/perf-tool/okpt/test/steps/factory.py
@@ -10,7 +10,7 @@

from okpt.test.steps.steps import CreateIndexStep, DisableRefreshStep, RefreshIndexStep, DeleteIndexStep, \
    TrainModelStep, DeleteModelStep, ForceMergeStep, ClearCacheStep, IngestStep, IngestMultiFieldStep, \
-   QueryStep, QueryWithFilterStep, GetStatsStep, WarmupStep
+   IngestNestedFieldStep, QueryStep, QueryWithFilterStep, QueryNestedFieldStep, GetStatsStep, WarmupStep


def create_step(step_config: StepConfig) -> Step:
@@ -30,10 +30,14 @@ def create_step(step_config: StepConfig) -> Step:
        return IngestStep(step_config)
    elif step_config.step_name == IngestMultiFieldStep.label:
        return IngestMultiFieldStep(step_config)
+   elif step_config.step_name == IngestNestedFieldStep.label:
+       return IngestNestedFieldStep(step_config)
    elif step_config.step_name == QueryStep.label:
        return QueryStep(step_config)
    elif step_config.step_name == QueryWithFilterStep.label:
        return QueryWithFilterStep(step_config)
+   elif step_config.step_name == QueryNestedFieldStep.label:
+       return QueryNestedFieldStep(step_config)
    elif step_config.step_name == ForceMergeStep.label:
        return ForceMergeStep(step_config)
    elif step_config.step_name == ClearCacheStep.label:
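
For context, here is a rough sketch of how the two new step names could be wired into a perf-tool test procedure. The real tool consumes a YAML test config; the dict below is only an illustration, and every name and value in it is hypothetical:

```python
# Hypothetical test procedure using the new steps (names/values illustrative).
test_config = {
    'test_name': 'nested-field-perf',
    'steps': [
        {'name': 'ingest_nested_field',
         'index_name': 'target_index',
         'field_name': 'target_field',
         'dataset_path': 'dataset/data-nested.hdf5',
         'attributes_dataset_name': 'attributes',
         'attribute_spec': [{'name': 'parent_id', 'type': 'int'},
                            {'name': 'color', 'type': 'str'}]},
        {'name': 'query_nested_field',
         'index_name': 'target_index',
         'field_name': 'target_field',
         'dataset_path': 'dataset/data-nested.hdf5',
         'neighbors_path': 'dataset/data-nested.hdf5',
         'neighbors_dataset': 'neighbors_nested',
         'k': 100},
    ],
}
```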
155 changes: 153 additions & 2 deletions benchmarks/perf-tool/okpt/test/steps/steps.py
@@ -333,7 +333,6 @@ def action(doc_id):
        for i in range(0, self.doc_count, self.bulk_size):
            partition = self.dataset.read(self.bulk_size)
            self._handle_data_bulk(partition, action, i)
-
        self.dataset.reset()

        return {}
@@ -379,6 +378,7 @@ def __init__(self, step_config: StepConfig):
                                               step_config.config, {}, [])

        self.partition_attr = self.attributes_dataset.read(self.doc_count)
+       self.action_buffer = None

    def _handle_data_bulk(self, partition, action, i):
        if partition is None:
@@ -429,6 +429,118 @@ def bulk_transform_with_attributes(self, partition: np.ndarray, partition_attr,
        return actions


class IngestNestedFieldStep(BaseIngestStep):
"""See base class."""

label = 'ingest_nested_field'

def __init__(self, step_config: StepConfig):
super().__init__(step_config)

dataset_path = parse_string_param('dataset_path', step_config.config,
{}, None)

self.attributes_dataset_name = parse_string_param('attributes_dataset_name',
step_config.config, {}, None)

self.attributes_dataset = parse_dataset('hdf5', dataset_path,
Context.CUSTOM, self.attributes_dataset_name)

self.attribute_spec = parse_list_param('attribute_spec',
step_config.config, {}, [])

self.partition_attr = self.attributes_dataset.read(self.doc_count)

if self.dataset.size() != self.doc_count:
raise ValueError("custom doc_count is not supported for nested field")
self.action_buffer = None
self.action_parent_id = None
self.count = 0

    def _handle_data_bulk(self, partition, action, i):
        if partition is None:
            return
        body = self.bulk_transform_with_nested(partition, self.partition_attr, self.field_name,
                                               action, i, self.attribute_spec)
        if len(body) > 0:
            bulk_index(self.opensearch, self.index_name, body)

    def bulk_transform_with_nested(self, partition: np.ndarray, partition_attr, field_name: str,
                                   action, offset: int, attributes_def) -> List[Dict[str, Any]]:
        """Partitions and transforms a list of vectors into OpenSearch's bulk
        injection format.
        Args:
            partition: An array of vectors to transform.
            partition_attr: dictionary of additional data to transform
            field_name: field name for action
            action: Bulk API action.
            offset: index to start counting from
            attributes_def: definition of additional doc fields
        Returns:
            An array of transformed vectors in bulk format.
        """
        # offset is the index of the start row. We need the number of parent
        # docs minus one, which can be derived from the partition_attr data.
        # The last parent doc is kept aside (buffered) so that rows belonging
        # to it in the next partition can still be added.
        parent_id_idx = next((index for (index, d) in enumerate(attributes_def) if d.get('name') == 'parent_id'), None)
        if parent_id_idx is None:
            raise ValueError("parent_id must be provided in the attribute spec")
        if attributes_def[parent_id_idx]['type'] != 'int':
            raise ValueError("parent_id must be of int type")

        first_index = offset
        last_index = offset + len(partition) - 1
        num_of_actions = int(partition_attr[last_index][parent_id_idx].decode()) - int(partition_attr[first_index][parent_id_idx].decode())
        if self.action_buffer is None:
            self.action_buffer = {"nested_field": []}
            self.action_parent_id = int(partition_attr[first_index][parent_id_idx].decode())

        # Pre-fill an action header for every parent doc completed in this
        # partition; the None body placeholders are filled in below.
        actions = []
        for i in range(num_of_actions):
            actions.extend([action(i + self.action_parent_id), None])

        idx = 1  # index of the first body placeholder in actions
        part_list = partition.tolist()
        for i in range(len(partition)):
            self.count += 1
            nested = {field_name: part_list[i]}
            attr_idx = i + offset
            attr_def_idx = 0
            current_parent_id = None
            for attribute in attributes_def:
                attr_def_name = attribute['name']
                attr_def_type = attribute['type']
                if attr_def_name == "parent_id":
                    current_parent_id = int(partition_attr[attr_idx][attr_def_idx].decode())
                    attr_def_idx += 1
                    continue

                if attr_def_type == 'str':
                    val = partition_attr[attr_idx][attr_def_idx].decode()
                    if val != 'None':
                        nested[attr_def_name] = val
                elif attr_def_type == 'int':
                    val = int(partition_attr[attr_idx][attr_def_idx].decode())
                    nested[attr_def_name] = val
                attr_def_idx += 1

            if self.action_parent_id == current_parent_id:
                self.action_buffer["nested_field"].append(nested)
            else:
                # Flush the buffered parent doc into its placeholder slot.
                actions[idx] = self.action_buffer
                self.action_buffer = {"nested_field": []}
                self.action_buffer["nested_field"].append(nested)
                self.action_parent_id = current_parent_id
                idx += 2

        # Append the final buffered parent doc once every row has been seen.
        if self.count == self.doc_count:
            actions.extend([action(self.action_parent_id), self.action_buffer])

        return actions
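
To make the buffering above concrete, here is a self-contained sketch of the same grouping idea, stripped of the step plumbing (all names are illustrative, not this PR's API):

```python
# Children sharing a parent_id are buffered and flushed as a single parent doc
# whose 'nested_field' array holds all of them, mirroring the logic above.
def group_by_parent(rows):
    """rows: (parent_id, vector) tuples, sorted by parent_id."""
    actions = []
    buffer, current = [], None
    for parent_id, vector in rows:
        if current is not None and parent_id != current:
            actions.extend([{'index': {'_id': current}}, {'nested_field': buffer}])
            buffer = []
        current = parent_id
        buffer.append({'target_field': vector})
    if buffer:  # flush the final parent doc
        actions.extend([{'index': {'_id': current}}, {'nested_field': buffer}])
    return actions

# Three child vectors across two parents -> two action/body pairs.
print(group_by_parent([(0, [0.1, 0.2]), (0, [0.3, 0.4]), (1, [0.5, 0.6])]))
```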


class BaseQueryStep(OpenSearchStep):
    """See base class."""

@@ -469,7 +581,7 @@ def _action(self):
                break
            query_responses.append(
                query_index(self.opensearch, self.index_name,
-                           self.get_body(query[0]), [self.field_name]))
+                           self.get_body(query[0]), self.get_exclude_fields()))

        results['took'] = [
            float(query_response['took']) for query_response in query_responses
@@ -506,6 +618,8 @@ def _get_measures(self) -> List[str]:
    def get_body(self, vec):
        pass

+   def get_exclude_fields(self):
+       return [self.field_name]

class QueryStep(BaseQueryStep):
    """See base class."""
@@ -611,6 +725,43 @@ def get_body(self, vec):
        else:
            raise ConfigurationError('Not supported filter type {}'.format(self.filter_type))

class QueryNestedFieldStep(BaseQueryStep):
    """See base class."""

    label = 'query_nested_field'

    def __init__(self, step_config: StepConfig):
        super().__init__(step_config)

        neighbors_dataset = parse_string_param('neighbors_dataset',
                                               step_config.config, {}, None)

        self.neighbors = parse_dataset(self.neighbors_format, self.neighbors_path,
                                       Context.CUSTOM, neighbors_dataset)

        self.implicit_config = step_config.implicit_config

    def get_body(self, vec):
        return {
            'size': self.k,
            'query': {
                'nested': {
                    'path': 'nested_field',
                    'query': {
                        'knn': {
                            'nested_field.' + self.field_name: {
                                'vector': vec,
                                'k': self.k
                            }
                        }
                    }
                }
            }
        }

    def get_exclude_fields(self):
        return ['nested_field.' + self.field_name]

class GetStatsStep(OpenSearchStep):
    """See base class."""
