Skip to content

Commit f138b6c

Browse files
committed
feat: support retry
Signed-off-by: Eric Fu <[email protected]>
1 parent ac188cb commit f138b6c

File tree

5 files changed

+230
-39
lines changed

5 files changed

+230
-39
lines changed

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,18 @@ echo $USER
132132
xxchan
133133
```
134134

135+
### Extension: Retry
136+
137+
```text
138+
query I retry 3 backoff 5s
139+
SELECT id FROM test;
140+
----
141+
1
142+
143+
statement ok retry 3 backoff 5s
144+
UPDATE test SET id = 1;
145+
```
146+
135147
### Extension: Environment variable substitution in query and statement
136148

137149
It needs to be enabled by adding `control substitution on` to the test file.

sqllogictest/src/parser.rs

Lines changed: 152 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,15 @@ impl Location {
6767
}
6868
}
6969

70+
/// Configuration for retry behavior
71+
#[derive(Debug, Clone, PartialEq)]
72+
pub struct RetryConfig {
73+
/// Number of retry attempts
74+
pub attempts: usize,
75+
/// Duration to wait between retries
76+
pub backoff: Duration,
77+
}
78+
7079
/// Expectation for a statement.
7180
#[derive(Debug, Clone, PartialEq)]
7281
pub enum StatementExpect {
@@ -86,7 +95,6 @@ pub enum QueryExpect<T: ColumnType> {
8695
types: Vec<T>,
8796
sort_mode: Option<SortMode>,
8897
result_mode: Option<ResultMode>,
89-
label: Option<String>,
9098
results: Vec<String>,
9199
},
92100
/// Query should fail with the given error message.
@@ -100,7 +108,6 @@ impl<T: ColumnType> QueryExpect<T> {
100108
types: Vec::new(),
101109
sort_mode: None,
102110
result_mode: None,
103-
label: None,
104111
results: Vec::new(),
105112
}
106113
}
@@ -125,6 +132,8 @@ pub enum Record<T: ColumnType> {
125132
/// The SQL command.
126133
sql: String,
127134
expected: StatementExpect,
135+
/// Optional retry configuration
136+
retry: Option<RetryConfig>,
128137
},
129138
/// A query is an SQL command from which we expect to receive results. The result set might be
130139
/// empty.
@@ -135,6 +144,8 @@ pub enum Record<T: ColumnType> {
135144
/// The SQL command.
136145
sql: String,
137146
expected: QueryExpect<T>,
147+
/// Optional retry configuration
148+
retry: Option<RetryConfig>,
138149
},
139150
/// A system command is an external command that is to be executed by the shell. Currently it
140151
/// must succeed and the output is ignored.
@@ -208,13 +219,22 @@ impl<T: ColumnType> std::fmt::Display for Record<T> {
208219
connection: _,
209220
sql,
210221
expected,
222+
retry,
211223
} => {
212224
write!(f, "statement ")?;
213225
match expected {
214226
StatementExpect::Ok => write!(f, "ok")?,
215227
StatementExpect::Count(cnt) => write!(f, "count {cnt}")?,
216228
StatementExpect::Error(err) => err.fmt_inline(f)?,
217229
}
230+
if let Some(retry) = retry {
231+
write!(
232+
f,
233+
" retry {} backoff {}",
234+
retry.attempts,
235+
humantime::format_duration(retry.backoff)
236+
)?;
237+
}
218238
writeln!(f)?;
219239
// statement always end with a blank line
220240
writeln!(f, "{sql}")?;
@@ -230,36 +250,37 @@ impl<T: ColumnType> std::fmt::Display for Record<T> {
230250
connection: _,
231251
sql,
232252
expected,
253+
retry,
233254
} => {
234255
write!(f, "query ")?;
235256
match expected {
236257
QueryExpect::Results {
237-
types,
238-
sort_mode,
239-
label,
240-
..
258+
types, sort_mode, ..
241259
} => {
242260
write!(f, "{}", types.iter().map(|c| c.to_char()).join(""))?;
243261
if let Some(sort_mode) = sort_mode {
244262
write!(f, " {}", sort_mode.as_str())?;
245263
}
246-
if let Some(label) = label {
247-
write!(f, " {label}")?;
248-
}
249264
}
250265
QueryExpect::Error(err) => err.fmt_inline(f)?,
251266
}
267+
if let Some(retry) = retry {
268+
write!(
269+
f,
270+
" retry {} backoff {}",
271+
retry.attempts,
272+
humantime::format_duration(retry.backoff)
273+
)?;
274+
}
252275
writeln!(f)?;
253276
writeln!(f, "{sql}")?;
254277

255278
match expected {
256279
QueryExpect::Results { results, .. } => {
257280
write!(f, "{}", RESULTS_DELIMITER)?;
258-
259281
for result in results {
260282
write!(f, "\n{result}")?;
261283
}
262-
// query always ends with a blank line
263284
writeln!(f)?
264285
}
265286
QueryExpect::Error(err) => err.fmt_multiline(f)?,
@@ -622,6 +643,8 @@ pub enum ParseErrorKind {
622643
InvalidErrorMessage(String),
623644
#[error("duplicated error messages after error` and under `----`")]
624645
DuplicatedErrorMessage,
646+
#[error("invalid retry config: {0:?}")]
647+
InvalidRetryConfig(String),
625648
#[error("statement should have no result, use `query` instead")]
626649
StatementHasResults,
627650
#[error("invalid duration: {0:?}")]
@@ -730,22 +753,28 @@ fn parse_inner<T: ColumnType>(loc: &Location, script: &str) -> Result<Vec<Record
730753
records.push(Record::Connection(conn));
731754
}
732755
["statement", res @ ..] => {
733-
let mut expected = match res {
734-
["ok"] => StatementExpect::Ok,
735-
["error", tokens @ ..] => {
736-
let error = ExpectedError::parse_inline_tokens(tokens)
756+
let (mut expected, res) = match res {
757+
["ok", retry @ ..] => (StatementExpect::Ok, retry),
758+
["error", err_tokens @ ..] => {
759+
// NOTE: `statement error` can't be used with `retry` now because all the
760+
// tokens after `error` are treated as error message.
761+
let error = ExpectedError::parse_inline_tokens(err_tokens)
737762
.map_err(|e| e.at(loc.clone()))?;
738-
StatementExpect::Error(error)
763+
(StatementExpect::Error(error), &[][..])
739764
}
740-
["count", count_str] => {
765+
["count", count_str, retry @ ..] => {
741766
let count = count_str.parse::<u64>().map_err(|_| {
742767
ParseErrorKind::InvalidNumber((*count_str).into()).at(loc.clone())
743768
})?;
744-
StatementExpect::Count(count)
769+
(StatementExpect::Count(count), retry)
745770
}
746771
_ => return Err(ParseErrorKind::InvalidLine(line.into()).at(loc)),
747772
};
773+
774+
let retry = parse_retry_config(res).map_err(|e| e.at(loc.clone()))?;
775+
748776
let (sql, has_results) = parse_lines(&mut lines, &loc, Some(RESULTS_DELIMITER))?;
777+
749778
if has_results {
750779
if let StatementExpect::Error(e) = &mut expected {
751780
// If no inline error message is specified, it might be a multiline error.
@@ -765,14 +794,17 @@ fn parse_inner<T: ColumnType>(loc: &Location, script: &str) -> Result<Vec<Record
765794
connection: std::mem::take(&mut connection),
766795
sql,
767796
expected,
797+
retry,
768798
});
769799
}
770800
["query", res @ ..] => {
771-
let mut expected = match res {
772-
["error", tokens @ ..] => {
773-
let error = ExpectedError::parse_inline_tokens(tokens)
801+
let (mut expected, res) = match res {
802+
["error", err_tokens @ ..] => {
803+
// NOTE: `query error` can't be used with `retry` now because all the tokens
804+
// after `error` are treated as error message.
805+
let error = ExpectedError::parse_inline_tokens(err_tokens)
774806
.map_err(|e| e.at(loc.clone()))?;
775-
QueryExpect::Error(error)
807+
(QueryExpect::Error(error), &[][..])
776808
}
777809
[type_str, res @ ..] => {
778810
let types = type_str
@@ -782,23 +814,24 @@ fn parse_inner<T: ColumnType>(loc: &Location, script: &str) -> Result<Vec<Record
782814
.ok_or_else(|| ParseErrorKind::InvalidType(ch).at(loc.clone()))
783815
})
784816
.try_collect()?;
785-
let sort_mode = res
786-
.first()
787-
.map(|&s| SortMode::try_from_str(s))
788-
.transpose()
789-
.map_err(|e| e.at(loc.clone()))?;
790-
let label = res.get(1).map(|s| s.to_string());
791-
QueryExpect::Results {
792-
types,
793-
sort_mode,
794-
result_mode: None,
795-
label,
796-
results: Vec::new(),
797-
}
817+
let sort_mode = res.first().and_then(|&s| SortMode::try_from_str(s).ok()); // Could be `retry`
818+
819+
let retry_start = if sort_mode.is_some() { 1 } else { 0 };
820+
(
821+
QueryExpect::Results {
822+
types,
823+
sort_mode,
824+
result_mode: None,
825+
results: Vec::new(),
826+
},
827+
&res[retry_start..],
828+
)
798829
}
799-
[] => QueryExpect::empty_results(),
830+
[] => (QueryExpect::empty_results(), &[][..]),
800831
};
801832

833+
let retry = parse_retry_config(res).map_err(|e| e.at(loc.clone()))?;
834+
802835
// The SQL for the query is found on second and subsequent lines of the record
803836
// up to first line of the form "----" or until the end of the record.
804837
let (sql, has_result) = parse_lines(&mut lines, &loc, Some(RESULTS_DELIMITER))?;
@@ -830,6 +863,7 @@ fn parse_inner<T: ColumnType>(loc: &Location, script: &str) -> Result<Vec<Record
830863
connection: std::mem::take(&mut connection),
831864
sql,
832865
expected,
866+
retry,
833867
});
834868
}
835869
["system", "ok"] => {
@@ -982,6 +1016,77 @@ fn parse_multiline_error<'a>(
9821016
ExpectedError::Multiline(parse_multiple_result(lines))
9831017
}
9841018

1019+
/// Parse retry configuration from tokens
1020+
///
1021+
/// The retry configuration is optional and can be specified as:
1022+
///
1023+
/// ```text
1024+
/// ... retry 3 backoff 1s
1025+
/// ```
1026+
fn parse_retry_config(tokens: &[&str]) -> Result<Option<RetryConfig>, ParseErrorKind> {
1027+
if tokens.is_empty() {
1028+
return Ok(None);
1029+
}
1030+
1031+
let mut iter = tokens.iter().peekable();
1032+
1033+
// Check if we have retry clause
1034+
match iter.next() {
1035+
Some(&"retry") => {}
1036+
Some(token) => return Err(ParseErrorKind::UnexpectedToken(token.to_string())),
1037+
None => return Ok(None),
1038+
}
1039+
1040+
// Parse number of attempts
1041+
let attempts = match iter.next() {
1042+
Some(attempts_str) => attempts_str
1043+
.parse::<usize>()
1044+
.map_err(|_| ParseErrorKind::InvalidNumber(attempts_str.to_string()))?,
1045+
None => {
1046+
return Err(ParseErrorKind::InvalidRetryConfig(
1047+
"expected a positive number of attempts".to_string(),
1048+
))
1049+
}
1050+
};
1051+
1052+
if attempts == 0 {
1053+
return Err(ParseErrorKind::InvalidRetryConfig(
1054+
"attempt must be greater than 0".to_string(),
1055+
));
1056+
}
1057+
1058+
// Expect "backoff" keyword
1059+
match iter.next() {
1060+
Some(&"backoff") => {}
1061+
Some(token) => return Err(ParseErrorKind::UnexpectedToken(token.to_string())),
1062+
None => {
1063+
return Err(ParseErrorKind::InvalidRetryConfig(
1064+
"expected keyword backoff".to_string(),
1065+
))
1066+
}
1067+
}
1068+
1069+
// Parse backoff duration
1070+
let duration_str = match iter.next() {
1071+
Some(s) => s,
1072+
None => {
1073+
return Err(ParseErrorKind::InvalidRetryConfig(
1074+
"expected backoff duration".to_string(),
1075+
))
1076+
}
1077+
};
1078+
1079+
let backoff = humantime::parse_duration(duration_str)
1080+
.map_err(|_| ParseErrorKind::InvalidDuration(duration_str.to_string()))?;
1081+
1082+
// No more tokens should be present
1083+
if iter.next().is_some() {
1084+
return Err(ParseErrorKind::UnexpectedToken("extra tokens".to_string()));
1085+
}
1086+
1087+
Ok(Some(RetryConfig { attempts, backoff }))
1088+
}
1089+
9851090
#[cfg(test)]
9861091
mod tests {
9871092
use std::io::Write;
@@ -1092,6 +1197,7 @@ select * from foo;
10921197
connection: Connection::Default,
10931198
sql: "select * from foo;".to_string(),
10941199
expected: QueryExpect::empty_results(),
1200+
retry: None,
10951201
}]
10961202
);
10971203
}
@@ -1186,4 +1292,14 @@ select * from foo;
11861292
}
11871293
}
11881294
}
1295+
1296+
#[test]
1297+
fn test_statement_retry() {
1298+
parse_roundtrip::<DefaultColumnType>("../tests/retry/statement_retry.slt")
1299+
}
1300+
1301+
#[test]
1302+
fn test_query_retry() {
1303+
parse_roundtrip::<DefaultColumnType>("../tests/retry/query_retry.slt")
1304+
}
11891305
}

0 commit comments

Comments
 (0)