SearchMetadatatoElasticasearchTask for elastic databuilder
Note: this still needs #1856 to be included before it works end to end, since highlight_options is already used by the frontend app but has not yet been merged into the search service.

Signed-off-by: wey-gu <[email protected]>
wey-gu committed May 15, 2022
1 parent 37e71c6 commit 6f85a93
Showing 2 changed files with 106 additions and 0 deletions.
37 changes: 37 additions & 0 deletions databuilder/example/scripts/sample_data_loader_nebula.py
@@ -43,6 +43,7 @@
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.nebula_csv_publisher import NebulaCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.task.search.search_metadata_to_elasticsearch_task import SearchMetadatatoElasticasearchTask
from databuilder.transformer.base_transformer import ChainedTransformer, NoopTransformer
from databuilder.transformer.complex_type_transformer import PARSING_FUNCTION, ComplexTypeTransformer
from databuilder.transformer.dict_to_model import MODEL_CLASS, DictToModel
@@ -675,6 +676,37 @@ def create_es_publisher_sample_job(
return job


def run_search_metadata_task(resource_type: str):
    task_config = {
        f'task.search_metadata_to_elasticsearch.{SearchMetadatatoElasticasearchTask.ENTITY_TYPE}':
            resource_type,
        f'task.search_metadata_to_elasticsearch.{SearchMetadatatoElasticasearchTask.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
            es,
        f'task.search_metadata_to_elasticsearch.{SearchMetadatatoElasticasearchTask.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
            f'{resource_type}_search_index',
        'extractor.search_data.entity_type':
            resource_type,
        'extractor.search_data.extractor.nebula.nebula_endpoints':
            nebula_endpoints,
        'extractor.search_data.extractor.nebula.nebula_auth_user':
            nebula_user,
        'extractor.search_data.extractor.nebula.nebula_auth_pw':
            nebula_password,
        'extractor.search_data.extractor.nebula.nebula_space':
            nebula_space,
    }

    job_config = ConfigFactory.from_dict({
        **task_config,
    })

    extractor = NebulaSearchDataExtractor()
    task = SearchMetadatatoElasticasearchTask(extractor=extractor)

    job = DefaultJob(conf=job_config, task=task)

    job.launch()

if __name__ == "__main__":
    # Adjust the logging level here; DEBUG is verbose, logging.INFO is usually enough
    logging.basicConfig(level=logging.DEBUG)
@@ -785,6 +817,7 @@ def create_es_publisher_sample_job(

    create_last_updated_job().launch()

    # with ElasticsearchPublisher, which will be deprecated
    job_es_table = create_es_publisher_sample_job(
        elasticsearch_index_alias='table_search_index',
        elasticsearch_doc_type_key='table',
@@ -810,3 +843,7 @@ def create_es_publisher_sample_job(
        entity_type='dashboard',
        elasticsearch_mapping=DASHBOARD_ELASTICSEARCH_INDEX_MAPPING)
    job_es_dashboard.launch()

    # with SearchMetadatatoElasticasearchTask
    for resource_type in ['table', 'dashboard', 'user', 'feature']:
        run_search_metadata_task(resource_type)
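
The dotted config keys in run_search_metadata_task are how databuilder scopes one flat dict to several components: pyhocon turns each dotted key into a nested path, so the task reads its own task.search_metadata_to_elasticsearch.* subtree while the extractor reads extractor.search_data.*. A minimal sketch of that behaviour, using placeholder key names rather than the task's real config constants:

from pyhocon import ConfigFactory

# Dotted keys become nested scopes when the dict is loaded.
conf = ConfigFactory.from_dict({
    'task.search_metadata_to_elasticsearch.entity_type': 'table',
    'extractor.search_data.entity_type': 'table',
})

# Each component can then read just its own subtree.
task_conf = conf.get_config('task.search_metadata_to_elasticsearch')
print(task_conf.get_string('entity_type'))  # -> 'table'
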
69 changes: 69 additions & 0 deletions databuilder/example/scripts/sample_search_metadata_task_nebula.py
@@ -0,0 +1,69 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import os
import sys

from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory

from databuilder.extractor.nebula_search_data_extractor import NebulaSearchDataExtractor
from databuilder.job.job import DefaultJob
from databuilder.task.search.search_metadata_to_elasticsearch_task import SearchMetadatatoElasticasearchTask

es_host = os.getenv('CREDENTIALS_ELASTICSEARCH_PROXY_HOST', 'localhost')
NEBULA_ENDPOINTS = os.getenv('CREDENTIALS_NEBULA_ENDPOINTS', 'localhost:9669')

nebula_space = os.getenv('NEBULA_SPACE', 'amundsen')
es_port = os.getenv('CREDENTIALS_ELASTICSEARCH_PROXY_PORT', 9200)

nebula_endpoints = NEBULA_ENDPOINTS
nebula_user = 'root'
nebula_password = 'nebula'

# Optional positional overrides: <es_host> <nebula_endpoints>
if len(sys.argv) > 1:
    es_host = sys.argv[1]
if len(sys.argv) > 2:
    nebula_endpoints = sys.argv[2]

es = Elasticsearch([
    {'host': es_host, 'port': es_port},
])


def run_search_metadata_task(resource_type: str):
    task_config = {
        f'task.search_metadata_to_elasticsearch.{SearchMetadatatoElasticasearchTask.ENTITY_TYPE}':
            resource_type,
        f'task.search_metadata_to_elasticsearch.{SearchMetadatatoElasticasearchTask.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
            es,
        f'task.search_metadata_to_elasticsearch.{SearchMetadatatoElasticasearchTask.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
            f'{resource_type}_search_index',
        'extractor.search_data.entity_type':
            resource_type,
        'extractor.search_data.extractor.nebula.nebula_endpoints':
            nebula_endpoints,
        'extractor.search_data.extractor.nebula.nebula_auth_user':
            nebula_user,
        'extractor.search_data.extractor.nebula.nebula_auth_pw':
            nebula_password,
        'extractor.search_data.extractor.nebula.nebula_space':
            nebula_space,
    }

    job_config = ConfigFactory.from_dict({
        **task_config,
    })

    extractor = NebulaSearchDataExtractor()
    task = SearchMetadatatoElasticasearchTask(extractor=extractor)

    job = DefaultJob(conf=job_config, task=task)

    job.launch()


if __name__ == "__main__":
    for resource_type in ['table', 'dashboard', 'user', 'feature']:
        run_search_metadata_task(resource_type)
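
After the script finishes, a quick sanity check is to ask Elasticsearch which indices sit behind each alias. This is only an illustrative sketch, assuming the task published to aliases named {resource_type}_search_index as configured above and reusing the same es client:

# Hypothetical post-run check: list the indices behind each expected alias.
for resource_type in ['table', 'dashboard', 'user', 'feature']:
    alias = f'{resource_type}_search_index'
    if es.indices.exists_alias(name=alias):
        print(f'{alias} -> {list(es.indices.get_alias(name=alias).keys())}')
    else:
        print(f'{alias}: alias not found')
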
