-
Notifications
You must be signed in to change notification settings - Fork 76
/
Copy pathAbstractStorage.java
330 lines (293 loc) · 10.5 KB
/
AbstractStorage.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
/*
--------------------------------------------------------------------------------
SPADE - Support for Provenance Auditing in Distributed Environments.
Copyright (C) 2015 SRI International
This program is free software: you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
--------------------------------------------------------------------------------
*/
package spade.core;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import spade.query.quickgrail.core.QueryInstructionExecutor;
import spade.query.scaffold.Scaffold;
import spade.query.scaffold.ScaffoldFactory;
/**
 * This is the base class from which concrete storage types inherit.
 *
 * <p>Vertices and edges enter a storage through {@link #putVertex(AbstractVertex)}
 * and {@link #putEdge(AbstractEdge)}. Both first consult the registered
 * {@link AbstractScreen}s (guarded by {@code screensLock}) and only delegate to
 * {@link #storeVertex(AbstractVertex)} / {@link #storeEdge(AbstractEdge)} when no
 * screen blocks the element.
 *
 * <p>The class also loads the scaffold configuration once, in its static
 * initializer, and exposes counters and helpers for periodic throughput
 * reporting.
 *
 * @author Dawood Tariq and Raza Ahmad
 */
public abstract class AbstractStorage
{
    // Screens only accessible from package and self (not even children can view it)
    final Object screensLock = new Object();
    final List<AbstractScreen> screens = new ArrayList<AbstractScreen>();

    /**
     * Registers a single screen with this storage.
     *
     * @param screen the screen to add
     * @throws IllegalArgumentException if {@code screen} is null
     */
    final void addScreen(final AbstractScreen screen) throws IllegalArgumentException{
        if(screen == null){
            throw new IllegalArgumentException("NULL screen cannot be added");
        }
        synchronized(screensLock){
            screens.add(screen);
        }
    }

    /**
     * Registers every screen in the given list, in iteration order.
     *
     * @param screens the screens to add
     * @throws IllegalArgumentException if the list itself is null, or when a
     *                                  null element is reached (screens before
     *                                  that element have already been added)
     */
    final void addScreens(final List<AbstractScreen> screens) throws IllegalArgumentException{
        if(screens == null){
            throw new IllegalArgumentException("NULL screens cannot be added");
        }
        for(final AbstractScreen screen : screens){
            addScreen(screen);
        }
    }

    /**
     * @return a copy of the currently registered screens; modifying the
     * returned list does not affect this storage
     */
    final List<AbstractScreen> getScreens(){
        synchronized(screensLock){
            return new ArrayList<AbstractScreen>(screens);
        }
    }

    /**
     * Removes all registered screens.
     */
    final void clearScreens(){
        synchronized(screensLock){
            screens.clear();
        }
    }

    /**
     * Finds the first registered screen whose runtime class is exactly
     * {@code screenClass} (subclasses do not match).
     *
     * @param screenClass the class to look for; may be null
     * @return the matching screen, or null if {@code screenClass} is null or no
     * registered screen has that exact class
     */
    public final AbstractScreen findScreen(final Class<? extends AbstractScreen> screenClass){
        if(screenClass == null){
            return null;
        }
        synchronized(screensLock){
            for(final AbstractScreen screen : screens){
                if(screen != null && screen.getClass().equals(screenClass)){
                    return screen;
                }
            }
        }
        return null;
    }

    /**
     * Offers a vertex to this storage. The vertex is handed to
     * {@link #storeVertex(AbstractVertex)} only if it is non-null and no
     * registered screen blocks it.
     *
     * @param vertex the vertex to store
     * @return false if the vertex is null or blocked by a screen, otherwise the
     * result of {@link #storeVertex(AbstractVertex)}
     */
    public final boolean putVertex(final AbstractVertex vertex){
        if(vertex == null){
            return false;
        }
        // Screens are consulted while holding the lock, exactly as before;
        // the store call itself happens outside the lock.
        synchronized(screensLock){
            for(final AbstractScreen screen : screens){
                if(screen.blockVertex(vertex)){
                    return false;
                }
            }
        }
        return storeVertex(vertex);
    }

    /**
     * Offers an edge to this storage. The edge is handed to
     * {@link #storeEdge(AbstractEdge)} only if it is non-null and no registered
     * screen blocks it.
     *
     * @param edge the edge to store
     * @return false if the edge is null or blocked by a screen, otherwise the
     * result of {@link #storeEdge(AbstractEdge)}
     */
    public final boolean putEdge(final AbstractEdge edge){
        if(edge == null){
            return false;
        }
        // Screens are consulted while holding the lock, exactly as before;
        // the store call itself happens outside the lock.
        synchronized(screensLock){
            for(final AbstractScreen screen : screens){
                if(screen.blockEdge(edge)){
                    return false;
                }
            }
        }
        return storeEdge(edge);
    }

    ////////////////

    public static final String PRIMARY_KEY = "hash";
    public static final String CHILD_VERTEX_KEY = "childVertexHash";
    public static final String PARENT_VERTEX_KEY = "parentVertexHash";
    public static final String DIRECTION = "direction";
    public static final String MAX_DEPTH = "maxDepth";
    public static final String MAX_LENGTH = "maxLength";
    public static final String DIRECTION_ANCESTORS = "ancestors";
    public static final String DIRECTION_DESCENDANTS = "descendants";
    public static final String DIRECTION_BOTH = "both";

    /**
     * Logger for this storage. It is not assigned anywhere in this class, so
     * it stays null unless a subclass sets it.
     */
    protected Logger logger;

    /**
     * The arguments with which this storage was initialized.
     */
    public String arguments;

    /**
     * The number of vertices that this storage instance has successfully
     * received.
     */
    protected long vertexCount;

    /**
     * The number of edges that this storage instance has successfully received.
     */
    protected long edgeCount;

    protected static Properties databaseConfigs = new Properties();

    /**
     * Variables and functions for computing performance stats
     */
    protected boolean reportingEnabled = false;
    protected long reportingInterval;
    protected long reportEveryMs;
    protected long startTime, lastReportedTime;
    protected long lastReportedVertexCount, lastReportedEdgeCount;

    private static final String configFile = Settings.getDefaultConfigFilePath(AbstractStorage.class);

    /**
     * Variables and functions for managing scaffold storage
     */
    public static Scaffold scaffold = null;
    public static boolean BUILD_SCAFFOLD;
    public static String SCAFFOLD_PATH;
    public static String SCAFFOLD_DATABASE_NAME;

    static{
        try{
            // Properties.load() leaves the stream open after it returns, so the
            // stream is scoped to the load and closed here to avoid leaking
            // the file descriptor.
            try(FileInputStream configStream = new FileInputStream(configFile)){
                databaseConfigs.load(configStream);
            }
            BUILD_SCAFFOLD = Boolean.parseBoolean(databaseConfigs.getProperty("build_scaffold"));
            SCAFFOLD_PATH = Settings.getPathRelativeToSPADERoot(databaseConfigs.getProperty("scaffold_path"));
            SCAFFOLD_DATABASE_NAME = databaseConfigs.getProperty("scaffold_database_name");
            if(BUILD_SCAFFOLD){
                scaffold = ScaffoldFactory.createScaffold(SCAFFOLD_DATABASE_NAME);
                if(!scaffold.initialize(SCAFFOLD_PATH)){
                    Logger.getLogger(AbstractStorage.class.getName()).log(Level.WARNING, "Scaffold not set!");
                }
            }
        }catch(Exception ex){
            // default settings
            BUILD_SCAFFOLD = false;
            SCAFFOLD_PATH = Settings.getPathRelativeToSPADERoot("db", "scaffold");
            SCAFFOLD_DATABASE_NAME = "BerkeleyDB";
            Logger.getLogger(AbstractStorage.class.getName()).log(Level.WARNING,
                    "Loading scaffold configurations from file '" + configFile + "' " +
                    " unsuccessful! Falling back to default settings", ex);
        }
    }

    /**
     * Inserts the given edge into the scaffold.
     *
     * @param incomingEdge the edge to record in the scaffold
     * @return the result of the scaffold's {@code insertEntry} call, or false
     * if no scaffold is currently set
     */
    protected boolean insertScaffoldEntry(AbstractEdge incomingEdge){
        // Read the static field once; it is null unless scaffolding was
        // enabled in the config file or set through setScaffold().
        final Scaffold activeScaffold = scaffold;
        if(activeScaffold == null){
            return false;
        }
        return activeScaffold.insertEntry(incomingEdge);
    }

    /* For testing purposes only. Set scaffold through Settings file normally. */
    public static void setScaffold(Scaffold scaffold){
        AbstractStorage.scaffold = scaffold;
        BUILD_SCAFFOLD = true;
    }

    /**
     * This method is invoked by the kernel to initialize the storage.
     *
     * @param arguments The arguments with which this storage is to be
     *                  initialized.
     * @return True if the storage was initialized successfully.
     */
    public abstract boolean initialize(String arguments);

    /**
     * This method is invoked by the kernel to shut down the storage.
     * If scaffolding is enabled and a scaffold is set, the scaffold is shut
     * down as well.
     *
     * @return True if the storage was shut down successfully.
     */
    public boolean shutdown(){
        // setScaffold(null) enables BUILD_SCAFFOLD without a scaffold, so the
        // flag alone is not enough to dereference the field safely.
        final Scaffold activeScaffold = scaffold;
        if(BUILD_SCAFFOLD && activeScaffold != null){
            activeScaffold.shutdown();
        }
        return true;
    }

    /**
     * Prints the stats via {@link #printStats()} if at least
     * {@code reportEveryMs} milliseconds have elapsed since the last report,
     * and then records the current time and counts as the new baseline.
     */
    protected void computeStats(){
        final long currentTime = System.currentTimeMillis();
        if((currentTime - lastReportedTime) >= reportEveryMs){
            printStats();
            lastReportedTime = currentTime;
            lastReportedVertexCount = vertexCount;
            lastReportedEdgeCount = edgeCount;
        }
    }

    /**
     * Logs overall and per-interval vertex/edge rates and counts at INFO
     * level. Nothing is logged unless both the overall and the interval
     * elapsed time are positive.
     */
    protected void printStats(){
        final long currentTime = System.currentTimeMillis();
        final float overallTime = (float) (currentTime - startTime) / 1000; // # in secs
        final float intervalTime = (float) (currentTime - lastReportedTime) / 1000; // # in secs
        if(overallTime > 0 && intervalTime > 0){
            // # records/sec
            final float overallVertexVolume = (float) vertexCount / overallTime;
            final float overallEdgeVolume = (float) edgeCount / overallTime;
            // # records/sec
            final long intervalVertexCount = vertexCount - lastReportedVertexCount;
            final long intervalEdgeCount = edgeCount - lastReportedEdgeCount;
            final float intervalVertexVolume = (float) (intervalVertexCount) / intervalTime;
            final float intervalEdgeVolume = (float) (intervalEdgeCount) / intervalTime;
            // 'logger' is only set by subclasses; fall back to a logger named
            // after the runtime class instead of failing on a null field.
            final Logger statsLogger = (logger != null) ? logger : Logger.getLogger(getClass().getName());
            statsLogger.log(Level.INFO, "Overall Stats => rate: {0} vertex/sec and {1} edge/sec. count: vertices: {2} and edges: {3}. In total {4} seconds.\n" +
                    "Interval Stats => rate: {5} vertex/sec and {6} edge/sec. count: vertices: {7} and edges: {8}. In {9} seconds.",
                    new Object[]{overallVertexVolume, overallEdgeVolume, vertexCount, edgeCount, overallTime, intervalVertexVolume,
                            intervalEdgeVolume, intervalVertexCount, intervalEdgeCount, intervalTime});
        }
    }

    /**
     * This method returns current edge count.
     *
     * @return edge count
     */
    public long getEdgeCount(){
        return edgeCount;
    }

    /**
     * This method returns current vertex count.
     *
     * @return vertex count
     */
    public long getVertexCount(){
        return vertexCount;
    }

    /**
     * This method is triggered by the Kernel to flush transactions.
     * The base implementation does nothing and reports success.
     *
     * @param force unused by the base implementation; presumably requests an
     *              unconditional flush in subclasses (confirm against overrides)
     * @return True if the transactions were flushed successfully.
     */
    public boolean flushTransactions(boolean force){
        return true;
    }

    /**
     * This function inserts the given edge into the underlying storage(s) and
     * updates the cache(s) accordingly.
     *
     * @param incomingEdge edge to insert into the storage
     * @return returns true if the insertion is successful. Insertion is considered
     * not successful if the edge is already present in the storage.
     */
    public abstract boolean storeEdge(AbstractEdge incomingEdge);

    /**
     * This function inserts the given vertex into the underlying storage(s) and
     * updates the cache(s) accordingly.
     *
     * @param incomingVertex vertex to insert into the storage
     * @return returns true if the insertion is successful. Insertion is considered
     * not successful if the vertex is already present in the storage.
     */
    public abstract boolean storeVertex(AbstractVertex incomingVertex);

    /**
     * Executes the given query against this storage.
     *
     * @param query the query string, in whatever form the concrete storage accepts
     * @return the storage-specific query result
     */
    public abstract Object executeQuery(String query);

    /**
     * Returns the executor used to run query instructions against this
     * storage. The base implementation always throws.
     *
     * @return never returns normally in the base implementation
     * @throws RuntimeException always, unless overridden
     */
    public QueryInstructionExecutor getQueryInstructionExecutor(){
        throw new RuntimeException("Storage does not support querying!");
    }
}