In [ ]:
# ┌────────────────────────────────────┐
# │ Video / Image Input │
# │ (Live camera or recorded video) │
# └───────────────┬────────────────────┘
# ↓
# ┌────────────────────────────────────┐
# │ Frame Extraction (FPS fixed) │
# └───────────────┬────────────────────┘
# ↓
# ┌────────────────────────────────────┐
# │ YOLOv8 Face Detection │
# │ → Face ROI Cropping │
# └───────────────┬────────────────────┘
# ↓
# ┌───────┴───────────┐
# │ │
# │ │
# ┌───────▼──────────┐ ┌─────▼────────────────────────┐
# │ Path-A: CNN Path │ │ Path-B: Physiological Path │
# │ (Appearance) │ │ (Interpretability) │
# │ │ │ │
# │ CNN Classifier │ │ MediaPipe FaceMesh │
# │ (32×32 Face ROI) │ │ → Facial Landmarks │
# │ │ │ → EAR(t), MAR(t) │
# │ Output: │ │ │
# │ P_drowsy(t) │ │ Temporal Aggregation │
# │ (0–1 probability)│ │ → Eye closure history │
# │ │ │ → PERCLOS(W) │
# │ Grad-CAM │ │ → Yawn evidence │
# │ (Explainability) │ │ │
# └───────┬──────────┘ └─────────┬────────────────────┘
# │ │
# └─────────────┬───────────┘
# ↓
# ┌──────────────────────────────────────────────┐
# │ Hybrid Fuzzy Inference System (FIS) │
# │ │
# │ Crisp → Fuzzy Inputs: │
# │ • CNN_score ← CNN confidence │
# │ • Eye_fatigue ← EAR + PERCLOS │
# │ • Mouth_fatigue ← MAR / Yawn cues │
# │ │
# │ Rule-based Hybrid Reasoning: │
# │ • Correct CNN uncertainty │
# │ • Fuse deep + physiological evidence │
# │ │
# │ Defuzzification (Centroid) │
# └───────────────┬──────────────────────────────┘
# ↓
# ┌────────────────────────────────────┐
# │ Drowsiness Score (0–1) │
# │ │
# │ → Label Mapping: │
# │ • Active │
# │ • Warning │
# │ • Drowsy │
# │ • Severe │
# │ │
# │ → Temporal Alert Logic │
# │ (sustained detection) │
# └────────────────────────────────────┘
In [2]:
# ============================================================
# HYBRID DRIVER DROWSINESS DETECTION (FULL CODE)
# CNN + YOLO + EAR/MAR + PERCLOS + HYBRID FIS
# ============================================================
# -------------------- ENV --------------------
import os
os.environ["CUDA_VISIBLE_DEVICES"] = ""
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
# -------------------- IMPORTS --------------------
import cv2
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import tensorflow as tf
from tensorflow.keras import models
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, Dropout, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam
from ultralytics import YOLO
from huggingface_hub import hf_hub_download
import mediapipe as mp
import skfuzzy as fuzz
from skfuzzy import control as ctrl
2026-01-20 13:30:17.595967: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered WARNING: All log messages before absl::InitializeLog() is called are written to STDERR E0000 00:00:1768894217.638868 17001 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered E0000 00:00:1768894217.651627 17001 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered W0000 00:00:1768894217.726687 17001 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once. W0000 00:00:1768894217.726728 17001 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once. W0000 00:00:1768894217.726730 17001 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once. W0000 00:00:1768894217.726732 17001 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
In [3]:
import cv2
import gc
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from ultralytics import YOLO
from huggingface_hub import hf_hub_download
tf.config.optimizer.set_jit(False)
from tensorflow.keras.layers import Conv2D,MaxPooling2D,Dropout,Flatten
import matplotlib.pyplot as plt
import numpy as np
import PIL
import tensorflow as tf
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout, Flatten, Conv2D, MaxPooling2D
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
In [4]:
# ============================================================
# CONFIG
# ============================================================
IMG_SIZE = 32
FPS_FALLBACK = 25
PERCLOS_WINDOW_SEC = 60
ALERT_HOLD_SEC = 2.0
T_EAR_CLOSE = 0.21
T_EAR_OPEN = 0.27
T_MAR_HALF = 0.30
T_MAR_OPEN = 0.40
ALPHA_EYE = 0.6
In [5]:
FACE_DATASET_PATH = "/home/dev-rayied/ML/time_augmented"
IMG_SIZE = 32 # 🔥 reduced from 224 → much faster
BATCH_SIZE = 32
AUTOTUNE = tf.data.AUTOTUNE
CLASSES = {"active": 0, "drowsy": 1}
SUBFOLDERS = [
"afternoon","cloudy","completely_dark","completely_white",
"Dawn","evening","golden_hour","harsh_sun",
"indoor_light","midnight","morning","night"
]
In [6]:
# ============================================================
# LOAD YOLO FACE MODEL
# ============================================================
print("Downloading YOLOv8 FACE model...")
face_model_path = hf_hub_download(
repo_id="arnabdhar/YOLOv8-Face-Detection",
filename="model.pt"
)
print("Loading YOLOv8 FACE (CPU only)")
yolo_face = YOLO(face_model_path)
Downloading YOLOv8 FACE model... Loading YOLOv8 FACE (CPU only)
In [7]:
def build_index(path):
paths, labels = [], []
for cls, label in CLASSES.items():
for sub in SUBFOLDERS:
folder = os.path.join(path, cls, sub)
if not os.path.exists(folder):
continue
for img in os.listdir(folder):
if img.lower().endswith((".jpg",".png",".jpeg")):
paths.append(os.path.join(folder, img))
labels.append(label)
return paths, labels
paths, labels = build_index(FACE_DATASET_PATH)
train_p, val_p, train_l, val_l = train_test_split(
paths, labels,
test_size=0.2,
stratify=labels,
random_state=42
)
In [8]:
def load_image(path, label):
img = tf.io.read_file(path)
img = tf.image.decode_image(img, channels=3, expand_animations=False)
img.set_shape([None, None, 3])
img = tf.image.resize(img, (IMG_SIZE, IMG_SIZE))
img = tf.cast(img, tf.float32) / 255.0
return img, tf.cast(label, tf.float32)
train_ds = (
tf.data.Dataset.from_tensor_slices((train_p, train_l))
.shuffle(1024)
.map(load_image, num_parallel_calls=AUTOTUNE)
.batch(BATCH_SIZE)
.prefetch(AUTOTUNE)
)
val_ds = (
tf.data.Dataset.from_tensor_slices((val_p, val_l))
.map(load_image, num_parallel_calls=AUTOTUNE)
.batch(BATCH_SIZE)
.prefetch(AUTOTUNE)
)
2026-01-20 13:30:35.012223: E external/local_xla/xla/stream_executor/cuda/cuda_platform.cc:51] failed call to cuInit: INTERNAL: CUDA error: Failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
In [14]:
# ============================================================
# CNN MODEL (YOUR ORIGINAL)
# ============================================================
model = models.Sequential([
Conv2D(32, (3,3), activation="relu", input_shape=(IMG_SIZE, IMG_SIZE, 3)),
MaxPooling2D(2,2),
Conv2D(32, (3,3), activation="relu"),
MaxPooling2D(2,2),
Conv2D(64, (3,3), activation="relu", name="last_conv"),
GlobalAveragePooling2D(),
Dense(128, activation="relu"),
Dropout(0.5),
Dense(1, activation="sigmoid")
])
/home/dev-rayied/miniconda3/envs/tf310/lib/python3.10/site-packages/keras/src/layers/convolutional/base_conv.py:113: UserWarning: Do not pass an `input_shape`/`input_dim` argument to a layer. When using Sequential models, prefer using an `Input(shape)` object as the first layer in the model instead. super().__init__(activity_regularizer=activity_regularizer, **kwargs)
In [15]:
model.compile(
optimizer=Adam(learning_rate=1e-4),
loss="binary_crossentropy",
metrics=[
"accuracy",
tf.keras.metrics.Precision(name="precision"),
tf.keras.metrics.Recall(name="recall")
]
)
model.summary()
Model: "sequential"
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓ ┃ Layer (type) ┃ Output Shape ┃ Param # ┃ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩ │ conv2d (Conv2D) │ (None, 30, 30, 32) │ 896 │ ├─────────────────────────────────┼────────────────────────┼───────────────┤ │ max_pooling2d (MaxPooling2D) │ (None, 15, 15, 32) │ 0 │ ├─────────────────────────────────┼────────────────────────┼───────────────┤ │ conv2d_1 (Conv2D) │ (None, 13, 13, 32) │ 9,248 │ ├─────────────────────────────────┼────────────────────────┼───────────────┤ │ max_pooling2d_1 (MaxPooling2D) │ (None, 6, 6, 32) │ 0 │ ├─────────────────────────────────┼────────────────────────┼───────────────┤ │ last_conv (Conv2D) │ (None, 4, 4, 64) │ 18,496 │ ├─────────────────────────────────┼────────────────────────┼───────────────┤ │ global_average_pooling2d │ (None, 64) │ 0 │ │ (GlobalAveragePooling2D) │ │ │ ├─────────────────────────────────┼────────────────────────┼───────────────┤ │ dense (Dense) │ (None, 128) │ 8,320 │ ├─────────────────────────────────┼────────────────────────┼───────────────┤ │ dropout (Dropout) │ (None, 128) │ 0 │ ├─────────────────────────────────┼────────────────────────┼───────────────┤ │ dense_1 (Dense) │ (None, 1) │ 129 │ └─────────────────────────────────┴────────────────────────┴───────────────┘
Total params: 37,089 (144.88 KB)
Trainable params: 37,089 (144.88 KB)
Non-trainable params: 0 (0.00 B)
In [16]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
callbacks = [
EarlyStopping(
monitor="val_loss",
patience=5,
restore_best_weights=True
),
ModelCheckpoint(
"drowsiness_model_32x32.keras",
monitor="val_loss",
save_best_only=True
)
]
In [17]:
history = model.fit(
train_ds,
validation_data=val_ds,
epochs=15,
callbacks=callbacks
)
Epoch 1/15 3179/3179 ━━━━━━━━━━━━━━━━━━━━ 126s 39ms/step - accuracy: 0.8127 - loss: 0.4204 - precision: 0.8137 - recall: 0.8110 - val_accuracy: 0.8434 - val_loss: 0.3308 - val_precision: 0.8551 - val_recall: 0.8269 Epoch 2/15 3179/3179 ━━━━━━━━━━━━━━━━━━━━ 125s 39ms/step - accuracy: 0.8577 - loss: 0.3217 - precision: 0.8652 - recall: 0.8472 - val_accuracy: 0.8395 - val_loss: 0.3175 - val_precision: 0.7894 - val_recall: 0.9260 Epoch 3/15 3179/3179 ━━━━━━━━━━━━━━━━━━━━ 127s 40ms/step - accuracy: 0.8689 - loss: 0.2918 - precision: 0.8802 - recall: 0.8540 - val_accuracy: 0.8537 - val_loss: 0.2894 - val_precision: 0.8054 - val_recall: 0.9326 Epoch 4/15 3179/3179 ━━━━━━━━━━━━━━━━━━━━ 128s 40ms/step - accuracy: 0.8795 - loss: 0.2732 - precision: 0.8903 - recall: 0.8656 - val_accuracy: 0.8841 - val_loss: 0.2639 - val_precision: 0.9693 - val_recall: 0.7934 Epoch 5/15 3179/3179 ━━━━━━━━━━━━━━━━━━━━ 129s 40ms/step - accuracy: 0.8866 - loss: 0.2597 - precision: 0.8953 - recall: 0.8755 - val_accuracy: 0.8948 - val_loss: 0.2388 - val_precision: 0.8797 - val_recall: 0.9148 Epoch 6/15 3179/3179 ━━━━━━━━━━━━━━━━━━━━ 129s 41ms/step - accuracy: 0.8919 - loss: 0.2473 - precision: 0.9000 - recall: 0.8817 - val_accuracy: 0.9167 - val_loss: 0.2273 - val_precision: 0.9257 - val_recall: 0.9060 Epoch 7/15 3179/3179 ━━━━━━━━━━━━━━━━━━━━ 128s 40ms/step - accuracy: 0.8981 - loss: 0.2347 - precision: 0.9059 - recall: 0.8884 - val_accuracy: 0.8902 - val_loss: 0.2262 - val_precision: 0.8796 - val_recall: 0.9042 Epoch 8/15 3179/3179 ━━━━━━━━━━━━━━━━━━━━ 129s 40ms/step - accuracy: 0.9030 - loss: 0.2246 - precision: 0.9091 - recall: 0.8954 - val_accuracy: 0.9078 - val_loss: 0.2086 - val_precision: 0.9271 - val_recall: 0.8852 Epoch 9/15 3179/3179 ━━━━━━━━━━━━━━━━━━━━ 123s 39ms/step - accuracy: 0.9099 - loss: 0.2141 - precision: 0.9148 - recall: 0.9040 - val_accuracy: 0.9241 - val_loss: 0.1923 - val_precision: 0.9283 - val_recall: 0.9191 Epoch 10/15 3179/3179 ━━━━━━━━━━━━━━━━━━━━ 127s 40ms/step - accuracy: 0.9151 - loss: 0.2052 - precision: 0.9183 - recall: 0.9112 - val_accuracy: 0.9118 - val_loss: 0.1931 - val_precision: 0.9455 - val_recall: 0.8739 Epoch 11/15 3179/3179 ━━━━━━━━━━━━━━━━━━━━ 126s 40ms/step - accuracy: 0.9191 - loss: 0.1965 - precision: 0.9215 - recall: 0.9162 - val_accuracy: 0.8965 - val_loss: 0.2243 - val_precision: 0.8483 - val_recall: 0.9657 Epoch 12/15 3179/3179 ━━━━━━━━━━━━━━━━━━━━ 126s 40ms/step - accuracy: 0.9234 - loss: 0.1876 - precision: 0.9247 - recall: 0.9218 - val_accuracy: 0.9177 - val_loss: 0.1872 - val_precision: 0.9821 - val_recall: 0.8509 Epoch 13/15 3179/3179 ━━━━━━━━━━━━━━━━━━━━ 127s 40ms/step - accuracy: 0.9278 - loss: 0.1781 - precision: 0.9295 - recall: 0.9259 - val_accuracy: 0.9333 - val_loss: 0.1613 - val_precision: 0.9332 - val_recall: 0.9334 Epoch 14/15 3179/3179 ━━━━━━━━━━━━━━━━━━━━ 127s 40ms/step - accuracy: 0.9299 - loss: 0.1710 - precision: 0.9319 - recall: 0.9277 - val_accuracy: 0.9324 - val_loss: 0.1629 - val_precision: 0.9159 - val_recall: 0.9523 Epoch 15/15 3179/3179 ━━━━━━━━━━━━━━━━━━━━ 127s 40ms/step - accuracy: 0.9330 - loss: 0.1636 - precision: 0.9354 - recall: 0.9301 - val_accuracy: 0.9357 - val_loss: 0.1540 - val_precision: 0.9438 - val_recall: 0.9265
In [12]:
# ================== EVALUATION ==================
pred = (model.predict(val_ds) > 0.5).astype(int).ravel()
print(classification_report(val_l, pred, target_names=["Active", "Drowsy"]))
print(confusion_matrix(val_l, pred))
795/795 ━━━━━━━━━━━━━━━━━━━━ 22s 28ms/step precision recall f1-score support Active 0.93 0.94 0.94 12715 Drowsy 0.94 0.93 0.94 12713 accuracy 0.94 25428 macro avg 0.94 0.94 0.94 25428 weighted avg 0.94 0.94 0.94 25428 [[12013 702] [ 934 11779]]
In [13]:
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import numpy as np
# Compute confusion matrix
cm = confusion_matrix(val_l, pred)
# Class labels
class_names = ["Active", "Drowsy"]
# Plot
plt.figure(figsize=(6, 5))
plt.imshow(cm, interpolation="nearest", cmap=plt.cm.Blues)
plt.title("Confusion Matrix")
plt.colorbar()
tick_marks = np.arange(len(class_names))
plt.xticks(tick_marks, class_names)
plt.yticks(tick_marks, class_names)
# Add numbers inside cells
thresh = cm.max() / 2.0
for i in range(cm.shape[0]):
for j in range(cm.shape[1]):
plt.text(
j, i, format(cm[i, j], "d"),
horizontalalignment="center",
color="white" if cm[i, j] > thresh else "black",
fontsize=12
)
plt.ylabel("True Label")
plt.xlabel("Predicted Label")
plt.tight_layout()
plt.show()
In [11]:
model=models.load_model("/home/dev-rayied/ML/drowsiness_model_32x32.keras") # without compile=False
In [14]:
def make_gradcam_heatmap(img_array, model):
grad_model = tf.keras.models.Model(
model.inputs,
[model.get_layer("last_conv").output, model.outputs[0]]
)
with tf.GradientTape() as tape:
conv_out, preds = grad_model(img_array)
loss = preds[:, 0]
grads = tape.gradient(loss, conv_out)
pooled_grads = tf.reduce_mean(grads, axis=(0,1,2))
heatmap = tf.reduce_sum(conv_out[0] * pooled_grads, axis=-1)
heatmap = tf.maximum(heatmap, 0)
heatmap /= tf.reduce_max(heatmap) + 1e-8
return heatmap.numpy()
def overlay_heatmap(img, heatmap):
heatmap = cv2.resize(heatmap, (img.shape[1], img.shape[0]))
heatmap = np.uint8(255 * heatmap)
heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
return cv2.addWeighted(img, 0.6, heatmap, 0.4, 0)
In [34]:
import mediapipe as mp
print(mp.__file__)
print(dir(mp))
/home/dev-rayied/miniconda3/envs/tf310/lib/python3.10/site-packages/mediapipe/__init__.py ['Image', 'ImageFormat', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', '__version__', 'tasks']
In [28]:
%pip uninstall mediapipe -y
Found existing installation: mediapipe 0.10.31 Uninstalling mediapipe-0.10.31: Successfully uninstalled mediapipe-0.10.31 Note: you may need to restart the kernel to use updated packages.
In [32]:
%pip install mediapipe==0.10.9 -y
Usage: /home/dev-rayied/miniconda3/envs/tf310/bin/python -m pip install [options] <requirement specifier> [package-index-options] ... /home/dev-rayied/miniconda3/envs/tf310/bin/python -m pip install [options] -r <requirements file> [package-index-options] ... /home/dev-rayied/miniconda3/envs/tf310/bin/python -m pip install [options] [-e] <vcs project url> ... /home/dev-rayied/miniconda3/envs/tf310/bin/python -m pip install [options] [-e] <local project path> ... /home/dev-rayied/miniconda3/envs/tf310/bin/python -m pip install [options] <archive url/path> ... no such option: -y Note: you may need to restart the kernel to use updated packages.
In [32]:
# ============================================================
# MEDIAPIPE FACEMESH
# ============================================================
# mp_face_mesh = mp.solutions.face_mesh
# face_mesh = mp_face_mesh.FaceMesh(
# static_image_mode=False,
# max_num_faces=1,
# refine_landmarks=True,
# min_detection_confidence=0.5,
# min_tracking_confidence=0.5
# )
LEFT_EYE_IDX = [33, 160, 158, 133, 153, 144]
RIGHT_EYE_IDX = [362, 385, 387, 263, 373, 380]
MOUTH_IDX = [61, 291, 81, 178, 13, 14]
import mediapipe as mp
# Explicitly import the solutions module
import mediapipe.python.solutions.face_mesh as mp_face_mesh
face_mesh = mp_face_mesh.FaceMesh(
static_image_mode=False,
max_num_faces=1,
refine_landmarks=True,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
I0000 00:00:1768894766.897412 17001 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5 I0000 00:00:1768894766.901983 17541 gl_context.cc:344] GL version: 3.2 (OpenGL ES 3.2 Mesa 25.0.7-0ubuntu0.24.04.2), renderer: Mesa Intel(R) Graphics (RPL-P)
In [33]:
# # ============================================================
# # HELPER FUNCTIONS
# # ============================================================
# def euclid(p1,p2):
# return np.linalg.norm(np.array(p1)-np.array(p2))
# def extract_landmarks(face):
# rgb = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)
# res = face_mesh.process(rgb)
# if not res.multi_face_landmarks:
# return None
# lm = res.multi_face_landmarks[0].landmark
# return [(p.x, p.y) for p in lm]
# def compute_ear(lm,w,h):
# def pt(i): return (lm[i][0]*w, lm[i][1]*h)
# def ear(idx):
# p1,p2,p3,p4,p5,p6 = idx
# P1,P2,P3,P4,P5,P6 = map(pt,idx)
# return (euclid(P2,P5)+euclid(P3,P6))/(2*euclid(P1,P4)+1e-6)
# return (ear(LEFT_EYE)+ear(RIGHT_EYE))/2
# def compute_mar(lm,w,h):
# def pt(i): return (lm[i][0]*w, lm[i][1]*h)
# m1,m4,m2,m5,m3,m6 = MOUTH
# M1,M4,M2,M5,M3,M6 = map(pt,[m1,m4,m2,m5,m3,m6])
# return (euclid(M2,M5)+euclid(M3,M6))/(2*euclid(M1,M4)+1e-6)
# ============================================================
# HELPER FUNCTIONS
# ============================================================
def euclid(p1, p2):
return np.linalg.norm(np.array(p1) - np.array(p2))
def extract_landmarks(face):
rgb = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)
res = face_mesh.process(rgb)
if not res.multi_face_landmarks:
return None
lm = res.multi_face_landmarks[0].landmark
return [(p.x, p.y) for p in lm]
def compute_ear(lm, w, h):
def pt(i):
return (lm[i][0] * w, lm[i][1] * h)
def ear(idx):
p1, p2, p3, p4, p5, p6 = idx
P1, P2, P3, P4, P5, P6 = map(pt, idx)
return (euclid(P2, P5) + euclid(P3, P6)) / (2 * euclid(P1, P4) + 1e-6)
return (ear(LEFT_EYE_IDX) + ear(RIGHT_EYE_IDX)) / 2
def compute_mar(lm, w, h):
def pt(i):
return (lm[i][0] * w, lm[i][1] * h)
m1, m4, m2, m5, m3, m6 = MOUTH_IDX
M1, M4, M2, M5, M3, M6 = map(pt, [m1, m4, m2, m5, m3, m6])
return (euclid(M2, M5) + euclid(M3, M6)) / (2 * euclid(M1, M4) + 1e-6)
In [34]:
# # ============================================================
# # PERCLOS
# # ============================================================
# class Perclos:
# def __init__(self,fps):
# self.buf = deque(maxlen=int(fps*PERCLOS_WINDOW_SEC))
# def update(self,closed):
# self.buf.append(1 if closed else 0)
# def value(self):
# return 100*sum(self.buf)/max(1,len(self.buf))
# ============================================================
# PERCLOS
# ============================================================
class Perclos:
def __init__(self, fps):
self.buf = deque(maxlen=int(fps * PERCLOS_WINDOW_SEC))
def update(self, closed):
self.buf.append(1 if closed else 0)
def value(self):
return 100 * sum(self.buf) / max(1, len(self.buf))
In [35]:
# # ============================================================
# # HYBRID FUZZY SYSTEM
# # ============================================================
# CNN = ctrl.Antecedent(np.linspace(0,1,101),'CNN')
# EYE = ctrl.Antecedent(np.linspace(0,1,101),'EYE')
# MOUTH = ctrl.Antecedent(np.linspace(0,1,101),'MOUTH')
# D = ctrl.Consequent(np.linspace(0,1,101),'D')
# CNN['low']=fuzz.trapmf(CNN.universe,[0,0,0.25,0.45])
# CNN['med']=fuzz.trapmf(CNN.universe,[0.3,0.45,0.55,0.7])
# CNN['high']=fuzz.trapmf(CNN.universe,[0.55,0.75,1,1])
# EYE['normal']=fuzz.trapmf(EYE.universe,[0,0,0.2,0.4])
# EYE['fatigue']=fuzz.trapmf(EYE.universe,[0.25,0.45,0.55,0.75])
# EYE['severe']=fuzz.trapmf(EYE.universe,[0.6,0.8,1,1])
# MOUTH['normal']=fuzz.trapmf(MOUTH.universe,[0,0,0.2,0.4])
# MOUTH['yawn']=fuzz.trapmf(MOUTH.universe,[0.5,0.7,1,1])
# D['low']=fuzz.trapmf(D.universe,[0,0,0.3,0.5])
# D['high']=fuzz.trapmf(D.universe,[0.5,0.7,1,1])
# rules = [
# ctrl.Rule(CNN['low'] & EYE['normal'], D['low']),
# ctrl.Rule(CNN['med'] & EYE['fatigue'], D['high']),
# ctrl.Rule(CNN['high'] & EYE['severe'], D['high']),
# ctrl.Rule(MOUTH['yawn'], D['high']),
# ctrl.Rule(EYE['severe'], D['high'])
# ]
# fis = ctrl.ControlSystem(rules)
# fis_sim = ctrl.ControlSystemSimulation(fis)
# ============================================================
# HYBRID FUZZY SYSTEM (SAFE NAMES)
# ============================================================
CNN_SCORE = ctrl.Antecedent(np.linspace(0, 1, 101), 'CNN')
EYE_FAT = ctrl.Antecedent(np.linspace(0, 1, 101), 'EYE')
MOUTH_FAT = ctrl.Antecedent(np.linspace(0, 1, 101), 'MOUTH')
DROWSY = ctrl.Consequent(np.linspace(0, 1, 101), 'D')
CNN_SCORE['low'] = fuzz.trapmf(CNN_SCORE.universe, [0, 0, 0.25, 0.45])
CNN_SCORE['med'] = fuzz.trapmf(CNN_SCORE.universe, [0.3, 0.45, 0.55, 0.7])
CNN_SCORE['high'] = fuzz.trapmf(CNN_SCORE.universe, [0.55, 0.75, 1, 1])
EYE_FAT['normal'] = fuzz.trapmf(EYE_FAT.universe, [0, 0, 0.2, 0.4])
EYE_FAT['fatigue'] = fuzz.trapmf(EYE_FAT.universe, [0.25, 0.45, 0.55, 0.75])
EYE_FAT['severe'] = fuzz.trapmf(EYE_FAT.universe, [0.6, 0.8, 1, 1])
MOUTH_FAT['normal'] = fuzz.trapmf(MOUTH_FAT.universe, [0, 0, 0.2, 0.4])
MOUTH_FAT['yawn'] = fuzz.trapmf(MOUTH_FAT.universe, [0.5, 0.7, 1, 1])
DROWSY['low'] = fuzz.trapmf(DROWSY.universe, [0, 0, 0.3, 0.5])
DROWSY['high'] = fuzz.trapmf(DROWSY.universe, [0.5, 0.7, 1, 1])
rules = [
ctrl.Rule(CNN_SCORE['low'] & EYE_FAT['normal'], DROWSY['low']),
ctrl.Rule(CNN_SCORE['med'] & EYE_FAT['fatigue'], DROWSY['high']),
ctrl.Rule(CNN_SCORE['high'] & EYE_FAT['severe'], DROWSY['high']),
ctrl.Rule(MOUTH_FAT['yawn'], DROWSY['high']),
ctrl.Rule(EYE_FAT['severe'], DROWSY['high'])
]
fis = ctrl.ControlSystem(rules)
fis_sim = ctrl.ControlSystemSimulation(fis)
In [37]:
# def ear_to_fatigue(ear):
# """
# Normalize EAR into fatigue score [0,1]
# 1.0 = fully closed, 0.0 = fully open
# """
# if ear <= T_EAR_CLOSE:
# return 1.0
# if ear >= T_EAR_OPEN:
# return 0.0
# return (T_EAR_OPEN - ear) / (T_EAR_OPEN - T_EAR_CLOSE + 1e-6)
def ear_to_fatigue(ear):
if ear <= T_EAR_CLOSE:
return 1.0
if ear >= T_EAR_OPEN:
return 0.0
return (T_EAR_OPEN - ear) / (T_EAR_OPEN - T_EAR_CLOSE + 1e-6)
In [41]:
# # ============================================================
# # FRAME PROCESSING
# # ============================================================
# def process_frame(frame, perclos, use_gradcam=False):
# img = cv2.resize(frame, (640,640))
# results = yolo_face.predict(img, verbose=False)
# for r in results:
# if r.boxes is None:
# continue
# for box in r.boxes.xyxy:
# x1,y1,x2,y2 = map(int, box)
# face = img[y1:y2, x1:x2]
# if face.size == 0:
# continue
# # CNN input
# face_small = cv2.resize(face, (IMG_SIZE, IMG_SIZE))
# x = face_small.astype(np.float32) / 255.0
# x = np.expand_dims(x, axis=0)
# cnn_prob = float(model(x, training=False)[0][0])
# # Grad-CAM (optional)
# if use_gradcam:
# heatmap = make_gradcam_heatmap(x, model)
# cam = overlay_heatmap(face_small, heatmap)
# cam = cv2.resize(cam, (x2-x1, y2-y1))
# img[y1:y2, x1:x2] = cam
# # Landmarks
# lm = extract_landmarks(face)
# if lm:
# h,w = face.shape[:2]
# ear = compute_ear(lm, w, h)
# mar = compute_mar(lm, w, h)
# perclos.update(ear <= T_EAR_CLOSE)
# ear_f = ear_to_fatigue(ear)
# eye_f = ALPHA_EYE * ear_f + (1 - ALPHA_EYE) * (perclos.value()/100)
# mouth_f = np.clip(
# (mar - T_MAR_HALF) /
# (T_MAR_OPEN - T_MAR_HALF + 1e-6),
# 0, 1
# )
# else:
# eye_f = mouth_f = 0.0
# # Hybrid FIS
# fis_sim.input['CNN'] = cnn_prob
# fis_sim.input['EYE'] = eye_f
# fis_sim.input['MOUTH'] = mouth_f
# fis_sim.compute()
# score = fis_sim.output['D']
# label = "ACTIVE"
# color = (0,255,0)
# if score >= 0.7:
# label = "DROWSY"
# color = (0,0,255)
# cv2.rectangle(img, (x1,y1), (x2,y2), color, 2)
# cv2.putText(
# img,
# f"{label} | {score:.2f}",
# (x1, y1-10),
# cv2.FONT_HERSHEY_SIMPLEX,
# 0.7,
# color,
# 2
# )
# return img
def process_frame(frame, perclos, use_gradcam=False):
img = cv2.resize(frame, (640, 640))
results = yolo_face.predict(img, verbose=False)
for r in results:
if r.boxes is None:
continue
for box in r.boxes.xyxy:
x1, y1, x2, y2 = map(int, box)
face = img[y1:y2, x1:x2]
if face.size == 0:
continue
# CNN
face_small = cv2.resize(face, (IMG_SIZE, IMG_SIZE))
x = face_small.astype(np.float32) / 255.0
x = x[None, ...]
cnn_prob = float(model(x, training=False)[0][0])
# Grad-CAM
if use_gradcam:
heatmap = make_gradcam_heatmap(x, model)
cam = overlay_heatmap(face_small, heatmap)
cam = cv2.resize(cam, (x2-x1, y2-y1))
img[y1:y2, x1:x2] = cam
# Landmarks
lm = extract_landmarks(face)
if lm:
h, w = face.shape[:2]
ear = compute_ear(lm, w, h)
mar = compute_mar(lm, w, h)
perclos.update(ear <= T_EAR_CLOSE)
eye_f = ALPHA_EYE * ear_to_fatigue(ear) \
+ (1 - ALPHA_EYE) * (perclos.value() / 100)
mouth_f = np.clip(
(mar - T_MAR_HALF) / (T_MAR_OPEN - T_MAR_HALF + 1e-6),
0, 1
)
else:
eye_f = mouth_f = 0.0
# FIS
fis_sim.reset()
fis_sim.input['CNN'] = float(cnn_prob)
fis_sim.input['EYE'] = float(eye_f)
fis_sim.input['MOUTH'] = float(mouth_f)
fis_sim.compute()
score = fis_sim.output.get('D', 0.0)
label = "ACTIVE"
color = (0, 255, 0)
if score >= 0.7:
label = "DROWSY"
color = (0, 0, 255)
cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
cv2.putText(img, f"{label} | {score:.2f}",
(x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
return img
In [44]:
# # ============================================================
# # VIDEO RUN
# # ============================================================
# cap = cv2.VideoCapture("/home/dev-rayied/ML/Test_data/55.mp4")
# fps = cap.get(cv2.CAP_PROP_FPS)
# fps = fps if fps > 0 else FPS_FALLBACK
# perclos = Perclos(fps)
# while cap.isOpened():
# ret, frame = cap.read()
# if not ret:
# break
# out = process_frame(frame, perclos)
# cv2.imshow("Hybrid Drowsiness Detection", out)
# if cv2.waitKey(1) & 0xFF == ord('q'):
# break
# cap.release()
# cv2.destroyAllWindows()
# ============================================================
# VIDEO RUN + SAVE OUTPUT (FINAL)
# ============================================================
INPUT_VIDEO = "/home/dev-rayied/ML/Test_data/4.mp4"
OUTPUT_DIR = "/home/dev-rayied/ML/Test_data"
OUTPUT_VIDEO = os.path.join(OUTPUT_DIR, "hybrid_drowsiness_output3.mp4")
SHOW_LIVE = True # set False for faster processing
os.makedirs(OUTPUT_DIR, exist_ok=True)
cap = cv2.VideoCapture(INPUT_VIDEO)
if not cap.isOpened():
raise RuntimeError(f"Cannot open video: {INPUT_VIDEO}")
fps = cap.get(cv2.CAP_PROP_FPS)
fps = fps if fps > 0 else FPS_FALLBACK
out_writer = cv2.VideoWriter(
OUTPUT_VIDEO,
cv2.VideoWriter_fourcc(*"mp4v"),
fps,
(640, 640)
)
perclos = Perclos(fps)
print("▶ Processing video...")
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
output_frame = process_frame(frame, perclos)
if SHOW_LIVE:
cv2.imshow("Hybrid Drowsiness Detection", output_frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
out_writer.write(output_frame)
cap.release()
out_writer.release()
cv2.destroyAllWindows()
print(f"✅ Output saved to: {OUTPUT_VIDEO}")
▶ Processing video... ✅ Output saved to: /home/dev-rayied/ML/Test_data/hybrid_drowsiness_output3.mp4
In [46]:
# # ============================================================
# # VIDEO RUN
# # ============================================================
# cap = cv2.VideoCapture("/home/dev-rayied/ML/Test_data/55.mp4")
# fps = cap.get(cv2.CAP_PROP_FPS)
# fps = fps if fps > 0 else FPS_FALLBACK
# perclos = Perclos(fps)
# while cap.isOpened():
# ret, frame = cap.read()
# if not ret:
# break
# out = process_frame(frame, perclos)
# cv2.imshow("Hybrid Drowsiness Detection", out)
# if cv2.waitKey(1) & 0xFF == ord('q'):
# break
# cap.release()
# cv2.destroyAllWindows()
# ============================================================
# VIDEO RUN + SAVE OUTPUT (FINAL)
# ============================================================
INPUT_VIDEO = "/home/dev-rayied/ML/Videos/AD.mp4"
OUTPUT_DIR = "/home/dev-rayied/ML/Test_data"
OUTPUT_VIDEO = os.path.join(OUTPUT_DIR, "hybrid_drowsiness_output5.mp4")
SHOW_LIVE = True # set False for faster processing
os.makedirs(OUTPUT_DIR, exist_ok=True)
cap = cv2.VideoCapture(INPUT_VIDEO)
if not cap.isOpened():
raise RuntimeError(f"Cannot open video: {INPUT_VIDEO}")
fps = cap.get(cv2.CAP_PROP_FPS)
fps = fps if fps > 0 else FPS_FALLBACK
out_writer = cv2.VideoWriter(
OUTPUT_VIDEO,
cv2.VideoWriter_fourcc(*"mp4v"),
fps,
(640, 640)
)
perclos = Perclos(fps)
print("▶ Processing video...")
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
output_frame = process_frame(frame, perclos)
if SHOW_LIVE:
cv2.imshow("Hybrid Drowsiness Detection", output_frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
out_writer.write(output_frame)
cap.release()
out_writer.release()
cv2.destroyAllWindows()
print(f"✅ Output saved to: {OUTPUT_VIDEO}")
▶ Processing video... ✅ Output saved to: /home/dev-rayied/ML/Test_data/hybrid_drowsiness_output5.mp4
In [30]:
print("LEFT_EYE:", LEFT_EYE)
print("RIGHT_EYE:", RIGHT_EYE)
LEFT_EYE: [33, 160, 158, 133, 153, 144] RIGHT_EYE: [362, 385, 387, 263, 373, 380]
In [ ]:
# # # workflow: //add gradcam also
# # # # ┌───────────────────────────────┐
# # # # │ Video / Image Input │
# # # # └───────────────┬───────────────┘
# # # # ↓
# # # # ┌───────────────────────────────┐
# # # # │ YOLOv8 Face Detection │
# # # # │ (Face ROI) │
# # # # └───────────────┬───────────────┘
# # # # ↓
# # # # ┌───────┴────────┐
# # # # │ │
# # # # │ │
# # # # ┌───────▼───────┐ ┌─────▼─────────────────┐
# # # # │ Path-A (CNN) │ │ Path-B (Physiology) │
# # # # │ │ │ │
# # # # │ CNN Classifier│ │ MediaPipe FaceMesh │
# # # # │ P_drowsy(t) │ │ EAR(t), MAR(t) │
# # # # │ │ │ │
# # # # └───────┬───────┘ │ Temporal Window │
# # # # │ │ → PERCLOS(W) │
# # # # │ └─────────┬─────────────┘
# # # # │ │
# # # # └──────────┬─────────┘
# # # # ↓
# # # # ┌──────────────────────────────────────────┐
# # # # │ Hybrid Fuzzy Inference System (FIS) │
# # # # │ Inputs: │
# # # # │ • CNN_score │
# # # # │ • Eye_fatigue (EAR + PERCLOS) │
# # # # │ • Mouth_fatigue (MAR / Yawn) │
# # # # │ Rules + Defuzzification │
# # # # └───────────────┬──────────────────────────┘
# # # # ↓
# # # # ┌───────────────────────────────┐
# # # # │ Drowsiness Score (0–1) │
# # # # │ → Label (Active / Warning / │
# # # # │ Drowsy / Severe) │
# # # # │ → Temporal Alert Logic │
# # # # └───────────────────────────────┘
# # # code:
# # # # ============================================================
# # # # HYBRID DRIVER DROWSINESS DETECTION (FULL CODE)
# # # # CNN + YOLO + EAR/MAR + PERCLOS + HYBRID FIS
# # # # ============================================================
# # # # -------------------- ENV --------------------
# # # import os
# # # os.environ["CUDA_VISIBLE_DEVICES"] = ""
# # # os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
# # # # -------------------- IMPORTS --------------------
# # # import cv2
# # # import numpy as np
# # # import matplotlib.pyplot as plt
# # # from collections import deque
# # # import tensorflow as tf
# # # from tensorflow.keras import models
# # # from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, Dropout, GlobalAveragePooling2D
# # # from tensorflow.keras.optimizers import Adam
# # # from ultralytics import YOLO
# # # from huggingface_hub import hf_hub_download
# # # import mediapipe as mp
# # # import skfuzzy as fuzz
# # # from skfuzzy import control as ctrl
# # # import cv2
# # # import gc
# # # import numpy as np
# # # import matplotlib.pyplot as plt
# # # import tensorflow as tf
# # # from tensorflow.keras import layers, models, optimizers
# # # from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
# # # from sklearn.model_selection import train_test_split
# # # from sklearn.metrics import classification_report, confusion_matrix
# # # from ultralytics import YOLO
# # # from huggingface_hub import hf_hub_download
# # # tf.config.optimizer.set_jit(False)
# # # from tensorflow.keras.layers import Conv2D,MaxPooling2D,Dropout,Flatten
# # # import matplotlib.pyplot as plt
# # # import numpy as np
# # # import PIL
# # # import tensorflow as tf
# # # from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout, Flatten, Conv2D, MaxPooling2D
# # # from tensorflow.keras.models import Sequential
# # # from tensorflow.keras.optimizers import Adam
# # # # ============================================================
# # # # CONFIG
# # # # ============================================================
# # # IMG_SIZE = 32
# # # FPS_FALLBACK = 25
# # # PERCLOS_WINDOW_SEC = 60
# # # ALERT_HOLD_SEC = 2.0
# # # T_EAR_CLOSE = 0.21
# # # T_EAR_OPEN = 0.27
# # # T_MAR_HALF = 0.30
# # # T_MAR_OPEN = 0.40
# # # ALPHA_EYE = 0.6
# # # FACE_DATASET_PATH = "/home/dev-rayied/ML/time_augmented"
# # # IMG_SIZE = 32 # 🔥 reduced from 224 → much faster
# # # BATCH_SIZE = 32
# # # AUTOTUNE = tf.data.AUTOTUNE
# # # CLASSES = {"active": 0, "drowsy": 1}
# # # SUBFOLDERS = [
# # # "afternoon","cloudy","completely_dark","completely_white",
# # # "Dawn","evening","golden_hour","harsh_sun",
# # # "indoor_light","midnight","morning","night"
# # # ]
# # # # ============================================================
# # # # LOAD YOLO FACE MODEL
# # # # ============================================================
# # # print("Downloading YOLOv8 FACE model...")
# # # face_model_path = hf_hub_download(
# # # repo_id="arnabdhar/YOLOv8-Face-Detection",
# # # filename="model.pt"
# # # )
# # # print("Loading YOLOv8 FACE (CPU only)")
# # # yolo_face = YOLO(face_model_path)
# # # def build_index(path):
# # # paths, labels = [], []
# # # for cls, label in CLASSES.items():
# # # for sub in SUBFOLDERS:
# # # folder = os.path.join(path, cls, sub)
# # # if not os.path.exists(folder):
# # # continue
# # # for img in os.listdir(folder):
# # # if img.lower().endswith((".jpg",".png",".jpeg")):
# # # paths.append(os.path.join(folder, img))
# # # labels.append(label)
# # # return paths, labels
# # # paths, labels = build_index(FACE_DATASET_PATH)
# # # train_p, val_p, train_l, val_l = train_test_split(
# # # paths, labels,
# # # test_size=0.2,
# # # stratify=labels,
# # # random_state=42
# # # )
# # # def load_image(path, label):
# # # img = tf.io.read_file(path)
# # # img = tf.image.decode_image(img, channels=3, expand_animations=False)
# # # img.set_shape([None, None, 3])
# # # img = tf.image.resize(img, (IMG_SIZE, IMG_SIZE))
# # # img = tf.cast(img, tf.float32) / 255.0
# # # return img, tf.cast(label, tf.float32)
# # # train_ds = (
# # # tf.data.Dataset.from_tensor_slices((train_p, train_l))
# # # .shuffle(1024)
# # # .map(load_image, num_parallel_calls=AUTOTUNE)
# # # .batch(BATCH_SIZE)
# # # .prefetch(AUTOTUNE)
# # # )
# # # val_ds = (
# # # tf.data.Dataset.from_tensor_slices((val_p, val_l))
# # # .map(load_image, num_parallel_calls=AUTOTUNE)
# # # .batch(BATCH_SIZE)
# # # .prefetch(AUTOTUNE)
# # # )
# # # # ============================================================
# # # # CNN MODEL (YOUR ORIGINAL)
# # # # ============================================================
# # # model = models.Sequential([
# # # Conv2D(32, (3,3), activation="relu", input_shape=(IMG_SIZE, IMG_SIZE, 3)),
# # # MaxPooling2D(2,2),
# # # Conv2D(32, (3,3), activation="relu"),
# # # MaxPooling2D(2,2),
# # # Conv2D(64, (3,3), activation="relu", name="last_conv"),
# # # GlobalAveragePooling2D(),
# # # Dense(128, activation="relu"),
# # # Dropout(0.5),
# # # Dense(1, activation="sigmoid")
# # # ])
# # # model.compile(
# # # optimizer=Adam(learning_rate=1e-4),
# # # loss="binary_crossentropy",
# # # metrics=[
# # # "accuracy",
# # # tf.keras.metrics.Precision(name="precision"),
# # # tf.keras.metrics.Recall(name="recall")
# # # ]
# # # )
# # # model.summary()
# # # from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
# # # callbacks = [
# # # EarlyStopping(
# # # monitor="val_loss",
# # # patience=5,
# # # restore_best_weights=True
# # # ),
# # # ModelCheckpoint(
# # # "drowsiness_model_32x32.keras",
# # # monitor="val_loss",
# # # save_best_only=True
# # # )
# # # ]
# # # history = model.fit(
# # # train_ds,
# # # validation_data=val_ds,
# # # epochs=15,
# # # callbacks=callbacks
# # # )
# # # # ================== EVALUATION ==================
# # # pred = (model.predict(val_ds) > 0.5).astype(int).ravel()
# # # print(classification_report(val_l, pred, target_names=["Active", "Drowsy"]))
# # # print(confusion_matrix(val_l, pred))
# # # from sklearn.metrics import confusion_matrix
# # # import matplotlib.pyplot as plt
# # # import numpy as np
# # # # Compute confusion matrix
# # # cm = confusion_matrix(val_l, pred)
# # # # Class labels
# # # class_names = ["Active", "Drowsy"]
# # # # Plot
# # # plt.figure(figsize=(6, 5))
# # # plt.imshow(cm, interpolation="nearest", cmap=plt.cm.Blues)
# # # plt.title("Confusion Matrix")
# # # plt.colorbar()
# # # tick_marks = np.arange(len(class_names))
# # # plt.xticks(tick_marks, class_names)
# # # plt.yticks(tick_marks, class_names)
# # # # Add numbers inside cells
# # # thresh = cm.max() / 2.0
# # # for i in range(cm.shape[0]):
# # # for j in range(cm.shape[1]):
# # # plt.text(
# # # j, i, format(cm[i, j], "d"),
# # # horizontalalignment="center",
# # # color="white" if cm[i, j] > thresh else "black",
# # # fontsize=12
# # # )
# # # plt.ylabel("True Label")
# # # plt.xlabel("Predicted Label")
# # # plt.tight_layout()
# # # plt.show()
# # # def make_gradcam_heatmap(img_array, model):
# # # grad_model = tf.keras.models.Model(
# # # model.inputs,
# # # [model.get_layer("last_conv").output, model.outputs[0]]
# # # )
# # # with tf.GradientTape() as tape:
# # # conv_out, preds = grad_model(img_array)
# # # loss = preds[:, 0]
# # # grads = tape.gradient(loss, conv_out)
# # # pooled_grads = tf.reduce_mean(grads, axis=(0,1,2))
# # # heatmap = tf.reduce_sum(conv_out[0] * pooled_grads, axis=-1)
# # # heatmap = tf.maximum(heatmap, 0)
# # # heatmap /= tf.reduce_max(heatmap) + 1e-8
# # # return heatmap.numpy()
# # # def overlay_heatmap(img, heatmap):
# # # heatmap = cv2.resize(heatmap, (img.shape[1], img.shape[0]))
# # # heatmap = np.uint8(255 * heatmap)
# # # heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
# # # return cv2.addWeighted(img, 0.6, heatmap, 0.4, 0)
# # # # ============================================================
# # # # MEDIAPIPE FACEMESH
# # # # ============================================================
# # # mp_face_mesh = mp.solutions.face_mesh
# # # face_mesh = mp_face_mesh.FaceMesh(
# # # static_image_mode=False,
# # # max_num_faces=1,
# # # refine_landmarks=True,
# # # min_detection_confidence=0.5,
# # # min_tracking_confidence=0.5
# # # )
# # # LEFT_EYE = [33,160,158,133,153,144]
# # # RIGHT_EYE = [362,385,387,263,373,380]
# # # MOUTH = [61,291,81,178,13,14]
# # # # ============================================================
# # # # HELPER FUNCTIONS
# # # # ============================================================
# # # def euclid(p1,p2):
# # # return np.linalg.norm(np.array(p1)-np.array(p2))
# # # def extract_landmarks(face):
# # # rgb = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)
# # # res = face_mesh.process(rgb)
# # # if not res.multi_face_landmarks:
# # # return None
# # # lm = res.multi_face_landmarks[0].landmark
# # # return [(p.x, p.y) for p in lm]
# # # def compute_ear(lm,w,h):
# # # def pt(i): return (lm[i][0]*w, lm[i][1]*h)
# # # def ear(idx):
# # # p1,p2,p3,p4,p5,p6 = idx
# # # P1,P2,P3,P4,P5,P6 = map(pt,idx)
# # # return (euclid(P2,P5)+euclid(P3,P6))/(2*euclid(P1,P4)+1e-6)
# # # return (ear(LEFT_EYE)+ear(RIGHT_EYE))/2
# # # def compute_mar(lm,w,h):
# # # def pt(i): return (lm[i][0]*w, lm[i][1]*h)
# # # m1,m4,m2,m5,m3,m6 = MOUTH
# # # M1,M4,M2,M5,M3,M6 = map(pt,[m1,m4,m2,m5,m3,m6])
# # # return (euclid(M2,M5)+euclid(M3,M6))/(2*euclid(M1,M4)+1e-6)
# # # # ============================================================
# # # # PERCLOS
# # # # ============================================================
# # # class Perclos:
# # # def __init__(self,fps):
# # # self.buf = deque(maxlen=int(fps*PERCLOS_WINDOW_SEC))
# # # def update(self,closed):
# # # self.buf.append(1 if closed else 0)
# # # def value(self):
# # # return 100*sum(self.buf)/max(1,len(self.buf))
# # # # ============================================================
# # # # HYBRID FUZZY SYSTEM
# # # # ============================================================
# # # CNN = ctrl.Antecedent(np.linspace(0,1,101),'CNN')
# # # EYE = ctrl.Antecedent(np.linspace(0,1,101),'EYE')
# # # MOUTH = ctrl.Antecedent(np.linspace(0,1,101),'MOUTH')
# # # D = ctrl.Consequent(np.linspace(0,1,101),'D')
# # # CNN['low']=fuzz.trapmf(CNN.universe,[0,0,0.25,0.45])
# # # CNN['med']=fuzz.trapmf(CNN.universe,[0.3,0.45,0.55,0.7])
# # # CNN['high']=fuzz.trapmf(CNN.universe,[0.55,0.75,1,1])
# # # EYE['normal']=fuzz.trapmf(EYE.universe,[0,0,0.2,0.4])
# # # EYE['fatigue']=fuzz.trapmf(EYE.universe,[0.25,0.45,0.55,0.75])
# # # EYE['severe']=fuzz.trapmf(EYE.universe,[0.6,0.8,1,1])
# # # MOUTH['normal']=fuzz.trapmf(MOUTH.universe,[0,0,0.2,0.4])
# # # MOUTH['yawn']=fuzz.trapmf(MOUTH.universe,[0.5,0.7,1,1])
# # # D['low']=fuzz.trapmf(D.universe,[0,0,0.3,0.5])
# # # D['high']=fuzz.trapmf(D.universe,[0.5,0.7,1,1])
# # # rules = [
# # # ctrl.Rule(CNN['low'] & EYE['normal'], D['low']),
# # # ctrl.Rule(CNN['med'] & EYE['fatigue'], D['high']),
# # # ctrl.Rule(CNN['high'] & EYE['severe'], D['high']),
# # # ctrl.Rule(MOUTH['yawn'], D['high']),
# # # ctrl.Rule(EYE['severe'], D['high'])
# # # ]
# # # fis = ctrl.ControlSystem(rules)
# # # fis_sim = ctrl.ControlSystemSimulation(fis)
# def ear_to_fatigue(ear):
# """
# Normalize EAR into fatigue score [0,1]
# 1.0 = fully closed, 0.0 = fully open
# """
# if ear <= T_EAR_CLOSE:
# return 1.0
# if ear >= T_EAR_OPEN:
# return 0.0
# return (T_EAR_OPEN - ear) / (T_EAR_OPEN - T_EAR_CLOSE + 1e-6)
# # # ============================================================
# # # FRAME PROCESSING
# # # ============================================================
# # def process_frame(frame, perclos, use_gradcam=False):
# # img = cv2.resize(frame, (640,640))
# # results = yolo_face.predict(img, verbose=False)
# # for r in results:
# # if r.boxes is None:
# # continue
# # for box in r.boxes.xyxy:
# # x1,y1,x2,y2 = map(int, box)
# # face = img[y1:y2, x1:x2]
# # if face.size == 0:
# # continue
# # # CNN input
# # face_small = cv2.resize(face, (IMG_SIZE, IMG_SIZE))
# # x = face_small.astype(np.float32) / 255.0
# # x = np.expand_dims(x, axis=0)
# # cnn_prob = float(model(x, training=False)[0][0])
# # # Grad-CAM (optional)
# # if use_gradcam:
# # heatmap = make_gradcam_heatmap(x, model)
# # cam = overlay_heatmap(face_small, heatmap)
# # cam = cv2.resize(cam, (x2-x1, y2-y1))
# # img[y1:y2, x1:x2] = cam
# # # Landmarks
# # lm = extract_landmarks(face)
# # if lm:
# # h,w = face.shape[:2]
# # ear = compute_ear(lm, w, h)
# # mar = compute_mar(lm, w, h)
# # perclos.update(ear <= T_EAR_CLOSE)
# # ear_f = ear_to_fatigue(ear)
# # eye_f = ALPHA_EYE * ear_f + (1 - ALPHA_EYE) * (perclos.value()/100)
# # mouth_f = np.clip(
# # (mar - T_MAR_HALF) /
# # (T_MAR_OPEN - T_MAR_HALF + 1e-6),
# # 0, 1
# # )
# # else:
# # eye_f = mouth_f = 0.0
# # # Hybrid FIS
# # fis_sim.input['CNN'] = cnn_prob
# # fis_sim.input['EYE'] = eye_f
# # fis_sim.input['MOUTH'] = mouth_f
# # fis_sim.compute()
# # score = fis_sim.output['D']
# # label = "ACTIVE"
# # color = (0,255,0)
# # if score >= 0.7:
# # label = "DROWSY"
# # color = (0,0,255)
# # cv2.rectangle(img, (x1,y1), (x2,y2), color, 2)
# # cv2.putText(
# # img,
# # f"{label} | {score:.2f}",
# # (x1, y1-10),
# # cv2.FONT_HERSHEY_SIMPLEX,
# # 0.7,
# # color,
# # 2
# # )
# # return img
# # # ============================================================
# # # VIDEO RUN
# # # ============================================================
# # cap = cv2.VideoCapture("/home/dev-rayied/ML/Test_data/55.mp4")
# # fps = cap.get(cv2.CAP_PROP_FPS)
# # fps = fps if fps>0 else FPS_FALLBACK
# # perclos = Perclos(fps)
# # while cap.isOpened():
# # ret, frame = cap.read()
# # if not ret: break
# # out = process_frame(frame, perclos)
# # cv2.imshow("Hybrid Drowsiness Detection", out)
# # if cv2.waitKey(1)&0xFF==ord('q'):
# # break
# # cap.release()
# # cv2.destroyAllWindows()
# # # //any fix needed
In [ ]: