Skip to content

Commit 8a3c88b

Browse files
committed
Revert to using timestamp output format
1 parent 957a6f8 commit 8a3c88b

10 files changed

+200
-93
lines changed

pkg/quickwit/client/client.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ type DatasourceInfo struct {
2626
}
2727

2828
type ConfiguredFields struct {
29-
TimeField string
30-
LogMessageField string
31-
LogLevelField string
29+
TimeField string
30+
TimeOutputFormat string
31+
LogMessageField string
32+
LogLevelField string
3233
}
3334

3435
// Client represents a client which can interact with elasticsearch api

pkg/quickwit/error_handling_test.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,10 @@ func TestErrorAvgMissingField(t *testing.T) {
3939
`)
4040

4141
configuredFields := es.ConfiguredFields{
42-
TimeField: "testtime",
43-
LogMessageField: "line",
44-
LogLevelField: "lvl",
42+
TimeOutputFormat: Rfc3339,
43+
TimeField: "testtime",
44+
LogMessageField: "line",
45+
LogLevelField: "lvl",
4546
}
4647

4748
result, err := queryDataTestWithResponseCode(query, 400, response, configuredFields)
@@ -71,9 +72,10 @@ func TestErrorAvgMissingFieldNoDetailedErrors(t *testing.T) {
7172
`)
7273

7374
configuredFields := es.ConfiguredFields{
74-
TimeField: "testtime",
75-
LogMessageField: "line",
76-
LogLevelField: "lvl",
75+
TimeOutputFormat: Rfc3339,
76+
TimeField: "testtime",
77+
LogMessageField: "line",
78+
LogLevelField: "lvl",
7779
}
7880

7981
result, err := queryDataTestWithResponseCode(query, 400, response, configuredFields)
@@ -117,9 +119,10 @@ func TestErrorTooManyDateHistogramBuckets(t *testing.T) {
117119
`)
118120

119121
configuredFields := es.ConfiguredFields{
120-
TimeField: "testtime",
121-
LogMessageField: "line",
122-
LogLevelField: "lvl",
122+
TimeOutputFormat: Rfc3339,
123+
TimeField: "testtime",
124+
LogMessageField: "line",
125+
LogLevelField: "lvl",
123126
}
124127
result, err := queryDataTestWithResponseCode(query, 200, response, configuredFields)
125128
require.NoError(t, err)
@@ -154,9 +157,10 @@ func TestNonElasticError(t *testing.T) {
154157
response := []byte(`Access to the database is forbidden`)
155158

156159
configuredFields := es.ConfiguredFields{
157-
TimeField: "testtime",
158-
LogMessageField: "line",
159-
LogLevelField: "lvl",
160+
TimeOutputFormat: Rfc3339,
161+
TimeField: "testtime",
162+
LogMessageField: "line",
163+
LogLevelField: "lvl",
160164
}
161165

162166
result, err := queryDataTestWithResponseCode(query, 403, response, configuredFields)

pkg/quickwit/models.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,13 @@ func describeMetric(metricType, field string) string {
122122
}
123123
return text + " " + field
124124
}
125+
126+
const (
127+
Iso8601 string = "iso8601"
128+
Rfc2822 string = "rfc2822"
129+
Rfc3339 string = "rfc3339"
130+
TimestampSecs string = "unix_timestamp_secs"
131+
TimestampMillis string = "unix_timestamp_millis"
132+
TimestampMicros string = "unix_timestamp_micros"
133+
TimestampNanos string = "unix_timestamp_nanos"
134+
)

pkg/quickwit/querydata_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,10 @@ func queryDataTestWithResponseCode(queriesBytes []byte, responseStatusCode int,
144144

145145
func queryDataTest(queriesBytes []byte, responseBytes []byte) (queryDataTestResult, error) {
146146
configuredFields := es.ConfiguredFields{
147-
TimeField: "testtime",
148-
LogMessageField: "line",
149-
LogLevelField: "lvl",
147+
TimeOutputFormat: Rfc3339,
148+
TimeField: "testtime",
149+
LogMessageField: "line",
150+
LogLevelField: "lvl",
150151
}
151152
return queryDataTestWithResponseCode(queriesBytes, 200, responseBytes, configuredFields)
152153
}

pkg/quickwit/quickwit.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
5858
}
5959

6060
timeField, toOk := jsonData["timeField"].(string)
61+
timeOutputFormat, tofOk := jsonData["timeOutputFormat"].(string)
6162

6263
logLevelField, ok := jsonData["logLevelField"].(string)
6364
if !ok {
@@ -91,17 +92,18 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
9192
maxConcurrentShardRequests = 256
9293
}
9394

94-
if !toOk {
95-
timeField, err = GetTimestampField(index, settings.URL, httpCli)
95+
if !toOk || !tofOk {
96+
timeField, timeOutputFormat, err = GetTimestampFieldInfos(index, settings.URL, httpCli)
9697
if nil != err {
9798
return nil, err
9899
}
99100
}
100101

101102
configuredFields := es.ConfiguredFields{
102-
TimeField: timeField,
103-
LogLevelField: logLevelField,
104-
LogMessageField: logMessageField,
103+
TimeField: timeField,
104+
TimeOutputFormat: timeOutputFormat,
105+
LogLevelField: logLevelField,
106+
LogMessageField: logMessageField,
105107
}
106108

107109
model := es.DatasourceInfo{

pkg/quickwit/response_parser.go

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/grafana/grafana-plugin-sdk-go/backend"
1414
"github.com/grafana/grafana-plugin-sdk-go/data"
15+
"golang.org/x/exp/slices"
1516

1617
es "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/client"
1718
"github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/simplejson"
@@ -246,7 +247,7 @@ func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []str
246247
if propName == configuredFields.TimeField {
247248
timeVector := make([]*time.Time, size)
248249
for i, doc := range docs {
249-
timeValue, err := ParseToTime(doc["sort"].([]any)[0])
250+
timeValue, err := ParseToTime(doc[configuredFields.TimeField], configuredFields.TimeOutputFormat)
250251
if err != nil {
251252
continue
252253
}
@@ -293,13 +294,39 @@ func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []str
293294
// Parses a value into Time given a timeOutputFormat. The conversion
294295
// only works with float64 as this is what we get when parsing a response.
295296
// TODO: understand why we get a float64?
296-
func ParseToTime(value interface{}) (time.Time, error) {
297-
typed_value, ok := value.(float64)
298-
if !ok {
299-
return time.Time{}, errors.New("parse time only accepts float64 with timestamp based format")
297+
func ParseToTime(value interface{}, timeOutputFormat string) (time.Time, error) {
298+
299+
if timeOutputFormat == Iso8601 || timeOutputFormat == Rfc3339 {
300+
value_string := value.(string)
301+
timeValue, err := time.Parse(time.RFC3339, value_string)
302+
if err != nil {
303+
return time.Time{}, err
304+
}
305+
return timeValue, nil
306+
} else if timeOutputFormat == Rfc2822 {
307+
value_string := value.(string)
308+
timeValue, err := time.Parse(time.RFC822Z, value_string)
309+
if err != nil {
310+
return time.Time{}, err
311+
}
312+
return timeValue, nil
313+
} else if slices.Contains([]string{TimestampSecs, TimestampMillis, TimestampMicros, TimestampNanos}, timeOutputFormat) {
314+
typed_value, ok := value.(float64)
315+
if !ok {
316+
return time.Time{}, errors.New("parse time only accepts float64 with timestamp based format")
317+
}
318+
int64_value := int64(typed_value)
319+
if timeOutputFormat == TimestampSecs {
320+
return time.Unix(int64_value, 0), nil
321+
} else if timeOutputFormat == TimestampMillis {
322+
return time.Unix(0, int64_value*1_000_000), nil
323+
} else if timeOutputFormat == TimestampMicros {
324+
return time.Unix(0, int64_value*1_000), nil
325+
} else if timeOutputFormat == TimestampNanos {
326+
return time.Unix(0, int64_value), nil
327+
}
300328
}
301-
int64_value := int64(typed_value)
302-
return time.Unix(0, int64_value), nil
329+
return time.Time{}, fmt.Errorf("timeOutputFormat not supported yet %s", timeOutputFormat)
303330
}
304331

305332
func processBuckets(aggs map[string]interface{}, target *Query,

pkg/quickwit/response_parser_qw_test.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) {
5959
`)
6060

6161
configuredFields := es.ConfiguredFields{
62-
TimeField: "testtime",
63-
LogMessageField: "line",
64-
LogLevelField: "lvl",
62+
TimeOutputFormat: TimestampNanos,
63+
TimeField: "testtime",
64+
LogMessageField: "line",
65+
LogLevelField: "lvl",
6566
}
6667
result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields)
6768
frames := result.response.Responses["A"].Frames
@@ -125,9 +126,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) {
125126
`)
126127

127128
configuredFields := es.ConfiguredFields{
128-
TimeField: "testtime",
129-
LogMessageField: "line",
130-
LogLevelField: "lvl",
129+
TimeOutputFormat: TimestampMicros,
130+
TimeField: "testtime",
131+
LogMessageField: "line",
132+
LogLevelField: "lvl",
131133
}
132134
result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields)
133135
frames := result.response.Responses["A"].Frames
@@ -191,9 +193,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) {
191193
`)
192194

193195
configuredFields := es.ConfiguredFields{
194-
TimeField: "testtime",
195-
LogMessageField: "line",
196-
LogLevelField: "lvl",
196+
TimeOutputFormat: TimestampMillis,
197+
TimeField: "testtime",
198+
LogMessageField: "line",
199+
LogLevelField: "lvl",
197200
}
198201
result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields)
199202
frames := result.response.Responses["A"].Frames
@@ -257,9 +260,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) {
257260
`)
258261

259262
configuredFields := es.ConfiguredFields{
260-
TimeField: "testtime",
261-
LogMessageField: "line",
262-
LogLevelField: "lvl",
263+
TimeOutputFormat: TimestampSecs,
264+
TimeField: "testtime",
265+
LogMessageField: "line",
266+
LogLevelField: "lvl",
263267
}
264268
result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields)
265269
frames := result.response.Responses["A"].Frames
@@ -278,7 +282,7 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) {
278282
func TestConvertToTime(t *testing.T) {
279283
t.Run("Test parse unix timestamps nanosecs of float type", func(t *testing.T) {
280284
inputValue := interface{}(1234567890000000000.0)
281-
value, _ := ParseToTime(inputValue)
285+
value, _ := ParseToTime(inputValue, "unix_timestamp_nanos")
282286
require.Equal(t, time.Unix(1234567890, 0), value)
283287
})
284288
}

pkg/quickwit/response_parser_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3228,9 +3228,10 @@ func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*bac
32283228
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
32293229
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
32303230
configuredFields := es.ConfiguredFields{
3231-
TimeField: "@timestamp",
3232-
LogMessageField: "line",
3233-
LogLevelField: "lvl",
3231+
TimeOutputFormat: Rfc3339,
3232+
TimeField: "@timestamp",
3233+
LogMessageField: "line",
3234+
LogLevelField: "lvl",
32343235
}
32353236
timeRange := backend.TimeRange{
32363237
From: from,

0 commit comments

Comments
 (0)