Skip to content

Commit

Permalink
distage-framework-docker: Revert to deleting file locks for reused co…
Browse files Browse the repository at this point in the history
…ntainers on exit, instead of as soon as possible (#971)

* distage-framework-docker: Revert to deleting file locks for reused containers on exit, instead of as soon as possible

Could cause conflicts if a file was deleted at the same time another resource in parallel tried to lock it

* warning
  • Loading branch information
neko-kai authored Mar 24, 2020
1 parent a5a95c1 commit b87d681
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package izumi.distage.docker

import distage.TagK
import izumi.distage.model.definition.DIResource.DIResourceBase
import izumi.distage.model.providers.ProviderMagnet
import izumi.fundamentals.platform.language.Quirks._

trait ContainerDef {
self =>
type Tag
type Container = DockerContainer[Tag]
type Config = Docker.ContainerConfig[Tag]

def config: Config

/**
* For binding in `ModuleDef`:
*
* {{{
* object KafkaDocker extends ContainerDef
* object ZookeeperDocker extends ContainerDef
*
* make[KafkaDocker.Container].fromResource {
* KafkaDocker
* .make[F]
* .dependOnDocker(ZookeeperDocker)
* }
* }}}
*
* To kill all containers spawned by distage, use the following command:
*
* {{{
* docker rm -f $(docker ps -q -a -f 'label=distage.type')
* }}}
*
*/
final def make[F[_]: TagK](implicit tag: distage.Tag[Tag]): ProviderMagnet[DockerContainer.ContainerResource[F, Tag] with DIResourceBase[F, Container]] = {
tag.discard()
DockerContainer.resource[F](this)
}

final def copy(config: Config): ContainerDef.Aux[self.Tag] = {
@inline def c = config
new ContainerDef {
override type Tag = self.Tag
override def config: Config = c
}
}
}

object ContainerDef {
type Aux[T] = ContainerDef { type Tag = T }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import izumi.logstage.api.IzLogger
import izumi.fundamentals.platform.language.Quirks._

import scala.collection.JavaConverters._
import scala.concurrent.duration._

trait ContainerNetworkDef {
self =>
Expand Down Expand Up @@ -62,7 +63,7 @@ object ContainerNetworkDef {

override def acquire: F[ContainerNetwork[T]] = {
if (config.reuse) {
FileLockMutex.withLocalMutex(prefix, logger) {
FileLockMutex.withLocalMutex(logger)(prefix, waitFor = 1.second, maxAttempts = 10) {
val labelsSet = stableLabels.toSet
val existedNetworks = client.listNetworksCmd().exec().asScala.toList
existedNetworks.find(_.labels.asScala.toSet == labelsSet).fold(createNew()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ import java.util.concurrent.TimeUnit
import com.github.dockerjava.api.command.InspectContainerResponse
import com.github.dockerjava.api.model._
import com.github.ghik.silencer.silent
import distage.TagK
import izumi.distage.docker.Docker._
import izumi.distage.docker.DockerContainer.ContainerResource
import izumi.distage.framework.model.exceptions.IntegrationCheckException
import izumi.distage.model.definition.DIResource
import izumi.distage.model.definition.DIResource.DIResourceBase
import izumi.distage.model.effect.DIEffect.syntax._
import izumi.distage.model.effect.{DIEffect, DIEffectAsync}
import izumi.distage.model.providers.ProviderMagnet
Expand All @@ -23,53 +20,6 @@ import izumi.logstage.api.IzLogger
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

trait ContainerDef {
self =>
type Tag
type Container = DockerContainer[Tag]
type Config = Docker.ContainerConfig[Tag]

def config: Config

/**
* For binding in `ModuleDef`:
*
* {{{
* object KafkaDocker extends ContainerDef
* object ZookeeperDocker extends ContainerDef
*
* make[KafkaDocker.Container].fromResource {
* KafkaDocker
* .make[F]
* .dependOnDocker(ZookeeperDocker)
* }
* }}}
*
* To kill all containers spawned by distage, use the following command:
*
* {{{
* docker rm -f $(docker ps -q -a -f 'label=distage.type')
* }}}
*
*/
final def make[F[_]: TagK](implicit tag: distage.Tag[Tag]): ProviderMagnet[ContainerResource[F, Tag] with DIResourceBase[F, Container]] = {
tag.discard()
DockerContainer.resource[F](this)
}

final def copy(config: Config): ContainerDef.Aux[self.Tag] = {
@inline def c = config
new ContainerDef {
override type Tag = self.Tag
override def config: Config = c
}
}
}

object ContainerDef {
type Aux[T] = ContainerDef { type Tag = T }
}

final case class DockerContainer[Tag](
id: Docker.ContainerId,
name: String,
Expand Down Expand Up @@ -247,11 +197,10 @@ object DockerContainer {

private[this] def runReused(ports: Seq[PortDecl]): F[DockerContainer[T]] = {
logger.info(s"Running container with reused option with ${config.pullTimeout}.")
FileLockMutex.withLocalMutex(
FileLockMutex.withLocalMutex(logger)(
s"${config.image.replace("/", "_")}:${config.ports.mkString(";")}",
logger,
1.second,
config.pullTimeout.toSeconds.toInt
waitFor = 1.second,
maxAttempts = config.pullTimeout.toSeconds.toInt
) {
for {
containers <- F.maybeSuspend {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,45 @@ import izumi.logstage.api.IzLogger
import scala.concurrent.duration._

object FileLockMutex {
def withLocalMutex[F[_]: DIEffect: DIEffectAsync, E](

def withLocalMutex[F[_], E](logger: IzLogger)(
filename: String,
logger: IzLogger,
waitFor: FiniteDuration = 1.second,
maxAttempts: Int = 10
)(eff: F[E]): F[E] = {
def acquireAndRun(chanel: FileChannel, attempts: Int = 0): F[E] = {
DIEffect[F].maybeSuspend {
waitFor: FiniteDuration,
maxAttempts: Int,
)(effect: F[E])(implicit
F: DIEffect[F],
P: DIEffectAsync[F],
): F[E] = {
def acquireAndRun(channel: FileChannel, attempts: Int = 0): F[E] = {
F.maybeSuspend {
logger.debug(s"Attempt ${attempts -> "num"} to acquire lock for $filename.")
try {
Option(chanel.tryLock())
Option(channel.tryLock())
} catch {
case _: OverlappingFileLockException => None
}
}.flatMap {
case Some(v) =>
eff.guarantee(DIEffect[F].maybeSuspend(v.close()))
effect.guarantee(F.maybeSuspend(v.close()))
case None if attempts < maxAttempts =>
DIEffectAsync[F].sleep(waitFor).flatMap(_ => acquireAndRun(chanel, attempts + 1))
P.sleep(waitFor).flatMap(_ => acquireAndRun(channel, attempts + 1))
case _ =>
logger.warn(s"Cannot acquire lock for image $filename after $attempts. This may lead to creation of a new container duplicate.")
eff
effect
}
}

val tmpDir = System.getProperty("java.io.tmpdir")
val file = new File(s"$tmpDir/$filename.tmp")
file.createNewFile()
DIEffect[F].bracket(DIEffect[F].maybeSuspend(FileChannel.open(file.toPath, StandardOpenOption.WRITE))) {
ch =>
DIEffect[F].definitelyRecover {
DIEffect[F].maybeSuspend {
ch.close()
file.delete()
()
}
}(_ => DIEffect[F].unit)
} {
acquireAndRun(_)
}
F.bracket(
acquire = F.maybeSuspend {
val tmpDir = System.getProperty("java.io.tmpdir")
val file = new File(s"$tmpDir/$filename.tmp")
val newFileCreated = file.createNewFile()
if (newFileCreated) file.deleteOnExit()
FileChannel.open(file.toPath, StandardOpenOption.WRITE)
}
)(release = ch => F.definitelyRecover(F.maybeSuspend(ch.close()))(_ => F.unit)) {
acquireAndRun(_)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class BIOSyntaxTest extends AnyWordSpec {
}.toKleisli
}.provide(4).flatMap(_ => F.unit).widenError[Throwable].leftMap(identity)
}
def docExamples = {
def docExamples() = {
import izumi.functional.bio.{F, BIOMonad, BIOMonadAsk, BIOPrimitives, BIORef3}

def adder[F[+_, +_]: BIOMonad: BIOPrimitives](i: Int): F[Nothing, Int] =
Expand Down Expand Up @@ -211,7 +211,7 @@ class BIOSyntaxTest extends AnyWordSpec {
biotemporalPlusLocal[zio.ZIO],
biomonadPlusLocal[zio.ZIO],
bifunctorOnly[zio.ZIO],
docExamples,
docExamples(),
)
}
}

0 comments on commit b87d681

Please sign in to comment.