-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathPipe.swift
127 lines (93 loc) · 2.96 KB
/
Pipe.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
//
// Pipe.swift
// Pipe
//
// Created by xueqooy on 2022/5/9.
//
import Foundation
private let pipeValueUserInfoKey = "Pipe.Value"
private class PipeContext {
var isInvalidated: Bool = false
var hasSunk: Bool = false
var latestValue: Any?
}
public class PipeChannel {
public var isInvalidated: Bool {
context.isInvalidated
}
fileprivate let name: NSNotification.Name
fileprivate let context: PipeContext
fileprivate init(name: NSNotification.Name, context: PipeContext) {
self.name = name
self.context = context
}
}
public class PipeSinkChannel<T>: PipeChannel {
@discardableResult
public func write(_ value: T?) -> Bool {
if self.isInvalidated {
return false
}
context.hasSunk = true
context.latestValue = value
var userInfo: [AnyHashable : Any]?
if let value = value {
userInfo = [pipeValueUserInfoKey : value]
}
NotificationCenter.default.post(name: name, object: nil, userInfo: userInfo)
return true
}
}
public class PipeSourceToken {
fileprivate weak var observer: AnyObject?
deinit {
invalidate()
}
public func invalidate() {
if let observer = observer {
NotificationCenter.default.removeObserver(observer)
self.observer = nil
}
}
}
public class PipeSourceChannel<T>: PipeChannel {
private var tokens = NSHashTable<PipeSourceToken>.weakObjects()
deinit {
invalidateTokens()
}
fileprivate func invalidateTokens() {
let enumerator = tokens.objectEnumerator()
while let token = enumerator.nextObject() as? PipeSourceToken {
token.invalidate()
}
}
public func read(onQueue queue: OperationQueue? = nil, replay: Bool = false, block: @escaping (T?) -> Void) -> PipeSourceToken? {
if self.isInvalidated {
return nil
}
let token = PipeSourceToken()
token.observer = NotificationCenter.default.addObserver(forName: name, object: nil, queue: queue, using: { note in
let value = note.userInfo?[pipeValueUserInfoKey]
block(value as? T)
})
tokens.add(token)
if replay && context.hasSunk {
block(context.latestValue as? T)
}
return token
}
}
public class Pipe<T> {
private let context = PipeContext()
public let sinkChannel: PipeSinkChannel<T>
public let sourceChannel: PipeSourceChannel<T>
public init() {
let name = Notification.Name(rawValue: "Pipe-" + UUID().uuidString)
sinkChannel = PipeSinkChannel<T>(name: name, context: context)
sourceChannel = PipeSourceChannel<T>(name: name, context: context)
}
deinit {
context.isInvalidated = true
sourceChannel.invalidateTokens()
}
}