diff --git a/src/scala/main/ly/stealth/mesos/kafka/Broker.scala b/src/scala/main/ly/stealth/mesos/kafka/Broker.scala
index e3dda86..99c0d4f 100644
--- a/src/scala/main/ly/stealth/mesos/kafka/Broker.scala
+++ b/src/scala/main/ly/stealth/mesos/kafka/Broker.scala
@@ -169,20 +169,32 @@ class Broker(val id: Int = 0) {
     )
   }
 
-  private[kafka] def getSuitablePort(ports: util.List[Range]): Int = {
-    if (ports.isEmpty) return -1
-
-    val ports_ = ports.sortBy(r => r.start)
-    if (port == null)
-      return ports_.get(0).start
-
-    for (range <- ports_) {
-      val overlap = range.overlap(port)
-      if (overlap != null)
-        return overlap.start
+  private[kafka] def getSuitablePort(availablePorts: util.List[Range]): Int = {
+    // no available ports to choose from
+    if (availablePorts.isEmpty) return -1
+
+    // compute allowed usable ports based on broker config and offer
+    val usablePorts: List[Range] =
+      if (port == null) availablePorts.toList.sortBy(_.start)
+      else availablePorts.toList.flatMap { range =>
+        if (range.overlap(port) == null) None else Some(range.overlap(port))
+      }.sortBy(_.start)
+
+    // no port usable
+    if (usablePorts.isEmpty) return -1
+
+    // try to stick to the previous port if possible
+    if (stickiness.port != null) {
+      val preferedPort = new Range(stickiness.port)
+      for (range <- usablePorts) {
+        val found = range.overlap(preferedPort)
+        if (found != null)
+          return found.start
+      }
     }
 
-    -1
+    // else return first usable port
+    return usablePorts.get(0).start
   }
 
   /*
@@ -198,8 +210,8 @@ class Broker(val id: Int = 0) {
 
   def shouldStop: Boolean = !active && task != null && !task.stopping
 
-  def registerStart(hostname: String): Unit = {
-    stickiness.registerStart(hostname)
+  def registerStart(hostname: String, port: Integer): Unit = {
+    stickiness.registerStart(hostname, port)
     failover.resetFailures()
   }
 
@@ -289,12 +301,14 @@ object Broker {
   class Stickiness(_period: Period = new Period("10m")) {
     var period: Period = _period
     @volatile var hostname: String = null
+    @volatile var port: Integer = null
     @volatile var stopTime: Date = null
 
     def expires: Date = if (stopTime != null) new Date(stopTime.getTime + period.ms) else null
 
-    def registerStart(hostname: String): Unit = {
+    def registerStart(hostname: String, port: Integer): Unit = {
       this.hostname = hostname
+      this.port = port
       stopTime = null
     }
 
diff --git a/src/scala/main/ly/stealth/mesos/kafka/cli/BrokerCli.scala b/src/scala/main/ly/stealth/mesos/kafka/cli/BrokerCli.scala
index bb47848..a7a1da5 100644
--- a/src/scala/main/ly/stealth/mesos/kafka/cli/BrokerCli.scala
+++ b/src/scala/main/ly/stealth/mesos/kafka/cli/BrokerCli.scala
@@ -489,6 +489,7 @@ trait BrokerCli {
       var stickiness = "stickiness:"
       stickiness += " period:" + broker.stickiness.period
       if (broker.stickiness.hostname != null) stickiness += ", hostname:" + broker.stickiness.hostname
+      if (broker.stickiness.port != null) stickiness += ", port:" + broker.stickiness.port
       if (broker.stickiness.stopTime != null) stickiness += ", expires:" + Repr.dateTime(broker.stickiness.expires)
       printLine(stickiness, indent)
 
diff --git a/src/scala/main/ly/stealth/mesos/kafka/json/Model.scala b/src/scala/main/ly/stealth/mesos/kafka/json/Model.scala
index e81f2a8..6cf2682 100644
--- a/src/scala/main/ly/stealth/mesos/kafka/json/Model.scala
+++ b/src/scala/main/ly/stealth/mesos/kafka/json/Model.scala
@@ -175,11 +175,11 @@ class RangeDeserializer extends StdDeserializer[Range](classOf[Range]) {
   }
 }
 
-case class StickinessModel(period: Period, stopTime: Date, hostname: String)
+case class StickinessModel(period: Period, stopTime: Date, hostname: String, port: Integer)
 
 class StickinessSerializer extends StdSerializer[Stickiness](classOf[Stickiness]) {
   override def serialize(s: Stickiness, gen: JsonGenerator, provider: SerializerProvider): Unit = {
-    provider.defaultSerializeValue(StickinessModel(s.period, s.stopTime, s.hostname), gen)
+    provider.defaultSerializeValue(StickinessModel(s.period, s.stopTime, s.hostname, s.port), gen)
   }
 }
 
@@ -188,6 +188,7 @@ class StickinessDeserializer extends StdDeserializer[Stickiness](classOf[Stickin
     val model = p.readValueAs(classOf[StickinessModel])
     val s = new Stickiness()
     s.hostname = model.hostname
+    s.port = model.port
     s.stopTime = model.stopTime
     s.period = model.period
     s
diff --git a/src/scala/main/ly/stealth/mesos/kafka/scheduler/BrokerLifecycleManager.scala b/src/scala/main/ly/stealth/mesos/kafka/scheduler/BrokerLifecycleManager.scala
index c203910..1905b84 100644
--- a/src/scala/main/ly/stealth/mesos/kafka/scheduler/BrokerLifecycleManager.scala
+++ b/src/scala/main/ly/stealth/mesos/kafka/scheduler/BrokerLifecycleManager.scala
@@ -198,7 +198,10 @@ trait BrokerLifecycleManagerComponentImpl extends BrokerLifecycleManagerComponen
       broker.task.state = Broker.State.RUNNING
       if (status.hasData && status.getData.size() > 0)
         broker.task.endpoint = new Broker.Endpoint(status.getData.toStringUtf8)
-      broker.registerStart(broker.task.hostname)
+
+      val port: Integer = if (broker.task.endpoint != null) broker.task.endpoint.port else null
+      logger.info(s"Registering broker at ${broker.task.hostname}:${port}")
+      broker.registerStart(broker.task.hostname, port)
     }
 
     private[this] def onStopped(broker: Broker, status: TaskStatus, failed: Boolean): Unit = {
diff --git a/src/test/ly/stealth/mesos/kafka/BrokerTest.scala b/src/test/ly/stealth/mesos/kafka/BrokerTest.scala
index 2d9591e..bf61b4a 100644
--- a/src/test/ly/stealth/mesos/kafka/BrokerTest.scala
+++ b/src/test/ly/stealth/mesos/kafka/BrokerTest.scala
@@ -162,23 +162,27 @@ class BrokerTest extends KafkaMesosTestCase {
   def matches_stickiness {
     val host0 = "host0"
     val host1 = "host1"
-    val resources = "ports:0..10"
+    val resources0 = "ports:0..10"
+    val resources1 = "ports:11..20"
 
 
-    BrokerTest.assertAccept(broker.matches(offer(host0, resources), new Date(0)))
-    BrokerTest.assertAccept(broker.matches(offer(host1, resources), new Date(0)))
+    BrokerTest.assertAccept(broker.matches(offer(host0, resources0), new Date(0)))
+    BrokerTest.assertAccept(broker.matches(offer(host1, resources0), new Date(0)))
 
-    broker.registerStart(host0)
+    broker.registerStart(host0, 5)
     broker.registerStop(new Date(0))
 
 
-    BrokerTest.assertAccept(broker.matches(offer(host0, resources), new Date(0)))
-    val theOffer = offer(host1, resources)
+    BrokerTest.assertAccept(broker.matches(offer(host0, resources0), new Date(0)))
+    val theOffer = offer(host1, resources0)
     assertEquals(
       OfferResult.eventuallyMatch(
         theOffer, broker,
         "hostname != stickiness host", broker.stickiness.period.ms().toInt / 1000),
       broker.matches(theOffer, new Date(0)))
+
+    // should still work even if sticky port unavailable
+    BrokerTest.assertAccept(broker.matches(offer(host0, resources1), new Date(0)))
   }
 
   @Test
@@ -349,6 +353,14 @@ class BrokerTest extends KafkaMesosTestCase {
     assertEquals(100, broker.getSuitablePort(ranges("100..200")))
     assertEquals(95, broker.getSuitablePort(ranges("0..90,95..96,101..200")))
     assertEquals(96, broker.getSuitablePort(ranges("0..90,96,101..200")))
+
+    broker.registerStart("", 100)
+    broker.port = new Range("92..105")
+    assertEquals(100, broker.getSuitablePort(ranges("1..200")))
+
+    broker.registerStart("", 100)
+    broker.port = new Range("92..99")
+    assertEquals(92, broker.getSuitablePort(ranges("1..200")))
   }
 
   @Test
@@ -459,6 +471,7 @@ class BrokerTest extends KafkaMesosTestCase {
     broker.log4jOptions = parseMap("b=2").toMap
     broker.jvmOptions = "-Xms512m"
 
+    broker.stickiness.registerStart("localhost", 1234)
     broker.failover.registerFailure(new Date())
     broker.task = new Task("1", "slave", "executor", "host")
 
@@ -497,7 +510,7 @@ class BrokerTest extends KafkaMesosTestCase {
     assertTrue(stickiness.matchesHostname("host0"))
     assertTrue(stickiness.matchesHostname("host1"))
 
-    stickiness.registerStart("host0")
+    stickiness.registerStart("host0", null)
     stickiness.registerStop(new Date(0))
     assertTrue(stickiness.matchesHostname("host0"))
     assertFalse(stickiness.matchesHostname("host1"))
@@ -508,7 +521,7 @@ class BrokerTest extends KafkaMesosTestCase {
     val stickiness = new Stickiness()
     assertEquals(stickiness.stickyTimeLeft(), 0)
 
-    stickiness.registerStart("host0")
+    stickiness.registerStart("host0", null)
     stickiness.registerStop(new Date(0))
     val stickyTimeSec = (stickiness.period.ms() / 1000).toInt
     assertEquals(stickiness.stickyTimeLeft(new Date(0)), stickyTimeSec)
@@ -521,23 +534,25 @@ class BrokerTest extends KafkaMesosTestCase {
     assertNull(stickiness.hostname)
     assertNull(stickiness.stopTime)
 
-    stickiness.registerStart("host")
+    stickiness.registerStart("host", 1234)
     assertEquals("host", stickiness.hostname)
+    assertEquals(1234, stickiness.port)
     assertNull(stickiness.stopTime)
 
     stickiness.registerStop(new Date(0))
     assertEquals("host", stickiness.hostname)
     assertEquals(new Date(0), stickiness.stopTime)
 
-    stickiness.registerStart("host1")
+    stickiness.registerStart("host1", 5678)
     assertEquals("host1", stickiness.hostname)
+    assertEquals(5678, stickiness.port)
     assertNull(stickiness.stopTime)
   }
 
   @Test
   def Stickiness_toJson_fromJson {
     val stickiness = new Stickiness()
-    stickiness.registerStart("localhost")
+    stickiness.registerStart("localhost", 1234)
     stickiness.registerStop(new Date(0))
 
     val read = JsonUtil.fromJson[Stickiness](JsonUtil.toJson(stickiness))
@@ -719,6 +734,7 @@ object BrokerTest {
     if (checkNulls(expected, actual)) return
 
     assertEquals(expected.period, actual.period)
+    assertEquals(expected.port, actual.port)
     assertEquals(expected.stopTime, actual.stopTime)
     assertEquals(expected.hostname, actual.hostname)
   }
diff --git a/src/test/ly/stealth/mesos/kafka/JsonTest.scala b/src/test/ly/stealth/mesos/kafka/JsonTest.scala
index 9ceb86b..d736a18 100644
--- a/src/test/ly/stealth/mesos/kafka/JsonTest.scala
+++ b/src/test/ly/stealth/mesos/kafka/JsonTest.scala
@@ -55,7 +55,7 @@ class JsonTest {
     b.task.endpoint = new Endpoint("host1:9092")
     b.syslog = false
     b.stickiness = new Stickiness(new Period("10m"))
-    b.stickiness.registerStart("host1")
+    b.stickiness.registerStart("host1", 1234)
     b.log4jOptions = Map("k1" -> "v1", "k2" -> "v2")
     b.options = Map("a" -> "1", "b" -> "2")
     b.active = true
diff --git a/src/test/resources/broker.json b/src/test/resources/broker.json
index db3d57d..52a7540 100644
--- a/src/test/resources/broker.json
+++ b/src/test/resources/broker.json
@@ -19,7 +19,8 @@
   "syslog": false,
   "stickiness": {
     "period": "10m",
-    "hostname": "host1"
+    "hostname": "host1",
+    "port": 1234
   },
   "log4jOptions": "k1=v1,k2=v2",
   "options": "a=1,b=2",