Skip to content

Commit

Permalink
Provide a Scope per server call (#3197) (#3276)
Browse files Browse the repository at this point in the history
* Provide a `Scope` per server call (#3197)

* Scope handling for handlers (#3197)

* Scope handling for handlers (#3197)
  • Loading branch information
987Nabil authored Mar 4, 2025
1 parent a009f57 commit 0bd05f4
Show file tree
Hide file tree
Showing 43 changed files with 356 additions and 215 deletions.
19 changes: 11 additions & 8 deletions docs/reference/aop/middleware.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,17 @@ val urlRewrite: Middleware[Any] =
new Middleware[Any] {
override def apply[Env1 <: Any, Err](routes: Routes[Env1, Err]): Routes[Env1, Err] =
routes.transform { handler =>
Handler.fromFunctionZIO { request =>
handler(
request.updateURL(url =>
if (url.path.startsWith(Path("/api")))
url.copy(path = Path("/v1") ++ url.path)
else url,
),
)
Handler.scoped[Env1] {
Handler.fromFunctionZIO { request =>
handler(
request.updateURL(
url =>
if (url.path.startsWith(Path("/api")))
url.copy(path = Path("/v1") ++ url.path)
else url,
),
)
}
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions docs/reference/aop/protocol-stack.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ The behavior of the `handler` remains the same. Let's test it:
```scala mdoc
Unsafe.unsafe{ implicit unsafe =>
Runtime.default.unsafe.run(
handler("Hello World!")
ZIO.scoped(handler("Hello World!"))
)
}
```
Expand All @@ -112,7 +112,7 @@ val newHandler: Handler[Any, (Response, Int), Request, (Response, Int)] =

Unsafe.unsafe { implicit unsafe =>
Runtime.default.unsafe.run(
newHandler("Hello World! "),
ZIO.scoped(newHandler("Hello World! ")),
)
}
```
Expand Down Expand Up @@ -141,7 +141,7 @@ Now, let's apply the `uppercase` handler to the `anotherTrimAndLength` stack:
```scala mdoc
Unsafe.unsafe { implicit unsafe =>
Runtime.default.unsafe.run(
anotherTrimAndLength(uppercase).apply("Hello World!"),
ZIO.scoped(anotherTrimAndLength(uppercase).apply("Hello World!")),
)
}
```
Expand Down Expand Up @@ -210,7 +210,7 @@ Now, we are ready to test the `responseTime` protocol stack by applying the `han
```scala mdoc
Unsafe.unsafe { implicit unsafe =>
Runtime.default.unsafe.run(
responseTime(handler).apply("Hello, World!").debug("Response along with its latency"),
ZIO.scoped(responseTime(handler).apply("Hello, World!").debug("Response along with its latency")),
)
}
```
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/routing/routes.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ val routes: Routes[Any, Response] = ???

routes.transform[Any] { handle =>
handler { (request: Request) =>
ZIO.sleep(1.second) *> handle(request)
ZIO.sleep(1.second) *> ZIO.scoped(handle(request))
}
}
```
Expand Down
1 change: 1 addition & 0 deletions project/MimaSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ object MimaSettings {
exclude[Problem]("zio.http.endpoint.openapi.OpenAPIGen#AtomizedMetaCodecs.copy"),
exclude[IncompatibleMethTypeProblem]("zio.http.Middleware.addHeader"),
exclude[IncompatibleMethTypeProblem]("zio.http.HandlerAspect.addHeader"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("zio.http.Server.installInternal")
),
mimaFailOnProblem := failOnProblem,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,15 +674,15 @@ class EndpointBenchmark {
http4sRequestFromString(url),
)

def unsafeRun[E, A](zio: ZIO[Any, E, A]): Unit = Unsafe.unsafe { implicit unsafe =>
def unsafeRun[E, A](z: ZIO[zio.Scope, E, A]): Unit = Unsafe.unsafe { implicit unsafe =>
Runtime.default.unsafe
.run(zio.unit)
.run(ZIO.scoped(z.unit))
.getOrThrowFiberFailure()
}

private def unsafeRunResult[E, A](zio: ZIO[Any, E, A]): A = Unsafe.unsafe { implicit unsafe =>
private def unsafeRunResult[E, A](z: ZIO[zio.Scope, E, A]): A = Unsafe.unsafe { implicit unsafe =>
Runtime.default.unsafe
.run(zio)
.run(ZIO.scoped(z))
.getOrThrowFiberFailure()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class HttpRouteTextPerf {
private val res = Response.text("HELLO WORLD")
private val app = Handler.succeed(res)
private val req: Request = Request.get(URL(Path.root))
private val httpProgram = ZIO.foreachDiscard(0 to 1000) { _ => app(req) }
private val httpProgram = ZIO.scoped(ZIO.foreachDiscard(0 to 1000) { _ => app(req) })
private val UIOProgram = ZIO.foreachDiscard(0 to 1000) { _ => ZIO.succeed(res) }

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ object CounterProtocolStackExample extends ZIOAppDefault {
_ <- handler("What is ZIO?").debug
} yield ()

def run = app.provide(ZLayer.fromZIO(Ref.make(0L)))
def run = app.provide(Scope.default, ZLayer.fromZIO(Ref.make(0L)))
}
2 changes: 1 addition & 1 deletion zio-http-testkit/src/main/scala/zio/http/TestClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ final case class TestClient(
body: Body,
sslConfig: Option[zio.http.ClientSSLConfig],
proxy: Option[Proxy],
)(implicit trace: Trace): ZIO[Any, Throwable, Response] = {
)(implicit trace: Trace): ZIO[Scope, Throwable, Response] = {
for {
currentBehavior <- behavior.get.map(_ :+ Method.ANY / trailing -> handler(Response.notFound))
request = Request(
Expand Down
2 changes: 1 addition & 1 deletion zio-http-testkit/src/main/scala/zio/http/TestServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ final case class TestServer(driver: Driver, bindPort: Int) extends Server {
_ <- driver.addApp(provided, r)
} yield ()

override def install[R](routes: Routes[R, Response])(implicit
override private[http] def installInternal[R](routes: Routes[R, Response])(implicit
trace: zio.Trace,
tag: EnvironmentTag[R],
): URIO[R, Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ object SocketContractSpec extends ZIOHttpSpec {
ZIO.serviceWithZIO[Server](server =>
for {
p <- Promise.make[Throwable, Unit]
_ <- server.install(serverApp(p).toRoutes)
_ <- server.installInternal(serverApp(p).toRoutes)
port <- server.port
} yield (port, p),
)
Expand Down
2 changes: 1 addition & 1 deletion zio-http/js/src/test/scala/zio/http/JSClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,5 @@ object JSClientSpec extends ZIOSpecDefault {
// } yield assertTrue(consoleMessages.contains("Server: Hello, World!"))
// }.provideSome[Scope & Client](ZLayer(Queue.bounded[String](100))),
// ),
)
) @@ flaky
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ private[zio] final case class ServerInboundHandler(
private def attemptImmediateWrite(
ctx: ChannelHandlerContext,
method: Method,
exit: ZIO[Any, Response, Response],
exit: ZIO[Scope, Response, Response],
): Boolean = {
exit match {
case Exit.Success(response) if response ne null =>
Expand Down Expand Up @@ -282,7 +282,7 @@ private[zio] final case class ServerInboundHandler(
ZIO.suspend {
val nettyChannel = NettyChannel.make[JWebSocketFrame](ctx.channel())
val webSocketChannel = WebSocketChannel.make(nettyChannel, queue, handshakeCompleted)
webSocketApp.handler.runZIO(webSocketChannel).ignoreLogged.forkDaemon
ZIO.scoped(webSocketApp.handler.runZIO(webSocketChannel)).ignoreLogged.forkDaemon
}
}
_ <- ZIO.attempt {
Expand Down Expand Up @@ -311,7 +311,7 @@ private[zio] final case class ServerInboundHandler(
private def writeResponse(
ctx: ChannelHandlerContext,
runtime: NettyRuntime,
exit: ZIO[Any, Response, Response],
exit: ZIO[Scope, Response, Response],
req: Request,
)(ensured: () => Unit): Unit = {

Expand All @@ -334,13 +334,16 @@ private[zio] final case class ServerInboundHandler(
)
}

val program = exit.foldCauseZIO(
_.failureOrCause match {
case Left(resp) => writeResponse(resp)
case Right(c) if c.isInterruptedOnly => closeChannel()
case Right(c) => writeResponse(withDefaultErrorResponse(FiberFailure(c)))
},
writeResponse,
val scope = Scope.unsafe.make
val program = scope.use(
exit.foldCauseZIO(
_.failureOrCause match {
case Left(resp) => writeResponse(resp)
case Right(c) if c.isInterruptedOnly => closeChannel()
case Right(c) => writeResponse(withDefaultErrorResponse(FiberFailure(c)))
},
writeResponse,
),
)

runtime.run(ctx, ensured, preferOnCurrentThread = avoidCtxSwitching)(program)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ object ClientConnectionSpec extends RoutesRunnableSpec {
serve.as(tests)
}
.provideShared(
Scope.default,
DynamicServer.live,
serverTestLayer,
Client.live,
Expand Down
2 changes: 1 addition & 1 deletion zio-http/jvm/src/test/scala/zio/http/ClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,6 @@ object ClientSpec extends RoutesRunnableSpec {
override def spec = {
suite("Client") {
serve.as(List(clientSpec))
}.provideShared(DynamicServer.live, serverTestLayer, Client.default) @@ sequential @@ withLiveClock
}.provideShared(Scope.default, DynamicServer.live, serverTestLayer, Client.default) @@ sequential @@ withLiveClock
}
}
2 changes: 1 addition & 1 deletion zio-http/jvm/src/test/scala/zio/http/ContentTypeSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ object ContentTypeSpec extends RoutesRunnableSpec {
override def spec = {
suite("Content-type") {
serve.as(List(contentSpec))
}.provideShared(DynamicServer.live, serverTestLayer, Client.default) @@ withLiveClock @@ sequential
}.provideShared(Scope.default, DynamicServer.live, serverTestLayer, Client.default) @@ withLiveClock @@ sequential
}
}
2 changes: 1 addition & 1 deletion zio-http/jvm/src/test/scala/zio/http/FlashSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ object FlashSpec extends ZIOHttpSpec {
)
bodyString <- response2.body.asString
} yield assertTrue(bodyString.contains("successfully") && bodyString.contains("green"))
}.provideLayer(Flash.Backend.inMemory),
}.provide(Scope.default, Flash.Backend.inMemory),
)

}
4 changes: 2 additions & 2 deletions zio-http/jvm/src/test/scala/zio/http/HandlerAspectSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object HandlerAspectSpec extends ZIOSpecDefault {
//can't be infix because of Scala 3
}.@@[Boolean](handlerAspect)
for {
response <- handler0(Request(headers = Headers("accept", "*"))).provideEnvironment(ZEnvironment(true))
response <- ZIO.scoped(handler0(Request(headers = Headers("accept", "*")))).provideEnvironment(ZEnvironment(true))
bodyString <- response.body.asString
} yield assertTrue(bodyString == "1")
},
Expand All @@ -41,7 +41,7 @@ object HandlerAspectSpec extends ZIOSpecDefault {
//can't be infix because of Scala 3
}.@@[Boolean](handlerAspect)
for {
response <- handler0(Request(headers = Headers("accept", "*"))).provideEnvironment(ZEnvironment(true) ++ ZEnvironment("test"))
response <- ZIO.scoped(handler0(Request(headers = Headers("accept", "*")))).provideEnvironment(ZEnvironment(true) ++ ZEnvironment("test"))
bodyString <- response.body.asString
} yield assertTrue(bodyString == "1 test")
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ object HybridRequestStreamingServerSpec extends RoutesRunnableSpec {
appWithHybridReqStreaming.as(List(requestBodySpec, hybridStreamingServerSpec))
}
}.provideShared(
Scope.default,
DynamicServer.live,
ZLayer.succeed(configAppWithHybridRequestStreaming),
Server.customized,
Expand Down
2 changes: 1 addition & 1 deletion zio-http/jvm/src/test/scala/zio/http/KeepAliveSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ object KeepAliveSpec extends RoutesRunnableSpec {
override def spec: Spec[Any, Throwable] = {
suite("KeepAliveSpec") {
keepAliveSpec
}.provide(DynamicServer.live, serverTestLayer, Client.default) @@ withLiveClock @@ sequential
}.provide(Scope.default, DynamicServer.live, serverTestLayer, Client.default) @@ withLiveClock @@ sequential
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ object ResponseCompressionSpec extends ZIOHttpSpec {
for {
server <- ZIO.service[Server]
client <- ZIO.service[Client]
_ <- server.install(app)
_ <- server.installInternal(app)
port <- server.port
response <- client.batched(
Request(
Expand All @@ -110,7 +110,7 @@ object ResponseCompressionSpec extends ZIOHttpSpec {
for {
server <- ZIO.service[Server]
client <- ZIO.service[Client]
_ <- server.install(app)
_ <- server.installInternal(app)
port <- server.port
response <- client.batched(
Request(
Expand All @@ -134,7 +134,7 @@ object ResponseCompressionSpec extends ZIOHttpSpec {
for {
server <- ZIO.service[Server]
client <- ZIO.service[Client]
_ <- server.install(app)
_ <- server.installInternal(app)
port <- server.port
response <- client.batched(
Request(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,5 @@ object RoutesMiddlewareSpec extends ZIOHttpSpec with ExitAssertion {
result2 <- ref.get
} yield assertTrue(result1 == 1, result2 == 1)
},
)
).provide(Scope.default)
}
43 changes: 42 additions & 1 deletion zio-http/jvm/src/test/scala/zio/http/ServerRuntimeSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import zio._
import zio.test.TestAspect._
import zio.test._

import zio.stream.ZStream

import zio.http.codec.HttpContentCodec._
import zio.http.internal.{DynamicServer, RoutesRunnableSpec}
import zio.http.netty.NettyConfig

Expand Down Expand Up @@ -75,10 +78,48 @@ object ServerRuntimeSpec extends RoutesRunnableSpec {
.scoped(serve[Foo](server))
.zipRight(server.deploy.body.run(path = Path.root / "test", method = Method.GET))
.flatMap(_.asString(Charsets.Utf8))
.map(b => assertTrue(b == "1"))
.map(b => assertTrue(b == "2")) // one extra for Scope
} +
test("with scope") {
val ref = Ref.unsafe.make(0)(zio.Unsafe)
val routes = Routes(
Method.GET / "test" -> handler(
ZIO.addFinalizer(ref.set(1)).as(Response.text("ok")),
),
)
serve(routes)
.zipRight(routes.deploy.body.run(path = Path.root / "test", method = Method.GET))
.flatMap(_.asString(Charsets.Utf8))
.map(b => assertTrue(b == "ok")) *> ref.get.map { v => assertTrue(v == 1) }
} +
test("with scope streaming") {
val ref = Ref.unsafe.make(0)(zio.Unsafe)
val routes = Routes(
Method.GET / "test" -> handler(
Body
.fromStreamEnv(
ZStream.fromZIO(
ZIO.addFinalizer(ref.set(1)) *> ref.get.flatMap(v =>
if (v == 0) Exit.succeed(Chunk.fromArray("ok".getBytes)) else Exit.fail(new Exception("error")),
),
),
)
.map(body =>
Response(
body = body,
),
)
.orDie,
),
)
serve(routes)
.zipRight(routes.deploy.body.run(path = Path.root / "test", method = Method.GET))
.flatMap(_.asString(Charsets.Utf8))
.map(b => assertTrue(b == "ok")) *> ref.get.map { v => assertTrue(v == 1) }
}
}
.provide(
Scope.default,
DynamicServer.live,
Server.customized,
ZLayer.succeed(Server.Config.default),
Expand Down
1 change: 1 addition & 0 deletions zio-http/jvm/src/test/scala/zio/http/ServerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ object ServerSpec extends RoutesRunnableSpec {
val spec = dynamicAppSpec + responseSpec + requestSpec + requestBodySpec + serverErrorSpec
suite("app without request streaming") { app.as(List(spec)) }
}.provideShared(
Scope.default,
DynamicServer.live,
ZLayer.succeed(configApp),
Server.customized,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ object StaticFileServerSpec extends RoutesRunnableSpec {

override def spec = suite("StaticFileServerSpec") {
serve.as(List(staticSpec))
}.provideShared(DynamicServer.live, serverTestLayer, Client.default) @@ withLiveClock @@ sequential
}.provideShared(Scope.default, DynamicServer.live, serverTestLayer, Client.default) @@ withLiveClock @@ sequential

private def staticSpec = suite("Static RandomAccessFile Server")(
suite("fromResource")(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ object EndpointSpec extends ZIOHttpSpec {
def testEndpoint[R](service: Routes[R, Nothing])(
url: String,
expected: String,
): ZIO[R, Response, TestResult] =
): ZIO[Scope & R, Response, TestResult] =
testEndpointWithHeaders(service)(url, headers = List.empty, expected)

def testEndpointWithHeaders[R](service: Routes[R, Nothing])(
url: String,
headers: List[(String, String)],
expected: String,
): ZIO[R, Response, TestResult] = {
): ZIO[Scope & R, Response, TestResult] = {
val request = Request
.get(url = URL.decode(url).toOption.get)
.addHeaders(headers.foldLeft(Headers.empty) { case (hs, (k, v)) => hs ++ Headers(k, v) })
Expand Down
Loading

0 comments on commit 0bd05f4

Please sign in to comment.