Skip to content
This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

Commit

Permalink
Re-introduce support for :seq/checkpoint? option (#898)
Browse files Browse the repository at this point in the history
Allows us to prevent checkpoints on seq plugin inputs, for example, when
using an infinite seq as input.
  • Loading branch information
sundbry authored and solatis committed Jul 5, 2019
1 parent 2b3003a commit 8ee8fd1
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions src/onyx/plugin/seq.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[onyx.plugin.protocols :as p]
[taoensso.timbre :refer [fatal info debug] :as timbre]))

(defrecord AbsSeqReader [event sequential rst completed? offset]
(defrecord AbsSeqReader [event sequential rst completed? checkpoint? offset]
p/Plugin

(start [this event]
Expand All @@ -15,7 +15,7 @@

p/Checkpointed
(checkpoint [this]
@offset)
(when checkpoint? @offset))

(recover! [this _ checkpoint]
(vreset! completed? false)
Expand Down Expand Up @@ -46,11 +46,12 @@
(do (vreset! completed? true)
nil))))

(defn input [event]
(defn input [{:keys [onyx.core/task-map] :as event}]
(map->AbsSeqReader {:event event
:sequential (:seq/seq event)
:rst (volatile! nil)
:completed? (volatile! false)
:checkpoint? (not (false? (:seq/checkpoint? task-map)))
:offset (volatile! nil)}))

(def reader-calls
Expand Down

0 comments on commit 8ee8fd1

Please sign in to comment.