diff --git a/README.md b/README.md index fa0e39b0..36c7903d 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,8 @@ I've also been giving talks about Learning Rx using many of the examples listed 14. [Pagination with Rx (using Subjects)](#14-pagination-with-rx-using-subjects) 15. [Orchestrating Observable: make parallel network calls, then combine the result into a single data point (using flatmap & zip)](#15-orchestrating-observable-make-parallel-network-calls-then-combine-the-result-into-a-single-data-point-using-flatmap--zip) 16. [Simple Timeout example (using timeout)](#16-simple-timeout-example-using-timeout) +17. [Setup and teardown resources (using `using`)](#17-setup-and-teardown-resources-using-using) +18. [Multicast playground](#18-multicast-playground) ## Description @@ -161,7 +163,7 @@ Cases demonstrated here: 4. run a task constantly every 3s, but after running it 5 times, terminate automatically 5. run a task A, pause for sometime, then execute Task B, then terminate -### 11. RxBus : event bus using RxJava (using RxRelay (never terminating Subjects) and debouncedBuffer) +### 11. RxBus : event bus using RxJava (using RxRelay (never terminating Subjects) and debouncedBuffer) There are accompanying blog posts that do a much better job of explaining the details on this demo: @@ -222,6 +224,20 @@ This is a simple example demonstrating the use of the `.timeout` operator. Butto Notice how we can provide a custom Observable that indicates how to react under a timeout Exception. +### 17. Setup and teardown resources (using `using`) + +The [operator `using`](http://reactivex.io/documentation/operators/using.html) is relatively less known and notoriously hard to Google. It's a beautiful API that helps to setup a (costly) resource, use it and then dispose off in a clean way. + +The nice thing about this operator is that it provides a mechansim to use potentially costly resources in a tightly scoped manner. using -> setup, use and dispose. Think DB connections (like Realm instances), socket connections, thread locks etc. + +### 18. Multicast Playground + +Multicasting in Rx is like a dark art. Not too many folks know how to pull it off without concern. This example condiers two subscribers (in the forms of buttons) and allows you to add/remove subscribers at different points of time and see how the different operators behave under those circumstances. + +The source observale is a timer (`interval`) observable and the reason this was chosen was to intentionally pick a non-terminating observable, so you can test/confirm if your multicast experiment will leak. + +_I also gave a talk about [Multicasting in detail at 360|Andev](https://speakerdeck.com/kaushikgopal/rx-by-example-volume-3-the-multicast-edition). If you have the inclination and time, I highly suggest watching that talk first (specifically the Multicast operator permutation segment) and then messing around with the example here._ + ## Rx 2.x All the examples here have been migrated to use RxJava 2.X. diff --git a/app/build.gradle b/app/build.gradle index 74a7d975..022d666d 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -18,13 +18,14 @@ apply plugin: 'com.f2prateek.javafmt' apply plugin: 'kotlin-android' dependencies { + compile 'com.android.support:multidex:1.0.1' compile "com.android.support:support-v13:${supportLibVersion}" compile "com.android.support:appcompat-v7:${supportLibVersion}" compile "com.android.support:recyclerview-v7:${supportLibVersion}" compile 'com.github.kaushikgopal:CoreTextUtils:c703fa12b6' compile "com.jakewharton:butterknife:${butterKnifeVersion}" - annotationProcessor "com.jakewharton:butterknife-compiler:${butterKnifeVersion}" + kapt "com.jakewharton:butterknife-compiler:${butterKnifeVersion}" compile 'com.jakewharton.timber:timber:4.5.1' compile "com.squareup.retrofit2:retrofit:${retrofitVersion}" compile "com.squareup.retrofit2:converter-gson:${retrofitVersion}" @@ -44,8 +45,10 @@ dependencies { // explicitly depend on RxJava's latest version for bug fixes and new features. compile 'io.reactivex.rxjava2:rxandroid:2.0.1' + compile 'com.jakewharton.rx:replaying-share-kotlin:2.0.0' compile "com.github.akarnokd:rxjava2-extensions:0.16.0" compile 'com.jakewharton.rxrelay2:rxrelay:2.0.0' + compile 'com.jakewharton.rxbinding2:rxbinding:2.0.0' compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0' @@ -65,6 +68,7 @@ android { targetSdkVersion sdkVersion versionCode 2 versionName "1.2" + multiDexEnabled true } buildTypes { release { diff --git a/app/src/main/java/com/morihacky/android/rxjava/MyApp.java b/app/src/main/java/com/morihacky/android/rxjava/MyApp.java index a3b025ac..23a90b23 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/MyApp.java +++ b/app/src/main/java/com/morihacky/android/rxjava/MyApp.java @@ -1,12 +1,12 @@ package com.morihacky.android.rxjava; -import android.app.Application; +import android.support.multidex.MultiDexApplication; import com.morihacky.android.rxjava.volley.MyVolley; import com.squareup.leakcanary.LeakCanary; import com.squareup.leakcanary.RefWatcher; import timber.log.Timber; -public class MyApp extends Application { +public class MyApp extends MultiDexApplication { private static MyApp _instance; private RefWatcher _refWatcher; diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/MainFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/MainFragment.java index c09925be..dd5709e1 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/MainFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/MainFragment.java @@ -116,6 +116,16 @@ void demoNetworkDetector() { clickedOn(new NetworkDetectorFragment()); } + @OnClick(R.id.btn_demo_using) + void demoUsing() { + clickedOn(new UsingFragment()); + } + + @OnClick(R.id.btn_demo_multicastPlayground) + void demoMulticastPlayground() { + clickedOn(new MulticastPlaygroundFragment()); + } + private void clickedOn(@NonNull Fragment fragment) { final String tag = fragment.getClass().toString(); getActivity() diff --git a/app/src/main/kotlin/com/morihacky/android/rxjava/ext/RxExt.kt b/app/src/main/kotlin/com/morihacky/android/rxjava/ext/RxExt.kt new file mode 100644 index 00000000..9c99ed69 --- /dev/null +++ b/app/src/main/kotlin/com/morihacky/android/rxjava/ext/RxExt.kt @@ -0,0 +1,11 @@ +package com.morihacky.android.rxjava.ext + +import io.reactivex.disposables.CompositeDisposable +import io.reactivex.disposables.Disposable + +operator fun CompositeDisposable.plus(disposable: Disposable): CompositeDisposable { + add(disposable) + return this +} + + diff --git a/app/src/main/kotlin/com/morihacky/android/rxjava/fragments/MulticastPlaygroundFragment.kt b/app/src/main/kotlin/com/morihacky/android/rxjava/fragments/MulticastPlaygroundFragment.kt new file mode 100644 index 00000000..5ece0e32 --- /dev/null +++ b/app/src/main/kotlin/com/morihacky/android/rxjava/fragments/MulticastPlaygroundFragment.kt @@ -0,0 +1,166 @@ +package com.morihacky.android.rxjava.fragments + +import android.content.Context +import android.os.Bundle +import android.os.Handler +import android.os.Looper +import android.view.LayoutInflater +import android.view.View +import android.view.ViewGroup +import android.widget.* +import butterknife.BindView +import butterknife.ButterKnife +import butterknife.OnClick +import com.jakewharton.rx.replayingShare +import com.morihacky.android.rxjava.R +import io.reactivex.Observable +import io.reactivex.disposables.Disposable +import java.util.concurrent.TimeUnit + +class MulticastPlaygroundFragment : BaseFragment() { + + @BindView(R.id.list_threading_log) lateinit var logList: ListView + @BindView(R.id.dropdown) lateinit var pickOperatorDD: Spinner + @BindView(R.id.msg_text) lateinit var messageText: TextView + + private lateinit var sharedObservable: Observable + private lateinit var adapter: LogAdapter + + private var logs: MutableList = ArrayList() + private var disposable1: Disposable? = null + private var disposable2: Disposable? = null + + override fun onCreateView(inflater: LayoutInflater?, + container: ViewGroup?, + savedInstanceState: Bundle?): View? { + val layout = inflater!!.inflate(R.layout.fragment_multicast_playground, container, false) + ButterKnife.bind(this, layout) + + _setupLogger() + _setupDropdown() + + return layout + } + + @OnClick(R.id.btn_1) + fun onBtn1Click() { + + disposable1?.let { + it.dispose() + _log("subscriber 1 disposed") + disposable1 = null + return + } + + disposable1 = + sharedObservable + .doOnSubscribe { _log("subscriber 1 (subscribed)") } + .subscribe({ long -> _log("subscriber 1: onNext $long") }) + + } + + @OnClick(R.id.btn_2) + fun onBtn2Click() { + disposable2?.let { + it.dispose() + _log("subscriber 2 disposed") + disposable2 = null + return + } + + disposable2 = + sharedObservable + .doOnSubscribe { _log("subscriber 2 (subscribed)") } + .subscribe({ long -> _log("subscriber 2: onNext $long") }) + } + + @OnClick(R.id.btn_3) + fun onBtn3Click() { + logs = ArrayList() + adapter.clear() + } + + // ----------------------------------------------------------------------------------- + // Method that help wiring up the example (irrelevant to RxJava) + + private fun _log(logMsg: String) { + + if (_isCurrentlyOnMainThread()) { + logs.add(0, logMsg + " (main thread) ") + adapter.clear() + adapter.addAll(logs) + } else { + logs.add(0, logMsg + " (NOT main thread) ") + + // You can only do below stuff on main thread. + Handler(Looper.getMainLooper()).post { + adapter.clear() + adapter.addAll(logs) + } + } + } + + private fun _setupLogger() { + logs = ArrayList() + adapter = LogAdapter(activity, ArrayList()) + logList.adapter = adapter + } + + private fun _setupDropdown() { + pickOperatorDD.adapter = ArrayAdapter(context, + android.R.layout.simple_spinner_dropdown_item, + arrayOf(".publish().refCount()", + ".publish().autoConnect(2)", + ".replay(1).autoConnect(2)", + ".replay(1).refCount()", + ".replayingShare()")) + + + pickOperatorDD.onItemSelectedListener = object : AdapterView.OnItemSelectedListener { + + override fun onItemSelected(p0: AdapterView<*>?, p1: View?, index: Int, p3: Long) { + + val sourceObservable = Observable.interval(0L, 3, TimeUnit.SECONDS) + .doOnSubscribe { _log("observer (subscribed)") } + .doOnDispose { _log("observer (disposed)") } + .doOnTerminate { _log("observer (terminated)") } + + sharedObservable = + when (index) { + 0 -> { + messageText.setText(R.string.msg_demo_multicast_publishRefCount) + sourceObservable.publish().refCount() + } + 1 -> { + messageText.setText(R.string.msg_demo_multicast_publishAutoConnect) + sourceObservable.publish().autoConnect(2) + } + 2 -> { + messageText.setText(R.string.msg_demo_multicast_replayAutoConnect) + sourceObservable.replay(1).autoConnect(2) + } + 3 -> { + messageText.setText(R.string.msg_demo_multicast_replayRefCount) + sourceObservable.replay(1).refCount() + } + 4 -> { + messageText.setText(R.string.msg_demo_multicast_replayingShare) + sourceObservable.replayingShare() + } + else -> throw RuntimeException("got to pick an op yo!") + } + } + + override fun onNothingSelected(p0: AdapterView<*>?) {} + } + } + + private fun _isCurrentlyOnMainThread(): Boolean { + return Looper.myLooper() == Looper.getMainLooper() + } + + private inner class LogAdapter(context: Context, logs: List) : + ArrayAdapter(context, R.layout.item_log, R.id.item_log, logs) + +} + diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/PlaygroundFragment.kt b/app/src/main/kotlin/com/morihacky/android/rxjava/fragments/PlaygroundFragment.kt similarity index 74% rename from app/src/main/java/com/morihacky/android/rxjava/fragments/PlaygroundFragment.kt rename to app/src/main/kotlin/com/morihacky/android/rxjava/fragments/PlaygroundFragment.kt index 37f76532..cf77e04f 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/PlaygroundFragment.kt +++ b/app/src/main/kotlin/com/morihacky/android/rxjava/fragments/PlaygroundFragment.kt @@ -16,25 +16,21 @@ class PlaygroundFragment : BaseFragment() { private var _logsList: ListView? = null private var _adapter: LogAdapter? = null - private var _attempt = 0 private var _logs: MutableList = ArrayList() override fun onCreateView(inflater: LayoutInflater?, container: ViewGroup?, savedInstanceState: Bundle?): View? { - return inflater!!.inflate(R.layout.fragment_concurrency_schedulers, container, false) - } - - override fun onActivityCreated(savedInstanceState: Bundle?) { - super.onActivityCreated(savedInstanceState) + val view = inflater?.inflate(R.layout.fragment_concurrency_schedulers, container, false) - _logsList = activity.findViewById(R.id.list_threading_log) as ListView + _logsList = view?.findViewById(R.id.list_threading_log) as ListView + _setupLogger() - activity.findViewById(R.id.btn_start_operation).setOnClickListener { _ -> + view.findViewById(R.id.btn_start_operation).setOnClickListener { _ -> _log("Button clicked") } - _setupLogger() + return view } // ----------------------------------------------------------------------------------- @@ -44,15 +40,15 @@ class PlaygroundFragment : BaseFragment() { if (_isCurrentlyOnMainThread()) { _logs.add(0, logMsg + " (main thread) ") - _adapter!!.clear() - _adapter!!.addAll(_logs) + _adapter?.clear() + _adapter?.addAll(_logs) } else { _logs.add(0, logMsg + " (NOT main thread) ") // You can only do below stuff on main thread. Handler(Looper.getMainLooper()).post { - _adapter!!.clear() - _adapter!!.addAll(_logs) + _adapter?.clear() + _adapter?.addAll(_logs) } } } @@ -60,7 +56,7 @@ class PlaygroundFragment : BaseFragment() { private fun _setupLogger() { _logs = ArrayList() _adapter = LogAdapter(activity, ArrayList()) - _logsList!!.adapter = _adapter + _logsList?.adapter = _adapter } private fun _isCurrentlyOnMainThread(): Boolean { diff --git a/app/src/main/kotlin/com/morihacky/android/rxjava/fragments/UsingFragment.kt b/app/src/main/kotlin/com/morihacky/android/rxjava/fragments/UsingFragment.kt new file mode 100644 index 00000000..1b9ee372 --- /dev/null +++ b/app/src/main/kotlin/com/morihacky/android/rxjava/fragments/UsingFragment.kt @@ -0,0 +1,94 @@ +package com.morihacky.android.rxjava.fragments + +import android.content.Context +import android.os.Bundle +import android.os.Handler +import android.os.Looper +import android.view.LayoutInflater +import android.view.View +import android.view.ViewGroup +import android.widget.ArrayAdapter +import android.widget.ListView +import android.widget.TextView +import com.morihacky.android.rxjava.R +import io.reactivex.Flowable +import io.reactivex.functions.Consumer +import io.reactivex.functions.Function +import org.reactivestreams.Publisher +import timber.log.Timber +import java.util.* +import java.util.concurrent.Callable + +class UsingFragment : BaseFragment() { + + private lateinit var _logs: MutableList + private lateinit var _logsList: ListView + private lateinit var _adapter: UsingFragment.LogAdapter + + override fun onCreateView(inflater: LayoutInflater?, container: ViewGroup?, savedInstanceState: Bundle?): View? { + val view = inflater?.inflate(R.layout.fragment_buffer, container, false) + _logsList = view?.findViewById(R.id.list_threading_log) as ListView + + (view.findViewById(R.id.text_description) as TextView).setText(R.string.msg_demo_using) + + _setupLogger() + view.findViewById(R.id.btn_start_operation).setOnClickListener { executeUsingOperation() } + return view + } + + private fun executeUsingOperation() { + val resourceSupplier = Callable { Realm() } + val sourceSupplier = Function> { realm -> + Flowable.just(true) + .map { + realm.doSomething() + // i would use the copyFromRealm and change it to a POJO + Random().nextInt(50) + } + } + val disposer = Consumer { realm -> + realm.clear() + } + + Flowable.using(resourceSupplier, sourceSupplier, disposer) + .subscribe({ i -> + _log("got a value $i - (look at the logs)") + }) + } + + inner class Realm { + init { + Timber.d("--- initializing Realm instance") + } + + fun doSomething() { + Timber.d("--- do something with Realm instance") + } + + fun clear() { + // notice how this is called even before you manually "dispose" + Timber.d("--- cleaning up the resources (happens before a manual 'dispose'") + } + } + + // ----------------------------------------------------------------------------------- + // Method that help wiring up the example (irrelevant to RxJava) + + private fun _log(logMsg: String) { + _logs.add(0, logMsg) + + // You can only do below stuff on main thread. + Handler(Looper.getMainLooper()).post { + _adapter.clear() + _adapter.addAll(_logs) + } + } + + private fun _setupLogger() { + _logs = ArrayList() + _adapter = LogAdapter(activity, ArrayList()) + _logsList.adapter = _adapter + } + + private class LogAdapter(context: Context, logs: List) : ArrayAdapter(context, R.layout.item_log, R.id.item_log, logs) +} \ No newline at end of file diff --git a/app/src/main/res/layout/fragment_buffer.xml b/app/src/main/res/layout/fragment_buffer.xml index 86d70150..3464ac5b 100644 --- a/app/src/main/res/layout/fragment_buffer.xml +++ b/app/src/main/res/layout/fragment_buffer.xml @@ -7,7 +7,8 @@ > + +