Skip to content

Commit c857e0d

Browse files
committed
Fix race condition on datasource.queryBuilder, fixes #55
1 parent 55f88c7 commit c857e0d

File tree

1 file changed

+48
-39
lines changed

1 file changed

+48
-39
lines changed

src/datasource.ts

Lines changed: 48 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { cloneDeep, first as _first, map as _map, groupBy } from 'lodash';
22
import { Observable, lastValueFrom, from, isObservable, of } from 'rxjs';
3-
import { catchError, mergeMap, map } from 'rxjs/operators';
3+
import { catchError, mergeMap, map, shareReplay } from 'rxjs/operators';
44

55
import {
66
AbstractQuery,
@@ -27,6 +27,7 @@ import {
2727
QueryFixAction,
2828
ScopedVars,
2929
SupplementaryQueryType,
30+
TestDataSourceResponse,
3031
TimeRange,
3132
} from '@grafana/data';
3233
import { BucketAggregation, DataLinkConfig, ElasticsearchQuery, Field as QuickwitField, FieldMapping, IndexMetadata, TermsQuery, FieldCapabilitiesResponse } from './types';
@@ -61,6 +62,15 @@ type FieldCapsSpec = {
6162
_range?: TimeRange
6263
}
6364

65+
function getTimeFieldInfoFromIndexMetadata(indexMetadata: any){
66+
let fields = getAllFields(indexMetadata.index_config.doc_mapping.field_mappings);
67+
let timestampFieldName = indexMetadata.index_config.doc_mapping.timestamp_field
68+
let timestampField = fields.find((field) => field.json_path === timestampFieldName);
69+
let timestampFormat = timestampField ? timestampField.field_mapping.output_format || '' : ''
70+
let timestampFieldInfos = { 'field': timestampFieldName, 'format': timestampFormat }
71+
return timestampFieldInfos
72+
}
73+
6474
export class QuickwitDataSource
6575
extends DataSourceWithBackend<ElasticsearchQuery, QuickwitOptions>
6676
implements
@@ -73,12 +83,17 @@ export class QuickwitDataSource
7383
timeOutputFormat: string;
7484
logMessageField?: string;
7585
logLevelField?: string;
76-
queryBuilder: ElasticQueryBuilder;
7786
dataLinks: DataLinkConfig[];
7887
languageProvider: ElasticsearchLanguageProvider;
7988

8089
private logContextProvider: LogContextProvider;
8190

91+
92+
// Observables from index metadata
93+
indexMetadata$: any;
94+
timeFieldInfo$: Observable<{field: string, format: string}>
95+
queryBuilder$: Observable<ElasticQueryBuilder>;
96+
8297
constructor(
8398
instanceSettings: DataSourceInstanceSettings<QuickwitOptions>,
8499
private readonly templateSrv: TemplateSrv = getTemplateSrv()
@@ -88,37 +103,32 @@ export class QuickwitDataSource
88103
this.index = settingsData.index || '';
89104
this.timeField = ''
90105
this.timeOutputFormat = ''
91-
this.queryBuilder = new ElasticQueryBuilder({
92-
timeField: this.timeField,
93-
});
94-
from(this.getResource('indexes/' + this.index)).pipe(
95-
map((indexMetadata) => {
96-
let fields = getAllFields(indexMetadata.index_config.doc_mapping.field_mappings);
97-
let timestampFieldName = indexMetadata.index_config.doc_mapping.timestamp_field
98-
let timestampField = fields.find((field) => field.json_path === timestampFieldName);
99-
let timestampFormat = timestampField ? timestampField.field_mapping.output_format || '' : ''
100-
let timestampFieldInfos = { 'field': timestampFieldName, 'format': timestampFormat }
101-
return timestampFieldInfos
102-
}),
106+
107+
this.indexMetadata$ = from(this.getResource('indexes/' + this.index))
108+
this.timeFieldInfo$ = this.indexMetadata$.pipe(
109+
map(getTimeFieldInfoFromIndexMetadata),
103110
catchError((err) => {
104111
if (!err.data || !err.data.error) {
105112
let err_source = extractJsonPayload(err.data.error)
106113
if(!err_source) {
107114
throw err
108115
}
109116
}
110-
111117
// the error will be handle in the testDatasource function
112118
return of({'field': '', 'format': ''})
113-
})
114-
).subscribe(result => {
115-
this.timeField = result.field;
116-
this.timeOutputFormat = result.format;
117-
this.queryBuilder = new ElasticQueryBuilder({
118-
timeField: this.timeField,
119-
});
120-
});
121-
119+
}),
120+
shareReplay(1),
121+
)
122+
this.timeFieldInfo$.subscribe((timeFieldInfo)=>{
123+
this.timeField = timeFieldInfo.field;
124+
this.timeOutputFormat = timeFieldInfo.format
125+
})
126+
this.queryBuilder$ = this.timeFieldInfo$.pipe(
127+
map((timeFieldInfo)=> {
128+
return new ElasticQueryBuilder({timeField:timeFieldInfo.field})
129+
}),
130+
shareReplay(1),
131+
)
122132
this.logMessageField = settingsData.logMessageField || '';
123133
this.logLevelField = settingsData.logLevelField || '';
124134
this.dataLinks = settingsData.dataLinks || [];
@@ -147,10 +157,8 @@ export class QuickwitDataSource
147157
message: 'Cannot save datasource, `index` is required',
148158
};
149159
}
150-
151-
return lastValueFrom(
152-
from(this.getResource('indexes/' + this.index)).pipe(
153-
mergeMap((indexMetadata) => {
160+
const validation$: Observable<TestDataSourceResponse> = this.indexMetadata$.pipe(
161+
mergeMap((indexMetadata: IndexMetadata): Observable<TestDataSourceResponse> => {
154162
let error = this.validateIndexConfig(indexMetadata);
155163
if (error) {
156164
return of({
@@ -160,7 +168,7 @@ export class QuickwitDataSource
160168
}
161169
return of({ status: 'success', message: `Index OK. Time field name OK` });
162170
}),
163-
catchError((err) => {
171+
catchError((err): Observable<TestDataSourceResponse> => {
164172
if (err.data && err.data.error) {
165173
let err_source = extractJsonPayload(err.data.error)
166174
if (err_source) {
@@ -177,7 +185,8 @@ export class QuickwitDataSource
177185
}
178186
})
179187
)
180-
);
188+
189+
return lastValueFrom(validation$);
181190
}
182191

183192
validateIndexConfig(indexMetadata: IndexMetadata): string | undefined {
@@ -347,18 +356,18 @@ export class QuickwitDataSource
347356
index: this.index,
348357
});
349358

350-
let esQuery = JSON.stringify(this.queryBuilder.getTermsQuery(queryDef));
351-
esQuery = esQuery.replace(/\$timeFrom/g, range.from.valueOf().toString());
352-
esQuery = esQuery.replace(/\$timeTo/g, range.to.valueOf().toString());
353-
esQuery = header + '\n' + esQuery + '\n';
354359
const resourceOptions = {
355-
headers: {
356-
'content-type': 'application/x-ndjson'
357-
}
360+
headers: { 'content-type': 'application/x-ndjson' }
358361
};
359-
const termsObservable = from(this.postResource("_elastic/_msearch", esQuery, resourceOptions));
360362

361-
return termsObservable.pipe(
363+
return this.queryBuilder$.pipe(
364+
mergeMap((queryBuilder)=>{
365+
let esQuery = JSON.stringify(queryBuilder.getTermsQuery(queryDef));
366+
esQuery = esQuery.replace(/\$timeFrom/g, range.from.valueOf().toString());
367+
esQuery = esQuery.replace(/\$timeTo/g, range.to.valueOf().toString());
368+
esQuery = header + '\n' + esQuery + '\n';
369+
return from(this.postResource("_elastic/_msearch", esQuery, resourceOptions))
370+
}),
362371
map((res) => {
363372
if (!res.responses[0].aggregations) {
364373
return [];

0 commit comments

Comments
 (0)