Skip to content

Commit 7e66d2f

Browse files
authored
Add TaggedSeries and deserialize_next_tagged for GROUP BY (#69)
1 parent c31188d commit 7e66d2f

File tree

3 files changed

+196
-1
lines changed

3 files changed

+196
-1
lines changed

Diff for: influxdb/src/integrations/serde_integration/de.rs

+104-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::Series;
1+
use super::{Series, TaggedSeries};
22
use serde::de::{
33
value, DeserializeSeed, Deserializer, Error, IntoDeserializer, MapAccess, SeqAccess, Visitor,
44
};
@@ -96,6 +96,109 @@ where
9696
}
9797
}
9898

99+
// Based on https://serde.rs/deserialize-struct.html
100+
impl<'de, TAG, T> Deserialize<'de> for TaggedSeries<TAG, T>
101+
where
102+
TAG: Deserialize<'de>,
103+
T: Deserialize<'de>,
104+
{
105+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
106+
where
107+
D: Deserializer<'de>,
108+
{
109+
// Field name deserializer
110+
#[derive(Deserialize)]
111+
#[serde(field_identifier, rename_all = "lowercase")]
112+
enum Field {
113+
Name,
114+
Tags,
115+
Columns,
116+
Values,
117+
};
118+
119+
struct SeriesVisitor<TAG, T> {
120+
_tag_type: PhantomData<TAG>,
121+
_value_type: PhantomData<T>,
122+
};
123+
124+
impl<'de, TAG, T> Visitor<'de> for SeriesVisitor<TAG, T>
125+
where
126+
TAG: Deserialize<'de>,
127+
T: Deserialize<'de>,
128+
{
129+
type Value = TaggedSeries<TAG, T>;
130+
131+
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
132+
formatter.write_str("struct TaggedSeries")
133+
}
134+
135+
fn visit_map<V>(self, mut map: V) -> Result<TaggedSeries<TAG, T>, V::Error>
136+
where
137+
V: MapAccess<'de>,
138+
{
139+
let mut name = None;
140+
let mut tags: Option<TAG> = None;
141+
let mut columns: Option<Vec<String>> = None;
142+
let mut values: Option<Vec<T>> = None;
143+
while let Some(key) = map.next_key()? {
144+
match key {
145+
Field::Name => {
146+
if name.is_some() {
147+
return Err(Error::duplicate_field("name"));
148+
}
149+
name = Some(map.next_value()?);
150+
}
151+
Field::Tags => {
152+
if tags.is_some() {
153+
return Err(Error::duplicate_field("tags"));
154+
}
155+
tags = Some(map.next_value()?);
156+
}
157+
Field::Columns => {
158+
if columns.is_some() {
159+
return Err(Error::duplicate_field("columns"));
160+
}
161+
columns = Some(map.next_value()?);
162+
}
163+
Field::Values => {
164+
if values.is_some() {
165+
return Err(Error::duplicate_field("values"));
166+
}
167+
// Error out if "values" is encountered before "columns"
168+
// Hopefully, InfluxDB never does this.
169+
if columns.is_none() {
170+
return Err(Error::custom(
171+
"series values encountered before columns",
172+
));
173+
}
174+
// Deserialize using a HeaderVec deserializer
175+
// seeded with the headers from the "columns" field
176+
values = Some(map.next_value_seed(HeaderVec::<T> {
177+
header: columns.as_ref().unwrap(),
178+
_inner_type: PhantomData,
179+
})?);
180+
}
181+
}
182+
}
183+
let name = name.ok_or_else(|| Error::missing_field("name"))?;
184+
let tags = tags.ok_or_else(|| Error::missing_field("tags"))?;
185+
let values = values.ok_or_else(|| Error::missing_field("values"))?;
186+
Ok(TaggedSeries { name, tags, values })
187+
}
188+
}
189+
190+
const FIELDS: &[&str] = &["name", "tags", "values"];
191+
deserializer.deserialize_struct(
192+
"TaggedSeries",
193+
FIELDS,
194+
SeriesVisitor::<TAG, T> {
195+
_tag_type: PhantomData,
196+
_value_type: PhantomData,
197+
},
198+
)
199+
}
200+
}
201+
99202
// Deserializer that takes a header as a seed
100203
// and deserializes an array of arrays into a
101204
// Vec of map-like values using the header as

Diff for: influxdb/src/integrations/serde_integration/mod.rs

+29
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,20 @@ impl DatabaseQueryResult {
7777
}
7878
})
7979
}
80+
81+
/// Deserializes the next result in the response as a [`TaggedReturn`],
/// i.e. the series produced by a `GROUP BY` query, with the group-by
/// tags decoded into `TAG` and each row decoded into `T`.
///
/// NOTE(review): `self.results.remove(0)` panics if `results` is empty —
/// presumably callers invoke this at most once per statement in the query;
/// confirm this matches the behavior of `deserialize_next`.
pub fn deserialize_next_tagged<TAG, T: 'static>(
    &mut self,
) -> Result<TaggedReturn<TAG, T>, Error>
where
    TAG: DeserializeOwned + Send,
    T: DeserializeOwned + Send,
{
    // Consume the first remaining raw result and map any serde_json
    // failure into the crate's own error type.
    serde_json::from_value::<TaggedReturn<TAG, T>>(self.results.remove(0)).map_err(|err| {
        Error::DeserializationError {
            error: format!("could not deserialize: {}", err),
        }
    })
}
8094
}
8195

8296
#[derive(Deserialize, Debug)]
@@ -93,6 +107,21 @@ pub struct Series<T> {
93107
pub values: Vec<T>,
94108
}
95109

110+
#[derive(Deserialize, Debug)]
111+
#[doc(hidden)]
112+
pub struct TaggedReturn<TAG, T> {
113+
#[serde(default = "Vec::new")]
114+
pub series: Vec<TaggedSeries<TAG, T>>,
115+
}
116+
117+
/// Represents a returned series from InfluxDB
///
/// Produced by `GROUP BY` queries: the group-by tag values are decoded
/// into `TAG` and each row of the series into `T`. Deserialization is
/// implemented manually in `de.rs` because rows arrive as positional
/// arrays keyed by the "columns" header.
#[derive(Debug)]
pub struct TaggedSeries<TAG, T> {
    /// Name of the measurement the series belongs to.
    pub name: String,
    /// The tag values this group was aggregated by.
    pub tags: TAG,
    /// The decoded rows of the series.
    pub values: Vec<T>,
}
124+
96125
impl Client {
97126
pub async fn json_query(&self, q: ReadQuery) -> Result<DatabaseQueryResult, Error> {
98127
let query = q.build().unwrap();

Diff for: influxdb/tests/integration_tests.rs

+63
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,69 @@ async fn test_json_query() {
358358
.await;
359359
}
360360

361+
/// INTEGRATION TEST
///
/// This test case tests whether the response to a GROUP BY can be parsed by
/// deserialize_next_tagged into a tags struct
#[tokio::test]
#[cfg(feature = "use-serde")]
async fn test_json_query_tagged() {
    use serde::Deserialize;

    const TEST_NAME: &str = "test_json_query_tagged";

    run_test(
        || async move {
            create_db(TEST_NAME).await.expect("could not setup db");

            let client = create_client(TEST_NAME);

            // Write a single tagged point at a known timestamp so the
            // GROUP BY result below is fully predictable.
            let write_query = Timestamp::Hours(11)
                .into_query("weather")
                .add_tag("location", "London")
                .add_field("temperature", 82);
            let write_result = client.query(&write_query).await;
            assert_result_ok(&write_result);

            // Struct the GROUP BY tag values are decoded into.
            #[derive(Deserialize, Debug, PartialEq)]
            struct WeatherMeta {
                location: String,
            }

            // Struct each row of the series is decoded into.
            #[derive(Deserialize, Debug, PartialEq)]
            struct Weather {
                time: String,
                temperature: i32,
            }

            let query = Query::raw_read_query("SELECT * FROM weather GROUP BY location");
            let result = client.json_query(query).await.and_then(|mut db_result| {
                db_result.deserialize_next_tagged::<WeatherMeta, Weather>()
            });
            assert_result_ok(&result);
            let result = result.unwrap();

            // The tags struct must carry the grouped-by tag value...
            assert_eq!(
                result.series[0].tags,
                WeatherMeta {
                    location: "London".to_string(),
                }
            );
            // ...and the row struct the written field plus timestamp.
            assert_eq!(
                result.series[0].values[0],
                Weather {
                    time: "1970-01-01T11:00:00Z".to_string(),
                    temperature: 82
                }
            );
        },
        || async move {
            delete_db(TEST_NAME).await.expect("could not clean up db");
        },
    )
    .await;
}
423+
361424
/// INTEGRATION TEST
362425
///
363426
/// This test case tests whether JSON can be decoded from an InfluxDB response and whether that JSON

0 commit comments

Comments
 (0)