From 95ff50e35ba8e84187308cfc2e7b72135c35eb88 Mon Sep 17 00:00:00 2001 From: Damien de Lemeny Date: Tue, 13 Feb 2024 11:07:36 -0500 Subject: [PATCH] Don't send index metadata, revamp Elastic querybuilder --- src/QueryBuilder/elastic.ts | 121 +++++------- .../BucketAggregationsEditor/state/reducer.ts | 2 +- .../ElasticsearchQueryContext.test.tsx | 5 +- .../QueryEditor/ElasticsearchQueryContext.tsx | 5 +- src/configuration/mocks.ts | 2 - src/dataquery.gen.ts | 4 - src/datasource.ts | 186 ++++-------------- src/mocks.ts | 2 - src/quickwit.ts | 2 - 9 files changed, 92 insertions(+), 237 deletions(-) diff --git a/src/QueryBuilder/elastic.ts b/src/QueryBuilder/elastic.ts index f66c64b..96f9340 100644 --- a/src/QueryBuilder/elastic.ts +++ b/src/QueryBuilder/elastic.ts @@ -1,84 +1,67 @@ import { + BucketAggregation, + ElasticsearchQuery, + MetricAggregation, TermsQuery, } from '../types'; -export class ElasticQueryBuilder { - timeField: string; - constructor(options: { timeField: string }) { - this.timeField = options.timeField; - } - - getRangeFilter() { - const filter: any = {}; - filter[this.timeField] = { - gte: '$timeFrom', - lte: '$timeTo', - // FIXME when Quickwit supports format. - // format: 'epoch_millis', - }; - - return filter; - } - - getTermsQuery(queryDef: TermsQuery) { - const query: any = { - size: 0, - query: { - bool: { - filter: [{ range: this.getRangeFilter() }], - }, - }, - }; - - if (queryDef.query) { - query.query.bool.filter.push({ - query_string: { - // FIXME when Quickwit supports analyze_wildcard. - // analyze_wildcard: true, - query: queryDef.query, - }, - }); - } +type OrderByType = '_key' | '_term' | '_count' - let size = 500; - if (queryDef.size) { - size = queryDef.size; +function getTermsAgg( + fieldName: string, + size: number, + orderBy: OrderByType = "_key", + order: 'asc'|'desc' = 'asc' + ): BucketAggregation { + return { + type: 'terms', + id: "", + field: fieldName, + settings:{ + size: size.toString(), + order: order, + orderBy: orderBy, } + } +} - query.aggs = { - '1': { - terms: { - field: queryDef.field, - size: size, - order: {}, - }, - }, - }; +export function getDataQuery(queryDef: TermsQuery, refId: string): ElasticsearchQuery { + const metrics: MetricAggregation[] = [ + {id:"count1", type:'count'} + ]; - // Default behaviour is to order results by { _key: asc } - // queryDef.order allows selection of asc/desc - // queryDef.orderBy allows selection of doc_count ordering (defaults desc) + // Default behaviour is to order results by { _key: asc } + // queryDef.order allows selection of asc/desc + // queryDef.orderBy allows selection of doc_count ordering (defaults desc) - const { orderBy = 'key', order = orderBy === 'doc_count' ? 'desc' : 'asc' } = queryDef; + let orderBy: OrderByType; + switch (queryDef.orderBy || 'key') { + case 'key': + case 'term': + orderBy = '_key' + break; + case 'doc_count': + orderBy = '_count' + break; + default: + throw { message: `Invalid query sort type ${queryDef.orderBy}` }; + } - if (['asc', 'desc'].indexOf(order) < 0) { - throw { message: `Invalid query sort order ${order}` }; - } + const {order = orderBy === '_count' ? 'desc' : 'asc' } = queryDef; + if (['asc', 'desc'].indexOf(order) < 0) { + throw { message: `Invalid query sort order ${order}` }; + } - switch (orderBy) { - case 'key': - case 'term': - const keyname = '_key'; - query.aggs['1'].terms.order[keyname] = order; - break; - case 'doc_count': - query.aggs['1'].terms.order['_count'] = order; - break; - default: - throw { message: `Invalid query sort type ${orderBy}` }; - } + const bucketAggs: BucketAggregation[] = []; + if (queryDef.field) { + bucketAggs.push(getTermsAgg(queryDef.field, 500, orderBy, order)) + } - return query; + return { + refId, + metrics, + bucketAggs, + query: queryDef.query, } } diff --git a/src/components/QueryEditor/BucketAggregationsEditor/state/reducer.ts b/src/components/QueryEditor/BucketAggregationsEditor/state/reducer.ts index b589386..77dfbd4 100644 --- a/src/components/QueryEditor/BucketAggregationsEditor/state/reducer.ts +++ b/src/components/QueryEditor/BucketAggregationsEditor/state/reducer.ts @@ -17,7 +17,7 @@ import { } from './actions'; export const createReducer = - (defaultTimeField: string) => + (defaultTimeField = "default timefield") => (state: ElasticsearchQuery['bucketAggs'], action: Action): ElasticsearchQuery['bucketAggs'] => { if (addBucketAggregation.match(action)) { const newAgg: Terms = { diff --git a/src/components/QueryEditor/ElasticsearchQueryContext.test.tsx b/src/components/QueryEditor/ElasticsearchQueryContext.test.tsx index 449e066..bbb5aeb 100644 --- a/src/components/QueryEditor/ElasticsearchQueryContext.test.tsx +++ b/src/components/QueryEditor/ElasticsearchQueryContext.test.tsx @@ -18,7 +18,7 @@ const query: ElasticsearchQuery = { describe('ElasticsearchQueryContext', () => { it('Should call onChange and onRunQuery with the default query when the query is empty', () => { - const datasource = { timeField: 'TIMEFIELD' } as ElasticDatasource; + const datasource = {} as ElasticDatasource; const onChange = jest.fn(); const onRunQuery = jest.fn(); @@ -39,9 +39,6 @@ describe('ElasticsearchQueryContext', () => { expect(changedQuery.metrics).toBeDefined(); expect(changedQuery.bucketAggs).toBeDefined(); - // Should also set timeField to the configured `timeField` option in datasource configuration - expect(changedQuery.timeField).toBe(datasource.timeField); - expect(onRunQuery).toHaveBeenCalled(); }); diff --git a/src/components/QueryEditor/ElasticsearchQueryContext.tsx b/src/components/QueryEditor/ElasticsearchQueryContext.tsx index 34e77c5..9cfcbd6 100644 --- a/src/components/QueryEditor/ElasticsearchQueryContext.tsx +++ b/src/components/QueryEditor/ElasticsearchQueryContext.tsx @@ -50,12 +50,11 @@ export const ElasticsearchProvider = ({ query: queryReducer, alias: aliasPatternReducer, metrics: metricsReducer, - bucketAggs: createBucketAggsReducer(datasource.timeField), + bucketAggs: createBucketAggsReducer(), }); const dispatch = useStatelessReducer( - // timeField is part of the query model, but its value is always set to be the one from datasource settings. - (newState) => onStateChange({ ...query, ...newState, timeField: datasource.timeField }), + (newState) => onStateChange({ ...query, ...newState }), query, reducer ); diff --git a/src/configuration/mocks.ts b/src/configuration/mocks.ts index dc8c621..c129106 100644 --- a/src/configuration/mocks.ts +++ b/src/configuration/mocks.ts @@ -4,8 +4,6 @@ import { QuickwitOptions } from 'quickwit'; export function createDefaultConfigOptions(): DataSourceSettings { return createDatasourceSettings({ - timeField: 'timestamp', - timeOutputFormat: 'unix_timestamp_millisecs', logMessageField: 'test.message', logLevelField: 'test.level', index: 'test', diff --git a/src/dataquery.gen.ts b/src/dataquery.gen.ts index efe68c0..e394915 100644 --- a/src/dataquery.gen.ts +++ b/src/dataquery.gen.ts @@ -395,10 +395,6 @@ export interface Elasticsearch extends DataQuery { * Lucene query */ query?: string; - /** - * Name of time field - */ - timeField?: string; } export const defaultElasticsearch: Partial = { diff --git a/src/datasource.ts b/src/datasource.ts index c6e5ef8..1df4a63 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -1,10 +1,11 @@ import { cloneDeep, first as _first, map as _map, groupBy } from 'lodash'; import { Observable, lastValueFrom, from, isObservable, of } from 'rxjs'; -import { catchError, mergeMap, map, shareReplay } from 'rxjs/operators'; +import { map, mergeMap } from 'rxjs/operators'; import { AbstractQuery, AdHocVariableFilter, + CoreApp, DataFrame, DataLink, DataQueryRequest, @@ -27,17 +28,16 @@ import { QueryFixAction, ScopedVars, SupplementaryQueryType, - TestDataSourceResponse, TimeRange, } from '@grafana/data'; -import { BucketAggregation, DataLinkConfig, ElasticsearchQuery, Field as QuickwitField, FieldMapping, IndexMetadata, TermsQuery, FieldCapabilitiesResponse } from './types'; +import { BucketAggregation, DataLinkConfig, ElasticsearchQuery, TermsQuery, FieldCapabilitiesResponse } from './types'; import { DataSourceWithBackend, getTemplateSrv, TemplateSrv, getDataSourceSrv } from '@grafana/runtime'; import { QuickwitOptions } from 'quickwit'; -import { ElasticQueryBuilder } from 'QueryBuilder/elastic'; +import { getDataQuery } from 'QueryBuilder/elastic'; import { colors } from '@grafana/ui'; import { BarAlignment, DataQuery, GraphDrawStyle, StackingMode } from '@grafana/schema'; @@ -47,7 +47,7 @@ import { bucketAggregationConfig } from 'components/QueryEditor/BucketAggregatio import { isBucketAggregationWithField } from 'components/QueryEditor/BucketAggregationsEditor/aggregations'; import ElasticsearchLanguageProvider from 'LanguageProvider'; import { ReactNode } from 'react'; -import { extractJsonPayload, fieldTypeMap } from 'utils'; +import { fieldTypeMap } from 'utils'; import { addAddHocFilter } from 'modifyQuery'; import { LogContextProvider, LogRowContextOptions } from './LogContext/LogContextProvider'; @@ -62,15 +62,6 @@ type FieldCapsSpec = { _range?: TimeRange } -function getTimeFieldInfoFromIndexMetadata(indexMetadata: any){ - let fields = getAllFields(indexMetadata.index_config.doc_mapping.field_mappings); - let timestampFieldName = indexMetadata.index_config.doc_mapping.timestamp_field - let timestampField = fields.find((field) => field.json_path === timestampFieldName); - let timestampFormat = timestampField ? timestampField.field_mapping.output_format || '' : '' - let timestampFieldInfos = { 'field': timestampFieldName, 'format': timestampFormat } - return timestampFieldInfos -} - export class QuickwitDataSource extends DataSourceWithBackend implements @@ -79,8 +70,6 @@ export class QuickwitDataSource DataSourceWithQueryImportSupport { index: string; - timeField: string; - timeOutputFormat: string; logMessageField?: string; logLevelField?: string; dataLinks: DataLinkConfig[]; @@ -88,12 +77,6 @@ export class QuickwitDataSource private logContextProvider: LogContextProvider; - - // Observables from index metadata - indexMetadata$: any; - timeFieldInfo$: Observable<{field: string, format: string}> - queryBuilder$: Observable; - constructor( instanceSettings: DataSourceInstanceSettings, private readonly templateSrv: TemplateSrv = getTemplateSrv() @@ -101,34 +84,6 @@ export class QuickwitDataSource super(instanceSettings); const settingsData = instanceSettings.jsonData || ({} as QuickwitOptions); this.index = settingsData.index || ''; - this.timeField = '' - this.timeOutputFormat = '' - - this.indexMetadata$ = from(this.getResource('indexes/' + this.index)) - this.timeFieldInfo$ = this.indexMetadata$.pipe( - map(getTimeFieldInfoFromIndexMetadata), - catchError((err) => { - if (!err.data || !err.data.error) { - let err_source = extractJsonPayload(err.data.error) - if(!err_source) { - throw err - } - } - // the error will be handle in the testDatasource function - return of({'field': '', 'format': ''}) - }), - shareReplay(1), - ) - this.timeFieldInfo$.subscribe((timeFieldInfo)=>{ - this.timeField = timeFieldInfo.field; - this.timeOutputFormat = timeFieldInfo.format - }) - this.queryBuilder$ = this.timeFieldInfo$.pipe( - map((timeFieldInfo)=> { - return new ElasticQueryBuilder({timeField:timeFieldInfo.field}) - }), - shareReplay(1), - ) this.logMessageField = settingsData.logMessageField || ''; this.logLevelField = settingsData.logLevelField || ''; this.dataLinks = settingsData.dataLinks || []; @@ -157,58 +112,11 @@ export class QuickwitDataSource message: 'Cannot save datasource, `index` is required', }; } - const validation$: Observable = this.indexMetadata$.pipe( - mergeMap((indexMetadata: IndexMetadata): Observable => { - let error = this.validateIndexConfig(indexMetadata); - if (error) { - return of({ - status: 'error', - message: error, - }); - } - return of({ status: 'success', message: `Index OK. Time field name OK` }); - }), - catchError((err): Observable => { - if (err.data && err.data.error) { - let err_source = extractJsonPayload(err.data.error) - if (err_source) { - err = err_source - } - } - - if (err.status && err.status === 404) { - return of({ status: 'error', message: 'Index does not exists.' }); - } else if (err.message) { - return of({ status: 'error', message: err.message }); - } else { - return of({ status: 'error', message: err.status }); - } - }) - ) - - return lastValueFrom(validation$); - } - - validateIndexConfig(indexMetadata: IndexMetadata): string | undefined { - // Check timestamp field. - if (this.timeField === '') { - return `Time field must not be empty`; - } - - let fields = getAllFields(indexMetadata.index_config.doc_mapping.field_mappings); - let timestampField = fields.find((field) => field.json_path === this.timeField); - - // Should never happen. - if (timestampField === undefined) { - return `No field named '${this.timeField}' found in the doc mapping. This should never happen.`; - } + const backendCheck = from(this.callHealthCheck()).pipe( + mergeMap((res) => of(res)) + ) - let timeOutputFormat = timestampField.field_mapping.output_format || 'unknown'; - const supportedTimestampOutputFormats = ['unix_timestamp_secs', 'unix_timestamp_millis', 'unix_timestamp_micros', 'unix_timestamp_nanos', 'iso8601', 'rfc3339']; - if (!supportedTimestampOutputFormats.includes(timeOutputFormat)) { - return `Timestamp output format '${timeOutputFormat} is not yet supported.`; - } - return; + return lastValueFrom(backendCheck) } async importFromAbstractQueries(abstractQueries: AbstractQuery[]): Promise { @@ -262,7 +170,6 @@ export class QuickwitDataSource return undefined; } const bucketAggs: BucketAggregation[] = []; - const timeField = this.timeField ?? 'timestamp'; if (this.logLevelField) { bucketAggs.push({ @@ -285,14 +192,12 @@ export class QuickwitDataSource min_doc_count: '0', trimEdges: '0', }, - field: timeField, }); return { refId: `${REF_ID_STARTER_LOG_VOLUME}${query.refId}`, query: query.query, metrics: [{ type: 'count', id: '1' }], - timeField, bucketAggs, }; @@ -350,42 +255,39 @@ export class QuickwitDataSource return { ...query, query: expression }; } - getTerms(queryDef: TermsQuery, range = getDefaultTimeRange()): Observable { - const header = JSON.stringify({ - ignore_unavailable: true, - index: this.index, - }); - - const resourceOptions = { - headers: { 'content-type': 'application/x-ndjson' } - }; - - return this.queryBuilder$.pipe( - mergeMap((queryBuilder)=>{ - let esQuery = JSON.stringify(queryBuilder.getTermsQuery(queryDef)); - esQuery = esQuery.replace(/\$timeFrom/g, range.from.valueOf().toString()); - esQuery = esQuery.replace(/\$timeTo/g, range.to.valueOf().toString()); - esQuery = header + '\n' + esQuery + '\n'; - return from(this.postResource("_elastic/_msearch", esQuery, resourceOptions)) - }), - map((res) => { - if (!res.responses[0].aggregations) { - return []; - } + getDataQueryRequest(queryDef: TermsQuery, range: TimeRange) { + let dataQuery = getDataQuery(queryDef, 'getTerms'); + const request: DataQueryRequest = { + app: CoreApp.Unknown, + requestId: 'GetTerms', + interval: '', + intervalMs: 0, + range, + targets:[dataQuery], + timezone:'browser', + scopedVars:{}, + startTime: Date.now(), + } + return request + } - const buckets = res.responses[0].aggregations['1'].buckets; - return _map(buckets, (bucket) => { - return { - text: bucket.key_as_string || bucket.key, - value: bucket.key, - }; - }); + getTerms(queryDef: TermsQuery, range = getDefaultTimeRange()): Observable { + const dataquery = this.getDataQueryRequest(queryDef, range) + return super.query(dataquery).pipe( + mergeMap(res=> res.data.map((df: DataFrame)=>{ + + return df.fields[0]!.values.map((bucket)=>({ + text: bucket, + value: bucket, + })) }) - ); + ) + ) } getFields(spec: FieldCapsSpec={}): Observable { // TODO: use the time range. + return from(this.getResource('_elastic/' + this.index + '/_field_caps')).pipe( map((field_capabilities_response: FieldCapabilitiesResponse) => { const shouldAddField = (field: any) => { @@ -594,22 +496,6 @@ export class QuickwitDataSource } } -// Returns a flatten array of fields and nested fields found in the given `FieldMapping` array. -function getAllFields(field_mappings: FieldMapping[]): QuickwitField[] { - const fields: QuickwitField[] = []; - for (const field_mapping of field_mappings) { - if (field_mapping.type === 'object' && field_mapping.field_mappings !== undefined) { - for (const child_field_mapping of getAllFields(field_mapping.field_mappings)) { - fields.push({json_path: field_mapping.name + '.' + child_field_mapping.json_path, path_segments: [field_mapping.name].concat(child_field_mapping.path_segments), field_mapping: child_field_mapping.field_mapping}) - } - } else { - fields.push({json_path: field_mapping.name, path_segments: [field_mapping.name], field_mapping: field_mapping}); - } - } - - return fields; -} - /** * Creates an observable, which makes requests to get logs volume and aggregates results. */ diff --git a/src/mocks.ts b/src/mocks.ts index 097d01f..158c9e7 100644 --- a/src/mocks.ts +++ b/src/mocks.ts @@ -38,8 +38,6 @@ export function createElasticDatasource( access: 'proxy', url: '', jsonData: { - timeField: '', - timeOutputFormat: '', index: '', ...jsonData, }, diff --git a/src/quickwit.ts b/src/quickwit.ts index 94e3725..dffad7a 100644 --- a/src/quickwit.ts +++ b/src/quickwit.ts @@ -2,8 +2,6 @@ import { DataSourceJsonData } from "@grafana/data"; import { DataLinkConfig } from "./types"; export interface QuickwitOptions extends DataSourceJsonData { - timeField: string; - timeOutputFormat: string; interval?: Interval; logMessageField?: string; logLevelField?: string;