Flutter 异步流Stream应用实例

"在Flutter中使用Stream处理异步数据流时遇到几个问题想请教:

  1. 如何正确监听Stream的数据变化并在UI上实时更新?
  2. 当多个Stream同时存在时,怎样避免监听混乱和内存泄漏?
  3. 能否分享一个结合Bloc模式的Stream实际应用案例?
  4. 处理Stream错误时有哪些最佳实践?

最近在实现实时聊天功能时发现数据偶尔丢失,不知道是不是StreamController使用方式有问题。"

3 回复

以下是一个使用 Flutter 的 Stream 实现异步任务的简单例子。假设我们需要从网络获取数据并实时更新 UI。

import 'dart:async';
import 'package:flutter/material.dart';

void main() {
  runApp(MyApp());
}

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: StreamExample(),
    );
  }
}

class StreamExample extends StatefulWidget {
  @override
  _StreamExampleState createState() => _StreamExampleState();
}

class _StreamExampleState extends State<StreamExample> {
  late StreamController<String> _streamController;

  @override
  void initState() {
    super.initState();
    _streamController = StreamController<String>();
    startFetchingData();
  }

  void startFetchingData() async {
    for (int i = 1; i <= 5; i++) {
      await Future.delayed(Duration(seconds: 1)); // 模拟网络延迟
      _streamController.add("第 $i 条数据");
    }
    _streamController.close(); // 数据流结束
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('Stream 示例')),
      body: StreamBuilder<String>(
        stream: _streamController.stream,
        builder: (context, snapshot) {
          if (snapshot.hasError)
            return Text('错误: ${snapshot.error}');
          else if (!snapshot.hasData)
            return CircularProgressIndicator();
          else
            return Text(snapshot.data!);
        },
      ),
    );
  }

  @override
  void dispose() {
    _streamController.close(); // 清理资源
    super.dispose();
  }
}

这个示例中,我们创建了一个 StreamController,然后通过定时器模拟异步数据流。StreamBuilder 用于监听流,并根据数据更新 UI。

更多关于Flutter 异步流Stream应用实例的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html


假设你需要一个简单的计时器功能,可以使用 Flutter 的 Stream 来实现。以下是代码示例:

import 'dart:async';
import 'package:flutter/material.dart';

void main() {
  runApp(MyApp());
}

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: TimerPage(),
    );
  }
}

class TimerPage extends StatefulWidget {
  @override
  _TimerPageState createState() => _TimerPageState();
}

class _TimerPageState extends State<TimerPage> {
  StreamSubscription<int>? _subscription;
  int _counter = 0;

  @override
  void initState() {
    super.initState();
    final stream = Stream.periodic(Duration(seconds: 1), (i) => i);
    _subscription = stream.take(10).listen((value) {
      setState(() => _counter++);
    });
  }

  @override
  void dispose() {
    _subscription?.cancel();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('Stream 计时器')),
      body: Center(child: Text('已计时 $_counter 秒')),
    );
  }
}

这个例子中,我们使用 Stream.periodic 创建了一个每秒触发一次的流,监听它并在每次触发时更新界面。通过 take(10) 限制了流的最大值为 10,当计时到 10 秒后自动停止监听。

在 Flutter 中,Stream 用于处理异步数据流,非常适合实时数据更新、网络请求等场景。以下是一个实用示例:

  1. 基础 Stream 示例:
// 创建一个简单的Stream
Stream<int> countStream(int max) async* {
  for (int i = 1; i <= max; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i; // 每秒发出一个数字
  }
}

// 在Widget中使用
StreamBuilder<int>(
  stream: countStream(5),
  builder: (context, snapshot) {
    if (snapshot.hasData) {
      return Text('当前计数: ${snapshot.data}');
    }
    return CircularProgressIndicator();
  },
)
  1. 实际应用场景(实时搜索):
class SearchBloc {
  final _searchController = StreamController<String>();
  
  // 输入流
  Sink<String> get searchSink => _searchController.sink;
  
  // 输出流(防抖处理)
  Stream<List<String>> get searchResults => _searchController.stream
      .debounce(Duration(milliseconds: 500))
      .asyncMap((query) => _performSearch(query));

  Future<List<String>> _performSearch(String query) async {
    // 模拟网络请求
    await Future.delayed(Duration(seconds: 1));
    return ['结果1', '结果2', '结果3']; // 实际替换为API调用
  }

  void dispose() {
    _searchController.close();
  }
}

// 使用时
final bloc = SearchBloc();
TextField(
  onChanged: bloc.searchSink.add,
),
StreamBuilder<List<String>>(
  stream: bloc.searchResults,
  builder: (_, snapshot) {
    // 显示搜索结果
  }
)

关键点:

  1. async*yield 用于生成 Stream
  2. StreamBuilder 自动管理订阅状态
  3. 可使用 debounce/throttle 等操作符优化性能
  4. BLoC模式常用Stream处理业务逻辑

注意:使用后务必调用close()释放资源,防止内存泄漏。

回到顶部