Skip to content

[Cosmos] Refactor query responses to their own Pager and Page types #2393

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 22 additions & 21 deletions sdk/core/azure_core/src/http/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,12 @@ use typespec::Error;
#[derive(Debug)]
pub enum PagerResult<T, C> {
/// The [`Pager`] may fetch additional pages with the included `continuation` token.
Continue {
response: Response<T>,
continuation: C,
},
Continue { response: T, continuation: C },
/// The [`Pager`] is complete and there are no additional pages to fetch.
Complete { response: Response<T> },
Complete { response: T },
}

impl<T> PagerResult<T, String> {
impl<T> PagerResult<Response<T>, String> {
/// Creates a [`PagerResult<T, C>`] from the provided response, extracting the continuation value from the provided header.
///
/// If the provided response has a header with the matching name, this returns [`PagerResult::Continue`], using the value from the header as the continuation.
Expand All @@ -34,29 +31,33 @@ impl<T> PagerResult<T, String> {
}
}

/// Represents a paginated result across multiple requests.
/// Represents a paginated stream of results generated through HTTP requests to a service.
///
/// Specifically, this is a [`PageStream`] that yields [`Response<T>`] values.
pub type Pager<T> = PageStream<Response<T>>;

/// Represents a paginated stream of results from a service.
#[pin_project::pin_project]
pub struct Pager<T> {
pub struct PageStream<T> {
#[pin]
#[cfg(not(target_arch = "wasm32"))]
stream: Pin<Box<dyn Stream<Item = Result<Response<T>, Error>> + Send>>,
stream: Pin<Box<dyn Stream<Item = Result<T, Error>> + Send>>,

#[pin]
#[cfg(target_arch = "wasm32")]
stream: Pin<Box<dyn Stream<Item = Result<Response<T>, Error>>>>,
stream: Pin<Box<dyn Stream<Item = Result<T, Error>>>>,
}

impl<T> Pager<T> {
impl<T> PageStream<T> {
/// Creates a [`Pager<T>`] from a callback that will be called repeatedly to request each page.
///
/// This method expect a callback that accepts a single `Option<C>` parameter, and returns a `(Response<T>, Option<C>)` tuple, asynchronously.
/// The `C` type parameter is the type of the continuation. It may be any [`Send`]able type.
/// The result will be an asynchronous stream of [`Result<Response<T>>`](typespec::Result<Response<T>>) values.
/// This method expect a callback that accepts a single `Option<C>` parameter, and returns a [`PagerResult<T, C>`] value, asynchronously.
/// The `C` type parameter is the type of the continuation/state. It may be any [`Send`]able type.
/// The result will be an asynchronous stream of [`Result<T>`](typespec::Result<T>) values.
///
/// The first time your callback is called, it will be called with [`Option::None`], indicating no continuation value is present.
/// Your callback must return one of:
/// * `Ok((response, Some(continuation)))` - The request succeeded, and return a response `response` and a continuation value `continuation`. The response will be yielded to the stream and the callback will be called again immediately with `Some(continuation)`.
/// * `Ok((response, None))` - The request succeeded, and there are no more pages. The response will be yielded to the stream, the stream will end, and the callback will not be called again.
/// * `Ok(result)` - The request succeeded, and the provided [`PagerResult`] indicates the value to return and if there are more pages.
/// * `Err(..)` - The request failed. The error will be yielded to the stream, the stream will end, and the callback will not be called again.
///
/// ## Examples
Expand Down Expand Up @@ -102,7 +103,7 @@ impl<T> Pager<T> {
State::Continuation(c) => make_request(Some(c)).await,
State::Done => return None,
};
let (response, next_state) = match result {
let (item, next_state) = match result {
Err(e) => return Some((Err(e), (State::Done, make_request))),
Ok(PagerResult::Continue {
response,
Expand All @@ -112,7 +113,7 @@ impl<T> Pager<T> {
};

// Flow 'make_request' through to avoid cloning
Some((response, (next_state, make_request)))
Some((item, (next_state, make_request)))
},
);
Self {
Expand All @@ -121,8 +122,8 @@ impl<T> Pager<T> {
}
}

impl<T> futures::Stream for Pager<T> {
type Item = Result<Response<T>, Error>;
impl<T> futures::Stream for PageStream<T> {
type Item = Result<T, Error>;

fn poll_next(
self: Pin<&mut Self>,
Expand All @@ -132,7 +133,7 @@ impl<T> futures::Stream for Pager<T> {
}
}

impl<T> std::fmt::Debug for Pager<T> {
impl<T> std::fmt::Debug for PageStream<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Pager").finish_non_exhaustive()
}
Expand Down
14 changes: 14 additions & 0 deletions sdk/cosmos/azure_data_cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Release History

## 0.23.0 (Unreleased)

### Features Added

* Decoupled query responses from HTTP to allow for handling non-HTTP transports for queries. ([#2393](https://github.com/Azure/azure-sdk-for-rust/pull/2393))

### Breaking Changes

* Query APIs (`CosmosClient::query_databases`, `DatabaseClient::query_containers`, `ContainerClient::query_items`) now return a `FeedPager` instead of an `azure_core::Pager`. The `FeedPager` type provides an abstraction over the transport layer, allowing for more flexibility when queries are executed over non-HTTP transports or are decoupled from specific HTTP responses (such as in cross-partition queries). ([#2393](https://github.com/Azure/azure-sdk-for-rust/pull/2393))

### Bugs Fixed

### Other Changes

## 0.22.1 (2025-03-05)

### Bugs Fixed
Expand Down
17 changes: 7 additions & 10 deletions sdk/cosmos/azure_data_cosmos/examples/cosmos/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::error::Error;

use azure_data_cosmos::{CosmosClient, PartitionKey};
use clap::{Args, Subcommand};
use futures::StreamExt;
use futures::TryStreamExt;

/// Run a single-partition query against a container.
#[derive(Clone, Args)]
Expand Down Expand Up @@ -56,11 +56,10 @@ impl QueryCommand {
let mut items =
container_client.query_items::<serde_json::Value>(&query, pk, None)?;

while let Some(page) = items.next().await {
let page = page?.into_body().await?;
while let Some(page) = items.try_next().await? {
println!("Results Page");
println!(" Items:");
for item in page.items {
for item in page.into_items() {
println!(" * {:#?}", item);
}
}
Expand All @@ -69,11 +68,10 @@ impl QueryCommand {
Subcommands::Databases { query } => {
let mut dbs = client.query_databases(query, None)?;

while let Some(page) = dbs.next().await {
let page = page?.into_body().await?;
while let Some(page) = dbs.try_next().await? {
println!("Results Page");
println!(" Databases:");
for item in page.databases {
for item in page.into_items() {
println!(" * {:#?}", item);
}
}
Expand All @@ -83,11 +81,10 @@ impl QueryCommand {
let db_client = client.database_client(&database);
let mut dbs = db_client.query_containers(query, None)?;

while let Some(page) = dbs.next().await {
let page = page?.into_body().await?;
while let Some(page) = dbs.try_next().await? {
println!("Results Page");
println!(" Containers:");
for item in page.containers {
for item in page.into_items() {
println!(" * {:#?}", item);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@

use crate::{
constants,
models::{ContainerProperties, PatchDocument, QueryResults, ThroughputProperties},
models::{ContainerProperties, PatchDocument, ThroughputProperties},
options::{QueryOptions, ReadContainerOptions},
pipeline::CosmosPipeline,
resource_context::{ResourceLink, ResourceType},
DeleteContainerOptions, ItemOptions, PartitionKey, Query, QueryPartitionStrategy,
DeleteContainerOptions, FeedPager, ItemOptions, PartitionKey, Query, QueryPartitionStrategy,
ReplaceContainerOptions, ThroughputOptions,
};

use azure_core::http::{headers, request::Request, response::Response, Method, Pager};
use azure_core::http::{headers, request::Request, response::Response, Method};
use serde::{de::DeserializeOwned, Serialize};

/// A client for working with a specific container in a Cosmos DB account.
Expand Down Expand Up @@ -671,7 +671,7 @@ impl ContainerClient {
query: impl Into<Query>,
partition_key: impl Into<QueryPartitionStrategy>,
options: Option<QueryOptions<'_>>,
) -> azure_core::Result<Pager<QueryResults<T>>> {
) -> azure_core::Result<FeedPager<T>> {
let options = options.unwrap_or_default();
let url = self.pipeline.url(&self.items_link);
let mut base_request = Request::new(url, Method::Post);
Expand Down
8 changes: 4 additions & 4 deletions sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@

use crate::{
clients::DatabaseClient,
models::{DatabaseProperties, DatabaseQueryResults},
models::DatabaseProperties,
pipeline::{AuthorizationPolicy, CosmosPipeline},
resource_context::{ResourceLink, ResourceType},
CosmosClientOptions, CreateDatabaseOptions, Query, QueryDatabasesOptions,
CosmosClientOptions, CreateDatabaseOptions, FeedPager, Query, QueryDatabasesOptions,
};
use azure_core::{
credentials::TokenCredential,
http::{request::Request, response::Response, Method, Pager, Url},
http::{request::Request, response::Response, Method, Url},
};
use serde::Serialize;
use std::sync::Arc;
Expand Down Expand Up @@ -132,7 +132,7 @@ impl CosmosClient {
&self,
query: impl Into<Query>,
options: Option<QueryDatabasesOptions<'_>>,
) -> azure_core::Result<Pager<DatabaseQueryResults>> {
) -> azure_core::Result<FeedPager<DatabaseProperties>> {
let options = options.unwrap_or_default();
let url = self.pipeline.url(&self.databases_link);
let base_request = Request::new(url, Method::Post);
Expand Down
10 changes: 4 additions & 6 deletions sdk/cosmos/azure_data_cosmos/src/clients/database_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@

use crate::{
clients::ContainerClient,
models::{
ContainerProperties, ContainerQueryResults, DatabaseProperties, ThroughputProperties,
},
models::{ContainerProperties, DatabaseProperties, ThroughputProperties},
options::ReadDatabaseOptions,
pipeline::CosmosPipeline,
resource_context::{ResourceLink, ResourceType},
CreateContainerOptions, DeleteDatabaseOptions, Query, QueryContainersOptions,
CreateContainerOptions, DeleteDatabaseOptions, FeedPager, Query, QueryContainersOptions,
ThroughputOptions,
};

use azure_core::http::{request::Request, response::Response, Method, Pager};
use azure_core::http::{request::Request, response::Response, Method};

/// A client for working with a specific database in a Cosmos DB account.
///
Expand Down Expand Up @@ -109,7 +107,7 @@ impl DatabaseClient {
&self,
query: impl Into<Query>,
options: Option<QueryContainersOptions<'_>>,
) -> azure_core::Result<Pager<ContainerQueryResults>> {
) -> azure_core::Result<FeedPager<ContainerProperties>> {
let options = options.unwrap_or_default();
let url = self.pipeline.url(&self.containers_link);
let base_request = Request::new(url, Method::Post);
Expand Down
94 changes: 94 additions & 0 deletions sdk/cosmos/azure_data_cosmos/src/feed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use azure_core::http::{headers::Headers, PageStream, PagerResult, Response};
use serde::{de::DeserializeOwned, Deserialize};

use crate::constants;

/// Represents a single page of results from a Cosmos DB feed.
///
/// A feed could be a list of items, databases, containers, etc.
/// The feed may represent a single-partition or cross-partition query.
///
/// Cosmos DB queries can be executed using non-HTTP transports, depending on the circumstances.
/// They may also produce results that don't directly correlate to specific HTTP responses (as in the case of cross-partition queries).
/// Because of this, Cosmos DB query responses use `FeedPage` to represent the results, rather than a more generic type like [`Response`](azure_core::http::Response).
pub struct FeedPage<T> {
/// The items in the response.
items: Vec<T>,

/// The continuation token for the next page of results.
continuation: Option<String>,

/// Response headers from the server for this page of results.
/// In a cross-partition query, these headers may be missing on some pages.
headers: Headers,
}

impl<T> FeedPage<T> {
/// Gets the items in this page of results.
pub fn items(&self) -> &[T] {
&self.items
}

/// Consumes the page and returns a vector of the items.
///
/// This is essentially shorthand for `self.deconstruct().0`.
pub fn into_items(self) -> Vec<T> {
self.items
}

/// Deconstructs the page into its components.
pub fn deconstruct(self) -> (Vec<T>, Option<String>, Headers) {
(self.items, self.continuation, self.headers)
}

/// Gets the continuation token for the next page of results, if any.
pub fn continuation(&self) -> Option<&str> {
self.continuation.as_deref()
}

/// Gets any headers returned by the server for this page of results.
pub fn headers(&self) -> &Headers {
&self.headers
}
}

impl<T> From<FeedPage<T>> for PagerResult<FeedPage<T>, String> {
fn from(value: FeedPage<T>) -> Self {
let continuation = value.continuation.clone();
match continuation {
Some(continuation) => PagerResult::Continue {
response: value,
continuation,
},
None => PagerResult::Complete { response: value },
}
}
}

#[derive(Deserialize)]
struct FeedBody<T> {
#[serde(alias = "Documents")]
#[serde(alias = "DocumentCollections")]
#[serde(alias = "Databases")]
#[serde(alias = "Offers")]
items: Vec<T>,
}

impl<T: DeserializeOwned> FeedPage<T> {
pub(crate) async fn from_response(response: Response) -> azure_core::Result<Self> {
let headers = response.headers().clone();
let continuation = headers.get_optional_string(&constants::CONTINUATION);
let body: FeedBody<T> = response.into_json_body::<FeedBody<T>>().await?;

Ok(Self {
items: body.items,
continuation,
headers,
})
}
}

/// Represents a stream of pages from a Cosmos DB feed.
///
/// See [`FeedPage`] for more details on Cosmos DB feeds.
pub type FeedPager<T> = PageStream<FeedPage<T>>;
3 changes: 3 additions & 0 deletions sdk/cosmos/azure_data_cosmos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

pub mod clients;
pub mod constants;
mod feed;
mod options;
mod partition_key;
pub(crate) mod pipeline;
Expand All @@ -27,3 +28,5 @@ pub use clients::CosmosClient;
pub use options::*;
pub use partition_key::*;
pub use query::*;

pub use feed::{FeedPage, FeedPager};
Loading