以下是将 MediaPipe 手势识别结果通过 UDP 发送的完整 Python 代码实现,包含数据打包和网络通信模块:
import cv2
import json
import socket
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
# UDP 配置
UDP_IP = "127.0.0.1" # 目标IP地址
UDP_PORT = 12345 # 目标端口
BUFFER_SIZE = 1024 # 数据包大小
# 初始化 UDP 套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def send_gesture_data(gesture_data):
"""通过UDP发送结构化手势数据"""
try:
# 将数据序列化为JSON并编码为字节
message = json.dumps(gesture_data).encode('utf-8')
udp_socket.sendto(message, (UDP_IP, UDP_PORT))
except Exception as e:
print(f"UDP发送错误: {str(e)}")
# 初始化MediaPipe手势识别器
GESTURE_MODEL_PATH = 'gesture_recognizer.task'
base_options = python.BaseOptions(model_asset_path=GESTURE_MODEL_PATH)
options = vision.GestureRecognizerOptions(
base_options=base_options,
running_mode=vision.RunningMode.VIDEO,
num_hands=2,
min_hand_detection_confidence=0.7
)
recognizer = vision.GestureRecognizer.create_from_options(options)
# 视频捕获
cap = cv2.VideoCapture(0)
frame_timestamp_ms = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# 手势识别
mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)
recognition_result = recognizer.recognize_for_video(mp_image, frame_timestamp_ms)
frame_timestamp_ms += 1
# 处理并发送数据
if recognition_result.gestures:
gesture_package = {
"hands": [],
"timestamp": frame_timestamp_ms
}
for hand_gestures, hand_landmarks in zip(
recognition_result.gestures, recognition_result.hand_landmarks
):
hand_data = {
"gesture": hand_gestures[0].category_name,
"confidence": float(hand_gestures[0].score),
"landmarks": []
}
# 添加关键点坐标(归一化坐标)
for idx, landmark in enumerate(hand_landmarks):
hand_data["landmarks"].append({
"x": float(landmark.x),
"y": float(landmark.y),
"z": float(landmark.z),
"id": idx
})
gesture_package["hands"].append(hand_data)
# 发送数据
send_gesture_data(gesture_package)
# 显示画面(可选)
cv2.imshow('Gesture UDP Sender', frame)
if cv2.waitKey(10) & 0xFF == ord('q'):
break
# 释放资源
cap.release()
cv2.destroyAllWindows()
udp_socket.close()
代码结构说明
- UDP 通信模块:
- 使用标准库
socket实现 UDP 客户端 send_gesture_data()函数处理数据序列化和发送- 支持错误捕获防止程序崩溃
- 数据结构设计:
python { "timestamp": 1678901234, # 时间戳 "hands": [ { "gesture": "Thumb_Up", "confidence": 0.92, # 置信度 "landmarks": [ # 21个关键点 {"x": 0.45, "y": 0.78, "z": -0.1, "id": 0}, ... ] } ] } - 特征说明:
- 包含完整的手部关键点3D坐标(归一化坐标系)
- 每个手势附带置信度评分
- 支持多手同时识别
- 时间戳用于接收端同步处理
数据接收端示例(Python)
import socket
import json
UDP_IP = "0.0.0.0" # 监听所有接口
UDP_PORT = 12345
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))
print(f"监听UDP端口 {UDP_PORT}...")
while True:
data, addr = sock.recvfrom(1024)
try:
gesture_data = json.loads(data.decode('utf-8'))
print(f"收到来自 {addr} 的数据:")
print(f"时间戳: {gesture_data['timestamp']}")
for hand in gesture_data['hands']:
print(f"- 手势: {hand['gesture']} (置信度: {hand['confidence']:.2f})")
print(f" 关键点数量: {len(hand['landmarks'])}")
except Exception as e:
print(f"数据解析错误: {str(e)}")
协议优化建议
- 数据压缩:
python # 发送前压缩数据 import zlib compressed_data = zlib.compress(message) - 二进制协议:
python # 使用protobuf替代JSON import gesture_pb2 # 需要先定义protobuf结构 gesture_proto = gesture_pb2.GestureData() gesture_proto.gesture = "Thumb_Up" udp_socket.sendto(gesture_proto.SerializeToString(), (UDP_IP, UDP_PORT)) - 频率控制:
python # 控制发送频率(如30FPS) import time last_send_time = 0 if time.time() - last_send_time > 1/30: send_gesture_data(data) last_send_time = time.time()
网络调试技巧
- 使用 Wireshark 或 tcpdump 抓包验证数据:
bash tcpdump -i any -A udp port 12345 - 使用 netcat 快速测试接收:
bash nc -ul 12345 - 压力测试工具:
bash # 使用sockperf进行UDP压力测试 sockperf udp -i 127.0.0.1 -p 12345 -t 30