Fixing Common Issues When Multithreading Camera Feeds and Model Inference

Introduction

In computer-vision applications, the camera feed and model inference often need to run at the same time. With multithreading the two can overlap, so the stream does not stall while the model is predicting. This article walks through the common mistakes made when threading a camera feed alongside model inference, and provides corrected code that keeps the application smooth.

Error Analysis

  • Missing import: the required threading library was never imported.
  • Threading not enabled: no thread is created with the threading module to run model inference.
  • No dedicated inference function: model inference is not factored into a function that runs continuously.

Solution

1. Import the library

import threading

2. Enable multithreading

def generate_frames():
    sequence = []  # Shared keypoint buffer, mutated in place by both threads
    sentence = []  # Shared list of recognized words

    # Run model inference on a daemon thread so it works alongside the
    # camera loop and exits together with the main process
    inference_thread = threading.Thread(target=model_inference,
                                        args=(sequence, sentence), daemon=True)
    inference_thread.start()

    while True:
        # Camera feed code goes here (it should produce the current frame in `image`)...

        # Note: do NOT call inference_thread.join() inside this loop. The
        # inference target runs forever, so join() would block the camera
        # feed indefinitely and defeat the purpose of threading.

        # Encode and stream the current camera frame
        ret, buffer = cv2.imencode('.jpg', image)
        frame = buffer.tobytes()
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
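Because generate_frames and model_inference mutate the same sequence and sentence lists from two threads, it is safer to make the sharing explicit. CPython's GIL keeps individual list operations atomic, but a lock documents the intent and protects multi-step updates. A minimal sketch, with hypothetical helper names (append_keypoints, snapshot_window) that are not part of the original code:

import threading

sequence_lock = threading.Lock()  # guards the shared keypoint buffer

def append_keypoints(sequence, keypoints):
    # Called from the camera loop: add the newest frame and trim in place
    with sequence_lock:
        sequence.append(keypoints)
        del sequence[:-30]

def snapshot_window(sequence):
    # Called from the inference loop: copy a full 30-frame window, or None
    with sequence_lock:
        return list(sequence) if len(sequence) == 30 else None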

3. Define the model inference function

def model_inference(sequence, sentence):
    # model, actions, predictions, threshold and t2s are module-level
    # globals (see the full listing below)
    while True:
        if len(sequence) == 30:
            res = model.predict(np.expand_dims(sequence, axis=0))[0]
            predicted = np.argmax(res)
            predictions.append(predicted)

            # Accept a word only when the last 10 predictions agree and the
            # confidence clears the threshold
            if np.unique(predictions[-10:])[0] == predicted and res[predicted] > threshold:
                if len(sentence) == 0 or actions[predicted] != sentence[-1]:
                    new_word = actions[predicted]
                    sentence.append(new_word)
                    t2s.say(new_word)
                    t2s.runAndWait()

            if len(sentence) > 5:
                # Trim in place; rebinding with sentence = sentence[-5:] would
                # break the link to the list shared with generate_frames
                del sentence[:-5]
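One drawback of the loop above is that it spins at full speed even when no new window is ready, wasting CPU. A minimal alternative sketch, assuming the camera loop pushes completed 30-frame windows, uses queue.Queue so the inference thread sleeps until work arrives:

import queue
import threading

import numpy as np

work = queue.Queue(maxsize=1)  # at most one pending 30-frame window

def model_inference_worker():
    while True:
        window = work.get()  # blocks until a window arrives; no busy-waiting
        res = model.predict(np.expand_dims(window, axis=0))[0]
        print(actions[np.argmax(res)])  # replace with the sentence/speech logic above
        work.task_done()

threading.Thread(target=model_inference_worker, daemon=True).start()

# In the camera loop, hand over a copy of a full window and drop it if
# inference is still busy with the previous one:
#     if len(sequence) == 30:
#         try:
#             work.put_nowait(list(sequence))
#         except queue.Full:
#             pass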

Other Improvements

  • Re-indented the code for readability.
  • Moved the model_inference function toward the top of the file so other functions can call it.
  • Added a while True loop inside model_inference so inference runs continuously.
  • Fixed typos in identifiers such as the mediapipe_detection function name.

Optimized Code

import threading

from flask import Flask, render_template, Response
import cv2
import pickle
import pyttsx3 
import numpy as np
import mediapipe as mp

app = Flask(__name__)

from tensorflow.keras.models import load_model
model = load_model('action.h5')

mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

actions = np.array(['Hello','I am','Affan','Thanks', 'i love you','Fever','See you', 'God'])

def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image.flags.writeable = False
    results = model.process(image)
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    return image, results

def extract_keypoints(results):
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([lh, rh])

def draw_styled_landmarks(image, results):
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(100, 100, 100), thickness=2, circle_radius=4),
                             mp_drawing.DrawingSpec(color=(100, 100, 100), thickness=2, circle_radius=2)
                             )
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(200, 200,200), thickness=2, circle_radius=4),
                             mp_drawing.DrawingSpec(color=(200, 200, 200), thickness=2, circle_radius=2)
                             )

def model_inference(sequence, sentence):
    while True:
        if len(sequence) == 30:
            res = model.predict(np.expand_dims(sequence, axis=0))[0]
            predicted = np.argmax(res)
            predictions.append(predicted)

            # Accept a word only when the last 10 predictions agree and the
            # confidence clears the threshold
            if np.unique(predictions[-10:])[0] == predicted and res[predicted] > threshold:
                if len(sentence) == 0 or actions[predicted] != sentence[-1]:
                    new_word = actions[predicted]
                    sentence.append(new_word)
                    t2s.say(new_word)
                    t2s.runAndWait()

            if len(sentence) > 5:
                del sentence[:-5]  # trim in place to keep the shared list object

sequence = []
sentence = []
predictions = []
threshold = 0.5

cap = cv2.VideoCapture(0)

def generate_frames():
    sequence = []  # Shared keypoint buffer, mutated in place
    sentence = []  # Shared list of recognized words

    # Daemon thread: inference runs alongside the feed and exits with the app
    inference_thread = threading.Thread(target=model_inference,
                                        args=(sequence, sentence), daemon=True)
    inference_thread.start()

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        image, results = mediapipe_detection(frame, holistic)
        draw_styled_landmarks(image, results)
        keypoints = extract_keypoints(results)
        sequence.append(keypoints)
        # Trim in place so the inference thread keeps seeing the same list
        del sequence[:-30]

        # Stream the annotated frame. No join() here: the inference thread
        # never terminates, so joining would block this loop forever.
        ret, buffer = cv2.imencode('.jpg', image)
        frame = buffer.tobytes()
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')

    cap.release()


@app.route('/')
def index():
    return render_template('index.html')

@app.route('/video_feed')
def video_feed():
    return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')

if __name__ == "__main__":
    t2s = pyttsx3.init()
    holistic = mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5)
    app.run(debug=True)
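To try the app, run the file and open the /video_feed route in a browser (index.html is assumed to embed that route in an img tag). A quick sketch to confirm the multipart stream is alive, assuming the server is running on Flask's default host and port:

import urllib.request

# Request the stream and inspect the first chunk of the MJPEG response
with urllib.request.urlopen('http://127.0.0.1:5000/video_feed') as resp:
    print(resp.headers['Content-Type'])  # multipart/x-mixed-replace; boundary=frame
    print(len(resp.read(4096)), 'bytes of the first frame received')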

FAQ

1. Why is multithreading needed?

Multithreading lets the camera feed and model inference run concurrently, so the stream does not stall while the model is predicting.
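The effect is easy to see with stand-in workloads. A minimal sketch, using time.sleep to simulate a ~30 ms frame grab and a ~100 ms prediction (the numbers are illustrative, not measured from the app above):

import threading
import time

def capture():  # stand-in for the camera loop
    for _ in range(3):
        time.sleep(0.03)

def infer():    # stand-in for model.predict
    for _ in range(3):
        time.sleep(0.10)

# Sequential: every frame waits for inference, about 0.39 s in total
start = time.time()
for _ in range(3):
    time.sleep(0.03)
    time.sleep(0.10)
print('sequential:', round(time.time() - start, 2), 's')

# Threaded: capture and inference overlap, about 0.30 s in total
start = time.time()
t = threading.Thread(target=infer)
t.start()
capture()
t.join()  # joining is fine here because infer() terminates on its own
print('threaded:  ', round(time.time() - start, 2), 's')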

2. How do I import the required libraries?

Use import statements at the top of the file to bring in the required libraries, such as threading.

3. How do I create a thread for model inference alongside the camera feed?

Create a thread with the threading.Thread class, passing the function to run as target and its arguments as args.
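For example, a minimal sketch with a hypothetical worker function standing in for the real inference code:

import threading

def worker(name, count):
    # Placeholder for the real inference function
    for i in range(count):
        print(f'{name}: step {i}')

# target is the function the thread runs; args are its positional arguments
t = threading.Thread(target=worker, args=('inference', 3), daemon=True)
t.start()
t.join()  # safe here because worker terminates on its own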

4. How do I define the model inference function in the code?

Define a model_inference function containing a while True loop that runs the model once 30 frames of keypoints have accumulated, as shown in step 3 of the Solution above.