diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/AvailabilityChecker.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/AvailabilityChecker.scala new file mode 100644 index 000000000..f4d8a9df9 --- /dev/null +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/AvailabilityChecker.scala @@ -0,0 +1,30 @@ +package org.apache.mesos.chronos.scheduler.mesos + +import java.util.logging.Logger + +import org.apache.mesos.Protos + +/** + * Helper for checking availability using mesos primitives + */ +object AvailabilityChecker { + + private[this] val log = Logger.getLogger(getClass.getName) + + def checkAvailability(offer: Protos.Offer): Boolean = { + val now = System.nanoTime() + if (offer.hasUnavailability && offer.getUnavailability.hasStart) { + val start = offer.getUnavailability.getStart.getNanoseconds + if (now.>=(start)) { + if (offer.getUnavailability.hasDuration) { + return start.+(offer.getUnavailability.getDuration.getNanoseconds).<(now) + } else { + return false; + } + + } + } + return true + } + +} diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFramework.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFramework.scala index b24cf5897..ec5d8e429 100644 --- a/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFramework.scala +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFramework.scala @@ -131,7 +131,7 @@ class MesosJobFramework @Inject()( case None => val neededResources = new Resources(job) offerResources.toIterator.find { ors => - ors._2.canSatisfy(neededResources) && ConstraintChecker.checkConstraints(ors._1, job.constraints) + ors._2.canSatisfy(neededResources) && ConstraintChecker.checkConstraints(ors._1, job.constraints) && AvailabilityChecker.checkAvailability(ors._1) } match { case Some((offer, resources)) => // Subtract this job's resource requirements from the remaining available resources in this offer. diff --git a/src/test/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFrameworkSpec.scala b/src/test/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFrameworkSpec.scala index f215709d5..77459d21e 100644 --- a/src/test/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFrameworkSpec.scala +++ b/src/test/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFrameworkSpec.scala @@ -2,7 +2,7 @@ package org.apache.mesos.chronos.scheduler.mesos import mesosphere.mesos.protos._ import mesosphere.mesos.util.FrameworkIdUtil -import org.apache.mesos.Protos.Offer +import org.apache.mesos.Protos.{DurationInfo, Offer, TimeInfo, Unavailability} import org.apache.mesos.chronos.ChronosTestHelper._ import org.apache.mesos.chronos.scheduler.jobs.{BaseJob, JobScheduler, MockJobUtils, TaskManager} import org.apache.mesos.{Protos, SchedulerDriver} @@ -76,6 +76,33 @@ class MesosJobFrameworkSpec extends SpecificationWithJUnit with Mockito { there was one(mockSchedulerDriver).declineOffer(OfferID("1"), Protos.Filters.getDefaultInstance) } + "Reject unavailable offer" in { + import mesosphere.mesos.protos.Implicits._ + + import scala.collection.JavaConverters._ + + val mockDriverFactory = MockJobUtils.mockDriverFactory + val mockSchedulerDriver = mockDriverFactory.get + + val mesosJobFramework = spy( + new MesosJobFramework( + mockDriverFactory, + mock[JobScheduler], + mock[TaskManager], + makeConfig(), + mock[FrameworkIdUtil], + mock[MesosTaskBuilder], + mock[MesosOfferReviver])) + + val tasks = mutable.Buffer[(String, BaseJob, Offer)]() + doReturn(tasks).when(mesosJobFramework).generateLaunchableTasks(any) + + val offer: Offer = makeUnavailableOffer + mesosJobFramework.resourceOffers(mockSchedulerDriver, Seq[Protos.Offer](offer).asJava) + + there was one(mockSchedulerDriver).declineOffer(OfferID("1"), Protos.Filters.getDefaultInstance) + } + "Reject unused offers with default RefuseSeconds if --decline_offer_duration is not set" in { import mesosphere.mesos.protos.Implicits._ @@ -176,6 +203,22 @@ class MesosJobFrameworkSpec extends SpecificationWithJUnit with Mockito { } private[this] def makeBasicOffer: Offer = { + + makeBasicOfferBuilder + .build() + } + + private[this] def makeUnavailableOffer: Offer = { + + makeBasicOfferBuilder.setUnavailability( + Unavailability.newBuilder() + .setStart(TimeInfo.newBuilder().setNanoseconds(System.nanoTime())) + .setDuration(DurationInfo.newBuilder().setNanoseconds(-1l)) + .build()) + .build() + } + + private[this] def makeBasicOfferBuilder: Offer.Builder = { import mesosphere.mesos.protos.Implicits._ Protos.Offer.newBuilder() @@ -186,7 +229,6 @@ class MesosJobFrameworkSpec extends SpecificationWithJUnit with Mockito { .addResources(ScalarResource(Resource.CPUS, 1, "*")) .addResources(ScalarResource(Resource.MEM, 100, "*")) .addResources(ScalarResource(Resource.DISK, 100, "*")) - .build() }