Skip to content

Commit c6e6d75

Browse files
authored
Merge pull request #125 from quickwit-oss/ddelemeny/refactor-timefield-init
Improve timestamp infos and DS init
2 parents 766aac5 + d89c3fa commit c6e6d75

File tree

4 files changed

+223
-202
lines changed

4 files changed

+223
-202
lines changed

Diff for: pkg/quickwit/client/client.go

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type DatasourceInfo struct {
2323
Database string
2424
ConfiguredFields ConfiguredFields
2525
MaxConcurrentShardRequests int64
26+
IsReady bool
2627
}
2728

2829
type ConfiguredFields struct {

Diff for: pkg/quickwit/quickwit.go

+50-12
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,6 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
5757
return nil, err
5858
}
5959

60-
timeField, toOk := jsonData["timeField"].(string)
61-
timeOutputFormat, tofOk := jsonData["timeOutputFormat"].(string)
62-
6360
logLevelField, ok := jsonData["logLevelField"].(string)
6461
if !ok {
6562
logLevelField = ""
@@ -74,6 +71,7 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
7471
if !ok {
7572
index = ""
7673
}
74+
// XXX : Legacy check, should not happen ?
7775
if index == "" {
7876
index = settings.Database
7977
}
@@ -92,18 +90,11 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
9290
maxConcurrentShardRequests = 256
9391
}
9492

95-
if !toOk || !tofOk {
96-
timeField, timeOutputFormat, err = GetTimestampFieldInfos(index, settings.URL, httpCli)
97-
if nil != err {
98-
return nil, err
99-
}
100-
}
101-
10293
configuredFields := es.ConfiguredFields{
103-
TimeField: timeField,
104-
TimeOutputFormat: timeOutputFormat,
10594
LogLevelField: logLevelField,
10695
LogMessageField: logMessageField,
96+
TimeField: "",
97+
TimeOutputFormat: "",
10798
}
10899

109100
model := es.DatasourceInfo{
@@ -113,10 +104,40 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
113104
Database: index,
114105
MaxConcurrentShardRequests: int64(maxConcurrentShardRequests),
115106
ConfiguredFields: configuredFields,
107+
IsReady: false,
116108
}
117109
return &QuickwitDatasource{dsInfo: model}, nil
118110
}
119111

112+
// Network dependent datasource initialization.
113+
// This is not done in the "constructor" function to allow saving the ds
114+
// even if the server is not responsive.
115+
func (ds *QuickwitDatasource) initDatasource(force bool) error {
116+
if ds.dsInfo.IsReady && !force {
117+
return nil
118+
}
119+
120+
indexMetadataList, err := GetIndexesMetadata(ds.dsInfo.Database, ds.dsInfo.URL, ds.dsInfo.HTTPClient)
121+
if err != nil {
122+
return fmt.Errorf("failed to get index metadata : %w", err)
123+
}
124+
125+
if len(indexMetadataList) == 0 {
126+
return fmt.Errorf("no index found for %s", ds.dsInfo.Database)
127+
}
128+
129+
timeField, timeOutputFormat, err := GetTimestampFieldInfos(indexMetadataList)
130+
if nil != err {
131+
return err
132+
}
133+
134+
ds.dsInfo.ConfiguredFields.TimeField = timeField
135+
ds.dsInfo.ConfiguredFields.TimeOutputFormat = timeOutputFormat
136+
137+
ds.dsInfo.IsReady = true
138+
return nil
139+
}
140+
120141
// Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance
121142
// created. As soon as datasource settings change detected by SDK old datasource instance will
122143
// be disposed and a new one will be created using NewSampleDatasource factory function.
@@ -132,12 +153,29 @@ func (ds *QuickwitDatasource) Dispose() {
132153
func (ds *QuickwitDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
133154
res := &backend.CheckHealthResult{}
134155

156+
if err := ds.initDatasource(true); err != nil {
157+
res.Status = backend.HealthStatusError
158+
res.Message = fmt.Errorf("Failed to initialize datasource: %w", err).Error()
159+
return res, nil
160+
}
161+
162+
if ds.dsInfo.ConfiguredFields.TimeField == "" || ds.dsInfo.ConfiguredFields.TimeOutputFormat == "" {
163+
res.Status = backend.HealthStatusError
164+
res.Message = fmt.Sprintf("timefield is missing from index config \"%s\"", ds.dsInfo.Database)
165+
return res, nil
166+
}
167+
135168
res.Status = backend.HealthStatusOk
136169
res.Message = "plugin is running"
137170
return res, nil
138171
}
139172

140173
func (ds *QuickwitDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
174+
// Ensure ds is initialized, we need timestamp infos
175+
if err := ds.initDatasource(false); err != nil {
176+
return &backend.QueryDataResponse{}, fmt.Errorf("Failed to initialize datasource")
177+
}
178+
141179
return queryData(ctx, req.Queries, &ds.dsInfo)
142180
}
143181

Diff for: pkg/quickwit/timestamp_infos.go

+38-88
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ import (
66
"fmt"
77
"io"
88
"net/http"
9-
"strings"
109
)
1110

1211
type QuickwitIndexMetadata struct {
1312
IndexConfig struct {
13+
IndexID string `json:"index_id"`
1414
DocMapping struct {
1515
TimestampField string `json:"timestamp_field"`
1616
FieldMappings []FieldMappings `json:"field_mappings"`
@@ -23,6 +23,7 @@ type QuickwitCreationErrorPayload struct {
2323
StatusCode int `json:"status"`
2424
}
2525

26+
// TODO: Revamp error handling
2627
func NewErrorCreationPayload(statusCode int, message string) error {
2728
var payload QuickwitCreationErrorPayload
2829
payload.Message = message
@@ -35,124 +36,73 @@ func NewErrorCreationPayload(statusCode int, message string) error {
3536
return errors.New(string(json))
3637
}
3738

38-
// TODO: refactor either by using a timestamp alias suppprted by quickwit
39-
// or by only using the `GetTimestampFieldFromIndexPattern` once the endpoint
40-
// /indexes?index_id_pattern= is supported, which is after the next quickwit release > 0.7.1
41-
func GetTimestampFieldInfos(index string, qwickwitUrl string, cli *http.Client) (string, string, error) {
42-
if strings.Contains(index, "*") || strings.Contains(index, ",") {
43-
return GetTimestampFieldFromIndexPattern(index, qwickwitUrl, cli)
39+
func FilterErrorResponses(r *http.Response) (*http.Response, error) {
40+
if r.StatusCode < 200 || r.StatusCode >= 400 {
41+
body, err := io.ReadAll(r.Body)
42+
if err != nil {
43+
return nil, NewErrorCreationPayload(r.StatusCode, fmt.Errorf("failed to read error body: err = %w", err).Error())
44+
}
45+
return nil, NewErrorCreationPayload(r.StatusCode, fmt.Sprintf("error = %s", (body)))
4446
}
45-
return GetTimestampFieldFromIndex(index, qwickwitUrl, cli)
47+
return r, nil
4648
}
4749

48-
func GetTimestampFieldFromIndex(index string, qwickwitUrl string, cli *http.Client) (string, string, error) {
49-
mappingEndpointUrl := qwickwitUrl + "/indexes/" + index
50-
qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl)
51-
r, err := cli.Get(mappingEndpointUrl)
52-
if err != nil {
53-
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
54-
qwlog.Error(errMsg)
55-
return "", "", err
50+
func GetTimestampFieldInfos(indexMetadataList []QuickwitIndexMetadata) (string, string, error) {
51+
if len(indexMetadataList) == 0 {
52+
return "", "", fmt.Errorf("index metadata list is empty")
5653
}
57-
defer r.Body.Close()
58-
59-
statusCode := r.StatusCode
6054

61-
if statusCode < 200 || statusCode >= 400 {
62-
errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl)
63-
qwlog.Error(errMsg)
64-
return "", "", NewErrorCreationPayload(statusCode, errMsg)
55+
refTimestampFieldName, refTimestampOutputFormat := FindTimestampFieldInfos(indexMetadataList[0])
56+
if refTimestampFieldName == "" || refTimestampOutputFormat == "" {
57+
return "", "", fmt.Errorf("Invalid timestamp field infos for %s: %s, %s", indexMetadataList[0].IndexConfig.IndexID, refTimestampFieldName, refTimestampOutputFormat)
6558
}
6659

67-
body, err := io.ReadAll(r.Body)
68-
if err != nil {
69-
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
70-
qwlog.Error(errMsg)
71-
return "", "", NewErrorCreationPayload(statusCode, errMsg)
60+
for _, indexMetadata := range indexMetadataList[1:] {
61+
timestampFieldName, timestampOutputFormat := FindTimestampFieldInfos(indexMetadata)
62+
63+
if timestampFieldName != refTimestampFieldName || timestampOutputFormat != refTimestampOutputFormat {
64+
return "", "", fmt.Errorf("Indexes matching pattern have incompatible timestamp fields, found: %s (%s) and %s (%s)", refTimestampFieldName, refTimestampOutputFormat, timestampFieldName, timestampOutputFormat)
65+
}
7266
}
7367

74-
return DecodeTimestampFieldFromIndexConfig(body)
68+
return refTimestampFieldName, refTimestampOutputFormat, nil
7569
}
7670

77-
func GetTimestampFieldFromIndexPattern(indexPattern string, qwickwitUrl string, cli *http.Client) (string, string, error) {
71+
func GetIndexesMetadata(indexPattern string, qwickwitUrl string, cli *http.Client) ([]QuickwitIndexMetadata, error) {
7872
mappingEndpointUrl := qwickwitUrl + "/indexes?index_id_patterns=" + indexPattern
7973
qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl)
8074
r, err := cli.Get(mappingEndpointUrl)
8175
if err != nil {
82-
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
83-
qwlog.Error(errMsg)
84-
return "", "", err
76+
return nil, fmt.Errorf("Error when calling url = %s: %w", mappingEndpointUrl, err)
8577
}
8678
defer r.Body.Close()
8779

88-
statusCode := r.StatusCode
89-
90-
if statusCode < 200 || statusCode >= 400 {
91-
errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl)
92-
qwlog.Error(errMsg)
93-
return "", "", NewErrorCreationPayload(statusCode, errMsg)
80+
r, err = FilterErrorResponses(r)
81+
if err != nil {
82+
return nil, fmt.Errorf("API returned invalid response: %w", err)
9483
}
9584

9685
body, err := io.ReadAll(r.Body)
9786
if err != nil {
98-
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
99-
qwlog.Error(errMsg)
100-
return "", "", NewErrorCreationPayload(statusCode, errMsg)
87+
return nil, fmt.Errorf("failed to read response body: %w", err)
10188
}
10289

103-
return DecodeTimestampFieldFromIndexConfigs(body)
104-
}
105-
106-
func DecodeTimestampFieldFromIndexConfigs(body []byte) (string, string, error) {
10790
var payload []QuickwitIndexMetadata
108-
err := json.Unmarshal(body, &payload)
91+
err = json.Unmarshal(body, &payload)
10992
if err != nil {
110-
errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body))
111-
qwlog.Error(errMsg)
112-
return "", "", NewErrorCreationPayload(500, errMsg)
113-
}
114-
115-
var refTimestampFieldName string = ""
116-
var refTimestampOutputFormat string = ""
117-
var timestampFieldName string = ""
118-
var timestampOutputFormat string = ""
119-
120-
for _, indexMetadata := range payload {
121-
timestampFieldName = indexMetadata.IndexConfig.DocMapping.TimestampField
122-
timestampOutputFormat, _ = FindTimeStampFormat(timestampFieldName, nil, indexMetadata.IndexConfig.DocMapping.FieldMappings)
123-
124-
if refTimestampFieldName == "" {
125-
refTimestampFieldName = timestampFieldName
126-
refTimestampOutputFormat = timestampOutputFormat
127-
continue
128-
}
129-
130-
if timestampFieldName != refTimestampFieldName || timestampOutputFormat != refTimestampOutputFormat {
131-
errMsg := fmt.Sprintf("Index matching the pattern should have the same timestamp fields, two found: %s (%s) and %s (%s)", refTimestampFieldName, refTimestampOutputFormat, timestampFieldName, timestampOutputFormat)
132-
qwlog.Error(errMsg)
133-
return "", "", NewErrorCreationPayload(400, errMsg)
134-
}
93+
return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
13594
}
13695

137-
qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s, timestamptOutputFormat = %s", timestampFieldName, timestampOutputFormat))
138-
return timestampFieldName, timestampOutputFormat, nil
96+
return payload, nil
13997
}
14098

141-
func DecodeTimestampFieldFromIndexConfig(body []byte) (string, string, error) {
142-
var payload QuickwitIndexMetadata
143-
err := json.Unmarshal(body, &payload)
144-
if err != nil {
145-
errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body))
146-
qwlog.Error(errMsg)
147-
return "", "", NewErrorCreationPayload(500, errMsg)
148-
}
149-
timestampFieldName := payload.IndexConfig.DocMapping.TimestampField
150-
timestampFieldFormat, _ := FindTimeStampFormat(timestampFieldName, nil, payload.IndexConfig.DocMapping.FieldMappings)
151-
qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName))
152-
return timestampFieldName, timestampFieldFormat, nil
99+
func FindTimestampFieldInfos(indexMetadata QuickwitIndexMetadata) (string, string) {
100+
timestampFieldName := indexMetadata.IndexConfig.DocMapping.TimestampField
101+
timestampOutputFormat, _ := FindTimestampFormat(timestampFieldName, nil, indexMetadata.IndexConfig.DocMapping.FieldMappings)
102+
return timestampFieldName, timestampOutputFormat
153103
}
154104

155-
func FindTimeStampFormat(timestampFieldName string, parentName *string, fieldMappings []FieldMappings) (string, bool) {
105+
func FindTimestampFormat(timestampFieldName string, parentName *string, fieldMappings []FieldMappings) (string, bool) {
156106
if nil == fieldMappings {
157107
return "", false
158108
}
@@ -166,7 +116,7 @@ func FindTimeStampFormat(timestampFieldName string, parentName *string, fieldMap
166116
if field.Type == "datetime" && fieldName == timestampFieldName && nil != field.OutputFormat {
167117
return *field.OutputFormat, true
168118
} else if field.Type == "object" && nil != field.FieldMappings {
169-
return FindTimeStampFormat(timestampFieldName, &field.Name, field.FieldMappings)
119+
return FindTimestampFormat(timestampFieldName, &field.Name, field.FieldMappings)
170120
}
171121
}
172122

0 commit comments

Comments
 (0)