diff --git a/README.md b/README.md index e308c22..1457859 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,7 @@ codecs: [ CODEC_JSON ] stream_types: [ STREAM_TYPE_UNARY ] supports_tls: false supports_trailers: false -supports_connect_get: false +supports_connect_get: true supports_message_receive_limit: false ``` @@ -87,7 +87,7 @@ Diagnostic data from the server itself is output in the `out/out.log` file. ### Conformance tests status -Current status: 6/78 tests pass +Current status: 6/79 tests pass Known issues: diff --git a/build.sbt b/build.sbt index 4ef15f7..5882a6d 100644 --- a/build.sbt +++ b/build.sbt @@ -24,6 +24,7 @@ lazy val noPublish = List( lazy val Versions = new { val grpc = "1.68.1" val http4s = "0.23.29" + val logback = "1.5.12" } lazy val core = project @@ -51,6 +52,8 @@ lazy val core = project "org.http4s" %% "http4s-client" % Versions.http4s % Test, "org.scalatest" %% "scalatest" % "3.2.19" % Test, + + "ch.qos.logback" % "logback-classic" % Versions.logback % Test, ), ) @@ -63,7 +66,7 @@ lazy val conformance = project libraryDependencies ++= Seq( "org.http4s" %% "http4s-ember-server" % Versions.http4s, - "ch.qos.logback" % "logback-classic" % "1.5.12" % Runtime, + "ch.qos.logback" % "logback-classic" % Versions.logback % Runtime, ), ) diff --git a/conformance-suite.yaml b/conformance-suite.yaml index a156f3b..65cc930 100644 --- a/conformance-suite.yaml +++ b/conformance-suite.yaml @@ -6,5 +6,5 @@ features: stream_types: [ STREAM_TYPE_UNARY ] supports_tls: false supports_trailers: false - supports_connect_get: false + supports_connect_get: true supports_message_receive_limit: false diff --git a/conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/ConformanceServiceImpl.scala b/conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/ConformanceServiceImpl.scala index 2090e5b..e30ba12 100644 --- a/conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/ConformanceServiceImpl.scala +++ b/conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/ConformanceServiceImpl.scala @@ -19,14 +19,48 @@ class ConformanceServiceImpl[F[_] : Async] extends ConformanceServiceFs2GrpcTrai private val logger = LoggerFactory.getLogger(getClass) - override def unary(request: UnaryRequest, ctx: Metadata): F[(UnaryResponse, Metadata)] = { - val responseDefinition = request.getResponseDefinition + override def unary( + request: UnaryRequest, + ctx: Metadata + ): F[(UnaryResponse, Metadata)] = { + for + payload <- handleUnaryRequest( + request.getResponseDefinition, + Seq(request.toProtoAny), + ctx + ) + yield (UnaryResponse(payload.some), new Metadata()) + } - val trailers = new Metadata() - responseDefinition.responseTrailers.foreach { h => - val key = Metadata.Key.of(h.name, Metadata.ASCII_STRING_MARSHALLER) - h.value.foreach(v => trailers.put(key, v)) - } + override def idempotentUnary( + request: IdempotentUnaryRequest, + ctx: Metadata, + ): F[(IdempotentUnaryResponse, Metadata)] = { + for + payload <- handleUnaryRequest( + request.getResponseDefinition, + Seq(request.toProtoAny), + ctx + ) + yield (IdempotentUnaryResponse(payload.some), new Metadata()) + } + + private def handleUnaryRequest( + responseDefinition: UnaryResponseDefinition, + requests: Seq[com.google.protobuf.any.Any], + ctx: Metadata, + ): F[ConformancePayload] = { + //val trailers = new Metadata() + //responseDefinition.responseTrailers.foreach { h => + // val key = Metadata.Key.of(h.name, Metadata.ASCII_STRING_MARSHALLER) + // h.value.foreach(v => trailers.put(key, v)) + //} + + val requestInfo = ConformancePayload.RequestInfo( + requestHeaders = mkConformanceHeaders(ctx), + timeoutMs = extractTimeout(ctx), + requests = requests + ) val responseData = responseDefinition.response match { case UnaryResponseDefinition.Response.ResponseData(bs) => @@ -37,39 +71,33 @@ class ConformanceServiceImpl[F[_] : Async] extends ConformanceServiceFs2GrpcTrai val status = Status.fromCodeValue(code.value) .withDescription(message.orNull) .augmentDescription( - TextFormat.printToSingleLineUnicodeString( - ConformancePayload.RequestInfo( - requests = Seq(request.toProtoAny) - ).toProtoAny - ) + TextFormat.printToSingleLineUnicodeString(requestInfo.toProtoAny) ) - throw new StatusRuntimeException(status, trailers) + throw new StatusRuntimeException(status) } - val timeout = Option(ctx.get(Metadata.Key.of("grpc-timeout", Metadata.ASCII_STRING_MARSHALLER))) - .map(v => v.substring(0, v.length - 1).toLong / 1000) - - val payload = ConformancePayload( - data = responseData.getOrElse(ByteString.EMPTY), - requestInfo = ConformancePayload.RequestInfo( - requestHeaders = mkConformanceHeaders(ctx), - timeoutMs = timeout, - requests = Seq(request.toProtoAny), - connectGetInfo = None, - ).some - ) - Async[F].sleep(Duration(responseDefinition.responseDelayMs, TimeUnit.MILLISECONDS)) *> - Async[F].pure((UnaryResponse(payload.some), trailers)) + Async[F].pure(ConformancePayload( + responseData.getOrElse(ByteString.EMPTY), + requestInfo.some + )) } + private def keyof(key: String): Metadata.Key[String] = + Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER) + private def mkConformanceHeaders(metadata: Metadata): Seq[Header] = { metadata.keys().asScala.map { key => - Header(key, metadata.getAll(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)).asScala.toSeq) + Header(key, metadata.getAll(keyof(key)).asScala.toSeq) }.toSeq } + private def extractTimeout(metadata: Metadata): Option[Long] = { + Option(metadata.get(keyof("grpc-timeout"))) + .map(v => v.substring(0, v.length - 1).toLong / 1000) + } + override def serverStream( request: ServerStreamRequest, ctx: Metadata @@ -91,8 +119,4 @@ class ConformanceServiceImpl[F[_] : Async] extends ConformanceServiceFs2GrpcTrai ctx: Metadata ): F[(UnimplementedResponse, Metadata)] = ??? - override def idempotentUnary( - request: IdempotentUnaryRequest, - ctx: Metadata - ): F[(IdempotentUnaryResponse, Metadata)] = ??? } diff --git a/conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/Main.scala b/conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/Main.scala index 0e43feb..3c6c8a3 100644 --- a/conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/Main.scala +++ b/conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/Main.scala @@ -46,6 +46,7 @@ object Main extends IOApp.Simple { p.withTypeRegistry( TypeRegistry.default .addMessage[connectrpc.conformance.v1.UnaryRequest] + .addMessage[connectrpc.conformance.v1.IdempotentUnaryRequest] .addMessage[connectrpc.conformance.v1.ConformancePayload.RequestInfo] ) } diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRpcHttpRoutes.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRpcHttpRoutes.scala index 19699ef..1c22bad 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRpcHttpRoutes.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRpcHttpRoutes.scala @@ -5,6 +5,7 @@ import cats.effect.Async import cats.effect.kernel.Resource import cats.implicits.* import fs2.compression.Compression +import fs2.{Chunk, Stream} import io.grpc.* import io.grpc.MethodDescriptor.MethodType import io.grpc.stub.MetadataUtils @@ -12,8 +13,9 @@ import org.http4s.* import org.http4s.dsl.Http4sDsl import org.http4s.headers.`Content-Type` import org.ivovk.connect_rpc_scala.http.* -import org.ivovk.connect_rpc_scala.http.Headers.{`Connect-Timeout-Ms`, `X-Test-Case-Name`} +import org.ivovk.connect_rpc_scala.http.Headers.* import org.ivovk.connect_rpc_scala.http.MessageCodec.given +import org.ivovk.connect_rpc_scala.http.QueryParams.* import org.slf4j.{Logger, LoggerFactory} import scalapb.grpc.ClientCalls import scalapb.json4s.{JsonFormat, Printer} @@ -54,6 +56,41 @@ object ConnectRpcHttpRoutes { ipChannel <- InProcessChannelBridge.create(services, configuration.waitForShutdown) yield HttpRoutes.of[F] { + case req@Method.GET -> Root / serviceName / methodName :? EncodingQP(contentType) +& MessageQP(message) => + val grpcMethod = grpcMethodName(serviceName, methodName) + + codecRegistry.byContentType(contentType) match { + case Some(codec) => + given MessageCodec[F] = codec + + val media = Media[F](Stream.chunk(Chunk.array(message.getBytes)), req.headers) + + methodRegistry.get(grpcMethod) match { + // Support GET-requests for all methods until https://github.com/scalapb/ScalaPB/pull/1774 is merged + case Some(entry) if entry.methodDescriptor.isSafe || true => + entry.methodDescriptor.getType match + case MethodType.UNARY => + handleUnary(dsl, entry, media, ipChannel) + case unsupported => + NotImplemented(connectrpc.Error( + code = io.grpc.Status.UNIMPLEMENTED.toConnectCode, + message = s"Unsupported method type: $unsupported".some + )) + case Some(_) => + Forbidden(connectrpc.Error( + code = io.grpc.Status.PERMISSION_DENIED.toConnectCode, + message = s"Method supports calling using POST: $grpcMethod".some + )) + case None => + NotFound(connectrpc.Error( + code = io.grpc.Status.NOT_FOUND.toConnectCode, + message = s"Method not found: $grpcMethod".some + )) + } + case None => + UnsupportedMediaType(s"Unsupported content-type ${contentType.show}. " + + s"Supported content types: ${MediaTypes.allSupported.map(_.show).mkString(", ")}") + } case req@Method.POST -> Root / serviceName / methodName => val grpcMethod = grpcMethodName(serviceName, methodName) val contentType = req.headers.get[`Content-Type`].map(_.mediaType) @@ -79,7 +116,8 @@ object ConnectRpcHttpRoutes { )) } case None => - UnsupportedMediaType(s"Unsupported Content-Type header ${contentType.map(_.show).orNull}") + UnsupportedMediaType(s"Unsupported content-type ${contentType.map(_.show).orNull}. " + + s"Supported content types: ${MediaTypes.allSupported.map(_.show).mkString(", ")}") } } } @@ -88,7 +126,7 @@ object ConnectRpcHttpRoutes { private def handleUnary[F[_] : Async]( dsl: Http4sDsl[F], entry: RegistryEntry, - req: Request[F], + req: Media[F], channel: Channel )(using codec: MessageCodec[F]): F[Response[F]] = { import dsl.* diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MediaTypes.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MediaTypes.scala new file mode 100644 index 0000000..84b63fc --- /dev/null +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MediaTypes.scala @@ -0,0 +1,17 @@ +package org.ivovk.connect_rpc_scala.http + +import org.http4s.MediaType + +import scala.annotation.targetName + +object MediaTypes { + + @targetName("applicationJson") + val `application/json`: MediaType = MediaType.application.json + + @targetName("applicationProto") + val `application/proto`: MediaType = MediaType.unsafeParse("application/proto") + + val allSupported: Seq[MediaType] = List(`application/json`, `application/proto`) + +} diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala index 5f0995f..625eb5c 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala @@ -36,7 +36,7 @@ class JsonMessageCodec[F[_] : Sync : Compression](jsonPrinter: Printer) extends private val logger: Logger = LoggerFactory.getLogger(getClass) - override val mediaType: MediaType = MediaType.application.`json` + override val mediaType: MediaType = MediaTypes.`application/json` override def decode[A <: Message](m: Media[F])(using cmp: Companion[A]): DecodeResult[F, A] = { val charset = m.charset.getOrElse(Charset.`UTF-8`).nioCharset @@ -72,8 +72,7 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] { private val logger: Logger = LoggerFactory.getLogger(getClass) - override val mediaType: MediaType = - MediaType.unsafeParse("application/proto") + override val mediaType: MediaType = MediaTypes.`application/proto` override def decode[A <: Message](m: Media[F])(using cmp: Companion[A]): DecodeResult[F, A] = { val f = toInputStreamResource(decompressed(m)).use { is => diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/QueryParams.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/QueryParams.scala new file mode 100644 index 0000000..cea30a9 --- /dev/null +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/QueryParams.scala @@ -0,0 +1,23 @@ +package org.ivovk.connect_rpc_scala.http + +import org.http4s.dsl.impl.QueryParamDecoderMatcher +import org.http4s.{Charset, MediaType, ParseFailure, QueryParamDecoder} + +import java.net.URLDecoder + +object QueryParams { + + private val encodingQPDecoder = QueryParamDecoder[String].emap { + case "json" => Right(MediaTypes.`application/json`) + case "proto" => Right(MediaTypes.`application/proto`) + case other => Left(ParseFailure(other, "Invalid encoding")) + } + + object EncodingQP extends QueryParamDecoderMatcher[MediaType]("encoding")(encodingQPDecoder) + + private val messageQPDecoder = QueryParamDecoder[String] + .map(URLDecoder.decode(_, Charset.`UTF-8`.nioCharset)) + + object MessageQP extends QueryParamDecoderMatcher[String]("message")(messageQPDecoder) + +} diff --git a/core/src/test/protobuf/TestService.proto b/core/src/test/protobuf/TestService.proto index d1ca16d..f2e27fe 100644 --- a/core/src/test/protobuf/TestService.proto +++ b/core/src/test/protobuf/TestService.proto @@ -4,6 +4,11 @@ package org.ivovk.connect_rpc_scala.test; service TestService { rpc Add(AddRequest) returns (AddResponse) {} + + // This method can be called using GET request + rpc Get(GetRequest) returns (GetResponse) { + option idempotency_level = NO_SIDE_EFFECTS; + } } message AddRequest { @@ -14,3 +19,11 @@ message AddRequest { message AddResponse { int32 sum = 1; } + +message GetRequest { + string key = 1; +} + +message GetResponse { + string value = 1; +} diff --git a/core/src/test/resources/logback.xml b/core/src/test/resources/logback.xml new file mode 100644 index 0000000..276fcaa --- /dev/null +++ b/core/src/test/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + + + %-4relative %-5level %logger{35} -%kvp- %msg%n + + + + + + + + \ No newline at end of file diff --git a/core/src/test/scala/org/ivovk/connect_rpc_scala/HttpTest.scala b/core/src/test/scala/org/ivovk/connect_rpc_scala/HttpTest.scala index e15c80a..56c6b04 100644 --- a/core/src/test/scala/org/ivovk/connect_rpc_scala/HttpTest.scala +++ b/core/src/test/scala/org/ivovk/connect_rpc_scala/HttpTest.scala @@ -5,14 +5,17 @@ import cats.effect.unsafe.implicits.global import cats.syntax.all.* import io.grpc.ServerServiceDefinition import org.http4s.client.Client +import org.http4s.dsl.io.Root import org.http4s.headers.`Content-Type` import org.http4s.implicits.* import org.http4s.{Method, *} +import org.ivovk.connect_rpc_scala.http.MediaTypes import org.ivovk.connect_rpc_scala.test.TestService.TestServiceGrpc.TestService -import org.ivovk.connect_rpc_scala.test.TestService.{AddRequest, AddResponse} +import org.ivovk.connect_rpc_scala.test.TestService.{AddRequest, AddResponse, GetRequest, GetResponse} import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import java.net.URLEncoder import scala.concurrent.{ExecutionContext, Future} import scala.jdk.CollectionConverters.* @@ -21,12 +24,15 @@ class HttpTest extends AnyFunSuite, Matchers { object TestServiceImpl extends TestService { override def add(request: AddRequest): Future[AddResponse] = Future.successful(AddResponse(request.a + request.b)) + + override def get(request: GetRequest): Future[GetResponse] = { + Future.successful(GetResponse("Key is: " + request.key)) + } } // String-JSON encoder - given [F[_]]: EntityEncoder[F, String] = - EntityEncoder.stringEncoder[F] - .withContentType(`Content-Type`(MediaType.application.json)) + given [F[_]]: EntityEncoder[F, String] = EntityEncoder.stringEncoder[F] + .withContentType(`Content-Type`(MediaTypes.`application/json`)) test("basic") { val services: Seq[ServerServiceDefinition] = Seq( @@ -49,7 +55,41 @@ class HttpTest extends AnyFunSuite, Matchers { } yield { assert(body == """{"sum":3}""") assert(status == Status.Ok) - assert(response.headers.get[`Content-Type`].map(_.mediaType).contains(MediaType.application.json)) + assert(response.headers.get[`Content-Type`].map(_.mediaType).contains(MediaTypes.`application/json`)) + } + } + .unsafeRunSync() + } + + test("GET request") { + val services: Seq[ServerServiceDefinition] = Seq( + TestService.bindService(TestServiceImpl, ExecutionContext.global) + ) + + ConnectRpcHttpRoutes.create[IO](services.toList) + .flatMap { routes => + val client = Client.fromHttpApp(routes.orNotFound) + + val requestJson = URLEncoder.encode("""{"key":"123"}""", Charset.`UTF-8`.nioCharset) + + client.run( + Request[IO]( + Method.GET, + Uri( + path = Root / "org.ivovk.connect_rpc_scala.test.TestService" / "Get", + query = Query.fromPairs("encoding" -> "json", "message" -> requestJson) + ) + ) + ) + } + .use { response => + for { + body <- response.as[String] + status <- response.status.pure[IO] + } yield { + assert(body == """{"value":"Key is: 123"}""") + assert(status == Status.Ok) + assert(response.headers.get[`Content-Type`].map(_.mediaType).contains(MediaTypes.`application/json`)) } } .unsafeRunSync()