Skip to content

Improve timestamp infos and DS init #125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/quickwit/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type DatasourceInfo struct {
Database string
ConfiguredFields ConfiguredFields
MaxConcurrentShardRequests int64
IsReady bool
}

type ConfiguredFields struct {
Expand Down
62 changes: 50 additions & 12 deletions pkg/quickwit/quickwit.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
return nil, err
}

timeField, toOk := jsonData["timeField"].(string)
timeOutputFormat, tofOk := jsonData["timeOutputFormat"].(string)

logLevelField, ok := jsonData["logLevelField"].(string)
if !ok {
logLevelField = ""
Expand All @@ -74,6 +71,7 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
if !ok {
index = ""
}
// XXX : Legacy check, should not happen ?
if index == "" {
index = settings.Database
}
Expand All @@ -92,18 +90,11 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
maxConcurrentShardRequests = 256
}

if !toOk || !tofOk {
timeField, timeOutputFormat, err = GetTimestampFieldInfos(index, settings.URL, httpCli)
if nil != err {
return nil, err
}
}

configuredFields := es.ConfiguredFields{
TimeField: timeField,
TimeOutputFormat: timeOutputFormat,
LogLevelField: logLevelField,
LogMessageField: logMessageField,
TimeField: "",
TimeOutputFormat: "",
}

model := es.DatasourceInfo{
Expand All @@ -113,10 +104,40 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
Database: index,
MaxConcurrentShardRequests: int64(maxConcurrentShardRequests),
ConfiguredFields: configuredFields,
IsReady: false,
}
return &QuickwitDatasource{dsInfo: model}, nil
}

// Network dependent datasource initialization.
// This is not done in the "constructor" function to allow saving the ds
// even if the server is not responsive.
func (ds *QuickwitDatasource) initDatasource(force bool) error {
if ds.dsInfo.IsReady && !force {
return nil
}

indexMetadataList, err := GetIndexesMetadata(ds.dsInfo.Database, ds.dsInfo.URL, ds.dsInfo.HTTPClient)
if err != nil {
return fmt.Errorf("failed to get index metadata : %w", err)
}

if len(indexMetadataList) == 0 {
return fmt.Errorf("no index found for %s", ds.dsInfo.Database)
}

timeField, timeOutputFormat, err := GetTimestampFieldInfos(indexMetadataList)
if nil != err {
return err
}

ds.dsInfo.ConfiguredFields.TimeField = timeField
ds.dsInfo.ConfiguredFields.TimeOutputFormat = timeOutputFormat

ds.dsInfo.IsReady = true
return nil
}

// Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance
// created. As soon as datasource settings change detected by SDK old datasource instance will
// be disposed and a new one will be created using NewSampleDatasource factory function.
Expand All @@ -132,12 +153,29 @@ func (ds *QuickwitDatasource) Dispose() {
func (ds *QuickwitDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
res := &backend.CheckHealthResult{}

if err := ds.initDatasource(true); err != nil {
res.Status = backend.HealthStatusError
res.Message = fmt.Errorf("Failed to initialize datasource: %w", err).Error()
return res, nil
}

if ds.dsInfo.ConfiguredFields.TimeField == "" || ds.dsInfo.ConfiguredFields.TimeOutputFormat == "" {
res.Status = backend.HealthStatusError
res.Message = fmt.Sprintf("timefield is missing from index config \"%s\"", ds.dsInfo.Database)
return res, nil
}

res.Status = backend.HealthStatusOk
res.Message = "plugin is running"
return res, nil
}

func (ds *QuickwitDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
// Ensure ds is initialized, we need timestamp infos
if err := ds.initDatasource(false); err != nil {
return &backend.QueryDataResponse{}, fmt.Errorf("Failed to initialize datasource")
}

return queryData(ctx, req.Queries, &ds.dsInfo)
}

Expand Down
126 changes: 38 additions & 88 deletions pkg/quickwit/timestamp_infos.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"fmt"
"io"
"net/http"
"strings"
)

type QuickwitIndexMetadata struct {
IndexConfig struct {
IndexID string `json:"index_id"`
DocMapping struct {
TimestampField string `json:"timestamp_field"`
FieldMappings []FieldMappings `json:"field_mappings"`
Expand All @@ -23,6 +23,7 @@ type QuickwitCreationErrorPayload struct {
StatusCode int `json:"status"`
}

// TODO: Revamp error handling
func NewErrorCreationPayload(statusCode int, message string) error {
var payload QuickwitCreationErrorPayload
payload.Message = message
Expand All @@ -35,124 +36,73 @@ func NewErrorCreationPayload(statusCode int, message string) error {
return errors.New(string(json))
}

// TODO: refactor either by using a timestamp alias suppprted by quickwit
// or by only using the `GetTimestampFieldFromIndexPattern` once the endpoint
// /indexes?index_id_pattern= is supported, which is after the next quickwit release > 0.7.1
func GetTimestampFieldInfos(index string, qwickwitUrl string, cli *http.Client) (string, string, error) {
if strings.Contains(index, "*") || strings.Contains(index, ",") {
return GetTimestampFieldFromIndexPattern(index, qwickwitUrl, cli)
func FilterErrorResponses(r *http.Response) (*http.Response, error) {
if r.StatusCode < 200 || r.StatusCode >= 400 {
body, err := io.ReadAll(r.Body)
if err != nil {
return nil, NewErrorCreationPayload(r.StatusCode, fmt.Errorf("failed to read error body: err = %w", err).Error())
}
return nil, NewErrorCreationPayload(r.StatusCode, fmt.Sprintf("error = %s", (body)))
}
return GetTimestampFieldFromIndex(index, qwickwitUrl, cli)
return r, nil
}

func GetTimestampFieldFromIndex(index string, qwickwitUrl string, cli *http.Client) (string, string, error) {
mappingEndpointUrl := qwickwitUrl + "/indexes/" + index
qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl)
r, err := cli.Get(mappingEndpointUrl)
if err != nil {
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
qwlog.Error(errMsg)
return "", "", err
func GetTimestampFieldInfos(indexMetadataList []QuickwitIndexMetadata) (string, string, error) {
if len(indexMetadataList) == 0 {
return "", "", fmt.Errorf("index metadata list is empty")
}
defer r.Body.Close()

statusCode := r.StatusCode

if statusCode < 200 || statusCode >= 400 {
errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl)
qwlog.Error(errMsg)
return "", "", NewErrorCreationPayload(statusCode, errMsg)
refTimestampFieldName, refTimestampOutputFormat := FindTimestampFieldInfos(indexMetadataList[0])
if refTimestampFieldName == "" || refTimestampOutputFormat == "" {
return "", "", fmt.Errorf("Invalid timestamp field infos for %s: %s, %s", indexMetadataList[0].IndexConfig.IndexID, refTimestampFieldName, refTimestampOutputFormat)
}

body, err := io.ReadAll(r.Body)
if err != nil {
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
qwlog.Error(errMsg)
return "", "", NewErrorCreationPayload(statusCode, errMsg)
for _, indexMetadata := range indexMetadataList[1:] {
timestampFieldName, timestampOutputFormat := FindTimestampFieldInfos(indexMetadata)

if timestampFieldName != refTimestampFieldName || timestampOutputFormat != refTimestampOutputFormat {
return "", "", fmt.Errorf("Indexes matching pattern have incompatible timestamp fields, found: %s (%s) and %s (%s)", refTimestampFieldName, refTimestampOutputFormat, timestampFieldName, timestampOutputFormat)
}
}

return DecodeTimestampFieldFromIndexConfig(body)
return refTimestampFieldName, refTimestampOutputFormat, nil
}

func GetTimestampFieldFromIndexPattern(indexPattern string, qwickwitUrl string, cli *http.Client) (string, string, error) {
func GetIndexesMetadata(indexPattern string, qwickwitUrl string, cli *http.Client) ([]QuickwitIndexMetadata, error) {
mappingEndpointUrl := qwickwitUrl + "/indexes?index_id_patterns=" + indexPattern
qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl)
r, err := cli.Get(mappingEndpointUrl)
if err != nil {
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
qwlog.Error(errMsg)
return "", "", err
return nil, fmt.Errorf("Error when calling url = %s: %w", mappingEndpointUrl, err)
}
defer r.Body.Close()

statusCode := r.StatusCode

if statusCode < 200 || statusCode >= 400 {
errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl)
qwlog.Error(errMsg)
return "", "", NewErrorCreationPayload(statusCode, errMsg)
r, err = FilterErrorResponses(r)
if err != nil {
return nil, fmt.Errorf("API returned invalid response: %w", err)
}

body, err := io.ReadAll(r.Body)
if err != nil {
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
qwlog.Error(errMsg)
return "", "", NewErrorCreationPayload(statusCode, errMsg)
return nil, fmt.Errorf("failed to read response body: %w", err)
}

return DecodeTimestampFieldFromIndexConfigs(body)
}

func DecodeTimestampFieldFromIndexConfigs(body []byte) (string, string, error) {
var payload []QuickwitIndexMetadata
err := json.Unmarshal(body, &payload)
err = json.Unmarshal(body, &payload)
if err != nil {
errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body))
qwlog.Error(errMsg)
return "", "", NewErrorCreationPayload(500, errMsg)
}

var refTimestampFieldName string = ""
var refTimestampOutputFormat string = ""
var timestampFieldName string = ""
var timestampOutputFormat string = ""

for _, indexMetadata := range payload {
timestampFieldName = indexMetadata.IndexConfig.DocMapping.TimestampField
timestampOutputFormat, _ = FindTimeStampFormat(timestampFieldName, nil, indexMetadata.IndexConfig.DocMapping.FieldMappings)

if refTimestampFieldName == "" {
refTimestampFieldName = timestampFieldName
refTimestampOutputFormat = timestampOutputFormat
continue
}

if timestampFieldName != refTimestampFieldName || timestampOutputFormat != refTimestampOutputFormat {
errMsg := fmt.Sprintf("Index matching the pattern should have the same timestamp fields, two found: %s (%s) and %s (%s)", refTimestampFieldName, refTimestampOutputFormat, timestampFieldName, timestampOutputFormat)
qwlog.Error(errMsg)
return "", "", NewErrorCreationPayload(400, errMsg)
}
return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
}

qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s, timestamptOutputFormat = %s", timestampFieldName, timestampOutputFormat))
return timestampFieldName, timestampOutputFormat, nil
return payload, nil
}

func DecodeTimestampFieldFromIndexConfig(body []byte) (string, string, error) {
var payload QuickwitIndexMetadata
err := json.Unmarshal(body, &payload)
if err != nil {
errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body))
qwlog.Error(errMsg)
return "", "", NewErrorCreationPayload(500, errMsg)
}
timestampFieldName := payload.IndexConfig.DocMapping.TimestampField
timestampFieldFormat, _ := FindTimeStampFormat(timestampFieldName, nil, payload.IndexConfig.DocMapping.FieldMappings)
qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName))
return timestampFieldName, timestampFieldFormat, nil
func FindTimestampFieldInfos(indexMetadata QuickwitIndexMetadata) (string, string) {
timestampFieldName := indexMetadata.IndexConfig.DocMapping.TimestampField
timestampOutputFormat, _ := FindTimestampFormat(timestampFieldName, nil, indexMetadata.IndexConfig.DocMapping.FieldMappings)
return timestampFieldName, timestampOutputFormat
}

func FindTimeStampFormat(timestampFieldName string, parentName *string, fieldMappings []FieldMappings) (string, bool) {
func FindTimestampFormat(timestampFieldName string, parentName *string, fieldMappings []FieldMappings) (string, bool) {
if nil == fieldMappings {
return "", false
}
Expand All @@ -166,7 +116,7 @@ func FindTimeStampFormat(timestampFieldName string, parentName *string, fieldMap
if field.Type == "datetime" && fieldName == timestampFieldName && nil != field.OutputFormat {
return *field.OutputFormat, true
} else if field.Type == "object" && nil != field.FieldMappings {
return FindTimeStampFormat(timestampFieldName, &field.Name, field.FieldMappings)
return FindTimestampFormat(timestampFieldName, &field.Name, field.FieldMappings)
}
}

Expand Down
Loading
Loading