harvester refactoring
Orbiter committed Aug 9, 2020
1 parent dc993af commit 99eee5f
Showing 13 changed files with 197 additions and 727 deletions.
23 changes: 0 additions & 23 deletions conf/config.properties
@@ -264,29 +264,6 @@ flag.replaceinsteadunshorten = false
flag.debug.twitter_scraper = false
flag.debug.redirect_unshortener = false

# harvester to use
# possible values:
# - classic: The default and original Loklak Harvester
# - kaizen: The alternative Loklak Harvester that can utilize Twitter API (optional)
# - priority_kaizen: An extension of Kaizen harvester that uses priority queue
harvester.type = classic

# Configurations for kaizen harvester

# The amount of suggestions to request
harvester.kaizen.suggestions_count = 1000

# The amount of randomly selected suggestions to add
harvester.kaizen.suggestions_random = 5

# The radius for location/place queries (in miles)
harvester.kaizen.place_radius = 5

# The query limit (setting this to 0 or below means infinite)
harvester.kaizen.queries_limit = 500

# Verbosity (gives information to logs, if enabled)
harvester.kaizen.verbose = true

# Caretaker properties
caretaker.backendpush.retries = 5
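The block removed above configured the harvester strategy and the Kaizen-specific settings. For orientation, here is a minimal sketch — an assumption for illustration, not part of the commit — of how loklak code would have read these properties at runtime; the DAO.getConfig overloads mirror the calls visible in the Caretaker.java diff below, while the keys are exactly the ones this commit deletes:

import org.loklak.data.DAO;

// Hypothetical illustration only: reading the now-removed harvester settings.
// The String-, boolean- and numeric-default overloads follow the usage seen in Caretaker.java.
String harvesterType  = DAO.getConfig("harvester.type", "classic");                     // classic | kaizen | priority_kaizen
int suggestionsCount  = (int) DAO.getConfig("harvester.kaizen.suggestions_count", 1000);
int queriesLimit      = (int) DAO.getConfig("harvester.kaizen.queries_limit", 500);
boolean kaizenVerbose = DAO.getConfig("harvester.kaizen.verbose", true);

After this commit only a single TwitterHarvester remains (see LoklakServer.java below), so these keys no longer have any effect.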
32 changes: 0 additions & 32 deletions docs/kaizen.md

This file was deleted.

50 changes: 33 additions & 17 deletions src/org/loklak/Caretaker.java
@@ -140,43 +140,59 @@ public void run() {
}
busy = true;
}

// scan dump input directory to import files
try {
DAO.importAccountDumps(Integer.MAX_VALUE);
} catch (IOException e1) {
e1.printStackTrace();
}

// run some harvesting steps
boolean retrieval_forbackend_enabled = DAO.getConfig("retrieval.forbackend.enabled", false);
boolean backend_push_enabled = DAO.getConfig("backend.push.enabled", false);
int timeline_size = DAO.outgoingMessages.timelineSize();
if (retrieval_forbackend_enabled && backend_push_enabled && backends.length > 0 && timeline_size < TIMELINE_PUSH_MAXSIZE) {

int retrieval_forbackend_concurrency = (int) DAO.getConfig("retrieval.forbackend.concurrency", 1);
int retrieval_forbackend_loops = (int) DAO.getConfig("retrieval.forbackend.loops", 10);
int retrieval_forbackend_sleep_base = (int) DAO.getConfig("retrieval.forbackend.sleep.base", 300);
int retrieval_forbackend_sleep_randomoffset = (int) DAO.getConfig("retrieval.forbackend.sleep.randomoffset", 100);
hloop: for (int i = 0; i < retrieval_forbackend_loops; i++) {
Thread[] rts = new Thread[retrieval_forbackend_concurrency];
final AtomicInteger acccount = new AtomicInteger(0);
for (int j = 0; j < retrieval_forbackend_concurrency; j++) {
rts[j] = new Thread() {
public void run() {
int count = LoklakServer.harvester.harvest();
acccount.addAndGet(count);
}
};
rts[j].start();

// get more queries
int pendingQueries = 0;
try {
pendingQueries = LoklakServer.harvester.get_harvest_queries();
} catch (IOException e) {
}

// in case we have queries
if (pendingQueries > 0) {
Thread[] rts = new Thread[Math.min(pendingQueries, retrieval_forbackend_concurrency)];
final AtomicInteger acccount = new AtomicInteger(0);
for (int j = 0; j < rts.length; j++) {
rts[j] = new Thread() {
public void run() {
TwitterTimeline tl = LoklakServer.harvester.harvest_timeline();
if (tl != null && tl.getQuery() != null) {
/* Thread t = */ LoklakServer.harvester.push_timeline_to_backend(tl);
}
int count = tl == null ? 0 : tl.size();
acccount.addAndGet(count);
}
};
rts[j].start();
try {Thread.sleep(retrieval_forbackend_sleep_base + random.nextInt(retrieval_forbackend_sleep_randomoffset));} catch (InterruptedException e) {}
}
for (Thread t: rts) t.join();
if (acccount.get() < 0) break hloop;
try {Thread.sleep(retrieval_forbackend_sleep_base + random.nextInt(retrieval_forbackend_sleep_randomoffset));} catch (InterruptedException e) {}
}
for (Thread t: rts) t.join();
if (acccount.get() < 0) break hloop;
try {Thread.sleep(retrieval_forbackend_sleep_base + random.nextInt(retrieval_forbackend_sleep_randomoffset));} catch (InterruptedException e) {}
}
busy = true;
}

// run some crawl steps
if (Crawler.pending() > 0) {
crawler: for (int i = 0; i < 10; i++) {
@@ -198,7 +214,7 @@ public void run() {
if (finished.get()) break crawler; else busy = true;
}
}

// run searches
boolean retrieval_queries_enabled = DAO.getConfig("retrieval.queries.enabled", false);
if (retrieval_queries_enabled && IncomingMessageBuffer.addSchedulerAvailable()) {
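The rewritten retrieval loop above no longer drives a pluggable Harvester strategy; it calls a single TwitterHarvester through three methods. As a rough sketch — an assumed shape, not the actual class — the surface that the new Caretaker code depends on looks like this; the method names come from the diff, while the signatures, return values, bodies and the TwitterTimeline import location are assumptions:

import java.io.IOException;
import org.loklak.objects.TwitterTimeline; // assumed location of the timeline type used in the diff above

public class TwitterHarvester {

    // fetch fresh queries to harvest; Caretaker only spawns worker threads when this count is positive
    public int get_harvest_queries() throws IOException {
        return 0; // placeholder
    }

    // scrape one timeline for a pending query, or return null when nothing is pending
    public TwitterTimeline harvest_timeline() {
        return null; // placeholder
    }

    // hand a harvested timeline over to the backend push; the commented-out "Thread t ="
    // in the diff suggests the real method spawns and returns a worker thread
    public void push_timeline_to_backend(TwitterTimeline timeline) {
        // placeholder
    }
}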
27 changes: 3 additions & 24 deletions src/org/loklak/LoklakServer.java
@@ -129,12 +129,9 @@
import org.loklak.api.vis.PieChartServlet;
import org.loklak.data.DAO;
import org.loklak.data.IncomingMessageBuffer;
import org.loklak.harvester.TwitterHarvester;
import org.loklak.harvester.TwitterScraper;
import org.loklak.harvester.YoutubeScraper;
import org.loklak.harvester.strategy.ClassicHarvester;
import org.loklak.harvester.strategy.Harvester;
import org.loklak.harvester.strategy.KaizenHarvester;
import org.loklak.harvester.strategy.PriorityKaizenHarvester;
import org.loklak.http.RemoteAccess;
import org.loklak.server.APIHandler;
import org.loklak.server.FileHandler;
@@ -156,7 +153,7 @@ public class LoklakServer {
private static DumpImporter dumpImporter = null;
private static HttpsMode httpsMode = HttpsMode.OFF;
public static Class<? extends Servlet>[] services;
public static Harvester harvester = null;
public static TwitterHarvester harvester = null;

public static Map<String, String> readConfig(Path data) throws IOException {
File conf_dir = new File("conf");
Expand Down Expand Up @@ -289,7 +286,7 @@ public static void main(String[] args) throws Exception {
setServerHandler(dataFile);

// init the harvester
initializeHarvester();
harvester = new TwitterHarvester();

LoklakServer.server.start();
LoklakServer.caretaker = new Caretaker();
@@ -328,7 +325,6 @@ public void run() {
LoklakServer.server.stop();
DAO.close();
TwitterScraper.executor.shutdown();
LoklakServer.harvester.stop();
DAO.log("main terminated, goodby.");

//LoklakServer.saveConfig();
@@ -392,23 +388,6 @@ private static void extract(JarFile jar, JarEntry file) throws IOException {
Files.copy(jar.getInputStream(file), target.toPath());
}

// initialize harvester
private static void initializeHarvester() {
String type = DAO.getConfig("harvester.type", "classic");
switch (type) {
default:
case "classic":
harvester = new ClassicHarvester();
break;
case "kaizen":
harvester = new KaizenHarvester();
break;
case "priority_kaizen":
harvester = new PriorityKaizenHarvester();
break;
}
}

//initiate http server
private static void setupHttpServer(int httpPort, int httpsPort) throws Exception{
QueuedThreadPool pool = new QueuedThreadPool();
37 changes: 0 additions & 37 deletions src/org/loklak/harvester/PushThread.java

This file was deleted.

