How Could I Speed Up My Program?

@NuclearPasta0 @Classfied3D Did a lot of debugging, and no luck with my threading code currently. Here’s the topic: Threading Causes Program To Abruptly Stop

Looks like most of your code can be converted to multiprocessing easily.

@Classfied3D Yes, I have just done that, and it works very well. I’m working on improving even more pieces of my code. Currently, it takes around 39 seconds to run. Here is each piece of the program’s runtime:

Get Video Data: 0.07236390000002757
Get Frame Data: 3.3157897000000958
Extract Model Data: 7.54977550000001
Scan each frame for faces: 28.7039835

Here is my updated code:

import cv2
import os
import numpy as np
import face_recognition
import time
from multiprocessing import Process, Lock, Manager

people = []
people_lock = Lock()

# Initialize a global cache for face encodings
encoded_faces_cache = {}

def load_and_encode_images(subfolder_path):
    encoded_faces = []
    for item in os.listdir(subfolder_path):
        item_path = os.path.join(subfolder_path, item)
        if os.path.isfile(item_path):
            # Check if encoding is already in the cache
            if item_path not in encoded_faces_cache:
                # Load the image and get encoding
                image = face_recognition.load_image_file(item_path)
                encodings = face_recognition.face_encodings(image)
                # Cache the encoding for future use
                if encodings:
                    encoded_faces_cache[item_path] = encodings[0]
            # Append the encoding from the cache
            if item_path in encoded_faces_cache:
                encoded_faces.append(encoded_faces_cache[item_path])
    return encoded_faces

def scan_frames(frames, model, output, lock):
    for i, frame in enumerate(frames):
        try:
            frame = ensure_valid_color_shape(frame)
            expected_height, expected_width = frames[0].shape[:2]
            if frame.shape != (expected_height, expected_width, 3):
                continue
            # Detect faces and generate encodings; this reads no shared
            # state, so it does not need to serialise on the lock
            face_locations = face_recognition.face_locations(frame)
            face_encodings = face_recognition.face_encodings(frame, face_locations)
            for face_encoding in face_encodings:
                # Average distance to each person's reference encodings
                matches = {
                    label: np.mean(face_recognition.face_distance(face_images_encodings, face_encoding))
                    for label, face_images_encodings in model.items()
                    if face_images_encodings
                }
                if matches:
                    best_match = min(matches, key=matches.get)
                    if matches[best_match] < 0.5:  # Adjustable threshold for matching similarity
                        with lock:
                            if best_match not in output:
                                output.append(best_match)
        except Exception as e:
            print(f"An exception occurred while processing frame {i+1}: {e}")

def ensure_valid_color_shape(frame, default_shape=(64, 64, 3)):
    if frame is None:
        raise ValueError("Frame is None, can't process.")
    # Convert frame to a numpy array if it's not one
    if not isinstance(frame, np.ndarray):
        frame = np.array(frame)
    
    # Convert to 8-bit unsigned integer if necessary
    if frame.dtype == 'int32':
        frame = (frame / 65536).astype('uint8')
    # Check if the image has less than 3 dimensions or if dimensions are out of order/incorrect
    if len(frame.shape) != 3 or frame.shape[2] != 3:
        if len(frame.shape) == 2 or (len(frame.shape) == 3 and frame.shape[2] == 1):
            # Convert grayscale (or single channel) to RGB
            frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
        else:
            # Assign a default color image shape if not valid
            frame = np.zeros(default_shape, dtype='uint8')
    
    return frame

def ai_face_recog_from_video(video_path):
    with Manager() as manager:
        # Only the output list needs to be shared between processes; the
        # recognition model is read-only and is copied into each worker
        people_list = manager.list()

        start = time.perf_counter()
        # Capture video
        video_capture = cv2.VideoCapture(video_path)
        if not video_capture.isOpened():
            raise FileNotFoundError(f'Video not found: {video_path}')
        print("Video: " + str(time.perf_counter() - start))

        # Initialize variables
        recog_model = {}
        frame_amount = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
        # Sample every other frame, split into four ranges (one per process)
        frame_ranges = [
            list(range(1, frame_amount // 4 + 1, 2)),
            list(range(frame_amount // 4 + 1, frame_amount // 2 + 1, 2)),
            list(range(frame_amount // 2 + 1, frame_amount // 4 * 3 + 1, 2)),
            list(range(frame_amount // 4 * 3 + 1, frame_amount + 1, 2))
        ]

        start = time.perf_counter()
        for i in range(len(frame_ranges)):
            for j in range(len(frame_ranges[i])):
                # Set the frame index to the current frame number
                frame_number = frame_ranges[i][j]
                video_capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number - 1)
                # Read the frame
                ret, frame = video_capture.read()
                if ret:
                    frame_ranges[i][j] = frame  # Replace frame number with the actual frame
                else:
                    print(f"Error: Could not retrieve frame at index {frame_number}.")
                    break  # Stop if a frame could not be retrieved
        print("Frames: " + str(time.perf_counter() - start))

        start = time.perf_counter()
        # Load recognition model only once and use caching for encodings
        for subfolder in os.listdir('recog_model'):
            subfolder_path = os.path.join('recog_model', subfolder)
            if os.path.isdir(subfolder_path):
                # Use the new function to load images and get encodings
                recog_model[subfolder] = load_and_encode_images(subfolder_path)
        print("Model Data: " + str(time.perf_counter() - start))
        # Create worker processes (one per quarter of the sampled frames)
        first_qrt = Process(target=scan_frames, args=(frame_ranges[0], recog_model, people_list, people_lock,))
        second_qrt = Process(target=scan_frames, args=(frame_ranges[1], recog_model, people_list, people_lock,))
        third_qrt = Process(target=scan_frames, args=(frame_ranges[2], recog_model, people_list, people_lock,))
        fourth_qrt = Process(target=scan_frames, args=(frame_ranges[3], recog_model, people_list, people_lock,))

        # Start the processes
        first_qrt.start()
        second_qrt.start()
        third_qrt.start()
        fourth_qrt.start()

        # Wait for all processes to finish
        first_qrt.join()
        second_qrt.join()
        third_qrt.join()
        fourth_qrt.join()

        video_capture.release()
        people[:] = list(people_list)
    print(f'Processed {frame_amount // 2} frames and found {len(people)} person(s).')
    return people

if __name__ == '__main__':
    start = time.perf_counter()
    faces = ai_face_recog_from_video('test1.mp4')
    # print(f"People in video: {', '.join(faces)}\nTime: " + str(time.perf_counter() - start))
    print(f"People in video: {faces}\nTime: " + str(time.perf_counter() - start))

I suppose now the only ways to improve performance are decreasing the number of frames scanned or increasing the number of processes used (up to the optimal number of processes).
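
For example, a rough sketch of how the process count could be made configurable instead of hard-coding four workers. chunk_frames and scan_in_parallel are hypothetical helpers, not part of the code above; they assume you already have a flat list of read frames and reuse the existing scan_frames function:

import os
from multiprocessing import Process, Lock, Manager

def chunk_frames(frames, num_chunks):
    # Split the sampled frames into at most num_chunks roughly equal parts
    size = max(1, -(-len(frames) // num_chunks))  # ceiling division
    return [frames[i:i + size] for i in range(0, len(frames), size)]

def scan_in_parallel(frames, recog_model, num_workers=None):
    # Default to one worker per CPU core, leaving one core free
    num_workers = num_workers or max(1, (os.cpu_count() or 2) - 1)
    with Manager() as manager:
        people_list = manager.list()
        lock = Lock()
        workers = [
            Process(target=scan_frames, args=(chunk, recog_model, people_list, lock))
            for chunk in chunk_frames(frames, num_workers)
        ]
        for p in workers:
            p.start()
        for p in workers:
            p.join()
        return list(people_list)

With something like that you could time runs at different num_workers values and find the sweet spot for your machine.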

It doesn’t seem to be any faster, though. Might be a bug in your code or maybe the library was already optimised with multiprocessing.

The other optimisations would be using multiprocessing on the frames or caching the model data.
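
If caching the model data means persisting the reference encodings between runs, a minimal sketch with pickle might look like the following. The cache filename recog_model_cache.pkl is just a placeholder, and this version does not notice new or changed images:

import os
import pickle

CACHE_FILE = 'recog_model_cache.pkl'

def load_model_cached(model_dir='recog_model'):
    # Reuse encodings saved by a previous run if the cache file exists
    if os.path.isfile(CACHE_FILE):
        with open(CACHE_FILE, 'rb') as f:
            return pickle.load(f)
    model = {}
    for subfolder in os.listdir(model_dir):
        subfolder_path = os.path.join(model_dir, subfolder)
        if os.path.isdir(subfolder_path):
            model[subfolder] = load_and_encode_images(subfolder_path)
    # Save the freshly computed encodings for next time
    with open(CACHE_FILE, 'wb') as f:
        pickle.dump(model, f)
    return model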

Yeah, or I could use multiprocessing on the model data.
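
A sketch of that idea, encoding each person's folder in a separate worker with a Pool. build_model_in_parallel is a hypothetical helper that wraps the existing load_and_encode_images:

import os
from multiprocessing import Pool

def build_model_in_parallel(model_dir='recog_model', processes=None):
    names, paths = [], []
    for name in os.listdir(model_dir):
        path = os.path.join(model_dir, name)
        if os.path.isdir(path):
            names.append(name)
            paths.append(path)
    # Encode each person's reference images in its own worker process
    with Pool(processes=processes) as pool:
        encodings = pool.map(load_and_encode_images, paths)
    return dict(zip(names, encodings))

This only pays off when there are several people with many reference images; for a handful of images the process startup cost can outweigh the gain.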

For reference, here’s how the face_detection CLI source code does multiprocessing:

import itertools
import multiprocessing

def test_image(image_to_check, model, upsample):
    ...

def process_images_in_process_pool(images_to_check, number_of_cpus, model, upsample):
    if number_of_cpus == -1:
        processes = None
    else:
        processes = number_of_cpus
    # macOS will crash due to a bug in libdispatch if you don't use 'forkserver'
    context = multiprocessing
    if "forkserver" in multiprocessing.get_all_start_methods():
        context = multiprocessing.get_context("forkserver")
    pool = context.Pool(processes=processes)
    function_parameters = zip(
        images_to_check,
        itertools.repeat(model),
        itertools.repeat(upsample),
    )
    pool.starmap(test_image, function_parameters)

So, does my multiprocessing code not actually do anything?

Ensure that you’re on Python 3.12+. You could see if pyston gives better performance in this case.


I don’t know much about using multiprocessing.
Maybe try using Pool and starmap. Check your CPU usage to see whether it is actually running in parallel, and print something from each process when it finishes.
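
For example, a sketch of the Pool/starmap idea applied to the frame chunks, with a per-chunk print so you can confirm that separate processes are doing work. scan_chunk is a hypothetical wrapper; the matching logic itself would be the same as in scan_frames:

import os
from multiprocessing import Pool

def scan_chunk(frames, model):
    # Report which process handled this chunk, so parallelism is visible
    print(f"Process {os.getpid()} scanning {len(frames)} frames")
    found = []
    # ... same face_locations / face_encodings / matching logic as
    #     scan_frames, appending matched labels to `found` ...
    return found

def scan_all(frame_chunks, model, processes=None):
    with Pool(processes=processes) as pool:
        results = pool.starmap(scan_chunk, [(chunk, model) for chunk in frame_chunks])
    # Merge per-chunk results, dropping duplicates; no lock needed
    people = []
    for chunk_result in results:
        for label in chunk_result:
            if label not in people:
                people.append(label)
    return people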


I’ll look into it.

Looks like I am on Python 3.11.6. I’ll see if I can update it right now. What changes will Python 3.12 make to my program?


Usually, the optimal number of processes is given by os.cpu_count() - 1.
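
Something like this, guarding against os.cpu_count() returning None, which it can on some platforms:

import os

# Leave one core for the main process and the rest of the system
num_processes = max(1, (os.cpu_count() or 2) - 1)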


The CPython 3.12 update has general performance improvements.


I’ll try it out.

Got it.