code refactoring for movement dbm group

This commit is contained in:
jordi.hasianta
2022-09-15 20:39:23 +07:00
parent a74e0d3e35
commit 609c752cfa
5 changed files with 538 additions and 353 deletions

View File

@@ -4,29 +4,47 @@ project_name: DBM
created: 2020-20-07
"""
import logging
import os
import glob
from scipy.spatial import distance as dist
from scipy.signal import find_peaks
from imutils.video import FileVideoStream
from imutils.video import VideoStream
from imutils import face_utils
from moviepy.editor import VideoFileClip
import subprocess
import cv2
import dlib
import imutils
import numpy as np
import pandas as pd
import imutils
import time
import dlib
import cv2
import logging
from imutils import face_utils
from imutils.video import FileVideoStream
from scipy.signal import find_peaks
from scipy.spatial import distance as dist
from opendbm.dbm_lib.dbm_features.raw_features.util import util as ut
logging.basicConfig(level=logging.INFO)
logger=logging.getLogger()
logger = logging.getLogger()
movement_expr_dir = "movement/eye_blink"
csv_ext = "_eyeblinks.csv"
def get_length(filename):
result = subprocess.run(
[
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
filename,
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.DEVNULL,
)
return float(result.stdout)
movement_expr_dir = 'movement/eye_blink'
csv_ext = '_eyeblinks.csv'
def eye_aspect_ratio(eye):
"""
@@ -46,7 +64,8 @@ def eye_aspect_ratio(eye):
ear = (dist_cor1 + dist_cor2) / (2.0 * dist_cor3)
return ear
def blink_detection(video_path,facial_landmarks,raw_config):
def blink_detection(video_path, facial_landmarks, raw_config):
"""
Blink detection for each frame
Args:
@@ -56,17 +75,17 @@ def blink_detection(video_path,facial_landmarks,raw_config):
Return:
Dataframe with blink informatiom like blink frame, duration etc.
"""
TOT_FRAME = 1
tot_frame = 1
blink_frame = []
ear_frame = []
clip = VideoFileClip(video_path, has_mask=True)
vid_length = clip.duration
# clip = VideoFileClip(video_path, has_mask=True)
vid_length = get_length(video_path)
identifier = dlib.get_frontal_face_detector() #dlib's face detector (HOG-based)
identifier = dlib.get_frontal_face_detector() # dlib's face detector (HOG-based)
forecaster = dlib.shape_predictor(facial_landmarks) # the facial landmark predictor
#left and right eye landmarks
# left and right eye landmarks
(left_beg, left_end) = face_utils.FACIAL_LANDMARKS_IDXS["left_eye"]
(right_beg, right_end) = face_utils.FACIAL_LANDMARKS_IDXS["right_eye"]
@@ -75,49 +94,54 @@ def blink_detection(video_path,facial_landmarks,raw_config):
while True:
try:
#check if stream/frame available in video
# check if stream/frame available in video
if f_stream and not vid_stream.more():
break
#reading & converting frame into grayscale
# reading & converting frame into grayscale
vid_frame = vid_stream.read()
vid_frame = imutils.resize(vid_frame, width=450)
gray = cv2.cvtColor(vid_frame, cv2.COLOR_BGR2GRAY)
#detecting face
# detecting face
rects = identifier(gray, 0)
for rect in rects:
lmk = forecaster(gray, rect)
lmk = face_utils.shape_to_np(lmk)
l_eye = lmk[left_beg:left_end] #Extracting left eye ratio
r_eye = lmk[right_beg:right_end] #Extracting right eye ratio
l_eye = lmk[left_beg:left_end] # Extracting left eye ratio
r_eye = lmk[right_beg:right_end] # Extracting right eye ratio
l_ear = eye_aspect_ratio(l_eye) # eye aspect ratio for left eye
r_ear = eye_aspect_ratio(r_eye) # eye aspect ratio for right eye
ear = (l_ear + r_ear) / 2.0 # average the eye aspect ratio
blink_frame.append(TOT_FRAME)
blink_frame.append(tot_frame)
ear_frame.append(ear)
TOT_FRAME += 1
tot_frame += 1
except Exception as e:
#logger.error("blink detection processing failed for: {}".format(video_path))
e
logger.info(
"blink detection processing finished in frame: {}".format(tot_frame - 1)
)
continue
blink_df = pd.DataFrame(ear_frame, columns =[raw_config.mov_blink_ear])
vid_stream.stop()
blink_df = pd.DataFrame(ear_frame, columns=[raw_config.mov_blink_ear])
blink_df[raw_config.vid_dur] = vid_length
blink_df[raw_config.fps] = int(TOT_FRAME/vid_length)
blink_df[raw_config.fps] = int(tot_frame / vid_length)
blink_df[raw_config.mov_blinkframes] = blink_frame
peaks, _ = find_peaks(blink_df[raw_config.mov_blink_ear]*-1, prominence=0.1)#prominence = 0.1 based on tuning
final_blink_df = blink_df.iloc[peaks,:].reset_index(drop=True)
peaks, _ = find_peaks(
blink_df[raw_config.mov_blink_ear] * -1, prominence=0.1
) # prominence = 0.1 based on tuning
final_blink_df = blink_df.iloc[peaks, :].reset_index(drop=True)
u_blink_df = blink_dur(final_blink_df,raw_config)
u_blink_df['dbm_master_url'] = video_path
u_blink_df = blink_dur(final_blink_df, raw_config)
u_blink_df["dbm_master_url"] = video_path
return u_blink_df
def blink_dur(blink_df,raw_config):
def blink_dur(blink_df, raw_config):
"""
Computing blink duration between each blink
Args:
@@ -126,35 +150,46 @@ def blink_dur(blink_df,raw_config):
Returns:
Updated dataframe with blink duration
"""
dur_list = []
if len(blink_df)>0:
blink_df[raw_config.mov_blinkdur] = blink_df[raw_config.mov_blinkframes].diff().fillna(
blink_df[raw_config.mov_blinkframes])
if len(blink_df) > 0:
blink_df[raw_config.mov_blinkdur] = (
blink_df[raw_config.mov_blinkframes]
.diff()
.fillna(blink_df[raw_config.mov_blinkframes])
)
else:
blink_df[raw_config.mov_blinkdur] = np.nan
blink_df[raw_config.mov_blinkdur] = blink_df[raw_config.mov_blinkdur]/blink_df[raw_config.fps]
blink_df[raw_config.mov_blinkdur] = (
blink_df[raw_config.mov_blinkdur] / blink_df[raw_config.fps]
)
return blink_df
def run_eye_blink(video_uri, out_dir, r_config, facial_landmarks):
def run_eye_blink(video_uri, out_dir, r_config, facial_landmarks, save=True):
"""
Processing all patient's for getting eye blink artifacts
---------------
---------------
Args:
video_uri: video path; input_dir : input directory for video's
out_dir: (str) Output directory for processed output; r_config: raw variable config object;
out_dir: (str) Output directory for processed output;
r_config: raw variable config object;
facial_landmarks: landmark model path
save: whether to save in csv or not
"""
try:
input_loc, out_loc, fl_name = ut.filter_path(video_uri, out_dir)
vid_file_path = os.path.exists(video_uri)
if vid_file_path==True:
logger.info('Processing Output file {} '.format(os.path.join(out_loc, fl_name)))
vid_file_path = os.path.exists(video_uri)
if vid_file_path:
logger.info(
"Processing Output file {} ".format(os.path.join(out_loc, fl_name))
)
df_blink = blink_detection(video_uri, facial_landmarks, r_config)
if save:
ut.save_output(df_blink, out_loc, fl_name, movement_expr_dir, csv_ext)
except Exception as e:
logger.error('Failed to process video file')
return df_blink
except Exception as e:
logger.error(f"Failed to process video file: {e}")

View File

@@ -4,28 +4,32 @@ project_name: DBM
created: 2020-30-11
"""
import os
import glob
import pandas as pd
import numpy as np
from scipy.spatial import distance
from os.path import join
import logging
import os
from os.path import join
import numpy as np
import pandas as pd
from scipy.spatial import distance
from opendbm.dbm_lib.dbm_features.raw_features.util import util as ut
logging.basicConfig(level=logging.INFO)
logger=logging.getLogger()
logger = logging.getLogger()
eye_pose_dir = "movement/gaze"
eye_pose_ext = "_eyegaze.csv"
eye_pose_dir = 'movement/gaze'
eye_pose_ext = '_eyegaze.csv'
def eye_motion_df(l_disp, r_disp, error_list, r_config):
"""
Generating eye movement dataframe
Args:
l_disp: displacement list(left eye); l_disp: displacement list(right eye)
error_list:
l_disp: displacement list(left eye);
r_disp: displacement list(right eye)
r_config: raw variable config file object
Reutrns:
@@ -38,31 +42,43 @@ def eye_motion_df(l_disp, r_disp, error_list, r_config):
df_eye_motion[r_config.err_reason] = error_list
return df_eye_motion
def filter_motion(df_of, df_disp, col_l, col_r, r_config):
"""
Filtering final eye movement dataframe
Args:
df_of: Openface raw out dataframe; col_r: right eye column
col_l: left eye column; r_config: raw variable config file object
df_of: Openface raw out dataframe;
df_disp: displacement dataframe
col_r: right eye column
col_l: left eye column;
r_config: raw variable config file object
"""
df_of = df_of[col_l + col_r + [' confidence']]
df_of.loc[(df_of[' confidence'].astype(float) < 0.8), col_l + col_r] = np.nan
df_of = df_of[col_l + col_r + [" confidence"]].copy()
df_of.loc[(df_of[" confidence"].astype(float) < 0.8), col_l + col_r] = np.nan
df_filter = df_of[col_l + col_r]
df_filter.columns = [r_config.mov_leye_x, r_config.mov_leye_y, r_config.mov_leye_z,
r_config.mov_reye_x, r_config.mov_reye_y, r_config.mov_reye_z]
df_filter.columns = [
r_config.mov_leye_x,
r_config.mov_leye_y,
r_config.mov_leye_z,
r_config.mov_reye_x,
r_config.mov_reye_y,
r_config.mov_reye_z,
]
df_motion = pd.concat([df_filter, df_disp], axis=1, sort=False)
return df_motion
def eye_disp(of_results, col, r_config):
"""
Computing head velocity frame by frame
Args:
of_results: Openface raw out dataframe
col: col of eye_disp
r_config: Face config file object
Reutrns:
@@ -71,78 +87,96 @@ def eye_disp(of_results, col, r_config):
distance_list = []
error_list = []
of_results = of_results[col+ [' confidence']]
of_results = of_results[col + [" confidence"]]
for index, row in of_results.iterrows():
dst = np.nan
if index == 0 or float(row[' confidence']) < 0.8: #Threshold < 0.8
if index == 0 or float(row[" confidence"]) < 0.8: # Threshold < 0.8
distance_list.append(dst)
if float(row[' confidence']) < 0.8:
error_list.append('confidence less than 80%')
if float(row[" confidence"]) < 0.8:
error_list.append("confidence less than 80%")
else:
error_list.append('Pass')
error_list.append("Pass")
continue
if index > 0:
point_x = (of_results[col[0]][index-1], of_results[col[1]][index-1], of_results[col[2]][index-1])
point_y = (row[col[0]],row[col[1]],row[col[2]])
point_x = (
of_results[col[0]][index - 1],
of_results[col[1]][index - 1],
of_results[col[2]][index - 1],
)
point_y = (row[col[0]], row[col[1]], row[col[2]])
try:
dst = distance.euclidean(point_x, point_y)
except:
except Exception as e:
logger.info("Exception on eye_disp method", e)
pass
distance_list.append(abs(dst))
error_list.append('Pass')
error_list.append("Pass")
return distance_list, error_list
def calc_eye_mov(video_uri, df_of, out_loc, fl_name, r_config):
def calc_eye_mov(video_uri, df_of, out_loc, fl_name, r_config, save=True):
"""
Computing eye motion variables
Args:
video_uri: self explanatory
df_of: Openface dataframe
out_loc: Output path for saving output csv's
fl_name: file name for output csv
r_config: raw variable config file object
save: whether to save result to csv or not
"""
col_l = [ ' gaze_0_x', ' gaze_0_y', ' gaze_0_z']
col_r = [ ' gaze_1_x', ' gaze_1_y', ' gaze_1_z']
col_l = [" gaze_0_x", " gaze_0_y", " gaze_0_z"]
col_r = [" gaze_1_x", " gaze_1_y", " gaze_1_z"]
gazel_disp, err_l = eye_disp(df_of, col_l, r_config)
gazer_disp, err_r = eye_disp(df_of, col_r, r_config)
df_disp = eye_motion_df(gazel_disp, gazer_disp, err_l, r_config)
df_disp['dbm_master_url'] = video_uri
df_disp["dbm_master_url"] = video_uri
df_motion = filter_motion(df_of, df_disp, col_l, col_r, r_config)
if save:
ut.save_output(df_motion, out_loc, fl_name, eye_pose_dir, eye_pose_ext)
return df_motion
def run_eye_gaze(video_uri, out_dir, r_config):
def run_eye_gaze(video_uri, out_dir, r_config, save=True):
"""
Processing all patient's for getting eye movement artifacts
--------------------------------
--------------------------------
Args:
video_uri: video path; input_dir : input directory for video's
out_dir: (str) Output directory for processed output; r_config: raw variable config object
out_dir: (str) Output directory for processed output;
r_config: raw variable config object
save: whether to save result to csv or not
"""
try:
#filtering path to generate input & output path
# filtering path to generate input & output path
input_loc, out_loc, fl_name = ut.filter_path(video_uri, out_dir)
of_csv_path = glob.glob(join(out_loc, fl_name + '_openface/*.csv'))
if len(of_csv_path)>0:
of_csv_path = glob.glob(join(out_loc, fl_name + "_openface/*.csv"))
if len(of_csv_path) > 0:
of_csv = of_csv_path[0]
df_of = pd.read_csv(of_csv, error_bad_lines=False)
df_of = pd.read_csv(of_csv)
logger.info(
"Processing Output file {} ".format(os.path.join(out_loc, fl_name))
)
df_motion = calc_eye_mov(
video_uri, df_of, out_loc, fl_name, r_config, save=save
)
return df_motion
logger.info('Processing Output file {} '.format(os.path.join(out_loc, fl_name)))
calc_eye_mov(video_uri, df_of, out_loc, fl_name, r_config)
except Exception as e:
logger.error('Failed to process video file')
logger.error("Failed to process video file", e)

View File

@@ -1,33 +1,46 @@
import sys, os, glob, cv2, re
import pickle, json
import pandas as pd
import numpy as np
import numpy.ma as ma
import glob
import json
import logging
import os
import pickle
import re
import sys
from os.path import join
from opendbm.dbm_lib.dbm_features.raw_features.util import util as ut
from opendbm.dbm_lib.dbm_features.raw_features.util.math_util import *
import cv2
import numpy as np
import numpy.ma as ma
import pandas as pd
from opendbm.dbm_lib.dbm_features.raw_features.util import util as ut
from ..util.math_util import calc_displacement_vec
DBMLIB_PATH = os.path.dirname(__file__)
DBMLIB_FTREMOR_CONFIG = os.path.abspath(
os.path.join(DBMLIB_PATH, "../../../../resources/features/facial/config.json")
)
from opendbm.dbm_lib.dbm_features.raw_features.movement import DBMLIB_FTREMOR_CONFIG
logging.basicConfig(level=logging.INFO)
logger=logging.getLogger()
logger = logging.getLogger()
ft_dir = "movement/facial_tremor"
csv_ext = "_fac_tremor.csv"
model_ext = "_fac_model.csv"
fac_features_ext = "_fac_features.csv"
ft_dir = 'movement/facial_tremor'
csv_ext = '_fac_tremor.csv'
model_ext = '_fac_model.csv'
fac_features_ext = '_fac_features.csv'
def compute_features(out_dir, df_of, r_config):
""" Computes features
"""Computes features
Returns: features in vector format
"""
config = json.loads(open(DBMLIB_FTREMOR_CONFIG,'r').read())
config = json.loads(open(DBMLIB_FTREMOR_CONFIG, "r").read())
pattern_x = re.compile("l\d+_x")
pattern_y = re.compile("l\d+_y")
pattern_x = re.compile(r"l\d+_x")
pattern_y = re.compile(r"l\d+_y")
# assumption: distance of face to camera remains at roughly static
@@ -37,12 +50,12 @@ def compute_features(out_dir, df_of, r_config):
if pattern_x.match(col) or pattern_y.match(col):
landmark_columns.append(col)
df_of= df_of[(df_of[landmark_columns]!= 0).any(axis=1)]
df_of = df_of[(df_of[landmark_columns] != 0).any(axis=1)]
df_of.reset_index(inplace=True)
num_frames = len(df_of)
logger.info("Number of frames to be processed: {}".format(str(num_frames)))
landmarks = config['landmarks']
landmarks = config["landmarks"]
try:
if num_frames == 0:
@@ -50,18 +63,23 @@ def compute_features(out_dir, df_of, r_config):
logger.error(error_reason)
return empty_frame(landmarks, r_config, error_reason)
# if num_frames < 60:
# error_reason = 'Number of frames with visible face < 60. Video too short'
# logger.error(error_reason)
# return empty_frame(landmarks, f_cfg, error_reason)
# if num_frames < 60:
# error_reason = 'Number of frames with visible face < 60. Video too short'
# logger.error(error_reason)
# return empty_frame(landmarks, f_cfg, error_reason)
first_row = df_of.iloc[0]
facew = abs(first_row[config['face_width_left']] - first_row[config['face_width_right']])
faceh = abs(first_row[config['face_height_left']] - first_row[config['face_height_right']])
facew = abs(
first_row[config["face_width_left"]] - first_row[config["face_width_right"]]
)
faceh = abs(
first_row[config["face_height_left"]]
- first_row[config["face_height_right"]]
)
if facew == 0 or faceh == 0:
error_reason = 'face width or height = 0. Check landmark values'
error_reason = "face width or height = 0. Check landmark values"
logger.error(error_reason)
return empty_frame(landmarks, r_config)
@@ -70,95 +88,107 @@ def compute_features(out_dir, df_of, r_config):
# if verbose:
# logger.info("Displacement output: {}".format(str(fac_disp)))
fac_disp_median = np.median(fac_disp, axis = 1)
fac_disp_mean = np.mean(fac_disp, axis = 1)
fac_disp_median = np.median(fac_disp, axis=1)
fac_disp_mean = np.mean(fac_disp, axis=1)
if len(fac_disp.shape)!=2:
error_reason = 'fac_disp is not 2D. smth went wrong with disp calc'
if len(fac_disp.shape) != 2:
error_reason = "fac_disp is not 2D. smth went wrong with disp calc"
logger.error(error_reason)
return empty_frame(landmarks, r_config, error_reason)
if len(fac_disp[0])<=1:
error_reason = 'Video too short. smth went wrong with disp calc'
if len(fac_disp[0]) <= 1:
error_reason = "Video too short. smth went wrong with disp calc"
logger.error(error_reason)
return empty_frame(landmarks, r_config, error_reason)
fac_corr_mat = np.corrcoef(fac_disp, rowvar = True)
fac_corr_mat = np.corrcoef(fac_disp, rowvar=True)
# extract relevant row from cov matrix
ref_lmk_index = [i for i, lmk in enumerate(landmarks) if config['ref_lmk']==lmk]
ref_lmk_index = [
i for i, lmk in enumerate(landmarks) if config["ref_lmk"] == lmk
]
fac_corr = fac_corr_mat[ref_lmk_index][0]
fac_area = config['ref_area'] / (facew * faceh)
fac_area = config["ref_area"] / (facew * faceh)
# if verbose:
# logger.info("Face area: {}".format(fac_area))
# logger.info("Face Displacement Median: {}".format(str(fac_disp_median)))
# logger.info("Face Displacement Mean: {}".format(str(fac_disp_mean)))
fac_features1 = np.multiply(fac_area * fac_disp_median, (1. - fac_corr))
fac_features2 = np.multiply(fac_area * fac_disp_mean, (1. - fac_corr))
fac_features1 = np.multiply(fac_area * fac_disp_median, (1.0 - fac_corr))
fac_features2 = np.multiply(fac_area * fac_disp_mean, (1.0 - fac_corr))
# base_fac_features = np.dot(fac_area * fac_disp_median, (1. - fac_corr))
# base_fac_features = np.dot(fac_area * fac_disp_median, (1. - fac_corr))
fac_features_dict = {}
for i, landmark in enumerate(landmarks):
fac_features_dict['fac_features_mean_{}'.format(landmark)] = [fac_features2[i]]
raw_variable_map = 'fac_tremor_median_{}'.format(landmark)
fac_features_dict[r_config.base_raw['raw_feature'][raw_variable_map]] = [fac_features1[i]]
fac_features_dict["fac_features_mean_{}".format(landmark)] = [
fac_features2[i]
]
raw_variable_map = "fac_tremor_median_{}".format(landmark)
fac_features_dict[r_config.base_raw["raw_feature"][raw_variable_map]] = [
fac_features1[i]
]
fac_features_dict['fac_disp_median_{}'.format(landmark)] = [fac_disp_median[i]]
fac_features_dict['fac_corr_{}'.format(landmark)] = [fac_corr[i]]
fac_features_dict["fac_disp_median_{}".format(landmark)] = [
fac_disp_median[i]
]
fac_features_dict["fac_corr_{}".format(landmark)] = [fac_corr[i]]
fac_features_dict[r_config.err_reason] = ['']
fac_features_dict[r_config.err_reason] = [""]
data = pd.DataFrame.from_dict(fac_features_dict)
logger.info('Concluded computing tremor features')
logger.info("Concluded computing tremor features")
return data
except Exception as e:
logger.error('Error computing tremor features: {}'.format(str(e)))
logger.error("Error computing tremor features: {}".format(str(e)))
return empty_frame(landmarks, r_config, str(e))
def empty_frame(landmarks, r_config, error_reason):
fac_features_dict = {}
for i, landmark in enumerate(landmarks):
raw_variable_map = 'fac_tremor_median_{}'.format(landmark)
fac_features_dict[r_config.base_raw['raw_feature'][raw_variable_map]] = [np.nan]
raw_variable_map = "fac_tremor_median_{}".format(landmark)
fac_features_dict[r_config.base_raw["raw_feature"][raw_variable_map]] = [np.nan]
fac_features_dict['fac_features_mean_{}'.format(landmark)] = [np.nan]
fac_features_dict['fac_disp_median_{}'.format(landmark)] = [np.nan]
fac_features_dict['fac_corr_{}'.format(landmark)] = [np.nan]
fac_features_dict["fac_features_mean_{}".format(landmark)] = [np.nan]
fac_features_dict["fac_disp_median_{}".format(landmark)] = [np.nan]
fac_features_dict["fac_corr_{}".format(landmark)] = [np.nan]
fac_features_dict[r_config.err_reason] = [error_reason]
empty_frame = pd.DataFrame.from_dict(fac_features_dict)
return empty_frame
def fac_tremor_process(video_uri, out_dir, r_config, model_output=False):
def fac_tremor_process(video_uri, out_dir, r_config, model_output=False, save=True):
"""
processing input videos
"""
# try:
try:
input_loc, out_loc, fl_name = ut.filter_path(video_uri, out_dir)
of_csv_path = glob.glob(join(out_loc, fl_name + '_openface_lmk/*.csv'))
if len(of_csv_path)>0:
of_csv_path = glob.glob(join(out_loc, fl_name + "_openface_lmk/*_output.csv"))
if len(of_csv_path) > 0:
of_csv = of_csv_path[0]
df_of = pd.read_csv(of_csv, error_bad_lines=False)
df_of = pd.read_csv(of_csv)
logger.info('Processing Output file {} '.format(os.path.join(out_loc, fl_name)))
logger.info(
"Processing Output file for facial_tremor {} ".format(
os.path.join(out_loc, fl_name)
)
)
feats = compute_features(of_csv_path , df_of, r_config)
# if model_output:
# result = score(feats, r_config)
# feats = pd.concat([feats, result], axis=1)
feats = compute_features(of_csv_path, df_of, r_config)
# if model_output:
# result = score(feats, r_config)
# feats = pd.concat([feats, result], axis=1)
if save:
ut.save_output(feats, out_loc, fl_name, ft_dir, csv_ext)
return feats
# except Exception as e:
logger.error('Failed to process video file')
except Exception as e:
logger.error("Failed to process video file for facial_tremor", str(e))

View File

@@ -4,23 +4,25 @@ project_name: DBM
created: 2020-20-07
"""
import os
import glob
import pandas as pd
import numpy as np
from scipy.spatial import distance
from os.path import join
import logging
import os
from os.path import join
import numpy as np
import pandas as pd
from scipy.spatial import distance
from opendbm.dbm_lib.dbm_features.raw_features.util import util as ut
logging.basicConfig(level=logging.INFO)
logger=logging.getLogger()
logger = logging.getLogger()
h_mov_dir = "movement/head_movement"
h_pose_dir = "movement/head_pose"
h_mov_ext = "_headmov.csv"
h_pose_ext = "_headpose.csv"
h_mov_dir = 'movement/head_movement'
h_pose_dir = 'movement/head_pose'
h_mov_ext = '_headmov.csv'
h_pose_ext = '_headpose.csv'
def head_pose_dist(of_results):
"""
@@ -28,9 +30,8 @@ def head_pose_dist(of_results):
Args:
of_results: Openface raw out dataframe
f_nm_config: Face config file object
Reutrns:
Returns:
Final head pose distance frame by frame output
"""
distance_list = []
@@ -38,68 +39,86 @@ def head_pose_dist(of_results):
for index, row in of_results.iterrows():
dst = np.nan
if index == 0 or float(row[' confidence']) < 0.2: #Threshold < 0.2
if index == 0 or float(row[" confidence"]) < 0.2: # Threshold < 0.2
distance_list.append(dst)
if float(row[' confidence']) < 0.2:
error_list.append('confidence less than 20%')
if float(row[" confidence"]) < 0.2:
error_list.append("confidence less than 20%")
else:
error_list.append('Pass')
error_list.append("Pass")
continue
if index > 0:
point_x = (of_results[' pose_Rx'][index-1], of_results[' pose_Ry'][index-1], of_results[' pose_Rz'][index-1])
point_y = (row[' pose_Rx'],row[' pose_Ry'],row[' pose_Rz'])
point_x = (
of_results[" pose_Rx"][index - 1],
of_results[" pose_Ry"][index - 1],
of_results[" pose_Rz"][index - 1],
)
point_y = (row[" pose_Rx"], row[" pose_Ry"], row[" pose_Rz"])
try:
dst = distance.euclidean(point_x, point_y)
except:
except Exception as e:
logger.info("Exception met on head_pose_dist method", e)
pass
distance_list.append(abs(dst))
error_list.append('Pass')
error_list.append("Pass")
return distance_list, error_list
def head_pose(of_results,r_config):
def head_pose(of_results, r_config):
"""
Generating head pose estimation dataframe
Args:
distance_val: distance list
f_nm_config: raw variable config file object
of_results: openface results as dataframe
r_config: raw variable config file object
Reutrns:
Returns:
Final head pose estimation dataframe
"""
pose_dist_list, error_list = head_pose_dist(of_results)
of_results.loc[(of_results[' confidence'].astype(float) < 0.2), [' pose_Rx',' pose_Ry',' pose_Rz']] = np.nan
pose_of = of_results[[' pose_Rx',' pose_Ry',' pose_Rz']]
pose_of.columns = [r_config.mov_Hpose_Pitch, r_config.mov_Hpose_Yaw, r_config.mov_Hpose_Roll]
of_results = of_results.copy()
of_results.loc[
(of_results[" confidence"].astype(float) < 0.2),
[" pose_Rx", " pose_Ry", " pose_Rz"],
] = np.nan
pose_of = of_results[[" pose_Rx", " pose_Ry", " pose_Rz"]]
pose_of.columns = [
r_config.mov_Hpose_Pitch,
r_config.mov_Hpose_Yaw,
r_config.mov_Hpose_Roll,
]
pose_of = pose_of.copy()
pose_of[r_config.mov_Hpose_Dist] = pose_dist_list
pose_of[r_config.err_reason] = error_list
return pose_of
def head_motion_df(distance_val, error_list, r_config):
"""
Generating head movement dataframe
Args:
distance_val: distance list
error_list: Error reason
r_config: raw variable config file object
Reutrns:
Returns:
Final head velocity dataframe
"""
head_motion = r_config.head_vel
df_head_motion = pd.DataFrame(distance_val, columns=[head_motion])
df_head_motion['Frames'] = df_head_motion.index
df_head_motion["Frames"] = df_head_motion.index
new_df_intensity = df_head_motion[['Frames', head_motion]]
new_df_intensity = df_head_motion[["Frames", head_motion]].copy()
new_df_intensity[r_config.err_reason] = error_list
return new_df_intensity
def head_vel(of_results, r_config):
"""
Computing head velocity frame by frame
@@ -108,7 +127,7 @@ def head_vel(of_results, r_config):
of_results: Openface raw out dataframe
r_config: Face config file object
Reutrns:
Returns:
Final head velocity frame by frame output
"""
distance_list = []
@@ -116,59 +135,80 @@ def head_vel(of_results, r_config):
for index, row in of_results.iterrows():
dst = np.nan
if index == 0 or float(row[' confidence']) < 0.2: #Threshold < 0.2
if index == 0 or float(row[" confidence"]) < 0.2: # Threshold < 0.2
distance_list.append(dst)
if float(row[' confidence']) < 0.2:
error_list.append('confidence less than 20%')
if float(row[" confidence"]) < 0.2:
error_list.append("confidence less than 20%")
else:
error_list.append('Pass')
error_list.append("Pass")
continue
if index > 0:
point_x = (of_results[' pose_Tx'][index-1], of_results[' pose_Ty'][index-1], of_results[' pose_Tz'][index-1])
point_y = (row[' pose_Tx'],row[' pose_Ty'],row[' pose_Tz'])
point_x = (
of_results[" pose_Tx"][index - 1],
of_results[" pose_Ty"][index - 1],
of_results[" pose_Tz"][index - 1],
)
point_y = (row[" pose_Tx"], row[" pose_Ty"], row[" pose_Tz"])
try:
dst = distance.euclidean(point_x, point_y)
except:
except Exception as e:
logger.info("Exception met on head_vel method", e)
pass
if abs(dst)>200:
if abs(dst) > 200:
dst = np.nan
error_list.append('Out of range')
error_list.append("Out of range")
else:
error_list.append('Pass')
error_list.append("Pass")
distance_list.append(dst)
df_velocity = head_motion_df(distance_list, error_list, r_config)
return df_velocity
def calc_head_mov(video_uri, df_of, out_loc, fl_name, r_config):
def calc_head_mov(video_uri, df_of, out_loc, fl_name, r_config, save=True):
"""
Computing head motion and head pose variables
Args:
video_uri: video path
df_of: Openface dataframe
out_loc: Output path for saving output csv's
fl_name: file name for output csv
r_config: raw variable config file object
save: whether to save result to csv or not
"""
col = [' confidence',' pose_Rx',' pose_Ry',' pose_Rz',' pose_Tx', ' pose_Ty', ' pose_Tz']
col = [
" confidence",
" pose_Rx",
" pose_Ry",
" pose_Rz",
" pose_Tx",
" pose_Ty",
" pose_Tz",
]
df_of = df_of[col]
df_hmotion = head_vel(df_of, r_config)
df_hmotion['dbm_master_url'] = video_uri
df_hmotion["dbm_master_url"] = video_uri
df_pose = head_pose(df_of, r_config)
df_pose['dbm_master_url'] = video_uri
df_pose["dbm_master_url"] = video_uri
if save:
ut.save_output(df_hmotion, out_loc, fl_name, h_mov_dir, h_mov_ext)
ut.save_output(df_pose, out_loc, fl_name, h_pose_dir, h_pose_ext)
df_mot = pd.concat([df_hmotion[["Frames", "mov_headvel"]], df_pose], axis=1)
return df_mot
def run_head_movement(video_uri, out_dir, r_config):
"""
Processing all patient's for getting movement artifacts for cdx_analysis workflow
@@ -176,21 +216,25 @@ def run_head_movement(video_uri, out_dir, r_config):
--------------------------------
Args:
video_uri: video path; input_dir : input directory for video's
out_dir: (str) Output directory for processed output; r_config: raw variable config object
out_dir: (str) Output directory for processed output;
r_config: raw variable config object
"""
try:
#filtering path to generate input & output path
# filtering path to generate input & output path
input_loc, out_loc, fl_name = ut.filter_path(video_uri, out_dir)
of_csv_path = glob.glob(join(out_loc, fl_name + '_openface/*.csv'))
if len(of_csv_path)>0:
of_csv_path = glob.glob(join(out_loc, fl_name + "_openface/*.csv"))
if len(of_csv_path) > 0:
of_csv = of_csv_path[0]
df_of = pd.read_csv(of_csv, error_bad_lines=False)
df_of = pd.read_csv(of_csv)
logger.info(
"Processing Output file {} ".format(os.path.join(out_loc, fl_name))
)
df_mot = calc_head_mov(video_uri, df_of, out_loc, fl_name, r_config)
return df_mot
logger.info('Processing Output file {} '.format(os.path.join(out_loc, fl_name)))
calc_head_mov(video_uri, df_of, out_loc, fl_name, r_config)
except Exception as e:
logger.error('Failed to process video file')
logger.error("Failed to process video file", e)

View File

@@ -1,26 +1,31 @@
import pandas as pd
import os
import glob
from os.path import join
import parselmouth
from parselmouth.praat import call, run_file
import numpy as np
import librosa
import json
import re
import logging
import os
import re
from os.path import join
import numpy as np
import pandas as pd
import parselmouth
from parselmouth.praat import run_file
from opendbm.dbm_lib.dbm_features.raw_features.util import util as ut
from opendbm.dbm_lib.dbm_features.raw_features.movement import DBMLIB_VTREMOR_LIB
logging.basicConfig(level=logging.INFO)
logger=logging.getLogger()
logger = logging.getLogger()
vt_dir = 'movement/voice_tremor'
csv_ext = '_vtremor.csv'
vt_dir = "movement/voice_tremor"
csv_ext = "_vtremor.csv"
#Executing praat script using parselmouth function
def tremor_praat(snd_file,r_cfg):
DBMLIB_PATH = os.path.dirname(__file__)
DBMLIB_VTREMOR_LIB = os.path.abspath(
os.path.join(DBMLIB_PATH, "../../../../resources/libraries/voice_tremor.praat")
)
# Executing praat script using parselmouth function
def tremor_praat(snd_file, r_cfg):
"""
Generating Voice tremor endpoint dataframe
Args:
@@ -29,42 +34,71 @@ def tremor_praat(snd_file,r_cfg):
Returns tremor endpoint dataframe
"""
snd = parselmouth.Sound(snd_file)
tremor_var = run_file(snd,DBMLIB_VTREMOR_LIB,capture_output=True)
new_tremor_var = re.sub('--undefined--', '0', tremor_var[1])
tremor_var = run_file(snd, DBMLIB_VTREMOR_LIB, capture_output=True)
new_tremor_var = re.sub("--undefined--", "0", tremor_var[1])
res = json.loads(new_tremor_var)
tremor_df = pd.DataFrame(res,index=['0',])
tremor_df.columns = [r_cfg.mov_freq_trem_freq,r_cfg.mov_amp_trem_freq,r_cfg.mov_freq_trem_index,
r_cfg.mov_amp_trem_index,r_cfg.mov_freq_trem_pindex,r_cfg.mov_amp_trem_pindex]
tremor_df = pd.DataFrame(
res,
index=[
"0",
],
)
tremor_df.columns = [
r_cfg.mov_freq_trem_freq,
r_cfg.mov_amp_trem_freq,
r_cfg.mov_freq_trem_index,
r_cfg.mov_amp_trem_index,
r_cfg.mov_freq_trem_pindex,
r_cfg.mov_amp_trem_pindex,
]
return tremor_df
def prepare_vtrem_output(audio_file, out_loc, r_config, fl_name):
def prepare_vtrem_output(audio_file, out_loc, r_config, fl_name, save=True):
"""
Preparing voice tremor matrix
Args:
audio_file: (.wav) parsed audio file ; r_config: raw config object
out_loc: (str) Output directory for csv ; fl_name: file name
r_config: Raw variable configuration file
fl_name: base filepath
save: whether to write results to csv or not
"""
df_tremor = tremor_praat(audio_file, r_config)
df_tremor[r_config.err_reason] = 'Pass'# will replace with threshold in future release
df_tremor[
r_config.err_reason
] = "Pass" # will replace with threshold in future release
logger.info('Processing Output file {} '.format(os.path.join(out_loc, fl_name)))
if save:
logger.info("Processing Output file {} ".format(os.path.join(out_loc, fl_name)))
ut.save_output(df_tremor, out_loc, fl_name, vt_dir, csv_ext)
return df_tremor
def prepare_empty_vt(out_loc, fl_name, r_config, error_txt):
def prepare_empty_vt(out_loc, fl_name, r_config, error_txt, save=True):
"""
Preparing empty voice tremor matrix
"""
cols = [r_config.mov_freq_trem_freq, r_config.mov_amp_trem_freq, r_config.mov_freq_trem_index,
r_config.mov_amp_trem_index, r_config.mov_freq_trem_pindex, r_config.mov_amp_trem_pindex, r_config.err_reason]
cols = [
r_config.mov_freq_trem_freq,
r_config.mov_amp_trem_freq,
r_config.mov_freq_trem_index,
r_config.mov_amp_trem_index,
r_config.mov_freq_trem_pindex,
r_config.mov_amp_trem_pindex,
r_config.err_reason,
]
out_val = [[np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, error_txt]]
df_tremor = pd.DataFrame(out_val, columns = cols)
df_tremor = pd.DataFrame(out_val, columns=cols)
logger.info('Saving Output file {} '.format(os.path.join(out_loc, fl_name)))
if save:
logger.info("Saving Output file {} ".format(os.path.join(out_loc, fl_name)))
ut.save_output(df_tremor, out_loc, fl_name, vt_dir, csv_ext)
return df_tremor
def run_vtremor(video_uri, out_dir, r_config):
def run_vtremor(video_uri, out_dir, r_config, save=True):
"""
Processing all patient's for fetching Formant freq
---------------
@@ -72,23 +106,31 @@ def run_vtremor(video_uri, out_dir, r_config):
Args:
video_uri: video path; r_config: raw variable config object
out_dir: (str) Output directory for processed output
r_config: Raw variable configuration file
save: whether to write results to csv or not
"""
try:
input_loc, out_loc, fl_name = ut.filter_path(video_uri, out_dir)
aud_filter = glob.glob(join(input_loc, fl_name + '.wav'))
if len(aud_filter)>0:
aud_filter = glob.glob(join(input_loc, fl_name + ".wav"))
if len(aud_filter) > 0:
audio_file = aud_filter[0]
aud_dur = librosa.get_duration(filename=audio_file)
aud_dur = ut.get_length(audio_file)
if float(aud_dur) < 0.5:
logger.info('Output file {} size is less than 0.5sec'.format(audio_file))
logger.info(
"Output file {} size is less than 0.5sec".format(audio_file)
)
error_txt = 'error: length less than 0.5 sec'
prepare_empty_vt(video_uri, out_loc, fl_name, error_txt)
return
prepare_vtrem_output(audio_file, out_loc, r_config, fl_name)
error_txt = "error: length less than 0.5 sec"
df_trem = prepare_empty_vt(video_uri, out_loc, fl_name, error_txt, save)
else:
df_trem = prepare_vtrem_output(
audio_file, out_loc, r_config, fl_name, save
)
return df_trem
except Exception as e:
logger.error('Failed to compute Voice Tremor {} for {}'.format(e,video_uri))
prepare_empty_vt(out_loc, fl_name, r_config, e)
logger.error("Failed to compute Voice Tremor {} for {}".format(e, video_uri))
prepare_empty_vt(out_loc, fl_name, r_config, e, save)