diff --git a/azure-cosmosdb-get-started/pom.xml b/azure-cosmosdb-get-started/pom.xml index f54bce5..bb2f929 100644 --- a/azure-cosmosdb-get-started/pom.xml +++ b/azure-cosmosdb-get-started/pom.xml @@ -42,8 +42,8 @@ com.microsoft.azure - azure-cosmosdb - 2.2.0 + azure-cosmos + 3.0.0 org.slf4j diff --git a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Families.java b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Families.java index ff0ced1..28cf3b4 100644 --- a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Families.java +++ b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Families.java @@ -24,7 +24,7 @@ public class Families { - public static Family getAndersenFamilyDocument() { + public static Family getAndersenFamilyItem() { Family andersenFamily = new Family(); andersenFamily.setId("Andersen-" + System.currentTimeMillis()); andersenFamily.setLastName("Andersen"); @@ -59,7 +59,7 @@ public static Family getAndersenFamilyDocument() { return andersenFamily; } - public static Family getWakefieldFamilyDocument() { + public static Family getWakefieldFamilyItem() { Family wakefieldFamily = new Family(); wakefieldFamily.setId("Wakefield-" + System.currentTimeMillis()); wakefieldFamily.setLastName("Wakefield"); @@ -106,7 +106,7 @@ public static Family getWakefieldFamilyDocument() { return wakefieldFamily; } - public static Family getJohnsonFamilyDocument() { + public static Family getJohnsonFamilyItem() { Family andersenFamily = new Family(); andersenFamily.setId("Johnson-" + System.currentTimeMillis()); andersenFamily.setLastName("Johnson"); @@ -120,7 +120,7 @@ public static Family getJohnsonFamilyDocument() { return andersenFamily; } - public static Family getSmithFamilyDocument() { + public static Family getSmithFamilyItem() { Family andersenFamily = new Family(); andersenFamily.setId("Smith-" + System.currentTimeMillis()); andersenFamily.setLastName("Smith"); diff --git a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java index bb6bf74..a065994 100644 --- a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java +++ b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java @@ -23,44 +23,47 @@ package com.microsoft.azure.cosmosdb.sample; -import com.microsoft.azure.cosmosdb.ConnectionPolicy; -import com.microsoft.azure.cosmosdb.ConsistencyLevel; -import com.microsoft.azure.cosmosdb.Database; -import com.microsoft.azure.cosmosdb.Document; -import com.microsoft.azure.cosmosdb.DocumentClientException; -import com.microsoft.azure.cosmosdb.DocumentCollection; -import com.microsoft.azure.cosmosdb.FeedOptions; -import com.microsoft.azure.cosmosdb.FeedResponse; -import com.microsoft.azure.cosmosdb.RequestOptions; -import com.microsoft.azure.cosmosdb.ResourceResponse; -import com.microsoft.azure.cosmosdb.SqlParameter; -import com.microsoft.azure.cosmosdb.SqlParameterCollection; -import com.microsoft.azure.cosmosdb.SqlQuerySpec; -import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient; -import rx.Observable; -import rx.Scheduler; -import rx.schedulers.Schedulers; +import com.azure.data.cosmos.ConnectionPolicy; +import com.azure.data.cosmos.ConsistencyLevel; +import com.azure.data.cosmos.CosmosClient; +import com.azure.data.cosmos.CosmosClientException; +import com.azure.data.cosmos.CosmosContainer; +import com.azure.data.cosmos.CosmosContainerProperties; +import com.azure.data.cosmos.CosmosDatabase; +import com.azure.data.cosmos.CosmosDatabaseProperties; +import com.azure.data.cosmos.CosmosDatabaseResponse; +import com.azure.data.cosmos.CosmosItemProperties; +import com.azure.data.cosmos.CosmosItemResponse; +import com.azure.data.cosmos.FeedOptions; +import com.azure.data.cosmos.FeedResponse; +import com.azure.data.cosmos.PartitionKeyDefinition; +import com.azure.data.cosmos.SqlParameter; +import com.azure.data.cosmos.SqlParameterList; +import com.azure.data.cosmos.SqlQuerySpec; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class Main { - private final ExecutorService executorService; private final Scheduler scheduler; - private AsyncDocumentClient client; + private CosmosClient client; + private CosmosDatabase database; + private CosmosContainer container; private final String databaseName = "AzureSampleFamilyDB"; - private final String collectionName = "FamilyCollection"; + private final String containerName = "FamilyContainer"; public Main() { - executorService = Executors.newFixedThreadPool(100); // The SDK uses netty library for doing async IO operations. The IO operations are performed on the netty io threads. // The number of IO netty threads are limited; it is the same as the number of CPU cores. @@ -79,16 +82,15 @@ public Main() { // you should provide your own scheduler to switch thread. // the following scheduler is used for switching from netty thread to user app thread. - scheduler = Schedulers.from(executorService); + scheduler = Schedulers.elastic(); } public void close() { - executorService.shutdown(); client.close(); } /** - * Run a Hello DocumentDB console application. + * Run a Hello Azure Cosmos DB console application. * * @param args command line args. */ @@ -99,7 +101,7 @@ public static void main(String[] args) { p.getStartedDemo(); System.out.println(String.format("Demo complete, please hold while resources are released")); } catch (Exception e) { - System.err.println(String.format("DocumentDB GetStarted failed with %s", e)); + System.err.println(String.format("Cosmos DB GetStarted failed with %s", e)); } finally { System.out.println("close the client"); p.close(); @@ -110,18 +112,18 @@ public static void main(String[] args) { private void getStartedDemo() throws Exception { System.out.println("Using Azure Cosmos DB endpoint: " + AccountSettings.HOST); - client = new AsyncDocumentClient.Builder() - .withServiceEndpoint(AccountSettings.HOST) - .withMasterKeyOrResourceToken(AccountSettings.MASTER_KEY) - .withConnectionPolicy(ConnectionPolicy.GetDefault()) - .withConsistencyLevel(ConsistencyLevel.Eventual) + client = CosmosClient.builder() + .endpoint(AccountSettings.HOST) + .key(AccountSettings.MASTER_KEY) + .connectionPolicy(ConnectionPolicy.defaultPolicy()) + .consistencyLevel(ConsistencyLevel.EVENTUAL) .build(); createDatabaseIfNotExists(); - createDocumentCollectionIfNotExists(); + createContainerIfNotExists(); - Family andersenFamily = Families.getAndersenFamilyDocument(); - Family wakefieldFamily = Families.getWakefieldFamilyDocument(); + Family andersenFamily = Families.getAndersenFamilyItem(); + Family wakefieldFamily = Families.getWakefieldFamilyItem(); ArrayList familiesToCreate = new ArrayList<>(); familiesToCreate.add(andersenFamily); @@ -130,23 +132,23 @@ private void getStartedDemo() throws Exception { createFamiliesAndWaitForCompletion(familiesToCreate); familiesToCreate = new ArrayList<>(); - familiesToCreate.add(Families.getJohnsonFamilyDocument()); - familiesToCreate.add(Families.getSmithFamilyDocument()); + familiesToCreate.add(Families.getJohnsonFamilyItem()); + familiesToCreate.add(Families.getSmithFamilyItem()); - CountDownLatch createDocumentsCompletionLatch = new CountDownLatch(1); + CountDownLatch createItemsCompletionLatch = new CountDownLatch(1); - System.out.println("Creating documents async and registering listener for the completion."); - createFamiliesAsyncAndRegisterListener(familiesToCreate, createDocumentsCompletionLatch); + System.out.println("Creating items async and registering listener for the completion."); + createFamiliesAsyncAndRegisterListener(familiesToCreate, createItemsCompletionLatch); CountDownLatch queryCompletionLatch = new CountDownLatch(1); - System.out.println("Querying documents async and registering listener for the result."); + System.out.println("Querying items async and registering listener for the result."); executeSimpleQueryAsyncAndRegisterListenerForResult(queryCompletionLatch); // as createFamiliesAsyncAndRegisterListener starts the operation in background - // and only registers a listener, we used the createDocumentsCompletionLatch + // and only registers a listener, we used the createItemsCompletionLatch // to ensure we wait for the completion - createDocumentsCompletionLatch.await(); + createItemsCompletionLatch.await(); // as executeSimpleQueryAsyncAndRegisterListenerForResult starts the operation in background // and only registers a listener, we used the queryCompletionLatch @@ -158,139 +160,132 @@ private void createDatabaseIfNotExists() throws Exception { writeToConsoleAndPromptToContinue( "Check if database " + databaseName + " exists."); - String databaseLink = String.format("/dbs/%s", databaseName); - - Observable> databaseReadObs = - client.readDatabase(databaseLink, null); - - Observable> databaseExistenceObs = - databaseReadObs - .doOnNext(x -> { - System.out.println("database " + databaseName + " already exists."); - }) - .onErrorResumeNext( - e -> { - // if the database doesn't already exists - // readDatabase() will result in 404 error - if (e instanceof DocumentClientException) { - DocumentClientException de = (DocumentClientException) e; - // if database - if (de.getStatusCode() == 404) { - // if the database doesn't exist, create it. - System.out.println("database " + databaseName + " doesn't existed," - + " creating it..."); - - Database dbDefinition = new Database(); - dbDefinition.setId(databaseName); - - return client.createDatabase(dbDefinition, null); - } - } - - // some unexpected failure in reading database happened. - // pass the error up. - System.err.println("Reading database " + databaseName + " failed."); - return Observable.error(e); - }); + Mono databaseReadObs = client.getDatabase(databaseName).read(); + + Mono databaseExistenceObs = databaseReadObs + .doOnNext(x -> { + System.out.println("database " + databaseName + " already exists."); + }) + .onErrorResume( + e -> { + // if the database doesn't already exists + // readDatabase() will result in 404 error + if (e instanceof CosmosClientException) { + CosmosClientException de = (CosmosClientException) e; + // if database + if (de.statusCode() == 404) { + // if the database doesn't exist, create it. + System.out.println("database " + databaseName + " doesn't existed," + + " creating it..."); + + CosmosDatabaseProperties dbDefinition = new CosmosDatabaseProperties(databaseName); + + return client.createDatabase(dbDefinition, null); + } + } + + // some unexpected failure in reading database happened. + // pass the error up. + System.err.println("Reading database " + databaseName + " failed."); + return Mono.error(e); + }); // wait for completion, // as waiting for completion is a blocking call try to // provide your own scheduler to avoid stealing netty io threads. - databaseExistenceObs.toCompletable().await(); + database = databaseExistenceObs.block().database(); System.out.println("Checking database " + databaseName + " completed!\n"); } - private void createDocumentCollectionIfNotExists() throws Exception { + private void createContainerIfNotExists() throws Exception { writeToConsoleAndPromptToContinue( - "Check if collection " + collectionName + " exists."); + "Check if container " + containerName + " exists."); - // query for a collection with a given id + // query for a container with a given id // if it exists nothing else to be done - // if the collection doesn't exist, create it. + // if the container doesn't exist, create it. - String databaseLink = String.format("/dbs/%s", databaseName); + // if there is no matching container create the container. - client.queryCollections(databaseLink, + container = database.queryContainers( new SqlQuerySpec("SELECT * FROM r where r.id = @id", - new SqlParameterCollection( - new SqlParameter("@id", collectionName))), null) + new SqlParameterList( + new SqlParameter("@id", containerName))), null) .single() // we know there is only single page of result (empty or with a match) .flatMap(page -> { - if (page.getResults().isEmpty()) { - // if there is no matching collection create the collection. - DocumentCollection collection = new DocumentCollection(); - collection.setId(collectionName); - System.out.println("Creating collection " + collectionName); + if (page.results().isEmpty()) { + PartitionKeyDefinition partitionKeyDefinition = new PartitionKeyDefinition(); + ArrayList partitionKeyPaths = new ArrayList(); + partitionKeyPaths.add("/id"); + partitionKeyDefinition.paths(partitionKeyPaths); + + CosmosContainerProperties containerProperties = new CosmosContainerProperties(containerName, partitionKeyDefinition); + System.out.println("Creating container " + containerName); - return client.createCollection(databaseLink, collection, null); + return database.createContainer(containerProperties); } else { - // collection already exists, nothing else to be done. - System.out.println("Collection " + collectionName + "already exists"); - return Observable.empty(); + // container already exists, nothing else to be done. + System.out.println("Container " + containerName + "already exists"); + return Mono.empty(); } - }).toCompletable().await(); + }).block().container(); - System.out.println("Checking collection " + collectionName + " completed!\n"); + System.out.println("Checking container " + containerName + " completed!\n"); } private void createFamiliesAsyncAndRegisterListener(List families, CountDownLatch completionLatch) { - String collectionLink = String.format("/dbs/%s/colls/%s", databaseName, collectionName); - - List>> createDocumentsOBs = new ArrayList<>(); + List> createItemsOBs = new ArrayList<>(); for (Family family : families) { - Observable> obs = client.createDocument( - collectionLink, family, new RequestOptions(), true); - createDocumentsOBs.add(obs); + Mono obs = container.createItem(family); + createItemsOBs.add(obs); } - Observable.merge(createDocumentsOBs) - .map(ResourceResponse::getRequestCharge) - .reduce((sum, value) -> sum + value) - .subscribe( - totalRequestCharge -> { - // this will get print out when completed - System.out.println("total charge for creating documents is " - + totalRequestCharge); - }, - - // terminal error signal - e -> { - e.printStackTrace(); - completionLatch.countDown(); - }, - - // terminal completion signal - () -> { - completionLatch.countDown(); - }); + Flux.merge(createItemsOBs) + .map(CosmosItemResponse::requestCharge) + .reduce((sum, value) -> sum + value) + .subscribe( + totalRequestCharge -> { + // this will get print out when completed + System.out.println("total charge for creating items is " + + totalRequestCharge); + }, + + // terminal error signal + e -> { + e.printStackTrace(); + completionLatch.countDown(); + }, + + // terminal completion signal + () -> { + completionLatch.countDown(); + }); } private void createFamiliesAndWaitForCompletion(List families) throws Exception { - String collectionLink = String.format("/dbs/%s/colls/%s", databaseName, collectionName); - List>> createDocumentsOBs = new ArrayList<>(); + List> createItemsOBs = new ArrayList<>(); for (Family family : families) { - Observable> obs = client.createDocument( - collectionLink, family, new RequestOptions(), true); - createDocumentsOBs.add(obs); + Mono obs = container.createItem(family); + createItemsOBs.add(obs); } - Double totalRequestCharge = Observable.merge(createDocumentsOBs) - .map(ResourceResponse::getRequestCharge) - .observeOn(scheduler) // the scheduler will be used for the following work - .map(charge -> { - // as we don't want to run heavyWork() on netty IO thread, we provide the custom scheduler - // for switching from netty IO thread to user thread. - heavyWork(); - return charge; - }) - .reduce((sum, value) -> sum + value) - .toBlocking().single(); - - writeToConsoleAndPromptToContinue(String.format("Created %d documents with total request charge of %.2f", + Double totalRequestCharge = Flux.merge(createItemsOBs) + .map(CosmosItemResponse::requestCharge) + .publishOn(scheduler) + .map(charge -> { + // as we don't want to run heavyWork() on netty IO thread, we provide the custom scheduler + // for switching from netty IO thread to user thread. + heavyWork(); + return charge; + }) + .reduce((sum, value) -> sum + value) + .block(); + + writeToConsoleAndPromptToContinue(String.format("Created %d items with total request charge of %.2f", families.size(), totalRequestCharge)); } @@ -309,16 +304,14 @@ private void heavyWork() { private void executeSimpleQueryAsyncAndRegisterListenerForResult(CountDownLatch completionLatch) { // Set some common query options FeedOptions queryOptions = new FeedOptions(); - queryOptions.setMaxItemCount(10); - queryOptions.setEnableCrossPartitionQuery(true); + queryOptions.maxItemCount(10); + queryOptions.enableCrossPartitionQuery(true); - String collectionLink = String.format("/dbs/%s/colls/%s", databaseName, collectionName); - Observable> queryObservable = - client.queryDocuments(collectionLink, - "SELECT * FROM Family WHERE Family.lastName != 'Andersen'", queryOptions); + Flux> queryObservable = container.queryItems( + "SELECT * FROM Family WHERE Family.lastName != 'Andersen'", queryOptions); queryObservable - .observeOn(scheduler) + .publishOn(scheduler) .subscribe( page -> { // we want to make sure heavyWork() doesn't block any of netty IO threads @@ -326,11 +319,11 @@ private void executeSimpleQueryAsyncAndRegisterListenerForResult(CountDownLatch heavyWork(); System.out.println("Got a page of query result with " + - page.getResults().size() + " document(s)" - + " and request charge of " + page.getRequestCharge()); + page.results().size() + " item(s)" + + " and request charge of " + page.requestCharge()); - System.out.println("Document Ids " + page.getResults().stream().map(d -> d.getId()) + System.out.println("Item Ids " + page.results().stream().map(d -> d.id()) .collect(Collectors.toList())); }, // terminal error signal