diff --git a/mobile/test/infrastructure/repositories/sync_api_repository_test.dart b/mobile/test/infrastructure/repositories/sync_api_repository_test.dart index 467e19bf3f..660b8206bb 100644 --- a/mobile/test/infrastructure/repositories/sync_api_repository_test.dart +++ b/mobile/test/infrastructure/repositories/sync_api_repository_test.dart @@ -80,6 +80,7 @@ void main() { int onDataCallCount = 0; bool abortWasCalledInCallback = false; List receivedEventsBatch1 = []; + final Completer firstBatchReceived = Completer(); Future onDataCallback(List events, Function() abort, Function() _) async { onDataCallCount++; @@ -87,6 +88,7 @@ void main() { receivedEventsBatch1 = events; abort(); abortWasCalledInCallback = true; + firstBatchReceived.complete(); } else { fail("onData called more than once after abort was invoked"); } @@ -94,7 +96,8 @@ void main() { final streamChangesFuture = streamChanges(onDataCallback); - await pumpEventQueue(); + // Give the stream subscription time to start (longer delay to account for mock delay) + await Future.delayed(const Duration(milliseconds: 50)); for (int i = 0; i < testBatchSize; i++) { responseStreamController.add( @@ -104,6 +107,11 @@ void main() { ); } + await firstBatchReceived.future.timeout( + const Duration(seconds: 5), + onTimeout: () => fail('First batch was not processed within timeout'), + ); + for (int i = testBatchSize; i < testBatchSize * 2; i++) { responseStreamController.add( utf8.encode( @@ -124,12 +132,14 @@ void main() { test('streamChanges does not process remaining lines in finally block if aborted', () async { int onDataCallCount = 0; bool abortWasCalledInCallback = false; + final Completer firstBatchReceived = Completer(); Future onDataCallback(List events, Function() abort, Function() _) async { onDataCallCount++; if (onDataCallCount == 1) { abort(); abortWasCalledInCallback = true; + firstBatchReceived.complete(); } else { fail("onData called more than once after abort was invoked"); } @@ -137,7 +147,7 @@ void main() { final streamChangesFuture = streamChanges(onDataCallback); - await pumpEventQueue(); + await Future.delayed(const Duration(milliseconds: 50)); for (int i = 0; i < testBatchSize; i++) { responseStreamController.add( @@ -147,6 +157,11 @@ void main() { ); } + await firstBatchReceived.future.timeout( + const Duration(seconds: 5), + onTimeout: () => fail('First batch was not processed within timeout'), + ); + // emit a single event to skip batching and trigger finally responseStreamController.add( utf8.encode( @@ -166,13 +181,17 @@ void main() { int onDataCallCount = 0; List receivedEventsBatch1 = []; List receivedEventsBatch2 = []; + final Completer firstBatchReceived = Completer(); + final Completer secondBatchReceived = Completer(); Future onDataCallback(List events, Function() _, Function() __) async { onDataCallCount++; if (onDataCallCount == 1) { receivedEventsBatch1 = events; + firstBatchReceived.complete(); } else if (onDataCallCount == 2) { receivedEventsBatch2 = events; + secondBatchReceived.complete(); } else { fail("onData called more than expected"); } @@ -180,7 +199,7 @@ void main() { final streamChangesFuture = streamChanges(onDataCallback); - await pumpEventQueue(); + await Future.delayed(const Duration(milliseconds: 50)); // Batch 1 for (int i = 0; i < testBatchSize; i++) { @@ -191,7 +210,11 @@ void main() { ); } - // Partial Batch 2 + await firstBatchReceived.future.timeout( + const Duration(seconds: 5), + onTimeout: () => fail('First batch was not processed within timeout'), + ); + responseStreamController.add( utf8.encode( _createJsonLine(SyncEntityType.userDeleteV1.toString(), SyncUserDeleteV1(userId: "user100").toJson(), 'ack100'), @@ -199,6 +222,12 @@ void main() { ); await responseStreamController.close(); + + await secondBatchReceived.future.timeout( + const Duration(seconds: 5), + onTimeout: () => fail('Second batch was not processed within timeout'), + ); + await expectLater(streamChangesFuture, completes); expect(onDataCallCount, 2); @@ -217,7 +246,7 @@ void main() { final streamChangesFuture = streamChanges(onDataCallback); - await pumpEventQueue(); + await Future.delayed(const Duration(milliseconds: 50)); responseStreamController.add( utf8.encode(