diff --git a/src/main/scala/com/github/sadikovi/spark/rdd/NetFlowFileRDD.scala b/src/main/scala/com/github/sadikovi/spark/rdd/NetFlowFileRDD.scala
index a229c34..6dcc866 100644
--- a/src/main/scala/com/github/sadikovi/spark/rdd/NetFlowFileRDD.scala
+++ b/src/main/scala/com/github/sadikovi/spark/rdd/NetFlowFileRDD.scala
@@ -34,6 +34,7 @@ import com.github.sadikovi.netflowlib.predicate.Operators.FilterPredicate
 import com.github.sadikovi.spark.netflow.NetFlowFilters
 import com.github.sadikovi.spark.netflow.index.AttributeMap
 import com.github.sadikovi.spark.netflow.sources._
+import com.github.sadikovi.spark.util.CloseableIterator
 
 /** NetFlowFilePartition to hold sequence of file paths */
 private[spark] class NetFlowFilePartition[T <: NetFlowFileStatus : ClassTag] (
@@ -104,7 +105,7 @@ private[spark] class NetFlowFileRDD[T <: SQLRow : ClassTag] (
       val fileLength = elem.length
 
       // Prepare file stream
-      val stm: FSDataInputStream = fs.open(path)
+      var stm: FSDataInputStream = fs.open(path)
       val reader = NetFlowReader.prepareReader(stm, elem.bufferSize)
       val header = reader.getHeader()
       // Actual version of the file
@@ -140,7 +141,33 @@ private[spark] class NetFlowFileRDD[T <: SQLRow : ClassTag] (
         reader.prepareRecordBuffer(internalColumns)
       }
 
-      val rawIterator = recordBuffer.iterator().asScala
+      val rawIterator = new CloseableIterator[Array[Object]] {
+        private var delegate = recordBuffer.iterator().asScala
+
+        override def getNext(): Array[Object] = {
+          // If delegate has traversed all elements, mark iterator as finished
+          // so that the underlying stream can be closed
+          if (delegate.hasNext) {
+            delegate.next
+          } else {
+            finished = true
+            null
+          }
+        }
+
+        override def close(): Unit = {
+          // Close stream if possible or fail silently,
+          // at this point the exception does not really matter
+          try {
+            if (stm != null) {
+              stm.close()
+              stm = null
+            }
+          } catch {
+            case err: Exception => // do nothing
+          }
+        }
+      }
 
       // Try collecting statistics before any other mode, because attributes collect raw data. If
       // file exists, it is assumed that statistics are already written
@@ -205,7 +232,7 @@ private[spark] class NetFlowFileRDD[T <: SQLRow : ClassTag] (
       buffer = buffer ++ withConversionsIterator
     }
 
-    new Iterator[SQLRow] {
+    new InterruptibleIterator(context, new Iterator[SQLRow] {
       def next(): SQLRow = {
         SQLRow.fromSeq(buffer.next())
       }
@@ -213,6 +240,6 @@ private[spark] class NetFlowFileRDD[T <: SQLRow : ClassTag] (
       def hasNext: Boolean = {
         buffer.hasNext
       }
-    }
+    })
   }
 }
diff --git a/src/main/scala/com/github/sadikovi/spark/util/CloseableIterator.scala b/src/main/scala/com/github/sadikovi/spark/util/CloseableIterator.scala
new file mode 100644
index 0000000..85972e6
--- /dev/null
+++ b/src/main/scala/com/github/sadikovi/spark/util/CloseableIterator.scala
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2016 sadikovi
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.github.sadikovi.spark.util
+
+/**
+ * [[CloseableIterator]] allows closing associated resources once the iteration is finished.
+ * Copied from apache/spark; a slightly modified version of `NextIterator`.
+ */
+private[spark] abstract class CloseableIterator[U] extends Iterator[U] {
+  private var gotNext = false
+  private var nextValue: U = _
+  private var closed = false
+  protected var finished = false
+
+  /**
+   * Method for subclasses to implement to provide the next element.
+   *
+   * If no next element is available, the subclass should set `finished`
+   * to `true` and may return any value (it will be ignored).
+   *
+   * This convention is required because `null` may be a valid value,
+   * and using `Option` seems like it might create unnecessary Some/None
+   * instances, given some iterators might be called in a tight loop.
+   *
+   * @return U, or set 'finished' when done
+   */
+  protected def getNext(): U
+
+  /**
+   * Method for subclasses to implement when all elements have been successfully
+   * iterated, and the iteration is done.
+   *
+   * Ideally you should have another try/catch that ensures any resources are closed should
+   * iteration fail.
+   */
+  protected def close()
+
+  /**
+   * Calls the subclass-defined close method, but only once.
+   *
+   * Usually calling `close` multiple times should be fine, but historically
+   * there have been issues with some InputFormats throwing exceptions.
+   */
+  def closeIfNeeded() {
+    if (!closed) {
+      // Note: it's important that we set closed = true before calling close(), since setting it
+      // afterwards would permit us to call close() multiple times if close() threw an exception.
+      closed = true
+      close()
+    }
+  }
+
+  override def hasNext: Boolean = {
+    if (!finished) {
+      if (!gotNext) {
+        nextValue = getNext()
+        if (finished) {
+          closeIfNeeded()
+        }
+        gotNext = true
+      }
+    }
+    !finished
+  }
+
+  override def next(): U = {
+    if (!hasNext) {
+      throw new NoSuchElementException("End of stream")
+    }
+    gotNext = false
+    nextValue
+  }
+}
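
Note for reviewers (not part of the patch): a minimal sketch of how the `getNext()` / `finished` / `close()` contract of `CloseableIterator` is intended to be used. The `LineIterator` name and the `java.io.BufferedReader` resource are illustrative only; the sketch assumes it lives in the same `com.github.sadikovi.spark.util` package, since the class is `private[spark]`.

package com.github.sadikovi.spark.util

import java.io.BufferedReader

// Hypothetical subclass: iterates over lines of a BufferedReader and closes it
// exactly once, as soon as the last line has been returned.
class LineIterator(reader: BufferedReader) extends CloseableIterator[String] {
  override def getNext(): String = {
    val line = reader.readLine()
    if (line == null) {
      // No more data: set `finished` so that hasNext() triggers closeIfNeeded()
      finished = true
    }
    // Return value is ignored once `finished` is true
    line
  }

  override def close(): Unit = {
    // Invoked at most once via closeIfNeeded()
    reader.close()
  }
}

// Usage (illustrative):
//   val it = new LineIterator(new BufferedReader(new java.io.FileReader("data.txt")))
//   it.foreach(println)  // the reader is closed automatically after the last line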