Skip to content

Commit 1a2ff6c

Browse files
authored
refactor:完善服务路由以及配置中心的能力 (#19)
1 parent a14d27f commit 1a2ff6c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+4881
-6561
lines changed

Cargo.toml

+8-2
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@ exclude = ["examples/*", "benches/*", "tests/*", ".gitignore"]
1717

1818
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1919
[dependencies]
20+
21+
# polaris-spec dep
22+
polaris-specification = {version = "=1.5.4-2"}
23+
24+
# common dep
2025
bytes = {version = "1.4.0"}
26+
regex = {version = "1.11.1"}
2127
schemars = {version = "0.8.16"}
2228
serde = {version = "1.0.198", features = ["derive"]}
2329
serde-duration-ext = {version = "0.1.0"}
2430
serde_json = {version = "1.0.116"}
2531
serde_yaml = {version = "0.9.34"}
26-
2732
uuid = {version = "1.8.0", features = [
2833
"v4", # Lets you generate random UUIDs
2934
"fast-rng", # Use a faster (but still sufficiently random) RNG
@@ -54,6 +59,8 @@ tonic = {version = "0.11.0"}
5459

5560
# logging
5661
tracing = {version = "0.1.36"}
62+
tracing-appender = {version = "0.2.3"}
63+
tracing-subscriber = {version = "0.3", features = ["default"]}
5764

5865
# crypto
5966
aes = {version = "0.7.4"}
@@ -64,7 +71,6 @@ rand = {version = "0.8.4"}
6471
rsa = {version = "0.9.6"}
6572

6673
[dev-dependencies]
67-
tracing-subscriber = {version = "0.3", features = ["default"]}
6874

6975
[[example]]
7076
name = "discover"

examples/config.rs

+12-15
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,19 @@
1515

1616
use std::{collections::HashMap, sync::Arc, time::Duration};
1717

18-
use polaris_rust::{
19-
config::{
20-
api::{new_config_file_api_by_context, ConfigFileAPI},
21-
req::{
22-
CreateConfigFileRequest, PublishConfigFileRequest, UpdateConfigFileRequest,
23-
UpsertAndPublishConfigFileRequest, WatchConfigFileRequest,
24-
},
18+
use polaris_rust::{config::{
19+
api::{new_config_file_api_by_context, ConfigFileAPI},
20+
req::{
21+
CreateConfigFileRequest, PublishConfigFileRequest, UpdateConfigFileRequest,
22+
UpsertAndPublishConfigFileRequest, WatchConfigFileRequest,
2523
},
26-
core::{
27-
context::SDKContext,
28-
model::{
29-
config::{ConfigFile, ConfigFileRelease},
30-
error::PolarisError,
31-
},
24+
}, core::{
25+
context::SDKContext,
26+
model::{
27+
config::{ConfigFile, ConfigFileRelease},
28+
error::PolarisError,
3229
},
33-
};
30+
}, info};
3431
use tracing::level_filters::LevelFilter;
3532

3633
#[tokio::main]
@@ -163,7 +160,7 @@ async fn main() -> Result<(), PolarisError> {
163160
group: "rust".to_string(),
164161
file: "rust.toml".to_string(),
165162
call_back: Arc::new(|event| {
166-
tracing::info!("receive config change event: {:?}", event);
163+
info!("receive config change event: {:?}", event);
167164
}),
168165
})
169166
.await;

examples/discover.rs

+43-23
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,15 @@
1515

1616
use std::{collections::HashMap, sync::Arc, time::Duration};
1717

18-
use polaris_rust::{
19-
core::{
20-
context::SDKContext,
21-
model::{error::PolarisError, naming::Location},
18+
use polaris_rust::{core::{
19+
context::SDKContext,
20+
model::{error::PolarisError, loadbalance::Criteria, naming::Location, router::RouteInfo},
21+
}, discovery::{
22+
api::{new_consumer_api_by_context, new_provider_api_by_context, ConsumerAPI, ProviderAPI},
23+
req::{
24+
GetAllInstanceRequest, GetOneInstanceRequest, InstanceDeregisterRequest, InstanceRegisterRequest, WatchInstanceRequest
2225
},
23-
discovery::{
24-
api::{new_consumer_api_by_context, new_provider_api_by_context, ConsumerAPI, ProviderAPI},
25-
req::{
26-
GetAllInstanceRequest, InstanceDeregisterRequest, InstanceRegisterRequest,
27-
WatchInstanceRequest,
28-
},
29-
},
30-
};
26+
}, error, info};
3127
use tracing::level_filters::LevelFilter;
3228

3329
#[tokio::main]
@@ -48,7 +44,7 @@ async fn main() -> Result<(), PolarisError> {
4844

4945
let sdk_context_ret = SDKContext::default();
5046
if sdk_context_ret.is_err() {
51-
tracing::error!(
47+
error!(
5248
"create sdk context fail: {}",
5349
sdk_context_ret.err().unwrap()
5450
);
@@ -61,7 +57,7 @@ async fn main() -> Result<(), PolarisError> {
6157

6258
let provider_ret = new_provider_api_by_context(arc_ctx.clone());
6359
if provider_ret.is_err() {
64-
tracing::error!("create provider fail: {}", provider_ret.err().unwrap());
60+
error!("create provider fail: {}", provider_ret.err().unwrap());
6561
return Err(PolarisError::new(
6662
polaris_rust::core::model::error::ErrorCode::UnknownServerError,
6763
"".to_string(),
@@ -70,7 +66,7 @@ async fn main() -> Result<(), PolarisError> {
7066

7167
let consumer_ret = new_consumer_api_by_context(arc_ctx);
7268
if consumer_ret.is_err() {
73-
tracing::error!("create consumer fail: {}", consumer_ret.err().unwrap());
69+
error!("create consumer fail: {}", consumer_ret.err().unwrap());
7470
return Err(PolarisError::new(
7571
polaris_rust::core::model::error::ErrorCode::UnknownServerError,
7672
"".to_string(),
@@ -80,7 +76,7 @@ async fn main() -> Result<(), PolarisError> {
8076
let provider = provider_ret.unwrap();
8177
let consumer = consumer_ret.unwrap();
8278

83-
tracing::info!(
79+
info!(
8480
"create discovery api client cost: {:?}",
8581
start_time.elapsed()
8682
);
@@ -114,25 +110,25 @@ async fn main() -> Result<(), PolarisError> {
114110
let _ret = provider.register(req).await;
115111
match _ret {
116112
Err(err) => {
117-
tracing::error!("register fail: {}", err.to_string());
113+
error!("register fail: {}", err.to_string());
118114
}
119115
Ok(_) => {}
120116
}
121117

122-
tracing::info!("begin do watch service_instances change");
118+
info!("begin do watch service_instances change");
123119
let watch_rsp = consumer
124120
.watch_instance(WatchInstanceRequest {
125121
namespace: "rust-demo".to_string(),
126122
service: "polaris-rust-provider".to_string(),
127123
call_back: Arc::new(|instances| {
128-
tracing::info!("watch instance: {:?}", instances.instances);
124+
info!("watch instance: {:?}", instances.instances);
129125
}),
130126
})
131127
.await;
132128

133129
match watch_rsp {
134130
Err(err) => {
135-
tracing::error!("watch instance fail: {}", err.to_string());
131+
error!("watch instance fail: {}", err.to_string());
136132
}
137133
Ok(_) => {}
138134
}
@@ -148,10 +144,34 @@ async fn main() -> Result<(), PolarisError> {
148144

149145
match instances_ret {
150146
Err(err) => {
151-
tracing::error!("get all instance fail: {}", err.to_string());
147+
error!("get all instance fail: {}", err.to_string());
152148
}
153149
Ok(instances) => {
154-
tracing::info!("get all instance: {:?}", instances);
150+
info!("get all instance: {:?}", instances);
151+
}
152+
}
153+
154+
// 执行路由以及负载均衡能力
155+
let mut route_info = RouteInfo::default();
156+
157+
let ret = consumer.get_one_instance(GetOneInstanceRequest{
158+
flow_id: uuid::Uuid::new_v4().to_string(),
159+
timeout: Duration::from_secs(10),
160+
namespace: "rust-demo".to_string(),
161+
service: "polaris-rust-provider".to_string(),
162+
criteria: Criteria{
163+
policy: "random".to_string(),
164+
hash_key: "".to_string(),
165+
},
166+
route_info: route_info,
167+
}).await;
168+
169+
match ret {
170+
Err(err) => {
171+
tracing::error!("get one instance fail: {}", err.to_string());
172+
}
173+
Ok(instance) => {
174+
tracing::info!("get one instance: {:?}", instance);
155175
}
156176
}
157177

@@ -173,7 +193,7 @@ async fn main() -> Result<(), PolarisError> {
173193
let _ret = provider.deregister(deregister_req).await;
174194
match _ret {
175195
Err(err) => {
176-
tracing::error!("deregister fail: {}", err.to_string());
196+
error!("deregister fail: {}", err.to_string());
177197
}
178198
Ok(_) => {}
179199
}

src/circuitbreaker/api.rs

+128-1
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,131 @@
1313
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
1414
// specific language governing permissions and limitations under the License.
1515

16-
pub trait CircuitBreakerAPI {}
16+
use std::{sync::Arc, time::Duration};
17+
18+
use crate::core::{
19+
flow::CircuitBreakerFlow,
20+
model::{
21+
circuitbreaker::{CallAbortedError, CheckResult, MethodResource, Resource, ResourceStat, RetStatus, ServiceResource},
22+
error::PolarisError,
23+
},
24+
};
25+
26+
use super::req::{RequestContext, ResponseContext};
27+
28+
/// CircuitBreakerAPI .
29+
#[async_trait::async_trait]
30+
pub trait CircuitBreakerAPI
31+
where
32+
Self: Send + Sync,
33+
{
34+
/// check_resource .
35+
async fn check_resource(&self, resource: Resource) -> Result<CheckResult, PolarisError>;
36+
/// report_stat .
37+
async fn report_stat(&self, stat: ResourceStat) -> Result<(), PolarisError>;
38+
/// make_invoke_handler .
39+
async fn make_invoke_handler(
40+
&self,
41+
req: RequestContext,
42+
) -> Result<Arc<InvokeHandler>, PolarisError>;
43+
}
44+
45+
/// InvokeHandler .
46+
pub struct InvokeHandler {
47+
// req_ctx: 请求上下文
48+
req_ctx: RequestContext,
49+
// flow: 熔断器流程
50+
flow: Arc<CircuitBreakerFlow>,
51+
}
52+
53+
impl InvokeHandler {
54+
pub fn new(req_ctx: RequestContext, flow: Arc<CircuitBreakerFlow>) -> Self {
55+
InvokeHandler { req_ctx, flow }
56+
}
57+
58+
/// acquire_permission 检查当前请求是否可放通
59+
async fn acquire_permission(&self) -> Result<(), CallAbortedError> {
60+
let svc_res = ServiceResource::new_waith_caller(
61+
self.req_ctx.caller_service.clone(),
62+
self.req_ctx.callee_service.clone());
63+
64+
match self.flow.check_resource(Resource::ServiceResource(svc_res)).await {
65+
Ok(ret) => {
66+
if ret.pass {
67+
Ok(())
68+
} else {
69+
Err(CallAbortedError::new(ret.rule_name, ret.fallback_info))
70+
}
71+
},
72+
Err(e) => {
73+
// 内部异常,不触发垄断,但是需要记录
74+
crate::error!("[circuitbreaker][invoke] check resource failed: {:?}", e);
75+
Ok(())
76+
},
77+
}
78+
}
79+
80+
async fn on_success(&self, rsp: ResponseContext) -> Result<(), PolarisError> {
81+
let cost = rsp.duration.clone();
82+
let mut code = -1 as i32;
83+
let mut status = RetStatus::RetSuccess;
84+
85+
if let Some(r) = &self.req_ctx.result_to_code {
86+
code = r.on_success(rsp.result.unwrap());
87+
}
88+
if let Some(e) = rsp.error {
89+
let ret = e.downcast::<CallAbortedError>();
90+
if ret.is_ok() {
91+
status = RetStatus::RetReject;
92+
}
93+
}
94+
self.common_report(cost, code, status).await
95+
}
96+
97+
async fn on_error(&self, rsp: ResponseContext) -> Result<(), PolarisError> {
98+
let cost = rsp.duration.clone();
99+
let mut code = 0 as i32;
100+
let mut status = RetStatus::RetUnknown;
101+
102+
if let Some(r) = &self.req_ctx.result_to_code {
103+
code = r.on_success(rsp.result.unwrap());
104+
}
105+
self.common_report(cost, code, status).await
106+
}
107+
108+
async fn common_report(&self, cost: Duration, code: i32, status: RetStatus) -> Result<(), PolarisError> {
109+
let stat = ResourceStat {
110+
resource: Resource::ServiceResource(ServiceResource::new_waith_caller(
111+
self.req_ctx.caller_service.clone(),
112+
self.req_ctx.callee_service.clone())),
113+
ret_code: code.to_string(),
114+
delay: cost,
115+
status: status.clone(),
116+
};
117+
118+
let ret = self.flow.report_stat(stat).await;
119+
if ret.is_err() {
120+
crate::error!("[circuitbreaker][invoke] report stat failed");
121+
return ret;
122+
}
123+
124+
if self.req_ctx.path.is_empty() {
125+
return Ok(());
126+
}
127+
128+
// 补充一个接口级别的数据上报
129+
let stat = ResourceStat {
130+
resource: Resource::MethodResource(MethodResource::new_waith_caller(
131+
self.req_ctx.caller_service.clone(),
132+
self.req_ctx.callee_service.clone(),
133+
self.req_ctx.protocol.clone(),
134+
self.req_ctx.method.clone(),
135+
self.req_ctx.path.clone(),
136+
)),
137+
ret_code: code.to_string(),
138+
delay: cost,
139+
status,
140+
};
141+
self.flow.report_stat(stat).await
142+
}
143+
}

0 commit comments

Comments
 (0)