From 646d2b4ad64ed707daa9a8a8c2df2d3d45c6b8fd Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 4 Jan 2023 15:27:15 +0800 Subject: [PATCH 1/5] add split region method Signed-off-by: Ping Yu --- src/transaction/client.rs | 42 ++++++++++++++++++++++++++++++-- src/transaction/lock.rs | 1 - src/transaction/requests.rs | 19 +++++++++++++++ tikv-client-store/src/errors.rs | 7 ++++++ tikv-client-store/src/request.rs | 1 + 5 files changed, 67 insertions(+), 3 deletions(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index d7ca5760..c0cb4037 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -8,16 +8,19 @@ use crate::{ request::{plan::CleanupLocksResult, Plan}, timestamp::TimestampExt, transaction::{ - lock::ResolveLocksOptions, ResolveLocksContext, Snapshot, Transaction, TransactionOptions, + lock::ResolveLocksOptions, requests, ResolveLocksContext, Snapshot, Transaction, + TransactionOptions, }, Backoff, Result, }; use slog::{Drain, Logger}; use std::{mem, sync::Arc}; -use tikv_client_proto::pdpb::Timestamp; +use tikv_client_common::Error; +use tikv_client_proto::{metapb::Region, pdpb::Timestamp}; // FIXME: cargo-culted value const SCAN_LOCK_BATCH_SIZE: u32 = 1024; +const SPLIT_REGION_RETRY_LIMIT: usize = 10; /// The TiKV transactional `Client` is used to interact with TiKV using transactional requests. /// @@ -301,4 +304,39 @@ impl Client { let logger = self.logger.new(o!("child" => 1)); Transaction::new(timestamp, self.pd.clone(), options, logger) } + + pub async fn split_region_with_retry( + &self, + #[allow(clippy::ptr_arg)] key: &Vec, + split_keys: Vec>, + is_raw_kv: bool, + ) -> Result> { + debug!(self.logger, "invoking split region with retry"); + // FIXME: Add backoff + let mut error = None; + for i in 0..SPLIT_REGION_RETRY_LIMIT { + debug!(self.logger, "split region: attempt {}", (i + 1)); + let store = self.pd.clone().store_for_key(key.into()).await?; + // let ver_id = store.region_with_leader.ver_id(); + let request = requests::new_split_region_request(split_keys.clone(), is_raw_kv); + let plan = crate::request::PlanBuilder::new(self.pd.clone(), request) + .single_region_with_store(store) + .await? + .extract_error() + .plan(); + match plan.execute().await { + Ok(mut resp) => return Ok(resp.take_regions()), + Err(Error::ExtractedErrors(mut errors)) => match errors.pop() { + e @ Some(Error::RegionError(_)) => { + error = e; + continue; + } + Some(e) => return Err(e), + None => unreachable!(), + }, + Err(e) => return Err(e), + } + } + Err(error.expect("no error is impossible")) + } } diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index e5949cd4..44865e70 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -113,7 +113,6 @@ async fn resolve_lock_with_retry( let store = pd_client.clone().store_for_key(key.into()).await?; let ver_id = store.region_with_leader.ver_id(); let request = requests::new_resolve_lock_request(start_version, commit_version); - // The only place where single-region is used let plan = crate::request::PlanBuilder::new(pd_client.clone(), request) .single_region_with_store(store) .await? 
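
Not part of the patch: a minimal caller-side sketch of how the split_region_with_retry method added above might be invoked once it lands. The PD endpoint, the keys, and the constructor call are illustrative assumptions (and a tokio runtime is assumed); the exact constructor signature depends on the client version in use.

// Illustrative sketch only -- not part of this patch series.
// Assumes a PD endpoint at 127.0.0.1:2379; constructor signature may differ by version.
use tikv_client::TransactionClient;

#[tokio::main]
async fn main() -> Result<(), tikv_client::Error> {
    let client = TransactionClient::new(vec!["127.0.0.1:2379"], None).await?;

    // Split the region containing b"b" at the given split keys.
    // is_raw_kv is false because this is the transactional client.
    let regions = client
        .split_region_with_retry(&b"b".to_vec(), vec![b"c".to_vec(), b"d".to_vec()], false)
        .await?;
    println!("split produced {} regions", regions.len());
    Ok(())
}
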
diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 2ed90041..31a9d052 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -782,6 +782,25 @@ impl HasLocks for kvrpcpb::PrewriteResponse { } } +pub fn new_split_region_request( + split_keys: Vec>, + is_raw_kv: bool, +) -> kvrpcpb::SplitRegionRequest { + let mut req = kvrpcpb::SplitRegionRequest::default(); + req.set_split_keys(split_keys.into()); + req.set_is_raw_kv(is_raw_kv); + req +} + +// Note: SplitRegionRequest is sent to a specified region without keys. So it's not Shardable. +// And we don't automatically retry on its region errors (in the Plan level). +// The region error must be manually handled (in the upper level). +impl KvRequest for kvrpcpb::SplitRegionRequest { + type Response = kvrpcpb::SplitRegionResponse; +} + +impl HasLocks for kvrpcpb::SplitRegionResponse {} + #[cfg(test)] #[cfg_attr(feature = "protobuf-codec", allow(clippy::useless_conversion))] mod tests { diff --git a/tikv-client-store/src/errors.rs b/tikv-client-store/src/errors.rs index f40afd0d..44230d1b 100644 --- a/tikv-client-store/src/errors.rs +++ b/tikv-client-store/src/errors.rs @@ -66,6 +66,7 @@ has_region_error!(kvrpcpb::RawScanResponse); has_region_error!(kvrpcpb::RawBatchScanResponse); has_region_error!(kvrpcpb::RawCasResponse); has_region_error!(kvrpcpb::RawCoprocessorResponse); +has_region_error!(kvrpcpb::SplitRegionResponse); macro_rules! has_key_error { ($type:ty) => { @@ -167,6 +168,12 @@ impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse { } } +impl HasKeyErrors for kvrpcpb::SplitRegionResponse { + fn key_errors(&mut self) -> Option> { + None + } +} + impl HasKeyErrors for Result { fn key_errors(&mut self) -> Option> { match self { diff --git a/tikv-client-store/src/request.rs b/tikv-client-store/src/request.rs index 290f142a..80c7d001 100644 --- a/tikv-client-store/src/request.rs +++ b/tikv-client-store/src/request.rs @@ -125,3 +125,4 @@ impl_request!( kv_delete_range_async_opt, "kv_delete_range" ); +impl_request!(SplitRegionRequest, split_region_async_opt, "split_region"); From ca55424b79d7564fb45149f3de19209aaacfded8 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 4 Jan 2023 16:31:14 +0800 Subject: [PATCH 2/5] fix RUST_PROTOBUF error Signed-off-by: Ping Yu --- src/transaction/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index c0cb4037..681c2812 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -325,7 +325,7 @@ impl Client { .extract_error() .plan(); match plan.execute().await { - Ok(mut resp) => return Ok(resp.take_regions()), + Ok(mut resp) => return Ok(resp.take_regions().into()), Err(Error::ExtractedErrors(mut errors)) => match errors.pop() { e @ Some(Error::RegionError(_)) => { error = e; From 3748f0160df746d6b60eb2feaa32f52b608d838f Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 4 Jan 2023 19:10:21 +0800 Subject: [PATCH 3/5] add unsafe_destroy_range Signed-off-by: Ping Yu --- src/transaction/client.rs | 82 ++++++++++++++++++++++++++++++-- src/transaction/requests.rs | 19 ++++++++ tikv-client-store/src/errors.rs | 2 + tikv-client-store/src/request.rs | 5 ++ 4 files changed, 104 insertions(+), 4 deletions(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 681c2812..60bb6779 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -6,21 +6,24 @@ use crate::{ config::Config, pd::{PdClient, PdRpcClient}, 
request::{plan::CleanupLocksResult, Plan}, + store::RegionStore, timestamp::TimestampExt, transaction::{ - lock::ResolveLocksOptions, requests, ResolveLocksContext, Snapshot, Transaction, - TransactionOptions, + lock::ResolveLocksOptions, requests, requests::new_unsafe_destroy_range_request, + ResolveLocksContext, Snapshot, Transaction, TransactionOptions, }, - Backoff, Result, + Backoff, BoundRange, Result, }; +use futures::{future::try_join_all, StreamExt}; use slog::{Drain, Logger}; -use std::{mem, sync::Arc}; +use std::{collections::HashMap, mem, sync::Arc}; use tikv_client_common::Error; use tikv_client_proto::{metapb::Region, pdpb::Timestamp}; // FIXME: cargo-culted value const SCAN_LOCK_BATCH_SIZE: u32 = 1024; const SPLIT_REGION_RETRY_LIMIT: usize = 10; +const UNSAFE_DESTROY_RANGE_RETRY_LIMIT: usize = 10; /// The TiKV transactional `Client` is used to interact with TiKV using transactional requests. /// @@ -339,4 +342,75 @@ impl Client { } Err(error.expect("no error is impossible")) } + + pub async fn unsafe_destroy_range(&self, start_key: Vec, end_key: Vec) -> Result<()> { + debug!(self.logger, "invoking unsafe destroy range"); + let stores = self + .list_stores_for_unsafe_destroy(start_key.clone(), end_key.clone()) + .await?; + let mut handles = Vec::with_capacity(stores.len()); + for store in stores.into_values() { + let logger = self.logger.clone(); + let start_key = start_key.clone(); + let end_key = end_key.clone(); + let pd = self.pd.clone(); + let task = async move { + let mut error = None; + for i in 0..UNSAFE_DESTROY_RANGE_RETRY_LIMIT { + debug!(logger, "unsafe destroy range {}", (i + 1)); + let request = + new_unsafe_destroy_range_request(start_key.clone(), end_key.clone()); + let plan = crate::request::PlanBuilder::new(pd.clone(), request) + .single_region_with_store(store.clone()) + .await? + .extract_error() + .plan(); + match plan.execute().await { + Ok(_) => return Ok(()), + Err(e) => { + warn!(logger, "unsafe destroy range error: {:?}", e); + error = Some(e); + continue; + } + } + } + Err(error.expect("no error is impossible")) + }; + handles.push(tokio::spawn(task)); + } + + let results = try_join_all(handles).await?; + match results.into_iter().find(|x| x.is_err()) { + Some(r) => r, + None => Ok(()), + } + } + + async fn list_stores_for_unsafe_destroy( + &self, + start_key: Vec, + end_key: Vec, + ) -> Result> { + let mut stores = HashMap::new(); + let bnd_range = BoundRange::from((start_key, end_key)); + self.pd + .clone() + .stores_for_range(bnd_range) + .map(|store| -> Result<()> { + let store = store?; + let store_id = store + .region_with_leader + .leader + .as_ref() + .unwrap() + .get_store_id(); + stores.entry(store_id).or_insert(store); + Ok(()) + }) + .collect::>() + .await + .into_iter() + .collect::>()?; + Ok(stores) + } } diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 31a9d052..d6b593ae 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -792,6 +792,16 @@ pub fn new_split_region_request( req } +pub fn new_unsafe_destroy_range_request( + start_key: Vec, + end_key: Vec, +) -> kvrpcpb::UnsafeDestroyRangeRequest { + let mut req = kvrpcpb::UnsafeDestroyRangeRequest::default(); + req.set_start_key(start_key); + req.set_end_key(end_key); + req +} + // Note: SplitRegionRequest is sent to a specified region without keys. So it's not Shardable. // And we don't automatically retry on its region errors (in the Plan level). // The region error must be manually handled (in the upper level). 
@@ -801,6 +811,15 @@ impl KvRequest for kvrpcpb::SplitRegionRequest { impl HasLocks for kvrpcpb::SplitRegionResponse {} +// Note: UnsafeDestroyRangeRequest is sent to all stores cover the range. So it's not Shardable. +// And we don't automatically retry on its errors (in the Plan level). +// The errors must be manually handled (in the upper level). +impl KvRequest for kvrpcpb::UnsafeDestroyRangeRequest { + type Response = kvrpcpb::UnsafeDestroyRangeResponse; +} + +impl HasLocks for kvrpcpb::UnsafeDestroyRangeResponse {} + #[cfg(test)] #[cfg_attr(feature = "protobuf-codec", allow(clippy::useless_conversion))] mod tests { diff --git a/tikv-client-store/src/errors.rs b/tikv-client-store/src/errors.rs index 44230d1b..8512f28d 100644 --- a/tikv-client-store/src/errors.rs +++ b/tikv-client-store/src/errors.rs @@ -67,6 +67,7 @@ has_region_error!(kvrpcpb::RawBatchScanResponse); has_region_error!(kvrpcpb::RawCasResponse); has_region_error!(kvrpcpb::RawCoprocessorResponse); has_region_error!(kvrpcpb::SplitRegionResponse); +has_region_error!(kvrpcpb::UnsafeDestroyRangeResponse); macro_rules! has_key_error { ($type:ty) => { @@ -119,6 +120,7 @@ has_str_error!(kvrpcpb::RawCasResponse); has_str_error!(kvrpcpb::RawCoprocessorResponse); has_str_error!(kvrpcpb::ImportResponse); has_str_error!(kvrpcpb::DeleteRangeResponse); +has_str_error!(kvrpcpb::UnsafeDestroyRangeResponse); impl HasKeyErrors for kvrpcpb::ScanResponse { fn key_errors(&mut self) -> Option> { diff --git a/tikv-client-store/src/request.rs b/tikv-client-store/src/request.rs index 80c7d001..963a8f9c 100644 --- a/tikv-client-store/src/request.rs +++ b/tikv-client-store/src/request.rs @@ -126,3 +126,8 @@ impl_request!( "kv_delete_range" ); impl_request!(SplitRegionRequest, split_region_async_opt, "split_region"); +impl_request!( + UnsafeDestroyRangeRequest, + unsafe_destroy_range_async_opt, + "unsafe_destroy_range" +); From f69ef93b2067fa24c4fd560c2c6d3702bde34999 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 4 Jan 2023 22:42:27 +0800 Subject: [PATCH 4/5] add backoff Signed-off-by: Ping Yu --- src/transaction/client.rs | 43 +++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 60bb6779..1fe4f45a 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -22,8 +22,6 @@ use tikv_client_proto::{metapb::Region, pdpb::Timestamp}; // FIXME: cargo-culted value const SCAN_LOCK_BATCH_SIZE: u32 = 1024; -const SPLIT_REGION_RETRY_LIMIT: usize = 10; -const UNSAFE_DESTROY_RANGE_RETRY_LIMIT: usize = 10; /// The TiKV transactional `Client` is used to interact with TiKV using transactional requests. 
/// @@ -315,12 +313,12 @@ impl Client { is_raw_kv: bool, ) -> Result> { debug!(self.logger, "invoking split region with retry"); - // FIXME: Add backoff - let mut error = None; - for i in 0..SPLIT_REGION_RETRY_LIMIT { - debug!(self.logger, "split region: attempt {}", (i + 1)); + let mut backoff = DEFAULT_REGION_BACKOFF; + let mut i = 0; + 'retry: loop { + i += 1; + debug!(self.logger, "split region: attempt {}", i); let store = self.pd.clone().store_for_key(key.into()).await?; - // let ver_id = store.region_with_leader.ver_id(); let request = requests::new_split_region_request(split_keys.clone(), is_raw_kv); let plan = crate::request::PlanBuilder::new(self.pd.clone(), request) .single_region_with_store(store) @@ -330,21 +328,24 @@ impl Client { match plan.execute().await { Ok(mut resp) => return Ok(resp.take_regions().into()), Err(Error::ExtractedErrors(mut errors)) => match errors.pop() { - e @ Some(Error::RegionError(_)) => { - error = e; - continue; - } + Some(e @ Error::RegionError(_)) => match backoff.next_delay_duration() { + Some(duration) => { + futures_timer::Delay::new(duration).await; + continue 'retry; + } + None => return Err(e), + }, Some(e) => return Err(e), None => unreachable!(), }, Err(e) => return Err(e), } } - Err(error.expect("no error is impossible")) } pub async fn unsafe_destroy_range(&self, start_key: Vec, end_key: Vec) -> Result<()> { debug!(self.logger, "invoking unsafe destroy range"); + let backoff = DEFAULT_REGION_BACKOFF; let stores = self .list_stores_for_unsafe_destroy(start_key.clone(), end_key.clone()) .await?; @@ -354,10 +355,12 @@ impl Client { let start_key = start_key.clone(); let end_key = end_key.clone(); let pd = self.pd.clone(); + let mut backoff = backoff.clone(); let task = async move { - let mut error = None; - for i in 0..UNSAFE_DESTROY_RANGE_RETRY_LIMIT { - debug!(logger, "unsafe destroy range {}", (i + 1)); + let mut i = 0; + 'retry: loop { + i += 1; + debug!(logger, "unsafe destroy range: attempt {}", i); let request = new_unsafe_destroy_range_request(start_key.clone(), end_key.clone()); let plan = crate::request::PlanBuilder::new(pd.clone(), request) @@ -369,12 +372,16 @@ impl Client { Ok(_) => return Ok(()), Err(e) => { warn!(logger, "unsafe destroy range error: {:?}", e); - error = Some(e); - continue; + match backoff.next_delay_duration() { + Some(duration) => { + futures_timer::Delay::new(duration).await; + continue 'retry; + } + None => return Err(e), + } } } } - Err(error.expect("no error is impossible")) }; handles.push(tokio::spawn(task)); } From 5c2a2e8569f4424220b7cf0dd283c0bb182ed60f Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Thu, 5 Jan 2023 08:39:36 +0800 Subject: [PATCH 5/5] handle region error Signed-off-by: Ping Yu --- src/request/plan.rs | 199 +++++++++++++++++++------------------- src/transaction/client.rs | 19 +++- 2 files changed, 111 insertions(+), 107 deletions(-) diff --git a/src/request/plan.rs b/src/request/plan.rs index 78d4776d..eb6caa74 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -141,7 +141,7 @@ where match backoff.next_delay_duration() { Some(duration) => { let region_error_resolved = - Self::handle_region_error(pd_client.clone(), e, region_store).await?; + handle_region_error(pd_client.clone(), e, region_store).await?; // don't sleep if we have resolved the region error if !region_error_resolved { futures_timer::Delay::new(duration).await; @@ -161,107 +161,6 @@ where Ok(vec![Ok(resp)]) } } - - // Returns - // 1. Ok(true): error has been resolved, retry immediately - // 2. 
Ok(false): backoff, and then retry - // 3. Err(Error): can't be resolved, return the error to upper level - async fn handle_region_error( - pd_client: Arc, - mut e: errorpb::Error, - region_store: RegionStore, - ) -> Result { - let ver_id = region_store.region_with_leader.ver_id(); - if e.has_not_leader() { - let not_leader = e.get_not_leader(); - if not_leader.has_leader() { - match pd_client - .update_leader( - region_store.region_with_leader.ver_id(), - not_leader.get_leader().clone(), - ) - .await - { - Ok(_) => Ok(true), - Err(e) => { - pd_client.invalidate_region_cache(ver_id).await; - Err(e) - } - } - } else { - // The peer doesn't know who is the current leader. Generally it's because - // the Raft group is in an election, but it's possible that the peer is - // isolated and removed from the Raft group. So it's necessary to reload - // the region from PD. - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } - } else if e.has_store_not_match() { - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } else if e.has_epoch_not_match() { - Self::on_region_epoch_not_match( - pd_client.clone(), - region_store, - e.take_epoch_not_match(), - ) - .await - } else if e.has_stale_command() || e.has_region_not_found() { - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } else if e.has_server_is_busy() - || e.has_raft_entry_too_large() - || e.has_max_timestamp_not_synced() - { - Err(Error::RegionError(Box::new(e))) - } else { - // TODO: pass the logger around - // info!("unknwon region error: {:?}", e); - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } - } - - // Returns - // 1. Ok(true): error has been resolved, retry immediately - // 2. Ok(false): backoff, and then retry - // 3. Err(Error): can't be resolved, return the error to upper level - async fn on_region_epoch_not_match( - pd_client: Arc, - region_store: RegionStore, - error: EpochNotMatch, - ) -> Result { - let ver_id = region_store.region_with_leader.ver_id(); - if error.get_current_regions().is_empty() { - pd_client.invalidate_region_cache(ver_id).await; - return Ok(true); - } - - for r in error.get_current_regions() { - if r.get_id() == region_store.region_with_leader.id() { - let returned_conf_ver = r.get_region_epoch().get_conf_ver(); - let returned_version = r.get_region_epoch().get_version(); - let current_conf_ver = region_store - .region_with_leader - .region - .get_region_epoch() - .get_conf_ver(); - let current_version = region_store - .region_with_leader - .region - .get_region_epoch() - .get_version(); - - // Find whether the current region is ahead of TiKV's. If so, backoff. - if returned_conf_ver < current_conf_ver || returned_version < current_version { - return Ok(false); - } - } - } - // TODO: finer grained processing - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } } impl Clone for RetryableMultiRegion { @@ -298,6 +197,102 @@ where } } +// Returns +// 1. Ok(true): error has been resolved, retry immediately +// 2. Ok(false): backoff, and then retry +// 3. 
Err(Error): can't be resolved, return the error to upper level +pub async fn handle_region_error( + pd_client: Arc, + mut e: errorpb::Error, + region_store: RegionStore, +) -> Result { + let ver_id = region_store.region_with_leader.ver_id(); + if e.has_not_leader() { + let not_leader = e.get_not_leader(); + if not_leader.has_leader() { + match pd_client + .update_leader( + region_store.region_with_leader.ver_id(), + not_leader.get_leader().clone(), + ) + .await + { + Ok(_) => Ok(true), + Err(e) => { + pd_client.invalidate_region_cache(ver_id).await; + Err(e) + } + } + } else { + // The peer doesn't know who is the current leader. Generally it's because + // the Raft group is in an election, but it's possible that the peer is + // isolated and removed from the Raft group. So it's necessary to reload + // the region from PD. + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } + } else if e.has_store_not_match() { + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } else if e.has_epoch_not_match() { + on_region_epoch_not_match(pd_client.clone(), region_store, e.take_epoch_not_match()).await + } else if e.has_stale_command() || e.has_region_not_found() { + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } else if e.has_server_is_busy() + || e.has_raft_entry_too_large() + || e.has_max_timestamp_not_synced() + { + Err(Error::RegionError(Box::new(e))) + } else { + // TODO: pass the logger around + // info!("unknwon region error: {:?}", e); + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } +} + +// Returns +// 1. Ok(true): error has been resolved, retry immediately +// 2. Ok(false): backoff, and then retry +// 3. Err(Error): can't be resolved, return the error to upper level +async fn on_region_epoch_not_match( + pd_client: Arc, + region_store: RegionStore, + error: EpochNotMatch, +) -> Result { + let ver_id = region_store.region_with_leader.ver_id(); + if error.get_current_regions().is_empty() { + pd_client.invalidate_region_cache(ver_id).await; + return Ok(true); + } + + for r in error.get_current_regions() { + if r.get_id() == region_store.region_with_leader.id() { + let returned_conf_ver = r.get_region_epoch().get_conf_ver(); + let returned_version = r.get_region_epoch().get_version(); + let current_conf_ver = region_store + .region_with_leader + .region + .get_region_epoch() + .get_conf_ver(); + let current_version = region_store + .region_with_leader + .region + .get_region_epoch() + .get_version(); + + // Find whether the current region is ahead of TiKV's. If so, backoff. + if returned_conf_ver < current_conf_ver || returned_version < current_version { + return Ok(false); + } + } + } + // TODO: finer grained processing + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) +} + /// A technique for merging responses into a single result (with type `Out`). pub trait Merge: Sized + Clone + Send + Sync + 'static { type Out: Send; diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 1fe4f45a..d747f960 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -5,7 +5,10 @@ use crate::{ backoff::DEFAULT_REGION_BACKOFF, config::Config, pd::{PdClient, PdRpcClient}, - request::{plan::CleanupLocksResult, Plan}, + request::{ + plan::{handle_region_error, CleanupLocksResult}, + Plan, + }, store::RegionStore, timestamp::TimestampExt, transaction::{ @@ -306,6 +309,7 @@ impl Client { Transaction::new(timestamp, self.pd.clone(), options, logger) } + // FIXME: make the loop a plan. 
pub async fn split_region_with_retry( &self, #[allow(clippy::ptr_arg)] key: &Vec, @@ -321,19 +325,23 @@ impl Client { let store = self.pd.clone().store_for_key(key.into()).await?; let request = requests::new_split_region_request(split_keys.clone(), is_raw_kv); let plan = crate::request::PlanBuilder::new(self.pd.clone(), request) - .single_region_with_store(store) + .single_region_with_store(store.clone()) .await? .extract_error() .plan(); match plan.execute().await { Ok(mut resp) => return Ok(resp.take_regions().into()), Err(Error::ExtractedErrors(mut errors)) => match errors.pop() { - Some(e @ Error::RegionError(_)) => match backoff.next_delay_duration() { + Some(Error::RegionError(e)) => match backoff.next_delay_duration() { Some(duration) => { - futures_timer::Delay::new(duration).await; + let region_error_resolved = + handle_region_error(self.pd.clone(), *e, store).await?; + if !region_error_resolved { + futures_timer::Delay::new(duration).await; + } continue 'retry; } - None => return Err(e), + None => return Err(Error::RegionError(e)), }, Some(e) => return Err(e), None => unreachable!(), @@ -343,6 +351,7 @@ impl Client { } } + // FIXME: make list stores and retry a plan. pub async fn unsafe_destroy_range(&self, start_key: Vec, end_key: Vec) -> Result<()> { debug!(self.logger, "invoking unsafe destroy range"); let backoff = DEFAULT_REGION_BACKOFF;
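
A companion sketch, again not part of the patches: how a caller might drive the finished unsafe_destroy_range API. The endpoint and key range are placeholder assumptions, and the constructor signature may differ by client version. As with TiKV's GC use of this RPC, the deletion is physical and non-transactional, so callers must ensure the range is no longer being served.

// Illustrative sketch only -- not part of this patch series.
// Assumes a PD endpoint at 127.0.0.1:2379; constructor signature may differ by version.
use tikv_client::TransactionClient;

#[tokio::main]
async fn main() -> Result<(), tikv_client::Error> {
    let client = TransactionClient::new(vec!["127.0.0.1:2379"], None).await?;

    // Physically remove everything in ["table_a", "table_b") on every store that
    // covers the range; each store-level RPC is retried with DEFAULT_REGION_BACKOFF.
    client
        .unsafe_destroy_range(b"table_a".to_vec(), b"table_b".to_vec())
        .await?;
    Ok(())
}

Because the request fans out per store rather than per region, an error on one store does not abort the others: the per-store tasks are joined and the first error, if any, is returned.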