Skip to content

Commit 4fa72b1

Browse files
authored
refactor: create/drop_warehouse return WarehouseInfo (#17254)
1 parent 26d0008 commit 4fa72b1

File tree

7 files changed

+80
-30
lines changed

7 files changed

+80
-30
lines changed

src/query/ee/src/resource_management/resources_management_kubernetes.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ impl ResourcesManagement for KubernetesResourcesManagement {
4242
))
4343
}
4444

45-
async fn create_warehouse(&self, _: String, _: Vec<SelectedNode>) -> Result<()> {
45+
async fn create_warehouse(&self, _: String, _: Vec<SelectedNode>) -> Result<WarehouseInfo> {
4646
Err(ErrorCode::Unimplemented(
4747
"Unimplemented kubernetes resources management",
4848
))
4949
}
5050

51-
async fn drop_warehouse(&self, _: String) -> Result<()> {
51+
async fn drop_warehouse(&self, _: String) -> Result<WarehouseInfo> {
5252
Err(ErrorCode::Unimplemented(
5353
"Unimplemented kubernetes resources management",
5454
))

src/query/ee/src/resource_management/resources_management_self_managed.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,13 @@ impl ResourcesManagement for SelfManagedResourcesManagement {
5353
Ok(())
5454
}
5555

56-
async fn create_warehouse(&self, _: String, _: Vec<SelectedNode>) -> Result<()> {
56+
async fn create_warehouse(&self, _: String, _: Vec<SelectedNode>) -> Result<WarehouseInfo> {
5757
Err(ErrorCode::Unimplemented(
5858
"Unimplemented create warehouse with self-managed resources management",
5959
))
6060
}
6161

62-
async fn drop_warehouse(&self, _: String) -> Result<()> {
62+
async fn drop_warehouse(&self, _: String) -> Result<WarehouseInfo> {
6363
Err(ErrorCode::Unimplemented(
6464
"Unimplemented drop warehouse with self-managed resources management",
6565
))

src/query/ee/src/resource_management/resources_management_system.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,15 @@ impl ResourcesManagement for SystemResourcesManagement {
6262
Ok(())
6363
}
6464

65-
async fn create_warehouse(&self, name: String, nodes: Vec<SelectedNode>) -> Result<()> {
65+
async fn create_warehouse(
66+
&self,
67+
name: String,
68+
nodes: Vec<SelectedNode>,
69+
) -> Result<WarehouseInfo> {
6670
self.warehouse_manager.create_warehouse(name, nodes).await
6771
}
6872

69-
async fn drop_warehouse(&self, name: String) -> Result<()> {
73+
async fn drop_warehouse(&self, name: String) -> Result<WarehouseInfo> {
7074
self.warehouse_manager.drop_warehouse(name).await
7175
}
7276

src/query/ee_features/resources_management/src/resources_management.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,13 @@ pub trait ResourcesManagement: Sync + Send + 'static {
3030

3131
async fn init_node(&self, node: &mut NodeInfo) -> Result<()>;
3232

33-
async fn create_warehouse(&self, name: String, nodes: Vec<SelectedNode>) -> Result<()>;
33+
async fn create_warehouse(
34+
&self,
35+
name: String,
36+
nodes: Vec<SelectedNode>,
37+
) -> Result<WarehouseInfo>;
3438

35-
async fn drop_warehouse(&self, name: String) -> Result<()>;
39+
async fn drop_warehouse(&self, name: String) -> Result<WarehouseInfo>;
3640

3741
async fn resume_warehouse(&self, name: String) -> Result<()>;
3842

@@ -91,11 +95,11 @@ impl ResourcesManagement for DummyResourcesManagement {
9195
Ok(())
9296
}
9397

94-
async fn create_warehouse(&self, _: String, _: Vec<SelectedNode>) -> Result<()> {
98+
async fn create_warehouse(&self, _: String, _: Vec<SelectedNode>) -> Result<WarehouseInfo> {
9599
Err(ErrorCode::Unimplemented("The use of this feature requires a Databend Enterprise Edition license. To unlock enterprise features, please contact Databend to obtain a license. Learn more at https://docs.databend.com/guides/overview/editions/dee/"))
96100
}
97101

98-
async fn drop_warehouse(&self, _: String) -> Result<()> {
102+
async fn drop_warehouse(&self, _: String) -> Result<WarehouseInfo> {
99103
Err(ErrorCode::Unimplemented("The use of this feature requires a Databend Enterprise Edition license. To unlock enterprise features, please contact Databend to obtain a license. Learn more at https://docs.databend.com/guides/overview/editions/dee/"))
100104
}
101105

src/query/management/src/warehouse/warehouse_api.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,13 @@ pub trait WarehouseApi: Sync + Send {
7373
/// Keep the tenant's cluster node alive.
7474
async fn heartbeat_node(&self, node: &mut NodeInfo, seq: u64) -> Result<u64>;
7575

76-
async fn drop_warehouse(&self, warehouse: String) -> Result<()>;
76+
async fn drop_warehouse(&self, warehouse: String) -> Result<WarehouseInfo>;
7777

78-
async fn create_warehouse(&self, warehouse: String, nodes: Vec<SelectedNode>) -> Result<()>;
78+
async fn create_warehouse(
79+
&self,
80+
warehouse: String,
81+
nodes: Vec<SelectedNode>,
82+
) -> Result<WarehouseInfo>;
7983

8084
async fn resume_warehouse(&self, warehouse: String) -> Result<()>;
8185

src/query/management/src/warehouse/warehouse_mgr.rs

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,7 +1024,7 @@ impl WarehouseApi for WarehouseMgr {
10241024
}
10251025
}
10261026

1027-
async fn drop_warehouse(&self, warehouse: String) -> Result<()> {
1027+
async fn drop_warehouse(&self, warehouse: String) -> Result<WarehouseInfo> {
10281028
if warehouse.is_empty() {
10291029
return Err(ErrorCode::InvalidWarehouse("Warehouse name is empty."));
10301030
}
@@ -1075,15 +1075,19 @@ impl WarehouseApi for WarehouseMgr {
10751075
continue;
10761076
}
10771077

1078-
return Ok(());
1078+
return Ok(warehouse_snapshot.warehouse_info);
10791079
}
10801080

10811081
Err(ErrorCode::WarehouseOperateConflict(
10821082
"Warehouse operate conflict(tried 10 times).",
10831083
))
10841084
}
10851085

1086-
async fn create_warehouse(&self, warehouse: String, nodes: Vec<SelectedNode>) -> Result<()> {
1086+
async fn create_warehouse(
1087+
&self,
1088+
warehouse: String,
1089+
nodes: Vec<SelectedNode>,
1090+
) -> Result<WarehouseInfo> {
10871091
if warehouse.is_empty() {
10881092
return Err(ErrorCode::InvalidWarehouse("Warehouse name is empty."));
10891093
}
@@ -1129,26 +1133,28 @@ impl WarehouseApi for WarehouseMgr {
11291133

11301134
let warehouse_info_key = self.warehouse_info_key(&warehouse)?;
11311135

1136+
let warehouse_info = WarehouseInfo::SystemManaged(SystemManagedWarehouse {
1137+
role_id: GlobalUniqName::unique(),
1138+
status: "Running".to_string(),
1139+
id: warehouse.clone(),
1140+
clusters: HashMap::from([(
1141+
String::from(DEFAULT_CLUSTER_ID),
1142+
SystemManagedCluster {
1143+
nodes: nodes.clone(),
1144+
},
1145+
)]),
1146+
});
1147+
11321148
txn.condition
11331149
.push(map_condition(&warehouse_info_key, MatchSeq::Exact(0)));
11341150
txn.if_then.push(TxnOp::put(
11351151
warehouse_info_key.clone(),
1136-
serde_json::to_vec(&WarehouseInfo::SystemManaged(SystemManagedWarehouse {
1137-
role_id: GlobalUniqName::unique(),
1138-
status: "Running".to_string(),
1139-
id: warehouse.clone(),
1140-
clusters: HashMap::from([(
1141-
String::from(DEFAULT_CLUSTER_ID),
1142-
SystemManagedCluster {
1143-
nodes: nodes.clone(),
1144-
},
1145-
)]),
1146-
}))?,
1152+
serde_json::to_vec(&warehouse_info)?,
11471153
));
11481154
txn.else_then.push(TxnOp::get(warehouse_info_key));
11491155

11501156
return match self.metastore.transaction(txn).await? {
1151-
res if res.success => Ok(()),
1157+
res if res.success => Ok(warehouse_info),
11521158
res => match res.responses.last() {
11531159
Some(TxnOpResponse {
11541160
response: Some(Response::Get(res)),

src/query/management/tests/it/warehouse.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,22 @@ async fn test_successfully_create_system_managed_warehouse() -> Result<()> {
314314
SelectedNode::Random(None),
315315
]);
316316

317-
create_warehouse.await?;
317+
let cw = create_warehouse.await?;
318+
319+
assert!(
320+
!matches!(cw, WarehouseInfo::SelfManaged(_)),
321+
"Expected WarehouseInfo to not be SelfManaged"
322+
);
323+
if let WarehouseInfo::SystemManaged(sw) = cw {
324+
assert_eq!(sw.id, "test_warehouse");
325+
for warehouse in warehouse_manager.list_warehouses().await? {
326+
if let WarehouseInfo::SystemManaged(w) = warehouse {
327+
if w.id == sw.id {
328+
assert_eq!(w.role_id, sw.role_id)
329+
}
330+
}
331+
}
332+
}
318333

319334
for node in &nodes {
320335
let online_node = format!("__fd_clusters_v6/test%2dtenant%2did/online_nodes/{}", node);
@@ -526,7 +541,8 @@ async fn test_create_warehouse_with_self_manage() -> Result<()> {
526541
None,
527542
)]);
528543

529-
create_warehouse.await
544+
assert!(create_warehouse.await.is_ok());
545+
Ok(())
530546
}
531547

532548
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
@@ -936,7 +952,23 @@ async fn test_drop_system_managed_warehouse() -> Result<()> {
936952
create_warehouse.await?;
937953

938954
let drop_warehouse = warehouse_manager.drop_warehouse(String::from("test_warehouse"));
939-
drop_warehouse.await?;
955+
956+
let cw = drop_warehouse.await?;
957+
958+
assert!(
959+
!matches!(cw, WarehouseInfo::SelfManaged(_)),
960+
"Expected WarehouseInfo to not be SelfManaged"
961+
);
962+
if let WarehouseInfo::SystemManaged(sw) = cw {
963+
assert_eq!(sw.id, "test_warehouse");
964+
for warehouse in warehouse_manager.list_warehouses().await? {
965+
if let WarehouseInfo::SystemManaged(w) = warehouse {
966+
if w.id == sw.id {
967+
assert_eq!(w.role_id, sw.role_id)
968+
}
969+
}
970+
}
971+
}
940972

941973
let create_warehouse =
942974
warehouse_manager.create_warehouse(String::from("test_warehouse"), vec![

0 commit comments

Comments
 (0)