Skip to content
This repository was archived by the owner on Dec 15, 2023. It is now read-only.

Commit f528d53

Browse files
committed
use sheculer when receiving data on the netty io thread
1 parent 60eb8da commit f528d53

File tree

1 file changed

+48
-26
lines changed
  • azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample

1 file changed

+48
-26
lines changed

Diff for: azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java

+48-26
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.concurrent.CountDownLatch;
4848
import java.util.concurrent.ExecutorService;
4949
import java.util.concurrent.Executors;
50+
import java.util.concurrent.TimeUnit;
5051

5152
public class Main {
5253
private final ExecutorService executorService;
@@ -59,7 +60,7 @@ public class Main {
5960

6061
public Main() {
6162
executorService = Executors.newFixedThreadPool(100);
62-
// The SDK uses netty library for doing async IO operations, and the operations are performed the netty io threads.
63+
// The SDK uses netty library for doing async IO operations. The IO operations are performed on the netty io threads.
6364
// The number of IO netty threads are limited; it is the same as the number of CPU cores.
6465

6566
// The app should avoid doing anything which takes a lot of time from IO netty thread.
@@ -70,12 +71,11 @@ public Main() {
7071
// * deadlock
7172

7273
// The app code will receive the data from Azure Cosmos DB on the netty IO thread.
73-
// The app should ensure the user's blocking call or computationally/IO heavy work after receiving data
74-
// from Azure Cosmos DB is performed on a custom thread managed by the user (not the SDK netty IO threads).
74+
// The app should ensure the user's computationally/IO heavy work after receiving data
75+
// from Azure Cosmos DB is performed on a custom thread managed by the user (not on the SDK netty IO thread).
7576
//
76-
// If you are doing blocking calls or heavy work, provide your own scheduler to switch thread.
77-
// for example you can do this:
78-
// client.createDocument(.).observeOn(userCustomScheduler).toBlocking().single();
77+
// If you are doing heavy work after receiving the result from the SDK,
78+
// you should provide your own scheduler to switch thread.
7979

8080
// the following scheduler is used for switching from netty thread to user app thread.
8181
scheduler = Schedulers.from(executorService);
@@ -92,7 +92,6 @@ public void close() {
9292
* @param args command line args.
9393
*/
9494
public static void main(String[] args) {
95-
9695
Main p = new Main();
9796

9897
try {
@@ -196,7 +195,7 @@ private void createDatabaseIfNotExists() throws Exception {
196195
// wait for completion,
197196
// as waiting for completion is a blocking call try to
198197
// provide your own scheduler to avoid stealing netty io threads.
199-
databaseExistenceObs.toCompletable().observeOn(scheduler).await();
198+
databaseExistenceObs.toCompletable().await();
200199

201200
System.out.println("Checking database " + databaseName + " completed!\n");
202201
}
@@ -229,7 +228,7 @@ private void createDocumentCollectionIfNotExists() throws Exception {
229228
System.out.println("Collection " + collectionName + "already exists");
230229
return Observable.empty();
231230
}
232-
}).toCompletable().observeOn(scheduler).await();
231+
}).toCompletable().await();
233232

234233
System.out.println("Checking collection " + collectionName + " completed!\n");
235234
}
@@ -268,7 +267,6 @@ private void createFamiliesAsyncAndRegisterListener(List<Family> families, Count
268267
}
269268

270269
private void createFamiliesAndWaitForCompletion(List<Family> families) throws Exception {
271-
272270
String collectionLink = String.format("/dbs/%s/colls/%s", databaseName, collectionName);
273271

274272
List<Observable<ResourceResponse<Document>>> createDocumentsOBs = new ArrayList<>();
@@ -280,6 +278,13 @@ private void createFamiliesAndWaitForCompletion(List<Family> families) throws Ex
280278

281279
Double totalRequestCharge = Observable.merge(createDocumentsOBs)
282280
.map(ResourceResponse::getRequestCharge)
281+
.observeOn(scheduler) // the scheduler will be used for the following work
282+
.map(charge -> {
283+
// as we don't want to run heavyWork() on netty IO thread, we provide the custom scheduler
284+
// for switching from netty IO thread to user thread.
285+
heavyWork();
286+
return charge;
287+
})
283288
.reduce((sum, value) -> sum + value)
284289
.toBlocking().single();
285290

@@ -288,6 +293,17 @@ private void createFamiliesAndWaitForCompletion(List<Family> families) throws Ex
288293
totalRequestCharge));
289294
}
290295

296+
private void heavyWork() {
297+
// I may do a lot of IO work: e.g., writing to log files
298+
// a lot of computational work
299+
// or may do Thread.sleep()
300+
301+
try {
302+
TimeUnit.SECONDS.sleep(2);
303+
} catch (Exception e) {
304+
}
305+
}
306+
291307
private void executeSimpleQueryAsyncAndRegisterListenerForResult(CountDownLatch completionLatch) {
292308
// Set some common query options
293309
FeedOptions queryOptions = new FeedOptions();
@@ -299,22 +315,28 @@ private void executeSimpleQueryAsyncAndRegisterListenerForResult(CountDownLatch
299315
client.queryDocuments(collectionLink,
300316
"SELECT * FROM Family WHERE Family.lastName = 'Andersen'", queryOptions);
301317

302-
queryObservable.subscribe(
303-
queryResultPage -> {
304-
System.out.println("Got a page of query result with " +
305-
queryResultPage.getResults().size() + " document(s)"
306-
+ " and request charge of " + queryResultPage.getRequestCharge());
307-
},
308-
// terminal error signal
309-
e -> {
310-
e.printStackTrace();
311-
completionLatch.countDown();
312-
},
313-
314-
// terminal completion signal
315-
() -> {
316-
completionLatch.countDown();
317-
});
318+
queryObservable
319+
.observeOn(scheduler)
320+
.subscribe(
321+
queryResultPage -> {
322+
// we want to make sure heavyWork() doesn't block any of netty IO threads
323+
// so we use observeOn(scheduler) to switch from the netty thread to user's thread.
324+
heavyWork();
325+
326+
System.out.println("Got a page of query result with " +
327+
queryResultPage.getResults().size() + " document(s)"
328+
+ " and request charge of " + queryResultPage.getRequestCharge());
329+
},
330+
// terminal error signal
331+
e -> {
332+
e.printStackTrace();
333+
completionLatch.countDown();
334+
},
335+
336+
// terminal completion signal
337+
() -> {
338+
completionLatch.countDown();
339+
});
318340
}
319341

320342
private void writeToConsoleAndPromptToContinue(String text) throws IOException {

0 commit comments

Comments
 (0)