Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bloc)!: add emit.onEach and emit.forEach to Cubit #4293

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions packages/bloc/lib/bloc.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,21 @@
/// Get started at [bloclibrary.dev](https://bloclibrary.dev) 🚀
library bloc;

export 'src/bloc.dart';
export 'src/bloc_observer.dart';
export 'src/change.dart';
export 'src/cubit.dart';
export 'src/transition.dart';
export 'src/bloc.dart'
show
Bloc,
BlocBase,
BlocEventSink,
Closable,
Emittable,
ErrorSink,
EventHandler,
EventMapper,
EventTransformer,
StateStreamable,
StateStreamableSource;
export 'src/bloc_observer.dart' show BlocObserver;
export 'src/change.dart' show Change;
export 'src/cubit.dart' show Cubit;
export 'src/emitter.dart' show Emitter;
export 'src/transition.dart' show Transition;
204 changes: 195 additions & 9 deletions packages/bloc/lib/src/bloc.dart
Original file line number Diff line number Diff line change
@@ -1,10 +1,201 @@
import 'dart:async';

import 'package:bloc/bloc.dart';
import 'package:bloc/src/emitter.dart';
import 'package:meta/meta.dart';

part 'bloc_base.dart';
part 'emitter.dart';
/// An object that provides access to a stream of states over time.
abstract class Streamable<State extends Object?> {
/// The current [stream] of states.
Stream<State> get stream;
}

/// A [Streamable] that provides synchronous access to the current [state].
abstract class StateStreamable<State> implements Streamable<State> {
/// The current [state].
State get state;
}

/// A [StateStreamable] that must be closed when no longer in use.
abstract class StateStreamableSource<State>
implements StateStreamable<State>, Closable {}

/// An object that must be closed when no longer in use.
abstract class Closable {
/// Closes the current instance.
/// The returned future completes when the instance has been closed.
FutureOr<void> close();

/// Whether the object is closed.
///
/// An object is considered closed once [close] is called.
bool get isClosed;
}

/// An object that can emit new states.
// ignore: one_member_abstracts
abstract class Emittable<State extends Object?> {
/// The [Emitter] which handles emitting a new state.
Emitter<State> get emit;
}

/// A generic destination for errors.
///
/// Multiple errors can be reported to the sink via `addError`.
abstract class ErrorSink implements Closable {
/// Adds an [error] to the sink with an optional [stackTrace].
///
/// Must not be called on a closed sink.
void addError(Object error, [StackTrace? stackTrace]);
}

/// {@template bloc_base}
/// An interface for the core functionality implemented by
/// both [Bloc] and [Cubit].
/// {@endtemplate}
abstract class BlocBase<State>
implements StateStreamableSource<State>, Emittable<State>, ErrorSink {
/// {@macro bloc_base}
BlocBase(this._state) {
// ignore: invalid_use_of_protected_member
_blocObserver.onCreate(this);
}

final _blocObserver = Bloc.observer;

late final _stateController = StreamController<State>.broadcast();

final _emitters = <BlocBaseEmitter<State>>[];

State _state;

bool _emitted = false;

@override
State get state => _state;

@override
Stream<State> get stream => _stateController.stream;

/// Whether the bloc is closed.
///
/// A bloc is considered closed once [close] is called.
/// Subsequent state changes cannot occur within a closed bloc.
@override
bool get isClosed => _stateController.isClosed;

/// Updates the [state] to the provided [state].
/// [emit] does nothing if the [state] being emitted
/// is equal to the current [state].
///
/// To allow for the possibility of notifying listeners of the initial state,
/// emitting a state which is equal to the initial state is allowed as long
/// as it is the first thing emitted by the instance.
///
/// * Throws a [StateError] if the bloc is closed.
@protected
@visibleForTesting
@override
Emitter<State> get emit {
late final BlocBaseEmitter<State> emitter;
emitter = BlocBaseEmitter(
(State state) {
try {
if (isClosed) {
throw StateError('Cannot emit new states after calling close');
}
if (state == _state && _emitted) return;
onChange(Change<State>(currentState: this.state, nextState: state));
_state = state;
_stateController.add(_state);
_emitted = true;
} catch (error, stackTrace) {
onError(error, stackTrace);
rethrow;
}
},
onSubscribe: () {
for (final emit in _emitters) {
if (emit != emitter) {
emit.cancel();
}
}
},
);
_emitters.add(emitter);
return emitter;
}

/// Called whenever a [change] occurs with the given [change].
/// A [change] occurs when a new `state` is emitted.
/// [onChange] is called before the `state` of the `cubit` is updated.
/// [onChange] is a great spot to add logging/analytics for a specific `cubit`.
///
/// **Note: `super.onChange` should always be called first.**
/// ```dart
/// @override
/// void onChange(Change change) {
/// // Always call super.onChange with the current change
/// super.onChange(change);
///
/// // Custom onChange logic goes here
/// }
/// ```
///
/// See also:
///
/// * [BlocObserver] for observing [Cubit] behavior globally.
///
@protected
@mustCallSuper
void onChange(Change<State> change) {
// ignore: invalid_use_of_protected_member
_blocObserver.onChange(this, change);
}

/// Reports an [error] which triggers [onError] with an optional [StackTrace].
@protected
@mustCallSuper
@override
void addError(Object error, [StackTrace? stackTrace]) {
onError(error, stackTrace ?? StackTrace.current);
}

/// Called whenever an [error] occurs and notifies [BlocObserver.onError].
///
/// **Note: `super.onError` should always be called last.**
///
/// ```dart
/// @override
/// void onError(Object error, StackTrace stackTrace) {
/// // Custom onError logic goes here
///
/// // Always call super.onError with the current error and stackTrace
/// super.onError(error, stackTrace);
/// }
/// ```
@protected
@mustCallSuper
void onError(Object error, StackTrace stackTrace) {
// ignore: invalid_use_of_protected_member
_blocObserver.onError(this, error, stackTrace);
}

/// Closes the instance.
/// This method should be called when the instance is no longer needed.
/// Once [close] is called, the instance can no longer be used.
@mustCallSuper
@override
Future<void> close() async {
for (final emitter in _emitters) {
emitter.cancel();
}
await Future.wait<void>(_emitters.map((e) => e.future));
// ignore: invalid_use_of_protected_member
_blocObserver.onClose(this);
await _stateController.close();
}
}

/// An [ErrorSink] that supports adding events.
///
Expand Down Expand Up @@ -67,7 +258,6 @@ abstract class Bloc<Event, State> extends BlocBase<State>
final _eventController = StreamController<Event>.broadcast();
final _subscriptions = <StreamSubscription<dynamic>>[];
final _handlers = <_Handler>[];
final _emitters = <_Emitter<dynamic>>[];
final _eventTransformer = Bloc.transformer;

/// Notifies the [Bloc] of a new [event] which triggers
Expand Down Expand Up @@ -148,7 +338,7 @@ abstract class Bloc<Event, State> extends BlocBase<State>
/// {@endtemplate}
@visibleForTesting
@override
void emit(State state) => super.emit(state);
Emitter<State> get emit => super.emit;

/// Register event handler for an event of type `E`.
/// There should only ever be one event handler per event type `E`.
Expand Down Expand Up @@ -208,7 +398,7 @@ abstract class Bloc<Event, State> extends BlocBase<State>
emit(state);
}

final emitter = _Emitter(onEmit);
final emitter = BlocEmitter(onEmit);
final controller = StreamController<E>.broadcast(
sync: true,
onCancel: emitter.cancel,
Expand Down Expand Up @@ -278,10 +468,6 @@ abstract class Bloc<Event, State> extends BlocBase<State>
@override
Future<void> close() async {
await _eventController.close();
for (final emitter in _emitters) {
emitter.cancel();
}
await Future.wait<void>(_emitters.map((e) => e.future));
await Future.wait<void>(_subscriptions.map((s) => s.cancel()));
return super.close();
}
Expand Down
Loading