Flutter数据流通信插件stream_channel的使用

发布于 1周前 作者 vueper 来自 Flutter

Flutter数据流通信插件stream_channel的使用

简介

stream_channel包提供了StreamChannel接口,该接口表示一个双向通信通道。每个StreamChannel都暴露了一个用于接收数据的Stream和一个用于发送数据的StreamSink。这个接口帮助将通信逻辑从底层协议中抽象出来,使得在不同场景下(例如WebSocket连接、Isolate连接等)可以复用相同的通信协议。

此外,stream_channel包还包含了一些处理StreamChannel以及一般双向通信的工具函数。更多文档请参阅API 文档

示例代码

下面是一个简单的例子,展示了如何使用stream_channel进行通信:

import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:isolate';

import 'package:stream_channel/isolate_channel.dart';
import 'package:stream_channel/stream_channel.dart';

Future<void> main() async {
  // 创建一个基于标准输入输出的StreamChannel
  var stdioChannel = StreamChannel(stdin, stdout);
  // 向stdout发送消息
  stdioChannel.sink.add('Hello!\n'.codeUnits);

  // 使用编码器转换StreamChannel,以便处理字符串
  var stringChannel = stdioChannel
      .transform(StreamChannelTransformer.fromCodec(utf8))
      .transformStream(const LineSplitter());
  stringChannel.sink.add('world!\n');

  // 使用StreamChannelController创建更复杂的通信逻辑
  var ctrl = StreamChannelController<String>();
  ctrl.local.stream.listen((event) {
    print('Received from foreign: $event');
  });

  // 将事件从一个channel传递到另一个channel
  await ctrl.foreign.pipe(stringChannel);
  ctrl.local.sink.add('Piped!\n');
  await ctrl.local.sink.close();

  // 创建带有保证的StreamChannel
  var dummyCtrl0 = StreamChannelController<String>();
  var guaranteedChannel = StreamChannel.withGuarantees(
      dummyCtrl0.foreign.stream, dummyCtrl0.foreign.sink);
  await guaranteedChannel.sink.close();

  // 使用MultiChannel多路复用多个虚拟通道
  var dummyCtrl1 = StreamChannelController<String>();
  var multiChannel = MultiChannel<String>(dummyCtrl1.foreign);
  var channel1 = multiChannel.virtualChannel();
  await multiChannel.sink.close();

  // 客户端也创建自己的MultiChannel来处理各自通道中的事件
  var dummyCtrl2 = StreamChannelController<String>();
  var multiChannel2 = MultiChannel<String>(dummyCtrl2.foreign);
  var channel2 = multiChannel2.virtualChannel(channel1.id);
  await channel2.sink.close();
  await multiChannel2.sink.close();

  // 使用IsolateChannel实现跨Isolate通信
  var recv = ReceivePort();
  var recvChannel = IsolateChannel<void>.connectReceive(recv);
  var sendChannel = IsolateChannel<void>.connectSend(recv.sendPort);

  // 手动关闭IsolateChannel的sink
  await recvChannel.sink.close();
  await sendChannel.sink.close();

  // 使用Disconnector使通道模拟远程断开
  var disconnector = Disconnector<String>();
  var disconnectable = stringChannel.transform(disconnector);
  disconnectable.sink.add('Still connected!');
  await disconnector.disconnect();
}

关键点解释

  • StreamChannel:代表一个双向通信通道,提供了一个Stream用于接收数据和一个StreamSink用于发送数据。
  • StreamChannelController:简化了StreamChannel的实现,通过localforeign两个成员分别操作本地和远端的通信。
  • MultiChannel:允许多个虚拟通道共享同一个传输层,适用于需要区分来自不同客户端或服务端的消息场景。
  • IsolateChannel:利用Dart的Isolate机制,方便地实现了跨Isolate通信。
  • Disconnector:允许开发者模拟远程端口断开的情况,这对于测试网络异常等情况非常有用。

通过这些功能,stream_channel为Flutter应用内部或与其他系统之间的高效、可靠的数据交换提供了强大的支持。


更多关于Flutter数据流通信插件stream_channel的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html

1 回复

更多关于Flutter数据流通信插件stream_channel的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html


在Flutter开发中,stream_channel 是一个用于在不同隔离环境(如 Dart VM 和原生平台代码)之间建立数据流通信的插件。虽然 stream_channel 本身并不是 Flutter 官方提供的直接插件,但我们可以基于 Dart 的 dart:ui 库中的 PlatformChannel 实现类似的功能。Flutter 社区中有许多类似的库和插件,但这里我们将展示如何手动实现一个简单的数据流通信示例。

以下是一个简单的示例,展示了如何在 Flutter 与原生平台(Android 和 iOS)之间使用自定义的 StreamChannel 进行数据流通信。

Flutter 端代码

首先,我们需要在 Flutter 端定义一个 MethodChannel 和一个 EventChannel,前者用于发送命令,后者用于接收数据流。

import 'package:flutter/services.dart';

class MyStreamChannel {
  static const MethodChannel _methodChannel = MethodChannel('com.example.myapp/stream_channel');
  static const EventChannel _eventChannel = EventChannel('com.example.myapp/stream_event_channel');

  static Stream<dynamic> listenToEvents() {
    return _eventChannel.receiveBroadcastStream();
  }

  static Future<void> sendCommand(String command) async {
    try {
      await _methodChannel.invokeMethod('sendCommand', command);
    } on PlatformException catch (e) {
      print("Failed to invoke: '${e.message}'.");
    }
  }
}

// 使用示例
void main() {
  runApp(MyApp());
}

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: Text('Stream Channel Example'),
        ),
        body: Center(
          child: Column(
            mainAxisAlignment: MainAxisAlignment.center,
            children: <Widget>[
              Text('Listening to events...'),
              ElevatedButton(
                onPressed: () => MyStreamChannel.sendCommand('startStream'),
                child: Text('Start Stream'),
              ),
            ],
          ),
        ),
      ),
    );
  }

  @override
  void initState() {
    super.initState();
    MyStreamChannel.listenToEvents().listen((event) {
      print('Received event: $event');
    });
  }
}

原生端代码

Android 端

MainActivity.ktMainActivity.java 中设置 MethodChannelEventChannel

// MainActivity.kt
package com.example.myapp

import android.os.Bundle
import android.os.Handler
import android.os.Looper
import androidx.annotation.NonNull
import io.flutter.embedding.android.FlutterActivity
import io.flutter.embedding.engine.FlutterEngine
import io.flutter.plugin.common.EventChannel
import io.flutter.plugin.common.MethodChannel
import io.flutter.plugins.GeneratedPluginRegistrant

class MainActivity: FlutterActivity() {
    private val CHANNEL = "com.example.myapp/stream_channel"
    private val EVENT_CHANNEL = "com.example.myapp/stream_event_channel"

    override fun configureFlutterEngine(@NonNull flutterEngine: FlutterEngine) {
        super.configureFlutterEngine(flutterEngine)
        GeneratedPluginRegistrant.registerWith(flutterEngine)

        MethodChannel(flutterEngine.dartExecutor.binaryMessenger, CHANNEL).setMethodCallHandler { call, result ->
            if (call.method == "sendCommand") {
                val command = call.argument<String>("command")
                if (command == "startStream") {
                    startEventStreaming()
                    result.success(null)
                } else {
                    result.notImplemented()
                }
            } else {
                result.notImplemented()
            }
        }
    }

    private fun startEventStreaming() {
        EventChannel(dartExecutor.binaryMessenger, EVENT_CHANNEL).setStreamHandler(object : EventChannel.StreamHandler {
            override fun onListen(arguments: Any?, events: EventChannel.EventSink?) {
                Handler(Looper.getMainLooper()).postDelayed({
                    for (i in 0..9) {
                        events?.success("Event $i")
                        Handler(Looper.getMainLooper()).postDelayed({ }, 1000) // 模拟延迟
                    }
                }, 1000)
            }

            override fun onCancel(arguments: Any?) {
                // 取消监听时的处理
            }
        })
    }
}

iOS 端

AppDelegate.swift 中设置 MethodChannelEventChannel

// AppDelegate.swift
import UIKit
import Flutter

@UIApplicationMain
@objc class AppDelegate: FlutterAppDelegate {
  override func application(
    _ application: UIApplication,
    didFinishLaunchingWithOptions launchOptions: [UIApplication.LaunchOptionsKey: Any]?
  ) -> Bool {
    GeneratedPluginRegistrant.register(with: self)
    
    let controller : FlutterViewController = window?.rootViewController as! FlutterViewController
    let channel = FlutterMethodChannel(name: "com.example.myapp/stream_channel", binaryMessenger: controller)
    let eventChannel = FlutterEventChannel(name: "com.example.myapp/stream_event_channel", binaryMessenger: controller)
    
    channel.setMethodCallHandler({
      (call: FlutterMethodCall, result: @escaping FlutterResult) in
      if call.method == "sendCommand" {
        if let command = call.arguments as? String, command == "startStream" {
          startEventStreaming(eventChannel: eventChannel)
          result(success: nil)
        } else {
          result(FlutterMethodNotImplemented)
        }
      } else {
        result(FlutterMethodNotImplemented)
      }
    })
    
    return super.application(application, didFinishLaunchingWithOptions: launchOptions)
  }
  
  private func startEventStreaming(eventChannel: FlutterEventChannel) {
    eventChannel.setStreamHandler(FlutterStreamHandler { (eventSink, _) in
      let timer = Timer.scheduledTimer(withTimeInterval: 1.0, repeats: true) { _ in
        for i in 0..<10 {
          eventSink?(FlutterError.none(), "Event \(i)")
        }
        timer.invalidate() // Stop the timer after sending 10 events
      }
    })
  }
}

总结

以上代码展示了如何在 Flutter 与原生平台之间使用自定义的 MethodChannelEventChannel 实现数据流通信。在实际项目中,你可能需要根据具体需求调整通信的细节和数据处理逻辑。

回到顶部