Skip to content

Commit

Permalink
Merge pull request #74 from reown-com/develop
Browse files Browse the repository at this point in the history
1.2.1
  • Loading branch information
llbartekll authored Jan 6, 2025
2 parents 931f09d + a509cf4 commit bb3a2f6
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ final class ConfigurationService {
projectId: InputConfig.projectId,
socketFactory: DefaultSocketFactory()
)
Networking.instance.setLogging(level: .off)
Networking.instance.setLogging(level: .debug)

let metadata = AppMetadata(
name: "Example Wallet",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ extension NotifyClient {
}

public func register(deviceToken: String) async throws {
// try await pushClient.register(deviceToken: deviceToken)
try await pushClient.register(deviceToken: deviceToken)
}
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion Sources/WalletConnectRelay/PackageConfig.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"version": "1.2.0"}
{"version": "1.2.1"}
101 changes: 73 additions & 28 deletions Sources/WalletConnectRelay/RelayClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -156,24 +156,87 @@ public final class RelayClient {
}

public func subscribe(topic: String) async throws {
logger.debug("Subscribing to topic: \(topic)")
logger.debug("[Subscribe] Subscribing to topic: \(topic)")

let rpc = Subscribe(params: .init(topic: topic))
let request = rpc
.asRPCRequest()
let message = try! request.asJSONEncodedString()
let request = rpc.asRPCRequest()
let message = try request.asJSONEncodedString()

try await dispatcher.protectedSend(message)
observeSubscription(requestId: request.id!, topics: [topic])

// Wait for relay's subscription response
try await waitForSubscriptionResponse(
requestId: request.id!,
topics: [topic],
logPrefix: "[Subscribe]"
)
}

public func batchSubscribe(topics: [String]) async throws {
guard !topics.isEmpty else { return }
logger.debug("Subscribing to topics: \(topics)")
logger.debug("[BatchSubscribe] Subscribing to topics: \(topics)")

let rpc = BatchSubscribe(params: .init(topics: topics))
let request = rpc
.asRPCRequest()
let message = try! request.asJSONEncodedString()
let request = rpc.asRPCRequest()
let message = try request.asJSONEncodedString()

try await dispatcher.protectedSend(message)
observeSubscription(requestId: request.id!, topics: topics)

// Same wait, but for multiple topics
try await waitForSubscriptionResponse(
requestId: request.id!,
topics: topics,
logPrefix: "[BatchSubscribe]"
)
}

private func waitForSubscriptionResponse(
requestId: RPCID,
topics: [String],
logPrefix: String
) async throws {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
var cancellable: AnyCancellable?

cancellable = subscriptionResponsePublisher
// Only handle responses matching this request ID
.filter { $0.0 == requestId }
// Convert Never to RelayError so we can throw on timeout
.setFailureType(to: RelayError.self)
// Enforce a 30-second timeout
.timeout(.seconds(30), scheduler: concurrentQueue, customError: { .requestTimeout })
.sink(
receiveCompletion: { [unowned self] completion in
switch completion {
case .failure(let error):
cancellable?.cancel()
logger.debug("\(logPrefix) Relay request timeout for topics: \(topics)")
continuation.resume(throwing: error)
case .finished:
// Not typically called in this pattern, but required by Combine
break
}
},
receiveValue: { [unowned self] (_, subscriptionIds) in
cancellable?.cancel()
logger.debug("\(logPrefix) Subscribed to topics: \(topics)")

// Check ID counts, warn if mismatch
guard topics.count == subscriptionIds.count else {
logger.warn("\(logPrefix) Number of returned subscription IDs != number of topics")
continuation.resume(returning: ())
return
}

// Track each subscription
for (i, topic) in topics.enumerated() {
subscriptionsTracker.setSubscription(for: topic, id: subscriptionIds[i])
}

continuation.resume(returning: ())
}
)
}
}

public func unsubscribe(topic: String) async throws {
Expand Down Expand Up @@ -219,24 +282,6 @@ public final class RelayClient {
}
}


private func observeSubscription(requestId: RPCID, topics: [String]) {
var cancellable: AnyCancellable?
cancellable = subscriptionResponsePublisher
.filter { $0.0 == requestId }
.sink { [unowned self] (_, subscriptionIds) in
cancellable?.cancel()
logger.debug("Subscribed to topics: \(topics)")
guard topics.count == subscriptionIds.count else {
logger.warn("Number of topics in (batch)subscribe does not match number of subscriptions")
return
}
for i in 0..<topics.count {
subscriptionsTracker.setSubscription(for: topics[i], id: subscriptionIds[i])
}
}
}

public func getClientId() throws -> String {
try clientIdStorage.getClientId()
}
Expand Down
11 changes: 0 additions & 11 deletions Tests/RelayerTests/RelayClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,6 @@ final class RelayClientTests: XCTestCase {
waitForExpectations(timeout: 0.001, handler: nil)
}

func testSubscribeRequest() async {
try? await sut.subscribe(topic: "")
let request = dispatcher.getLastRequestSent()
XCTAssertNotNil(request)
}

func testUnsubscribeRequest() {
let topic = String.randomTopic()
subscriptionsTracker.setSubscription(for: topic, id: "")
Expand All @@ -75,11 +69,6 @@ final class RelayClientTests: XCTestCase {
waitForExpectations(timeout: 0.1, handler: nil)
}

func testSendOnSubscribe() async {
try? await sut.subscribe(topic: "")
XCTAssertTrue(dispatcher.sent)
}

func testSendOnUnsubscribe() {
let topic = "123"
subscriptionsTracker.setSubscription(for: topic, id: "")
Expand Down

0 comments on commit bb3a2f6

Please sign in to comment.