Skip to content

Commit

Permalink
changes poll method async
Browse files Browse the repository at this point in the history
  • Loading branch information
valesteban committed Jan 10, 2024
1 parent 0554fcf commit ff2f91e
Showing 1 changed file with 70 additions and 51 deletions.
121 changes: 70 additions & 51 deletions src/async_resolver/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,42 +37,43 @@ pub struct LookupFutureStub {
///
/// The `Output` of this future is a `Result<DnsMessage, ResolverError>`.
/// The returned `DnsMessage` contains the corresponding response of the query.
query_answer: Arc<std::sync::Mutex<Pin<Box<dyn futures_util::Future<Output = Result<DnsMessage, ResolverError>> + Send>>>>,
/// Waker for the future.
waker: Option<Waker>,
query_answer: Pin<Box<dyn futures_util::Future<Output = Result<DnsMessage, ResolverError>> + Send>>,
// Waker for the future.
// waker: Option<Waker>,
}

impl Future for LookupFutureStub {
type Output = Result<DnsMessage, ResolverError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

let query = self.query_answer.lock().unwrap().as_mut().poll(cx) ;
let query = self.query_answer.as_mut().poll(cx) ;

match query {
Poll::Pending => {
return Poll::Pending;
},
Poll::Ready(Err(_)) => {
self.waker = Some(cx.waker().clone());
// self.waker = Some(cx.waker().clone());

let referenced_query = Arc::clone(&self.query_answer);
tokio::spawn(
lookup_stub(
self.name.clone(),
self.record_type,
self.record_class,
self.config.get_name_servers(),
self.waker.clone(),
referenced_query,
self.config.clone())
);
// let referenced_query = Arc::clone(&self.query_answer);

return Poll::Pending;
self.query_answer = lookup_stub(
self.name.clone(),
self.record_type,
self.record_class,
self.config.get_name_servers(),
// self.waker.clone(),
// referenced_query,
self.config.clone()).boxed();

//FIXME: Implement from for DnsMessage
return query.map(|f| f.map(DnsMessage::from));
},
Poll::Ready(Ok(ip_addr)) => {
return Poll::Ready(Ok(ip_addr));
}

}
}
}
Expand All @@ -97,8 +98,8 @@ impl LookupFutureStub {
record_class: qclass,
config: config,
query_answer:
Arc::new(Mutex::new(future::err(ResolverError::EmptyQuery).boxed())), //FIXME: cambiar a otro tipo el error/inicio
waker: None,
future::err(ResolverError::EmptyQuery).boxed(), //FIXME: cambiar a otro tipo el error/inicio
// waker: None,
}
}

Expand Down Expand Up @@ -169,8 +170,8 @@ pub async fn lookup_stub( //FIXME: podemos ponerle de nombre lookup_strategy y q
record_type: Qtype,
record_class: Qclass,
name_servers: Vec<(ClientUDPConnection, ClientTCPConnection)>,
waker: Option<Waker>,
referenced_query:Arc<std::sync::Mutex<Pin<Box<dyn futures_util::Future<Output = Result<DnsMessage, ResolverError>> + Send>>>>,
// waker: Option<Waker>,
// referenced_query:Arc<std::sync::Mutex<Pin<Box<dyn futures_util::Future<Output = Result<DnsMessage, ResolverError>> + Send>>>>,
config: ResolverConfig,
) -> Result<DnsMessage,ResolverError>{

Expand Down Expand Up @@ -207,7 +208,7 @@ pub async fn lookup_stub( //FIXME: podemos ponerle de nombre lookup_strategy y q
break;
}

result_dns_msg = send_query_resolver_by_protocol(config.get_protocol(),new_query.clone(), result_dns_msg.clone(), connections);
result_dns_msg = send_query_resolver_by_protocol(config.get_protocol(),new_query.clone(), result_dns_msg.clone(), connections).await;
if result_dns_msg.is_err(){
retry_count = retry_count + 1;
}
Expand All @@ -222,9 +223,9 @@ pub async fn lookup_stub( //FIXME: podemos ponerle de nombre lookup_strategy y q
}

// Wake up task
if let Some(waker) = waker {
waker.wake();
}
// if let Some(waker) = waker {
// waker.wake();
// }

let response_dns_msg = match result_dns_msg.clone() {
Ok(response_message) => response_message,
Expand All @@ -237,8 +238,8 @@ pub async fn lookup_stub( //FIXME: podemos ponerle de nombre lookup_strategy y q
}
Err(_) => response,
};
let mut future_query = referenced_query.lock().unwrap();
*future_query = future::ready(Ok(response_dns_msg)).boxed();
// let mut future_query = referenced_query.lock().unwrap();
// *future_query = future::ready(Ok(response_dns_msg)).boxed();

result_dns_msg
}
Expand All @@ -250,20 +251,25 @@ pub async fn lookup_stub( //FIXME: podemos ponerle de nombre lookup_strategy y q
/// it sends the query using the corresponding connection and updates the result
/// with the parsed response.
fn send_query_resolver_by_protocol(protocol: ConnectionProtocol,query:DnsMessage,mut result_dns_msg: Result<DnsMessage, ResolverError>, connections: &(ClientUDPConnection , ClientTCPConnection))
async fn send_query_resolver_by_protocol(
protocol: ConnectionProtocol,
query:DnsMessage,
mut result_dns_msg: Result<DnsMessage, ResolverError>,
connections: &(ClientUDPConnection , ClientTCPConnection)
)
-> Result<DnsMessage, ResolverError>{
let query_id = query.get_query_id();
match protocol{
ConnectionProtocol::UDP => {
let result_response = connections.0.send(query.clone());
let result_response = connections.0.send(query.clone()).await;
result_dns_msg = parse_response(result_response,query_id);
}
ConnectionProtocol::TCP => {
let result_response = connections.1.send(query.clone());
let result_response = connections.1.send(query.clone()).await;
result_dns_msg = parse_response(result_response,query_id);
}
_ => {},
}
};

result_dns_msg
}
Expand Down Expand Up @@ -357,9 +363,7 @@ mod async_resolver_test {

#[tokio::test]
async fn lookup_stub_a_response() {
let domain_name = DomainName::new_from_string("example.com".to_string());
let waker = None;
let query = Arc::new(Mutex::new(future::err(ResolverError::EmptyQuery).boxed()));
let domain_name: DomainName = DomainName::new_from_string("example.com".to_string());

let google_server:IpAddr = IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8));
let timeout: Duration = Duration::from_secs(20);
Expand All @@ -371,19 +375,25 @@ mod async_resolver_test {
let record_type = Qtype::A;
let record_class = Qclass::IN;
let name_servers = vec![(conn_udp,conn_tcp)];
let response = lookup_stub(domain_name,record_type,record_class, name_servers, waker,query,config).await.unwrap();
let response = lookup_stub(
domain_name,
record_type,
record_class,
name_servers,
// waker,
// query,
config).await.unwrap();

assert_eq!(response.get_header().get_qr(),true);
assert_ne!(response.get_answer().len(),0);
println!("response {:?}",response);

// assert_eq!(response.get_header().get_qr(),true);
// assert_ne!(response.get_answer().len(),0);
}

#[tokio::test]
async fn lookup_stub_ns_response() {
let domain_name = DomainName::new_from_string("example.com".to_string());
let waker = None;

let query = Arc::new(Mutex::new(future::err(ResolverError::EmptyQuery).boxed()));

// Create vect of name servers
let google_server:IpAddr = IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8));
let timeout: Duration = Duration::from_secs(20);
Expand All @@ -396,7 +406,14 @@ mod async_resolver_test {
let record_class = Qclass::IN;

let name_servers = vec![(conn_udp,conn_tcp)];
let response = lookup_stub(domain_name, record_type, record_class,name_servers, waker,query,config).await.unwrap();
let response = lookup_stub(
domain_name,
record_type,
record_class,
name_servers,
// waker,
// query,
config).await.unwrap();

assert_eq!(response.get_header().get_qr(),true);
assert_ne!(response.get_answer().len(),0);
Expand All @@ -406,8 +423,6 @@ mod async_resolver_test {
#[tokio::test]
async fn lookup_stub_ch_response() {
let domain_name = DomainName::new_from_string("example.com".to_string());
let waker = None;
let query = Arc::new(Mutex::new(future::err(ResolverError::EmptyQuery).boxed()));

let google_server:IpAddr = IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8));
let timeout: Duration = Duration::from_secs(20);
Expand All @@ -419,7 +434,13 @@ mod async_resolver_test {
let record_type = Qtype::A;
let record_class = Qclass::CH;
let name_servers = vec![(conn_udp,conn_tcp)];
let response = lookup_stub(domain_name,record_type,record_class, name_servers, waker,query,config).await.unwrap();
let response = lookup_stub(
domain_name,
record_type,
record_class,
name_servers,
config
).await.unwrap();


assert_eq!(response.get_header().get_qr(),true);
Expand All @@ -431,8 +452,6 @@ mod async_resolver_test {
let max_retries =0;

let domain_name = DomainName::new_from_string("example.com".to_string());
let waker = None;
let query = Arc::new(Mutex::new(future::err(ResolverError::EmptyQuery).boxed()));
let timeout = Duration::from_secs(2);
let record_type = Qtype::A;
let record_class = Qclass::IN;
Expand All @@ -452,7 +471,7 @@ mod async_resolver_test {
config.set_name_servers(vec![(conn_udp_non,conn_tcp_non), (conn_udp_google,conn_tcp_google)]);

let name_servers =vec![(conn_udp_non,conn_tcp_non), (conn_udp_google,conn_tcp_google)];
let response = lookup_stub(domain_name, record_type, record_class,name_servers, waker,query,config).await;
let response = lookup_stub(domain_name, record_type, record_class,name_servers,config).await;
println!("response {:?}",response);

assert!(response.is_err())
Expand All @@ -465,8 +484,6 @@ mod async_resolver_test {
let max_retries = 1;

let domain_name = DomainName::new_from_string("example.com".to_string());
let waker = None;
let query = Arc::new(Mutex::new(future::err(ResolverError::EmptyQuery).boxed()));
let timeout = Duration::from_secs(2);
let record_type = Qtype::A;
let record_class = Qclass::IN;
Expand All @@ -486,7 +503,7 @@ mod async_resolver_test {
config.set_name_servers(vec![(conn_udp_non,conn_tcp_non), (conn_udp_google,conn_tcp_google)]);

let name_servers =vec![(conn_udp_non,conn_tcp_non), (conn_udp_google,conn_tcp_google)];
let response = lookup_stub(domain_name, record_type, record_class,name_servers, waker,query,config).await.unwrap();
let response = lookup_stub(domain_name, record_type, record_class,name_servers,config).await.unwrap();
println!("response {:?}",response);

assert!(response.get_answer().len() > 0);
Expand Down Expand Up @@ -679,4 +696,6 @@ mod async_resolver_test {
}
}
*/
}
}


0 comments on commit ff2f91e

Please sign in to comment.