USDA-throughput-control/ffmpegTest.py

43 lines
1.1 KiB
Python

import ffmpeg
import numpy as np
from ultralytics import YOLO
import paho.mqtt.client as mqtt
from filterpy.kalman import UnscentedKalmanFilter as UKF
from filterpy.kalman import MerweScaledSigmaPoints
import time
width = 1280
height = 720
fps = 60
frameCount = 0
# Setup the ffmpeg stream
input_stream = ffmpeg.input('video="0":video_size={}x{}:framerate={}'.format(width, height, fps))
output_stream = ffmpeg.output(input_stream, 'pipe:1', format='rawvideo', pix_fmt='bgr24')
# Initialize the stream
process = ffmpeg.run_async(output_stream, pipe_stdout=True, pipe_stderr=True)
frameTime = time.time()
while True:
# Read a frame from the stream
in_bytes = process.stdout.read(width * height * 3)
if len(in_bytes) < width * height * 3:
break # End of stream
# Convert bytes to numpy array
frame = np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3])
frameCount += 1
if (time.time() - frameTime) > 10:
trueFPS = frameCount / (time.time()-frameTime)
print(trueFPS)
break
# Close the process
process.stdout.close()
process.stderr.close()
process.wait()