Skip to content

Commit af09583

Browse files
analogrelayheaths
andauthored
[Cosmos] Refactor query responses to their own Pager and Page types (#2393)
* detach Cosmos "feed" responses from HTTP types * revert rename of PagerResult field * move feed module out of models * fix warnings * update CHANGELOG * fix doc comment link * Update sdk/cosmos/azure_data_cosmos/CHANGELOG.md Co-authored-by: Heath Stewart <[email protected]> --------- Co-authored-by: Heath Stewart <[email protected]>
1 parent 8312f69 commit af09583

File tree

13 files changed

+187
-95
lines changed

13 files changed

+187
-95
lines changed

sdk/core/azure_core/src/http/pager.rs

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,12 @@ use typespec::Error;
1010
#[derive(Debug)]
1111
pub enum PagerResult<T, C> {
1212
/// The [`Pager`] may fetch additional pages with the included `continuation` token.
13-
Continue {
14-
response: Response<T>,
15-
continuation: C,
16-
},
13+
Continue { response: T, continuation: C },
1714
/// The [`Pager`] is complete and there are no additional pages to fetch.
18-
Complete { response: Response<T> },
15+
Complete { response: T },
1916
}
2017

21-
impl<T> PagerResult<T, String> {
18+
impl<T> PagerResult<Response<T>, String> {
2219
/// Creates a [`PagerResult<T, C>`] from the provided response, extracting the continuation value from the provided header.
2320
///
2421
/// If the provided response has a header with the matching name, this returns [`PagerResult::Continue`], using the value from the header as the continuation.
@@ -34,29 +31,33 @@ impl<T> PagerResult<T, String> {
3431
}
3532
}
3633

37-
/// Represents a paginated result across multiple requests.
34+
/// Represents a paginated stream of results generated through HTTP requests to a service.
35+
///
36+
/// Specifically, this is a [`PageStream`] that yields [`Response<T>`] values.
37+
pub type Pager<T> = PageStream<Response<T>>;
38+
39+
/// Represents a paginated stream of results from a service.
3840
#[pin_project::pin_project]
39-
pub struct Pager<T> {
41+
pub struct PageStream<T> {
4042
#[pin]
4143
#[cfg(not(target_arch = "wasm32"))]
42-
stream: Pin<Box<dyn Stream<Item = Result<Response<T>, Error>> + Send>>,
44+
stream: Pin<Box<dyn Stream<Item = Result<T, Error>> + Send>>,
4345

4446
#[pin]
4547
#[cfg(target_arch = "wasm32")]
46-
stream: Pin<Box<dyn Stream<Item = Result<Response<T>, Error>>>>,
48+
stream: Pin<Box<dyn Stream<Item = Result<T, Error>>>>,
4749
}
4850

49-
impl<T> Pager<T> {
51+
impl<T> PageStream<T> {
5052
/// Creates a [`Pager<T>`] from a callback that will be called repeatedly to request each page.
5153
///
52-
/// This method expect a callback that accepts a single `Option<C>` parameter, and returns a `(Response<T>, Option<C>)` tuple, asynchronously.
53-
/// The `C` type parameter is the type of the continuation. It may be any [`Send`]able type.
54-
/// The result will be an asynchronous stream of [`Result<Response<T>>`](typespec::Result<Response<T>>) values.
54+
/// This method expect a callback that accepts a single `Option<C>` parameter, and returns a [`PagerResult<T, C>`] value, asynchronously.
55+
/// The `C` type parameter is the type of the continuation/state. It may be any [`Send`]able type.
56+
/// The result will be an asynchronous stream of [`Result<T>`](typespec::Result<T>) values.
5557
///
5658
/// The first time your callback is called, it will be called with [`Option::None`], indicating no continuation value is present.
5759
/// Your callback must return one of:
58-
/// * `Ok((response, Some(continuation)))` - The request succeeded, and return a response `response` and a continuation value `continuation`. The response will be yielded to the stream and the callback will be called again immediately with `Some(continuation)`.
59-
/// * `Ok((response, None))` - The request succeeded, and there are no more pages. The response will be yielded to the stream, the stream will end, and the callback will not be called again.
60+
/// * `Ok(result)` - The request succeeded, and the provided [`PagerResult`] indicates the value to return and if there are more pages.
6061
/// * `Err(..)` - The request failed. The error will be yielded to the stream, the stream will end, and the callback will not be called again.
6162
///
6263
/// ## Examples
@@ -102,7 +103,7 @@ impl<T> Pager<T> {
102103
State::Continuation(c) => make_request(Some(c)).await,
103104
State::Done => return None,
104105
};
105-
let (response, next_state) = match result {
106+
let (item, next_state) = match result {
106107
Err(e) => return Some((Err(e), (State::Done, make_request))),
107108
Ok(PagerResult::Continue {
108109
response,
@@ -112,7 +113,7 @@ impl<T> Pager<T> {
112113
};
113114

114115
// Flow 'make_request' through to avoid cloning
115-
Some((response, (next_state, make_request)))
116+
Some((item, (next_state, make_request)))
116117
},
117118
);
118119
Self {
@@ -121,8 +122,8 @@ impl<T> Pager<T> {
121122
}
122123
}
123124

124-
impl<T> futures::Stream for Pager<T> {
125-
type Item = Result<Response<T>, Error>;
125+
impl<T> futures::Stream for PageStream<T> {
126+
type Item = Result<T, Error>;
126127

127128
fn poll_next(
128129
self: Pin<&mut Self>,
@@ -132,7 +133,7 @@ impl<T> futures::Stream for Pager<T> {
132133
}
133134
}
134135

135-
impl<T> std::fmt::Debug for Pager<T> {
136+
impl<T> std::fmt::Debug for PageStream<T> {
136137
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137138
f.debug_struct("Pager").finish_non_exhaustive()
138139
}

sdk/cosmos/azure_data_cosmos/CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,19 @@
11
# Release History
22

3+
## 0.23.0 (Unreleased)
4+
5+
### Features Added
6+
7+
* Decoupled query responses from HTTP to allow for handling non-HTTP transports for queries. ([#2393](https://github.com/Azure/azure-sdk-for-rust/pull/2393))
8+
9+
### Breaking Changes
10+
11+
* Query APIs (`CosmosClient::query_databases`, `DatabaseClient::query_containers`, `ContainerClient::query_items`) now return a `FeedPager` instead of an `azure_core::Pager`. The `FeedPager` type provides an abstraction over the transport layer, allowing for more flexibility when queries are executed over non-HTTP transports or are decoupled from specific HTTP responses (such as in cross-partition queries). ([#2393](https://github.com/Azure/azure-sdk-for-rust/pull/2393))
12+
13+
### Bugs Fixed
14+
15+
### Other Changes
16+
317
## 0.22.1 (2025-03-05)
418

519
### Bugs Fixed

sdk/cosmos/azure_data_cosmos/examples/cosmos/query.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::error::Error;
22

33
use azure_data_cosmos::{CosmosClient, PartitionKey};
44
use clap::{Args, Subcommand};
5-
use futures::StreamExt;
5+
use futures::TryStreamExt;
66

77
/// Run a single-partition query against a container.
88
#[derive(Clone, Args)]
@@ -56,11 +56,10 @@ impl QueryCommand {
5656
let mut items =
5757
container_client.query_items::<serde_json::Value>(&query, pk, None)?;
5858

59-
while let Some(page) = items.next().await {
60-
let page = page?.into_body().await?;
59+
while let Some(page) = items.try_next().await? {
6160
println!("Results Page");
6261
println!(" Items:");
63-
for item in page.items {
62+
for item in page.into_items() {
6463
println!(" * {:#?}", item);
6564
}
6665
}
@@ -69,11 +68,10 @@ impl QueryCommand {
6968
Subcommands::Databases { query } => {
7069
let mut dbs = client.query_databases(query, None)?;
7170

72-
while let Some(page) = dbs.next().await {
73-
let page = page?.into_body().await?;
71+
while let Some(page) = dbs.try_next().await? {
7472
println!("Results Page");
7573
println!(" Databases:");
76-
for item in page.databases {
74+
for item in page.into_items() {
7775
println!(" * {:#?}", item);
7876
}
7977
}
@@ -83,11 +81,10 @@ impl QueryCommand {
8381
let db_client = client.database_client(&database);
8482
let mut dbs = db_client.query_containers(query, None)?;
8583

86-
while let Some(page) = dbs.next().await {
87-
let page = page?.into_body().await?;
84+
while let Some(page) = dbs.try_next().await? {
8885
println!("Results Page");
8986
println!(" Containers:");
90-
for item in page.containers {
87+
for item in page.into_items() {
9188
println!(" * {:#?}", item);
9289
}
9390
}

sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@
33

44
use crate::{
55
constants,
6-
models::{ContainerProperties, PatchDocument, QueryResults, ThroughputProperties},
6+
models::{ContainerProperties, PatchDocument, ThroughputProperties},
77
options::{QueryOptions, ReadContainerOptions},
88
pipeline::CosmosPipeline,
99
resource_context::{ResourceLink, ResourceType},
10-
DeleteContainerOptions, ItemOptions, PartitionKey, Query, QueryPartitionStrategy,
10+
DeleteContainerOptions, FeedPager, ItemOptions, PartitionKey, Query, QueryPartitionStrategy,
1111
ReplaceContainerOptions, ThroughputOptions,
1212
};
1313

14-
use azure_core::http::{headers, request::Request, response::Response, Method, Pager};
14+
use azure_core::http::{headers, request::Request, response::Response, Method};
1515
use serde::{de::DeserializeOwned, Serialize};
1616

1717
/// A client for working with a specific container in a Cosmos DB account.
@@ -671,7 +671,7 @@ impl ContainerClient {
671671
query: impl Into<Query>,
672672
partition_key: impl Into<QueryPartitionStrategy>,
673673
options: Option<QueryOptions<'_>>,
674-
) -> azure_core::Result<Pager<QueryResults<T>>> {
674+
) -> azure_core::Result<FeedPager<T>> {
675675
let options = options.unwrap_or_default();
676676
let url = self.pipeline.url(&self.items_link);
677677
let mut base_request = Request::new(url, Method::Post);

sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33

44
use crate::{
55
clients::DatabaseClient,
6-
models::{DatabaseProperties, DatabaseQueryResults},
6+
models::DatabaseProperties,
77
pipeline::{AuthorizationPolicy, CosmosPipeline},
88
resource_context::{ResourceLink, ResourceType},
9-
CosmosClientOptions, CreateDatabaseOptions, Query, QueryDatabasesOptions,
9+
CosmosClientOptions, CreateDatabaseOptions, FeedPager, Query, QueryDatabasesOptions,
1010
};
1111
use azure_core::{
1212
credentials::TokenCredential,
13-
http::{request::Request, response::Response, Method, Pager, Url},
13+
http::{request::Request, response::Response, Method, Url},
1414
};
1515
use serde::Serialize;
1616
use std::sync::Arc;
@@ -132,7 +132,7 @@ impl CosmosClient {
132132
&self,
133133
query: impl Into<Query>,
134134
options: Option<QueryDatabasesOptions<'_>>,
135-
) -> azure_core::Result<Pager<DatabaseQueryResults>> {
135+
) -> azure_core::Result<FeedPager<DatabaseProperties>> {
136136
let options = options.unwrap_or_default();
137137
let url = self.pipeline.url(&self.databases_link);
138138
let base_request = Request::new(url, Method::Post);

sdk/cosmos/azure_data_cosmos/src/clients/database_client.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,15 @@
33

44
use crate::{
55
clients::ContainerClient,
6-
models::{
7-
ContainerProperties, ContainerQueryResults, DatabaseProperties, ThroughputProperties,
8-
},
6+
models::{ContainerProperties, DatabaseProperties, ThroughputProperties},
97
options::ReadDatabaseOptions,
108
pipeline::CosmosPipeline,
119
resource_context::{ResourceLink, ResourceType},
12-
CreateContainerOptions, DeleteDatabaseOptions, Query, QueryContainersOptions,
10+
CreateContainerOptions, DeleteDatabaseOptions, FeedPager, Query, QueryContainersOptions,
1311
ThroughputOptions,
1412
};
1513

16-
use azure_core::http::{request::Request, response::Response, Method, Pager};
14+
use azure_core::http::{request::Request, response::Response, Method};
1715

1816
/// A client for working with a specific database in a Cosmos DB account.
1917
///
@@ -109,7 +107,7 @@ impl DatabaseClient {
109107
&self,
110108
query: impl Into<Query>,
111109
options: Option<QueryContainersOptions<'_>>,
112-
) -> azure_core::Result<Pager<ContainerQueryResults>> {
110+
) -> azure_core::Result<FeedPager<ContainerProperties>> {
113111
let options = options.unwrap_or_default();
114112
let url = self.pipeline.url(&self.containers_link);
115113
let base_request = Request::new(url, Method::Post);
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
use azure_core::http::{headers::Headers, PageStream, PagerResult, Response};
2+
use serde::{de::DeserializeOwned, Deserialize};
3+
4+
use crate::constants;
5+
6+
/// Represents a single page of results from a Cosmos DB feed.
7+
///
8+
/// A feed could be a list of items, databases, containers, etc.
9+
/// The feed may represent a single-partition or cross-partition query.
10+
///
11+
/// Cosmos DB queries can be executed using non-HTTP transports, depending on the circumstances.
12+
/// They may also produce results that don't directly correlate to specific HTTP responses (as in the case of cross-partition queries).
13+
/// Because of this, Cosmos DB query responses use `FeedPage` to represent the results, rather than a more generic type like [`Response`](azure_core::http::Response).
14+
pub struct FeedPage<T> {
15+
/// The items in the response.
16+
items: Vec<T>,
17+
18+
/// The continuation token for the next page of results.
19+
continuation: Option<String>,
20+
21+
/// Response headers from the server for this page of results.
22+
/// In a cross-partition query, these headers may be missing on some pages.
23+
headers: Headers,
24+
}
25+
26+
impl<T> FeedPage<T> {
27+
/// Gets the items in this page of results.
28+
pub fn items(&self) -> &[T] {
29+
&self.items
30+
}
31+
32+
/// Consumes the page and returns a vector of the items.
33+
///
34+
/// This is essentially shorthand for `self.deconstruct().0`.
35+
pub fn into_items(self) -> Vec<T> {
36+
self.items
37+
}
38+
39+
/// Deconstructs the page into its components.
40+
pub fn deconstruct(self) -> (Vec<T>, Option<String>, Headers) {
41+
(self.items, self.continuation, self.headers)
42+
}
43+
44+
/// Gets the continuation token for the next page of results, if any.
45+
pub fn continuation(&self) -> Option<&str> {
46+
self.continuation.as_deref()
47+
}
48+
49+
/// Gets any headers returned by the server for this page of results.
50+
pub fn headers(&self) -> &Headers {
51+
&self.headers
52+
}
53+
}
54+
55+
impl<T> From<FeedPage<T>> for PagerResult<FeedPage<T>, String> {
56+
fn from(value: FeedPage<T>) -> Self {
57+
let continuation = value.continuation.clone();
58+
match continuation {
59+
Some(continuation) => PagerResult::Continue {
60+
response: value,
61+
continuation,
62+
},
63+
None => PagerResult::Complete { response: value },
64+
}
65+
}
66+
}
67+
68+
#[derive(Deserialize)]
69+
struct FeedBody<T> {
70+
#[serde(alias = "Documents")]
71+
#[serde(alias = "DocumentCollections")]
72+
#[serde(alias = "Databases")]
73+
#[serde(alias = "Offers")]
74+
items: Vec<T>,
75+
}
76+
77+
impl<T: DeserializeOwned> FeedPage<T> {
78+
pub(crate) async fn from_response(response: Response) -> azure_core::Result<Self> {
79+
let headers = response.headers().clone();
80+
let continuation = headers.get_optional_string(&constants::CONTINUATION);
81+
let body: FeedBody<T> = response.into_json_body::<FeedBody<T>>().await?;
82+
83+
Ok(Self {
84+
items: body.items,
85+
continuation,
86+
headers,
87+
})
88+
}
89+
}
90+
91+
/// Represents a stream of pages from a Cosmos DB feed.
92+
///
93+
/// See [`FeedPage`] for more details on Cosmos DB feeds.
94+
pub type FeedPager<T> = PageStream<FeedPage<T>>;

sdk/cosmos/azure_data_cosmos/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
pub mod clients;
1414
pub mod constants;
15+
mod feed;
1516
mod options;
1617
mod partition_key;
1718
pub(crate) mod pipeline;
@@ -27,3 +28,5 @@ pub use clients::CosmosClient;
2728
pub use options::*;
2829
pub use partition_key::*;
2930
pub use query::*;
31+
32+
pub use feed::{FeedPage, FeedPager};

0 commit comments

Comments
 (0)