|  | 
|  | 1 | +use hyper::{ | 
|  | 2 | +    service::{make_service_fn, service_fn}, | 
|  | 3 | +    Body, Request, Response, Server, StatusCode, | 
|  | 4 | +}; | 
|  | 5 | +use opentelemetry::{ | 
|  | 6 | +    global, | 
|  | 7 | +    trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer}, | 
|  | 8 | +    Context, KeyValue, | 
|  | 9 | +}; | 
|  | 10 | +use opentelemetry_http::HeaderExtractor; | 
|  | 11 | +use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider}; | 
|  | 12 | +use opentelemetry_semantic_conventions::trace; | 
|  | 13 | +use opentelemetry_stdout::SpanExporter; | 
|  | 14 | +use std::{convert::Infallible, net::SocketAddr}; | 
|  | 15 | + | 
|  | 16 | +// Utility function to extract the context from the incoming request headers | 
|  | 17 | +fn extract_context_from_request(req: &Request<Body>) -> Context { | 
|  | 18 | +    global::get_text_map_propagator(|propagator| { | 
|  | 19 | +        propagator.extract(&HeaderExtractor(req.headers())) | 
|  | 20 | +    }) | 
|  | 21 | +} | 
|  | 22 | + | 
|  | 23 | +// Separate async function for the handle endpoint | 
|  | 24 | +async fn handle_health_check(_req: Request<Body>) -> Result<Response<Body>, Infallible> { | 
|  | 25 | +    let tracer = global::tracer("example/server"); | 
|  | 26 | +    let mut span = tracer | 
|  | 27 | +        .span_builder("health_check") | 
|  | 28 | +        .with_kind(SpanKind::Internal) | 
|  | 29 | +        .start(&tracer); | 
|  | 30 | +    span.add_event("Health check accessed", vec![]); | 
|  | 31 | +    let res = Response::new(Body::from("Server is up and running!")); | 
|  | 32 | +    Ok(res) | 
|  | 33 | +} | 
|  | 34 | + | 
|  | 35 | +// Separate async function for the echo endpoint | 
|  | 36 | +async fn handle_echo(req: Request<Body>) -> Result<Response<Body>, Infallible> { | 
|  | 37 | +    let tracer = global::tracer("example/server"); | 
|  | 38 | +    let mut span = tracer | 
|  | 39 | +        .span_builder("echo") | 
|  | 40 | +        .with_kind(SpanKind::Internal) | 
|  | 41 | +        .start(&tracer); | 
|  | 42 | +    span.add_event("Echoing back the request", vec![]); | 
|  | 43 | +    let res = Response::new(req.into_body()); | 
|  | 44 | +    Ok(res) | 
|  | 45 | +} | 
|  | 46 | + | 
|  | 47 | +async fn router(req: Request<Body>) -> Result<Response<Body>, Infallible> { | 
|  | 48 | +    // Extract the context from the incoming request headers | 
|  | 49 | +    let parent_cx = extract_context_from_request(&req); | 
|  | 50 | +    let response = { | 
|  | 51 | +        // Create a span parenting the remote client span. | 
|  | 52 | +        let tracer = global::tracer("example/server"); | 
|  | 53 | +        let mut span = tracer | 
|  | 54 | +            .span_builder("router") | 
|  | 55 | +            .with_kind(SpanKind::Server) | 
|  | 56 | +            .start_with_context(&tracer, &parent_cx); | 
|  | 57 | + | 
|  | 58 | +        span.add_event("dispatching request", vec![]); | 
|  | 59 | + | 
|  | 60 | +        let cx = Context::default().with_span(span); | 
|  | 61 | +        match (req.method(), req.uri().path()) { | 
|  | 62 | +            (&hyper::Method::GET, "/health") => handle_health_check(req).with_context(cx).await, | 
|  | 63 | +            (&hyper::Method::GET, "/echo") => handle_echo(req).with_context(cx).await, | 
|  | 64 | +            _ => { | 
|  | 65 | +                cx.span() | 
|  | 66 | +                    .set_attribute(KeyValue::new(trace::HTTP_RESPONSE_STATUS_CODE, 404)); | 
|  | 67 | +                let mut not_found = Response::default(); | 
|  | 68 | +                *not_found.status_mut() = StatusCode::NOT_FOUND; | 
|  | 69 | +                Ok(not_found) | 
|  | 70 | +            } | 
|  | 71 | +        } | 
|  | 72 | +    }; | 
|  | 73 | +    response | 
|  | 74 | +} | 
|  | 75 | + | 
|  | 76 | +fn init_tracer() { | 
|  | 77 | +    global::set_text_map_propagator(TraceContextPropagator::new()); | 
|  | 78 | + | 
|  | 79 | +    // Install stdout exporter pipeline to be able to retrieve the collected spans. | 
|  | 80 | +    // For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces. In a production | 
|  | 81 | +    // application, use `Sampler::ParentBased` or `Sampler::TraceIdRatioBased` with a desired ratio. | 
|  | 82 | +    let provider = TracerProvider::builder() | 
|  | 83 | +        .with_simple_exporter(SpanExporter::default()) | 
|  | 84 | +        .build(); | 
|  | 85 | + | 
|  | 86 | +    global::set_tracer_provider(provider); | 
|  | 87 | +} | 
|  | 88 | + | 
|  | 89 | +#[tokio::main] | 
|  | 90 | +async fn main() { | 
|  | 91 | +    init_tracer(); | 
|  | 92 | +    let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); | 
|  | 93 | + | 
|  | 94 | +    let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(router)) }); | 
|  | 95 | + | 
|  | 96 | +    let server = Server::bind(&addr).serve(make_svc); | 
|  | 97 | + | 
|  | 98 | +    println!("Listening on {addr}"); | 
|  | 99 | +    if let Err(e) = server.await { | 
|  | 100 | +        eprintln!("server error: {e}"); | 
|  | 101 | +    } | 
|  | 102 | +} | 
0 commit comments