diff --git a/mobile/lib/domain/interfaces/sync_api.interface.dart b/mobile/lib/domain/interfaces/sync_api.interface.dart new file mode 100644 index 0000000000000..fb8f1aa46e07e --- /dev/null +++ b/mobile/lib/domain/interfaces/sync_api.interface.dart @@ -0,0 +1,7 @@ +import 'package:immich_mobile/domain/models/sync/sync_event.model.dart'; + +abstract interface class ISyncApiRepository { + Future ack(String data); + + Stream> watchUserSyncEvent(); +} diff --git a/mobile/lib/domain/models/sync/sync_event.model.dart b/mobile/lib/domain/models/sync/sync_event.model.dart new file mode 100644 index 0000000000000..133d22256a24c --- /dev/null +++ b/mobile/lib/domain/models/sync/sync_event.model.dart @@ -0,0 +1,56 @@ +// ignore_for_file: public_member_api_docs, sort_constructors_first +import 'dart:convert'; + +class SyncEvent { + // dynamic + final dynamic data; + + final String ack; + + SyncEvent({ + required this.data, + required this.ack, + }); + + SyncEvent copyWith({ + dynamic data, + String? ack, + }) { + return SyncEvent( + data: data ?? this.data, + ack: ack ?? this.ack, + ); + } + + Map toMap() { + return { + 'data': data, + 'ack': ack, + }; + } + + factory SyncEvent.fromMap(Map map) { + return SyncEvent( + data: map['data'] as dynamic, + ack: map['ack'] as String, + ); + } + + String toJson() => json.encode(toMap()); + + factory SyncEvent.fromJson(String source) => + SyncEvent.fromMap(json.decode(source) as Map); + + @override + String toString() => 'SyncEvent(data: $data, ack: $ack)'; + + @override + bool operator ==(covariant SyncEvent other) { + if (identical(this, other)) return true; + + return other.data == data && other.ack == ack; + } + + @override + int get hashCode => data.hashCode ^ ack.hashCode; +} diff --git a/mobile/lib/domain/models/sync/sync_user_delete.model.dart b/mobile/lib/domain/models/sync/sync_user_delete.model.dart new file mode 100644 index 0000000000000..268de647ccc29 --- /dev/null +++ b/mobile/lib/domain/models/sync/sync_user_delete.model.dart @@ -0,0 +1,49 @@ +// ignore_for_file: public_member_api_docs, sort_constructors_first +import 'dart:convert'; + +class SyncUserDeleteResponse { + final String userId; + SyncUserDeleteResponse({ + required this.userId, + }); + + SyncUserDeleteResponse copyWith({ + String? userId, + }) { + return SyncUserDeleteResponse( + userId: userId ?? this.userId, + ); + } + + Map toMap() { + return { + 'userId': userId, + }; + } + + factory SyncUserDeleteResponse.fromMap(Map map) { + return SyncUserDeleteResponse( + userId: map['userId'] as String, + ); + } + + String toJson() => json.encode(toMap()); + + factory SyncUserDeleteResponse.fromJson(String source) => + SyncUserDeleteResponse.fromMap( + json.decode(source) as Map, + ); + + @override + String toString() => 'SyncUserDeleteResponse(userId: $userId)'; + + @override + bool operator ==(covariant SyncUserDeleteResponse other) { + if (identical(this, other)) return true; + + return other.userId == userId; + } + + @override + int get hashCode => userId.hashCode; +} diff --git a/mobile/lib/domain/models/sync/sync_user_update.model.dart b/mobile/lib/domain/models/sync/sync_user_update.model.dart new file mode 100644 index 0000000000000..3f6ed893a5aba --- /dev/null +++ b/mobile/lib/domain/models/sync/sync_user_update.model.dart @@ -0,0 +1,79 @@ +// ignore_for_file: public_member_api_docs, sort_constructors_first +import 'dart:convert'; + +class SyncUserUpdateResponse { + final String id; + + final String name; + + final String email; + + final DateTime? deletedAt; + + SyncUserUpdateResponse({ + required this.id, + required this.name, + required this.email, + required this.deletedAt, + }); + + SyncUserUpdateResponse copyWith({ + String? id, + String? name, + String? email, + DateTime? deletedAt, + }) { + return SyncUserUpdateResponse( + id: id ?? this.id, + name: name ?? this.name, + email: email ?? this.email, + deletedAt: deletedAt ?? this.deletedAt, + ); + } + + Map toMap() { + return { + 'id': id, + 'name': name, + 'email': email, + 'deletedAt': deletedAt, + }; + } + + factory SyncUserUpdateResponse.fromMap(Map map) { + return SyncUserUpdateResponse( + id: map['id'] as String, + name: map['name'] as String, + email: map['email'] as String, + deletedAt: + map['deletedAt'] != null ? DateTime.parse(map['deletedAt']) : null, + ); + } + + String toJson() => json.encode(toMap()); + + factory SyncUserUpdateResponse.fromJson(String source) => + SyncUserUpdateResponse.fromMap( + json.decode(source) as Map, + ); + + @override + String toString() { + return 'SyncUserResponse(id: $id, name: $name, email: $email, deletedAt: $deletedAt)'; + } + + @override + bool operator ==(covariant SyncUserUpdateResponse other) { + if (identical(this, other)) return true; + + return other.id == id && + other.name == name && + other.email == email && + other.deletedAt == deletedAt; + } + + @override + int get hashCode { + return id.hashCode ^ name.hashCode ^ email.hashCode ^ deletedAt.hashCode; + } +} diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart new file mode 100644 index 0000000000000..3e4216d17e394 --- /dev/null +++ b/mobile/lib/domain/services/sync_stream.service.dart @@ -0,0 +1,50 @@ +import 'package:flutter/foundation.dart'; +import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; +import 'package:immich_mobile/interfaces/user.interface.dart'; +import 'package:immich_mobile/domain/models/sync/sync_user_delete.model.dart'; +import 'package:immich_mobile/domain/models/sync/sync_user_update.model.dart'; +import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart'; +import 'package:immich_mobile/repositories/user.repository.dart'; + +final syncStreamServiceProvider = Provider( + (ref) => SyncStreamService( + ref.watch(syncApiRepositoryProvider), + ref.watch(userRepositoryProvider), + ), +); + +class SyncStreamService { + final ISyncApiRepository _syncApiRepository; + final IUserRepository _userRepository; + + SyncStreamService(this._syncApiRepository, this._userRepository); + + void syncUsers() { + _syncApiRepository.watchUserSyncEvent().listen((events) async { + for (final event in events) { + if (event.data is SyncUserUpdateResponse) { + final data = event.data as SyncUserUpdateResponse; + final user = await _userRepository.get(data.id); + + if (user == null) { + continue; + } + + user.name = data.name; + user.email = data.email; + user.updatedAt = DateTime.now(); + + await _userRepository.update(user); + await _syncApiRepository.ack(event.ack); + } + + if (event.data is SyncUserDeleteResponse) { + final data = event.data as SyncUserDeleteResponse; + + debugPrint("User delete: $data"); + } + } + }); + } +} diff --git a/mobile/lib/infrastructure/repositories/sync_api.repository.dart b/mobile/lib/infrastructure/repositories/sync_api.repository.dart new file mode 100644 index 0000000000000..94a28666c15e3 --- /dev/null +++ b/mobile/lib/infrastructure/repositories/sync_api.repository.dart @@ -0,0 +1,151 @@ +import 'dart:convert'; + +import 'package:flutter/foundation.dart'; +import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:http/http.dart'; +import 'package:immich_mobile/domain/models/store.model.dart'; +import 'package:immich_mobile/entities/store.entity.dart'; +import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; +import 'package:immich_mobile/domain/models/sync/sync_event.model.dart'; +import 'package:immich_mobile/domain/models/sync/sync_user_delete.model.dart'; +import 'package:immich_mobile/domain/models/sync/sync_user_update.model.dart'; +import 'package:immich_mobile/providers/api.provider.dart'; +import 'package:immich_mobile/providers/db.provider.dart'; +import 'package:immich_mobile/repositories/database.repository.dart'; +import 'package:http/http.dart' as http; +import 'package:openapi/api.dart'; + +final syncApiRepositoryProvider = Provider( + (ref) => SyncApiRepository( + ref.watch(dbProvider), + ref.watch(apiServiceProvider).syncApi, + ), +); + +class SyncApiRepository extends DatabaseRepository + implements ISyncApiRepository { + final SyncApi _syncApi; + SyncApiRepository(super.db, this._syncApi); + + @override + Stream> watchUserSyncEvent() { + return _streamSync( + types: [SyncRequestType.usersV1], + methodName: 'watchUserSyncEvent', + ); + } + + Stream> _streamSync({ + required List types, + required String methodName, + int batchSize = 5000, + }) async* { + final timer = Stopwatch()..start(); + String previousChunk = ''; + List lines = []; + + final client = http.Client(); + final request = _getRequest(types); + if (request == null) { + return; + } + + try { + final response = await client.send(request); + + await for (var chunk in response.stream.transform(utf8.decoder)) { + previousChunk += chunk; + final parts = previousChunk.split('\n'); + previousChunk = parts.removeLast(); + lines.addAll(parts); + + if (lines.length < batchSize) { + continue; + } + + final events = await compute(_parseSyncResponse, lines); + yield events; + lines.clear(); + } + } finally { + if (lines.isNotEmpty) { + final events = await compute(_parseSyncResponse, lines); + yield events; + } + client.close(); + + timer.stop(); + debugPrint( + "[SyncApiRepository.$methodName] Elapsed time: ${timer.elapsedMilliseconds}ms", + ); + } + } + + Request? _getRequest(List types) { + final serverUrl = Store.tryGet(StoreKey.serverUrl); + final accessToken = Store.tryGet(StoreKey.accessToken); + if (serverUrl == null || accessToken == null) { + return null; + } + + final url = Uri.parse('$serverUrl/sync/stream'); + final request = http.Request('POST', url); + final headers = { + 'Content-Type': 'application/json', + 'x-immich-user-token': accessToken, + }; + + request.headers.addAll(headers); + request.body = json.encode({ + "types": [...types], + }); + + return request; + } + + @override + Future ack(String data) { + return _syncApi.sendSyncAck(SyncAckSetDto(acks: [data])); + } +} + +// Need to be outside of the class to be able to use compute +List _parseSyncResponse( + List lines, +) { + final List data = []; + + for (var line in lines) { + try { + final jsonData = jsonDecode(line); + final type = SyncEntityType.fromJson(jsonData['type'])!; + final dataJson = jsonData['data']; + final ack = jsonData['ack']; + + switch (type) { + case SyncEntityType.userV1: + data.add( + SyncEvent( + data: SyncUserUpdateResponse.fromMap(dataJson), + ack: ack, + ), + ); + break; + case SyncEntityType.userDeleteV1: + data.add( + SyncEvent( + data: SyncUserDeleteResponse.fromMap(dataJson), + ack: ack, + ), + ); + break; + default: + debugPrint("[_parseSyncReponse] Unknown type $type"); + } + } catch (error, stack) { + debugPrint("[_parseSyncReponse] Error parsing json $error $stack"); + } + } + + return data; +} diff --git a/mobile/lib/widgets/common/immich_app_bar.dart b/mobile/lib/widgets/common/immich_app_bar.dart index 7a42606797db2..c43dae3e5adc3 100644 --- a/mobile/lib/widgets/common/immich_app_bar.dart +++ b/mobile/lib/widgets/common/immich_app_bar.dart @@ -12,6 +12,7 @@ import 'package:immich_mobile/providers/backup/backup.provider.dart'; import 'package:immich_mobile/providers/immich_logo_provider.dart'; import 'package:immich_mobile/providers/server_info.provider.dart'; import 'package:immich_mobile/routing/router.dart'; +import 'package:immich_mobile/domain/services/sync_stream.service.dart'; import 'package:immich_mobile/widgets/common/app_bar_dialog/app_bar_dialog.dart'; import 'package:immich_mobile/widgets/common/user_circle_avatar.dart'; @@ -185,6 +186,12 @@ class ImmichAppBar extends ConsumerWidget implements PreferredSizeWidget { }, ), actions: [ + IconButton( + onPressed: () { + ref.read(syncStreamServiceProvider).syncUsers(); + }, + icon: const Icon(Icons.sync), + ), if (actions != null) ...actions!.map( (action) => Padding(