-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #34 from ZiluTian/latest
Fix concurrent simulations in Akka
- Loading branch information
Showing
16 changed files
with
281 additions
and
288 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,101 +1,88 @@ | ||
package simulation.akka.API | ||
|
||
import com.typesafe.config.ConfigFactory | ||
import meta.API.SimulationSnapshot | ||
import meta.API.{DeforestationStrategy, SimulationData, SimulationDataBuilder, SnapshotBuilder, TimeseriesBuilder} | ||
import meta.runtime.{Actor, Message} | ||
import scala.concurrent.Await | ||
import scala.concurrent.duration._ | ||
import akka.actor.typed.ActorSystem | ||
|
||
object Simulate { | ||
private var stoppedAgents = IndexedSeq[Actor]() | ||
def apply(actors: IndexedSeq[Actor], totalTurn: Long, conf: Map[String, Any], cond: Option[Iterable[Iterable[Serializable]] => Boolean] = None)(implicit strategy: DeforestationStrategy): SimulationData = { | ||
|
||
var lastWords: IndexedSeq[Message] = IndexedSeq[Message]() | ||
require(conf.isDefinedAt("role")) // Standalone, Driver, Machine-$id | ||
require(conf.isDefinedAt("port")) // network port | ||
require(conf.isDefinedAt("name")) // name of the actor system, to allow concurrent simulations | ||
require(conf.isDefinedAt("data")) // timeseries or snapshot | ||
|
||
def addStoppedAgents(agents: IndexedSeq[Actor]): Unit = { | ||
stoppedAgents = agents | ||
} | ||
|
||
var timeseries: Iterable[Iterable[Serializable]] = null | ||
|
||
def initialize(): Unit = { | ||
stoppedAgents=IndexedSeq[Actor]() | ||
lastWords=IndexedSeq[Message]() | ||
} | ||
|
||
def driver(totalTurn: Long, port: Int = 25251): SimulationSnapshot = { | ||
initialize() | ||
val role: String = conf("role").asInstanceOf[String] | ||
val port: Int = conf("port").asInstanceOf[Int] | ||
val name: String = conf("name").asInstanceOf[String] | ||
val dataConf: String = conf("data").asInstanceOf[String] | ||
|
||
val config = ConfigFactory.parseString(s""" | ||
akka.remote.artery.canonical.port=$port | ||
akka.cluster.roles = [Driver] | ||
""").withFallback(ConfigFactory.load("application")) | ||
// If there are more workers than agents, then set the worker number to the same as agents | ||
val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt | ||
val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt | ||
var totalWorkers = workersPerMachine * totalMachines | ||
println(f"${totalMachines} total machines, ${totalWorkers} total workers") | ||
val actorSystem = ActorSystem(AkkaExp(totalTurn, totalWorkers), "SimsCluster", config) | ||
Await.ready(actorSystem.whenTerminated, 10.days) | ||
println("Simulation ends!") | ||
SimulationSnapshot(stoppedAgents, lastWords) | ||
} | ||
|
||
|
||
// Materialized (actors are all containedActors) | ||
def machine(mid: Int, actors: IndexedSeq[Actor], totalTurn: Long, port: Int = 0): SimulationSnapshot = { | ||
initialize() | ||
val config = ConfigFactory.parseString(s""" | ||
akka.remote.artery.canonical.port=$port | ||
akka.cluster.roles = [Machine-$mid] | ||
""").withFallback(ConfigFactory.load("application")) | ||
// If there are more workers than agents, then set the worker number to the same as agents | ||
val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt | ||
val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt | ||
var totalWorkers = workersPerMachine * totalMachines | ||
println(f"${totalMachines} total machines, ${totalWorkers} total workers") | ||
|
||
val actorSystem = ActorSystem(AkkaExp.materializedMachine(mid, totalTurn, totalWorkers, actors), "SimsCluster", config) | ||
Await.ready(actorSystem.whenTerminated, 10.days) | ||
println("Simulation ends!") | ||
SimulationSnapshot(stoppedAgents, lastWords) | ||
} | ||
|
||
def apply(actors: IndexedSeq[Actor], totalTurn: Long, | ||
role: String= "Standalone", port: Int = 25251): SimulationSnapshot = { | ||
initialize() | ||
val config = ConfigFactory.parseString(s""" | ||
akka.remote.artery.canonical.port=$port | ||
akka.cluster.roles = [$role] | ||
""").withFallback(ConfigFactory.load("application")) | ||
// If there are more workers than agents, then set the worker number to the same as agents | ||
val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt | ||
val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt | ||
var totalWorkers = workersPerMachine * totalMachines | ||
|
||
println(f"${totalMachines} total machines, ${totalWorkers} total workers, and ${actors.size} actors") | ||
// well-formedness check | ||
val machinePrefix = "Machine-" | ||
val workerPrefix = "Worker-" | ||
try { | ||
role match { | ||
case "Standalone" => totalWorkers = workersPerMachine // ignore totalMachine setting | ||
case "Driver" => | ||
case s if s.startsWith(machinePrefix) && s.stripPrefix(machinePrefix).toInt < totalMachines => | ||
case s if s.startsWith(workerPrefix) && s.stripPrefix(workerPrefix).toInt < totalWorkers => | ||
case _ => throw new Exception("Invalid role!") | ||
} | ||
} catch { | ||
case e: Exception => throw new Exception(f"Invalid role ${role}. Available roles are Standalone, Driver, Machine-id, or Worker-id. Replacing id with 0-based int (less than total machines or workers)") | ||
} | ||
|
||
if (totalWorkers > actors.size){ | ||
println(f"Found more workers than agents! Set total workers from ${totalWorkers} to ${actors.size}") | ||
totalWorkers = actors.size | ||
} | ||
|
||
val actorSystem = ActorSystem(AkkaExp(totalTurn, totalWorkers, actors), "SimsCluster", config) | ||
val machinePrefix = "Machine-" | ||
val builder: SimulationDataBuilder = if (dataConf == "timeseries") { | ||
new TimeseriesBuilder(strategy) | ||
} else { | ||
new SnapshotBuilder() | ||
} | ||
|
||
val ip: String = conf.getOrElse("ip", "localhost").asInstanceOf[String] | ||
|
||
val actorSystem = role match { | ||
case "Standalone" => { | ||
// local mode | ||
val config = ConfigFactory.parseString(s""" | ||
akka.remote.artery.canonical.port=$port | ||
akka.remote.artery.canonical.hostname=localhost | ||
akka.cluster.roles = [$role] | ||
akka.cluster.seed-nodes = ["akka://$name@localhost:$port"] | ||
""").withFallback(ConfigFactory.load("application")) | ||
ActorSystem(AkkaExp(totalTurn, totalWorkers, builder, actors, cond), name, config) | ||
} | ||
case "Driver" => { | ||
require(conf.isDefinedAt("ip")) | ||
// By default, driver is also the seed node | ||
val config = ConfigFactory.parseString(s""" | ||
akka.remote.artery.canonical.hostname=$ip | ||
akka.remote.artery.canonical.port=$port | ||
akka.cluster.roles = [$role] | ||
akka.cluster.seed-nodes = ["akka://$name@$ip:$port"] | ||
""").withFallback(ConfigFactory.load("application")) | ||
ActorSystem(AkkaExp(totalTurn, totalWorkers, builder, Vector[Actor](), None), name, config) | ||
} | ||
case s if s.startsWith(machinePrefix) => { | ||
require(conf.isDefinedAt("ip")) | ||
require(conf.isDefinedAt("seed")) // ip:port | ||
val seed: String = conf("seed").asInstanceOf[String] | ||
val config = ConfigFactory.parseString(s""" | ||
akka.remote.artery.canonical.hostname=$ip | ||
akka.remote.artery.canonical.port=$port | ||
akka.cluster.roles = [$role] | ||
akka.cluster.seed-nodes = ["akka://$name@$seed"] | ||
""").withFallback(ConfigFactory.load("application")) | ||
|
||
// 0-based | ||
val mid = s.stripPrefix(machinePrefix).toInt | ||
assert(mid < totalMachines) | ||
ActorSystem(AkkaExp.materializedMachine(mid, totalTurn, totalWorkers, builder, actors), name, config) | ||
} | ||
case _ => throw new Exception("Invalid role! Supported roles are Standalone, Driver, and Machine-$id (o-based)") | ||
} | ||
Await.ready(actorSystem.whenTerminated, 10.days) | ||
|
||
println("Simulation ends!") | ||
SimulationSnapshot(stoppedAgents, lastWords) | ||
builder.build() | ||
} | ||
} | ||
} |
Oops, something went wrong.