Skip to content

Commit 4df87a2

Browse files
author
twitter-team
committed
Latest navi open source refresh
Latest code change, including the global thread pool. Closes twitter#452. Closes twitter#505.
1 parent 6e5c875 commit 4df87a2

File tree

12 files changed

+112
-68
lines changed

12 files changed

+112
-68
lines changed

navi/README.md

+5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ In navi/navi, you can run the following commands:
3131
- `scripts/run_onnx.sh` for [Onnx](https://onnx.ai/)
3232

3333
Note that you need to create a models directory and populate it with one or more version subdirectories, preferably named using epoch time, e.g., `1679693908377`,
34+
so that the resulting models directory structure looks like:
35+
models/
36+
-web_click
37+
- 1809000
38+
- 1809010
3439

3540
## Build
3641
You can adapt the above scripts to build using Cargo.

navi/dr_transform/Cargo.toml

+5-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ name = "dr_transform"
33
version = "0.1.0"
44
edition = "2021"
55

6-
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
76
[dependencies]
87
serde = { version = "1.0", features = ["derive"] }
98
serde_json = "1.0"
@@ -12,7 +11,6 @@ bpr_thrift = { path = "../thrift_bpr_adapter/thrift/"}
1211
segdense = { path = "../segdense/"}
1312
thrift = "0.17.0"
1413
ndarray = "0.15"
15-
ort = {git ="https://github.com/pykeio/ort.git", tag="v1.14.2"}
1614
base64 = "0.20.0"
1715
npyz = "0.7.2"
1816
log = "0.4.17"
@@ -21,6 +19,11 @@ prometheus = "0.13.1"
2119
once_cell = "1.17.0"
2220
rand = "0.8.5"
2321
itertools = "0.10.5"
22+
anyhow = "1.0.70"
23+
[target.'cfg(not(target_os="linux"))'.dependencies]
24+
ort = {git ="https://github.com/pykeio/ort.git", features=["profiling"], tag="v1.14.6"}
25+
[target.'cfg(target_os="linux")'.dependencies]
26+
ort = {git ="https://github.com/pykeio/ort.git", features=["profiling", "tensorrt", "cuda", "copy-dylibs"], tag="v1.14.6"}
2427
[dev-dependencies]
2528
criterion = "0.3.0"
2629

navi/dr_transform/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ pub mod converter;
33
#[cfg(test)]
44
mod test;
55
pub mod util;
6+
pub extern crate ort;

navi/navi/Cargo.toml

+13-8
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
[package]
22
name = "navi"
3-
version = "2.0.42"
3+
version = "2.0.45"
44
edition = "2021"
5-
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
65

76
[[bin]]
87
name = "navi"
@@ -16,12 +15,19 @@ required-features=["torch"]
1615
name = "navi_onnx"
1716
path = "src/bin/navi_onnx.rs"
1817
required-features=["onnx"]
18+
[[bin]]
19+
name = "navi_onnx_test"
20+
path = "src/bin/bin_tests/navi_onnx_test.rs"
21+
[[bin]]
22+
name = "navi_torch_test"
23+
path = "src/bin/bin_tests/navi_torch_test.rs"
24+
required-features=["torch"]
1925

2026
[features]
2127
default=[]
2228
navi_console=[]
2329
torch=["tch"]
24-
onnx=["ort"]
30+
onnx=[]
2531
tf=["tensorflow"]
2632
[dependencies]
2733
itertools = "0.10.5"
@@ -47,6 +53,7 @@ parking_lot = "0.12.1"
4753
rand = "0.8.5"
4854
rand_pcg = "0.3.1"
4955
random = "0.12.2"
56+
x509-parser = "0.15.0"
5057
sha256 = "1.0.3"
5158
tonic = { version = "0.6.2", features=['compression', 'tls'] }
5259
tokio = { version = "1.17.0", features = ["macros", "rt-multi-thread", "fs", "process"] }
@@ -55,16 +62,12 @@ npyz = "0.7.3"
5562
base64 = "0.21.0"
5663
histogram = "0.6.9"
5764
tch = {version = "0.10.3", optional = true}
58-
tensorflow = { version = "0.20.0", optional = true }
65+
tensorflow = { version = "0.18.0", optional = true }
5966
once_cell = {version = "1.17.1"}
6067
ndarray = "0.15"
6168
serde = "1.0.154"
6269
serde_json = "1.0.94"
6370
dr_transform = { path = "../dr_transform"}
64-
[target.'cfg(not(target_os="linux"))'.dependencies]
65-
ort = {git ="https://github.com/pykeio/ort.git", features=["profiling"], optional = true, tag="v1.14.2"}
66-
[target.'cfg(target_os="linux")'.dependencies]
67-
ort = {git ="https://github.com/pykeio/ort.git", features=["profiling", "tensorrt", "cuda", "copy-dylibs"], optional = true, tag="v1.14.2"}
6871
[build-dependencies]
6972
tonic-build = {version = "0.6.2", features=['prost', "compression"] }
7073
[profile.release]
@@ -74,3 +77,5 @@ ndarray-rand = "0.14.0"
7477
tokio-test = "*"
7578
assert_cmd = "2.0"
7679
criterion = "0.4.0"
80+
81+

navi/navi/scripts/run_onnx.sh

+4-5
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
#!/bin/sh
22
#RUST_LOG=debug LD_LIBRARY_PATH=so/onnx/lib target/release/navi_onnx --port 30 --num-worker-threads 8 --intra-op-parallelism 8 --inter-op-parallelism 8 \
33
RUST_LOG=info LD_LIBRARY_PATH=so/onnx/lib cargo run --bin navi_onnx --features onnx -- \
4-
--port 30 --num-worker-threads 8 --intra-op-parallelism 8 --inter-op-parallelism 8 \
4+
--port 8030 --num-worker-threads 8 \
55
--model-check-interval-secs 30 \
6-
--model-dir models/int8 \
7-
--output caligrated_probabilities \
8-
--input "" \
96
--modelsync-cli "echo" \
10-
--onnx-ep-options use_arena=true
7+
--onnx-ep-options use_arena=true \
8+
--model-dir models/prod_home --output caligrated_probabilities --input "" --intra-op-parallelism 8 --inter-op-parallelism 8 --max-batch-size 1 --batch-time-out-millis 1 \
9+
--model-dir models/prod_home1 --output caligrated_probabilities --input "" --intra-op-parallelism 8 --inter-op-parallelism 8 --max-batch-size 1 --batch-time-out-millis 1 \

navi/navi/src/bin/navi_onnx.rs

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,24 @@
11
use anyhow::Result;
2+
use log::info;
23
use navi::cli_args::{ARGS, MODEL_SPECS};
34
use navi::onnx_model::onnx::OnnxModel;
45
use navi::{bootstrap, metrics};
56

67
fn main() -> Result<()> {
78
env_logger::init();
8-
assert_eq!(MODEL_SPECS.len(), ARGS.inter_op_parallelism.len());
9+
info!("global: {:?}", ARGS.onnx_global_thread_pool_options);
10+
let assert_session_params = if ARGS.onnx_global_thread_pool_options.is_empty() {
11+
// std::env::set_var("OMP_NUM_THREADS", "1");
12+
info!("now we use per session thread pool");
13+
MODEL_SPECS.len()
14+
}
15+
else {
16+
info!("now we use global thread pool");
17+
0
18+
};
19+
assert_eq!(assert_session_params, ARGS.inter_op_parallelism.len());
20+
assert_eq!(assert_session_params, ARGS.inter_op_parallelism.len());
21+
922
metrics::register_custom_metrics();
1023
bootstrap::bootstrap(OnnxModel::new)
1124
}

navi/navi/src/bootstrap.rs

+3
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,9 @@ impl<T: Model> PredictionService for PredictService<T> {
207207
PredictResult::DropDueToOverload => Err(Status::resource_exhausted("")),
208208
PredictResult::ModelNotFound(idx) => {
209209
Err(Status::not_found(format!("model index {}", idx)))
210+
},
211+
PredictResult::ModelNotReady(idx) => {
212+
Err(Status::unavailable(format!("model index {}", idx)))
210213
}
211214
PredictResult::ModelVersionNotFound(idx, version) => Err(
212215
Status::not_found(format!("model index:{}, version {}", idx, version)),

navi/navi/src/cli_args.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,11 @@ pub struct Args {
8787
pub intra_op_parallelism: Vec<String>,
8888
#[clap(
8989
long,
90-
default_value = "14",
9190
help = "number of threads to parallelize computations of the graph"
9291
)]
9392
pub inter_op_parallelism: Vec<String>,
9493
#[clap(
9594
long,
96-
default_value = "serving_default",
9795
help = "signature of a serving. only TF"
9896
)]
9997
pub serving_sig: Vec<String>,
@@ -107,10 +105,12 @@ pub struct Args {
107105
help = "max warmup records to use. warmup only implemented for TF"
108106
)]
109107
pub max_warmup_records: usize,
108+
#[clap(long, value_parser = Args::parse_key_val::<String, String>, value_delimiter=',')]
109+
pub onnx_global_thread_pool_options: Vec<(String, String)>,
110110
#[clap(
111-
long,
112-
default_value = "true",
113-
help = "when to use graph parallelization. only for ONNX"
111+
long,
112+
default_value = "true",
113+
help = "when to use graph parallelization. only for ONNX"
114114
)]
115115
pub onnx_use_parallel_mode: String,
116116
// #[clap(long, default_value = "false")]

navi/navi/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ pub enum PredictResult {
146146
Ok(Vec<TensorScores>, i64),
147147
DropDueToOverload,
148148
ModelNotFound(usize),
149+
ModelNotReady(usize),
149150
ModelVersionNotFound(usize, i64),
150151
}
151152

navi/navi/src/onnx_model.rs

+31-23
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,22 @@ pub mod onnx {
1313
use dr_transform::converter::{BatchPredictionRequestToTorchTensorConverter, Converter};
1414
use itertools::Itertools;
1515
use log::{debug, info};
16-
use ort::environment::Environment;
17-
use ort::session::Session;
18-
use ort::tensor::InputTensor;
19-
use ort::{ExecutionProvider, GraphOptimizationLevel, SessionBuilder};
16+
use dr_transform::ort::environment::Environment;
17+
use dr_transform::ort::session::Session;
18+
use dr_transform::ort::tensor::InputTensor;
19+
use dr_transform::ort::{ExecutionProvider, GraphOptimizationLevel, SessionBuilder};
20+
use dr_transform::ort::LoggingLevel;
2021
use serde_json::Value;
2122
use std::fmt::{Debug, Display};
2223
use std::sync::Arc;
2324
use std::{fmt, fs};
2425
use tokio::time::Instant;
25-
2626
lazy_static! {
2727
pub static ref ENVIRONMENT: Arc<Environment> = Arc::new(
2828
Environment::builder()
2929
.with_name("onnx home")
30-
.with_log_level(ort::LoggingLevel::Error)
30+
.with_log_level(LoggingLevel::Error)
31+
.with_global_thread_pool(ARGS.onnx_global_thread_pool_options.clone())
3132
.build()
3233
.unwrap()
3334
);
@@ -101,23 +102,30 @@ pub mod onnx {
101102
let meta_info = format!("{}/{}/{}", ARGS.model_dir[idx], version, META_INFO);
102103
let mut builder = SessionBuilder::new(&ENVIRONMENT)?
103104
.with_optimization_level(GraphOptimizationLevel::Level3)?
104-
.with_parallel_execution(ARGS.onnx_use_parallel_mode == "true")?
105-
.with_inter_threads(
106-
utils::get_config_or(
107-
model_config,
108-
"inter_op_parallelism",
109-
&ARGS.inter_op_parallelism[idx],
110-
)
111-
.parse()?,
112-
)?
113-
.with_intra_threads(
114-
utils::get_config_or(
115-
model_config,
116-
"intra_op_parallelism",
117-
&ARGS.intra_op_parallelism[idx],
118-
)
119-
.parse()?,
120-
)?
105+
.with_parallel_execution(ARGS.onnx_use_parallel_mode == "true")?;
106+
if ARGS.onnx_global_thread_pool_options.is_empty() {
107+
builder = builder
108+
.with_inter_threads(
109+
utils::get_config_or(
110+
model_config,
111+
"inter_op_parallelism",
112+
&ARGS.inter_op_parallelism[idx],
113+
)
114+
.parse()?,
115+
)?
116+
.with_intra_threads(
117+
utils::get_config_or(
118+
model_config,
119+
"intra_op_parallelism",
120+
&ARGS.intra_op_parallelism[idx],
121+
)
122+
.parse()?,
123+
)?;
124+
}
125+
else {
126+
builder = builder.with_disable_per_session_threads()?;
127+
}
128+
builder = builder
121129
.with_memory_pattern(ARGS.onnx_use_memory_pattern == "true")?
122130
.with_execution_providers(&OnnxModel::ep_choices())?;
123131
match &ARGS.profiling {

navi/navi/src/predict_service.rs

+29-23
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use anyhow::{anyhow, Result};
22
use arrayvec::ArrayVec;
33
use itertools::Itertools;
4-
use log::{error, info, warn};
4+
use log::{error, info};
55
use std::fmt::{Debug, Display};
66
use std::string::String;
77
use std::sync::Arc;
@@ -179,17 +179,17 @@ impl<T: Model> PredictService<T> {
179179
//initialize the latest version array
180180
let mut cur_versions = vec!["".to_owned(); MODEL_SPECS.len()];
181181
loop {
182-
let config = utils::read_config(&meta_file).unwrap_or_else(|e| {
183-
warn!("config file {} not found due to: {}", meta_file, e);
184-
Value::Null
185-
});
186182
info!("***polling for models***"); //nice deliminter
187-
info!("config:{}", config);
188183
if let Some(ref cli) = ARGS.modelsync_cli {
189184
if let Err(e) = call_external_modelsync(cli, &cur_versions).await {
190185
error!("model sync cli running error:{}", e)
191186
}
192187
}
188+
let config = utils::read_config(&meta_file).unwrap_or_else(|e| {
189+
info!("config file {} not found due to: {}", meta_file, e);
190+
Value::Null
191+
});
192+
info!("config:{}", config);
193193
for (idx, cur_version) in cur_versions.iter_mut().enumerate() {
194194
let model_dir = &ARGS.model_dir[idx];
195195
PredictService::scan_load_latest_model_from_model_dir(
@@ -229,26 +229,32 @@ impl<T: Model> PredictService<T> {
229229
let no_more_msg = match msg {
230230
Ok(PredictMessage::Predict(model_spec_at, version, val, resp, ts)) => {
231231
if let Some(model_predictors) = all_model_predictors.get_mut(model_spec_at) {
232-
match version {
233-
None => model_predictors[0].push(val, resp, ts),
234-
Some(the_version) => match model_predictors
235-
.iter_mut()
236-
.find(|x| x.model.version() == the_version)
237-
{
238-
None => resp
239-
.send(PredictResult::ModelVersionNotFound(
240-
model_spec_at,
241-
the_version,
242-
))
243-
.unwrap_or_else(|e| {
244-
error!("cannot send back version error: {:?}", e)
245-
}),
246-
Some(predictor) => predictor.push(val, resp, ts),
247-
},
232+
if model_predictors.is_empty() {
233+
resp.send(PredictResult::ModelNotReady(model_spec_at))
234+
.unwrap_or_else(|e| error!("cannot send back model not ready error: {:?}", e));
235+
}
236+
else {
237+
match version {
238+
None => model_predictors[0].push(val, resp, ts),
239+
Some(the_version) => match model_predictors
240+
.iter_mut()
241+
.find(|x| x.model.version() == the_version)
242+
{
243+
None => resp
244+
.send(PredictResult::ModelVersionNotFound(
245+
model_spec_at,
246+
the_version,
247+
))
248+
.unwrap_or_else(|e| {
249+
error!("cannot send back version error: {:?}", e)
250+
}),
251+
Some(predictor) => predictor.push(val, resp, ts),
252+
},
253+
}
248254
}
249255
} else {
250256
resp.send(PredictResult::ModelNotFound(model_spec_at))
251-
.unwrap_or_else(|e| error!("cannot send back model error: {:?}", e))
257+
.unwrap_or_else(|e| error!("cannot send back model not found error: {:?}", e))
252258
}
253259
MPSC_CHANNEL_SIZE.dec();
254260
false

navi/segdense/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ name = "segdense"
33
version = "0.1.0"
44
edition = "2021"
55

6-
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
76

87
[dependencies]
8+
env_logger = "0.10.0"
99
serde = { version = "1.0.104", features = ["derive"] }
1010
serde_json = "1.0.48"
1111
log = "0.4.17"

0 commit comments

Comments
 (0)