List of previously found problems (will be updated) #438

Open
lifinsky opened this issue Feb 3, 2025 · 28 comments

@lifinsky
Contributor

lifinsky commented Feb 3, 2025

Ecotone version(s) affected: latest

Description

  1. If the message payload converter fails for a distributed endpoint, the message does not end up in the dead letter. We'll try to write a test for this.

  2. When several event-sourced aggregates and their projections are processed in one asynchronous endpoint, and one of the streams contains an incorrect (already renamed) event class name, delayed retries leave many records in the projections without the corresponding changes in the stream. It feels like the transaction is not being fully rolled back. We also plan to write a test to reproduce this.

  3. All our projections run synchronously with the aggregates, yet they get an odd status in the projections table: either idle or running for the same events.

  4. How should we handle the case where some exceptions should go to delayed retry while others should be processed by a custom ServiceActivator handler? For now we use one common ServiceActivator for the error channel and check the exception class there, but perhaps a better option could be provided?

  5. For synchronous projection, what will happen in the case of gap detection (is such a scenario possible when the projection lags behind the event sourcing aggregate stream)? Could the aggregate stream be updated after a synchronous retry (for example, due to OptimisticLockException) while the projection stays stale, waiting for the next events in the stream?

  6. In one case an AMQP consumer got stuck on an event-sourced aggregate command: the queue showed the message as received but not acknowledged. The last log message was: Executing Command Handler...
    This may be related to defaultMemoryLimit: 256, but it is hard to determine the exact cause, since it has happened only once so far. The first pod restart didn't help, but after I completely deleted the deployment and started a new pod, all messages were processed successfully.
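For point 4, the current workaround (one error-channel ServiceActivator that branches on the exception class) can at least be kept small and explicit. Below is a minimal, framework-agnostic sketch assuming a hypothetical `ExceptionRouter` class and placeholder target names (`custom_handler`, `delayed_retry`, `dead_letter`); it is not an Ecotone API, and sending the message to the returned target is left to the surrounding handler.

```php
<?php

/**
 * Hypothetical sketch: pick a failure target by exception class.
 * Target names are placeholders, not real Ecotone channel names.
 */
final class ExceptionRouter
{
    /**
     * @param array<class-string<\Throwable>, string> $routes checked in insertion order,
     *                                                       so list specific classes first
     */
    public function __construct(
        private array $routes,
        private string $defaultTarget,
    ) {
    }

    public function route(\Throwable $exception): string
    {
        foreach ($this->routes as $exceptionClass => $target) {
            // instanceof also matches subclasses, e.g. UnexpectedValueException is a RuntimeException
            if ($exception instanceof $exceptionClass) {
                return $target;
            }
        }

        return $this->defaultTarget;
    }
}
```

Because routes are matched in order, a broad class such as `\RuntimeException` placed first would shadow any more specific subclass listed after it.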

@lifinsky
Contributor Author

lifinsky commented Feb 3, 2025

@dgafka Hi! Have you perhaps run into the problems described here in your own practice? We see them occasionally when testing our solution.

@lifinsky
Contributor Author

lifinsky commented Feb 4, 2025

@dgafka Point 6 is particularly interesting...
UPD: The issue has occurred again, and it got stuck once more with the last log message: Executing Command Handler *\Domain\Entity\Card::close

UPD2: There are pending queries to the database: DELETE FROM ecotone_deduplication WHERE handled_at <= $1

@lifinsky
Contributor Author

lifinsky commented Feb 5, 2025

@dgafka I'll monitor which transactions stay open for too long and in which cases this happens. If you can, please check whether rollbacks are performed correctly everywhere, and whether the deletion from ecotone_deduplication could run periodically at a low frequency, and definitely not within the main transaction.

SELECT pid, usename, client_addr, application_name, state,
       age(clock_timestamp(), xact_start) AS transaction_age,
       query, query_start
FROM pg_stat_activity
WHERE state <> 'idle'
  AND xact_start IS NOT NULL
  AND age(clock_timestamp(), xact_start) > interval '1 minute'
ORDER BY transaction_age DESC;

@dgafka
Member

dgafka commented Feb 5, 2025

  1. Hmm, that should not be the case. Where does it land then? Is it requeued?
  2. Yes, a test case for that would be useful: if all your projections are synchronous, I don't see how a half-applied state could be committed.
  3. Ecotone projections are currently based on Prooph Projections, and these statuses are an implementation detail that protects against concurrency. However, there is a plan to replace Prooph Projections with projections built into Ecotone.
  4. Yes, I saw your other comment about a custom ErrorChannel per Message Endpoint. If we build a mechanism for this, we could also build one for skipping transactions for given Endpoints, as the implementation would basically use the same solution. I will think about this :)

> For synchronous projection, what will happen in the case of gap detection (is such a scenario possible when the projection lags behind the event sourcing aggregate stream)

If you have transactions enabled for the Command Bus, then the projection update is atomic with the events being appended to the stream (the projection trigger is wrapped in the same transaction).
Concurrent access to the same Aggregate is protected at the Event Stream level, as appending Events with an already used version will fail and so protect consistency.
However, as Prooph projections work at the global level (they do not follow a given aggregate's event stream, but the whole event log), the problem may manifest with concurrent access across different Aggregates. In that situation, if one transaction started first yet committed after a second transaction (which started later), it will create a gap in the sequence numbers.
In such a situation the second transaction will wait up to 8 seconds for the first one to finish (which lets the first transaction commit first), and then continue.
So the problem could arise if, after those 8 seconds, the first transaction has still not committed: if it sits idle and gets committed after the 8 seconds, it will create a gap in the projection (because the projection has already moved past that sequence number).
You can increase the 8-second timing; however, that comes at a cost in performance whenever such situations occur.
The aim of the new projection system is to solve this problem at the root, without a fragile gap detection system.
6. Hmm, this feels more like an issue with PHP itself. I had such situations in the past, but they were related to calling external services: when the TCP connection was faulty, it basically made PHP go into zombie mode.
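To make the timing trade-off from the answer to point 5 concrete, here is a simplified, hypothetical sketch of gap detection for a projection that reads a global event log by sequence number. `GapDetector` and its retry delays are placeholders chosen for illustration; this is not Prooph's `GapDetection` class, and the values are not its defaults.

```php
<?php

/**
 * Hypothetical, simplified gap detection. Not Prooph's implementation.
 */
final class GapDetector
{
    /** @param list<int> $retryDelaysMs wait before each re-check (placeholder values) */
    public function __construct(private array $retryDelaysMs)
    {
    }

    /** A gap means the next visible event is not directly after the last processed one. */
    public function hasGap(int $lastProcessedNo, int $nextVisibleNo): bool
    {
        return $nextVisibleNo !== $lastProcessedNo + 1;
    }

    /**
     * Re-reads the next visible event number until the gap closes or the retries run out.
     *
     * @param callable(): int     $fetchNextVisibleNo lowest visible event number above $lastProcessedNo
     * @param callable(int): void $sleepMs            injected so tests do not really sleep
     * @return bool true if the gap closed (the slow transaction committed), false if we give up
     */
    public function waitForGapToClose(int $lastProcessedNo, callable $fetchNextVisibleNo, callable $sleepMs): bool
    {
        if (!$this->hasGap($lastProcessedNo, $fetchNextVisibleNo())) {
            return true;
        }
        foreach ($this->retryDelaysMs as $delayMs) {
            $sleepMs($delayMs);
            if (!$this->hasGap($lastProcessedNo, $fetchNextVisibleNo())) {
                return true;
            }
        }

        // Giving up: if the slow transaction commits later, its event lands behind the projection.
        return false;
    }
}
```

In the worst case the projection blocks for `array_sum($retryDelaysMs)` milliseconds (in production `$sleepMs` could be `fn (int $ms) => usleep($ms * 1000)`), which is exactly the performance cost of lengthening the window.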

@lifinsky
Contributor Author

lifinsky commented Feb 5, 2025

@dgafka Thank you for your attention.

  1. The message was simply lost without getting into the dead letter. To reproduce this we need to write an application test with Ecotone Lite, but I don't have time for this yet.

  2. This case is less critical, although it also requires attention.

3, 5. This point worries me most, since we have to hold the user's balance during authorizations from their cards. Here we need to immediately update both the balance projection and the account statement (in a balance-before/balance-after format), and making separate streams for each user is not an option. The main problem is that we need to respond quickly; otherwise the provider will decline the card authorization.

  4. The ability to disable transactions is very important, because there are handlers, including asynchronous ones, that only make an API call and then either save an aggregate without a projection, save to an outbox, or, for example, send to RabbitMQ. We configure short timeouts for API requests and recheck asynchronously in case of a timeout, but in general any such calls inside database transactions are not OK. I don't want to create separate channels for this, and there are also synchronous command handlers.

  6. The blocking occurred because of an open transaction deleting messages from the ecotone_deduplication table. Here, of course, I would like to be able to move this garbage collection to cron rather than deal with it while processing aggregates, or at least run it with some probability, the way PHP garbage-collects sessions. But if we notice this again, we will most likely set idle_in_transaction_session_timeout to about a minute.
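The idea of running the deduplication cleanup only occasionally, the way PHP's session garbage collector fires with probability `session.gc_probability / session.gc_divisor`, could look roughly like the sketch below. `ProbabilisticCleanup` is a hypothetical helper, not something Ecotone provides; the cleanup callable (for example, a DELETE on ecotone_deduplication executed in its own short transaction after the main one commits) is an assumption left to the caller.

```php
<?php

/**
 * Hypothetical helper: run a cleanup with probability $probability / $divisor,
 * mirroring PHP's session.gc_probability / session.gc_divisor idea.
 */
final class ProbabilisticCleanup
{
    private \Closure $randomInt;

    public function __construct(
        private int $probability,
        private int $divisor,
        ?\Closure $randomInt = null, // injectable for tests; defaults to random_int()
    ) {
        if ($divisor < 1 || $probability < 0 || $probability > $divisor) {
            throw new \InvalidArgumentException('Expected 0 <= probability <= divisor and divisor >= 1');
        }
        $this->randomInt = $randomInt ?? random_int(...);
    }

    public function shouldRun(): bool
    {
        // Draw from [1, divisor]; values in [1, probability] trigger the cleanup.
        return ($this->randomInt)(1, $this->divisor) <= $this->probability;
    }

    /** Returns true if the cleanup was executed on this call. */
    public function runIfDue(callable $cleanup): bool
    {
        if (!$this->shouldRun()) {
            return false;
        }
        $cleanup();

        return true;
    }
}
```

With `new ProbabilisticCleanup(1, 100)` roughly one message in a hundred pays the cleanup cost, instead of every transaction.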

@lifinsky
Contributor Author

lifinsky commented Feb 6, 2025

@dgafka Hi

Results of the same pg_stat_activity query as in my previous comment:
| Service | IP Address | State | Duration | Query |
|---|---|---|---|---|
| fc_dev | 10.11.33.206 | idle in transaction | 33 min 25.128738 sec | SELECT 1 |
| fc_dev | 10.11.33.206 | idle in transaction | 33 min 13.002786 sec | SELECT * FROM "_0d40978da2e32dab9b38fa91af861016c7499b28" WHERE metadata->>'_aggregate_type' = $1 AND metadata->>'_aggregate_id' = $2 AND no >= $3 ORDER BY no ASC LIMIT $4; |
| fc_dev | 10.11.48.56 | idle in transaction | 33 min 10.424348 sec | SELECT 1 |
| fc_dev | 10.11.32.93 | idle in transaction | 33 min 7.983414 sec | INSERT INTO "_0d40978da2e32dab9b38fa91af861016c7499b28" (event_id, event_name, payload, metadata, created_at) VALUES ($1, $2, $3, $4, $5); |
| fc_dev | 10.11.33.206 | idle in transaction | 33 min 6.572761 sec | SELECT 1 |
| fc_dev | 10.11.32.93 | idle in transaction | 32 min 43.805161 sec | SELECT 1 |
| fc_dev | 10.11.48.56 | idle in transaction | 32 min 36.825416 sec | SELECT * FROM "_0d40978da2e32dab9b38fa91af861016c7499b28" WHERE metadata->>'_aggregate_type' = $1 AND metadata->>'_aggregate_id' = $2 AND no >= $3 ORDER BY no ASC LIMIT $4; |
| fc_dev | 10.11.63.212 | active | 15 min 2.648735 sec | INSERT INTO "_0d40978da2e32dab9b38fa91af861016c7499b28" (event_id, event_name, payload, metadata, created_at) VALUES ($1, $2, $3, $4, $5); |

The issue recurs on its own after a long idle period, especially in the RabbitMQ listener. We will now try to cap the consumer's lifetime at one hour using the executionTimeLimit argument. We have also increased the Kubernetes graceful shutdown period from 30 to 60 seconds. Somewhere, a transaction is not being closed properly or a rollback is not being performed.

@lifinsky
Contributor Author

lifinsky commented Feb 6, 2025

Is it possible to implement periodic closing and reopening of the Doctrine connection for consumers?

@lifinsky
Contributor Author

lifinsky commented Feb 6, 2025

@dgafka We found several places in the RoadRunner worker where the transaction was not closed correctly. I'll write up more detailed results and the optimizations we came up with.

@dgafka
Member

dgafka commented Feb 7, 2025

> Is it possible to implement periodic closing and reopening of the Doctrine connection for consumers?

Remember that you can quite easily build any extension for the Message Consumers yourself. Specific cases do not need to be part of the framework for you to be able to build them :)

Extending asynchronous endpoints is pretty straightforward; you can read more in this section.
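As an illustration of such an extension, the decision part of periodic reconnecting could look like the sketch below. `ConnectionRecyclePolicy` and its thresholds are hypothetical; what the consumer extension does when it returns true (for example closing the Doctrine DBAL connection so that the next query opens a fresh one, assuming your DBAL version reconnects lazily) would need to be verified against your setup.

```php
<?php

/**
 * Hypothetical policy: decide when a long-running consumer should close and
 * reopen its database connection. Times are Unix timestamps in seconds,
 * passed in explicitly so the policy is easy to test.
 */
final class ConnectionRecyclePolicy
{
    private int $openedAt;
    private int $lastUsedAt;

    public function __construct(
        private int $maxLifetimeSeconds,
        private int $maxIdleSeconds,
        int $now,
    ) {
        $this->openedAt = $now;
        $this->lastUsedAt = $now;
    }

    /** Call before handling a message. */
    public function shouldRecycle(int $now): bool
    {
        return $now - $this->openedAt >= $this->maxLifetimeSeconds
            || $now - $this->lastUsedAt >= $this->maxIdleSeconds;
    }

    /** Call after a message has been handled. */
    public function markUsed(int $now): void
    {
        $this->lastUsedAt = $now;
    }

    /** Call after the connection has been closed and reopened. */
    public function markRecycled(int $now): void
    {
        $this->openedAt = $now;
        $this->lastUsedAt = $now;
    }
}
```

The idle threshold targets exactly the "long idle period" case reported above, while the lifetime threshold plays a role similar to executionTimeLimit without restarting the whole consumer.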

@lifinsky
Contributor Author

lifinsky commented Feb 7, 2025

> > Is it possible to implement periodic closing and reopening of the Doctrine connection for consumers?

> Remember that you can quite easily build any extension for the Message Consumers yourself. Specific cases do not need to be part of the framework for you to be able to build them :)

> Extending asynchronous endpoints is pretty straightforward; you can read more in this section.

I'm wondering if it's possible to affect DbalTransactionInterceptor (CachedConnectionFactory::createFor(new DbalReconnectableConnectionFactory($connection))) and ObjectManagerInterceptor (ManagerRegistryConnectionFactory[])

@dgafka
Member

dgafka commented Feb 7, 2025

> I'm wondering if it's possible to affect DbalTransactionInterceptor (CachedConnectionFactory::createFor(new DbalReconnectableConnectionFactory($connection))) and ObjectManagerInterceptor (ManagerRegistryConnectionFactory[])

Well, that would need to be done through the framework, as those are framework-related classes. But it should be fairly straightforward to roll a customized version and disable the framework one.

@lifinsky
Contributor Author

lifinsky commented Feb 8, 2025

@dgafka Why might this situation occur? Wouldn't we expect a synchronous retry on Ecotone\Messaging\Support\ConcurrencyException when the aggregate version conflicts? Have you encountered this issue before?

Prooph\EventStore\Pdo\Exception\RuntimeException: Error 25P02. 
Error-Info: ERROR:  current transaction is aborted, commands ignored until end of transaction block in /app/vendor/prooph/pdo-event-store/src/Exception/RuntimeException.php:22
Stack trace:
#0 /app/vendor/prooph/pdo-event-store/src/PostgresEventStore.php(189): Prooph\EventStore\Pdo\Exception\RuntimeException::fromStatementErrorInfo()
#1 /app/vendor/ecotone/pdo-event-sourcing/src/Prooph/LazyProophEventStore.php(96): Prooph\EventStore\Pdo\PostgresEventStore->hasStream()
#2 /app/vendor/ecotone/pdo-event-sourcing/src/Prooph/LazyProophEventStore.php(146): Ecotone\EventSourcing\Prooph\LazyProophEventStore->hasStream()
#3 /app/vendor/ecotone/pdo-event-sourcing/src/Prooph/EcotoneEventStoreProophWrapper.php(103): Ecotone\EventSourcing\Prooph\LazyProophEventStore->appendTo()
#4 /app/vendor/ecotone/pdo-event-sourcing/src/EventSourcingRepository.php(130): Ecotone\EventSourcing\Prooph\EcotoneEventStoreProophWrapper->appendTo()
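As background to the question about ConcurrencyException: optimistic concurrency on an event stream boils down to an expected-version check at append time. The in-memory sketch below is hypothetical (`InMemoryEventStream` and `VersionConflict` are made-up names); a database-backed store would typically enforce the same rule with a unique constraint on aggregate id and version, and a synchronous retry would reload the aggregate and run the command again after such a conflict.

```php
<?php

/** Thrown when another writer appended to the stream first (hypothetical name). */
final class VersionConflict extends \RuntimeException
{
}

/** Hypothetical in-memory event stream with an expected-version check on append. */
final class InMemoryEventStream
{
    /** @var array<string, list<array{version: int, event: string}>> */
    private array $streams = [];

    public function currentVersion(string $aggregateId): int
    {
        return count($this->streams[$aggregateId] ?? []);
    }

    /** @param list<string> $events */
    public function append(string $aggregateId, int $expectedVersion, array $events): void
    {
        $actual = $this->currentVersion($aggregateId);
        if ($actual !== $expectedVersion) {
            // The writer loaded a stale aggregate: reject instead of interleaving events.
            throw new VersionConflict(sprintf(
                'Expected version %d, but stream "%s" is at %d',
                $expectedVersion,
                $aggregateId,
                $actual,
            ));
        }
        foreach ($events as $event) {
            $this->streams[$aggregateId][] = ['version' => ++$actual, 'event' => $event];
        }
    }
}
```

Two writers that both loaded version 0 race; the second append fails and leaves the stream untouched, so a retry can safely reload it at version 1.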

@dgafka
Member

dgafka commented Feb 8, 2025

@lifinsky as far as I remember, that may happen if another SQL statement is triggered after an SQL exception, before the rollback is done.
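A minimal sketch of the ordering that avoids SQLSTATE 25P02: roll back first, and only then decide whether to retry. `TransactionalConnection` and `transactionalWithInstantRetry()` are hypothetical names (the method names mirror PDO's `beginTransaction()`, `commit()` and `rollBack()`); this is not how Ecotone's DbalTransactionInterceptor is implemented, only an illustration of the rule.

```php
<?php

/** Hypothetical minimal connection interface; method names mirror PDO's. */
interface TransactionalConnection
{
    public function beginTransaction(): void;

    public function commit(): void;

    public function rollBack(): void;
}

/**
 * Runs $work in a transaction and retries it from scratch only for $retryOn exceptions.
 *
 * @param class-string<\Throwable> $retryOn e.g. an optimistic-lock / concurrency exception
 */
function transactionalWithInstantRetry(
    TransactionalConnection $connection,
    callable $work,
    string $retryOn,
    int $maxAttempts = 3,
): mixed {
    for ($attempt = 1; ; $attempt++) {
        $connection->beginTransaction();
        try {
            $result = $work();
            $connection->commit();

            return $result;
        } catch (\Throwable $e) {
            // Roll back before any further SQL: in PostgreSQL, every statement sent inside
            // an aborted transaction fails with SQLSTATE 25P02 until the rollback.
            $connection->rollBack();
            if (!($e instanceof $retryOn) || $attempt >= $maxAttempts) {
                throw $e;
            }
        }
    }
}
```

Each retry starts a brand-new transaction, so a failed attempt can never leave half of its writes behind for the next one.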

@lifinsky
Contributor Author

lifinsky commented Feb 8, 2025

> @lifinsky as far as I remember, that may happen if another SQL statement is triggered after an SQL exception, before the rollback is done.

I understand the technical reason, but this handler only appends to the event store and saves to the projection. Perhaps a foreign key or uniqueness constraint is violated in the projection, but shouldn't there be a rollback then? Instant retry is configured only for the optimistic lock exception, so there is no second attempt.

On Monday I will look into this in more detail...

@lifinsky
Contributor Author

lifinsky commented Feb 8, 2025

> So the problem could arise if, after those 8 seconds, the first transaction has still not committed: if it sits idle and gets committed after the 8 seconds, it will create a gap in the projection (because the projection has already moved past that sequence number). You can increase the 8-second timing; however, that comes at a cost in performance whenever such situations occur. The aim of the new projection system is to solve this problem at the root, without a fragile gap detection system.

@dgafka I can't find any information about the 8 seconds. Perhaps you are thinking of the default configuration from the documentation:

https://docs.ecotone.tech/modelling/event-sourcing/setting-up-projections/executing-and-managing/running-projections

gap_detection //Default: new \Prooph\EventStore\Pdo\Projection\GapDetection()
That documented default also turns out not to match reality.

@dgafka
Member

dgafka commented Feb 9, 2025

@lifinsky yep, you're right, I thought the default was different. So to get a longer waiting time, it would have to be customized through configuration.

@lifinsky
Contributor Author

lifinsky commented Feb 9, 2025

> @lifinsky yep, you're right, I thought the default was different. So to get a longer waiting time, it would have to be customized through configuration.

@dgafka Would it be possible to make the gap detection retry setting configurable at the service configuration level, for each projection independently, or at least for all projections, including those running synchronously? The documentation mentions it for polling projections, but based on the code, this option no longer seems to be available.

@lifinsky
Contributor Author

    #[Distributed]
    #[EventHandler(listenTo: CardIssued::ROUTING_KEY)]
    public function listen(CardIssued $event): void
    {
        $this->registerCard($event);
        $this->createCardTransaction($event);
    }

@dgafka Are distributed endpoints wrapped in a transaction, similar to asynchronous ones? In our current example, two command buses are called independently, and we encountered a situation where the first one executed successfully, but the other failed when reading from the event store (unable to restore from a snapshot due to a missing BackedEnum converter, a very unpleasant situation; it would be great if Ecotone could automatically convert a BackedEnum to a string using its value). As a result, the first command bus call persisted its changes to the projection, and now, when retrying from the dead letter, we hit a unique key violation.

This is perhaps one of the most pressing problems we are facing.

@lifinsky
Copy link
Contributor Author

lifinsky commented Feb 15, 2025

<?php

use Ecotone\Messaging\Attribute\MediaTypeConverter;
use Ecotone\Messaging\Conversion\Converter;
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Handler\TypeDescriptor;
// BackedEnum is a global interface, so no `use` is needed (a non-compound `use` in the global namespace only emits a warning)

#[MediaTypeConverter]
class JsonBackedEnumConverter implements Converter
{
    public function matches(TypeDescriptor $sourceType, MediaType $sourceMediaType, TypeDescriptor $targetType, MediaType $targetMediaType): bool
    {
        return ($sourceMediaType->isApplicationJson() && $targetType->isCompatibleWith(TypeDescriptor::create(BackedEnum::class)))
            || ($sourceType->isCompatibleWith(TypeDescriptor::create(BackedEnum::class)) && $targetMediaType->isApplicationJson());
    }

    public function convert($source, TypeDescriptor $sourceType, MediaType $sourceMediaType, TypeDescriptor $targetType, MediaType $targetMediaType)
    {
        if ($source instanceof BackedEnum) {
            return $source->value; 
        }

        if (is_string($source) || is_int($source)) {
            return $targetType->toClassReflection()->getMethod('from')->invoke(null, $source);
        }

        throw new \InvalidArgumentException("Invalid value for BackedEnum conversion");
    }
}

Something like this might work. Currently, this is an example of the exception we get when restoring an aggregate from a snapshot:

Can't convert from application/json:string to application/x-php:***\Domain\Entity\CustomerAccountBalance The provided class "***\Domain\Enum\CardTransactionEntityType" is an enum, and cannot be instantiated
<?php

enum MyEnum: string
{
    case VALUE_ONE = 'one';
    case VALUE_TWO = 'two';
}

$source = 'one';

$targetType = new ReflectionClass(MyEnum::class);

$result = $targetType->getMethod('from')->invoke(null, $source);

echo $result->name; // VALUE_ONE

@dgafka
Member

dgafka commented Feb 15, 2025

> Are distributed endpoints wrapped in a transaction, similar to asynchronous ones? In our current example, two command buses are called independently, and we encountered a situation where the first one executed successfully, but the other failed when reading from the event store

The Distributed Command Bus should trigger a single Command Handler, not two. Can you provide a code example of what is actually happening?

@lifinsky
Contributor Author

> > Are distributed endpoints wrapped in a transaction, similar to asynchronous ones? In our current example, two command buses are called independently, and we encountered a situation where the first one executed successfully, but the other failed when reading from the event store

> The Distributed Command Bus should trigger a single Command Handler, not two. Can you provide a code example of what is actually happening?

At that point we need to trigger saving the read model in the repository. The command handler is directly in the repository, and the second command saves the event-sourcing aggregate. Of course, we could extract a common service, but I don’t want to do that, since the commands are completely different. I was sure that the Distributed handler wraps this in a transaction. If that’s not the case, then we need to add the DbalTransaction attribute.

@dgafka
Member

dgafka commented Feb 15, 2025

> Can't convert from application/json:string to application/x-php:\Domain\Entity\CustomerAccountBalance The provided class "\Domain\Enum\CardTransactionEntityType" is an enum, and cannot be instantiated

You forgot to add a Converter for the enum.
https://docs.ecotone.tech/modules/jms-converter#custom-conversions-from-classes.

You should most likely drop the snapshots from the database: by default JMS will serialize the enum in its own format, yet won't be able to deserialize it. So once you add a custom converter, the format stored in your database will most likely differ.

@dgafka
Member

dgafka commented Feb 15, 2025

> The command handler is directly in the repository, and the second command saves the event-sourcing aggregate.

So the Distributed Command Handler is in the repository, and then the repository triggers another command?

@lifinsky
Contributor Author

> > The command handler is directly in the repository, and the second command saves the event-sourcing aggregate.

> So the Distributed Command Handler is in the repository, and then the repository triggers another command?

No. The Distributed event handler is at the application level and triggers two command handlers.

@lifinsky
Contributor Author

lifinsky commented Feb 15, 2025

> > Can't convert from application/json:string to application/x-php:\Domain\Entity\CustomerAccountBalance The provided class "\Domain\Enum\CardTransactionEntityType" is an enum, and cannot be instantiated

> You forgot to add a Converter for the enum. https://docs.ecotone.tech/modules/jms-converter#custom-conversions-from-classes.

> You should most likely drop the snapshots from the database: by default JMS will serialize the enum in its own format, yet won't be able to deserialize it. So once you add a custom converter, the format stored in your database will most likely differ.

What do you think about a default media type converter for backed enums?
Like this:
#438 (comment)

At the moment we have many near-identical converters, one per enum, and any omission leads to problems with snapshots, which of course there is no point in dropping and rebuilding too often.
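A default converter like the one proposed needs only two generic operations, independent of the concrete enum class. The framework-agnostic sketch below uses hypothetical names (`CardStatus`, `backedEnumToScalar()`, `scalarToBackedEnum()`); hooking them into an Ecotone converter is not shown.

```php
<?php

/** Example enum, for illustration only (hypothetical). */
enum CardStatus: string
{
    case Active = 'active';
    case Closed = 'closed';
}

/** Hypothetical helper: serialize any backed enum as its backing value. */
function backedEnumToScalar(\BackedEnum $enum): int|string
{
    return $enum->value;
}

/**
 * Hypothetical helper: rebuild any backed enum from its backing value.
 *
 * @param class-string<\BackedEnum> $enumClass
 */
function scalarToBackedEnum(string $enumClass, int|string $value): \BackedEnum
{
    // Every backed enum implicitly implements the BackedEnum interface.
    if (!is_subclass_of($enumClass, \BackedEnum::class)) {
        throw new \InvalidArgumentException(sprintf('%s is not a backed enum', $enumClass));
    }

    // from() throws \ValueError for values that have no matching case.
    return $enumClass::from($value);
}
```

PHP's `json_encode()` already serializes a backed enum to its value (for example `"closed"`), so only the decoding direction needs to know the target class.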

@lifinsky
Contributor Author

@dgafka Hi. Thank you for the enum support and the safer snapshots.
So, does the Distributed attribute actually wrap the event handler in a transaction, as the Asynchronous attribute does, or not? Or is it necessary to add DbalTransaction when the event handler makes more than one command bus call? This does not seem to be mentioned explicitly anywhere in the documentation.

@lifinsky
Contributor Author

If we resolve and remove the deprecation warnings, the framework will get closer and closer to becoming a core addition, especially for Symfony projects. A huge amount of work was done in 2024, and it continues this year. I am also doing my best to adopt and promote it in production projects.

@dgafka
Member

dgafka commented Feb 16, 2025

> So, does the Distributed attribute actually wrap the event handler in a transaction, as the Asynchronous attribute does, or not? Or is it necessary to add DbalTransaction when the event handler makes more than one command bus call? This does not seem to be mentioned explicitly anywhere in the documentation.

Distributed only "allows" a given Handler to be executed by the Distributed Bus.
Transactional interception happens because the given Endpoint has the AsynchronousRunningEndpoint attribute.
As the Distributed Handler polls from an asynchronous channel, it should therefore be wrapped in a transaction.

Calling two command handlers inside will also trigger the Transaction Interceptor, because it hooks into both the CommandBus gateway and the AsynchronousRunningEndpoint attribute. However, those nested transactions will be ignored.

Therefore, if transactional wrapping does not happen, it's a bug, or maybe you're using different connections.
If it does not work for you for some reason, I would need a test case scenario: I took a quick look, and that attribute is added to the Message Endpoint, so it should be intercepted.
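One common way to get the behaviour described here, where the transaction interceptor fires for both the endpoint and each CommandBus call but only the outermost call really opens and commits, is an "already active" flag. The sketch below is hypothetical (`NestedTransactionGuard` is a made-up name) and records begin/commit/rollback in a log instead of talking to a database; it is not Ecotone's actual interceptor.

```php
<?php

/**
 * Hypothetical sketch: only the outermost run() opens and closes a transaction;
 * nested run() calls simply join it.
 */
final class NestedTransactionGuard
{
    private bool $active = false;

    /** @var list<string> stands in for real beginTransaction/commit/rollBack calls */
    public array $log = [];

    public function run(callable $work): mixed
    {
        if ($this->active) {
            // Already inside a transaction (e.g. a CommandBus call within the endpoint): join it.
            return $work();
        }

        $this->active = true;
        $this->log[] = 'begin';
        try {
            $result = $work();
            $this->log[] = 'commit';

            return $result;
        } catch (\Throwable $e) {
            // A failure in any nested call undoes the work of all of them.
            $this->log[] = 'rollback';
            throw $e;
        } finally {
            $this->active = false;
        }
    }
}
```

With this shape, an event handler that calls two command handlers either commits both or neither, provided both use the same connection.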


2 participants