Flutter NDJSON流处理插件shelf_ndjson_stream的使用

Flutter NDJSON流处理插件shelf_ndjson_stream的使用

shelf_ndjson_stream 是一个用于轻松地从Shelf服务器流式传输NDJSON响应的包装器。

使用步骤

  1. 创建一个 NdjsonStream 对象。
  2. 在Shelf处理器中返回 stream.response
  3. 使用 stream.add() 方法添加消息。
  4. 不要忘记调用 stream.close(),否则连接将永远保持打开状态(或者可能会产生一些奇怪的行为)。

示例代码

以下是一个完整的示例代码,展示了如何在Dart应用中使用 shelf_ndjson_stream 插件:

import 'dart:io';

import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as shelf_io;
import 'package:shelf_ndjson_stream/shelf_ndjson_stream.dart';

// 主函数,启动服务器并设置路由处理器
Future<void> main() async {
  // 设置中间件和处理器
  final handler = const Pipeline().addMiddleware(logRequests()).addHandler(_streamedResponse);

  // 启动服务器
  final server = await shelf_io.serve(
    handler,
    InternetAddress.anyIPv4,
    8080,
  );

  // 打印服务器地址信息
  print('Serving at http://${server.address.host}:${server.port}');
}

// 处理请求并生成NDJSON流响应
Response _streamedResponse(Request request) {
  // 创建一个NdjsonStream对象,并初始化第一条消息
  NdjsonStream stream = NdjsonStream(initialMessage: {'counter': 0});

  // 添加数据到流中
  _doStuffWithStream(stream);

  // 返回流响应
  return stream.response;
}

// 添加多个消息到流中
void _doStuffWithStream(NdjsonStream stream) async {
  // 循环添加20条消息到流中
  for (int i = 1; i <= 20; i++) {
    // 每次添加一条新的消息
    stream.add({'counter': i});
    
    // 等待一段时间再添加下一条消息
    await Future.delayed(Duration(milliseconds: 200));
  }

  // 关闭流
  stream.close();
}

代码说明

  1. 导入必要的包:

    import 'dart:io';
    import 'package:shelf/shelf.dart';
    import 'package:shelf/shelf_io.dart' as shelf_io;
    import 'package:shelf_ndjson_stream/shelf_ndjson_stream.dart';
    
  2. 主函数:

    Future<void> main() async {
      final handler = const Pipeline().addMiddleware(logRequests()).addHandler(_streamedResponse);
      final server = await shelf_io.serve(handler, InternetAddress.anyIPv4, 8080);
      print('Serving at http://${server.address.host}:${server.port}');
    }
    

    这里设置了中间件和处理器,并启动了服务器。

  3. 处理请求并生成NDJSON流响应:

    Response _streamedResponse(Request request) {
      NdjsonStream stream = NdjsonStream(initialMessage: {'counter': 0});
      _doStuffWithStream(stream);
      return stream.response;
    }
    

    这里创建了一个 NdjsonStream 对象,并初始化了一条初始消息。然后调用了 _doStuffWithStream 函数来添加更多的消息。

  4. 添加多个消息到流中:

    void _doStuffWithStream(NdjsonStream stream) async {
      for (int i = 1; i <= 20; i++) {
        stream.add({'counter': i});
        await Future.delayed(Duration(milliseconds: 200));
      }
      stream.close();
    }
    

更多关于Flutter NDJSON流处理插件shelf_ndjson_stream的使用的实战教程也可以访问 https://www.itying.com/category-92-b0.html

1 回复

更多关于Flutter NDJSON流处理插件shelf_ndjson_stream的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html


当然,以下是如何在Flutter项目中使用shelf_ndjson_stream插件来处理NDJSON(Newline Delimited JSON)流的示例代码。请注意,shelf_ndjson_stream是一个Dart库,通常用于服务器端处理NDJSON流,而Flutter是客户端框架。因此,我们将通过一个Dart后端服务来演示如何使用这个库,然后在Flutter客户端通过HTTP请求与这个服务进行交互。

1. 设置Dart后端服务

首先,创建一个新的Dart项目作为后端服务。

dart create ndjson_server
cd ndjson_server

然后,添加shelfshelf_ndjson_stream依赖到pubspec.yaml文件中:

dependencies:
  shelf: ^1.0.0
  shelf_ndjson_stream: ^0.3.0

运行dart pub get来安装依赖。

接下来,创建一个简单的Dart后端服务,使用shelf_ndjson_stream来处理NDJSON流:

// bin/server.dart
import 'dart:async';
import 'dart:convert';
import 'package:shelf/shelf.dart';
import 'package:shelf_ndjson_stream/shelf_ndjson_stream.dart';

void main() async {
  var handler = (Request request) async {
    if (request.method == 'POST' && request.headers.contentType?.value == 'application/x-ndjson') {
      var decoder = newNdjsonDecoder(request.body);
      await for (var jsonObj in decoder) {
        // 处理每个JSON对象
        print('Received JSON: ${jsonEncode(jsonObj)}');
        // 这里可以添加你的业务逻辑,比如存储到数据库等
      }
      return new Response.ok('NDJSON stream processed');
    } else {
      return new Response.notFound('Only POST with application/x-ndjson is supported');
    }
  };

  var server = await shelf.serve(handler, 'localhost', 8080);
  print('Serving at http://${server.address.host}:${server.address.port}');
}

运行这个Dart后端服务:

dart run bin/server.dart

2. Flutter客户端

现在,创建一个新的Flutter项目作为客户端。

flutter create ndjson_client
cd ndjson_client

在Flutter客户端项目中,使用http包来发送NDJSON流到后端服务。

首先,在pubspec.yaml文件中添加http依赖:

dependencies:
  flutter:
    sdk: flutter
  http: ^0.13.3

运行flutter pub get来安装依赖。

然后,在Flutter项目中创建一个函数来发送NDJSON流到后端服务:

// lib/main.dart
import 'dart:convert';
import 'dart:io';
import 'package:flutter/material.dart';
import 'package:http/http.dart' as http;

void main() => runApp(MyApp());

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: Text('NDJSON Client'),
        ),
        body: Center(
          child: ElevatedButton(
            onPressed: sendNdJsonStream,
            child: Text('Send NDJSON Stream'),
          ),
        ),
      ),
    );
  }

  Future<void> sendNdJsonStream() async {
    var client = http.Client();
    var body = IOSink(stdout) // 临时用于打印,实际应使用ByteData或StringWriter
      ..write('{"name":"Alice"}\n')
      ..write('{"name":"Bob"}\n')
      ..close();

    var request = http.MultipartRequest('POST', Uri.parse('http://localhost:8080'))
      ..headers['Content-Type'] = 'application/x-ndjson'
      ..addPart(http.FormPart.fromBytes('data', await File('').readAsBytes())); // 这里需要正确设置NDJSON流数据

    // 由于http.MultipartRequest不支持直接设置流数据,这里需要自定义请求
    // 使用http.Request代替MultipartRequest
    var stream = newByteStream()
      ..write(utf8.encode('{"name":"Alice"}\n{"name":"Bob"}\n'));
    var request = http.Request('POST', Uri.parse('http://localhost:8080'))
      ..headers['Content-Type'] = 'application/x-ndjson'
      ..body = stream;

    var response = await client.send(request);
    response.streamDecode().listen((value) {
      print(value);
    });

    client.close();
  }
}

注意:上面的代码在sendNdJsonStream函数中有一个问题,因为http.MultipartRequest不支持直接设置流数据。为了简单起见,这里使用了newByteStream()来模拟NDJSON流,但这并不是最佳实践。在实际应用中,你可能需要创建一个自定义的http.BaseClient或使用其他方式来发送NDJSON流。

由于Flutter客户端和Dart后端服务通常在不同的环境中运行(比如Flutter在移动设备上,Dart后端服务在服务器上),你需要确保它们可以相互通信。这可能涉及到网络配置、CORS(跨源资源共享)策略等。

以上代码是一个基本示例,用于展示如何在Flutter项目中与Dart后端服务进行NDJSON流通信。在实际应用中,你可能需要根据具体需求进行更复杂的错误处理、数据验证和安全性配置。

回到顶部