Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

+tck #284 support "demand when all downstreams demand" Processor in TCK #287

Conversation

ktoso
Copy link
Contributor

@ktoso ktoso commented Jul 12, 2015

Resolves #284 not being able to handle "processor that awaits both downstreams before pulling from upstream" in test case required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError as reported by @RuedigerMoeller.

The test-case required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo pointed out in that ticket will be addressed in a separate PR.

@ktoso
Copy link
Contributor Author

ktoso commented Jul 12, 2015

Please review @reactive-streams/contributors @viktorklang :-)

@ktoso ktoso force-pushed the wip-284-emit-once-all-request-multi-ktoso branch 3 times, most recently from 89a4dc3 to 55b1140 Compare July 12, 2015 21:01
expectRequest();
final T x = sendNextTFromUpstream();
expectNextElement(sub1, x);
sub1.request(1);

// sub1 has received one element, and has one demand pending
// sub2 has not yet requested anything
// sub2 has received one element, and no more pending demand
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change is that:

  • we did not request from sub2, which made it impossible for "lazy, wait for both" processors to start requesting from upstream
  • now we request at least one element from each downstream, causing one element to be signalled down
  • we now check that the error signal overtakes the outstanding elements, which was the goal of the test

@smaldini
Copy link
Contributor

very nice!


@Override
public void request(final long n) {
new Thread(new Runnable() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to create a new Thread for each request call? Is this the only solution?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed by using the already present execution service here instead of new Thread

@viktorklang
Copy link
Contributor

@ktoso I just noticed this one. Is this ready for merge? What do we need to proceed with it?

@ktoso
Copy link
Contributor Author

ktoso commented Aug 29, 2015

Oh, and I seem to have missed your review comments in July hm.
I think it is an important fix, I'll address the comments and ping.

@viktorklang
Copy link
Contributor

@ktoso let me know when the ping is coming :)

@ktoso
Copy link
Contributor Author

ktoso commented Sep 14, 2015

Thanks for pinging me back, fell through the cracks due to vacation :) soon

@viktorklang
Copy link
Contributor

No worries, just making sure it wasn't dropped on the floor :)

@ktoso ktoso force-pushed the wip-284-emit-once-all-request-multi-ktoso branch 2 times, most recently from bc13273 to 4d09161 Compare September 15, 2015 23:07
@ktoso
Copy link
Contributor Author

ktoso commented Sep 15, 2015

@viktorklang addressed the comments, please have a look :)

@Override
public void subscribe(final Subscriber<? super Integer> s) {
int subscriberCount = subs.size();
if (subscriberCount == 0) s.onSubscribe(createSubscription(awaitLatch, s, demand1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stylistic comment but I think I'd prefer a switch here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

much more lines, I can convert if you'd prefer that :)

            int subscriberCount = subs.size();
            switch (subscriberCount) {
              case 0:
                s.onSubscribe(createSubscription(awaitLatch, s, demand1));
                break;
              case 1:
                s.onSubscribe(createSubscription(awaitLatch, s, demand2));
                break;
              default:
                throw new RuntimeException(String.format("This for-test-purposes-processor supports only 2 subscribers, yet got %s!", subscriberCount));
            }

@ktoso ktoso force-pushed the wip-284-emit-once-all-request-multi-ktoso branch from 4d09161 to 7fbd565 Compare September 16, 2015 23:57
@ktoso
Copy link
Contributor Author

ktoso commented Sep 16, 2015

Thanks for the last round of comments @viktorklang, addressed all of them (comments in line explain how)

@ktoso
Copy link
Contributor Author

ktoso commented Sep 25, 2015

Now it's my turn to ping 😉 @viktorklang

@viktorklang
Copy link
Contributor

Thanks @ktoso I'll have a look ASAP :)

@ktoso
Copy link
Contributor Author

ktoso commented Oct 22, 2015

ping @viktorklang ;) merge here as well to celebrate the merge day? ;)

@viktorklang
Copy link
Contributor

@ktoso Sorry, I've been swamped.

I'll have a look now.

@ktoso
Copy link
Contributor Author

ktoso commented Oct 22, 2015

I know, no worries :)

@Override
public void run() {
if (demand.get() >= 0) {
demand.addAndGet(n);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a race-condition here between addAndGet and cancel

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may work but can overflow demand into a negative value.

@viktorklang
Copy link
Contributor

@ktoso Reviewed. There's a (to me) scary-looking custom Processor implementation.

@ktoso
Copy link
Contributor Author

ktoso commented Oct 22, 2015

Thanks a lot for reviewing – Ouch, so got more work to do here it seems - perfect for the flight tomorrow though :)

@viktorklang
Copy link
Contributor

@ktoso No worries. Processor is the trickiest thing to implement.


@Override
public void subscribe(final Subscriber<? super Integer> s) {
int subscriberCount = subs.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work if subscribe is called from different threads concurrently.

@viktorklang
Copy link
Contributor

@ktoso What's the plan here?

@ktoso
Copy link
Contributor Author

ktoso commented Apr 6, 2016

Need to get back to it finally (adding to todolist), day job consuming entire awake-time recently.

@ktoso
Copy link
Contributor Author

ktoso commented Jun 17, 2017

This is a pretty ancient PR - is there enough interest such that I should attempt reviving it?
It is not needed for the next minor release I think

@viktorklang
Copy link
Contributor

@ktoso Well, it's 2 years old so no hurry, for sure. Since we have no idea if/when there'll be a 1.0.2 depending on how much work it is to fix it, it might be better to do it now.

@viktorklang
Copy link
Contributor

@ktoso Did we want to make this into 1.0.1?

@ktoso
Copy link
Contributor Author

ktoso commented Jun 26, 2017

I don't think it's critical to include this.
I will however give it another look during my flight home today (pretty long one).

@viktorklang
Copy link
Contributor

@ktoso Cool. Thanks. Let me know if there's anything else I should include in the #379

@viktorklang
Copy link
Contributor

@ktoso Perhaps we can work on this together? I'd love to be able to either merge or close this issue, so it doesn't linger on. Let me know what you need from me.

@ktoso
Copy link
Contributor Author

ktoso commented Nov 10, 2017

I completely don't remember what this was about right now hm... If you'd be able to check was was missing here please do, currently in a time critical work thing sadly and can't look into this PR.

@viktorklang
Copy link
Contributor

viktorklang commented Nov 10, 2017 via email

@ktoso
Copy link
Contributor Author

ktoso commented Nov 22, 2017

Yeah I honestly also don't remember... would need some detective work ~:

@akarnokd
Copy link
Contributor

One can implement a Processor in a way that given a set of Subscribers, when all have expressed the number of items each want, the Processor picks the smallest amount and requests that from the upstream Subscription. Basically it paces the item production from the upstream to the slowest consumer (lockstep).

The TCK, however, expects that requesting from a Processor known to have data ready should be fulfilled immediately regardless the other Subscribers' state. I.e., the test subscribes with multiple test Subscribers and requests on one of them, then checks if items have arrived or not. Because the other Subscribers had no chance to request, a Processor won't request items from its upstream and won't emit to the first test Subscriber, causing test failures.

@viktorklang
Copy link
Contributor

@akarnokd Hmmm, I see… Do you have any proposal for how we could fix this limitation in the TCK?

@akarnokd
Copy link
Contributor

I see two options:

  1. Introduce a method IndentityProcessorVerification.doesCoordinatedEmission() (returns boolean, default false) and then split the test body based on its value.
  2. Introduce peeking method(s) on the test subscriber to detect the lack of emission to the first subscriber without failing the test, then issue the request for the second subscriber and repeat the assertion on the first subscriber.

@viktorklang
Copy link
Contributor

@akarnokd It would seem as the first option would be the least involved/error-prone one. Would you like to take a stab at that?

@akarnokd
Copy link
Contributor

Sure.

@viktorklang
Copy link
Contributor

@akarnokd Awesome

@akarnokd
Copy link
Contributor

akarnokd commented Dec 4, 2017

Close this via #414?

@viktorklang
Copy link
Contributor

@akarnokd Yes, thanks! :)

@viktorklang viktorklang closed this Dec 4, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants