HarmonyOS鸿蒙Next中usb批量传输,在接收数据少,频率高的情况怎么解决?
HarmonyOS鸿蒙Next中usb批量传输,在接收数据少,频率高的情况怎么解决? 使用鸿蒙的usbManage进行批量传输接收,发送端发送的数据包大小只有20~1009字节,但是特别频繁,间隔1ms甚至小于1ms,在进行接收的时候因为读不过来会丢包,鸿蒙这边有没有缓冲区、队列或者其他方法可以保证不丢包。
import usbManager from "@ohos.usbManager";
import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
import { PhoneKitMessage } from '../communication/PhoneKitMessage';
const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
// 定义Worker消息类型
interface ControlMessage {
type: 'CONNECT' | 'START_READING' | 'STOP_READING' | 'DISCONNECT';
}
type WorkerMessage = ControlMessage;
// USB 相关变量
let devicePipe: usbManager.USBDevicePipe | undefined;
let usbInterface: usbManager.USBInterface | undefined;
let endpoint: usbManager.USBEndpoint | undefined;
let isReading: boolean = false;
let readTimer: number | undefined;
// 数据缓冲区,用于处理分包情况
let dataBuffer: Uint8Array = new Uint8Array(0);
// 发送解析后的消息
function sendData(message: PhoneKitMessage): void {
workerPort.postMessage(message);
}
// 处理接收到的数据,处理分包情况
function processReceivedData(newData: Uint8Array): void {
// 将新数据添加到缓冲区
const combinedBuffer = new Uint8Array(dataBuffer.length + newData.length);
combinedBuffer.set(dataBuffer, 0);
combinedBuffer.set(newData, dataBuffer.length);
dataBuffer = combinedBuffer;
// 尝试解析缓冲区中的消息
try {
// 检查是否有足够的帧头和长度信息
if (dataBuffer.length < 6) return; // 需要至少帧头2 + 类型1 + 流水号1 + 长度2
// 获取消息长度
const msgLength = (dataBuffer[4] << 8) | dataBuffer[5];
const totalMsgLength = 9 + msgLength; // 帧头2 + 类型1 + 流水号1 + 长度2 + 内容 + 校验和1 + 帧尾2
// 检查是否有足够的数据
if (dataBuffer.length < totalMsgLength) {
return; // 等待更多数据
}
// 尝试解析消息
const messageData = dataBuffer.slice(0, totalMsgLength);
const message = PhoneKitMessage.fromBuffer(messageData);
sendData(message);
// 从缓冲区移除已处理的数据
dataBuffer = dataBuffer.slice(totalMsgLength);
} catch (error) {
// 解析失败,可能是校验和错误或其他问题
return;
}
}
// 连接设备
async function connectDevice(): Promise<boolean> {
try {
const devices = usbManager.getDevices();
if (devices.length === 0) {
return false;
}
// 选择第一个设备进行测试
const device = devices[0];
// 检查权限
const hasRight = usbManager.hasRight(device.name);
if (!hasRight) {
const granted = await usbManager.requestRight(device.name);
if (!granted) {
return false;
}
}
// 连接设备
devicePipe = usbManager.connectDevice(device);
// 验证devicePipe是否正确设置
if (!devicePipe) {
return false;
}
// 设置配置(如果有多个配置,选择第一个)
if (device.configs && device.configs.length > 0) {
const config = device.configs[0];
const setConfigResult = usbManager.setConfiguration(devicePipe, config);
if (setConfigResult !== 0) {
return false;
}
}
// 获取接口(对于CDC设备,选择数据接口)
if (device.configs && device.configs[0].interfaces && device.configs[0].interfaces.length > 0) {
const interfaces = device.configs[0].interfaces;
// 对于CDC设备,优先选择数据接口(clazz=10,CDC数据类)
let dataInterface = interfaces.find((iface: usbManager.USBInterface) => iface.clazz === 10);
if (!dataInterface) {
// 如果没有数据接口,选择第一个接口
dataInterface = interfaces[0];
}
usbInterface = dataInterface;
// 声明接口
const claimResult = usbManager.claimInterface(devicePipe, usbInterface, true);
if (claimResult !== 0) {
return false;
}
// 查找批量传输IN端点
if (usbInterface.endpoints && usbInterface.endpoints.length > 0) {
// 优先选择批量传输的IN端点
for (const ep of usbInterface.endpoints) {
if (ep.type === usbManager.UsbEndpointTransferType.TRANSFER_TYPE_BULK &&
ep.direction === usbManager.USBRequestDirection.USB_REQUEST_DIR_FROM_DEVICE) {
// 批量传输,IN方向
endpoint = ep;
break;
}
}
// 如果没有找到批量传输端点,尝试中断传输端点
if (!endpoint) {
for (const ep of usbInterface.endpoints) {
if (ep.type === usbManager.UsbEndpointTransferType.TRANSFER_TYPE_INTERRUPT &&
ep.direction === usbManager.USBRequestDirection.USB_REQUEST_DIR_FROM_DEVICE) {
endpoint = ep;
break;
}
}
}
}
if (!endpoint) {
return false;
}
} else {
return false;
}
return true;
} catch (error) {
return false;
}
}
// 开始读取数据
function startReading(): void {
if (!devicePipe || !endpoint) {
return;
}
isReading = true;
// 创建读取缓冲区,使用端点最大包大小
const bufferSize = 1024;
const readLoop = async (): Promise<void> => {
if (!isReading) return;
try {
const buffer = new Uint8Array(bufferSize);
const transferParams: usbManager.UsbDataTransferParams = {
devPipe: devicePipe!,
flags: usbManager.UsbTransferFlags.USB_TRANSFER_SHORT_NOT_OK,
endpoint: endpoint!.address,
type: usbManager.UsbEndpointTransferType.TRANSFER_TYPE_BULK,
timeout: 100,
length: bufferSize,
callback: (err: Error, callBackData: usbManager.SubmitTransferCallback): void => {
if (err) return;
const bytesRead = callBackData.actualLength;
if (bytesRead > 0) {
// 处理接收到的数据,支持分包情况
const receivedData = buffer.slice(0, bytesRead);
processReceivedData(receivedData);
}
},
userData: new Uint8Array(10),
buffer: buffer,
isoPacketCount: 0
};
usbManager.usbSubmitTransfer(transferParams);
} catch (error) {
// 读取失败,继续尝试
}
// 继续读取
if (isReading) {
readTimer = setTimeout(readLoop, 1); // 1ms间隔,尽可能快
}
};
// 开始读取循环
readLoop();
}
// 停止读取
function stopReading(): void {
isReading = false;
if (readTimer) {
clearTimeout(readTimer);
readTimer = undefined;
}
}
// 断开连接
function disconnectDevice(): void {
try {
if (usbInterface && devicePipe) {
usbManager.releaseInterface(devicePipe, usbInterface);
}
if (devicePipe) {
usbManager.closePipe(devicePipe);
}
devicePipe = undefined;
usbInterface = undefined;
endpoint = undefined;
stopReading();
// 清空数据缓冲区
dataBuffer = new Uint8Array(0);
} catch (error) {
// 断开连接失败,忽略
}
}
// 消息处理
workerPort.onmessage = (e: MessageEvents): void => {
const message: WorkerMessage = e.data as WorkerMessage;
switch (message.type) {
case 'CONNECT':
connectDevice();
break;
case 'START_READING':
startReading();
break;
case 'STOP_READING':
stopReading();
break;
case 'DISCONNECT':
disconnectDevice();
break;
}
};
// 错误处理
workerPort.onmessageerror = (): void => {
// 消息序列化错误,忽略
};
// Worker错误处理
workerPort.onerror = (err: ErrorEvent): void => {
// Worker执行错误,忽略
};
更多关于HarmonyOS鸿蒙Next中usb批量传输,在接收数据少,频率高的情况怎么解决?的实战教程也可以访问 https://www.itying.com/category-93-b0.html
3 回复
【背景知识】
usbManager.bulkTransfer可以设置缓冲区,在调用接口前需要通过usbManager.claimInterfaceclaim通信接口。
【解决方案】
开发者你好,可以看下通过bulkTransfer接口进行数据传输;
更多关于HarmonyOS鸿蒙Next中usb批量传输,在接收数据少,频率高的情况怎么解决?的实战系列教程也可以访问 https://www.itying.com/category-93-b0.html
在HarmonyOS Next中,针对USB批量传输接收数据少、频率高的情况,可通过以下方式优化:
- 使用异步I/O操作,避免阻塞主线程。
- 增大USB端点的缓冲区大小,减少频繁中断。
- 采用零拷贝技术,提升数据传输效率。
- 合理设置USB传输参数,如调整数据包大小和传输间隔。
- 利用多线程或任务队列处理接收到的数据,确保实时性。
针对高频小数据包USB批量传输丢包问题,核心在于提升接收处理效率。从代码看,当前使用setTimeout轮询方式存在延迟,无法满足1ms间隔需求。
优化方案:
- 使用连续异步读取替代轮询
async function continuousRead(): Promise<void> {
while (isReading && devicePipe && endpoint) {
try {
const buffer = new Uint8Array(endpoint.maxPacketSize || 512);
const result = await usbManager.usbBulkTransfer(
devicePipe,
endpoint,
buffer,
timeout
);
if (result?.length > 0) {
processReceivedData(result);
}
} catch (error) {
// 处理错误但不中断循环
}
}
}
- 增大内核缓冲区 在设备连接后配置更大缓冲区:
const setBufferResult = usbManager.setUsbBufferSize(
devicePipe,
endpoint.address,
8192 // 8KB缓冲区
);
- 优化数据处理逻辑
- 将数据解析操作移到独立线程
- 使用环形缓冲区减少内存分配
- 批量处理多个数据包
- 调整USB传输参数
const transferParams: usbManager.UsbDataTransferParams = {
timeout: 0, // 非阻塞模式
flags: usbManager.UsbTransferFlags.USB_TRANSFER_ASYNC,
// ...其他参数
};
- 使用硬件加速(如支持)
检查设备是否支持DMA传输,可通过
usbManager.getDeviceCapabilities()查询。
关键点:
- 避免使用setTimeout,直接使用异步连续读取
- 适当增大端点的maxPacketSize配置
- 确保数据处理不会阻塞读取循环
- 考虑使用多个并行传输请求提高吞吐量
当前代码的主要瓶颈在于1ms的轮询间隔无法保证实时性,改为连续异步读取可显著降低延迟。

