Skip to content

Commit

Permalink
Merge pull request #75 from quickwit-oss/fmassot/fetch-with-index-pat…
Browse files Browse the repository at this point in the history
…tern

Add support for index pattern
  • Loading branch information
fmassot authored Feb 18, 2024
2 parents 4d491b9 + 09030cc commit 7d0af35
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 112 deletions.
2 changes: 1 addition & 1 deletion pkg/quickwit/quickwit.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
}

if !toOk {
timeField, err = GetTimestampFieldInfos(index, settings.URL, httpCli)
timeField, err = GetTimestampField(index, settings.URL, httpCli)
if nil != err {
return nil, err
}
Expand Down
91 changes: 77 additions & 14 deletions pkg/quickwit/timestamp_infos.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"fmt"
"io"
"net/http"
"strings"
)

type QuickwitMapping struct {
type QuickwitIndexMetadata struct {
IndexConfig struct {
DocMapping struct {
TimestampField string `json:"timestamp_field"`
FieldMappings []FieldMappings `json:"field_mappings"`
TimestampField string `json:"timestamp_field"`
} `json:"doc_mapping"`
} `json:"index_config"`
}
Expand All @@ -34,25 +34,48 @@ func NewErrorCreationPayload(statusCode int, message string) error {
return errors.New(string(json))
}

func DecodeTimestampFieldInfos(statusCode int, body []byte) (string, error) {
var payload QuickwitMapping
err := json.Unmarshal(body, &payload)
// TODO: refactor either by using a timestamp alias suppprted by quickwit
// or by only using the `GetTimestampFieldFromIndexPattern` once the endpoint
// /indexes?index_id_pattern= is supported, which is after the next quickwit release > 0.7.1
func GetTimestampField(index string, qwickwitUrl string, cli *http.Client) (string, error) {
if strings.Contains(index, "*") || strings.Contains(index, ",") {
return GetTimestampFieldFromIndexPattern(index, qwickwitUrl, cli)
}
return GetTimestampFieldFromIndex(index, qwickwitUrl, cli)
}

func GetTimestampFieldFromIndex(index string, qwickwitUrl string, cli *http.Client) (string, error) {
mappingEndpointUrl := qwickwitUrl + "/indexes/" + index
qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl)
r, err := cli.Get(mappingEndpointUrl)
if err != nil {
errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body))
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
qwlog.Error(errMsg)
return "", err
}

statusCode := r.StatusCode

if statusCode < 200 || statusCode >= 400 {
errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl)
qwlog.Error(errMsg)
return "", NewErrorCreationPayload(statusCode, errMsg)
}

timestampFieldName := payload.IndexConfig.DocMapping.TimestampField
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
qwlog.Error(errMsg)
return "", NewErrorCreationPayload(statusCode, errMsg)
}

qwlog.Info(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName))
return timestampFieldName, nil
return DecodeTimestampFieldFromIndexConfig(body)
}

func GetTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (string, error) {
mappingEndpointUrl := qwUrl + "/indexes/" + index
qwlog.Info("Calling quickwit endpoint: " + mappingEndpointUrl)
func GetTimestampFieldFromIndexPattern(indexPattern string, qwickwitUrl string, cli *http.Client) (string, error) {
mappingEndpointUrl := qwickwitUrl + "/indexes?index_id_pattern=" + indexPattern
qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl)
r, err := cli.Get(mappingEndpointUrl)
if err != nil {
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
Expand All @@ -76,5 +99,45 @@ func GetTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (strin
return "", NewErrorCreationPayload(statusCode, errMsg)
}

return DecodeTimestampFieldInfos(statusCode, body)
return DecodeTimestampFieldFromIndexConfigs(body)
}

func DecodeTimestampFieldFromIndexConfigs(body []byte) (string, error) {
var payload []QuickwitIndexMetadata
err := json.Unmarshal(body, &payload)
if err != nil {
errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body))
qwlog.Error(errMsg)
return "", NewErrorCreationPayload(500, errMsg)
}

var timestampFieldName string = ""
for _, indexMetadata := range payload {
if timestampFieldName == "" {
timestampFieldName = indexMetadata.IndexConfig.DocMapping.TimestampField
continue
}

if timestampFieldName != indexMetadata.IndexConfig.DocMapping.TimestampField {
errMsg := fmt.Sprintf("Index matching the pattern should have the same timestamp fields, two found: %s and %s", timestampFieldName, indexMetadata.IndexConfig.DocMapping.TimestampField)
qwlog.Error(errMsg)
return "", NewErrorCreationPayload(400, errMsg)
}
}

qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName))
return timestampFieldName, nil
}

func DecodeTimestampFieldFromIndexConfig(body []byte) (string, error) {
var payload QuickwitIndexMetadata
err := json.Unmarshal(body, &payload)
if err != nil {
errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body))
qwlog.Error(errMsg)
return "", NewErrorCreationPayload(500, errMsg)
}
timestampFieldName := payload.IndexConfig.DocMapping.TimestampField
qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName))
return timestampFieldName, nil
}
139 changes: 43 additions & 96 deletions pkg/quickwit/timestamp_infos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,142 +13,89 @@ func TestDecodeTimestampFieldInfos(t *testing.T) {
query := []byte(`
{
"version": "0.6",
"index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W",
"index_config": {
"version": "0.6",
"index_id": "myindex",
"index_uri": "s3://quickwit-indexes/myindex",
"doc_mapping": {
"field_mappings": [
{
"name": "foo",
"type": "text",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "default"
},
{
"name": "timestamp",
"type": "datetime",
"fast": true,
"fast_precision": "seconds",
"indexed": true,
"input_formats": [
"rfc3339",
"unix_timestamp"
],
"output_format": "rfc3339",
"stored": true
}
],
"tag_fields": [],
"store_source": true,
"index_field_presence": false,
"timestamp_field": "timestamp",
"mode": "dynamic",
"dynamic_mapping": {},
"partition_key": "foo",
"max_num_partitions": 1,
"tokenizers": []
},
"indexing_settings": {},
"search_settings": {
"default_search_fields": [
"foo"
]
},
"retention": null
},
"checkpoint": {},
"create_timestamp": 1701075471,
"sources": []
}
`)

// When
timestampFieldName, err := DecodeTimestampFieldInfos(200, query)
timestampFieldName, err := DecodeTimestampFieldFromIndexConfig(query)

// Then
require.NoError(t, err)
require.Equal(t, timestampFieldName, "timestamp")
})

t.Run("Test decode nested fields", func(t *testing.T) {
t.Run("Test decode from list of index config", func(t *testing.T) {
// Given
query := []byte(`
[
{
"version": "0.6",
"index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W",
"index_config": {
"version": "0.6",
"index_id": "myindex",
"index_uri": "s3://quickwit-indexes/myindex",
"doc_mapping": {
"field_mappings": [
{
"name": "foo",
"type": "text",
"fast": false,
"fieldnorms": false,
"indexed": true,
"record": "basic",
"stored": true,
"tokenizer": "default"
},
{
"name": "sub",
"type": "object",
"field_mappings": [
{
"fast": true,
"fast_precision": "seconds",
"indexed": true,
"input_formats": [
"rfc3339",
"unix_timestamp"
],
"name": "timestamp",
"output_format": "rfc3339",
"stored": true,
"type": "datetime"
}
]
}
],
"tag_fields": [],
"store_source": true,
"index_field_presence": false,
"timestamp_field": "sub.timestamp",
"mode": "dynamic",
"dynamic_mapping": {},
"partition_key": "foo",
"max_num_partitions": 1,
"tokenizers": []
"timestamp_field": "sub.timestamp"
},
"indexing_settings": {},
"search_settings": {
"default_search_fields": [
"foo"
]
},
"retention": null
},
"checkpoint": {},
"create_timestamp": 1701075471,
"sources": []
}
]
`)

// When
timestampFieldName, err := DecodeTimestampFieldInfos(200, query)
timestampFieldName, err := DecodeTimestampFieldFromIndexConfigs(query)

// Then
require.NoError(t, err)
require.Equal(t, timestampFieldName, "sub.timestamp")
})

t.Run("Test decode from list of index config with different timestamp fields return an error", func(t *testing.T) {
// Given
query := []byte(`
[
{
"version": "0.6",
"index_config": {
"doc_mapping": {
"timestamp_field": "sub.timestamp"
},
"indexing_settings": {},
"retention": null
},
"sources": []
},
{
"version": "0.6",
"index_config": {
"doc_mapping": {
"timestamp_field": "sub.timestamp2"
},
"indexing_settings": {},
"retention": null
},
"sources": []
}
]
`)

// When
_, err := DecodeTimestampFieldFromIndexConfigs(query)

// Then
require.Error(t, err)
require.ErrorContains(t, err, "Index matching the pattern should have the same timestamp fields")
})
})
}

Expand Down
2 changes: 1 addition & 1 deletion src/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ export function enhanceDataFrameWithDataLinks(dataFrame: DataFrame, dataLinks: D
config: {},
values: displayedMessages,
}
console.log('newField');
console.log(dataFrame);
dataFrame.fields = [newField, ...dataFrame.fields];
}

Expand Down

0 comments on commit 7d0af35

Please sign in to comment.