
Commit 19e1d1b

Added logging
mizvol committed May 27, 2017
1 parent 1156069 commit 19e1d1b
Showing 3 changed files with 46 additions and 20 deletions.
27 changes: 16 additions & 11 deletions src/main/scala/WikiBrainHebb.scala
@@ -3,6 +3,8 @@ import ch.epfl.lts2.Utils._
import ch.epfl.lts2.Globals._
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
+import org.slf4j.LoggerFactory
+import org.slf4j.Logger

/**
  * Created by volodymyrmiz on 29.04.17.
@@ -12,7 +14,9 @@ object WikiBrainHebb {

  suppressLogs(List("org", "akka"))

-  println("WikiBrainHebb.scala")
+  val log: Logger = LoggerFactory.getLogger(this.getClass)
+
+  log.info("Create graph using dynamic signals")

  val spark = SparkSession.builder
    .master("local[*]")
@@ -24,9 +28,10 @@
  val path: String = "./src/main/resources/wikiTS/"
  val fileName: String = "signal_500.csv"

+  log.info("Read signal from disk")
  val rdd: RDD[(Long, Map[Int, Double])] = spark.sparkContext.objectFile(PATH_RESOURCES + "RDDs/tabularSignalRDD")

-  println("Total number of pages: " + rdd.count())
+  log.info("Total number of pages: " + rdd.count())

  val startTime = JAN_START
  val endTime = JAN_END
@@ -38,9 +43,9 @@
//    .filter(v => v._2.keys.size > 10 & v._2.keys.size < 730)
    .filter(v => v._2.values.max > 5000)

-  println("Number of pages with a certain number visits: " + verticesRDD.count())
+  log.info("Number of pages with a certain number visits: " + verticesRDD.count())

-  println("Edges generation...")
+  log.info("Edges generation...")
  val edgeIndexesRDD = verticesRDD.map(_._1).repartition(12).cache()
  edgeIndexesRDD.take(1)

@@ -53,16 +58,16 @@

  val graph = Graph(verticesGX, edgesGX)

-  println("Applying Hebbian plasticity... N pages: " + verticesGX.count() + "; N edges: " + edgesGX.count())
+  log.info("Applying Hebbian plasticity... N pages: " + verticesGX.count() + "; N edges: " + edgesGX.count())
  val trainedGraph = graph.mapTriplets(trplt => compareTimeSeries(trplt.dstAttr, trplt.srcAttr, start = startTime, stop = endTime, isFiltered = false)).mapVertices((vID, attr) => vID)

-  println("Removing low weight edges...")
+  log.info("Removing low weight edges...")
  val prunedGraph = removeLowWeightEdges(trainedGraph, minWeight = 500.0)
-  println("Filtered graph with " + prunedGraph.edges.count() + " edges.")
+  log.info("Filtered graph with " + prunedGraph.edges.count() + " edges.")

-  println("Removing singletone vertices...")
+  log.info("Removing singletone vertices...")
  val cleanGraph = removeSingletons(prunedGraph)
-  println(cleanGraph.vertices.count() + " vertices left.")
+  log.info(cleanGraph.vertices.count() + " vertices left.")

  // Name vertices by IDs
  val idsfileName: String = "ids_titles_for_500_filtered.csv"
@@ -76,9 +81,9 @@

  val graphWithIds = cleanGraph.mapVertices((vId, v) => idsTitlesMap(v).toString.replace('&', ' ').replace("""\n""", ""))

-  println(graphWithIds.vertices.count() + " vertices left.")
+  log.info(graphWithIds.vertices.count() + " vertices left.")

-  println("Saving graph...")
+  log.info("Saving graph...")
  saveGraph(graphWithIds, path + "graph.gexf")
  }
}
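
For reference, the logging pattern this commit introduces, shown in isolation — a minimal sketch assuming slf4j-api plus a log4j binding (e.g. slf4j-log4j12, which Spark distributions bundle) is on the classpath; the object name and the literal count below are illustrative only, not part of the repository:

import org.slf4j.{Logger, LoggerFactory}

object LoggingSketch {
  // One logger per class/object, named after it, as in WikiBrainHebb above.
  val log: Logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    log.info("Create graph using dynamic signals")
    val pageCount = 500 // stand-in for rdd.count()
    log.info("Total number of pages: " + pageCount)
  }
}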
22 changes: 13 additions & 9 deletions src/main/scala/WikiBrainHebbStatic.scala
@@ -3,6 +3,8 @@ import ch.epfl.lts2.Globals._
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
+import org.slf4j.LoggerFactory
+import org.slf4j.Logger

/**
  * Created by volodymyrmiz on 15.05.17.
@@ -11,40 +13,42 @@ object WikiBrainHebbStatic {
  def main(args: Array[String]): Unit = {
    suppressLogs(List("org", "akka"))

+    val log: Logger = LoggerFactory.getLogger(this.getClass)
+
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("Wiki Brain")
      .config("spark.driver.maxResultSize", "10g")
      .config("spark.executor.memory", "50g")
      .getOrCreate()

-    println("Read vertices from disk...")
+    log.info("Read vertices from disk...")
    val verticesRDD: RDD[(VertexId, (String, Map[Int, Double]))] = spark.sparkContext.objectFile(PATH_RESOURCES + "RDDs/staticVerticesRDD")

-    println("Vertices RDD: " + verticesRDD.count())
+    log.info("Vertices RDD: " + verticesRDD.count())

    val vertexIDs = verticesRDD.map(_._1.toLong).collect().toSet

-    println("Read edges from disk...")
+    log.info("Read edges from disk...")
    val edgesRDD: RDD[Edge[Double]] = spark.sparkContext.objectFile(PATH_RESOURCES + "RDDs/staticEdgesRDD")
      .filter(e => vertexIDs.contains(e.srcId) & vertexIDs.contains(e.dstId))

    val graph = Graph(verticesRDD, edgesRDD)
-    println("Vertices in graph: " + graph.vertices.count())
-    println("Edges in graph: " + graph.edges.count())
+    log.info("Vertices in graph: " + graph.vertices.count())
+    log.info("Edges in graph: " + graph.edges.count())

    val trainedGraph = graph.mapTriplets(trplt => compareTimeSeries(trplt.dstAttr._2, trplt.srcAttr._2, start = OCT_START, stop = APR_END, isFiltered = true))

    val prunedGraph = removeLowWeightEdges(trainedGraph, minWeight = 100.0)

-    println("Vertices in trained graph: " + prunedGraph.vertices.count())
-    println("Edges in trained graph: " + prunedGraph.edges.count())
+    log.info("Vertices in trained graph: " + prunedGraph.vertices.count())
+    log.info("Edges in trained graph: " + prunedGraph.edges.count())

    val cleanGraph = removeSingletons(prunedGraph)
    val CC = getLargestConnectedComponent(cleanGraph)

-    println("Vertices in LCC graph: " + CC.vertices.count())
-    println("Edges in LCC graph: " + CC.edges.count())
+    log.info("Vertices in LCC graph: " + CC.vertices.count())
+    log.info("Edges in LCC graph: " + CC.edges.count())

    saveGraph(CC.mapVertices((vID, attr) => attr._1), PATH_RESOURCES + "graph.gexf")

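
The helpers whose results are logged above (removeLowWeightEdges, removeSingletons) live in ch.epfl.lts2.Utils, which this commit does not touch. A plausible GraphX sketch of what they do, for context only — not necessarily the project's actual implementation:

import scala.reflect.ClassTag
import org.apache.spark.graphx.Graph

object GraphCleaningSketch {
  // Keep only edges whose learned weight reaches the threshold (e.g. 100.0 above).
  def removeLowWeightEdges[VD](g: Graph[VD, Double], minWeight: Double): Graph[VD, Double] =
    g.subgraph(epred = triplet => triplet.attr >= minWeight)

  // Drop vertices left without any incident edge after pruning.
  def removeSingletons[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]): Graph[VD, ED] =
    Graph(g.degrees.innerJoin(g.vertices)((id, deg, attr) => attr), g.edges)
}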
17 changes: 17 additions & 0 deletions src/main/scala/log4j.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+    <appender name="console" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%-5p %c{1} - %m%n"/>
+        </layout>
+    </appender>
+
+    <root>
+        <priority value ="info" />
+        <appender-ref ref="console" />
+    </root>
+
+</log4j:configuration>
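
With this console appender in place, the log.info calls above are routed through SLF4J to log4j and printed in the "%-5p %c{1} - %m%n" format, i.e. something like "INFO  WikiBrainHebb$ - Edges generation..." — assuming the usual slf4j-log4j12 binding that Spark ships with, and that this file is actually visible on the runtime classpath. The suppressLogs(List("org", "akka")) calls at the top of both programs presumably just raise or disable the levels of Spark's and Akka's own loggers; a plausible sketch of such a helper (the real one is defined in ch.epfl.lts2.Utils and may differ):

import org.apache.log4j.{Level, Logger => Log4jLogger}

object LogSuppressionSketch {
  // Silence every logger under the given package prefixes (e.g. "org", "akka").
  def suppressLogs(packagePrefixes: List[String]): Unit =
    packagePrefixes.foreach(p => Log4jLogger.getLogger(p).setLevel(Level.OFF))
}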
