Skip to content

Commit deb98b5

Browse files
tabVersiontabversion
and
tabversion
authored
feat: tvf with cdc source (#20439)
Signed-off-by: tabversion <[email protected]> Co-authored-by: tabversion <[email protected]>
1 parent f284677 commit deb98b5

File tree

6 files changed

+179
-86
lines changed

6 files changed

+179
-86
lines changed

e2e_test/source_inline/kafka/handling_mode.slt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,47 +78,47 @@ format debezium encode json (timestamptz.handling.mode = 'milli');
7878

7979
sleep 2s
8080

81-
query TT
81+
query TT retry 3 backoff 5s
8282
select "case", (payload).after.at from plain_guess order by 1;
8383
----
8484
0 number small 1970-01-01 00:01:40+00:00
8585
1 number recent 2024-04-11 02:00:00.123456+00:00
8686
2 string utc 2024-04-11 02:00:00.654321+00:00
8787
3 string naive NULL
8888

89-
query TT
89+
query TT retry 3 backoff 5s
9090
select "case", (payload).after.at from plain_milli order by 1;
9191
----
9292
0 number small 1970-01-01 00:00:00.100+00:00
9393
1 number recent 56246-07-01 08:02:03.456+00:00
9494
2 string utc 2024-04-11 02:00:00.654321+00:00
9595
3 string naive NULL
9696

97-
query TT
97+
query TT retry 3 backoff 5s
9898
select "case", (payload).after.at from plain_micro order by 1;
9999
----
100100
0 number small 1970-01-01 00:00:00.000100+00:00
101101
1 number recent 2024-04-11 02:00:00.123456+00:00
102102
2 string utc 2024-04-11 02:00:00.654321+00:00
103103
3 string naive NULL
104104

105-
query TT
105+
query TT retry 3 backoff 5s
106106
select "case", (payload).after.at from plain_utc order by 1;
107107
----
108108
0 number small NULL
109109
1 number recent NULL
110110
2 string utc 2024-04-11 02:00:00.654321+00:00
111111
3 string naive NULL
112112

113-
query TT
113+
query TT retry 3 backoff 5s
114114
select "case", (payload).after.at from plain_naive order by 1;
115115
----
116116
0 number small NULL
117117
1 number recent NULL
118118
2 string utc NULL
119119
3 string naive 2024-04-11 02:00:00.234321+00:00
120120

121-
query TT
121+
query TT retry 3 backoff 5s
122122
select "case", at from debezium_milli order by 1;
123123
----
124124
0 number small 1970-01-01 00:00:00.100+00:00

e2e_test/source_inline/tvf/mysql_query.slt

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,24 @@ INSERT INTO test SELECT
6262
null as v23;
6363
"
6464

65+
statement ok
66+
create source mysql_cdc_source with (
67+
${RISEDEV_MYSQL_WITH_OPTIONS_COMMON},
68+
username = '$RISEDEV_MYSQL_USER',
69+
password = '$MYSQL_PWD',
70+
database.name = 'tvf',
71+
);
6572

6673
query
6774
select * from mysql_query('$MYSQL_HOST', '$MYSQL_TCP_PORT', '$RISEDEV_MYSQL_USER', '$MYSQL_PWD', 'tvf', 'select * from test;');
6875
----
6976
1 t 1 2 3 4 5 6 7 1.08 1.09 1.10 1.11 char varchar \x000a \x16 \x17 \x18 \x19 2021-01-01 12:34:56 2021-01-01 12:34:56+00:00 {"key1": 1, "key2": "abc"} NULL
7077

78+
query
79+
select * from mysql_query('mysql_cdc_source', 'select * from test;');
80+
----
81+
1 t 1 2 3 4 5 6 7 1.08 1.09 1.10 1.11 char varchar \x000a \x16 \x17 \x18 \x19 2021-01-01 12:34:56 2021-01-01 12:34:56+00:00 {"key1": 1, "key2": "abc"} NULL
82+
7183

7284
system ok
7385
mysql -e "
@@ -161,4 +173,7 @@ system ok
161173
mysql -e "
162174
USE tvf;
163175
DROP DATABASE tvf;
164-
"
176+
"
177+
178+
statement ok
179+
drop source mysql_cdc_source;

e2e_test/source_inline/tvf/postgres_query.slt

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ PGDATABASE=source_test psql -c "
3030
INSERT INTO test SELECT generate_series(1, 100), true, 1, 1, 1, 1.0, 1.0, 1.0, '2021-01-01', '00:00:00', '2021-01-01 00:00:00', '2021-01-01 00:00:00 pst', 'text', 'varchar', '1 day', '{}', '\x01';
3131
"
3232

33+
statement ok
34+
create source postgres_cdc_source with (
35+
${RISEDEV_POSTGRES_WITH_OPTIONS_COMMON},
36+
username = '$PGUSER',
37+
password = '$PGPASSWORD',
38+
database.name = 'source_test'
39+
);
40+
41+
sleep 1s
42+
3343
query II
3444
select * from postgres_query('$PGHOST', '$PGPORT', '$PGUSER', '$PGPASSWORD', 'source_test', 'select * from test where id > 90;');
3545
----
@@ -43,3 +53,20 @@ select * from postgres_query('$PGHOST', '$PGPORT', '$PGUSER', '$PGPASSWORD', 'so
4353
98 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
4454
99 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
4555
100 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
56+
57+
query II
58+
select * from postgres_query('postgres_cdc_source', 'select * from test where id > 90;');
59+
----
60+
91 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
61+
92 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
62+
93 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
63+
94 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
64+
95 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
65+
96 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
66+
97 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
67+
98 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
68+
99 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
69+
100 t 1 1 1 1 1 1.0 2021-01-01 00:00:00 2021-01-01 00:00:00 2021-01-01 08:00:00+00:00 text varchar 1 day {} \x01
70+
71+
statement ok
72+
drop source postgres_cdc_source;

src/frontend/src/binder/expr/function/mod.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -317,9 +317,14 @@ impl Binder {
317317
"`VARIADIC` is not allowed in table function call"
318318
);
319319
self.ensure_table_function_allowed()?;
320-
return Ok(TableFunction::new_postgres_query(args)
321-
.context("postgres_query error")?
322-
.into());
320+
return Ok(TableFunction::new_postgres_query(
321+
&self.catalog,
322+
&self.db_name,
323+
self.bind_schema_path(schema_name.as_deref()),
324+
args,
325+
)
326+
.context("postgres_query error")?
327+
.into());
323328
}
324329
// `mysql_query` table function
325330
if func_name.eq("mysql_query") {
@@ -328,9 +333,14 @@ impl Binder {
328333
"`VARIADIC` is not allowed in table function call"
329334
);
330335
self.ensure_table_function_allowed()?;
331-
return Ok(TableFunction::new_mysql_query(args)
332-
.context("mysql_query error")?
333-
.into());
336+
return Ok(TableFunction::new_mysql_query(
337+
&self.catalog,
338+
&self.db_name,
339+
self.bind_schema_path(schema_name.as_deref()),
340+
args,
341+
)
342+
.context("mysql_query error")?
343+
.into());
334344
}
335345
// UDTF
336346
if let Some(ref udf) = udf

src/frontend/src/expr/table_function.rs

Lines changed: 109 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use itertools::Itertools;
1919
use mysql_async::consts::ColumnType as MySqlColumnType;
2020
use mysql_async::prelude::*;
2121
use risingwave_common::array::arrow::IcebergArrowConvert;
22+
use risingwave_common::secret::LocalSecretManager;
2223
use risingwave_common::types::{DataType, ScalarImpl, StructType};
2324
use risingwave_connector::source::iceberg::{
2425
extract_bucket_and_file_name, get_parquet_fields, list_data_directory, new_azblob_operator,
@@ -30,10 +31,15 @@ use thiserror_ext::AsReport;
3031
use tokio_postgres::types::Type as TokioPgType;
3132

3233
use super::{infer_type, Expr, ExprImpl, ExprRewriter, Literal, RwResult};
34+
use crate::catalog::catalog_service::CatalogReadGuard;
3335
use crate::catalog::function_catalog::{FunctionCatalog, FunctionKind};
36+
use crate::catalog::root_catalog::SchemaPath;
3437
use crate::error::ErrorCode::BindError;
3538
use crate::utils::FRONTEND_RUNTIME;
3639

40+
const INLINE_ARG_LEN: usize = 6;
41+
const CDC_SOURCE_ARG_LEN: usize = 2;
42+
3743
/// A table function takes a row as input and returns a table. It is also known as Set-Returning
3844
/// Function.
3945
///
@@ -291,45 +297,76 @@ impl TableFunction {
291297
})
292298
}
293299

294-
pub fn new_postgres_query(args: Vec<ExprImpl>) -> RwResult<Self> {
295-
let args = {
296-
if args.len() != 6 {
297-
return Err(BindError("postgres_query function only accepts 6 arguments: postgres_query(hostname varchar, port varchar, username varchar, password varchar, database_name varchar, postgres_query varchar)".to_owned()).into());
298-
}
299-
let mut cast_args = Vec::with_capacity(6);
300-
for arg in args {
301-
let arg = arg.cast_implicit(DataType::Varchar)?;
302-
cast_args.push(arg);
300+
fn handle_postgres_or_mysql_query_args(
301+
catalog_reader: &CatalogReadGuard,
302+
db_name: &str,
303+
schema_path: SchemaPath<'_>,
304+
args: Vec<ExprImpl>,
305+
expect_connector_name: &str,
306+
) -> RwResult<Vec<ExprImpl>> {
307+
let cast_args = match args.len() {
308+
INLINE_ARG_LEN => {
309+
let mut cast_args = Vec::with_capacity(INLINE_ARG_LEN);
310+
for arg in args {
311+
let arg = arg.cast_implicit(DataType::Varchar)?;
312+
cast_args.push(arg);
313+
}
314+
cast_args
303315
}
304-
cast_args
305-
};
306-
let evaled_args = {
307-
let mut evaled_args: Vec<String> = Vec::with_capacity(6);
308-
for arg in &args {
309-
match arg.try_fold_const() {
310-
Some(Ok(value)) => {
311-
let Some(scalar) = value else {
312-
return Err(BindError(
313-
"postgres_query function does not accept null arguments".to_owned(),
314-
)
315-
.into());
316-
};
317-
evaled_args.push(scalar.into_utf8().into());
318-
}
319-
Some(Err(err)) => {
320-
return Err(err);
321-
}
322-
None => {
323-
return Err(BindError(
324-
"postgres_query function only accepts constant arguments".to_owned(),
325-
)
326-
.into());
327-
}
316+
CDC_SOURCE_ARG_LEN => {
317+
let source_name = expr_impl_to_string_fn(&args[0])?;
318+
let source_catalog = catalog_reader
319+
.get_source_by_name(db_name, schema_path, &source_name)?
320+
.0;
321+
if !source_catalog
322+
.connector_name()
323+
.eq_ignore_ascii_case(expect_connector_name)
324+
{
325+
return Err(BindError(format!("TVF function only accepts `mysql-cdc` and `postgres-cdc` source. Expected: {}, but got: {}", expect_connector_name, source_catalog.connector_name())).into());
328326
}
327+
328+
let (props, secret_refs) = source_catalog.with_properties.clone().into_parts();
329+
let secret_resolved =
330+
LocalSecretManager::global().fill_secrets(props, secret_refs)?;
331+
332+
vec![
333+
ExprImpl::literal_varchar(secret_resolved["hostname"].clone()),
334+
ExprImpl::literal_varchar(secret_resolved["port"].clone()),
335+
ExprImpl::literal_varchar(secret_resolved["username"].clone()),
336+
ExprImpl::literal_varchar(secret_resolved["password"].clone()),
337+
ExprImpl::literal_varchar(secret_resolved["database.name"].clone()),
338+
args.get(1)
339+
.unwrap()
340+
.clone()
341+
.cast_implicit(DataType::Varchar)?,
342+
]
343+
}
344+
_ => {
345+
return Err(BindError("postgres_query function and mysql_query function accept either 2 arguments: (cdc_source_name varchar, query varchar) or 6 arguments: (hostname varchar, port varchar, username varchar, password varchar, database_name varchar, query varchar)".to_owned()).into());
329346
}
330-
evaled_args
331347
};
332348

349+
Ok(cast_args)
350+
}
351+
352+
pub fn new_postgres_query(
353+
catalog_reader: &CatalogReadGuard,
354+
db_name: &str,
355+
schema_path: SchemaPath<'_>,
356+
args: Vec<ExprImpl>,
357+
) -> RwResult<Self> {
358+
let args = Self::handle_postgres_or_mysql_query_args(
359+
catalog_reader,
360+
db_name,
361+
schema_path,
362+
args,
363+
"postgres-cdc",
364+
)?;
365+
let evaled_args = args
366+
.iter()
367+
.map(expr_impl_to_string_fn)
368+
.collect::<RwResult<Vec<_>>>()?;
369+
333370
#[cfg(madsim)]
334371
{
335372
return Err(crate::error::ErrorCode::BindError(
@@ -411,45 +448,23 @@ impl TableFunction {
411448
}
412449
}
413450

414-
pub fn new_mysql_query(args: Vec<ExprImpl>) -> RwResult<Self> {
415-
static MYSQL_ARGS_LEN: usize = 6;
416-
let args = {
417-
if args.len() != MYSQL_ARGS_LEN {
418-
return Err(BindError("mysql_query function only accepts 6 arguments: mysql_query(hostname varchar, port varchar, username varchar, password varchar, database_name varchar, mysql_query varchar)".to_owned()).into());
419-
}
420-
let mut cast_args = Vec::with_capacity(MYSQL_ARGS_LEN);
421-
for arg in args {
422-
let arg = arg.cast_implicit(DataType::Varchar)?;
423-
cast_args.push(arg);
424-
}
425-
cast_args
426-
};
427-
let evaled_args = {
428-
let mut evaled_args: Vec<String> = Vec::with_capacity(MYSQL_ARGS_LEN);
429-
for arg in &args {
430-
match arg.try_fold_const() {
431-
Some(Ok(value)) => {
432-
let Some(scalar) = value else {
433-
return Err(BindError(
434-
"mysql_query function does not accept null arguments".to_owned(),
435-
)
436-
.into());
437-
};
438-
evaled_args.push(scalar.into_utf8().into());
439-
}
440-
Some(Err(err)) => {
441-
return Err(err);
442-
}
443-
None => {
444-
return Err(BindError(
445-
"mysql_query function only accepts constant arguments".to_owned(),
446-
)
447-
.into());
448-
}
449-
}
450-
}
451-
evaled_args
452-
};
451+
pub fn new_mysql_query(
452+
catalog_reader: &CatalogReadGuard,
453+
db_name: &str,
454+
schema_path: SchemaPath<'_>,
455+
args: Vec<ExprImpl>,
456+
) -> RwResult<Self> {
457+
let args = Self::handle_postgres_or_mysql_query_args(
458+
catalog_reader,
459+
db_name,
460+
schema_path,
461+
args,
462+
"mysql-cdc",
463+
)?;
464+
let evaled_args = args
465+
.iter()
466+
.map(expr_impl_to_string_fn)
467+
.collect::<RwResult<Vec<_>>>()?;
453468

454469
#[cfg(madsim)]
455470
{
@@ -624,3 +639,24 @@ impl Expr for TableFunction {
624639
unreachable!("Table function should not be converted to ExprNode")
625640
}
626641
}
642+
643+
fn expr_impl_to_string_fn(arg: &ExprImpl) -> RwResult<String> {
644+
match arg.try_fold_const() {
645+
Some(Ok(value)) => {
646+
let Some(scalar) = value else {
647+
return Err(BindError(
648+
"postgres_query function and mysql_query function do not accept null arguments"
649+
.to_owned(),
650+
)
651+
.into());
652+
};
653+
Ok(scalar.into_utf8().to_string())
654+
}
655+
Some(Err(err)) => Err(err),
656+
None => Err(BindError(
657+
"postgres_query function and mysql_query function only accept constant arguments"
658+
.to_owned(),
659+
)
660+
.into()),
661+
}
662+
}

src/risedevtool/src/risedev_env.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,11 @@ pub fn generate_risedev_env(services: &Vec<ServiceConfig>) -> String {
117117
writeln!(env, r#"PGUSER="{user}""#,).unwrap();
118118
writeln!(env, r#"PGPASSWORD="{password}""#,).unwrap();
119119
writeln!(env, r#"PGDATABASE="{database}""#,).unwrap();
120+
writeln!(
121+
env,
122+
r#"RISEDEV_POSTGRES_WITH_OPTIONS_COMMON="connector='postgres-cdc',hostname='{host}',port='{port}'""#,
123+
)
124+
.unwrap();
120125
}
121126
ServiceConfig::SqlServer(c) => {
122127
let host = &c.address;

0 commit comments

Comments
 (0)