diff --git a/pkg/quickwit/quickwit.go b/pkg/quickwit/quickwit.go index b43e8e6..990044c 100644 --- a/pkg/quickwit/quickwit.go +++ b/pkg/quickwit/quickwit.go @@ -91,7 +91,7 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc } if !toOk { - timeField, err = GetTimestampFieldInfos(index, settings.URL, httpCli) + timeField, err = GetTimestampField(index, settings.URL, httpCli) if nil != err { return nil, err } diff --git a/pkg/quickwit/timestamp_infos.go b/pkg/quickwit/timestamp_infos.go index ab1b018..ccbf978 100644 --- a/pkg/quickwit/timestamp_infos.go +++ b/pkg/quickwit/timestamp_infos.go @@ -6,13 +6,13 @@ import ( "fmt" "io" "net/http" + "strings" ) -type QuickwitMapping struct { +type QuickwitIndexMetadata struct { IndexConfig struct { DocMapping struct { - TimestampField string `json:"timestamp_field"` - FieldMappings []FieldMappings `json:"field_mappings"` + TimestampField string `json:"timestamp_field"` } `json:"doc_mapping"` } `json:"index_config"` } @@ -34,25 +34,48 @@ func NewErrorCreationPayload(statusCode int, message string) error { return errors.New(string(json)) } -func DecodeTimestampFieldInfos(statusCode int, body []byte) (string, error) { - var payload QuickwitMapping - err := json.Unmarshal(body, &payload) +// TODO: refactor either by using a timestamp alias suppprted by quickwit +// or by only using the `GetTimestampFieldFromIndexPattern` once the endpoint +// /indexes?index_id_pattern= is supported, which is after the next quickwit release > 0.7.1 +func GetTimestampField(index string, qwickwitUrl string, cli *http.Client) (string, error) { + if strings.Contains(index, "*") || strings.Contains(index, ",") { + return GetTimestampFieldFromIndexPattern(index, qwickwitUrl, cli) + } + return GetTimestampFieldFromIndex(index, qwickwitUrl, cli) +} +func GetTimestampFieldFromIndex(index string, qwickwitUrl string, cli *http.Client) (string, error) { + mappingEndpointUrl := qwickwitUrl + "/indexes/" + index + qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl) + r, err := cli.Get(mappingEndpointUrl) if err != nil { - errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body)) + errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) + qwlog.Error(errMsg) + return "", err + } + + statusCode := r.StatusCode + + if statusCode < 200 || statusCode >= 400 { + errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl) qwlog.Error(errMsg) return "", NewErrorCreationPayload(statusCode, errMsg) } - timestampFieldName := payload.IndexConfig.DocMapping.TimestampField + defer r.Body.Close() + body, err := io.ReadAll(r.Body) + if err != nil { + errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) + qwlog.Error(errMsg) + return "", NewErrorCreationPayload(statusCode, errMsg) + } - qwlog.Info(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName)) - return timestampFieldName, nil + return DecodeTimestampFieldFromIndexConfig(body) } -func GetTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (string, error) { - mappingEndpointUrl := qwUrl + "/indexes/" + index - qwlog.Info("Calling quickwit endpoint: " + mappingEndpointUrl) +func GetTimestampFieldFromIndexPattern(indexPattern string, qwickwitUrl string, cli *http.Client) (string, error) { + mappingEndpointUrl := qwickwitUrl + "/indexes?index_id_pattern=" + indexPattern + qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl) r, err := cli.Get(mappingEndpointUrl) if err != nil { errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) @@ -76,5 +99,45 @@ func GetTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (strin return "", NewErrorCreationPayload(statusCode, errMsg) } - return DecodeTimestampFieldInfos(statusCode, body) + return DecodeTimestampFieldFromIndexConfigs(body) +} + +func DecodeTimestampFieldFromIndexConfigs(body []byte) (string, error) { + var payload []QuickwitIndexMetadata + err := json.Unmarshal(body, &payload) + if err != nil { + errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body)) + qwlog.Error(errMsg) + return "", NewErrorCreationPayload(500, errMsg) + } + + var timestampFieldName string = "" + for _, indexMetadata := range payload { + if timestampFieldName == "" { + timestampFieldName = indexMetadata.IndexConfig.DocMapping.TimestampField + continue + } + + if timestampFieldName != indexMetadata.IndexConfig.DocMapping.TimestampField { + errMsg := fmt.Sprintf("Index matching the pattern should have the same timestamp fields, two found: %s and %s", timestampFieldName, indexMetadata.IndexConfig.DocMapping.TimestampField) + qwlog.Error(errMsg) + return "", NewErrorCreationPayload(400, errMsg) + } + } + + qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName)) + return timestampFieldName, nil +} + +func DecodeTimestampFieldFromIndexConfig(body []byte) (string, error) { + var payload QuickwitIndexMetadata + err := json.Unmarshal(body, &payload) + if err != nil { + errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body)) + qwlog.Error(errMsg) + return "", NewErrorCreationPayload(500, errMsg) + } + timestampFieldName := payload.IndexConfig.DocMapping.TimestampField + qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName)) + return timestampFieldName, nil } diff --git a/pkg/quickwit/timestamp_infos_test.go b/pkg/quickwit/timestamp_infos_test.go index d24ca31..632d966 100644 --- a/pkg/quickwit/timestamp_infos_test.go +++ b/pkg/quickwit/timestamp_infos_test.go @@ -13,142 +13,89 @@ func TestDecodeTimestampFieldInfos(t *testing.T) { query := []byte(` { "version": "0.6", - "index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W", "index_config": { "version": "0.6", - "index_id": "myindex", - "index_uri": "s3://quickwit-indexes/myindex", "doc_mapping": { - "field_mappings": [ - { - "name": "foo", - "type": "text", - "fast": false, - "fieldnorms": false, - "indexed": true, - "record": "basic", - "stored": true, - "tokenizer": "default" - }, - { - "name": "timestamp", - "type": "datetime", - "fast": true, - "fast_precision": "seconds", - "indexed": true, - "input_formats": [ - "rfc3339", - "unix_timestamp" - ], - "output_format": "rfc3339", - "stored": true - } - ], - "tag_fields": [], - "store_source": true, - "index_field_presence": false, "timestamp_field": "timestamp", "mode": "dynamic", - "dynamic_mapping": {}, - "partition_key": "foo", - "max_num_partitions": 1, "tokenizers": [] }, - "indexing_settings": {}, - "search_settings": { - "default_search_fields": [ - "foo" - ] - }, "retention": null }, - "checkpoint": {}, - "create_timestamp": 1701075471, "sources": [] } `) // When - timestampFieldName, err := DecodeTimestampFieldInfos(200, query) + timestampFieldName, err := DecodeTimestampFieldFromIndexConfig(query) // Then require.NoError(t, err) require.Equal(t, timestampFieldName, "timestamp") }) - t.Run("Test decode nested fields", func(t *testing.T) { + t.Run("Test decode from list of index config", func(t *testing.T) { // Given query := []byte(` + [ { "version": "0.6", - "index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W", "index_config": { - "version": "0.6", - "index_id": "myindex", - "index_uri": "s3://quickwit-indexes/myindex", "doc_mapping": { - "field_mappings": [ - { - "name": "foo", - "type": "text", - "fast": false, - "fieldnorms": false, - "indexed": true, - "record": "basic", - "stored": true, - "tokenizer": "default" - }, - { - "name": "sub", - "type": "object", - "field_mappings": [ - { - "fast": true, - "fast_precision": "seconds", - "indexed": true, - "input_formats": [ - "rfc3339", - "unix_timestamp" - ], - "name": "timestamp", - "output_format": "rfc3339", - "stored": true, - "type": "datetime" - } - ] - } - ], - "tag_fields": [], - "store_source": true, - "index_field_presence": false, - "timestamp_field": "sub.timestamp", - "mode": "dynamic", - "dynamic_mapping": {}, - "partition_key": "foo", - "max_num_partitions": 1, - "tokenizers": [] + "timestamp_field": "sub.timestamp" }, "indexing_settings": {}, - "search_settings": { - "default_search_fields": [ - "foo" - ] - }, "retention": null }, - "checkpoint": {}, - "create_timestamp": 1701075471, "sources": [] } + ] `) // When - timestampFieldName, err := DecodeTimestampFieldInfos(200, query) + timestampFieldName, err := DecodeTimestampFieldFromIndexConfigs(query) // Then require.NoError(t, err) require.Equal(t, timestampFieldName, "sub.timestamp") }) + + t.Run("Test decode from list of index config with different timestamp fields return an error", func(t *testing.T) { + // Given + query := []byte(` + [ + { + "version": "0.6", + "index_config": { + "doc_mapping": { + "timestamp_field": "sub.timestamp" + }, + "indexing_settings": {}, + "retention": null + }, + "sources": [] + }, + { + "version": "0.6", + "index_config": { + "doc_mapping": { + "timestamp_field": "sub.timestamp2" + }, + "indexing_settings": {}, + "retention": null + }, + "sources": [] + } + ] + `) + + // When + _, err := DecodeTimestampFieldFromIndexConfigs(query) + + // Then + require.Error(t, err) + require.ErrorContains(t, err, "Index matching the pattern should have the same timestamp fields") + }) }) } diff --git a/src/datasource.ts b/src/datasource.ts index bdd6b5f..2139821 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -772,7 +772,7 @@ export function enhanceDataFrameWithDataLinks(dataFrame: DataFrame, dataLinks: D config: {}, values: displayedMessages, } - console.log('newField'); + console.log(dataFrame); dataFrame.fields = [newField, ...dataFrame.fields]; }