HarmonyOS鸿蒙Next中usb批量传输,在接收数据少,频率高的情况怎么解决?

HarmonyOS鸿蒙Next中usb批量传输,在接收数据少,频率高的情况怎么解决? 使用鸿蒙的usbManage进行批量传输接收,发送端发送的数据包大小只有20~1009字节,但是特别频繁,间隔1ms甚至小于1ms,在进行接收的时候因为读不过来会丢包,鸿蒙这边有没有缓冲区、队列或者其他方法可以保证不丢包。

import usbManager from "@ohos.usbManager";
import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
import { PhoneKitMessage } from '../communication/PhoneKitMessage';

const workerPort: ThreadWorkerGlobalScope = worker.workerPort;

// 定义Worker消息类型
interface ControlMessage {
  type: 'CONNECT' | 'START_READING' | 'STOP_READING' | 'DISCONNECT';
}

type WorkerMessage = ControlMessage;

// USB 相关变量
let devicePipe: usbManager.USBDevicePipe | undefined;
let usbInterface: usbManager.USBInterface | undefined;
let endpoint: usbManager.USBEndpoint | undefined;
let isReading: boolean = false;
let readTimer: number | undefined;

// 数据缓冲区,用于处理分包情况
let dataBuffer: Uint8Array = new Uint8Array(0);

// 发送解析后的消息
function sendData(message: PhoneKitMessage): void {
  workerPort.postMessage(message);
}

// 处理接收到的数据,处理分包情况
function processReceivedData(newData: Uint8Array): void {
  // 将新数据添加到缓冲区
  const combinedBuffer = new Uint8Array(dataBuffer.length + newData.length);
  combinedBuffer.set(dataBuffer, 0);
  combinedBuffer.set(newData, dataBuffer.length);
  dataBuffer = combinedBuffer;

  // 尝试解析缓冲区中的消息
  try {
    // 检查是否有足够的帧头和长度信息
    if (dataBuffer.length < 6) return; // 需要至少帧头2 + 类型1 + 流水号1 + 长度2

    // 获取消息长度
    const msgLength = (dataBuffer[4] << 8) | dataBuffer[5];
    const totalMsgLength = 9 + msgLength; // 帧头2 + 类型1 + 流水号1 + 长度2 + 内容 + 校验和1 + 帧尾2

    // 检查是否有足够的数据
    if (dataBuffer.length < totalMsgLength) {
      return; // 等待更多数据
    }

    // 尝试解析消息
    const messageData = dataBuffer.slice(0, totalMsgLength);
    const message = PhoneKitMessage.fromBuffer(messageData);
    sendData(message);

    // 从缓冲区移除已处理的数据
    dataBuffer = dataBuffer.slice(totalMsgLength);

  } catch (error) {
    // 解析失败,可能是校验和错误或其他问题
    return;
  }
}

// 连接设备
async function connectDevice(): Promise<boolean> {
  try {
    const devices = usbManager.getDevices();

    if (devices.length === 0) {
      return false;
    }

    // 选择第一个设备进行测试
    const device = devices[0];

    // 检查权限
    const hasRight = usbManager.hasRight(device.name);
    if (!hasRight) {
      const granted = await usbManager.requestRight(device.name);
      if (!granted) {
        return false;
      }
    }

    // 连接设备
    devicePipe = usbManager.connectDevice(device);

    // 验证devicePipe是否正确设置
    if (!devicePipe) {
      return false;
    }

    // 设置配置(如果有多个配置,选择第一个)
    if (device.configs && device.configs.length > 0) {
      const config = device.configs[0];
      const setConfigResult = usbManager.setConfiguration(devicePipe, config);
      if (setConfigResult !== 0) {
        return false;
      }
    }

    // 获取接口(对于CDC设备,选择数据接口)
    if (device.configs && device.configs[0].interfaces && device.configs[0].interfaces.length > 0) {
      const interfaces = device.configs[0].interfaces;

      // 对于CDC设备,优先选择数据接口(clazz=10,CDC数据类)
      let dataInterface = interfaces.find((iface: usbManager.USBInterface) => iface.clazz === 10);
      if (!dataInterface) {
        // 如果没有数据接口,选择第一个接口
        dataInterface = interfaces[0];
      }

      usbInterface = dataInterface;

      // 声明接口
      const claimResult = usbManager.claimInterface(devicePipe, usbInterface, true);
      if (claimResult !== 0) {
        return false;
      }

      // 查找批量传输IN端点
      if (usbInterface.endpoints && usbInterface.endpoints.length > 0) {
        // 优先选择批量传输的IN端点
        for (const ep of usbInterface.endpoints) {
          if (ep.type === usbManager.UsbEndpointTransferType.TRANSFER_TYPE_BULK &&
              ep.direction === usbManager.USBRequestDirection.USB_REQUEST_DIR_FROM_DEVICE) {
            // 批量传输,IN方向
            endpoint = ep;
            break;
          }
        }

        // 如果没有找到批量传输端点,尝试中断传输端点
        if (!endpoint) {
          for (const ep of usbInterface.endpoints) {
            if (ep.type === usbManager.UsbEndpointTransferType.TRANSFER_TYPE_INTERRUPT &&
                ep.direction === usbManager.USBRequestDirection.USB_REQUEST_DIR_FROM_DEVICE) {
              endpoint = ep;
              break;
            }
          }
        }
      }

      if (!endpoint) {
        return false;
      }
    } else {
      return false;
    }

    return true;
  } catch (error) {
    return false;
  }
}

// 开始读取数据
function startReading(): void {
  if (!devicePipe || !endpoint) {
    return;
  }

  isReading = true;

  // 创建读取缓冲区,使用端点最大包大小
  const bufferSize = 1024;

  const readLoop = async (): Promise<void> => {
    if (!isReading) return;

    try {
      const buffer = new Uint8Array(bufferSize);
      const transferParams: usbManager.UsbDataTransferParams = {
        devPipe: devicePipe!,
        flags: usbManager.UsbTransferFlags.USB_TRANSFER_SHORT_NOT_OK,
        endpoint: endpoint!.address,
        type: usbManager.UsbEndpointTransferType.TRANSFER_TYPE_BULK,
        timeout: 100,
        length: bufferSize,
        callback: (err: Error, callBackData: usbManager.SubmitTransferCallback): void => {
          if (err) return;
          const bytesRead = callBackData.actualLength;
          if (bytesRead > 0) {
            // 处理接收到的数据,支持分包情况
            const receivedData = buffer.slice(0, bytesRead);
            processReceivedData(receivedData);
          }
        },
        userData: new Uint8Array(10),
        buffer: buffer,
        isoPacketCount: 0
      };
      usbManager.usbSubmitTransfer(transferParams);
    } catch (error) {
      // 读取失败,继续尝试
    }

    // 继续读取
    if (isReading) {
      readTimer = setTimeout(readLoop, 1); // 1ms间隔,尽可能快
    }
  };

  // 开始读取循环
  readLoop();
}

// 停止读取
function stopReading(): void {
  isReading = false;
  if (readTimer) {
    clearTimeout(readTimer);
    readTimer = undefined;
  }
}

// 断开连接
function disconnectDevice(): void {
  try {
    if (usbInterface && devicePipe) {
      usbManager.releaseInterface(devicePipe, usbInterface);
    }

    if (devicePipe) {
      usbManager.closePipe(devicePipe);
    }

    devicePipe = undefined;
    usbInterface = undefined;
    endpoint = undefined;
    stopReading();

    // 清空数据缓冲区
    dataBuffer = new Uint8Array(0);
  } catch (error) {
    // 断开连接失败,忽略
  }
}

// 消息处理
workerPort.onmessage = (e: MessageEvents): void => {
  const message: WorkerMessage = e.data as WorkerMessage;
  switch (message.type) {
    case 'CONNECT':
      connectDevice();
      break;
    case 'START_READING':
      startReading();
      break;
    case 'STOP_READING':
      stopReading();
      break;
    case 'DISCONNECT':
      disconnectDevice();
      break;
  }
};

// 错误处理
workerPort.onmessageerror = (): void => {
  // 消息序列化错误,忽略
};

// Worker错误处理
workerPort.onerror = (err: ErrorEvent): void => {
  // Worker执行错误,忽略
};

更多关于HarmonyOS鸿蒙Next中usb批量传输,在接收数据少,频率高的情况怎么解决?的实战教程也可以访问 https://www.itying.com/category-93-b0.html

3 回复

【背景知识】

usbManager.bulkTransfer可以设置缓冲区,在调用接口前需要通过usbManager.claimInterfaceclaim通信接口。

【解决方案】

开发者你好,可以看下通过bulkTransfer接口进行数据传输

更多关于HarmonyOS鸿蒙Next中usb批量传输,在接收数据少,频率高的情况怎么解决?的实战系列教程也可以访问 https://www.itying.com/category-93-b0.html


在HarmonyOS Next中,针对USB批量传输接收数据少、频率高的情况,可通过以下方式优化:

  1. 使用异步I/O操作,避免阻塞主线程。
  2. 增大USB端点的缓冲区大小,减少频繁中断。
  3. 采用零拷贝技术,提升数据传输效率。
  4. 合理设置USB传输参数,如调整数据包大小和传输间隔。
  5. 利用多线程或任务队列处理接收到的数据,确保实时性。

针对高频小数据包USB批量传输丢包问题,核心在于提升接收处理效率。从代码看,当前使用setTimeout轮询方式存在延迟,无法满足1ms间隔需求。

优化方案:

  1. 使用连续异步读取替代轮询
async function continuousRead(): Promise<void> {
  while (isReading && devicePipe && endpoint) {
    try {
      const buffer = new Uint8Array(endpoint.maxPacketSize || 512);
      const result = await usbManager.usbBulkTransfer(
        devicePipe,
        endpoint,
        buffer,
        timeout
      );
      if (result?.length > 0) {
        processReceivedData(result);
      }
    } catch (error) {
      // 处理错误但不中断循环
    }
  }
}
  1. 增大内核缓冲区 在设备连接后配置更大缓冲区:
const setBufferResult = usbManager.setUsbBufferSize(
  devicePipe,
  endpoint.address,
  8192  // 8KB缓冲区
);
  1. 优化数据处理逻辑
  • 将数据解析操作移到独立线程
  • 使用环形缓冲区减少内存分配
  • 批量处理多个数据包
  1. 调整USB传输参数
const transferParams: usbManager.UsbDataTransferParams = {
  timeout: 0,           // 非阻塞模式
  flags: usbManager.UsbTransferFlags.USB_TRANSFER_ASYNC,
  // ...其他参数
};
  1. 使用硬件加速(如支持) 检查设备是否支持DMA传输,可通过usbManager.getDeviceCapabilities()查询。

关键点:

  • 避免使用setTimeout,直接使用异步连续读取
  • 适当增大端点的maxPacketSize配置
  • 确保数据处理不会阻塞读取循环
  • 考虑使用多个并行传输请求提高吞吐量

当前代码的主要瓶颈在于1ms的轮询间隔无法保证实时性,改为连续异步读取可显著降低延迟。

回到顶部