Skip to content

Commit

Permalink
Prevent subscription process update from having stale data
Browse files Browse the repository at this point in the history
  • Loading branch information
bgoncal committed Feb 29, 2024
1 parent 335db2d commit 26c5f37
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 20 deletions.
22 changes: 17 additions & 5 deletions Source/Caches/HACacheKeyStates.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,32 @@ internal struct HACacheKeyStates: HACacheKey {
.init(
subscription: .subscribeEntities(),
transform: { info in
.replace(processUpdates(info: info))
.replace(processUpdates(
info: info,
shouldResetEntities: info.subscriptionPhase == .initial
))
}
)
)
}

/// Process updates from the compressed state to HAEntity
/// - Parameter info: The compressed state update and the current cached states
/// - Parameters:
/// - info: The compressed state update and the current cached states
/// - shouldResetEntities: True if current state needs to be ignored (e.g. re-connection)
/// - Returns: HAEntity cached states
/// Logic from: https://github.com/home-assistant/home-assistant-js-websocket/blob/master/lib/entities.ts
static func processUpdates(info: HACacheTransformInfo<HACompressedStatesUpdates, HACachedStates?>)
static func processUpdates(
info: HACacheTransformInfo<HACompressedStatesUpdates, HACachedStates?>,
shouldResetEntities: Bool
)
-> HACachedStates {
var states: HACachedStates = info.current ?? .init(entities: [])

var states: HACachedStates
if shouldResetEntities {
states = .init(entities: [])
} else {
states = info.current ?? .init(entities: [])
}
if let additions = info.incoming.add {
for (entityId, updates) in additions {
if var currentState = states[entityId] {
Expand Down
24 changes: 20 additions & 4 deletions Source/Caches/HACacheSubscribeInfo.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,16 @@ public struct HACacheSubscribeInfo<OutgoingType> {

return transform(value)
}, start: { connection, perform in
connection.subscribe(to: nonRetrySubscription, handler: { _, incoming in
var operationType: HACacheSubscriptionPhase = .initial
return connection.subscribe(to: nonRetrySubscription, handler: { _, incoming in
perform { current in
transform(.init(incoming: incoming, current: current))
let transform = transform(.init(
incoming: incoming,
current: current,
subscriptionPhase: operationType
))
operationType = .iteration
return transform
}
})
}
Expand All @@ -63,10 +70,19 @@ public struct HACacheSubscribeInfo<OutgoingType> {
/// - Parameters:
/// - incoming: The incoming value, of some given type -- intended to be the IncomingType that created this
/// - current: The current value part of the transform info
/// - subscriptionPhase: The phase in which the subscription is, initial iteration or subsequent
/// - Throws: If the type of incoming does not match the original IncomingType
/// - Returns: The response from the transform block
public func transform<IncomingType>(incoming: IncomingType, current: OutgoingType) throws -> Response {
try anyTransform(HACacheTransformInfo<IncomingType, OutgoingType>(incoming: incoming, current: current))
public func transform<IncomingType>(
incoming: IncomingType,
current: OutgoingType,
subscriptionPhase: HACacheSubscriptionPhase
) throws -> Response {
try anyTransform(HACacheTransformInfo<IncomingType, OutgoingType>(
incoming: incoming,
current: current,
subscriptionPhase: subscriptionPhase
))
}

/// The start handler
Expand Down
11 changes: 11 additions & 0 deletions Source/Caches/HACacheTransformInfo.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,15 @@ public struct HACacheTransformInfo<IncomingType, OutgoingType> {
/// For populate transforms, this is nil if an initial request hasn't been sent yet and the cache not reset.
/// For subscribe transforms, this is nil if the populate did not produce results (or does not exist).
public var current: OutgoingType

/// The current phase of the subscription
public var subscriptionPhase: HACacheSubscriptionPhase = .initial
}

/// The subscription phases
public enum HACacheSubscriptionPhase {
/// `Initial` means it's the first time a value is returned
case initial
/// `Iteration` means subsequent iterations
case iteration
}
8 changes: 4 additions & 4 deletions Tests/HACacheKeyStates.test.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ internal final class HACacheKeyStates_test: XCTestCase {
current: .init(
entities: []
)
)
), shouldResetEntities: false
)

XCTAssertEqual(result.all.count, 2)
Expand Down Expand Up @@ -137,7 +137,7 @@ internal final class HACacheKeyStates_test: XCTestCase {
existentEntity,
]
)
)
), shouldResetEntities: false
)

XCTAssertEqual(result.all.count, 1)
Expand Down Expand Up @@ -206,7 +206,7 @@ internal final class HACacheKeyStates_test: XCTestCase {
existentEntity,
]
)
)
), shouldResetEntities: false
)

XCTAssertEqual(result.all.count, 1)
Expand Down Expand Up @@ -254,7 +254,7 @@ internal final class HACacheKeyStates_test: XCTestCase {
current: .init(
entities: []
)
)
), shouldResetEntities: false
)

XCTAssertEqual(result.all.count, 0)
Expand Down
12 changes: 8 additions & 4 deletions Tests/HACacheSubscribeInfo.test.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@ internal class HACacheSubscribeInfoTests: XCTestCase {
XCTAssertEqual(info.request.type, "test")
XCTAssertEqual(info.request.data["in_data"] as? Bool, true)

XCTAssertThrowsError(try info.transform(incoming: "hello", current: item))
XCTAssertThrowsError(try info.transform(incoming: SubscribeItem?.none, current: item))
XCTAssertThrowsError(try info.transform(incoming: "hello", current: item, subscriptionPhase: .initial))
XCTAssertThrowsError(try info.transform(
incoming: SubscribeItem?.none,
current: item,
subscriptionPhase: .iteration
))

result = .reissuePopulate
XCTAssertEqual(try info.transform(incoming: item, current: item), result)
XCTAssertEqual(try info.transform(incoming: item, current: item, subscriptionPhase: .iteration), result)

result = .replace(SubscribeItem())
XCTAssertEqual(try info.transform(incoming: item, current: item), result)
XCTAssertEqual(try info.transform(incoming: item, current: item, subscriptionPhase: .iteration), result)
}

func testNotRetryRequest() throws {
Expand Down
9 changes: 6 additions & 3 deletions Tests/HACachedStates.test.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ internal class HACachedStatesTests: XCTestCase {
"""
)
),
current: nil
current: nil,
subscriptionPhase: .initial
)
guard case let .replace(outgoingType) = result1 else {
XCTFail("Did not replace when expected")
Expand Down Expand Up @@ -85,7 +86,8 @@ internal class HACachedStatesTests: XCTestCase {
"""
)
),
current: .init(entities: Array(outgoingType!.all))
current: .init(entities: Array(outgoingType!.all)),
subscriptionPhase: .iteration
)

guard case let .replace(updatedOutgoingType) = updateEventResult else {
Expand All @@ -112,7 +114,8 @@ internal class HACachedStatesTests: XCTestCase {
"""
)
),
current: .init(entities: Array(updatedOutgoingType!.all))
current: .init(entities: Array(outgoingType!.all)),
subscriptionPhase: .iteration
)

guard case let .replace(entityRemovedOutgoingType) = entityRemovalEventResult else {
Expand Down

0 comments on commit 26c5f37

Please sign in to comment.