Skip to content

Latest commit

 

History

History
174 lines (138 loc) · 7.32 KB

04-grpc.md

File metadata and controls

174 lines (138 loc) · 7.32 KB

gRPC 协议

EventMesh Java SDK 实现了 gRPC 同步、异步和广播消息的生产者和消费者。二者都需要一个 EventMeshHttpClientConfig 类实例来指定 EventMesh gRPC 客户端的配置信息。其中的 liteEventMeshAddruserNamepassword 字段需要和 EventMesh runtime eventmesh.properties 文件中的相匹配。

import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;
import io.cloudevents.CloudEvent;

public class CloudEventsAsyncSubscribe implements ReceiveMsgHook<CloudEvent> {
  public static void main(String[] args) throws InterruptedException {
    EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
      .serverAddr("localhost")
      .serverPort(10205)
      .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP)
      .env("env").idc("idc")
      .sys("1234").build();
    /* ... */
  }
}

gRPC 消费者

流消费者

EventMesh runtime 会将来自生产者的信息作为一系列事件流向流消费者发送。消费者应实现 ReceiveHook 类,其被定义在 ReceiveMsgHook.java

public interface ReceiveMsgHook<T> {
    Optional<T> handle(T msg) throws Throwable;
    String getProtocolType();
}

EventMeshGrpcConsumer 实现了 registerListenersubscribeunsubscribe 方法。subscribe 方法接收一个 SubscriptionItem 对象的列表,其中定义了要订阅的话题。registerListener 接收一个实现了 ReceiveMsgHook 的实例。handle 方法将会在消费者收到订阅的主题消息时被调用。如果 SubscriptionTypeSYNChandle 的返回值将被发送回生产者。

import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import io.cloudevents.CloudEvent;

public class CloudEventsAsyncSubscribe implements ReceiveMsgHook<CloudEvent> {
    public static CloudEventsAsyncSubscribe handler = new CloudEventsAsyncSubscribe();
    public static void main(String[] args) throws InterruptedException {
        /* ... */
        SubscriptionItem subscriptionItem = new SubscriptionItem(
          "eventmesh-async-topic",
          SubscriptionMode.CLUSTERING,
          SubscriptionType.ASYNC
        );
        EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);

        eventMeshGrpcConsumer.init();
        eventMeshGrpcConsumer.registerListener(handler);
        eventMeshGrpcConsumer.subscribe(Collections.singletonList(subscriptionItem));
        /* ... */
        eventMeshGrpcConsumer.unsubscribe(Collections.singletonList(subscriptionItem));
    }

    @Override
    public Optional<CloudEvent> handle(CloudEvent message) {
      log.info("Messaged received: {}", message);
      return Optional.empty();
    }

    @Override
    public String getProtocolType() {
      return EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME;
    }
}

Webhook 消费者

EventMeshGrpcConsumersubscribe 方法接收一个 SubscriptionItem 对象的列表,其中定义了要订阅的主题和一个可选的 timeout 值。如果提供了回调 URL,EventMesh runtime 将向回调 URL 地址发送一个包含 CloudEvents 格式 消息的 POST 请求。SubController.java 实现了一个接收并解析回调信息的 Spring Boot controller。

import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;

@Component
public class SubService implements InitializingBean {
  final String url = "http://localhost:8080/callback";

  public void afterPropertiesSet() throws Exception {
    /* ... */
    eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);
    eventMeshGrpcConsumer.init();

    SubscriptionItem subscriptionItem = new SubscriptionItem(
      "eventmesh-async-topic",
      SubscriptionMode.CLUSTERING,
      SubscriptionType.ASYNC
    );

    eventMeshGrpcConsumer.subscribe(Collections.singletonList(subscriptionItem), url);
    /* ... */
    eventMeshGrpcConsumer.unsubscribe(Collections.singletonList(subscriptionItem), url);
  }
}

gRPC 生产者

异步生产者

EventMeshGrpcProducer 实现了 publish 方法。publish 方法接收将被发布的消息和一个可选的 timeout 值。消息应是下列类的一个实例:

  • org.apache.eventmesh.common.EventMeshMessage
  • io.cloudevents.CloudEvent
/* ... */
EventMeshGrpcProducer eventMeshGrpcProducer = new EventMeshGrpcProducer(eventMeshClientConfig);
eventMeshGrpcProducer.init();

Map<String, String> content = new HashMap<>();
content.put("content", "testAsyncMessage");

CloudEvent event = CloudEventBuilder.v1()
  .withId(UUID.randomUUID().toString())
  .withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
  .withSource(URI.create("/"))
  .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
  .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
  .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
  .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
  .build();
eventMeshGrpcProducer.publish(event);

同步生产者

EventMeshGrpcProducer 实现了 requestReply 方法。requestReply 方法接收将被发布的消息和一个可选的 timeout 值。方法会返回消费者返回的消息。消息应是下列类的一个实例:

  • org.apache.eventmesh.common.EventMeshMessage
  • io.cloudevents.CloudEvent

批量生产者

EventMeshGrpcProducer 重写了 publish 方法,该方法接收一个将被发布的消息列表和一个可选的 timeout 值。列表中的消息应是下列类的一个实例:

  • org.apache.eventmesh.common.EventMeshMessage
  • io.cloudevents.CloudEvent
/* ... */
List<CloudEvent> cloudEventList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
  CloudEvent event = CloudEventBuilder.v1()
    .withId(UUID.randomUUID().toString())
    .withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
    .withSource(URI.create("/"))
    .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
    .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
    .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
    .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
    .build();

  cloudEventList.add(event);
}

eventMeshGrpcProducer.publish(cloudEventList);
/* ... */