1- /**
1+ /*
22 * The MIT License (MIT)
33 * Copyright (c) 2018 Microsoft Corporation
44 *
2323
2424package com .microsoft .azure .cosmosdb .sample ;
2525
26- import java .io .IOException ;
27- import java .util .ArrayList ;
28- import java .util .List ;
29- import java .util .concurrent .CountDownLatch ;
30-
3126import com .microsoft .azure .cosmosdb .ConnectionPolicy ;
3227import com .microsoft .azure .cosmosdb .ConsistencyLevel ;
3328import com .microsoft .azure .cosmosdb .Database ;
4237import com .microsoft .azure .cosmosdb .SqlParameterCollection ;
4338import com .microsoft .azure .cosmosdb .SqlQuerySpec ;
4439import com .microsoft .azure .cosmosdb .rx .AsyncDocumentClient ;
45-
4640import rx .Observable ;
41+ import rx .Scheduler ;
42+ import rx .schedulers .Schedulers ;
43+
44+ import java .io .IOException ;
45+ import java .util .ArrayList ;
46+ import java .util .List ;
47+ import java .util .concurrent .CountDownLatch ;
48+ import java .util .concurrent .ExecutorService ;
49+ import java .util .concurrent .Executors ;
4750
4851public class Main {
52+ private final ExecutorService executorService ;
53+ private final Scheduler scheduler ;
4954
5055 private AsyncDocumentClient client ;
5156
52- private String databaseName = "AzureSampleFamilyDB" ;
53- private String collectionName = "FamilyCollection" ;
57+ private final String databaseName = "AzureSampleFamilyDB" ;
58+ private final String collectionName = "FamilyCollection" ;
59+
60+ public Main () {
61+ executorService = Executors .newFixedThreadPool (100 );
62+ // The SDK uses netty library for doing async IO operations, and the operations are performed the netty io threads.
63+ // The number of IO netty threads are limited; it is the same as the number of CPU cores.
64+
65+ // The app should avoid doing anything which takes a lot of time from IO netty thread.
66+ // If the app consumes too much of IO netty thread you may face:
67+ // * low throughput
68+ // * bad latency
69+ // * ReadTimeoutException because there is no netty IO thread available to read data from network.
70+ // * deadlock
71+
72+ // The app code will receive the data from Azure Cosmos DB on the netty IO thread.
73+ // The app should ensure the user's blocking call or computationally/IO heavy work after receiving data
74+ // from Azure Cosmos DB is performed on a custom thread managed by the user (not the SDK netty IO threads).
75+ //
76+ // If you are doing blocking calls or heavy work, provide your own scheduler to switch thread.
77+ // for example you can do this:
78+ // client.createDocument(.).observeOn(userCustomScheduler).toBlocking().single();
79+
80+ // the following scheduler is used for switching from netty thread to user app thread.
81+ scheduler = Schedulers .from (executorService );
82+ }
83+
84+ public void close () {
85+ executorService .shutdown ();
86+ client .close ();
87+ }
5488
5589 /**
5690 * Run a Hello DocumentDB console application.
57- *
58- * @param args
91+ *
92+ * @param args command line args.
5993 */
6094 public static void main (String [] args ) {
6195
6296 Main p = new Main ();
6397
64- try {
98+ try {
6599 p .getStartedDemo ();
66100 System .out .println (String .format ("Demo complete, please hold while resources are deleted" ));
67101 } catch (Exception e ) {
68- System .out .println (String .format ("DocumentDB GetStarted failed with %s" , e ));
102+ System .err .println (String .format ("DocumentDB GetStarted failed with %s" , e ));
69103 } finally {
70104 System .out .println ("close the client" );
71- p .client .close ();
72- System .exit (0 );
105+ p .close ();
73106 }
74107 }
75108
@@ -78,13 +111,13 @@ private void getStartedDemo() throws Exception {
78111
79112 client = new AsyncDocumentClient .Builder ()
80113 .withServiceEndpoint (AccountSettings .HOST )
81- .withMasterKey (AccountSettings .MASTER_KEY )
114+ .withMasterKeyOrResourceToken (AccountSettings .MASTER_KEY )
82115 .withConnectionPolicy (ConnectionPolicy .GetDefault ())
83116 .withConsistencyLevel (ConsistencyLevel .Session )
84117 .build ();
85118
86- this . createDatabaseIfNotExists ();
87- this . createDocumentCollectionIfNotExists ();
119+ createDatabaseIfNotExists ();
120+ createDocumentCollectionIfNotExists ();
88121
89122 Family andersenFamily = Families .getAndersenFamilyDocument ();
90123 Family wakefieldFamily = Families .getWakefieldFamilyDocument ();
@@ -126,42 +159,44 @@ private void createDatabaseIfNotExists() throws Exception {
126159
127160 String databaseLink = String .format ("/dbs/%s" , databaseName );
128161
129- Observable <ResourceResponse <Database >> databaseReadObs =
162+ Observable <ResourceResponse <Database >> databaseReadObs =
130163 client .readDatabase (databaseLink , null );
131164
132- Observable <ResourceResponse <Database >> databaseExistenceObs =
165+ Observable <ResourceResponse <Database >> databaseExistenceObs =
133166 databaseReadObs
134- .doOnNext (x -> {
135- System .out .println ("database " + databaseName + " already exists." );
136- })
137- .onErrorResumeNext (
138- e -> {
139- // if the database doesn't already exists
140- // readDatabase() will result in 404 error
141- if (e instanceof DocumentClientException ) {
142- DocumentClientException de = (DocumentClientException ) e ;
143- // if database
144- if (de .getStatusCode () == 404 ) {
145- // if the database doesn't exist, create it.
146- System .out .println ("database " + databaseName + " doesn't existed,"
147- + " creating it..." );
148-
149- Database dbDefinition = new Database ();
150- dbDefinition .setId (databaseName );
151-
152- return client .createDatabase (dbDefinition , null );
153- }
154- }
155-
156- // some unexpected failure in reading database happened.
157- // pass the error up.
158- System .err .println ("Reading database " + databaseName + " failed." );
159- return Observable .error (e );
160- });
161-
162-
163- // wait for completion
164- databaseExistenceObs .toCompletable ().await ();
167+ .doOnNext (x -> {
168+ System .out .println ("database " + databaseName + " already exists." );
169+ })
170+ .onErrorResumeNext (
171+ e -> {
172+ // if the database doesn't already exists
173+ // readDatabase() will result in 404 error
174+ if (e instanceof DocumentClientException ) {
175+ DocumentClientException de = (DocumentClientException ) e ;
176+ // if database
177+ if (de .getStatusCode () == 404 ) {
178+ // if the database doesn't exist, create it.
179+ System .out .println ("database " + databaseName + " doesn't existed,"
180+ + " creating it..." );
181+
182+ Database dbDefinition = new Database ();
183+ dbDefinition .setId (databaseName );
184+
185+ return client .createDatabase (dbDefinition , null );
186+ }
187+ }
188+
189+ // some unexpected failure in reading database happened.
190+ // pass the error up.
191+ System .err .println ("Reading database " + databaseName + " failed." );
192+ return Observable .error (e );
193+ });
194+
195+
196+ // wait for completion,
197+ // as waiting for completion is a blocking call try to
198+ // provide your own scheduler to avoid stealing netty io threads.
199+ databaseExistenceObs .toCompletable ().observeOn (scheduler ).await ();
165200
166201 System .out .println ("Checking database " + databaseName + " completed!\n " );
167202 }
@@ -176,76 +211,76 @@ private void createDocumentCollectionIfNotExists() throws Exception {
176211
177212 String databaseLink = String .format ("/dbs/%s" , databaseName );
178213
179- client .queryCollections (databaseLink ,
180- new SqlQuerySpec ("SELECT * FROM r where r.id = @id" ,
214+ client .queryCollections (databaseLink ,
215+ new SqlQuerySpec ("SELECT * FROM r where r.id = @id" ,
181216 new SqlParameterCollection (
182217 new SqlParameter ("@id" , collectionName ))), null )
183- .single () // we know there is only single page of result (empty or with a match)
184- .flatMap (page -> {
185- if (page .getResults ().isEmpty ()) {
186- // if there is no matching collection create the collection.
187- DocumentCollection collection = new DocumentCollection ();
188- collection .setId (collectionName );
189- System .out .println ("Creating collection " + collectionName );
190-
191- return client .createCollection (databaseLink , collection , null );
192- } else {
193- // collection already exists, nothing else to be done.
194- System .out .println ("Collection " + collectionName + "already exists" );
195- return Observable .empty ();
196- }
197- }).toCompletable ().await ();
218+ .single () // we know there is only single page of result (empty or with a match)
219+ .flatMap (page -> {
220+ if (page .getResults ().isEmpty ()) {
221+ // if there is no matching collection create the collection.
222+ DocumentCollection collection = new DocumentCollection ();
223+ collection .setId (collectionName );
224+ System .out .println ("Creating collection " + collectionName );
225+
226+ return client .createCollection (databaseLink , collection , null );
227+ } else {
228+ // collection already exists, nothing else to be done.
229+ System .out .println ("Collection " + collectionName + "already exists" );
230+ return Observable .empty ();
231+ }
232+ }).toCompletable (). observeOn ( scheduler ).await ();
198233
199234 System .out .println ("Checking collection " + collectionName + " completed!\n " );
200235 }
201236
202- private void createFamiliesAsyncAndRegisterListener (List <Family > families , CountDownLatch completionLatch ) throws Exception {
237+ private void createFamiliesAsyncAndRegisterListener (List <Family > families , CountDownLatch completionLatch ) {
203238
204239 String collectionLink = String .format ("/dbs/%s/colls/%s" , databaseName , collectionName );
205240
206241 List <Observable <ResourceResponse <Document >>> createDocumentsOBs = new ArrayList <>();
207- for (Family family : families ) {
242+ for (Family family : families ) {
208243 Observable <ResourceResponse <Document >> obs = client .createDocument (
209244 collectionLink , family , new RequestOptions (), true );
210245 createDocumentsOBs .add (obs );
211246 }
212247
213248 Observable .merge (createDocumentsOBs )
214- .map (ResourceResponse ::getRequestCharge )
215- .reduce ((sum , value ) -> sum + value )
216- .subscribe (
217- totalRequestCharge -> {
218- // this will get print out when completed
219- System .out .println ("total charge for creating documents is "
220- + totalRequestCharge );
221- },
222-
223- // terminal error signal
224- e -> {
225- e .printStackTrace ();
226- completionLatch .countDown ();
227- },
249+ .map (ResourceResponse ::getRequestCharge )
250+ .reduce ((sum , value ) -> sum + value )
251+ .subscribe (
252+ totalRequestCharge -> {
253+ // this will get print out when completed
254+ System .out .println ("total charge for creating documents is "
255+ + totalRequestCharge );
256+ },
257+
258+ // terminal error signal
259+ e -> {
260+ e .printStackTrace ();
261+ completionLatch .countDown ();
262+ },
228263
229- // terminal completion signal
230- () -> {
231- completionLatch .countDown ();
232- });
264+ // terminal completion signal
265+ () -> {
266+ completionLatch .countDown ();
267+ });
233268 }
234269
235270 private void createFamiliesAndWaitForCompletion (List <Family > families ) throws Exception {
236271
237272 String collectionLink = String .format ("/dbs/%s/colls/%s" , databaseName , collectionName );
238273
239274 List <Observable <ResourceResponse <Document >>> createDocumentsOBs = new ArrayList <>();
240- for (Family family : families ) {
275+ for (Family family : families ) {
241276 Observable <ResourceResponse <Document >> obs = client .createDocument (
242277 collectionLink , family , new RequestOptions (), true );
243278 createDocumentsOBs .add (obs );
244279 }
245280
246281 Double totalRequestCharge = Observable .merge (createDocumentsOBs )
247282 .map (ResourceResponse ::getRequestCharge )
248- .reduce ((sum , value ) -> sum + value )
283+ .reduce ((sum , value ) -> sum + value )
249284 .toBlocking ().single ();
250285
251286 writeToConsoleAndPromptToContinue (String .format ("Created %d documents with total request charge of %.2f" ,
@@ -260,7 +295,7 @@ private void executeSimpleQueryAsyncAndRegisterListenerForResult(CountDownLatch
260295 queryOptions .setEnableCrossPartitionQuery (true );
261296
262297 String collectionLink = String .format ("/dbs/%s/colls/%s" , databaseName , collectionName );
263- Observable <FeedResponse <Document >> queryObservable =
298+ Observable <FeedResponse <Document >> queryObservable =
264299 client .queryDocuments (collectionLink ,
265300 "SELECT * FROM Family WHERE Family.lastName = 'Andersen'" , queryOptions );
266301
0 commit comments