Skip to content

Commit

Permalink
Fix for radio not being released on operation complete. (#246)
Browse files Browse the repository at this point in the history
#244
There was an issue (race condition) with releasing radio in `.onTerminate()` inside `RxBleRadioOperation.run()`. If the `Observable` called `.onNext()` and `.onCompleted()` in lines next to each other and user would use `observable.first()` then the `.onComplete()` may not be noticed by the `Subscriber` and `RadioReleaseInterface.release()` in `.onTerminate()` would not be called.
  • Loading branch information
dariuszseweryn authored Jul 7, 2017
1 parent a65f71a commit d2ff2da
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ Change Log

Version 1.3.3-SNAPSHOT
* Fixed scan filtering by name on API <21 (https://github.com/Polidea/RxAndroidBle/pull/243)
* Fixed race condition (which would cause the library to hang) when using `.first()` on calls to `RxBleConnection` that emit a single result. (https://github.com/Polidea/RxAndroidBle/issues/244)

Version 1.3.2
* Fixed completing the `Observable<byte[]>` emitted by `RxBleConnection.setupNotification()`/`RxBleConnection.setupIndication()` when unsubscribed (https://github.com/Polidea/RxAndroidBle/issues/231)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import rx.Emitter;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;

/**
Expand Down Expand Up @@ -46,22 +45,15 @@ public void call(Emitter<T> emitter) {
}
},
Emitter.BackpressureMode.NONE
)
.doOnTerminate(new Action0() {
@Override
public void call() {
radioReleaseInterface.release();
}
});
);
}

/**
* This method will be overridden in a concrete operation implementations and will contain specific operation logic.
*
* Implementations should call emitter methods to inform the outside world about emissions of `onNext()`/`onError()`/`onCompleted()`.
* Implementations must call at least one of methods:
* {@link RadioReleaseInterface#release()}/{@link Emitter#onError(Throwable)}/{@link Emitter#onCompleted()}
* at some point to release the radio for any other operations that were queued if the emitter has not canceled.
* Implementations must call {@link RadioReleaseInterface#release()} at appropriate point to release the radio for any other operations
* that are queued.
*
* If the emitter has been canceled it is response of the operation to call {@link RadioReleaseInterface#release()} when possible
* subsequent operations will be able to start {@link android.bluetooth.BluetoothGatt} functions successfully. Check usage of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,22 @@ public <T> Observable<T> queue(@NonNull final RxBleRadioOperationCustom<T> opera
return rxBleRadio.queue(new RxBleRadioOperation<T>() {
@Override
@SuppressWarnings("ConstantConditions")
protected void protectedRun(Emitter<T> emitter, RadioReleaseInterface radioReleaseInterface) throws Throwable {
final RadioReleasingEmitterWrapper<T> emitterWrapper = new RadioReleasingEmitterWrapper<>(emitter, radioReleaseInterface);
protected void protectedRun(final Emitter<T> emitter, final RadioReleaseInterface radioReleaseInterface) throws Throwable {
final Observable<T> operationObservable;

try {
operationObservable = operation.asObservable(bluetoothGatt, gattCallback, callbackScheduler);
} catch (Throwable throwable) {
radioReleaseInterface.release();
throw throwable;
}

Observable<T> operationObservable = operation.asObservable(bluetoothGatt, gattCallback, callbackScheduler);
if (operationObservable == null) {
radioReleaseInterface.release();
throw new IllegalArgumentException("The custom operation asObservable method must return a non-null observable");
}

final RadioReleasingEmitterWrapper<T> emitterWrapper = new RadioReleasingEmitterWrapper<>(emitter, radioReleaseInterface);
operationObservable.subscribe(emitterWrapper);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,17 @@ public RxBleRadioOperationConnect build() {

@Override
protected void protectedRun(final Emitter<BluetoothGatt> emitter, final RadioReleaseInterface radioReleaseInterface) {
final Action0 releaseRadioAction = new Action0() {
@Override
public void call() {
radioReleaseInterface.release();
}
};
final Subscription subscription = getConnectedBluetoothGatt()
.compose(wrapWithTimeoutWhenNotAutoconnecting())
// when there are no subscribers there is no point of continuing work -> next will be disconnect operation
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
radioReleaseInterface.release();
}
})
.doOnUnsubscribe(releaseRadioAction)
.doOnTerminate(releaseRadioAction)
.subscribe(emitter);

emitter.setSubscription(subscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ public class RxBleRadioOperationDisconnect extends RxBleRadioOperation<Void> {
}

@Override
protected void protectedRun(final Emitter<Void> emitter, RadioReleaseInterface radioReleaseInterface) {
protected void protectedRun(final Emitter<Void> emitter, final RadioReleaseInterface radioReleaseInterface) {
//noinspection Convert2MethodRef
final BluetoothGatt bluetoothGatt = bluetoothGattProvider.getBluetoothGatt();

if (bluetoothGatt == null) {
RxBleLog.w("Disconnect operation has been executed but GATT instance was null.");
radioReleaseInterface.release();
emitter.onCompleted();
} else {
(isDisconnected(bluetoothGatt) ? just(bluetoothGatt) : disconnect(bluetoothGatt))
Expand All @@ -75,12 +76,14 @@ public void call(BluetoothGatt bluetoothGatt) {
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
radioReleaseInterface.release();
emitter.onError(throwable);
}
},
new Action0() {
@Override
public void call() {
radioReleaseInterface.release();
emitter.onCompleted();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.polidea.rxandroidble.internal.RadioReleaseInterface;
import java.util.concurrent.atomic.AtomicBoolean;
import rx.Emitter;
import rx.Subscriber;
import rx.Observer;
import rx.functions.Cancellable;

/**
Expand All @@ -14,7 +14,7 @@
* being unsubscribed / canceled.
* @param <T> parameter of the wrapped {@link Emitter}
*/
public class RadioReleasingEmitterWrapper<T> extends Subscriber<T> implements Cancellable {
public class RadioReleasingEmitterWrapper<T> implements Observer<T>, Cancellable {

private final AtomicBoolean isEmitterCanceled = new AtomicBoolean(false);

Expand All @@ -30,17 +30,13 @@ public RadioReleasingEmitterWrapper(Emitter<T> emitter, RadioReleaseInterface ra

@Override
public void onCompleted() {
if (releaseRadioIfUnsubscribed()) {
return;
}
radioReleaseInterface.release();
emitter.onCompleted();
}

@Override
public void onError(Throwable e) {
if (releaseRadioIfUnsubscribed()) {
return;
}
radioReleaseInterface.release();
emitter.onError(e);
}

Expand All @@ -57,12 +53,4 @@ synchronized public void cancel() throws Exception {
synchronized public boolean isWrappedEmitterUnsubscribed() {
return isEmitterCanceled.get();
}

synchronized private boolean releaseRadioIfUnsubscribed() {
if (isEmitterCanceled.get()) {
radioReleaseInterface.release();
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class MockOperation extends RxBleRadioOperation<Object> {
executionCount++
lastExecutedOnThread = Thread.currentThread().getName()
closure?.call(emitter)
radioReleaseInterface.release()
behaviorSubject.onNext(this)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ class RxBleConnectionTest extends Specification {
setupWriteClosure.call(objectUnderTest, characteristic, OTHER_DATA).subscribe(testSubscriber)

then:
testSubscriber.assertError BleGattCannotStartException
testSubscriber.assertError { it.bleGattOperationType == BleGattOperationType.CHARACTERISTIC_WRITE }
testSubscriber.assertError { BleGattCannotStartException e -> e.bleGattOperationType == BleGattOperationType.CHARACTERISTIC_WRITE }

where:
setupWriteClosure << [
Expand All @@ -106,8 +105,7 @@ class RxBleConnectionTest extends Specification {
setupReadClosure.call(objectUnderTest, characteristic).subscribe(testSubscriber)

then:
testSubscriber.assertError BleGattCannotStartException
testSubscriber.assertError { it.bleGattOperationType == BleGattOperationType.CHARACTERISTIC_READ }
testSubscriber.assertError { BleGattCannotStartException e -> e.bleGattOperationType == BleGattOperationType.CHARACTERISTIC_READ }

where:
setupReadClosure << [
Expand All @@ -124,8 +122,7 @@ class RxBleConnectionTest extends Specification {
objectUnderTest.readRssi().subscribe(testSubscriber)

then:
testSubscriber.assertError BleGattCannotStartException
testSubscriber.assertError { it.bleGattOperationType == BleGattOperationType.READ_RSSI }
testSubscriber.assertError { BleGattCannotStartException e -> e.bleGattOperationType == BleGattOperationType.READ_RSSI }
}

def "should emit BleCharacteristicNotFoundException during read operation if no services were found"() {
Expand All @@ -149,8 +146,7 @@ class RxBleConnectionTest extends Specification {
objectUnderTest.readCharacteristic(CHARACTERISTIC_UUID).subscribe(testSubscriber)

then:
testSubscriber.assertError BleCharacteristicNotFoundException
testSubscriber.assertError { it.charactersisticUUID == CHARACTERISTIC_UUID }
testSubscriber.assertError { BleCharacteristicNotFoundException e -> e.charactersisticUUID == CHARACTERISTIC_UUID }
}

def "should read first found characteristic with matching UUID"() {
Expand Down Expand Up @@ -178,8 +174,7 @@ class RxBleConnectionTest extends Specification {
objectUnderTest.writeCharacteristic(CHARACTERISTIC_UUID, NOT_EMPTY_DATA).subscribe(testSubscriber)

then:
testSubscriber.assertError BleCharacteristicNotFoundException
testSubscriber.assertError { it.charactersisticUUID == CHARACTERISTIC_UUID }
testSubscriber.assertError { BleCharacteristicNotFoundException e -> e.charactersisticUUID == CHARACTERISTIC_UUID }
}

def "should emit BleCharacteristicNotFoundException if characteristic was not found during write operation"() {
Expand All @@ -190,8 +185,7 @@ class RxBleConnectionTest extends Specification {
objectUnderTest.writeCharacteristic(CHARACTERISTIC_UUID, NOT_EMPTY_DATA).subscribe(testSubscriber)

then:
testSubscriber.assertError BleCharacteristicNotFoundException
testSubscriber.assertError { it.charactersisticUUID == CHARACTERISTIC_UUID }
testSubscriber.assertError { BleCharacteristicNotFoundException e -> e.charactersisticUUID == CHARACTERISTIC_UUID }
}

@Unroll
Expand Down Expand Up @@ -247,8 +241,7 @@ class RxBleConnectionTest extends Specification {
setupTriggerNotificationClosure.call(objectUnderTest, characteristic).flatMap({ it }).subscribe(testSubscriber)

then:
testSubscriber.assertError(BleCharacteristicNotFoundException)
testSubscriber.assertError { it.charactersisticUUID == CHARACTERISTIC_UUID }
testSubscriber.assertError { BleCharacteristicNotFoundException e -> e.charactersisticUUID == CHARACTERISTIC_UUID }

where:
setupTriggerNotificationClosure << [
Expand Down Expand Up @@ -296,11 +289,21 @@ class RxBleConnectionTest extends Specification {
objectUnderTest.queue(radioOperationCustom).subscribe(testSubscriber)

then:
testSubscriber.assertCompleted()
testSubscriber.assertValues(true, false, true)
}

def "should pass error and release the radio if custom operation will throw out of RxBleRadioOperationCustom.asObservable()"() {
def "should pass error if custom operation will throw out of RxBleRadioOperationCustom.asObservable()"() {
given:
def radioOperationCustom = customRadioOperationWithOutcome { throw new RuntimeException() }

when:
objectUnderTest.queue(radioOperationCustom).subscribe(testSubscriber)

then:
testSubscriber.assertError(RuntimeException.class)
}

def "should release the radio if custom operation will throw out of RxBleRadioOperationCustom.asObservable()"() {
given:
def radioOperationCustom = customRadioOperationWithOutcome { throw new RuntimeException() }

Expand All @@ -309,10 +312,20 @@ class RxBleConnectionTest extends Specification {

then:
flatRadio.semaphore.isReleased()
}

def "should pass error if observable returned from RxBleRadioOperationCustom.asObservable() will emit error"() {
given:
def radioOperationCustom = customRadioOperationWithOutcome { Observable.error(new RuntimeException()) }

when:
objectUnderTest.queue(radioOperationCustom).subscribe(testSubscriber)

then:
testSubscriber.assertError(RuntimeException.class)
}

def "should pass error and release the radio if observable returned from RxBleRadioOperationCustom.asObservable() will emit error"() {
def "should release the radio if observable returned from RxBleRadioOperationCustom.asObservable() will emit error"() {
given:
def radioOperationCustom = customRadioOperationWithOutcome { Observable.error(new RuntimeException()) }

Expand All @@ -321,7 +334,17 @@ class RxBleConnectionTest extends Specification {

then:
flatRadio.semaphore.isReleased()
testSubscriber.assertError(RuntimeException.class)
}

def "should pass completion to subscriber when observable returned from RxBleRadioOperationCustom.asObservable() will complete"() {
given:
def radioOperationCustom = customRadioOperationWithOutcome { Observable.empty() }

when:
objectUnderTest.queue(radioOperationCustom).subscribe(testSubscriber)

then:
testSubscriber.assertCompleted()
}

def "should release the radio when observable returned from RxBleRadioOperationCustom.asObservable() will complete"() {
Expand All @@ -333,7 +356,6 @@ class RxBleConnectionTest extends Specification {

then:
flatRadio.semaphore.isReleased()
testSubscriber.assertCompleted()
}

def "should throw illegal argument exception if RxBleRadioOperationCustom.asObservable() return null"() {
Expand All @@ -344,10 +366,20 @@ class RxBleConnectionTest extends Specification {
objectUnderTest.queue(radioOperationCustom).subscribe(testSubscriber)

then:
flatRadio.semaphore.isReleased()
testSubscriber.assertError(IllegalArgumentException.class)
}

def "should release radio if RxBleRadioOperationCustom.asObservable() return null"() {
given:
def radioOperationCustom = customRadioOperationWithOutcome { null }

when:
objectUnderTest.queue(radioOperationCustom).subscribe(testSubscriber)

then:
flatRadio.semaphore.isReleased()
}

@Unroll
def "should release the radio when observable returned from RxBleRadioOperationCustom.asObservable() will terminate even if was unsubscribed before"() {

Expand Down

0 comments on commit d2ff2da

Please sign in to comment.