import cv2
from ultralytics import YOLO
from collections import deque
import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point, WriteOptions
import time
from datetime import datetime
import ssl
import os

# InfluxDB Configuration
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "duVTQHPpHqr6WmdYfpSStqm-pxnvZHs-W0-3lXDnk8Tn6PGt59MlnTSR6egjMWdYvmL_ZI6xt3YUzGVBZHvc7w=="  # API token only (consider loading it from the environment instead of hard-coding)
INFLUX_ORG = "GAAIM"
INFLUX_BUCKET = "AGVIGNETTE"

# Connect to InfluxDB
client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_api = client.write_api(write_options=WriteOptions(batch_size=1))
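
# NOTE: with batch_size=1, each point is flushed to InfluxDB as soon as it is
# written. write_api and INFLUX_BUCKET are prepared here, but the detection
# loop below only publishes results over MQTT and never calls write_api.
# A minimal sketch of how a smoothed classification could also be written
# straight to the bucket (hypothetical helper, not called anywhere in this
# script; the "object_id" tag key is chosen here purely for illustration):
def write_classification_to_influx(final_label, obj_id):
    # Measurement "lemon_classification" with the tracker ID as a tag, the
    # smoothed label as a string field, and a nanosecond epoch timestamp.
    point = (
        Point("lemon_classification")
        .tag("object_id", str(obj_id))
        .field("classification", final_label)
        .time(int(time.time() * 1e9))
    )
    write_api.write(bucket=INFLUX_BUCKET, org=INFLUX_ORG, record=point)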

# MQTT Setup
MQTT_BROKER = "192.168.10.57"
MQTT_TOPIC = "fruit/classification"

mqtt_client = mqtt.Client()

# Set up TLS/SSL for the MQTT connection (currently disabled; the broker is reached over plain TCP)
# mqtt_client.tls_set(
#     ca_certs="/Users/vel/Desktop/CvModel/mosquitto/mosquitto/certs/ca.crt",  # Path to the CA certificate
#     tls_version=ssl.PROTOCOL_TLS  # Specify the TLS version
# )
# mqtt_client.tls_insecure_set(True)
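# The third argument to connect() below is the MQTT keepalive interval in
# seconds (6000 s), not a timeout: if the connection is otherwise idle, the
# client pings the broker at this interval to keep the session alive.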
mqtt_client.connect(MQTT_BROKER, 1883, 6000)

# Allow duplicate loading of the OpenMP runtime (works around the libomp
# "already initialized" error when multiple libraries bundle their own OpenMP)
os.environ["KMP_DUPLICATE_LIB_OK"] = "True"

# Path to the official BoT-SORT tracker YAML configuration file (adjust as needed)
yaml_path = "botsort.yaml"

# Camera index (0 is the default built-in camera; 1 would select an external camera)
camera_index = 0

cap = cv2.VideoCapture(camera_index)
cap.set(cv2.CAP_PROP_FPS, 30)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
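# Note: cap is only used to request 30 FPS and read the frame size here; the
# frames processed below come from model.track(source=camera_index), which
# opens the camera stream itself.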

# Load the YOLO model
model = YOLO(r"/Users/vel/Desktop/CvModel/CV_AG/runs/detect/train4/weights/best.pt")  # Load custom model

# Define class labels
class_labels = {
    0: "Bruised",
    1: "DefectiveLemon",
    2: "GoodLemon",
    3: "NotRipeLemon",
    4: "Rotten"
}

# Apply smoothing to "DefectiveLemon", "GoodLemon", and "NotRipeLemon"
smoothing_labels = ["DefectiveLemon", "GoodLemon", "NotRipeLemon"]

# Smoothing parameters for the sliding window
HISTORY_LENGTH = 20     # Number of recent frames kept per tracked object
DEFECT_THRESHOLD = 0.3  # Minimum proportion of "DefectiveLemon" frames
GOOD_THRESHOLD = 0.7    # Minimum proportion for "GoodLemon" / "NotRipeLemon"

# Label history for each target (used for smoothing), format: {ID: deque([...], maxlen=HISTORY_LENGTH)}
lemon_history = {}
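# Tracker IDs whose classification has already been published over MQTT; used
# below so each lemon is sent at most once as it passes the send zone.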
lemon_send_history = []

# Set the display window to be resizable
cv2.namedWindow("Live Detection", cv2.WINDOW_NORMAL)

# Smoothing function:
# If the current detected label is not in smoothing_labels, clear the target's history and return the current label;
# otherwise, add the current label to the history and return a smoothed label based on the proportions in the window.
def get_smoothed_label(obj_id, current_label):
    if current_label not in smoothing_labels:
        if obj_id in lemon_history:
            lemon_history[obj_id].clear()
        return current_label

    if obj_id not in lemon_history:
        lemon_history[obj_id] = deque(maxlen=HISTORY_LENGTH)
    lemon_history[obj_id].append(current_label)

    history = lemon_history[obj_id]
    defect_count = history.count("DefectiveLemon")
    good_count = history.count("GoodLemon")
    notripe_count = history.count("NotRipeLemon")
    total = len(history)

    if total == 0:
        return current_label
    if defect_count / total >= DEFECT_THRESHOLD:
        return "DefectiveLemon"
    elif good_count / total >= GOOD_THRESHOLD:
        return "GoodLemon"
    elif notripe_count / total >= GOOD_THRESHOLD:
        return "NotRipeLemon"
    else:
        return history[-1]
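
# Example with the thresholds above: if a tracked lemon was labelled
# "DefectiveLemon" in 6 of its last 20 frames (30%), the smoothed label is
# "DefectiveLemon" even when the newest frame says "GoodLemon", while
# "GoodLemon" or "NotRipeLemon" must account for at least 70% of the window
# before they are reported.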

# Use streaming tracking mode to maintain tracker state
results = model.track(
    source=camera_index,    # Get the video stream directly from the camera
    conf=0.45,
    tracker=yaml_path,      # Use the BoT-SORT YAML configuration file
    persist=True,           # Persist tracking state between frames (do not reset)
    stream=True,            # Stream processing, not frame-by-frame calling
    show=False,
    device='mps'            # or 'cpu'
)
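# With stream=True, model.track() returns a generator, so frames are pulled and
# processed one at a time in the loop below. device='mps' assumes an Apple
# Silicon GPU; use 'cpu' (or a CUDA device string) on other hardware.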

# Iterate over streaming tracking results
for result in results:

    frame = result.orig_img    # Current frame
    detections = result.boxes  # Detection box information

    for box in detections:
        x1, y1, x2, y2 = map(int, box.xyxy[0])  # Detection box coordinates
        obj_id = int(box.id) if box.id is not None else -1  # Tracking ID
        class_id = int(box.cls)  # Class ID
        score = box.conf  # Confidence
        label = class_labels.get(class_id, "Unknown")  # Get class name

        # If the target ID is valid
        if obj_id != -1:
            # If the detected label requires smoothing, use the smoothing function
            if label in smoothing_labels:
                final_label = get_smoothed_label(obj_id, label)
                display_text = f"ID {obj_id} | {final_label}"
                # Only print targets with smoothed labels (only these three classes matter)
                if final_label in smoothing_labels:
                    position = f"({x1}, {y1}, {x2}, {y2})"
                    print(f"ID: {obj_id}, Position: {position}, Label: {display_text}")

                    # Draw the detection box and label with a color based on the classification
                    if final_label == "DefectiveLemon":
                        box_color = (100, 100, 255)  # Red for defective
                    elif final_label == "NotRipeLemon":
                        box_color = (255, 100, 80)   # Blue for unripe
                    elif final_label == "GoodLemon":
                        box_color = (0, 255, 0)      # Green for good
                    else:
                        box_color = (255, 255, 255)  # White for unknown or other classes

                    # Add a background rectangle for the text
                    text_size = cv2.getTextSize(display_text, cv2.FONT_HERSHEY_TRIPLEX, 0.6, 2)[0]
                    text_x, text_y = x1, y1 - 10
                    text_w, text_h = text_size[0], text_size[1]
                    cv2.rectangle(frame, (text_x, text_y - text_h - 5), (text_x + text_w, text_y + 5), (0, 0, 0), -1)

                    # Draw the detection box and text
                    cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)
                    cv2.putText(frame, display_text, (text_x, text_y),
                                cv2.FONT_HERSHEY_TRIPLEX, 0.6, box_color, 2)

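                    # "Send zone": publish each tracked lemon at most once while its box
                    # lies in a fixed image region (x1 between 750 and 850, y between 190
                    # and 410). The payload uses InfluxDB line protocol (measurement,
                    # string field, nanosecond timestamp), presumably so a downstream MQTT
                    # consumer such as Telegraf can forward it into the InfluxDB bucket.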
                    if 750 < x1 < 850 and y1 > 190 and y2 < 410 and obj_id not in lemon_send_history:
                        if final_label in ["DefectiveLemon", "NotRipeLemon", "GoodLemon"]:
                            mqtt_message = f"lemon_classification classification=\"{final_label}\" {int(time.time() * 1e9)}"
                            lemon_send_history.append(obj_id)
                            mqtt_client.publish(MQTT_TOPIC, mqtt_message)
            else:
                # For other classes, display the current detection result directly and clear the history (if any)
                if obj_id in lemon_history:
                    lemon_history[obj_id].clear()
                display_text = label
        else:
            display_text = label

    # Display the processed frame
    cv2.imshow("Live Detection", frame)

    # Exit the program when the ESC key is pressed
    if cv2.waitKey(1) & 0xFF == 27:
        print("ESC key detected. Exiting the program.")
        break

# Release the capture handle opened earlier and close the display window and connections
cap.release()
cv2.destroyAllWindows()
mqtt_client.disconnect()
client.close()
print("Camera video processing complete. Program terminated.")