Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor:完善服务路由以及配置中心的能力 #19

Merged
merged 7 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ exclude = ["examples/*", "benches/*", "tests/*", ".gitignore"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]

# polaris-spec dep
polaris-specification = {version = "=1.5.4-2"}

# common dep
bytes = {version = "1.4.0"}
regex = {version = "1.11.1"}
schemars = {version = "0.8.16"}
serde = {version = "1.0.198", features = ["derive"]}
serde-duration-ext = {version = "0.1.0"}
serde_json = {version = "1.0.116"}
serde_yaml = {version = "0.9.34"}

uuid = {version = "1.8.0", features = [
"v4", # Lets you generate random UUIDs
"fast-rng", # Use a faster (but still sufficiently random) RNG
Expand Down Expand Up @@ -54,6 +59,8 @@ tonic = {version = "0.11.0"}

# logging
tracing = {version = "0.1.36"}
tracing-appender = {version = "0.2.3"}
tracing-subscriber = {version = "0.3", features = ["default"]}

# crypto
aes = {version = "0.7.4"}
Expand All @@ -64,7 +71,6 @@ rand = {version = "0.8.4"}
rsa = {version = "0.9.6"}

[dev-dependencies]
tracing-subscriber = {version = "0.3", features = ["default"]}

[[example]]
name = "discover"
Expand Down
27 changes: 12 additions & 15 deletions examples/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,19 @@

use std::{collections::HashMap, sync::Arc, time::Duration};

use polaris_rust::{
config::{
api::{new_config_file_api_by_context, ConfigFileAPI},
req::{
CreateConfigFileRequest, PublishConfigFileRequest, UpdateConfigFileRequest,
UpsertAndPublishConfigFileRequest, WatchConfigFileRequest,
},
use polaris_rust::{config::{
api::{new_config_file_api_by_context, ConfigFileAPI},
req::{
CreateConfigFileRequest, PublishConfigFileRequest, UpdateConfigFileRequest,
UpsertAndPublishConfigFileRequest, WatchConfigFileRequest,
},
core::{
context::SDKContext,
model::{
config::{ConfigFile, ConfigFileRelease},
error::PolarisError,
},
}, core::{
context::SDKContext,
model::{
config::{ConfigFile, ConfigFileRelease},
error::PolarisError,
},
};
}, info};
use tracing::level_filters::LevelFilter;

#[tokio::main]
Expand Down Expand Up @@ -163,7 +160,7 @@ async fn main() -> Result<(), PolarisError> {
group: "rust".to_string(),
file: "rust.toml".to_string(),
call_back: Arc::new(|event| {
tracing::info!("receive config change event: {:?}", event);
info!("receive config change event: {:?}", event);
}),
})
.await;
Expand Down
66 changes: 43 additions & 23 deletions examples/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,15 @@

use std::{collections::HashMap, sync::Arc, time::Duration};

use polaris_rust::{
core::{
context::SDKContext,
model::{error::PolarisError, naming::Location},
use polaris_rust::{core::{
context::SDKContext,
model::{error::PolarisError, loadbalance::Criteria, naming::Location, router::RouteInfo},
}, discovery::{
api::{new_consumer_api_by_context, new_provider_api_by_context, ConsumerAPI, ProviderAPI},
req::{
GetAllInstanceRequest, GetOneInstanceRequest, InstanceDeregisterRequest, InstanceRegisterRequest, WatchInstanceRequest
},
discovery::{
api::{new_consumer_api_by_context, new_provider_api_by_context, ConsumerAPI, ProviderAPI},
req::{
GetAllInstanceRequest, InstanceDeregisterRequest, InstanceRegisterRequest,
WatchInstanceRequest,
},
},
};
}, error, info};
use tracing::level_filters::LevelFilter;

#[tokio::main]
Expand All @@ -48,7 +44,7 @@ async fn main() -> Result<(), PolarisError> {

let sdk_context_ret = SDKContext::default();
if sdk_context_ret.is_err() {
tracing::error!(
error!(
"create sdk context fail: {}",
sdk_context_ret.err().unwrap()
);
Expand All @@ -61,7 +57,7 @@ async fn main() -> Result<(), PolarisError> {

let provider_ret = new_provider_api_by_context(arc_ctx.clone());
if provider_ret.is_err() {
tracing::error!("create provider fail: {}", provider_ret.err().unwrap());
error!("create provider fail: {}", provider_ret.err().unwrap());
return Err(PolarisError::new(
polaris_rust::core::model::error::ErrorCode::UnknownServerError,
"".to_string(),
Expand All @@ -70,7 +66,7 @@ async fn main() -> Result<(), PolarisError> {

let consumer_ret = new_consumer_api_by_context(arc_ctx);
if consumer_ret.is_err() {
tracing::error!("create consumer fail: {}", consumer_ret.err().unwrap());
error!("create consumer fail: {}", consumer_ret.err().unwrap());
return Err(PolarisError::new(
polaris_rust::core::model::error::ErrorCode::UnknownServerError,
"".to_string(),
Expand All @@ -80,7 +76,7 @@ async fn main() -> Result<(), PolarisError> {
let provider = provider_ret.unwrap();
let consumer = consumer_ret.unwrap();

tracing::info!(
info!(
"create discovery api client cost: {:?}",
start_time.elapsed()
);
Expand Down Expand Up @@ -114,25 +110,25 @@ async fn main() -> Result<(), PolarisError> {
let _ret = provider.register(req).await;
match _ret {
Err(err) => {
tracing::error!("register fail: {}", err.to_string());
error!("register fail: {}", err.to_string());
}
Ok(_) => {}
}

tracing::info!("begin do watch service_instances change");
info!("begin do watch service_instances change");
let watch_rsp = consumer
.watch_instance(WatchInstanceRequest {
namespace: "rust-demo".to_string(),
service: "polaris-rust-provider".to_string(),
call_back: Arc::new(|instances| {
tracing::info!("watch instance: {:?}", instances.instances);
info!("watch instance: {:?}", instances.instances);
}),
})
.await;

match watch_rsp {
Err(err) => {
tracing::error!("watch instance fail: {}", err.to_string());
error!("watch instance fail: {}", err.to_string());
}
Ok(_) => {}
}
Expand All @@ -148,10 +144,34 @@ async fn main() -> Result<(), PolarisError> {

match instances_ret {
Err(err) => {
tracing::error!("get all instance fail: {}", err.to_string());
error!("get all instance fail: {}", err.to_string());
}
Ok(instances) => {
tracing::info!("get all instance: {:?}", instances);
info!("get all instance: {:?}", instances);
}
}

// 执行路由以及负载均衡能力
let mut route_info = RouteInfo::default();

let ret = consumer.get_one_instance(GetOneInstanceRequest{
flow_id: uuid::Uuid::new_v4().to_string(),
timeout: Duration::from_secs(10),
namespace: "rust-demo".to_string(),
service: "polaris-rust-provider".to_string(),
criteria: Criteria{
policy: "random".to_string(),
hash_key: "".to_string(),
},
route_info: route_info,
}).await;

match ret {
Err(err) => {
tracing::error!("get one instance fail: {}", err.to_string());
}
Ok(instance) => {
tracing::info!("get one instance: {:?}", instance);
}
}

Expand All @@ -173,7 +193,7 @@ async fn main() -> Result<(), PolarisError> {
let _ret = provider.deregister(deregister_req).await;
match _ret {
Err(err) => {
tracing::error!("deregister fail: {}", err.to_string());
error!("deregister fail: {}", err.to_string());
}
Ok(_) => {}
}
Expand Down
129 changes: 128 additions & 1 deletion src/circuitbreaker/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,131 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

pub trait CircuitBreakerAPI {}
use std::{sync::Arc, time::Duration};

use crate::core::{
flow::CircuitBreakerFlow,
model::{
circuitbreaker::{CallAbortedError, CheckResult, MethodResource, Resource, ResourceStat, RetStatus, ServiceResource},
error::PolarisError,
},
};

use super::req::{RequestContext, ResponseContext};

/// CircuitBreakerAPI .
#[async_trait::async_trait]
pub trait CircuitBreakerAPI
where
Self: Send + Sync,
{
/// check_resource .
async fn check_resource(&self, resource: Resource) -> Result<CheckResult, PolarisError>;
/// report_stat .
async fn report_stat(&self, stat: ResourceStat) -> Result<(), PolarisError>;
/// make_invoke_handler .
async fn make_invoke_handler(
&self,
req: RequestContext,
) -> Result<Arc<InvokeHandler>, PolarisError>;
}

/// InvokeHandler .
pub struct InvokeHandler {
// req_ctx: 请求上下文
req_ctx: RequestContext,
// flow: 熔断器流程
flow: Arc<CircuitBreakerFlow>,
}

impl InvokeHandler {
pub fn new(req_ctx: RequestContext, flow: Arc<CircuitBreakerFlow>) -> Self {
InvokeHandler { req_ctx, flow }
}

/// acquire_permission 检查当前请求是否可放通
async fn acquire_permission(&self) -> Result<(), CallAbortedError> {
let svc_res = ServiceResource::new_waith_caller(
self.req_ctx.caller_service.clone(),
self.req_ctx.callee_service.clone());

match self.flow.check_resource(Resource::ServiceResource(svc_res)).await {
Ok(ret) => {
if ret.pass {
Ok(())
} else {
Err(CallAbortedError::new(ret.rule_name, ret.fallback_info))
}
},
Err(e) => {
// 内部异常,不触发垄断,但是需要记录
crate::error!("[circuitbreaker][invoke] check resource failed: {:?}", e);
Ok(())
},
}
}

async fn on_success(&self, rsp: ResponseContext) -> Result<(), PolarisError> {
let cost = rsp.duration.clone();
let mut code = -1 as i32;
let mut status = RetStatus::RetSuccess;

if let Some(r) = &self.req_ctx.result_to_code {
code = r.on_success(rsp.result.unwrap());
}
if let Some(e) = rsp.error {
let ret = e.downcast::<CallAbortedError>();
if ret.is_ok() {
status = RetStatus::RetReject;
}
}
self.common_report(cost, code, status).await
}

async fn on_error(&self, rsp: ResponseContext) -> Result<(), PolarisError> {
let cost = rsp.duration.clone();
let mut code = 0 as i32;
let mut status = RetStatus::RetUnknown;

if let Some(r) = &self.req_ctx.result_to_code {
code = r.on_success(rsp.result.unwrap());
}
self.common_report(cost, code, status).await
}

async fn common_report(&self, cost: Duration, code: i32, status: RetStatus) -> Result<(), PolarisError> {
let stat = ResourceStat {
resource: Resource::ServiceResource(ServiceResource::new_waith_caller(
self.req_ctx.caller_service.clone(),
self.req_ctx.callee_service.clone())),
ret_code: code.to_string(),
delay: cost,
status: status.clone(),
};

let ret = self.flow.report_stat(stat).await;
if ret.is_err() {
crate::error!("[circuitbreaker][invoke] report stat failed");
return ret;
}

if self.req_ctx.path.is_empty() {
return Ok(());
}

// 补充一个接口级别的数据上报
let stat = ResourceStat {
resource: Resource::MethodResource(MethodResource::new_waith_caller(
self.req_ctx.caller_service.clone(),
self.req_ctx.callee_service.clone(),
self.req_ctx.protocol.clone(),
self.req_ctx.method.clone(),
self.req_ctx.path.clone(),
)),
ret_code: code.to_string(),
delay: cost,
status,
};
self.flow.report_stat(stat).await
}
}
Loading
Loading