diff --git a/app/model/jobs/JobRunner.scala b/app/model/jobs/JobRunner.scala index e90aa3bd..9909d279 100644 --- a/app/model/jobs/JobRunner.scala +++ b/app/model/jobs/JobRunner.scala @@ -27,7 +27,7 @@ class JobRunner @Inject() (lifecycle: ApplicationLifecycle)(implicit ec: Executi lifecycle.addStopHook{ () => Future.successful(stop) } serviceManager.startAsync() - def stop { + def stop: Unit = { serviceManager.stopAsync() serviceManager.awaitStopped(20, TimeUnit.SECONDS) } diff --git a/app/model/jobs/Step.scala b/app/model/jobs/Step.scala index 0785c593..b2f50bb2 100644 --- a/app/model/jobs/Step.scala +++ b/app/model/jobs/Step.scala @@ -15,13 +15,13 @@ trait Step extends Logging { // Inner details /** Do work. */ - protected def process(implicit ec: ExecutionContext) + protected def process(implicit ec: ExecutionContext): Unit /** Confirm this check ran successfully */ protected def check(implicit ec: ExecutionContext): Boolean /** Undo this step */ - protected def rollback + protected def rollback: Unit // Public methods that wrap the status updates def processStep(implicit ec: ExecutionContext) = { diff --git a/app/modules/clustersync/ClusterSynchronisation.scala b/app/modules/clustersync/ClusterSynchronisation.scala index f6930346..48c50cd8 100644 --- a/app/modules/clustersync/ClusterSynchronisation.scala +++ b/app/modules/clustersync/ClusterSynchronisation.scala @@ -32,7 +32,7 @@ class ClusterSynchronisation @Inject() (lifecycle: ApplicationLifecycle) extends initialise - def initialise { + def initialise: Unit = { try { logger.info("starting sync components...") val ns = NodeStatusRepository.register() @@ -57,7 +57,7 @@ class ClusterSynchronisation @Inject() (lifecycle: ApplicationLifecycle) extends } } - def pause { + def pause: Unit = { logger.warn("pausing cluster synchronisation") tagCacheSynchroniser.get.foreach{consumer => logger.warn("stopping consumer") @@ -72,7 +72,7 @@ class ClusterSynchronisation @Inject() (lifecycle: ApplicationLifecycle) extends } } - def heartbeat { + def heartbeat: Unit = { try { reservation.get() match { case Some(ns) => { diff --git a/app/modules/clustersync/NodeStatusRepository.scala b/app/modules/clustersync/NodeStatusRepository.scala index 8810c41a..95da2598 100644 --- a/app/modules/clustersync/NodeStatusRepository.scala +++ b/app/modules/clustersync/NodeStatusRepository.scala @@ -90,7 +90,7 @@ object NodeStatusRepository extends Logging { } } - def deregister(nodeStatus: NodeStatus) { + def deregister(nodeStatus: NodeStatus): Unit = { logger.info(s"deregistering as node ${nodeStatus.nodeId}") diff --git a/app/modules/clustersync/SectionSyncUpdateProcessor.scala b/app/modules/clustersync/SectionSyncUpdateProcessor.scala index 50ec4c46..7d637778 100644 --- a/app/modules/clustersync/SectionSyncUpdateProcessor.scala +++ b/app/modules/clustersync/SectionSyncUpdateProcessor.scala @@ -43,7 +43,7 @@ object SectionSyncUpdateProcessor extends KinesisStreamRecordProcessor with Logg } - private def updateSectionLookupCache(sectionEvent: SectionEvent) { + private def updateSectionLookupCache(sectionEvent: SectionEvent): Unit = { sectionEvent.eventType match { case EventType.Update => logger.info(s"inserting updated section ${sectionEvent.sectionId} into lookup cache") diff --git a/app/modules/clustersync/TagSyncUpdateProcessor.scala b/app/modules/clustersync/TagSyncUpdateProcessor.scala index e8fb114e..48d0b5ee 100644 --- a/app/modules/clustersync/TagSyncUpdateProcessor.scala +++ b/app/modules/clustersync/TagSyncUpdateProcessor.scala @@ -34,7 +34,7 @@ object TagEventDeserialiser { object TagSyncUpdateProcessor extends KinesisStreamRecordProcessor with Logging { - override def process(record: Record) { + override def process(record: Record): Unit = { logger.info(s"Kinesis consumer receives record \n $record") TagEventDeserialiser.deserialise(record) match { case Success(tagEvent) => updateTagsLookupCache(tagEvent) diff --git a/app/repositories/Sequences.scala b/app/repositories/Sequences.scala index 1dddb079..7f0e67c7 100644 --- a/app/repositories/Sequences.scala +++ b/app/repositories/Sequences.scala @@ -31,7 +31,7 @@ class DynamoSequence(sequenceTable: Table, sequenceName: String) { Dynamo.sequenceTable.getItem("sequenceName", sequenceName).getLong("value") } - def setCurrentId(v: Long) { + def setCurrentId(v: Long): Unit = { Dynamo.sequenceTable.putItem(new Item().withString("sequenceName", sequenceName).withLong("value", v)) } } diff --git a/app/services/AWS.scala b/app/services/AWS.scala index 0d26c968..6bc6f62b 100644 --- a/app/services/AWS.scala +++ b/app/services/AWS.scala @@ -79,21 +79,21 @@ object SQS { class KinesisStreamProducer(streamName: String, requireCompressionByte: Boolean = false) extends Logging { - def publishUpdate(key: String, data: String) { + def publishUpdate(key: String, data: String): Unit = { publishUpdate(key, ByteBuffer.wrap(data.getBytes("UTF-8"))) } - def publishUpdate(key: String, data: Array[Byte]) { + def publishUpdate(key: String, data: Array[Byte]): Unit = { publishUpdate(key, ByteBuffer.wrap(data)) } - def publishUpdate(key: String, struct: ThriftStruct) { + def publishUpdate(key: String, struct: ThriftStruct): Unit = { logger.info(s"Kinesis Producer publishUpdate for streamName: $streamName") val thriftKinesisEvent: Array[Byte] = ThriftSerializer.serializeToBytes(struct, requireCompressionByte) publishUpdate(key, ByteBuffer.wrap(thriftKinesisEvent)) } - def publishUpdate(key: String, dataBuffer: ByteBuffer) { + def publishUpdate(key: String, dataBuffer: ByteBuffer): Unit = { AWS.Kinesis.putRecord(streamName, dataBuffer, key) } } diff --git a/app/services/KinesisConsumer.scala b/app/services/KinesisConsumer.scala index 15bfaf22..049fb207 100644 --- a/app/services/KinesisConsumer.scala +++ b/app/services/KinesisConsumer.scala @@ -37,8 +37,8 @@ class KinesisConsumer(streamName: String, appName: String, processor: KinesisStr .config(kinesisClientLibConfiguration) .build() - def start() { Future{ worker.run() } } - def stop() { worker.shutdown() } + def start(): Unit = { Future{ worker.run() } } + def stop(): Unit = { worker.shutdown() } } class KinesisProcessorConsumerFactory(appName: String, processor: KinesisStreamRecordProcessor) extends IRecordProcessorFactory { diff --git a/app/services/SQSQueue.scala b/app/services/SQSQueue.scala index 1d7036fe..a4a75a70 100644 --- a/app/services/SQSQueue.scala +++ b/app/services/SQSQueue.scala @@ -22,7 +22,7 @@ class SQSQueue(val queueName: String) { response.getMessages.asScala.toList } - def deleteMessage(message: Message) { + def deleteMessage(message: Message): Unit = { SQS.SQSClient.deleteMessage( new DeleteMessageRequest(queueUrl, message.getReceiptHandle) )