-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
118 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
87 changes: 87 additions & 0 deletions
87
src/main/scala/com/github/sadikovi/spark/util/CloseableIterator.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
* Copyright 2016 sadikovi | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.github.sadikovi.spark.util | ||
|
||
/**
 * [[CloseableIterator]] is an iterator that can release its associated resources once iteration
 * is finished. It is a slightly modified copy of `NextIterator` from apache/spark.
 *
 * Subclasses implement `getNext()` to produce elements and `close()` to release resources.
 * `close()` is only ever invoked through `closeIfNeeded()`, which runs it at most once: either
 * from `hasNext`, when a call to `getNext()` leaves `finished` set, or explicitly by the caller.
 *
 * @tparam U type of the elements produced by this iterator
 */
private[spark] abstract class CloseableIterator[U] extends Iterator[U] {
  // True while `nextValue` holds a prefetched element that `next()` has not handed out yet.
  private var gotNext = false
  // Value returned by the most recent `getNext()` call; ignored once `finished` is set.
  private var nextValue: U = _
  // Guards `close()` so that `closeIfNeeded()` invokes it at most once.
  private var closed = false
  // Set to `true` by subclasses (inside `getNext()`) to signal the end of the stream.
  protected var finished = false

  /**
   * Method for subclasses to implement to provide the next element.
   *
   * If no next element is available, the subclass should set `finished`
   * to `true` and may return any value (it will be ignored).
   *
   * This convention is required because `null` may be a valid value,
   * and using `Option` seems like it might create unnecessary Some/None
   * instances, given some iterators might be called in a tight loop.
   *
   * @return the next element, or an arbitrary value after setting `finished` when done
   */
  protected def getNext(): U

  /**
   * Method for subclasses to implement to release resources. It is called when all elements
   * have been successfully iterated and the iteration is done.
   *
   * This class does not call it when `getNext()` throws, so ideally callers should have another
   * try/catch that ensures any resources are closed should iteration fail.
   */
  protected def close(): Unit

  /**
   * Calls the subclass-defined `close` method, but only once.
   *
   * Usually calling `close` multiple times should be fine, but historically
   * there have been issues with some InputFormats throwing exceptions.
   */
  def closeIfNeeded(): Unit = {
    if (!closed) {
      // Note: it's important that we set closed = true before calling close(), since setting it
      // afterwards would permit us to call close() multiple times if close() threw an exception.
      closed = true
      close()
    }
  }

  /**
   * Prefetches the next element if one has not been fetched yet, closing resources as soon as
   * the subclass reports the end of the stream.
   *
   * @return `true` if an element is available for `next()`, `false` once iteration is finished
   */
  override def hasNext: Boolean = {
    // Only fetch when the stream is still open and no prefetched element is pending.
    if (!finished && !gotNext) {
      nextValue = getNext()
      if (finished) {
        closeIfNeeded()
      }
      gotNext = true
    }
    !finished
  }

  /**
   * Returns the prefetched element and marks it as consumed.
   *
   * @return the next element of the iteration
   * @throws NoSuchElementException if the iteration has no more elements
   */
  override def next(): U = {
    if (!hasNext) {
      throw new NoSuchElementException("End of stream")
    }
    // Force the following `hasNext` call to fetch a fresh element.
    gotNext = false
    nextValue
  }
}