@ -7,26 +7,43 @@ import 'package:immich_mobile/shared/models/asset.dart';
import ' package:immich_mobile/shared/models/store.dart ' ;
import ' package:immich_mobile/shared/providers/asset.provider.dart ' ;
import ' package:immich_mobile/shared/providers/server_info.provider.dart ' ;
import ' package:immich_mobile/shared/services/sync.service.dart ' ;
import ' package:immich_mobile/utils/debounce.dart ' ;
import ' package:logging/logging.dart ' ;
import ' package:openapi/api.dart ' ;
import ' package:socket_io_client/socket_io_client.dart ' ;
enum PendingAction {
assetDelete ,
}
class PendingChange {
final PendingAction action ;
final dynamic value ;
const PendingChange ( this . action , this . value ) ;
}
class WebsocketState {
final Socket ? socket ;
final bool isConnected ;
final List < PendingChange > pendingChanges ;
WebsocketState ( {
this . socket ,
required this . isConnected ,
required this . pendingChanges ,
} ) ;
WebsocketState copyWith ( {
Socket ? socket ,
bool ? isConnected ,
List < PendingChange > ? pendingChanges ,
} ) {
return WebsocketState (
socket: socket ? ? this . socket ,
isConnected: isConnected ? ? this . isConnected ,
pendingChanges: pendingChanges ? ? this . pendingChanges ,
) ;
}
@ -49,10 +66,17 @@ class WebsocketState {
class WebsocketNotifier extends StateNotifier < WebsocketState > {
WebsocketNotifier ( this . ref )
: super ( WebsocketState ( socket: null , isConnected: false ) ) ;
: super (
WebsocketState ( socket: null , isConnected: false , pendingChanges: [ ] ) ,
) {
debounce = Debounce (
const Duration ( milliseconds: 500 ) ,
) ;
}
final log = Logger ( ' WebsocketNotifier ' ) ;
final Ref ref ;
late final Debounce debounce ;
connect ( ) {
var authenticationState = ref . read ( authenticationProvider ) ;
@ -79,21 +103,36 @@ class WebsocketNotifier extends StateNotifier<WebsocketState> {
socket . onConnect ( ( _ ) {
debugPrint ( " Established Websocket Connection " ) ;
state = WebsocketState ( isConnected: true , socket: socket ) ;
state = WebsocketState (
isConnected: true ,
socket: socket ,
pendingChanges: state . pendingChanges ,
) ;
} ) ;
socket . onDisconnect ( ( _ ) {
debugPrint ( " Disconnect to Websocket Connection " ) ;
state = WebsocketState ( isConnected: false , socket: null ) ;
state = WebsocketState (
isConnected: false ,
socket: null ,
pendingChanges: state . pendingChanges ,
) ;
} ) ;
socket . on ( ' error ' , ( errorMessage ) {
log . severe ( " Websocket Error - $ errorMessage " ) ;
state = WebsocketState ( isConnected: false , socket: null ) ;
state = WebsocketState (
isConnected: false ,
socket: null ,
pendingChanges: state . pendingChanges ,
) ;
} ) ;
socket . on ( ' on_upload_success ' , _handleOnUploadSuccess ) ;
socket . on ( ' on_config_update ' , _handleOnConfigUpdate ) ;
socket . on ( ' on_asset_delete ' , _handleOnAssetDelete ) ;
socket . on ( ' on_asset_trash ' , _handleServerUpdates ) ;
socket . on ( ' on_asset_restore ' , _handleServerUpdates ) ;
} catch ( e ) {
debugPrint ( " [WEBSOCKET] Catch Websocket Error - ${ e . toString ( ) } " ) ;
}
@ -106,7 +145,11 @@ class WebsocketNotifier extends StateNotifier<WebsocketState> {
var socket = state . socket ? . disconnect ( ) ;
if ( socket ? . disconnected = = true ) {
state = WebsocketState ( isConnected: false , socket: null ) ;
state = WebsocketState (
isConnected: false ,
socket: null ,
pendingChanges: state . pendingChanges ,
) ;
}
}
@ -120,6 +163,29 @@ class WebsocketNotifier extends StateNotifier<WebsocketState> {
state . socket ? . on ( ' on_upload_success ' , _handleOnUploadSuccess ) ;
}
addPendingChange ( PendingAction action , dynamic value ) {
state = state . copyWith (
pendingChanges: [ . . . state . pendingChanges , PendingChange ( action , value ) ] ,
) ;
}
handlePendingChanges ( ) {
final deleteChanges = state . pendingChanges
. where ( ( c ) = > c . action = = PendingAction . assetDelete )
. toList ( ) ;
if ( deleteChanges . isNotEmpty ) {
List < String > remoteIds = deleteChanges
. map ( ( a ) = > jsonDecode ( a . value . toString ( ) ) . toString ( ) )
. toList ( ) ;
ref . read ( syncServiceProvider ) . handleRemoteAssetRemoval ( remoteIds ) ;
state = state . copyWith (
pendingChanges: state . pendingChanges
. where ( ( c ) = > c . action ! = PendingAction . assetDelete )
. toList ( ) ,
) ;
}
}
_handleOnUploadSuccess ( dynamic data ) {
final jsonString = jsonDecode ( data . toString ( ) ) ;
final dto = AssetResponseDto . fromJson ( jsonString ) ;
@ -133,6 +199,16 @@ class WebsocketNotifier extends StateNotifier<WebsocketState> {
ref . read ( serverInfoProvider . notifier ) . getServerFeatures ( ) ;
ref . read ( serverInfoProvider . notifier ) . getServerConfig ( ) ;
}
/ / Refresh updated assets
_handleServerUpdates ( dynamic data ) {
ref . read ( assetProvider . notifier ) . getAllAsset ( ) ;
}
_handleOnAssetDelete ( dynamic data ) {
addPendingChange ( PendingAction . assetDelete , data ) ;
debounce ( handlePendingChanges ) ;
}
}
final websocketProvider =