Skip to content

Commit e8b04a9

Browse files
authored
Merge pull request #79 from nrc/ts
Some more refactoring
2 parents 1815a3b + 35fe306 commit e8b04a9

File tree

9 files changed

+40
-77
lines changed

9 files changed

+40
-77
lines changed

Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ derive-new = "0.5"
2323
failure = "0.1"
2424
futures-preview = { version = "0.3.0-alpha.15", features = ["compat"] }
2525
grpcio = { version = "0.5.0-alpha", features = [ "secure", "prost-codec" ], default-features = false }
26-
lazy_static = "0.2.1"
26+
lazy_static = "1"
2727
log = "0.3.9"
2828
serde = "1.0"
2929
serde_derive = "1.0"
@@ -44,4 +44,7 @@ tempdir = "0.3"
4444
runtime = "0.3.0-alpha.3"
4545
runtime-tokio = "0.3.0-alpha.3"
4646
proptest = "0.9"
47-
proptest-derive = "0.1.0"
47+
proptest-derive = "0.1.0"
48+
49+
[replace]
50+
"prost:0.5.0" = { git = "https://github.com/danburkert/prost", rev = "1944c27c3029d01ff216e7b126d846b6cf8c7d77" }

rust-toolchain

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
nightly-2019-07-09

src/compat.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
//! from futures 0.1 to 1.0 easier.
55
66
use futures::prelude::*;
7+
use futures::ready;
78
use futures::task::{Context, Poll};
8-
use futures::{ready, try_ready};
99
use std::pin::Pin;
1010

1111
/// The status of a `loop_fn` loop.
@@ -52,7 +52,7 @@ where
5252
loop {
5353
unsafe {
5454
let this = Pin::get_unchecked_mut(self);
55-
match try_ready!(Pin::new_unchecked(&mut this.future).poll(cx)) {
55+
match ready!(Pin::new_unchecked(&mut this.future).poll(cx))? {
5656
Loop::Break(x) => return Poll::Ready(Ok(x)),
5757
Loop::Continue(s) => this.future = (this.func)(s),
5858
}
@@ -112,7 +112,7 @@ where
112112
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<T, E>> {
113113
unsafe {
114114
let this = Pin::get_unchecked_mut(self);
115-
let result = try_ready!(Pin::new_unchecked(&mut this.future).poll(cx));
115+
let result = ready!(Pin::new_unchecked(&mut this.future).poll(cx))?;
116116
Poll::Ready((this.func)(result))
117117
}
118118
}
@@ -145,13 +145,13 @@ pub(crate) trait SinkCompat<I, E> {
145145
fn send_all_compat<S>(self, stream: S) -> SendAllCompat<Self, S>
146146
where
147147
S: Stream<Item = I> + Unpin,
148-
Self: Sink<I, SinkError = E> + Sized + Unpin,
148+
Self: Sink<I, Error = E> + Sized + Unpin,
149149
{
150150
SendAllCompat::new(self, stream)
151151
}
152152
}
153153

154-
impl<T, E, S: Sink<T, SinkError = E>> SinkCompat<T, E> for S {}
154+
impl<T, E, S: Sink<T, Error = E>> SinkCompat<T, E> for S {}
155155

156156
#[derive(Debug)]
157157
#[must_use = "futures do nothing unless polled"]
@@ -219,7 +219,7 @@ where
219219
&mut self,
220220
item: St::Item,
221221
cx: &mut Context,
222-
) -> Poll<Result<(()), Si::SinkError>> {
222+
) -> Poll<Result<(()), Si::Error>> {
223223
debug_assert!(self.buffered.is_none());
224224
match self.sink_mut().poll_ready(cx) {
225225
Poll::Ready(Ok(())) => Poll::Ready(self.sink_mut().start_send(item)),
@@ -237,22 +237,22 @@ where
237237
Si: Sink<St::Item> + Unpin,
238238
St: Stream + Unpin,
239239
{
240-
type Output = Result<((Si, St)), Si::SinkError>;
240+
type Output = Result<((Si, St)), Si::Error>;
241241

242-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<((Si, St)), Si::SinkError>> {
242+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<((Si, St)), Si::Error>> {
243243
if let Some(item) = self.buffered.take() {
244-
try_ready!(self.try_start_send(item, cx))
244+
ready!(self.try_start_send(item, cx))?
245245
}
246246

247247
loop {
248248
match self.stream_mut().poll_next(cx) {
249-
Poll::Ready(Some(item)) => try_ready!(self.try_start_send(item, cx)),
249+
Poll::Ready(Some(item)) => ready!(self.try_start_send(item, cx))?,
250250
Poll::Ready(None) => {
251-
try_ready!(self.sink_mut().poll_close(cx));
251+
ready!(self.sink_mut().poll_close(cx))?;
252252
return Poll::Ready(Ok(self.take_result()));
253253
}
254254
Poll::Pending => {
255-
try_ready!(self.sink_mut().poll_flush(cx));
255+
ready!(self.sink_mut().poll_flush(cx))?;
256256
return Poll::Pending;
257257
}
258258
}

src/rpc/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ mod pd;
88
mod security;
99
mod tikv;
1010

11-
pub(crate) use crate::rpc::client::RpcClient;
11+
pub(crate) use self::client::RpcClient;
12+
pub use self::pd::Timestamp;

src/rpc/pd/client.rs

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@ use std::{
99
use futures::compat::Compat01As03;
1010
use futures::prelude::*;
1111
use grpcio::{CallOption, Environment};
12-
use kvproto::{metapb, pdpb, pdpb::PdClient as RpcClient};
12+
use kvproto::{pdpb, pdpb::PdClient as RpcClient};
1313

1414
use crate::{
1515
rpc::{
1616
context::RequestContext,
1717
pd::{
18-
context::request_context, leader::LeaderClient, request::Request, PdTimestamp, Region,
19-
RegionId, Store, StoreId,
18+
context::request_context, leader::LeaderClient, request::Request, Region, Store,
19+
StoreId, Timestamp,
2020
},
2121
security::SecurityManager,
2222
},
@@ -74,10 +74,7 @@ impl PdClient {
7474
self.leader.read().unwrap().members.get_leader().clone()
7575
}
7676

77-
fn get_region_and_leader(
78-
&self,
79-
key: &[u8],
80-
) -> impl Future<Output = Result<(metapb::Region, Option<metapb::Peer>)>> {
77+
pub fn get_region(&self, key: &[u8]) -> impl Future<Output = Result<Region>> {
8178
let mut req = pd_request!(self.cluster_id, pdpb::GetRegionRequest);
8279
req.set_region_key(key.to_owned());
8380
let key = req.get_region_key().to_owned();
@@ -99,14 +96,11 @@ impl PdClient {
9996
} else {
10097
None
10198
};
102-
future::ready(Ok((region, leader)))
99+
future::ready(Ok(Region::new(region, leader)))
103100
})
104101
}
105102

106-
fn get_region_and_leader_by_id(
107-
&self,
108-
region_id: u64,
109-
) -> impl Future<Output = Result<(metapb::Region, Option<metapb::Peer>)>> {
103+
pub fn get_region_by_id(&self, region_id: u64) -> impl Future<Output = Result<Region>> {
110104
let mut req = pd_request!(self.cluster_id, pdpb::GetRegionByIdRequest);
111105
req.set_region_id(region_id);
112106

@@ -128,7 +122,7 @@ impl PdClient {
128122
} else {
129123
None
130124
};
131-
future::ready(Ok((region, leader)))
125+
future::ready(Ok(Region::new(region, leader)))
132126
})
133127
}
134128

@@ -147,7 +141,7 @@ impl PdClient {
147141
let option = CallOption::default().timeout(timeout);
148142
let cli = &cli.read().unwrap().client;
149143
executor(cli, option).unwrap().map(|r| match r {
150-
Err(e) => Err(ErrorKind::Grpc(e))?,
144+
Err(e) => Err(ErrorKind::Grpc(e).into()),
151145
Ok(r) => {
152146
{
153147
let header = r.header();
@@ -195,17 +189,7 @@ impl PdClient {
195189
.map_ok(|mut resp| resp.take_store())
196190
}
197191

198-
pub fn get_region(&self, key: &[u8]) -> impl Future<Output = Result<Region>> {
199-
self.get_region_and_leader(key)
200-
.map_ok(|x| Region::new(x.0, x.1))
201-
}
202-
203-
pub fn get_region_by_id(&self, id: RegionId) -> impl Future<Output = Result<Region>> {
204-
self.get_region_and_leader_by_id(id)
205-
.map_ok(|x| Region::new(x.0, x.1))
206-
}
207-
208-
pub fn get_ts(&self) -> impl Future<Output = Result<PdTimestamp>> {
192+
pub fn get_ts(&self) -> impl Future<Output = Result<Timestamp>> {
209193
self.leader.write().unwrap().get_ts()
210194
}
211195
}

src/rpc/pd/leader.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use futures::channel::{
1212
oneshot,
1313
};
1414
use futures::compat::{Compat01As03, Compat01As03Sink};
15+
use futures::future::TryFutureExt;
1516
use futures::prelude::*;
1617
use grpcio::{CallOption, Environment, WriteFlags};
1718
use kvproto::pdpb;
@@ -22,7 +23,7 @@ use crate::{
2223
rpc::{
2324
pd::{
2425
context::{observe_tso_batch, request_context},
25-
PdTimestamp,
26+
Timestamp,
2627
},
2728
security::SecurityManager,
2829
},
@@ -39,12 +40,12 @@ macro_rules! pd_request {
3940
}};
4041
}
4142

42-
type TsoChannel = oneshot::Sender<PdTimestamp>;
43+
type TsoChannel = oneshot::Sender<Timestamp>;
4344

4445
enum PdTask {
4546
Init,
4647
Request,
47-
Response(Vec<oneshot::Sender<PdTimestamp>>, pdpb::TsoResponse),
48+
Response(Vec<oneshot::Sender<Timestamp>>, pdpb::TsoResponse),
4849
}
4950

5051
struct PdReactor {
@@ -189,7 +190,7 @@ impl PdReactor {
189190
let timestamp = response.get_timestamp();
190191
for (offset, request) in requests.drain(..).enumerate() {
191192
request
192-
.send(PdTimestamp {
193+
.send(Timestamp {
193194
physical: timestamp.physical,
194195
logical: timestamp.logical + offset as i64,
195196
})
@@ -206,9 +207,9 @@ impl PdReactor {
206207
}
207208
}
208209

209-
fn get_ts(&mut self) -> impl Future<Output = Result<PdTimestamp>> {
210+
fn get_ts(&mut self) -> impl Future<Output = Result<Timestamp>> {
210211
let context = request_context("get_ts", ());
211-
let (tx, rx) = oneshot::channel::<PdTimestamp>();
212+
let (tx, rx) = oneshot::channel::<Timestamp>();
212213
self.tso_batch.push(tx);
213214
if self.tso_pending.is_none() {
214215
// Schedule tso request to run.
@@ -260,7 +261,7 @@ impl LeaderClient {
260261
Ok(client)
261262
}
262263

263-
pub fn get_ts(&mut self) -> impl Future<Output = Result<PdTimestamp>> {
264+
pub fn get_ts(&mut self) -> impl Future<Output = Result<Timestamp>> {
264265
self.reactor.get_ts()
265266
}
266267

src/rpc/pd/mod.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,10 @@ impl Region {
8686
.map(Into::into)
8787
.ok_or_else(|| Error::stale_epoch(None))
8888
}
89-
90-
pub fn meta(&self) -> metapb::Region {
91-
self.region.clone()
92-
}
9389
}
9490

9591
#[derive(Eq, PartialEq, Debug)]
96-
pub struct PdTimestamp {
92+
pub struct Timestamp {
9793
pub physical: i64,
9894
pub logical: i64,
9995
}

src/rpc/tikv/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ impl KvClient {
666666
)
667667
.unwrap()
668668
.map(|r| match r {
669-
Err(e) => Err(ErrorKind::Grpc(e))?,
669+
Err(e) => Err(ErrorKind::Grpc(e).into()),
670670
Ok(mut r) => {
671671
if let Some(e) = r.region_error() {
672672
Err(e)

src/transaction/mod.rs

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
pub use self::client::{Client, Connect};
1313
pub use self::requests::{BatchGet, Commit, Delete, Get, LockKeys, Rollback, Scanner, Set};
1414
pub use self::transaction::{IsolationLevel, Snapshot, Transaction, TxnInfo};
15+
pub use super::rpc::Timestamp;
1516

1617
use crate::{Key, Value};
1718

@@ -26,27 +27,3 @@ pub enum Mutation {
2627
Lock(Key),
2728
Rollback(Key),
2829
}
29-
30-
/// A logical timestamp produced by PD.
31-
#[derive(Copy, Clone)]
32-
pub struct Timestamp(u64);
33-
34-
impl From<u64> for Timestamp {
35-
fn from(v: u64) -> Self {
36-
Timestamp(v)
37-
}
38-
}
39-
40-
impl Timestamp {
41-
pub fn timestamp(self) -> u64 {
42-
self.0
43-
}
44-
45-
pub fn physical(self) -> i64 {
46-
(self.0 >> 16) as i64
47-
}
48-
49-
pub fn logical(self) -> i64 {
50-
(self.0 & 0xFFFF as u64) as i64
51-
}
52-
}

0 commit comments

Comments
 (0)