From 60fe768ba20ada5103d43d91bb12aaf735009a69 Mon Sep 17 00:00:00 2001 From: johannes karoff Date: Thu, 13 Jun 2024 01:10:29 +0200 Subject: [PATCH] wip --- build.sbt | 24 ++++++ .../src/main/scala/HttpRpcTransport.scala | 60 +++++++++++++++ .../src/main/scala/HttpRpcRoutes.scala | 73 +++++++++++++++++++ project/Deps.scala | 7 ++ 4 files changed, 164 insertions(+) create mode 100644 http4sClient/src/main/scala/HttpRpcTransport.scala create mode 100644 http4sServer/src/main/scala/HttpRpcRoutes.scala diff --git a/build.sbt b/build.sbt index b755aee9..cd57906f 100644 --- a/build.sbt +++ b/build.sbt @@ -74,3 +74,27 @@ lazy val sloth = crossProject(JSPlatform, JVMPlatform) Deps.scalaTest.value % Test :: Nil ).jsSettings(jsSettings) + +lazy val http4sClient = crossProject(JSPlatform, JVMPlatform) + .crossType(CrossType.Pure) + .dependsOn(sloth) + .settings(commonSettings) + .settings( + name := "sloth-http4s-client", + libraryDependencies ++= + Deps.http4s.core.value :: + Deps.http4s.client.value :: + Nil + ).jsSettings(jsSettings) + +lazy val http4sServer = crossProject(JSPlatform, JVMPlatform) + .crossType(CrossType.Pure) + .dependsOn(sloth) + .settings(commonSettings) + .settings( + name := "sloth-http4s-server", + libraryDependencies ++= + Deps.http4s.core.value :: + Deps.http4s.dsl.value :: + Nil + ).jsSettings(jsSettings) diff --git a/http4sClient/src/main/scala/HttpRpcTransport.scala b/http4sClient/src/main/scala/HttpRpcTransport.scala new file mode 100644 index 00000000..0d28b572 --- /dev/null +++ b/http4sClient/src/main/scala/HttpRpcTransport.scala @@ -0,0 +1,60 @@ +package sloth.ext.http4s.client + +import cats.effect.Concurrent +import cats.implicits._ +import org.http4s.client.Client +import org.http4s.{EntityBody, EntityDecoder, EntityEncoder, Headers, HttpVersion, Method, Request, ServerSentEvent, Uri} +import fs2.Stream +import sloth.RequestTransport + +case class HttpRequestConfig( + baseUri: Uri = Uri(path = Uri.Path.Root), + headers: Headers = Headers.empty, + httpVersion: HttpVersion = HttpVersion.`HTTP/1.1`, +) { + def toRequest[F[_]](requestPath: List[String], entityBody: EntityBody[F]): Request[F] = Request[F]( + method = Method.POST, + uri = baseUri / requestPath.mkString("/"), + httpVersion = httpVersion, + headers = headers, + body = entityBody, + ) +} + +object HttpRpcTransport { + def apply[PickleType, F[_]: Concurrent]( + client: Client[F], + )(implicit + encoder: EntityEncoder[F, PickleType], + decoder: EntityDecoder[F, PickleType] + ): RequestTransport[PickleType, F] = apply(client, HttpRequestConfig().pure[F]) + + def apply[PickleType, F[_]: Concurrent]( + client: Client[F], + config: F[HttpRequestConfig] + )(implicit + encoder: EntityEncoder[F, PickleType], + decoder: EntityDecoder[F, PickleType] + ): RequestTransport[PickleType, F] = new sloth.RequestTransport[PickleType, F] { + override def apply(request: sloth.Request[PickleType]): F[PickleType] = for { + config <- config + responseBody <- client.expect[PickleType](config.toRequest(request.path, encoder.toEntity(request.payload).body)) + } yield responseBody + } + + def eventStream[F[_]: Concurrent]( + client: Client[F], + ): RequestTransport[String, Stream[F, *]] = eventStream(client, HttpRequestConfig().pure[F]) + + def eventStream[F[_]: Concurrent]( + client: Client[F], + config: F[HttpRequestConfig] + ): RequestTransport[String, Stream[F, *]] = new sloth.RequestTransport[String, Stream[F, *]] { + override def apply(request: sloth.Request[String]): Stream[F, String] = for { + config <- Stream.eval(config) + response <- Stream.resource(client.run(config.toRequest(request.path, EntityEncoder[F, String].toEntity(request.payload).body))) + event <- response.body.through(ServerSentEvent.decoder[F]) + data <- Stream.fromOption(event.data) + } yield data + } +} \ No newline at end of file diff --git a/http4sServer/src/main/scala/HttpRpcRoutes.scala b/http4sServer/src/main/scala/HttpRpcRoutes.scala new file mode 100644 index 00000000..a1985e45 --- /dev/null +++ b/http4sServer/src/main/scala/HttpRpcRoutes.scala @@ -0,0 +1,73 @@ +package sloth.ext.http4s.server + +import cats.data.OptionT +import cats.implicits._ +import cats.effect.Concurrent +import org.http4s._ +import org.http4s.dsl.Http4sDsl +import fs2.Stream +import sloth.{Router, ServerFailure} + +class HttpRpcRoutes { + + def apply[PickleType: EntityDecoder[F, *]: EntityEncoder[F, *], F[_]: Concurrent]( + router: Router[PickleType, F], + onError: PartialFunction[Throwable, F[Response[F]]] = PartialFunction.empty + ): HttpRoutes[F] = { + val dsl = Http4sDsl[F] + import dsl._ + + HttpRoutes[F] { request => + request.pathInfo.segments match { + case Vector(apiName, methodName) => + val path = List(apiName.decoded(), methodName.decoded()) + val result = router.getFunction(path).traverse { f => + request.as[PickleType].flatMap { payload => + f(payload) match { + case Left(error) => serverFailureToResponse[F](dsl, onError)(error) + case Right(response) => Ok(response) + } + } + } + + OptionT(result) + case _ => OptionT.none + } + } + } + + def eventStream[F[_]: Concurrent]( + router: Router[String, Stream[F, *]], + onError: PartialFunction[Throwable, F[Response[F]]] = PartialFunction.empty + ): HttpRoutes[F] = { + val dsl = Http4sDsl[F] + import dsl._ + + HttpRoutes[F] { request => + request.pathInfo.segments match { + case Vector(apiName, methodName) => + val path = List(apiName.decoded(), methodName.decoded()) + val result = router.getFunction(path).traverse { f => + request.as[String].flatMap { payload => + f(payload) match { + case Left(error) => serverFailureToResponse[F](dsl, onError)(error) + case Right(response) => Ok(response.map(r => ServerSentEvent(data = Some(r)))) + } + } + } + + OptionT(result) + case _ => OptionT.none + } + } + } + + private def serverFailureToResponse[F[_]: Concurrent](dsl: Http4sDsl[F], onError: PartialFunction[Throwable, F[Response[F]]])(failure: ServerFailure): F[Response[F]] = { + import dsl._ + failure match { + case ServerFailure.PathNotFound(_) => NotFound() + case ServerFailure.HandlerError(err) => onError.lift(err).getOrElse(InternalServerError(err.getMessage)) + case ServerFailure.DeserializerError(err) => onError.lift(err).getOrElse(BadRequest(err.getMessage)) + } + } +} diff --git a/project/Deps.scala b/project/Deps.scala index 6bc60d93..2c1a46a8 100644 --- a/project/Deps.scala +++ b/project/Deps.scala @@ -20,4 +20,11 @@ object Deps { val parser = dep("io.circe" %%% "circe-parser" % version) val shapes = dep("io.circe" %%% "circe-shapes" % version) } + + val http4s = new { + private val version = "0.23.24" + val core = dep("org.http4s" %%% "http4s-core" % version) + val dsl = dep("org.http4s" %%% "http4s-dsl" % version) + val client = dep("org.http4s" %%% "http4s-client" % version) + } }