Flutter分布式数据同步插件drift_crdt的使用

Flutter分布式数据同步插件drift_crdt的使用

drift_crdt 包含一个基于 sqflite 包的 CRDT drift 数据库实现。此包是 Simon Binder 的 Drift 插件,并基于 Simon Binder 的 drift_sqflite 包。

有关 drift 的更多信息,请参阅其文档

使用

CrdtQueryExecutor 类可以传递给您的 drift 数据库类的构造函数,使其使用 sqflite

@DriftDatabase(tables: [Todos, Categories])
class MyDatabase extends _$MyDatabase {
  // 我们通过此构造函数告诉数据库存储数据的位置
  MyDatabase() : super(_openConnection());

  // 每当更改或添加表定义时,应增加此数字。
  // 迁移在文档的其他部分有详细介绍。
  @override
  int get schemaVersion => 1;
}

QueryExecutor _openConnection() {
  return CrdtQueryExecutor.inDatabaseFolder(path: 'db.sqlite');
}

Drift迁移

目前不支持迁移。这是因为 CRDT 实现劫持了 SQL 查询并修改它们以管理 CRDT 函数。

但是,迁移可以被实现。

1. 像往常一样创建迁移。

2. 创建生成列的闭包

假设 DRIFT 为表生成的输出如下:

i1.GeneratedColumn<String> _column_0(String aliasedName) =>
    i1.GeneratedColumn<String>('id', aliasedName, false,
        type: i1.DriftSqlType.string);
i1.GeneratedColumn<DateTime> _column_1(String aliasedName) =>
    i1.GeneratedColumn<DateTime>('start', aliasedName, false,
        type: i1.DriftSqlType.dateTime, defaultValue: currentDateAndTime);
i1.GeneratedColumn<DateTime> _column_2(String aliasedName) =>
    i1.GeneratedColumn<DateTime>('end', aliasedName, true,
        type: i1.DriftSqlType.dateTime);
i1.GeneratedColumn<String> _column_3(String aliasedName) =>
    i1.GeneratedColumn<String>('title', aliasedName, false,
        type: i1.DriftSqlType.string);
i1.GeneratedColumn<String> _column_4(String aliasedName) =>
    i1.GeneratedColumn<String>('body', aliasedName, true,
        type: i1.DriftSqlType.string);
i1.GeneratedColumn<String> _column_5(String aliasedName) =>
    i1.GeneratedColumn<String>('category', aliasedName, true,
        type: i1.DriftSqlType.string);

添加 CRDT 相关的列如下:

i1.GeneratedColumn<String> _column_0(String aliasedName) =>
    i1.GeneratedColumn<String>('id', aliasedName, false,
        type: i1.DriftSqlType.string);
i1.GeneratedColumn<DateTime> _column_1(String aliasedName) =>
    i1.GeneratedColumn<DateTime>('start', aliasedName, false,
        type: i1.DriftSqlType.dateTime, defaultValue: currentDateAndTime);
i1.GeneratedColumn<DateTime> _column_2(String aliasedName) =>
    i1.GeneratedColumn<DateTime>('end', aliasedName, true,
        type: i1.DriftSqlType.dateTime);
i1.GeneratedColumn<String> _column_3(String aliasedName) =>
    i1.GeneratedColumn<String>('title', aliasedName, false,
        type: i1.DriftSqlType.string);
i1.GeneratedColumn<String> _column_4(String aliasedName) =>
    i1.GeneratedColumn<String>('body', aliasedName, true,
        type: i1.DriftSqlType.string);
i1.GeneratedColumn<String> _column_5(String aliasedName) =>
    i1.GeneratedColumn<String>('category', aliasedName, true,
        type: i1.DriftSqlType.string);
i1.GeneratedColumn<int> _column_6(String aliasedName) =>
    i1.GeneratedColumn<int>('is_deleted', aliasedName, false,
        type: i1.DriftSqlType.int, defaultValue: i1.Constant(0));
i1.GeneratedColumn<String> _column_7(String aliasedName) =>
    i1.GeneratedColumn<String>('hlc', aliasedName, false,
      type: i1.DriftSqlType.string,);
i1.GeneratedColumn<String> _column_8(String aliasedName) =>
    i1.GeneratedColumn<String>('node_id', aliasedName, false,
        type: i1.DriftSqlType.string);
i1.GeneratedColumn<String> _column_9(String aliasedName) =>
    i1.GeneratedColumn<String>('modified', aliasedName, false,
        type: i1.DriftSqlType.string);

3. 将 CRDT 列添加到 schema_versions

假设 VersionedTable 中的以下字段:

late final Shape0 epochs = Shape0(
  source: i0.VersionedTable(
    entityName: 'epochs',
    withoutRowId: true,
    isStrict: false,
    tableConstraints: [
      'PRIMARY KEY(id)',
    ],
    columns: [
      _column_0,
      _column_1,
      _column_2,
      _column_3,
      _column_4,
      _column_5,
    ],
    attachedDatabase: database,
  ),
  alias: null);

改为:

late final Shape0 epochs = Shape0(
  source: i0.VersionedTable(
    entityName: 'epochs',
    withoutRowId: true,
    isStrict: false,
    tableConstraints: [
      'PRIMARY KEY(id)',
    ],
    columns: [
      _column_0,
      _column_1,
      _column_2,
      _column_3,
      _column_4,
      _column_5,
      _column_6,
      _column_7,
      _column_8,
      _column_9
    ],
    attachedDatabase: database,
  ),
  alias: null);

4. 将生成的列添加到 Shape 类

每个表的 Shape 类从这:

class Shape0 extends i0.VersionedTable {
  Shape0({required super.source, required super.alias}) : super.aliased();
  i1.GeneratedColumn<String> get id =>
      columnsByName['id']! as i1.GeneratedColumn<String>;
  i1.GeneratedColumn<DateTime> get start =>
      columnsByName['start']! as i1.GeneratedColumn<DateTime>;
  i1.GeneratedColumn<DateTime> get end =>
      columnsByName['end']! as i1.GeneratedColumn<DateTime>;
  i1.GeneratedColumn<String> get title =>
      columnsByName['title']! as i1.GeneratedColumn<String>;
  i1.GeneratedColumn<String> get body =>
      columnsByName['body']! as i1.GeneratedColumn<String>;
  i1.GeneratedColumn<String> get category =>
      columnsByName['category']! as i1.GeneratedColumn<String>;
}

改为这:

class Shape0 extends i0.VersionedTable {
  Shape0({required super.source, required super.alias}) : super.aliased();
  i1.GeneratedColumn<String> get id =>
      columnsByName['id']! as i1.GeneratedColumn<String>;
  i1.GeneratedColumn<DateTime> get start =>
      columnsByName['start']! as i1.GeneratedColumn<DateTime>;
  i1.GeneratedColumn<DateTime> get end =>
      columnsByName['end']! as i1.GeneratedColumn<DateTime>;
  i1.GeneratedColumn<String> get title =>
      columnsByName['title']! as i1.GeneratedColumn<String>;
  i1.GeneratedColumn<String> get body =>
      columnsByName['body']! as i1.GeneratedColumn<String>;
  i1.GeneratedColumn<String> get category =>
      columnsByName['category']! as i1.GeneratedColumn<String>;
  i1.GeneratedColumn<int> get isDeleted =>
      columnsByName['is_deleted']! as i1.GeneratedColumn<int>;
  i1.GeneratedColumn<String> get hlc =>
      columnsByName['hlc']! as i1.GeneratedColumn<String>;
  i1.GeneratedColumn<String> get node_id =>
      columnsByName['node_id']! as i1.GeneratedColumn<String>;
  i1.GeneratedColumn<String> get modified =>
      columnsByName['modified']! as i1.GeneratedColumn<String>;
}

在 drift_crdt 和你中查询

默认情况下,删除的记录不会返回。

原因是希望 CRDT 实现无缝并且不应默认破坏您的应用程序。

但是,如果您想查询已删除的记录,可以使用 queryDeleted 辅助函数。

例如,获取所有用户包括已删除的用户:

final result = await queryDeleted(
  (db.executor) as CrdtQueryExecutor,
  () async => db.select(db.users).get()
);

CRDT 特定功能

使用 CrdtQueryExecutor.getLastModified 获取数据库的最后修改时间戳。

final changeset = await (db.executor as CrdtQueryExecutor).getLastModified();

使用 CrdtQueryExecutor.getChangeset 获取数据库的变更集。

final changeset = await (db.executor as CrdtQueryExecutor).getChangeset();

使用 CrdtQueryExecutor.merge 合并变更集到数据库。

await (db.executor as CrdtQueryExecutor).merge(changeset);

更多关于Flutter分布式数据同步插件drift_crdt的使用的实战教程也可以访问 https://www.itying.com/category-92-b0.html

1 回复

更多关于Flutter分布式数据同步插件drift_crdt的使用的实战系列教程也可以访问 https://www.itying.com/category-92-b0.html


当然,下面是一个关于如何在Flutter项目中使用drift_crdt插件来实现分布式数据同步的示例代码。drift_crdt是一个基于Drift(一个用于Flutter和Dart的ORM库)的CRDT(Conflict-Free Replicated Data Types)实现,它允许你在多个设备上无缝地同步数据。

1. 添加依赖

首先,你需要在pubspec.yaml文件中添加driftdrift_crdt的依赖:

dependencies:
  flutter:
    sdk: flutter
  drift: ^x.y.z  # 请替换为最新版本号
  drift_crdt: ^x.y.z  # 请替换为最新版本号
  # 其他依赖...

2. 定义CRDT模型

接下来,定义一个使用CRDT的模型。在这个例子中,我们将定义一个简单的计数器模型。

import 'package:drift/drift.dart';
import 'package:drift_crdt/drift_crdt.dart';

part 'counter_model.g.dart';

@DataClassName('Counter')
class Counters extends TableDef {
  TextColumn get id => text().primaryKey().autoGenerate();
  GCounterColumn<Int32Type> get count => int32().crdt<GCounter>();
}

@DriftDatabase(
  tables: [Counters],
  includeCrdtSchemas: true,
)
abstract class AppDatabase extends Database with _$AppDatabaseMixin {
  AppDatabase() : super(_openConnection());

  // _openConnection() 是一个自定义的数据库连接函数,这里省略具体实现
}

3. 生成数据访问对象

运行以下命令来生成数据访问对象(DAO):

flutter pub run build_runner build --build-target=path/to/your/counter_model.dart

4. 初始化数据库并同步数据

在你的Flutter应用中初始化数据库并设置CRDT同步。以下是一个简单的示例,展示如何初始化数据库并增加计数器的值。

import 'package:flutter/material.dart';
import 'package:drift/drift.dart';
import 'counter_model.dart';
import 'package:drift_crdt/drift_crdt.dart';

void main() {
  runApp(MyApp());
}

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: Text('Drift CRDT Example'),
        ),
        body: Center(
          child: CounterScreen(),
        ),
      ),
    );
  }
}

class CounterScreen extends StatefulWidget {
  @override
  _CounterScreenState createState() => _CounterScreenState();
}

class _CounterScreenState extends State<CounterScreen> {
  late AppDatabase _db;
  late CounterDao _counterDao;
  late GCounter<Int32Type> _counter;

  @override
  void initState() {
    super.initState();
    initializeDatabase();
  }

  Future<void> initializeDatabase() async {
    // 打开数据库连接(这里省略具体实现)
    _db = await AppDatabase.connect(path/to/your/database);
    _counterDao = _db.counterDao;

    // 初始化或获取计数器实例
    var counterEntity = await _counterDao.querySingle(Counters.id.equals('counter_id'));
    if (counterEntity != null) {
      _counter = counterEntity.count.value;
    } else {
      _counter = GCounter.initial(Int32Type());
      await _counterDao.insert(Counter(id: 'counter_id', count: _counter.toColumn()));
    }

    // 设置CRDT同步(这里假设有一个同步机制,比如WebSocket)
    setupCrdtSynchronization();
  }

  Future<void> setupCrdtSynchronization() async {
    // 这里假设你有一个WebSocket连接来处理同步
    // WebSocket连接代码省略

    // 监听WebSocket消息并更新CRDT
    // WebSocket.addEventListener('message', (event) async {
    //   var message = parseMessage(event.data);
    //   await _db.withTransaction((_) async {
    //     // 更新CRDT状态
    //     _counter = _counter.merge(message.counter);
    //     await _counterDao.update(counterEntity.copyWith(count: _counter.toColumn()));
    //   });
    // });
  }

  Future<void> incrementCounter() async {
    await _db.withTransaction((_) async {
      _counter = _counter.increment();
      await _counterDao.update(Counter(id: 'counter_id', count: _counter.toColumn()));
      setState(() {});
    });
  }

  @override
  Widget build(BuildContext context) {
    return Column(
      mainAxisAlignment: MainAxisAlignment.center,
      children: <Widget>[
        Text('Counter: ${_counter.value}'),
        ElevatedButton(
          onPressed: incrementCounter,
          child: Text('Increment'),
        ),
      ],
    );
  }

  @override
  void dispose() {
    _db.close();
    super.dispose();
  }
}

注意事项

  1. 数据库连接:示例中的_openConnection()path/to/your/database需要根据你的具体实现进行替换。
  2. CRDT同步:示例中的WebSocket同步部分是一个简化版,你需要根据你的应用需求来实现具体的同步逻辑。
  3. 错误处理:示例中省略了错误处理逻辑,你应该在实际应用中加入适当的错误处理。

这个示例展示了如何使用drift_crdt在Flutter应用中实现基本的分布式数据同步。根据你的具体需求,你可能需要扩展这个示例来支持更多的CRDT类型和复杂的同步逻辑。

回到顶部