@@ -7,7 +7,7 @@ import FunctionDescription from '@site/src/components/FunctionDescription';
7
7
8
8
<FunctionDescription description =" Introduced or updated: v1.2.738 " />
9
9
10
- CREATE TASK 语句用于定义一个新的 task,该 task 按照计划的时间表或基于 dag 的 task 图执行指定的 SQL 语句。
10
+ CREATE TASK 语句用于定义新任务,该任务可按计划或基于 DAG(有向无环图)任务图执行指定 SQL 语句。
11
11
12
12
** 注意:** 此功能仅在 Databend Cloud 中开箱即用。
13
13
27
27
< sql>
28
28
```
29
29
30
- | 参数 | 描述 |
31
- | ------------------------------------------------ | ------ -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
32
- | IF NOT EXISTS | 可选。如果指定,则仅当不存在同名的 task 时才会创建 task。 |
33
- | name | task 的名称。这是一个必填字段。 |
34
- | WAREHOUSE | 必需。指定用于 task 的虚拟计算集群。 |
35
- | SCHEDULE | 必需。定义 task 运行的时间表。可以以分钟为单位指定,也可以使用 CRON 表达式以及时区指定。 |
36
- | SUSPEND_TASK_AFTER_NUM_FAILURES | 可选。task 在自动挂起之前连续失败的次数。 |
37
- | AFTER | 列出必须在此 task 启动之前完成的 task。 |
38
- | WHEN boolean_expr | task 运行必须为 true 的条件。 |
39
- | [ ERROR_INTEGRATION] ( ../16-notification/index.md ) | 可选。用于 task 错误通知的通知集成的名称,并应用特定的 [ task 错误负载 ] ( ./10-task-error-integration-payload.md ) 。 |
40
- | COMMENT | 可选。一个字符串文字,用作 task 的注释或描述。 |
41
- | session_parameter | 可选。指定在 task 运行期间用于 task 的会话参数。 |
42
- | sql | task 将执行的 SQL 语句,它可以是单个语句或脚本。这是一个必填字段。 |
30
+ | 参数 | 描述 |
31
+ | -------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
32
+ | IF NOT EXISTS | 可选。如果指定,仅当同名任务不存在时才创建任务 |
33
+ | name | 任务名称(必填) |
34
+ | WAREHOUSE | 必需。指定任务使用的虚拟计算集群(Warehouse) |
35
+ | SCHEDULE | 必需。定义任务运行计划,可按分钟/秒指定或使用 CRON 表达式及时区 |
36
+ | SUSPEND_TASK_AFTER_NUM_FAILURES | 可选。连续失败指定次数后自动挂起任务 |
37
+ | AFTER | 列出当前任务启动前必须完成的任务 |
38
+ | WHEN boolean_expr | 任务运行必须满足的条件 |
39
+ | [ ERROR_INTEGRATION] ( ../16-notification/index.md ) | 可选。用于任务错误通知的通知集成名称,应用特定 [ 任务错误负载 ] ( ./10-task-error-integration-payload.md ) |
40
+ | COMMENT | 可选。任务注释或描述的字符串字面量 |
41
+ | session_parameter | 可选。指定任务运行时使用的会话参数(必须位于所有其他参数之后) |
42
+ | sql | 任务执行的 SQL 语句(单语句或脚本,必填) |
43
43
44
44
### 使用说明
45
45
46
- - 必须为独立 task 或 task DAG 中的根 task 定义时间表;否则,task 仅在手动使用 EXECUTE TASK 执行时运行。
47
- - 无法为 DAG 中的子 task 指定时间表。
48
- - 创建 task 后,必须先执行 ALTER TASK … RESUME,然后 task 才能根据 task 定义中指定的参数运行。
49
- - When Condition 仅支持 ` <boolean_expression> ` 的子集
50
- task WHEN 子句中支持以下内容:
51
-
52
- - [ STREAM_STATUS] ( ../../../20-sql-functions/17-table-functions/stream-status.md ) 支持在 SQL 表达式中进行评估。此函数指示指定的流是否包含更改跟踪数据。您可以使用此函数评估指定的流是否包含更改数据,然后再启动当前运行。如果结果为 FALSE,则 task 不运行。
53
- - 布尔运算符,例如 AND、OR、NOT 等。
54
- - 数值、字符串和布尔类型之间的转换。
55
- - 比较运算符,例如等于、不等于、大于、小于等。
46
+ - 独立任务或 DAG(有向无环图)根任务必须定义计划,否则只能通过 EXECUTE TASK 手动执行
47
+ - DAG 子任务不可指定计划
48
+ - 创建任务后需执行 ALTER TASK … RESUME 才能按定义参数运行
49
+ - WHEN 条件仅支持 ` <boolean_expression> ` 子集:
50
+ - 支持在 SQL 表达式中使用 [ STREAM_STATUS] ( ../../../20-sql-functions/17-table-functions/stream-status.md ) 函数评估流是否含变更数据
51
+ - 支持布尔运算符(AND/OR/NOT 等)
52
+ - 支持数值/字符串/布尔类型转换
53
+ - 支持比较运算符(等于/不等于/大于/小于等)
56
54
57
55
::: note
58
- 警告:在 task 中使用 STREAM_STATUS 时,引用流时必须包含数据库名称(例如, ` STREAM_STATUS('mydb.stream_name') ` )。
56
+ 警告:任务中使用 STREAM_STATUS 时,引用流必须包含数据库名(如 ` STREAM_STATUS('mydb.stream_name') ` )
59
57
:::
60
58
61
- - 多个从单个表流中使用更改数据的 task 检索不同的增量。当 task 使用 DML 语句使用流中的更改数据时,流会前进偏移量。更改数据不再可供下一个 task 使用。目前,我们建议只有一个 task 使用流中的更改数据。可以为同一表创建多个流,并由不同的 task 使用。
62
- - task 不会在每次执行时重试;每次执行都是串行的。每个脚本 SQL 逐个执行,没有并行执行。这确保了 task 执行的顺序和依赖关系得以维护。
63
- - 基于间隔的 task 以严格的方式遵循固定的间隔点。这意味着如果当前 task 执行时间超过间隔单位,则下一个 task 将立即执行。否则,下一个 task 将等待直到下一个间隔单位被触发。例如,如果一个 task 定义为 1 秒的间隔,并且一个 task 执行需要 1.5 秒,则下一个 task 将立即执行。如果一个 task 执行需要 0.5 秒,则下一个 task 将等待直到下一个 1 秒间隔刻度开始。
59
+ - 多个任务消费同一表流时会获取不同增量数据。当任务通过 DML 消费流数据时,流偏移量会推进,后续任务无法再消费相同数据。建议单任务消费单流,可为同表创建多流供不同任务使用
60
+ - 任务执行不重试,每次串行执行。脚本 SQL 按顺序逐一执行,无并行处理,确保任务执行顺序和依赖关系
61
+ - 基于间隔的任务严格遵循固定间隔:若当前任务超时,下一任务立即执行;若提前完成,则等待下一间隔触发。例如 1 秒间隔任务:执行 1.5 秒则下一任务立即启动;执行 0.5 秒则等待至下一秒触发
62
+ - 会话参数可在创建时指定,也可通过 ALTER TASK 修改:
63
+ ``` sql
64
+ ALTER TASK simple_task SET
65
+ enable_query_result_cache = 1 ,
66
+ query_result_cache_min_execute_secs = 5 ;
67
+ ```
64
68
65
- ### 关于 Cron 表达式的重要说明
69
+ ### Cron 表达式重要说明
66
70
67
- - ` SCHEDULE ` 参数中使用的 cron 表达式必须包含** 正好 6 个字段** 。
68
- - 这些字段表示以下内容:
71
+ - ` SCHEDULE ` 的 cron 表达式必须包含 ** 6 个字段** :
69
72
1 . ** 秒** (0-59)
70
73
2 . ** 分钟** (0-59)
71
74
3 . ** 小时** (0-23)
72
- 4 . ** 月份中的日期 ** (1-31)
73
- 5 . ** 月份 ** (1-12 或 JAN-DEC)
74
- 6 . ** 星期几 ** (0-6,其中 0 是星期日, 或 SUN-SAT)
75
+ 4 . ** 日 ** (1-31)
76
+ 5 . ** 月 ** (1-12 或 JAN-DEC)
77
+ 6 . ** 星期 ** (0-6,0=周日 或 SUN-SAT)
75
78
76
79
#### Cron 表达式示例:
77
80
78
- - ** 太平洋时间每天上午 9:00:00:**
81
+ - ** 太平洋时间每天 9:00:00:**
79
82
- ` USING CRON '0 0 9 * * *' 'America/Los_Angeles' `
80
83
81
84
- ** 每分钟:**
82
85
- ` USING CRON '0 * * * * *' 'UTC' `
83
- - 这会在每分钟的开始时运行 task。
86
+ - 每分钟开始时执行
84
87
85
- - ** 每小时的第 15 分钟:**
88
+ - ** 每小时第 15 分钟:**
86
89
- ` USING CRON '0 15 * * * *' 'UTC' `
87
- - 这会在每小时的 15 分钟后运行 task。
90
+ - 每小时过 15 分钟时执行
88
91
89
- - ** 每个星期一的下午 12:00:00:**
92
+ - ** 每周一 12:00:00:**
90
93
- ` USING CRON '0 0 12 * * 1' 'UTC' `
91
- - 这会在每个星期一的中午运行 task。
94
+ - 每周一中午执行
92
95
93
- - ** 每个月的第一天的午夜 :**
96
+ - ** 每月首日午夜 :**
94
97
- ` USING CRON '0 0 0 1 * *' 'UTC' `
95
- - 这会在每个月的第一天的午夜运行 task。
98
+ - 每月第一天午夜执行
96
99
97
- - ** 每个工作日的上午 8:30:00:**
100
+ - ** 工作日 8:30:00:**
98
101
- ` USING CRON '0 30 8 * * 1-5' 'UTC' `
99
- - 这会在每个工作日(星期一至星期五)的上午 8:30 运行 task。
102
+ - 周一至周五 8:30 执行
100
103
101
104
## 使用示例
102
105
106
+ ### CRON 计划
107
+
103
108
``` sql
104
109
CREATE TASK my_daily_task
105
110
WAREHOUSE = ' compute_wh'
@@ -109,7 +114,9 @@ CREATE TASK my_daily_task
109
114
INSERT INTO summary_table SELECT * FROM source_table;
110
115
```
111
116
112
- 在此示例中,创建了一个名为 my_daily_task 的 task。它使用 compute_wh 计算集群运行一个 SQL 语句,该语句将数据从 source_table 插入到 summary_table 中。该 task 计划在太平洋时间每天上午 9 点运行。
117
+ 此示例创建任务 ` my_daily_task ` ,使用 ** compute_wh** 计算集群(Warehouse)将 source_table 数据插入 summary_table。通过 ** CRON 表达式** 设定** 每天太平洋时间 9:00** 执行。
118
+
119
+ ### 自动挂起
113
120
114
121
``` sql
115
122
CREATE TASK IF NOT EXISTS mytask
120
127
INSERT INTO compaction_test .test VALUES ((1 ));
121
128
```
122
129
123
- 此示例创建一个名为 mytask 的 task(如果该 task 尚不存在)。该 task 被分配给 system 计算集群,并计划每 2 分钟运行一次。如果连续失败三次,它将被挂起。该 task 执行一个 INSERT 操作到 compaction_test.test 表中。
130
+ 此示例创建任务 ` mytask ` (不存在时),分配至 ** system** 计算集群(Warehouse),** 每 2 分钟** 运行。若** 连续失败 3 次** 则** 自动挂起** ,向 compaction_test.test 表插入数据。
131
+
132
+ ### 秒级调度
124
133
125
134
``` sql
126
135
CREATE TASK IF NOT EXISTS daily_sales_summary
127
136
WAREHOUSE = ' analytics'
128
137
SCHEDULE = 30 SECOND
138
+ AS
139
+ SELECT sales_date, SUM (amount) AS daily_total
129
140
FROM sales_data
130
141
GROUP BY sales_date;
131
142
```
132
143
133
- 在此示例中,创建了一个名为 daily_sales_summary 的 task,并具有秒级调度。它计划每 30 秒运行一次。该 task 使用 'analytics' 计算集群,并通过聚合 sales_data 表中的数据来计算每日销售额摘要。
144
+ 此示例创建** 秒级调度** 任务 ` daily_sales_summary ` ,** 每 30 秒** 运行。使用 ** analytics** 计算集群(Warehouse)聚合 sales_data 表数据生成每日销售摘要。
145
+
146
+ ### 任务依赖
134
147
135
148
``` sql
136
149
CREATE TASK IF NOT EXISTS process_orders
@@ -140,20 +153,23 @@ ASINSERT INTO data_warehouse.orders
140
153
SELECT * FROM staging .orders ;
141
154
```
142
155
143
- 在此示例中,创建了一个名为 process_orders 的 task,并将其定义为在 task1 和 task2 成功完成后运行。这对于在有向无环图 (DAG) 中创建依赖关系非常有用。该 task 使用 'etl' 计算集群,并将数据从暂存区域传输到数仓。
156
+ 此示例创建任务 ` process_orders ` ,在 ** task1** 和 ** task2** ** 成功完成后** 运行。通过 ** etl** 计算集群(Warehouse)将暂存区数据迁移至数据仓库,建立 ** DAG(有向无环图)依赖关系** 。
157
+
158
+ ### 条件执行
144
159
145
160
``` sql
146
161
CREATE TASK IF NOT EXISTS hourly_data_cleanup
147
162
WAREHOUSE = ' maintenance'
148
163
SCHEDULE = ' 0 0 * * * *'
149
- WHEN STREAM_STATUS(' change_stream' ) = TRUE
164
+ WHEN STREAM_STATUS(' db1. change_stream' ) = TRUE
150
165
AS
151
166
DELETE FROM archived_data
152
167
WHERE archived_date < DATEADD(HOUR, - 24 , CURRENT_TIMESTAMP ());
153
-
154
168
```
155
169
156
- 在此示例中,创建了一个名为 hourly_data_cleanup 的 task。它使用 maintenance 计算集群,并计划每小时运行一次。该 task 从 archived_data 表中删除早于 24 小时的数据。该 task 仅在 change_stream 流包含更改数据时运行。
170
+ 此示例创建任务 ` hourly_data_cleanup ` ,使用 ** maintenance** 计算集群(Warehouse)** 每小时** 清理 archived_data 表中 24 小时前数据。仅当 ** STREAM_STATUS** 检测到 ` db1.change_stream ` 含变更数据时执行。
171
+
172
+ ### 错误集成
157
173
158
174
``` sql
159
175
CREATE TASK IF NOT EXISTS mytask
@@ -169,4 +185,22 @@ BEGIN
169
185
END;
170
186
```
171
187
172
- 在此示例中,创建了一个名为 mytask 的 task。它使用 mywh 计算集群,并计划每 30 秒运行一次。该 task 执行一个 BEGIN 块,其中包含一个 INSERT 语句和一个 DELETE 语句。该 task 在执行完两个语句后提交事务。当 task 失败时,它将触发名为 myerror 的错误集成。
188
+ 此示例创建任务 ` mytask ` ,使用 ** mywh** 计算集群(Warehouse)** 每 30 秒** 执行含 INSERT/DELETE 的 ** BEGIN 块** 。失败时触发 ** myerror** ** 错误集成** 。
189
+
190
+ ### 会话参数
191
+
192
+ ``` sql
193
+ CREATE TASK IF NOT EXISTS cache_enabled_task
194
+ WAREHOUSE = ' analytics'
195
+ SCHEDULE = 5 MINUTE
196
+ COMMENT = ' Task with query result cache enabled'
197
+ enable_query_result_cache = 1 ,
198
+ query_result_cache_min_execute_secs = 5
199
+ AS
200
+ SELECT SUM (amount) AS total_sales
201
+ FROM sales_data
202
+ WHERE transaction_date >= DATEADD(DAY, - 7 , CURRENT_DATE ())
203
+ GROUP BY product_category;
204
+ ```
205
+
206
+ 此示例创建任务 ` cache_enabled_task ` ,使用 ** analytics** 计算集群(Warehouse)** 每 5 分钟** 生成销售汇总。通过** 会话参数** ** ` enable_query_result_cache = 1 ` ** 和 ** ` query_result_cache_min_execute_secs = 5 ` ** 为执行超 5 秒的查询启用结果缓存,** 提升重复执行性能** 。
0 commit comments