Skip to content

Commit 06eda3c

Browse files
dgafkagitbook-bot
authored andcommitted
GITBOOK-876: No subject
1 parent f8e3d56 commit 06eda3c

8 files changed

+271
-18
lines changed

SUMMARY.md

+1
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@
166166
* [OpenTelemetry (Tracing and Metrics)](modules/opentelemetry-tracing-and-metrics/README.md)
167167
* [Configuration](modules/opentelemetry-tracing-and-metrics/configuration.md)
168168
* [RabbitMQ Support](modules/amqp-support-rabbitmq.md)
169+
* [Kafka Support](modules/kafka-support.md)
169170
* [DBAL Support](modules/dbal-support.md)
170171
* [Amazon SQS Support](modules/amazon-sqs-support.md)
171172
* [Redis Support](modules/redis-support.md)

modelling/asynchronous-handling/delaying-messages.md

+50-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33
In case of Ecotone we don't delay whole Message, but specific Message Handler. \
44
This helps in scenarios when we have multiple Event Handler and we would like to configure the delay differently. For example may we have a case, where as a result of Order being placed, we would want to delay notification, yet to call Payment Service right away. 
55

6-
## Static Delay
6+
{% hint style="info" %}
7+
[Asynchronous Message Channel ](./)need to support delays, in order to make use of this feature.
8+
{% endhint %}
9+
10+
## Message Handler Delay
711

812
You may delay handling given asynchronous message by adding `#[Delayed]` attribute.
913

@@ -17,9 +21,38 @@ public function sendWelcomeNotificationlWhen(UserWasRegistered $event): void
1721
}
1822
```
1923

20-
The delay is defined in milliseconds.
24+
### Using Expression language
25+
26+
To dynamically calculate expected delay, we can use expression language.
27+
28+
```php
29+
#[Delayed(expression: 'payload.dueDate']
30+
#[Asynchronous("orders")]
31+
#[EventHandler(endpointId: "cancelOrder")]
32+
public function cancelOrderIfExpired(OrderWasPlaced $event): void
33+
{
34+
// it will trigger at payload.dueDate, which is \DateTime object
35+
}
36+
```
37+
38+
{% hint style="success" %}
39+
**payload** variable in expression language will hold **Command/Event object.** \
40+
**headers** variable will hold all related **Mesage Headers**.
41+
{% endhint %}
2142

22-
## Dynamic Delay
43+
We could also **access any object from our Dependency Container,** in order to calculate the delay and pass there our **Command**:
44+
45+
```php
46+
#[Delayed(expression: 'reference("delayService").calculate(payload)']
47+
#[Asynchronous("orders")]
48+
#[EventHandler(endpointId: "cancelOrder")]
49+
public function cancelOrderIfExpired(OrderWasPlaced $event): void
50+
{
51+
52+
}
53+
```
54+
55+
## Message Delay
2356

2457
We may send an Message and tell Ecotone to delay it using **deliveryDelay** Message Header:
2558

@@ -31,6 +64,18 @@ $commandBus->sendWithRouting(
3164
);
3265
```
3366

34-
{% hint style="info" %}
35-
[Asynchronous Message Channel ](./)need to support this option to be used
67+
{% hint style="success" %}
68+
If Message Delay would be send for Event. Then all subscribing Event Handlers would be delayed. For customizing it on the single Handler level, use Message Handler delay.
3669
{% endhint %}
70+
71+
### Delaying using exact Date
72+
73+
We may also delay to given date time:
74+
75+
```php
76+
$commandBus->sendWithRouting(
77+
"askForOrderReview",
78+
"userId",
79+
metadata: ["deliveryDelay" => (new \DateTimeImmutable)->modify('+1 day')]
80+
);
81+
```

modelling/asynchronous-handling/message-priority.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ For example we may send a lot of different notifications, however when Customer
7171
[Asynchronous Message Channel ](./)need to support this option to be used
7272
{% endhint %}
7373

74-
## Static Priority
74+
## Message Handler Priority
7575

7676
You may prioritize handling given asynchronous message by adding `#[Priority]` attribute.
7777

@@ -93,7 +93,7 @@ public function sendAuthenticationToken(RequestToken $command): void
9393
}
9494
```
9595

96-
## Dynamic Priority
96+
## Message Priority
9797

9898
We may send an Message and tell Ecotone to prioritize it using **priority** Message Header:
9999

modelling/asynchronous-handling/time-to-live.md

+32-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
We may define Time to Live for Messages. This way if Message will not be handled within specific amount of time, it will be automatically discarded. \
44
This is useful in scenarios like sending notifications, where given notification like One Time Password may actually have meaning only for 5 minutes. 
55

6-
## Static Time to Live
6+
## Message Handler Time to Live
77

88
You may delay handling given asynchronous message by adding `#[Delayed]` attribute.
99

@@ -17,9 +17,38 @@ public function sendWelcomeNotificationWhen(UserWasRegistered $event): void
1717
}
1818
```
1919

20-
The delay is defined in milliseconds.
20+
### Using Expression language
2121

22-
## Dynamic Time to Live
22+
To dynamically calculate expected TTL, we can use expression language.
23+
24+
```php
25+
#[TimeToLive(expression: 'headers["expirationTime"]']
26+
#[Asynchronous("notifications")]
27+
#[EventHandler(endpointId: "welcomeEmail")]
28+
public function sendWelcomeNotificationWhen(UserWasRegistered $event): void
29+
{
30+
// handle welcome notification
31+
}
32+
```
33+
34+
{% hint style="success" %}
35+
**payload** variable in expression language will hold **Command/Event object.** \
36+
**headers** variable will hold all related **Mesage Headers**.
37+
{% endhint %}
38+
39+
We could also **access any object from our Dependency Container,** in order to calculate the delay and pass there our **Command**:
40+
41+
```php
42+
#[Delayed(expression: 'reference("delayService").calculate(payload)']
43+
#[Asynchronous("notifications")]
44+
#[EventHandler(endpointId: "welcomeEmail")]
45+
public function sendWelcomeNotificationWhen(UserWasRegistered $event): void
46+
{
47+
48+
}
49+
```
50+
51+
## Message Time to Live
2352

2453
We may send an Message and tell Ecotone to delay it using **deliveryDelay** Message Header:
2554

modelling/testing-support/testing-aggregates-and-sagas-with-message-flows.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public function test_success_verification_after_registration()
165165
$testSupport
166166
->sendCommand(new RegisterUser($userId, "John Snow", Email::create('[email protected]'), PhoneNumber::create('148518518518')))
167167
->discardRecordedMessages()
168-
->releaseAwaitingMessagesAndRunConsumer("asynchronous_messages", 1000 * 60 * 60)
168+
->run("asynchronous_messages", releaseAwaitingFor: TimeSpan::withHours(24))
169169
->getRecordedCommandsWithRouting()
170170
);
171171
}

modelling/testing-support/testing-asynchronous-messaging.md

+3-5
Original file line numberDiff line numberDiff line change
@@ -171,14 +171,12 @@ $ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
171171
);
172172

173173
$ecotoneTestSupport
174-
->sendCommandWithRoutingKey('order.register', new PlaceOrder('123'))
175-
->run('notifications', ExecutionPollingMetadata::createWithTestingSetup());
174+
->sendCommandWithRoutingKey('order.register', new PlaceOrder('123'));
176175

177176
// 2. Releasing messages awaiting for 60 seconds
178-
$ecotoneTestSupport->releaseAwaitingMessagesAndRunConsumer(
177+
$ecotoneTestSupport->run(
179178
'orders',
180-
1000 * 60,
181-
ExecutionPollingMetadata::createWithTestingSetup()
179+
releaseAwaitingFor: TimeSpan::withSeconds(60)
182180
);
183181

184182
$this->assertEquals(

modules/amqp-support-rabbitmq.md

+14-2
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@ class MessagingConfiguration
7979

8080
Now `orders` channel will be available in our Messaging System. 
8181

82+
{% hint style="success" %}
83+
Message Channels simplify to the maximum integration with Message Broker. \
84+
From application perspective all we need to do, is to provide channel implementation.\
85+
Ecotone will take care of whole publishing and consuming part. 
86+
{% endhint %}
87+
8288
### Message Channel Configuration
8389

8490
```php
@@ -104,9 +110,15 @@ public function orderChannel()
104110
}
105111
```
106112

107-
## Distributed Publisher and Consumer
113+
## Custom Publisher and Consumer
108114

109-
To create [distributed publisher or consumer](../modelling/microservices-php/) provide [Service Context](../messaging/service-application-configuration.md).
115+
To create [custom publisher or consumer](../modelling/microservices-php/) provide [Service Context](../messaging/service-application-configuration.md).
116+
117+
{% hint style="success" %}
118+
Message Channels simplify to the maximum integration with Message Broker. \
119+
From application perspective all we need to do, is to provide channel implementation.\
120+
Ecotone will take care of whole publishing and consuming part. 
121+
{% endhint %}
110122

111123
### Distributed Publisher
112124

modules/kafka-support.md

+168
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
# Kafka Support
2+
3+
{% hint style="success" %}
4+
This module is available as part of **Ecotone Enterprise.**
5+
{% endhint %}
6+
7+
## Installation
8+
9+
```php
10+
composer require ecotone/kafka
11+
```
12+
13+
## Configuration
14+
15+
In order to use **Kafka Support** we need to add **KafkaBrokerConfiguration** to our **Dependency Container.** 
16+
17+
{% tabs %}
18+
{% tab title="Symfony" %}
19+
```php
20+
# config/services.yaml
21+
# You need to have RabbitMQ instance running on your localhost, or change DSN
22+
Ecotone\Kafka\Configuration\KafkaBrokerConfiguration:
23+
class: Ecotone\Kafka\Configuration\KafkaBrokerConfiguration
24+
arguments:
25+
bootstrapServers:
26+
- localhost:9094
27+
```
28+
{% endtab %}
29+
30+
{% tab title="Laravel" %}
31+
```php
32+
# Register AMQP Service in Provider
33+
34+
use Ecotone\Kafka\Configuration\KafkaBrokerConfiguration;
35+
36+
public function register()
37+
{
38+
$this->app->singleton(KafkaBrokerConfiguration::class, function () {
39+
return new KafkaBrokerConfiguration(['localhost:9094']);
40+
});
41+
}
42+
```
43+
{% endtab %}
44+
45+
{% tab title="Lite" %}
46+
```php
47+
use Ecotone\Kafka\Configuration\KafkaBrokerConfiguration;
48+
49+
$application = EcotoneLiteApplication::boostrap(
50+
[
51+
KafkaBrokerConfiguration::class => new KafkaBrokerConfiguration(['localhost:9094'])
52+
]
53+
);
54+
```
55+
{% endtab %}
56+
{% endtabs %}
57+
58+
{% hint style="info" %}
59+
We register our **KafkaBrokerConfiguration** under the class name **Ecotone\Kafka\Configuration\KafkaBrokerConfiguration**. This will help Ecotone resolve it automatically, without any additional configuration.
60+
{% endhint %}
61+
62+
## Message Channel
63+
64+
To create Kafka Backed [Message Channel](../modelling/asynchronous-handling/), we need to create [Service Context](../messaging/service-application-configuration.md). 
65+
66+
```php
67+
class MessagingConfiguration
68+
{
69+
#[ServiceContext]
70+
public function orderChannel()
71+
{
72+
return KafkaMessageChannelBuilder::create("orders");
73+
}
74+
}
75+
```
76+
77+
Now `orders` channel will be available in our Messaging System. 
78+
79+
{% hint style="success" %}
80+
Message Channels simplify to the maximum integration with Message Broker. \
81+
From application perspective all we need to do, is to provide channel implementation.\
82+
Ecotone will take care of whole publishing and consuming part. 
83+
{% endhint %}
84+
85+
### Customize Topic Name
86+
87+
By default the queue name will follow channel name, which in above example will be "orders".\
88+
However we can use "orders" as reference name in our Application, yet name queue differently:
89+
90+
```php
91+
#[ServiceContext]
92+
public function orderChannel()
93+
{
94+
return KafkaMessageChannelBuilder::create(
95+
channelName: "orders",
96+
topicName: "crm_orders"
97+
);
98+
}
99+
```
100+
101+
### Customize Group Id
102+
103+
We can also customize the group id, which by default with following channel name:
104+
105+
```php
106+
#[ServiceContext]
107+
public function orderChannel()
108+
{
109+
return KafkaMessageChannelBuilder::create(
110+
channelName: "orders",
111+
groupId: "crm_application"
112+
);
113+
}
114+
```
115+
116+
## Custom Publisher and Consumer
117+
118+
To create [custom publisher or consumer](../modelling/microservices-php/) provide [Service Context](../messaging/service-application-configuration.md).
119+
120+
{% hint style="success" %}
121+
Custom Publishers and Consumers are great for building integrations for existing infrastructure or setting up a customized way to communicate between applications. With this you can take over the control of what is published and how it's consumed.
122+
{% endhint %}
123+
124+
### Custom Publisher
125+
126+
```php
127+
class MessagingConfiguration
128+
{
129+
#[ServiceContext]
130+
public function distributedPublisher()
131+
{
132+
return KafkaPublisherConfiguration::createWithDefaults(
133+
topicName: 'orders'
134+
);
135+
}
136+
}
137+
```
138+
139+
Then Publisher will be available for us in Dependency Container under **MessagePublisher** reference.
140+
141+
### Custom Consumer
142+
143+
To set up Consumer, consuming from given topics, all we need to do, is to mark given method with KafkaConsumer attribute:
144+
145+
```php
146+
#[KafkaConsumer('ordersConsumers', 'orders')]
147+
public function handle(string $payload, array $metadata): void
148+
{
149+
// do something
150+
}
151+
```
152+
153+
Then we run it as any other [asynchronous consumer](../modelling/asynchronous-handling/), using **ordersConsumer** name.
154+
155+
### Custom Topic Configuration
156+
157+
We can also customize topic configuration. For example to create reference name for Consumers and publishers, which internally in Kafka will map to different name
158+
159+
```php
160+
class MessagingConfiguration
161+
{
162+
#[ServiceContext]
163+
public function distributedPublisher()
164+
{
165+
return TopicConfiguration::createWithReferenceName("orders", 'crm_orders');
166+
}
167+
}
168+
```

0 commit comments

Comments
 (0)