Skip to content
This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

Commit

Permalink
Delete checkpoints + znode.
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelDrogalis committed Nov 10, 2017
1 parent f3bc80e commit 144da37
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 29 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ log_artifact
onyx.iml
.idea/
/onyx.log.*
*~
32 changes: 4 additions & 28 deletions src/onyx/api.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
[onyx.static.planning :as planning]
[onyx.static.default-vals :refer [arg-or-default]]
[onyx.static.uuid :refer [random-uuid]]
[onyx.gc :as garbage-collector]
[hasch.core :refer [edn-hash uuid5]])
(:import [java.util UUID]
[java.security MessageDigest]
Expand Down Expand Up @@ -406,19 +407,7 @@

(defmethod gc OnyxClient
[onyx-client]
(let [id (java.util.UUID/randomUUID)
entry (create-log-entry :gc {:id id})
ch (chan 1000)]
(extensions/write-log-entry (:log onyx-client) entry)
(loop [replica (extensions/subscribe-to-log (:log onyx-client) ch)]
(let [entry (<!! ch)
new-replica (extensions/apply-log-entry entry (assoc replica :version (:message-id entry)))]
(if (and (= (:fn entry) :gc) (= (:id (:args entry)) id))
(let [diff (extensions/replica-diff entry replica new-replica)
args {:id id :type :client :log (:log onyx-client)}]
(extensions/fire-side-effects! entry replica new-replica diff args))
(recur new-replica))))
true))
(garbage-collector/gc-log onyx-client))

(defmethod gc :default
[peer-client-config]
Expand All @@ -443,21 +432,8 @@
(defmethod gc-checkpoints OnyxClient
[{:keys [log] :as onyx-client} job-id]
(let [tenancy-id (:prefix log)
coordindates (job-snapshot-coordinates onyx-client tenancy-id job-id)
max-rv (:replica-version coordindates)
max-epoch (:epoch coordindates)
watermarks (checkpoint/read-all-replica-epoch-watermarks log job-id)
sorted-watermarks (sort-by :replica-version watermarks)
targets (take-while
(fn [w]
(or (< (:replica-version w) max-rv)
(and (= (:replica-version w) max-rv)
(< (:epoch w) max-epoch))))
sorted-watermarks)]
(reduce
(fn [result {:keys [replica-version epoch]}])
{:checkpoints-deleted 0 :ts []}
targets)))
coordinates (job-snapshot-coordinates onyx-client tenancy-id job-id)]
(garbage-collector/gc-checkpoints onyx-client coordinates job-id)))

(defmethod gc-checkpoints :default
[peer-client-config job-id]
Expand Down
6 changes: 5 additions & 1 deletion src/onyx/checkpoint.clj
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@
(type storage)))

(defmulti write-replica-epoch-watermark
(fn [storage job-id job-id replication-version epoch task-data]
(fn [storage tenancy-id job-id replication-version epoch task-data]
(type storage)))

(defmulti read-all-replica-epoch-watermarks
(fn [storage job-id]
(type storage)))

(defmulti gc-replica-epoch-watermark!
(fn [storage tenancy-id job-id replication-version]
(type storage)))

; Consistent coordinate write interfaces
(defmulti write-checkpoint-coordinate
(fn [storage tenancy-id job-id coordinate epoch version task-data]
Expand Down
63 changes: 63 additions & 0 deletions src/onyx/gc.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
(ns onyx.gc
(:require [clojure.core.async :refer [chan <!!]]
[onyx.log.entry :refer [create-log-entry]]
[onyx.extensions :as extensions]
[onyx.checkpoint :as checkpoint]
[onyx.static.default-vals :refer [arg-or-default]]))

(defn gc-log [onyx-client]
(let [id (java.util.UUID/randomUUID)
entry (create-log-entry :gc {:id id})
ch (chan 1000)]
(extensions/write-log-entry (:log onyx-client) entry)
(loop [replica (extensions/subscribe-to-log (:log onyx-client) ch)]
(let [entry (<!! ch)
new-replica (extensions/apply-log-entry entry (assoc replica :version (:message-id entry)))]
(if (and (= (:fn entry) :gc) (= (:id (:args entry)) id))
(let [diff (extensions/replica-diff entry replica new-replica)
args {:id id :type :client :log (:log onyx-client)}]
(extensions/fire-side-effects! entry replica new-replica diff args))
(recur new-replica))))
true))

(defn build-checkpoint-targets [log job-id max-rv max-epoch]
(let [watermarks (checkpoint/read-all-replica-epoch-watermarks log job-id)
sorted-watermarks (sort-by :replica-version watermarks)]
(reduce
(fn [all {:keys [replica-version epoch task-data]}]
(cond (< replica-version max-rv)
(conj all {:replica-version replica-version
:epoch-range (range 1 (inc epoch))
:task-data task-data})

(= replica-version max-rv)
(conj all {:replica-version replica-version
:epoch-range (range 1 epoch)
:task-data task-data})

:else all))
[]
sorted-watermarks)))

(defn storage-connection [peer-config log]
(if (= :zookeeper (arg-or-default :onyx.peer/storage peer-config))
log
(checkpoint/storage peer-config nil))) ;; TODO: monitoring component?

(defn gc-checkpoints [{:keys [log peer-config] :as onyx-client} job-id coordinates]
(let [tenancy-id (:prefix log)
max-rv (:replica-version coordinates)
max-epoch (:epoch coordinates)
targets (build-checkpoint-targets log job-id max-rv max-epoch)
storage (storage-connection peer-config log)
gc-f (partial checkpoint/gc-checkpoint! storage tenancy-id job-id)]
(reduce
(fn [result {:keys [replica-version epoch-range task-data]}]
(doseq [epoch epoch-range]
(doseq [[p-type task-id->slots] task-data]
(doseq [[task-id slots] task-id->slots]
(doseq [slot (range (inc slots))]
(gc-f replica-version epoch task-id slot p-type)))))
(checkpoint/gc-replica-epoch-watermark! storage tenancy-id job-id replica-version))
{:checkpoints-deleted 0}
targets)))
10 changes: 10 additions & 0 deletions src/onyx/log/zookeeper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,16 @@
#(let [args {:event :zookeeper-read-rv-epochs :latency %}]
(extensions/emit monitoring args))))

(defmethod checkpoint/gc-replica-epoch-watermark! ZooKeeper
[{:keys [conn prefix monitoring]} tenancy-id job-id replica-version]
(measure-latency
#(clean-up-broken-connections
(fn []
(let [node (str (epoch-path prefix) "/" job-id "/" replica-version)]
(zk/delete conn node))))
#(let [args {:event :zookeeper-gc-rv-watermark :latency %}]
(extensions/emit monitoring args))))

(defmethod checkpoint/read-checkpoint ZooKeeper
[{:keys [conn opts prefix monitoring] :as log} tenancy-id job-id
replica-version epoch task-id slot-id checkpoint-type]
Expand Down

0 comments on commit 144da37

Please sign in to comment.