手势鼠标二 —— 识别节点,发送数据

以下是将 MediaPipe 手势识别结果通过 UDP 发送的完整 Python 代码实现,包含数据打包和网络通信模块:

import cv2
import json
import socket
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

# UDP 配置
UDP_IP = "127.0.0.1"  # 目标IP地址
UDP_PORT = 12345       # 目标端口
BUFFER_SIZE = 1024     # 数据包大小

# 初始化 UDP 套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

def send_gesture_data(gesture_data):
    """通过UDP发送结构化手势数据"""
    try:
        # 将数据序列化为JSON并编码为字节
        message = json.dumps(gesture_data).encode('utf-8')
        udp_socket.sendto(message, (UDP_IP, UDP_PORT))
    except Exception as e:
        print(f"UDP发送错误: {str(e)}")

# 初始化MediaPipe手势识别器
GESTURE_MODEL_PATH = 'gesture_recognizer.task'
base_options = python.BaseOptions(model_asset_path=GESTURE_MODEL_PATH)
options = vision.GestureRecognizerOptions(
    base_options=base_options,
    running_mode=vision.RunningMode.VIDEO,
    num_hands=2,
    min_hand_detection_confidence=0.7
)
recognizer = vision.GestureRecognizer.create_from_options(options)

# 视频捕获
cap = cv2.VideoCapture(0)
frame_timestamp_ms = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # 手势识别
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)
    recognition_result = recognizer.recognize_for_video(mp_image, frame_timestamp_ms)
    frame_timestamp_ms += 1

    # 处理并发送数据
    if recognition_result.gestures:
        gesture_package = {
            "hands": [],
            "timestamp": frame_timestamp_ms
        }

        for hand_gestures, hand_landmarks in zip(
            recognition_result.gestures, recognition_result.hand_landmarks
        ):
            hand_data = {
                "gesture": hand_gestures[0].category_name,
                "confidence": float(hand_gestures[0].score),
                "landmarks": []
            }

            # 添加关键点坐标(归一化坐标)
            for idx, landmark in enumerate(hand_landmarks):
                hand_data["landmarks"].append({
                    "x": float(landmark.x),
                    "y": float(landmark.y),
                    "z": float(landmark.z),
                    "id": idx
                })

            gesture_package["hands"].append(hand_data)

        # 发送数据
        send_gesture_data(gesture_package)

    # 显示画面(可选)
    cv2.imshow('Gesture UDP Sender', frame)
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break

# 释放资源
cap.release()
cv2.destroyAllWindows()
udp_socket.close()

代码结构说明

  1. UDP 通信模块
  2. 使用标准库 socket 实现 UDP 客户端
  3. send_gesture_data() 函数处理数据序列化和发送
  4. 支持错误捕获防止程序崩溃
  5. 数据结构设计python { "timestamp": 1678901234, # 时间戳 "hands": [ { "gesture": "Thumb_Up", "confidence": 0.92, # 置信度 "landmarks": [ # 21个关键点 {"x": 0.45, "y": 0.78, "z": -0.1, "id": 0}, ... ] } ] }
  6. 特征说明
  7. 包含完整的手部关键点3D坐标(归一化坐标系)
  8. 每个手势附带置信度评分
  9. 支持多手同时识别
  10. 时间戳用于接收端同步处理

数据接收端示例(Python)

import socket
import json

UDP_IP = "0.0.0.0"  # 监听所有接口
UDP_PORT = 12345

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))

print(f"监听UDP端口 {UDP_PORT}...")

while True:
    data, addr = sock.recvfrom(1024)
    try:
        gesture_data = json.loads(data.decode('utf-8'))
        print(f"收到来自 {addr} 的数据:")
        print(f"时间戳: {gesture_data['timestamp']}")
        for hand in gesture_data['hands']:
            print(f"- 手势: {hand['gesture']} (置信度: {hand['confidence']:.2f})")
            print(f"  关键点数量: {len(hand['landmarks'])}")
    except Exception as e:
        print(f"数据解析错误: {str(e)}")

协议优化建议

  1. 数据压缩python # 发送前压缩数据 import zlib compressed_data = zlib.compress(message)
  2. 二进制协议python # 使用protobuf替代JSON import gesture_pb2 # 需要先定义protobuf结构 gesture_proto = gesture_pb2.GestureData() gesture_proto.gesture = "Thumb_Up" udp_socket.sendto(gesture_proto.SerializeToString(), (UDP_IP, UDP_PORT))
  3. 频率控制python # 控制发送频率(如30FPS) import time last_send_time = 0 if time.time() - last_send_time > 1/30: send_gesture_data(data) last_send_time = time.time()

网络调试技巧

  1. 使用 Wiresharktcpdump 抓包验证数据: bash tcpdump -i any -A udp port 12345
  2. 使用 netcat 快速测试接收: bash nc -ul 12345
  3. 压力测试工具: bash # 使用sockperf进行UDP压力测试 sockperf udp -i 127.0.0.1 -p 12345 -t 30
此条目发表在手势鼠标分类目录。将固定链接加入收藏夹。