Real-Time Object Detection using YOLO and OpenCV
Advanced: High-performance real-time detection with Ultralytics YOLO + OpenCV
1. Project Overview
What it does
This project builds a real-time object detection system that captures frames from a webcam (or video file), runs a YOLO object detector on each frame, draws labeled bounding boxes with confidence scores, displays FPS, and optionally saves annotated video or per-frame detections to CSV/JSON. It includes simple debounce-style event logging (e.g., "person entered frame") to illustrate event detection.
Real-world use cases
- Surveillance — detect people, cars, bikes in real time.
- Retail analytics — count customers, flag long queues.
- Robotics & automation — object awareness for navigation/manipulation.
- Prototyping production pipelines before deploying to edge devices.
Technical goals
- Integrate YOLO detection (Ultralytics yolov8n by default) into an OpenCV processing loop.
- Keep the UI responsive (drop frames if the model is too slow) and measure throughput; see the frame-dropping sketch after this list.
- Provide configurable thresholds, device selection (CPU/GPU), and save options.
- Demonstrate event logging (enter/exit), per-frame output, and how to tune for accuracy vs latency.
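The responsiveness goal above is usually handled by dropping stale frames. The full script in section 5 processes every frame; a minimal sketch of the frame-dropping alternative, assuming a background reader thread that keeps only the newest frame (the LatestFrameReader name is ours, not a library class):

import threading
import cv2

class LatestFrameReader:
    # Reads frames on a daemon thread; read() always returns the newest frame,
    # so frames that arrive while the model is busy are simply discarded.
    def __init__(self, src=0):
        self.cap = cv2.VideoCapture(src)
        self.lock = threading.Lock()
        self.frame = None
        self.running = True
        threading.Thread(target=self._reader, daemon=True).start()

    def _reader(self):
        while self.running:
            ret, frame = self.cap.read()
            if not ret:
                break
            with self.lock:
                self.frame = frame  # overwrite: older, unprocessed frames are dropped

    def read(self):
        with self.lock:
            return None if self.frame is None else self.frame.copy()

    def release(self):
        self.running = False
        self.cap.release()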
2. Key Technologies & Libraries
- Python 3.8+
- opencv-python (OpenCV) — camera I/O, drawing, display
- ultralytics — YOLO models & inference (pip install ultralytics)
- numpy — numeric arrays
- pandas (optional) — saving detection table / CSV
- tqdm (optional) — for progress bars when processing a file
Install:
pip install opencv-python ultralytics numpy pandas tqdm
The ultralytics package will automatically download model weights like yolov8n.pt the first time you run it.
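To verify the install (and trigger the one-time weight download up front), a quick smoke test like this should work:

# Smoke test: load the model and inspect its class map
from ultralytics import YOLO

model = YOLO("yolov8n.pt")          # downloads yolov8n.pt on first use
print(len(model.names), "classes")  # 80 COCO classes for the default model
print(model.names[0])               # 'person'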
3. Learning Outcomes
- How to run a modern YOLO model in Python and feed it OpenCV frames.
- Real-time engineering tradeoffs — how to choose input resolution, model size, and device for a target FPS.
- How to draw results, compute FPS, and safely save outputs (video + detection logs).
- How to convert model outputs into structured logs (CSV/JSON) for downstream analytics.
- Foundations to extend the system with tracking (DeepSORT), edge deployment (TensorRT), or cloud streaming.
4. Step-by-Step Explanation (high level)
- Create virtual environment and install dependencies.
- Prepare the script and set parameters (source camera index or input video, model name, device).
- Run the script: it captures frames, sends each to YOLO, gets detection boxes/classes/confidence.
- Draw boxes and labels, update a simple event logger (e.g., count unique detected people per session); a minimal logger sketch follows this list.
- Optionally save annotated output video and store detections to CSV/JSON.
- Tune the detection confidence threshold, input resolution, and model choice (yolov8n is the smallest; stepping up to yolov8s or yolov8m improves accuracy at some cost in speed).
- Extend: integrate tracker (to keep persistent IDs), stream to dashboard, or deploy to Jetson.
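The event-logging step deserves a concrete illustration, since the full script in section 5 logs raw detections rather than events. A minimal debounce-style sketch (the PresenceLogger name and patience parameter are ours, for illustration only):

import time

class PresenceLogger:
    # Emits 'entered'/'exited' events for one class label, debounced so a
    # single missed detection does not flip the state.
    def __init__(self, label="person", patience=15):
        self.label = label
        self.patience = patience  # consecutive frames required to commit a state change
        self.present = False
        self.counter = 0

    def update(self, detected_labels):
        seen = self.label in detected_labels
        if seen != self.present:
            self.counter += 1
            if self.counter >= self.patience:
                self.present = seen
                self.counter = 0
                event = "entered" if seen else "exited"
                print(f"[EVENT] {time.strftime('%H:%M:%S')} {self.label} {event} frame")
        else:
            self.counter = 0

Inside the detection loop you would call logger.update([names[int(c)] for c in class_ids]) once per frame.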
5. Full Working and Verified Python Code
Save the code below as realtime_yolo_opencv.py. It is self-contained and includes helpful CLI flags. It defaults to the webcam (index 0), uses yolov8n.pt (very small), and runs on the CPU unless you specify --device (e.g., --device gpu).
#!/usr/bin/env python3
"""
realtime_yolo_opencv.py
Real-Time Object Detection using YOLO (Ultralytics) + OpenCV.
Usage examples:
# Run webcam (index 0) on CPU (default)
python realtime_yolo_opencv.py
# Use GPU (if available) and save annotated output
python realtime_yolo_opencv.py --device gpu --save out.mp4
# Process a video file instead of webcam
python realtime_yolo_opencv.py --source path/to/video.mp4 --save detections.mp4
Notes:
- Install requirements: pip install opencv-python ultralytics numpy pandas tqdm
- The ultralytics package will automatically download the model weights (yolov8n.pt) on first run.
"""
from __future__ import annotations
import time
import argparse
from pathlib import Path
import csv
import json
import sys
import cv2
import numpy as np
# Try imports for optional extras
try:
    from ultralytics import YOLO
except Exception as e:
    raise RuntimeError("ultralytics package not found. Install with: pip install ultralytics") from e
import torch  # installed as a dependency of ultralytics; used only for the CUDA availability check
try:
    import pandas as pd
except Exception:
    pd = None  # pandas optional (only needed if you extend the logger to DataFrames)
# -------------------------
# Helper utilities
# -------------------------
def parse_args():
    p = argparse.ArgumentParser(description="Real-Time YOLO object detection (Ultralytics + OpenCV)")
    p.add_argument("--source", type=str, default="0", help="Video source: camera index (0) or path to video file")
    p.add_argument("--model", type=str, default="yolov8n.pt", help="Model to use (ultralytics model or path). Default: yolov8n.pt")
    p.add_argument("--device", type=str, default="cpu", help="Device: 'cpu', 'gpu', or a CUDA index like '0' for cuda:0")
    p.add_argument("--conf", type=float, default=0.35, help="Confidence threshold for detections")
    p.add_argument("--iou", type=float, default=0.45, help="NMS IoU threshold")
    p.add_argument("--save", type=str, default="", help="Optional: path to save annotated output (mp4)")
    p.add_argument("--log", type=str, default="", help="Optional: path to save detection log as CSV/JSON")
    p.add_argument("--width", type=int, default=640, help="Resize input width (height scales to keep aspect ratio)")
    p.add_argument("--show", action="store_true", help="Show the display window (enabled by default)")
    p.add_argument("--no-show", dest="show", action="store_false", help="Do not show the display window")
    p.set_defaults(show=True)
    return p.parse_args()
def open_source(source_arg: str):
    # Accept a numeric camera index or a file path
    src = int(source_arg) if source_arg.isdigit() else source_arg
    cap = cv2.VideoCapture(src)
    if not cap.isOpened():
        raise RuntimeError(f"Unable to open video source: {src}")
    return cap
def draw_boxes(frame: np.ndarray, boxes: np.ndarray, confidences: np.ndarray, classes: np.ndarray, names: dict, colors: dict, conf_thres: float):
    """
    boxes: Nx4 array of xyxy
    confidences: N
    classes: N (int)
    names: mapping id->name
    colors: mapping id->(B,G,R)
    conf_thres: skip boxes below this confidence
    """
    for (box, conf, cls) in zip(boxes, confidences, classes):
        if conf < conf_thres:
            continue
        x1, y1, x2, y2 = map(int, box)
        label = names.get(int(cls), str(int(cls)))
        c = colors.get(int(cls), (0, 255, 0))
        # Draw rectangle
        cv2.rectangle(frame, (x1, y1), (x2, y2), c, 2)
        # Label with confidence, on a filled background for readability
        txt = f"{label} {conf:.2f}"
        (tw, th), _ = cv2.getTextSize(txt, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
        cv2.rectangle(frame, (x1, y1 - th - 6), (x1 + tw + 6, y1), c, -1)
        cv2.putText(frame, txt, (x1 + 3, y1 - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA)

# Simple deterministic color map (seeded per class, without touching the global RNG)
def random_color_for_class(cls_id: int):
    rng = np.random.default_rng(cls_id)
    return tuple(int(x) for x in rng.integers(0, 255, size=3))
# -------------------------
# Main processing loop
# -------------------------
def main():
    args = parse_args()
    source = args.source
    conf_thres = float(args.conf)
    iou_thres = float(args.iou)
    model_name = args.model
    device = args.device.lower()
    save_path = Path(args.save) if args.save else None
    log_path = Path(args.log) if args.log else None
    input_width = args.width
    show_window = args.show

    # Open source
    cap = open_source(source)

    # Determine input size
    src_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    src_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps_input = cap.get(cv2.CAP_PROP_FPS) or 30.0

    # Load YOLO model (Ultralytics)
    print(f"[INFO] Loading model {model_name} on device '{device}' ...")
    # device: 'cpu' or 'cuda:0' etc. Convert 'gpu' -> 'cuda:0'
    if device in ("gpu", "cuda"):
        device_str = "cuda:0"
    elif device.isdigit():
        device_str = f"cuda:{device}"
    else:
        device_str = device
    if device_str.startswith("cuda") and not torch.cuda.is_available():
        print("[WARN] CUDA not available; falling back to CPU.")
        device_str = "cpu"
    model = YOLO(model_name)
    names = model.names if hasattr(model, "names") else {}
    colors = {int(k): random_color_for_class(int(k)) for k in names.keys()}

    writer = None
    if save_path:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out_fps = fps_input if fps_input > 0 else 20.0
        writer = cv2.VideoWriter(str(save_path), fourcc, out_fps, (input_width, int(input_width * src_h / src_w)))
        print(f"[INFO] Saving annotated video to: {save_path}")

    log_is_csv = False
    log_file = None
    csv_writer = None
    json_records = []
    if log_path:
        if log_path.suffix.lower() == ".csv":
            log_is_csv = True
            log_file = open(log_path, "w", newline='', encoding="utf-8")
            csv_writer = csv.writer(log_file)
            csv_writer.writerow(["frame_idx", "timestamp", "class_id", "class_name", "conf", "x1", "y1", "x2", "y2"])
            print(f"[INFO] Logging detections to CSV: {log_path}")
        else:
            print(f"[INFO] Will save detections as JSON: {log_path}")

    frame_idx = 0
    t0 = time.time()
    avg_fps = 0.0
    alpha = 0.9  # smoothing factor for the exponential moving-average FPS
    print("[INFO] Starting detection loop. Press 'q' in the display window to quit.")
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("[INFO] End of stream or cannot fetch frame.")
                break
            frame_idx += 1
            h, w = frame.shape[:2]
            # Resize to the requested width, preserving aspect ratio
            if input_width and w != input_width:
                new_h = int(input_width * h / w)
                frame_in = cv2.resize(frame, (input_width, new_h))
            else:
                frame_in = frame
            # Ultralytics expects RGB; OpenCV delivers BGR
            img = cv2.cvtColor(frame_in, cv2.COLOR_BGR2RGB)
            results = model.predict(img, device=device_str, imgsz=input_width, conf=conf_thres, iou=iou_thres, verbose=False)
            res = results[0]
            # Extract boxes/conf/class, handling ultralytics attribute variants
            try:
                boxes = res.boxes
                if boxes is not None and len(boxes) > 0:
                    boxes_xyxy = boxes.xyxy.cpu().numpy()
                    confidences = boxes.conf.cpu().numpy()
                    class_ids = boxes.cls.cpu().numpy().astype(int)
                else:
                    boxes_xyxy = np.empty((0, 4))
                    confidences = np.array([])
                    class_ids = np.array([], dtype=int)
            except Exception:
                try:
                    data = res.boxes.data.cpu().numpy()
                    if data.size:
                        boxes_xyxy = data[:, :4]
                        confidences = data[:, 4]
                        class_ids = data[:, 5].astype(int)
                    else:
                        boxes_xyxy = np.empty((0, 4))
                        confidences = np.array([])
                        class_ids = np.array([], dtype=int)
                except Exception:
                    boxes_xyxy = np.empty((0, 4))
                    confidences = np.array([])
                    class_ids = np.array([], dtype=int)
            draw_boxes(frame_in, boxes_xyxy, confidences, class_ids, names, colors, conf_thres)
            ts = time.time()
            if log_path:
                if log_is_csv:
                    for (box, conf, cls) in zip(boxes_xyxy, confidences, class_ids):
                        x1, y1, x2, y2 = map(int, box)
                        csv_writer.writerow([frame_idx, ts, int(cls), names.get(int(cls), str(int(cls))), float(conf), x1, y1, x2, y2])
                else:
                    for (box, conf, cls) in zip(boxes_xyxy, confidences, class_ids):
                        x1, y1, x2, y2 = map(int, box)
                        json_records.append({
                            "frame": frame_idx, "timestamp": ts, "class_id": int(cls),
                            "class_name": names.get(int(cls), str(int(cls))),
                            "conf": float(conf), "x1": x1, "y1": y1, "x2": x2, "y2": y2
                        })
            # Exponential moving-average FPS over per-frame loop time
            now = time.time()
            dt = now - t0 if now > t0 else 1e-6
            fps = 1.0 / dt
            t0 = now
            avg_fps = (alpha * avg_fps) + ((1.0 - alpha) * fps) if avg_fps else fps
            cv2.putText(frame_in, f"Frame: {frame_idx} FPS: {avg_fps:.1f}", (10, 25),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2, cv2.LINE_AA)
            if show_window:
                cv2.imshow("YOLO Real-Time", frame_in)
                key = cv2.waitKey(1) & 0xFF
                if key == ord("q"):
                    break
            if writer is not None:
                # frame_in is already BGR (only the model input was converted to RGB)
                writer.write(frame_in)
    finally:
        cap.release()
        if writer:
            writer.release()
        if log_path:
            if log_is_csv:
                log_file.close()
            else:
                log_path.write_text(json.dumps(json_records, indent=2))
        if show_window:
            cv2.destroyAllWindows()
        print("[INFO] Exiting. Cleaned up resources.")

if __name__ == "__main__":
    main()
Notes:
- The script uses the ultralytics.YOLO API; the result object's res.boxes exposes boxes, confidences, and classes. The code handles common ultralytics attribute variants robustly.
- The default model yolov8n.pt is tiny and fast (good for CPU demos). For better accuracy, use yolov8s.pt, yolov8m.pt, etc.
- --device gpu maps to cuda:0 when available; if CUDA is not available, the script falls back to CPU with a warning.
- --save out.mp4 records the annotated video; the output resolution matches the resized input width (default 640).
- Logging: use --log detections.csv to capture per-frame detection records.
6. Sample Output / Results
- Live window: bounding boxes with labels and confidence, FPS overlay, frame index.
- Saved outputs (if used): out.mp4 (annotated clip) and detections.csv with lines like:
  frame_idx,timestamp,class_id,class_name,conf,x1,y1,x2,y2
  23,169xxxxx,0,person,0.92,34,56,200,480
- Performance: on a modern CPU, yolov8n can reach ~10–25 FPS depending on resolution; a GPU delivers higher FPS and supports larger models.
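Once a CSV log exists, pandas (the optional dependency from section 2) makes downstream analysis straightforward. A small sketch, assuming the log was saved as detections.csv:

import pandas as pd

df = pd.read_csv("detections.csv")
print(df["class_name"].value_counts())                   # detections per class
print(df.groupby("class_name")["conf"].mean().round(2))  # mean confidence per class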
7. Possible Enhancements
- Multi-object tracking: integrate DeepSORT or BYTETracker to maintain stable IDs across frames (see the sketch after this list).
- Edge Optimization: export to ONNX/TensorRT or use OpenVINO for faster inference on edge devices.
- Stream & API: stream frames to a REST/Socket server for remote monitoring or dashboarding.
- Alerts & Actions: when specific object counts exceed thresholds, trigger webhooks, email, or actuator events.
- Quantitative evaluation: run detection on labeled video and compute mAP/precision/recall for different models.
- Batch processing: process large video archives with multiprocessing and progress reporting.
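For the tracking enhancement, Ultralytics ships tracker support directly: model.track() with the bundled ByteTrack config assigns persistent IDs, exposed via res.boxes.id. A minimal sketch (ByteTrack here stands in for DeepSORT for brevity):

from ultralytics import YOLO
import cv2

model = YOLO("yolov8n.pt")
cap = cv2.VideoCapture(0)
while True:
    ret, frame = cap.read()
    if not ret:
        break
    # persist=True carries tracker state across successive frames
    res = model.track(frame, persist=True, tracker="bytetrack.yaml", verbose=False)[0]
    if res.boxes.id is not None:
        ids = res.boxes.id.cpu().numpy().astype(int)
        for box, tid in zip(res.boxes.xyxy.cpu().numpy(), ids):
            x1, y1, x2, y2 = map(int, box)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame, f"ID {tid}", (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
    cv2.imshow("Tracking", frame)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break
cap.release()
cv2.destroyAllWindows()

Similarly, edge export is one call away: model.export(format="onnx") produces an ONNX file that TensorRT or OpenVINO toolchains can consume.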