
Commit 55b3a52

overvenus authored and ti-chi-bot[bot] committed
*: remove unnecessary async blocks to save memory (tikv#16541)
close tikv#16540

*: enable linters about async and futures

We should be pedantic about writing async code, as it's easy to write suboptimal or even bloated code. See: rust-lang/rust#69826

*: remove unnecessary async blocks to save memory

This commit favors FutureExt::map over async blocks to mitigate the issue of async blocks doubling memory usage. In the sysbench oltp_read_only test, this change reduced memory usage by approximately 26%. See: rust-lang/rust#59087

Signed-off-by: Neil Shen <[email protected]>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Signed-off-by: dbsid <[email protected]>
1 parent 72d5717 commit 55b3a52
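To make the measured saving concrete, here is a minimal standalone sketch (not part of the commit) that compares the size of a future wrapped in an async block with the same future wrapped by FutureExt::map. The work and bump functions are hypothetical stand-ins for a pooled task and the metrics updates; only the futures crate is assumed.

use futures::future::FutureExt;

// Hypothetical stand-in for a pooled task: state that lives across an await
// point, so the inner future itself has a nontrivial size.
async fn work() -> usize {
    let buf = [0u8; 1024];
    std::future::ready(()).await;
    buf.len()
}

// Hypothetical stand-in for metrics_handled_task_count.inc() and
// metrics_running_task_count.dec().
fn bump() {}

fn main() {
    // Async block: the generated state machine reserves space for the
    // captured `inner` and again for the future being polled across the
    // `.await` (rust-lang/rust#59087).
    let inner = work();
    let via_async_block = async move {
        let _ = inner.await;
        bump();
    };

    // Map combinator: stores the inner future once plus a zero-sized closure.
    let via_map = work().map(|_| bump());

    // On current compilers the async block is typically reported at roughly
    // twice the size of the map-based future.
    println!("async block: {} bytes", std::mem::size_of_val(&via_async_block));
    println!("map:         {} bytes", std::mem::size_of_val(&via_map));
}

The future_pool.rs and read_pool.rs diffs below apply exactly this substitution, keeping the metrics updates inside the map closure.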

File tree: 11 files changed (+84, -64 lines changed)


components/backup-stream/src/checkpoint_manager.rs (+1)

@@ -613,6 +613,7 @@ pub mod tests {
         Self(Arc::new(Mutex::new(inner)))
     }

+    #[allow(clippy::unused_async)]
     pub async fn fail(&self, status: RpcStatus) -> crate::errors::Result<()> {
         panic!("failed in a case should never fail: {}", status);
     }

components/backup-stream/src/router.rs (+3 -3)

@@ -940,7 +940,7 @@ impl StreamTaskInfo {
         #[allow(clippy::map_entry)]
         if !w.contains_key(&key) {
             let path = key.temp_file_name();
-            let val = Mutex::new(DataFile::new(path, &self.temp_file_pool).await?);
+            let val = Mutex::new(DataFile::new(path, &self.temp_file_pool)?);
             w.insert(key, val);
         }

@@ -1444,7 +1444,7 @@ impl MetadataInfo {
 impl DataFile {
     /// create and open a logfile at the path.
     /// Note: if a file with same name exists, would truncate it.
-    async fn new(local_path: impl AsRef<Path>, files: &Arc<TempFilePool>) -> Result<Self> {
+    fn new(local_path: impl AsRef<Path>, files: &Arc<TempFilePool>) -> Result<Self> {
         let sha256 = Hasher::new(MessageDigest::sha256())
             .map_err(|err| Error::Other(box_err!("openssl hasher failed to init: {}", err)))?;
         let inner = files.open_for_write(local_path.as_ref())?;
@@ -2434,7 +2434,7 @@ mod tests {
         let mut f = pool.open_for_write(file_path).unwrap();
         f.write_all(b"test-data").await?;
         f.done().await?;
-        let mut data_file = DataFile::new(&file_path, &pool).await.unwrap();
+        let mut data_file = DataFile::new(file_path, &pool).unwrap();
         let info = DataFileInfo::new();

         let mut meta = MetadataInfo::with_capacity(1);

components/backup-stream/src/subscription_manager.rs (+7 -7)

@@ -435,7 +435,7 @@ where
                 let now = Instant::now();
                 let timedout = self.wait(Duration::from_secs(5)).await;
                 if timedout {
-                    warn!("waiting for initial scanning done timed out, forcing progress!";
+                    warn!("waiting for initial scanning done timed out, forcing progress!";
                         "take" => ?now.saturating_elapsed(), "timedout" => %timedout);
                 }
                 let regions = resolver.resolve(self.subs.current_regions(), min_ts).await;
@@ -453,7 +453,7 @@ where
                 callback(ResolvedRegions::new(rts, cps));
             }
             ObserveOp::HighMemUsageWarning { region_id } => {
-                self.on_high_memory_usage(region_id).await;
+                self.on_high_memory_usage(region_id);
             }
         }
     }
@@ -507,7 +507,7 @@ where
         }
     }

-    async fn on_high_memory_usage(&mut self, inconsistent_region_id: u64) {
+    fn on_high_memory_usage(&mut self, inconsistent_region_id: u64) {
         let mut lame_region = Region::new();
         lame_region.set_id(inconsistent_region_id);
         let mut act_region = None;
@@ -517,9 +517,9 @@ where
         });
         let delay = OOM_BACKOFF_BASE
             + Duration::from_secs(rand::thread_rng().gen_range(0..OOM_BACKOFF_JITTER_SECS));
-        info!("log backup triggering high memory usage.";
-            "region" => %inconsistent_region_id,
-            "mem_usage" => %self.memory_manager.used_ratio(),
+        info!("log backup triggering high memory usage.";
+            "region" => %inconsistent_region_id,
+            "mem_usage" => %self.memory_manager.used_ratio(),
             "mem_max" => %self.memory_manager.capacity());
         if let Some(region) = act_region {
             self.schedule_start_observe(delay, region, None);
@@ -786,7 +786,7 @@ where
         let feedback_channel = match self.messenger.upgrade() {
             Some(ch) => ch,
             None => {
-                warn!("log backup subscription manager is shutting down, aborting new scan.";
+                warn!("log backup subscription manager is shutting down, aborting new scan.";
                     utils::slog_region(region), "handle" => ?handle.id);
                 return;
             }

components/backup/src/softlimit.rs (+6 -6)

@@ -38,7 +38,7 @@ impl SoftLimit {
         Ok(())
     }

-    async fn grant_tokens(&self, n: usize) {
+    fn grant_tokens(&self, n: usize) {
         self.0.semaphore.add_permits(n);
     }

@@ -53,9 +53,9 @@ impl SoftLimit {

     /// Grows the tasks can be executed concurrently by n
     #[cfg(test)]
-    pub async fn grow(&self, n: usize) {
+    pub fn grow(&self, n: usize) {
         self.0.cap.fetch_add(n, Ordering::SeqCst);
-        self.grant_tokens(n).await;
+        self.grant_tokens(n);
     }

     /// resize the tasks available concurrently.
@@ -66,7 +66,7 @@ impl SoftLimit {
                 self.take_tokens(current - target).await?;
             }
             CmpOrder::Less => {
-                self.grant_tokens(target - current).await;
+                self.grant_tokens(target - current);
             }
             _ => {}
         }
@@ -304,7 +304,7 @@ mod softlimit_test {
         )
         .await;

-        limit_cloned.grow(1).await;
+        limit_cloned.grow(1);
         let working_cloned = working.clone();
         should_satisfy_in(
             Duration::from_secs(10),
@@ -314,7 +314,7 @@ mod softlimit_test {
         .await;

         let working_cloned = working.clone();
-        limit_cloned.grow(2).await;
+        limit_cloned.grow(2);
         should_satisfy_in(
             Duration::from_secs(10),
             "waiting for worker grow to 4",

components/resolved_ts/src/scanner.rs (+3 -4)

@@ -43,7 +43,7 @@ pub struct ScanTask {
 }

 impl ScanTask {
-    async fn send_entries(&self, entries: ScanEntries, apply_index: u64) {
+    fn send_entries(&self, entries: ScanEntries, apply_index: u64) {
         let task = Task::ScanLocks {
             region_id: self.region.get_id(),
             observe_id: self.handle.id,
@@ -159,11 +159,10 @@ impl<T: 'static + CdcHandle<E>, E: KvEngine> ScannerPool<T, E> {
                 if has_remaining {
                     start_key = Some(locks.last().unwrap().0.clone())
                 }
-                task.send_entries(ScanEntries::Lock(locks), apply_index)
-                    .await;
+                task.send_entries(ScanEntries::Lock(locks), apply_index);
             }
             RTS_SCAN_DURATION_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64());
-            task.send_entries(ScanEntries::None, apply_index).await;
+            task.send_entries(ScanEntries::None, apply_index);
         };
         self.workers.spawn(fut);
     }

components/resource_control/src/future.rs (+1)

@@ -275,6 +275,7 @@ mod tests {
         }
     }

+    #[allow(clippy::unused_async)]
     async fn empty() {}

     #[test]

components/tikv_util/src/yatp_pool/future_pool.rs (+11 -6)

@@ -13,6 +13,7 @@ use std::{

 use fail::fail_point;
 use futures::channel::oneshot::{self, Canceled};
+use futures_util::future::FutureExt;
 use prometheus::{IntCounter, IntGauge};
 use tracker::TrackedFuture;
 use yatp::{queue::Extras, task::future};
@@ -216,11 +217,13 @@ impl PoolInner {

         metrics_running_task_count.inc();

-        let f = async move {
-            let _ = future.await;
+        // NB: Prefer FutureExt::map to async block, because an async block
+        // doubles memory usage.
+        // See https://github.com/rust-lang/rust/issues/59087
+        let f = future.map(move |_| {
             metrics_handled_task_count.inc();
             metrics_running_task_count.dec();
-        };
+        });

         if let Some(extras) = extras {
             self.pool.spawn(future::TaskCell::new(f, extras));
@@ -246,12 +249,14 @@ impl PoolInner {

         let (tx, rx) = oneshot::channel();
         metrics_running_task_count.inc();
-        self.pool.spawn(async move {
-            let res = future.await;
+        // NB: Prefer FutureExt::map to async block, because an async block
+        // doubles memory usage.
+        // See https://github.com/rust-lang/rust/issues/59087
+        self.pool.spawn(future.map(move |res| {
             metrics_handled_task_count.inc();
             metrics_running_task_count.dec();
             let _ = tx.send(res);
-        });
+        }));
         Ok(rx)
     }
 }

scripts/clippy (+24 -3)

@@ -21,8 +21,6 @@ fi
 # - `derive_partial_eq_without_eq` has compilation overhead.
 # - Blocking issue for enabling `result_large_err` is the protobuf messages.
 # - Blocking issue for clippy::large_enum_variant is the raftstore peer message.
-# - Enables `clippy::needless_return_with_question_mark` after
-#   https://github.com/rust-lang/rust-clippy/issues/11982 is fixed.
 CLIPPY_LINTS=(
     -A clippy::module_inception \
     -A clippy::result_large_err \
@@ -50,9 +48,32 @@ CLIPPY_LINTS=(
     -D clippy::disallowed_methods \
     -D rust-2018-idioms \
     -D clippy::assertions_on_result_states \
-    -A clippy::needless_return_with_question_mark \
     -A clippy::non_canonical_partial_ord_impl \
     -A clippy::arc_with_non_send_sync \
+)
+
+# TODO: Enables `clippy::needless_return_with_question_mark` after
+# https://github.com/rust-lang/rust-clippy/issues/11982 is fixed.
+CLIPPY_LINTS+=(
+    -A clippy::needless_return_with_question_mark \
+)
+
+# We should be pedantic about writing async code, as it's easy to write
+# suboptimal or even bloat code. See:
+# - https://github.com/rust-lang/rust/issues/69826
+# - https://github.com/rust-lang/rust/issues/69663
+# - https://github.com/rust-lang/rust/issues/71407
+CLIPPY_LINTS+=(
+    -D clippy::redundant_async_block \
+    -D clippy::unused_async \
+    -D clippy::manual_async_fn \
+    -D clippy::large_futures \
+)
+
+# Allow let_underscore_future temporary due to lots of counterexamples in
+# tests.
+# TODO: deny it.
+CLIPPY_LINTS+=(
     -A clippy::let_underscore_future \
 )
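For context, here is a hypothetical sketch (not part of the commit) of the two patterns most relevant to this change that the newly denied lints reject; the function names are made up for illustration.

use std::future::{ready, Future};

// clippy::unused_async fires on an async fn that never awaits; the generated
// future only adds overhead, so the fix is to drop `async` (compare the
// DataFile::new and SoftLimit::grow changes above).
async fn answer_async() -> u64 {
    42 // no .await anywhere in the body
}
fn answer() -> u64 {
    42
}

// clippy::redundant_async_block fires on an async block that merely awaits
// one inner future; returning the inner future directly avoids an extra
// state machine (or use a combinator such as FutureExt::map when a side
// effect must run on completion, as future_pool.rs does above).
fn fetch() -> impl Future<Output = u64> {
    ready(42u64)
}
fn wrapped_fetch() -> impl Future<Output = u64> {
    let fut = fetch();
    async move { fut.await } // the lint suggests returning `fut` directly
}
fn direct_fetch() -> impl Future<Output = u64> {
    fetch()
}

fn main() {
    // The futures are only constructed, not polled; the sketch is about what
    // clippy accepts or rejects, not about runtime behavior.
    let _ = (answer(), answer_async(), wrapped_fetch(), direct_fetch());
}

Where the async signature is intentional, the commit opts out locally with #[allow(clippy::unused_async)], as in checkpoint_manager.rs and resource_control/src/future.rs above.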

src/read_pool.rs (+10 -10)

@@ -12,7 +12,10 @@ use std::{
 };

 use file_system::{set_io_type, IoType};
-use futures::{channel::oneshot, future::TryFutureExt};
+use futures::{
+    channel::oneshot,
+    future::{FutureExt, TryFutureExt},
+};
 use kvproto::{errorpb, kvrpcpb::CommandPri};
 use online_config::{ConfigChange, ConfigManager, ConfigValue, Result as CfgResult};
 use prometheus::{core::Metric, Histogram, IntCounter, IntGauge};
@@ -172,10 +175,9 @@ impl ReadPoolHandle {
                 TaskCell::new(
                     TrackedFuture::new(with_resource_limiter(
                         ControlledFuture::new(
-                            async move {
-                                f.await;
+                            f.map(move |_| {
                                 running_tasks.dec();
-                            },
+                            }),
                             resource_ctl.clone(),
                             group_name,
                         ),
@@ -185,10 +187,9 @@ impl ReadPoolHandle {
                 )
             } else {
                 TaskCell::new(
-                    TrackedFuture::new(async move {
-                        f.await;
+                    TrackedFuture::new(f.map(move |_| {
                         running_tasks.dec();
-                    }),
+                    })),
                     extras,
                 )
             };
@@ -212,10 +213,9 @@ impl ReadPoolHandle {
     {
         let (tx, rx) = oneshot::channel::<T>();
         let res = self.spawn(
-            async move {
-                let res = f.await;
+            f.map(move |res| {
                 let _ = tx.send(res);
-            },
+            }),
             priority,
             task_id,
             metadata,
