This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

Commit f3bc80e

Record task metadata in watermark.

MichaelDrogalis committed Nov 9, 2017
1 parent 80ae4f0
Showing 5 changed files with 118 additions and 16 deletions.
41 changes: 41 additions & 0 deletions src/onyx/api.clj
@@ -396,6 +396,8 @@
Local replicas clear out all data about completed and killed jobs -
as if they never existed.
Does not clear out old checkpoints. Use gc-checkpoints to clear those away.
Takes either a peer configuration and constructs a client once for
the operation (closing it on completion) or an already started client."
^{:added "0.6.0"}
@@ -427,6 +429,45 @@
(finally
(component/stop client)))))

(defmulti gc-checkpoints
"Invokes the garbage collector on Onyx's checkpoints for a given job.
Deletes all checkpoints in non-current replica versions, and all except
the active checkpoint in the current replica version.
Takes either a peer configuration and constructs a client once for
the operation (closing it on completion) or an already started client."
^{:added "0.12.0"}
(fn [connector job-id]
(type connector)))

(defmethod gc-checkpoints OnyxClient
[{:keys [log] :as onyx-client} job-id]
(let [tenancy-id (:prefix log)
        coordinates (job-snapshot-coordinates onyx-client tenancy-id job-id)
        max-rv (:replica-version coordinates)
        max-epoch (:epoch coordinates)
watermarks (checkpoint/read-all-replica-epoch-watermarks log job-id)
sorted-watermarks (sort-by :replica-version watermarks)
targets (take-while
(fn [w]
(or (< (:replica-version w) max-rv)
(and (= (:replica-version w) max-rv)
(< (:epoch w) max-epoch))))
sorted-watermarks)]
    (reduce
     (fn [result {:keys [replica-version epoch]}]
       ;; reducing step not yet implemented in this commit; see the sketch below
       result)
     {:checkpoints-deleted 0 :ts []}
     targets)))
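
The reducing step above is left as a stub. A minimal sketch of what it could do, assuming :task-data maps a task kind to task-id -> max slot-id (as built by make-task-data in onyx.peer.coordinator below) and that epochs and slots are dense; the helper name, the slot/epoch ranges, and the use of the task kind as the checkpoint-type are assumptions, not part of this commit:

(defn- gc-watermark-checkpoints
  [log tenancy-id job-id result {:keys [replica-version epoch task-data]}]
  ;; Enumerate every (epoch, task, slot, kind) coordinate recorded under this
  ;; watermark and delete its checkpoint via the storage backend.
  (let [deletions (for [[kind task->slot] task-data
                        [task-id max-slot] task->slot
                        ep (range 1 (inc epoch))
                        slot (range (inc max-slot))]
                    [ep task-id slot kind])]
    (doseq [[ep task-id slot kind] deletions]
      (checkpoint/gc-checkpoint! log tenancy-id job-id replica-version
                                 ep task-id slot kind))
    (update result :checkpoints-deleted + (count deletions))))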

(defmethod gc-checkpoints :default
[peer-client-config job-id]
(validator/validate-peer-client-config peer-client-config)
(let [client (component/start (system/onyx-client peer-client-config))]
(try
(gc-checkpoints client job-id)
(finally
(component/stop client)))))
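
For context, a usage sketch of the new API; the configuration keys shown are typical but the values, and the job id, are placeholders:

(require '[onyx.api :as onyx])

(def peer-config
  {:zookeeper/address "127.0.0.1:2188" ;; placeholder address
   :onyx/tenancy-id "dev-tenancy"})    ;; placeholder tenancy

(def job-id #uuid "c6e8f1a0-0000-0000-0000-000000000000") ;; placeholder job id

;; A config map dispatches to the :default method, which starts a client,
;; runs the GC, and stops the client; an already started OnyxClient is used
;; directly.
(onyx/gc-checkpoints peer-config job-id)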

(defmulti await-job-completion
"Blocks until job-id has had all of its tasks completed or the job is killed.
Returns true if the job completed successfully, false if the job was killed.
15 changes: 13 additions & 2 deletions src/onyx/checkpoint.clj
@@ -24,9 +24,21 @@
(fn [storage tenancy-id job-id replica-version epoch task-id slot-id checkpoint-type]
(type storage)))

(defmulti gc-checkpoint!
(fn [storage tenancy-id job-id replica-version epoch task-id slot-id checkpoint-type]
(type storage)))

(defmulti write-replica-epoch-watermark
  (fn [storage job-id replica-version epoch task-data]
(type storage)))

(defmulti read-all-replica-epoch-watermarks
(fn [storage job-id]
(type storage)))

; Consistent coordinate write interfaces
(defmulti write-checkpoint-coordinate
(fn [storage tenancy-id job-id coordinate epoch version]
  (fn [storage tenancy-id job-id coordinate epoch task-data version]
(type storage)))

(defmulti watch-checkpoint-coordinate
@@ -40,4 +52,3 @@
(defmulti assume-checkpoint-coordinate
(fn [storage tenancy-id job-id]
(type storage)))
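
To illustrate the contract these multimethods define, a toy in-memory backend is sketched below; MemoryStorage and its atom layout are invented for illustration and are not part of Onyx:

(ns example.memory-storage
  (:require [onyx.checkpoint :as checkpoint]))

;; state is an atom holding {job-id {replica-version watermark-map}}
(defrecord MemoryStorage [state])

(defmethod checkpoint/write-replica-epoch-watermark MemoryStorage
  [{:keys [state]} job-id replica-version epoch task-data]
  (swap! state assoc-in [job-id replica-version]
         {:replica-version replica-version :epoch epoch :task-data task-data}))

(defmethod checkpoint/read-all-replica-epoch-watermarks MemoryStorage
  [{:keys [state]} job-id]
  (vals (get @state job-id)))

(defmethod checkpoint/gc-checkpoint! MemoryStorage
  [{:keys [state]} tenancy-id job-id replica-version epoch task-id slot-id checkpoint-type]
  ;; A real backend deletes the checkpoint blob stored at these coordinates;
  ;; this toy version keeps no checkpoint data, so there is nothing to delete.
  nil)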

37 changes: 32 additions & 5 deletions src/onyx/log/zookeeper.clj
@@ -682,6 +682,21 @@
;; zookeeper connection is shared with peer group, so we don't want to stop it
log)

(defmethod checkpoint/read-all-replica-epoch-watermarks ZooKeeper
[{:keys [conn prefix monitoring]} job-id]
(measure-latency
#(clean-up-broken-connections
(fn []
(let [rv-nodes (zk/children conn (str (epoch-path prefix) "/" job-id))]
(doall
(map
(fn [rv-node]
(let [path (str (epoch-path prefix) "/" job-id "/" rv-node)]
(zookeeper-decompress (:data (zk/data conn path)))))
rv-nodes)))))
#(let [args {:event :zookeeper-read-rv-epochs :latency %}]
(extensions/emit monitoring args))))
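
Each child node decompresses to one watermark map, so, assuming the layout written by write-replica-epoch-watermark below, the return value looks roughly like this (task names and numbers are placeholders):

(def example-watermarks
  [{:replica-version 1 :epoch 3 :task-data {:input  {:read-segments 0}
                                            :output {:write-segments 1}}}
   {:replica-version 2 :epoch 7 :task-data {:input  {:read-segments 0}
                                            :output {:write-segments 1}}}])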

(defmethod checkpoint/read-checkpoint ZooKeeper
[{:keys [conn opts prefix monitoring] :as log} tenancy-id job-id
replica-version epoch task-id slot-id checkpoint-type]
@@ -694,12 +709,24 @@
#(let [args {:event :zookeeper-read-checkpoint :latency %}]
(extensions/emit monitoring args))))

(defn set-epoch-watermark! [conn monitoring prefix job-id rv epoch]
(let [bytes (zookeeper-compress {:epoch epoch})]
(defmethod checkpoint/gc-checkpoint! ZooKeeper
[{:keys [conn prefix monitoring] :as log} tenancy-id job-id
replica-version epoch task-id slot-id checkpoint-type]
;; TODO: add monitoring.

(let [node (str (checkpoint-path-version tenancy-id job-id replica-version epoch)
"/" (checkpoint-task-key task-id slot-id checkpoint-type))]
(zk/delete conn node)))

(defmethod checkpoint/write-replica-epoch-watermark ZooKeeper
[{:keys [conn prefix monitoring] :as log} job-id replica-version epoch task-data]
(let [bytes (zookeeper-compress {:replica-version replica-version
:epoch epoch
:task-data task-data})]
(measure-latency
#(clean-up-broken-connections
(fn []
(let [node (str (epoch-path prefix) "/" job-id "/" rv)
(let [node (str (epoch-path prefix) "/" job-id "/" replica-version)
version (:version (zk/exists conn node))]
(if (nil? version)
(zk/create-all conn node :persistent? true :data bytes)
(extensions/emit monitoring args)))))

(defmethod checkpoint/write-checkpoint-coordinate ZooKeeper
[{:keys [conn opts monitoring] :as log} tenancy-id job-id coordinate epoch version]
[{:keys [conn opts monitoring] :as log} tenancy-id job-id coordinate epoch task-data version]
(let [bytes (zookeeper-compress coordinate)]
(measure-latency
#(clean-up-broken-connections
(fn []
(let [node (latest-checkpoint-path tenancy-id job-id)]
(zk/set-data conn node bytes version)
(set-epoch-watermark! conn monitoring tenancy-id job-id (:replica-version coordinate) epoch))))
(checkpoint/write-replica-epoch-watermark log job-id (:replica-version coordinate) epoch task-data))))
#(let [args {:event :zookeeper-write-checkpoint-coordinate :id job-id
:latency % :bytes (count bytes)}]
(extensions/emit monitoring args)))))
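
The coordinate map written to the latest-checkpoint-path node is the one assembled by the coordinator (see complete-job and completed-checkpoint below); a representative value, with placeholder ids, might be:

(def example-coordinate
  {:tenancy-id "dev-tenancy"
   :job-id #uuid "c6e8f1a0-0000-0000-0000-000000000000"
   :replica-version 2
   :epoch 7})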
31 changes: 25 additions & 6 deletions src/onyx/peer/coordinator.clj
@@ -74,9 +74,9 @@
(update state :barrier merge {:remaining new-remaining}))))
state))

(defn write-coordinate [curr-version log tenancy-id job-id coordinate epoch]
(defn write-coordinate [curr-version log tenancy-id job-id coordinate epoch task-data]
(try (->> curr-version
(write-checkpoint-coordinate log tenancy-id job-id coordinate epoch)
(write-checkpoint-coordinate log tenancy-id job-id coordinate epoch task-data)
(:version))
(catch KeeperException$BadVersionException bve
(throw (Exception. "Coordinator failed to write coordinates.
@@ -110,16 +110,34 @@
(assoc :curr-replica new-replica)))
(assoc state :curr-replica new-replica))))

(defn make-task-data [replica job-id]
(let [tasks (get-in replica [:tasks job-id])
inputs (get-in replica [:input-tasks job-id])
outputs (get-in replica [:output-tasks job-id])
windows (get-in replica [:state-tasks job-id])]
(reduce
(fn [all task-id]
(let [slots (vals (get-in replica [:task-slot-ids job-id task-id]))]
(if (seq slots)
(let [kind (cond (some #{task-id} inputs) :input
(some #{task-id} outputs) :output
(some #{task-id} windows) :windows)]
(assoc-in all [kind task-id] (apply max slots)))
all)))
{}
tasks)))
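
A worked example of make-task-data under an assumed replica shape; the task names and peer ids (:p1, :p2) are placeholders. Note that tasks matching none of the input/output/state collections fall through the cond and are keyed under nil:

(comment
  (make-task-data
   {:tasks          {:my-job [:read-segments :transform :write-segments]}
    :input-tasks    {:my-job [:read-segments]}
    :output-tasks   {:my-job [:write-segments]}
    :state-tasks    {:my-job []}
    :task-slot-ids  {:my-job {:read-segments  {:p1 0}
                              :transform      {:p1 0 :p2 1}
                              :write-segments {:p2 0}}}}
   :my-job)
  ;; => {:input  {:read-segments 0}
  ;;     nil     {:transform 1}
  ;;     :output {:write-segments 0}}
  )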

(defn complete-job
[{:keys [tenancy-id log job-id messenger checkpoint group-ch] :as state}]
[{:keys [tenancy-id log job-id messenger checkpoint group-ch curr-replica] :as state}]
(info (format "Job %s completed, and final checkpoint has finished. Writing checkpoint coordinates." job-id))
(let [replica-version (m/replica-version messenger)
epoch (m/epoch messenger)]
(let [coordinates {:tenancy-id tenancy-id
:job-id job-id
:replica-version replica-version
:epoch epoch}
next-write-version (write-coordinate (:write-version checkpoint) log tenancy-id job-id coordinates epoch)]
task-data (make-task-data curr-replica job-id)
next-write-version (write-coordinate (:write-version checkpoint) log tenancy-id job-id coordinates epoch task-data)]
(>!! group-ch [:send-to-outbox {:fn :complete-job :args {:job-id job-id}}])
(-> state
(update :job merge {:completed? true
@@ -169,16 +187,17 @@
(and (not (:checkpointing? status))
(>= (:min-epoch status) epoch))))

(defn completed-checkpoint [{:keys [checkpoint messenger job-id tenancy-id log] :as state}]
(defn completed-checkpoint [{:keys [checkpoint messenger job-id tenancy-id log curr-replica] :as state}]
(let [{:keys [epoch write-version]} checkpoint
write-coordinate? (> epoch 0)
coordinates {:tenancy-id tenancy-id
:job-id job-id
:replica-version (m/replica-version messenger)
:epoch epoch}
task-data (make-task-data curr-replica job-id)
;; get the next version of the zk node, so we can detect when there are other writers
next-write-version (if write-coordinate?
(write-coordinate write-version log tenancy-id job-id coordinates epoch)
(write-coordinate write-version log tenancy-id job-id coordinates epoch task-data)
write-version)]
(-> state
(update :barrier merge {:scheduled? false})
10 changes: 7 additions & 3 deletions src/onyx/storage/s3.clj
@@ -21,8 +21,6 @@
[java.util.concurrent TimeUnit]
[java.util.concurrent.atomic AtomicLong]))



(defn new-client ^AmazonS3Client [peer-config]
(case (arg-or-default :onyx.peer/storage.s3.auth-type peer-config)
:provider-chain (let [credentials (DefaultAWSCredentialsProviderChain.)]
@@ -118,7 +116,6 @@

(defrecord CheckpointManager [id monitoring client transfer-manager bucket encryption transfers timeout-ns])


(defmethod onyx.checkpoint/storage :s3 [peer-config monitoring]
(let [id (java.util.UUID/randomUUID)
region (:onyx.peer/storage.s3.region peer-config)
@@ -237,3 +234,10 @@
(do
(.addAndGet ^AtomicLong (:checkpoint-read-bytes monitoring) (alength ^bytes result))
result))))))

(defmethod checkpoint/gc-checkpoint! onyx.storage.s3.CheckpointManager
[{:keys [client bucket monitoring] :as storage}
tenancy-id job-id replica-version epoch task-id slot-id checkpoint-type]
;; TODO: add monitoring.
(let [k (checkpoint-task-key tenancy-id job-id replica-version epoch task-id slot-id checkpoint-type)]
(.deleteObject ^AmazonS3Client client bucket k)))
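
A hedged sketch of a direct call against the S3 backend; s3-checkpoint-manager stands in for the value returned by onyx.checkpoint/storage above, and the task-id, slot-id, and checkpoint-type shown are guesses, not values taken from this commit:

(checkpoint/gc-checkpoint! s3-checkpoint-manager
                           "dev-tenancy"                                ;; tenancy-id
                           #uuid "c6e8f1a0-0000-0000-0000-000000000000" ;; job-id
                           1              ;; replica-version
                           3              ;; epoch
                           :read-segments ;; task-id (placeholder)
                           0              ;; slot-id
                           :input)        ;; checkpoint-type (assumed)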
