move pkg, resources, dbm_lib, to under 1 opendbm directory

This commit is contained in:
jordi.hasianta
2022-09-14 23:53:10 +07:00
parent a1816eb4b5
commit 5a585a7996
46 changed files with 48 additions and 53 deletions

View File

@@ -0,0 +1,57 @@
"""
file_name: facial_tremor
project_name: cdx_analysis
created: 2019-03-16
author: Deshana Desai
"""
import sys, os, glob, cv2
import pandas as pd
import numpy as np
def euclidean_distance(point1, point2):
"""
Compute euclidean distance between points
"""
return np.sqrt((point1[0] - point2[0])**2 + (point1[1] - point2[1])**2)
# def detect_peaks()
def expand_landmarks(landmarks):
"""
util method to expand landmark list:
eg: [1,2] -> [['l1_x', 'l1_y'], ['l2_x', 'l2_y']]
"""
return [['l{}_x'.format(l), 'l{}_y'.format(l)] for l in landmarks]
def calc_displacement_vec(df, landmarks, num_frames):
"""
Calculates displacement vector frame by frame
"""
landmarks = expand_landmarks(landmarks)
disp_vec = np.zeros((len(landmarks), num_frames))
prev_point = np.zeros((len(landmarks), 2))
# initialize
for j, pair in enumerate(landmarks):
first_row = df.iloc[0]
prev_point[j] = (first_row[pair[0]], first_row[pair[1]])
for i in range(num_frames):
frame_row = df.iloc[i]
for j, pair in enumerate(landmarks):
x, y = pair[0], pair[1]
current = (frame_row[x], frame_row[y])
deviation = euclidean_distance( current, prev_point[j])
disp_vec[j][i] = deviation
prev_point[j] = current
return disp_vec

View File

@@ -0,0 +1,212 @@
"""
file_name: nlp_util
project_name: DBM
created: 2020-10-11
"""
import subprocess
import json
import numpy as np
import pandas as pd
import os
import logging
import nltk
import re
from lexicalrichness import LexicalRichness
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
logging.basicConfig(level=logging.INFO)
logger=logging.getLogger()
#Speech to text using Deepspeech 0.9.1
def deepspeech(AUDIO_FILE,deep_path):
"""
Extracting text from audio using Deep Speech neural network trained model
Returns:
Text: text which is extracted from audio
"""
api = 'deepspeech'
arg_speech0 = '--model'
arg_speech_path0 = os.path.join(deep_path, 'deepspeech-0.9.1-models.pbmm')
arg_speech1 = '--scorer'
arg_speech_path1 = os.path.join(deep_path, 'deepspeech-0.9.1-models.scorer')
arg_audio = "--audio"
out = subprocess.Popen([api, arg_speech0, arg_speech_path0, arg_speech1, arg_speech_path1, arg_audio, AUDIO_FILE],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
logger.info('Deepspeech output...... {}'.format(out))
try:
stdout,stderr = out.communicate()
except:
return "error", "error"
#print(stderr)
return stdout,stderr
def deep_speech_output_clean(result):
"""
Parsing deep speech output(text)
Return:
Text from speech
"""
text = ""
if len(result)>0:
res_split = str(result[0]).split('\\n')
if len(res_split)>0:
for i in range(len(res_split)):
if 'Inference took' in res_split[i]:
text = res_split[i + 1]
return text
return text
def process_deepspeech(audio_file,deep_path):
"""
Transcribing audio to extract text from speech
"""
deep_output = deepspeech(audio_file,deep_path)
deep_text= deep_speech_output_clean(deep_output)
return deep_text
def nltk_download():
try:
nltk.data.find('tokenizers/punkt')
except LookupError:
logger.info('punkt is not available')
nltk.download('punkt')
try:
nltk.data.find('averaged_perceptron_tagger')
except LookupError:
logger.info('averaged_perceptron_tagger is not available')
nltk.download('averaged_perceptron_tagger')
def empty_speech(r_config, master_url, error_txt):
"""
Preparing empty speech matrix with error
Args:
r_config: raw config file object
error_txt: Error message during transcription
Returns:
Empty dataframe for speech features with error
"""
col = [r_config.nlp_numSentences, r_config.nlp_singPronPerAns, r_config.nlp_singPronPerSen, r_config.nlp_pastTensePerAns,
r_config.nlp_pastTensePerSen, r_config.nlp_pronounsPerAns, r_config.nlp_pronounsPerSen, r_config.nlp_verbsPerAns,
r_config.nlp_verbsPerSen, r_config.nlp_adjectivesPerAns, r_config.nlp_adjectivesPerSen, r_config.nlp_nounsPerAns,
r_config.nlp_nounsPerSen, r_config.nlp_sentiment_mean, r_config.nlp_mattr, r_config.nlp_wordsPerMin,
r_config.nlp_totalTime, r_config.err_reason]
df_speech = pd.DataFrame([[np.nan] * len(col) + [error_txt]], columns = col)
df_speech['dbm_master_url'] = master_url
return df_speech
def divide_var(speech_var1, spech_var2):
"""
divide variables
"""
speech_var = np.nan
if spech_var2!=0:
speech_var = speech_var1/spech_var2
return speech_var
def process_speech(transcribe_df,r_config):
"""
Preparing speech features
Args:
transcribe_df: Transcribed dataframe
r_config: raw config file object
Returns:
Dataframe for speech features
"""
transcribe_df = transcribe_df.replace(np.nan, '', regex=True)
err_transcribe = transcribe_df[r_config.err_reason].iloc[0]
transcribe = transcribe_df[r_config.nlp_transcribe].iloc[0]
total_time = transcribe_df[r_config.nlp_totalTime].iloc[0]
master_url = transcribe_df['dbm_master_url'].iloc[0]
#clean transcribe
transcribe = transcribe.replace(",", "")
transcribe = " ".join(re.findall(r"[\w']+|[.!?]", transcribe))
if err_transcribe != 'Pass':
df_speech = empty_speech(r_config, master_url, error_txt)
return df_speech
speech_dict = {}
nltk_download()
sentences = nltk.tokenize.sent_tokenize(transcribe)
words_all = nltk.tokenize.word_tokenize(transcribe)
num_sentences = len(sentences)
speech_dict[r_config.nlp_numSentences] = num_sentences
#nlp_singPron
i_s = transcribe.count('I')
me_s = transcribe.count('me')
my_s = transcribe.count('my')
sing_count = i_s + me_s + my_s
speech_dict[r_config.nlp_singPronPerAns] = sing_count if len(words_all)>0 else np.nan
speech_dict[r_config.nlp_singPronPerSen] = divide_var(speech_dict[r_config.nlp_singPronPerAns], num_sentences)
tagged = nltk.pos_tag(transcribe.split())
tagged_df = pd.DataFrame(tagged, columns=['word', 'pos_tag'])
#Past tense per answer
all_POSs = tagged_df['pos_tag'].tolist()
speech_dict[r_config.nlp_pastTensePerAns] = all_POSs.count('VBD') if len(words_all)>0 else np.nan
speech_dict[r_config.nlp_pastTensePerSen] = divide_var(speech_dict[r_config.nlp_pastTensePerAns], num_sentences)
#Pronoun per answer
pronounsPerAns = all_POSs.count('PRP') + all_POSs.count('PRP$')
speech_dict[r_config.nlp_pronounsPerAns] = pronounsPerAns if len(words_all)>0 else np.nan
speech_dict[r_config.nlp_pronounsPerSen] = divide_var(speech_dict[r_config.nlp_pronounsPerAns], num_sentences)
#Verb per answer
verbPerAns = all_POSs.count('VB') + all_POSs.count('VBD') + all_POSs.count('VBG') \
+ all_POSs.count('VBN') + all_POSs.count('VBP') + all_POSs.count('VBZ')
speech_dict[r_config.nlp_verbsPerAns] = verbPerAns if len(words_all) > 0 else np.nan
speech_dict[r_config.nlp_verbsPerSen] = divide_var(speech_dict[r_config.nlp_verbsPerAns], num_sentences)
#Adjective per answer
adjectivesAns = all_POSs.count('JJ') + all_POSs.count('JJR') + all_POSs.count('JJS')
speech_dict[r_config.nlp_adjectivesPerAns] = adjectivesAns if len(words_all) > 0 else np.nan
speech_dict[r_config.nlp_adjectivesPerSen] = divide_var(speech_dict[r_config.nlp_adjectivesPerAns], num_sentences)
#Noun per answer
nounsAns = all_POSs.count('NN') + all_POSs.count('NNP') + all_POSs.count('NNS')
speech_dict[r_config.nlp_nounsPerAns] = nounsAns if len(words_all) > 0 else np.nan
speech_dict[r_config.nlp_nounsPerSen] = divide_var(speech_dict[r_config.nlp_nounsPerAns], num_sentences)
#Sentiment analysis
vader = SentimentIntensityAnalyzer()
sentence_valences = []
for s in sentences:
sentiment_dict = vader.polarity_scores(s)
sentence_valences.append(sentiment_dict['compound'])
speech_dict[r_config.nlp_sentiment_mean] = np.mean(sentence_valences) if len(sentence_valences) > 0 else np.nan
non_punc = list(value for value in words_all if value not in ['.','!','?'])
non_punc_as_str = " ".join(str(non_punc))
lex = LexicalRichness(non_punc_as_str)
speech_dict[r_config.nlp_mattr] = lex.mattr(window_size=lex.words) if lex.words > 0 else np.nan
#Number of words per minute
speech_dict[r_config.nlp_wordsPerMin] = divide_var(len(non_punc), total_time)*60
speech_dict[r_config.nlp_totalTime] = total_time
speech_dict['dbm_master_url'] = master_url
df_speech = pd.DataFrame([speech_dict])
return df_speech

View File

@@ -0,0 +1,112 @@
"""
file_name: util
project_name: DBM
created: 2020-20-07
"""
import os
import glob
import numpy as np
import subprocess
def filter_path(video_url, out_dir):
"""
Filtering video uri path to prepare input and ouptut location
Args:
video_url: S3 bucket path for video
out_dir: Output directory path
"""
fl_name,_ = os.path.splitext(os.path.basename(video_url))
input_loc = os.path.dirname(video_url)
out_loc = os.path.join(out_dir, fl_name)
return input_loc, out_loc, fl_name
def save_output(df, out_loc, fl_name, f_dir, f_ext):
"""
creating output directory for Audio features
Args:
df: (dataframe) feature dataframe[ex: Formant freq, pitch]
out_loc: (dir) Output location where we want to save raw output
fl_name: file name
f_dir: directory name for a feature
f_ext: extension for a feature [ex: '_pose.csv']
"""
full_f_name = fl_name + f_ext
dir_path = os.path.join(out_loc, f_dir)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
sav_path = os.path.join(dir_path,full_f_name)
df.to_csv(sav_path, index=False)
def audio_process(base_dir,video_url):
"""
Parsing cleaned audio files(Audio files without IMA voice)
Args:
base_dir: Base path for raw data
video_url: Raw video file path
"""
new_video_url = base_dir+'/'.join(video_url[2:])
split_val = new_video_url.split('/')
wav_path = '/'.join(split_val[0:len(split_val)-1])
audio_split_check = glob.glob(wav_path + '/*_split.wav')
return audio_split_check
def compute_open_face_features(input_filepath,
output_directory,
open_face_executable,
au_static=False,
tracked_visualization=False,
clobber=False,
verbose=True):
"""
Runs OpenFace on an input video.
See https://github.com/TadasBaltrusaitis/OpenFace/wiki/Command-line-arguments
Args:
input_filepath:
output_directory:
au_static:
tracked_visualization:
open_face_executable:
clobber: (bool) if True existing files will be overwritten
verbose:
Returns:
(str) path to output csv file
Raises:
IOError if OpenFace executable is missing
"""
if not os.path.isfile(open_face_executable):
raise IOError("OpenFace executable {} could not be found.".format(open_face_executable))
bn, _ = os.path.splitext(os.path.basename(input_filepath))
if not output_directory:
output_directory = os.path.join(os.path.dirname(input_filepath), bn + '_openface')
output_csv = os.path.join(output_directory, bn + '.csv')
if not os.path.isfile(output_csv) or clobber:
call = [open_face_executable, ]
if au_static:
call += ['-au_static', ]
if tracked_visualization:
call += ['-tracked', ]
call += ['-q', '-2Dfp', '-3Dfp', '-pdmparams', '-pose', '-aus', '-gaze']
call += ['-f', input_filepath, '-out_dir', output_directory]
if verbose:
print('Computing OpenFace features {} from video file'.format(input_filepath))
subprocess.check_output(call)
if verbose:
print('OpenFace features saved to {}'.format(output_directory))
else:
if verbose:
print('Output file {} already exists'.format(output_csv))
return os.path.join(output_directory, bn + '.csv')

View File

@@ -0,0 +1,221 @@
"""
file_name: vad_utilities
project_name: DBM
created: 2020-20-07
"""
# code from https://github.com/wiseman/py-webrtcvad/blob/master/example.py
import collections
import contextlib
import sys
import wave
def read_wave(path):
"""Reads a .wav file.
Takes the path, and returns (PCM audio data, sample rate).
"""
with contextlib.closing(wave.open(path, 'rb')) as wf:
num_channels = wf.getnchannels()
assert num_channels == 1
sample_width = wf.getsampwidth()
assert sample_width == 2
sample_rate = wf.getframerate()
assert sample_rate in (8000, 16000, 32000, 48000)
pcm_data = wf.readframes(wf.getnframes())
return pcm_data, sample_rate
class Frame(object):
"""Represents a "frame" of audio data."""
def __init__(self, bytes, timestamp, duration):
self.bytes = bytes
self.timestamp = timestamp
self.duration = duration
def frame_generator(frame_duration_ms, audio, sample_rate):
"""Generates audio frames from PCM audio data.
Takes the desired frame duration in milliseconds, the PCM data, and
the sample rate.
Yields Frames of the requested duration.
"""
n = int(sample_rate * (frame_duration_ms / 1000.0) * 2)
offset = 0
timestamp = 0.0
duration = (float(n) / sample_rate) / 2.0
while offset + n < len(audio):
yield Frame(audio[offset:offset + n], timestamp, duration)
timestamp += duration
offset += n
def vad_collector(sample_rate, frame_duration_ms,
padding_duration_ms, vad, frames):
"""Filters out non-voiced audio frames.
Given a webrtcvad.Vad and a source of audio frames, yields only
the voiced audio.
Uses a padded, sliding window algorithm over the audio frames.
When more than 90% of the frames in the window are voiced (as
reported by the VAD), the collector triggers and begins yielding
audio frames. Then the collector waits until 90% of the frames in
the window are unvoiced to detrigger.
The window is padded at the front and back to provide a small
amount of silence or the beginnings/endings of speech around the
voiced frames.
Arguments:
sample_rate - The audio sample rate, in Hz.
frame_duration_ms - The frame duration in milliseconds.
padding_duration_ms - The amount to pad the window, in milliseconds.
vad - An instance of webrtcvad.Vad.
frames - a source of audio frames (sequence or generator).
Returns: A generator that yields PCM audio data.
"""
num_padding_frames = int(padding_duration_ms / frame_duration_ms)
# We use a deque for our sliding window/ring buffer.
ring_buffer = collections.deque(maxlen=num_padding_frames)
# We have two states: TRIGGERED and NOTTRIGGERED. We start in the
# NOTTRIGGERED state.
triggered = False
voiced_frames = []
for frame in frames:
is_speech = vad.is_speech(frame.bytes, sample_rate)
sys.stdout.write('1' if is_speech else '0')
if not triggered:
ring_buffer.append((frame, is_speech))
num_voiced = len([f for f, speech in ring_buffer if speech])
# If we're NOTTRIGGERED and more than 90% of the frames in
# the ring buffer are voiced frames, then enter the
# TRIGGERED state.
if num_voiced > 0.9 * ring_buffer.maxlen:
triggered = True
sys.stdout.write('+(%s)' % (ring_buffer[0][0].timestamp,))
# We want to yield all the audio we see from now until
# we are NOTTRIGGERED, but we have to start with the
# audio that's already in the ring buffer.
for f, s in ring_buffer:
voiced_frames.append(f)
ring_buffer.clear()
else:
# We're in the TRIGGERED state, so collect the audio data
# and add it to the ring buffer.
voiced_frames.append(frame)
ring_buffer.append((frame, is_speech))
num_unvoiced = len([f for f, speech in ring_buffer if not speech])
# If more than 90% of the frames in the ring buffer are
# unvoiced, then enter NOTTRIGGERED and yield whatever
# audio we've collected.
if num_unvoiced > 0.9 * ring_buffer.maxlen:
sys.stdout.write('-(%s)' % (frame.timestamp + frame.duration))
triggered = False
yield b''.join([f.bytes for f in voiced_frames])
ring_buffer.clear()
voiced_frames = []
if triggered: # BT if were in triggered state at end of signal, set output time
sys.stdout.write('-(%s)' % (frame.timestamp + frame.duration))
sys.stdout.write('\n')
# If we have any leftover voiced audio when we run out of input,
# yield it.
if voiced_frames:
yield b''.join([f.bytes for f in voiced_frames])
def vad_get_segment_times(sample_rate, frame_duration_ms,
padding_duration_ms, vad, frames):
"""Filters out non-voiced audio frames.
BT: based on vad_collector, but returns start and end times for voiced segs
Given a webrtcvad.Vad and a source of audio frames, yields only
the voiced audio.
Uses a padded, sliding window algorithm over the audio frames.
When more than 90% of the frames in the window are voiced (as
reported by the VAD), the collector triggers and begins yielding
audio frames. Then the collector waits until 90% of the frames in
the window are unvoiced to detrigger.
The window is padded at the front and back to provide a small
amount of silence or the beginnings/endings of speech around the
voiced frames.
Arguments:
sample_rate - The audio sample rate, in Hz.
frame_duration_ms - The frame duration in milliseconds.
padding_duration_ms - The amount to pad the window, in milliseconds.
vad - An instance of webrtcvad.Vad.
frames - a source of audio frames (sequence or generator).
Returns: lists of start and end segments
"""
num_padding_frames = int(padding_duration_ms / frame_duration_ms)
# We use a deque for our sliding window/ring buffer.
ring_buffer = collections.deque(maxlen=num_padding_frames)
# We have two states: TRIGGERED and NOTTRIGGERED. We start in the
# NOTTRIGGERED state.
triggered = False
start_times = []
end_times = []
for frame in frames:
is_speech = vad.is_speech(frame.bytes, sample_rate)
#sys.stdout.write('1' if is_speech else '0')
if not triggered:
ring_buffer.append((frame, is_speech))
num_voiced = len([f for f, speech in ring_buffer if speech])
# If we're NOTTRIGGERED and more than 90% of the frames in
# the ring buffer are voiced frames, then enter the
# TRIGGERED state.
if num_voiced > 0.9 * ring_buffer.maxlen:
triggered = True
#sys.stdout.write('+(%s)' % (ring_buffer[0][0].timestamp,))
start_times.append(ring_buffer[0][0].timestamp) # BT
ring_buffer.clear()
else:
# We're in the TRIGGERED state, so collect the audio data
# and add it to the ring buffer.
ring_buffer.append((frame, is_speech))
num_unvoiced = len([f for f, speech in ring_buffer if not speech])
# If more than 90% of the frames in the ring buffer are
# unvoiced, then enter NOTTRIGGERED and yield whatever
# audio we've collected.
if num_unvoiced > 0.9 * ring_buffer.maxlen:
#sys.stdout.write('-(%s)' % (frame.timestamp + frame.duration))
end_times.append(ring_buffer[0][0].timestamp + frame.duration) # BT
triggered = False
if triggered: # BT if were in triggered state at end of signal, set output time
#sys.stdout.write('-(%s)' % (frame.timestamp + frame.duration))
if len(ring_buffer)>0:
end_times.append(ring_buffer[0][0].timestamp ) # BT
else:
# only get here in very rare case that we triggered on 2nd-to-last frame
end_times.append(frame.timestamp + frame.duration)
#sys.stdout.write('\n')
return(start_times, end_times)
def filter_seg_times(seg_starts, seg_ends, pad_at_start = 0.5, len_to_keep=2.5 ):
"""
do some filtering on the segments found to select part for analysis
rule: find the first segment that is at least (pad_at_start+len_to_keep sec long.
Discard the firstpad_at_start sec, keep the next len_to_keep sec
if no such segments, then return empty list
returns sel_start, sel_end, sel_end_longer
"""
sel_start = []
sel_end = []
sel_end_longer = []
not_found = True
for iseg in range(len(seg_starts)):
seg_dur = seg_ends[iseg]-seg_starts[iseg]
if (not_found & (seg_dur > (pad_at_start + len_to_keep))):
t_start = seg_starts[iseg] + pad_at_start
sel_start.append(t_start)
sel_end.append(t_start + len_to_keep)
sel_end_longer.append(max(t_start + len_to_keep, seg_ends[iseg]-pad_at_start))
not_found = False
return sel_start, sel_end, sel_end_longer

View File

@@ -0,0 +1,191 @@
"""
file_name: video_util
project_name: DBM
created: 2020-20-07
"""
import pandas as pd
import numpy as np
import glob
from opendbm.dbm_lib.dbm_features.raw_features.util import util as ut
def smooth(x,window_len=11,window='hanning'):
"""smooth the data using a window with requested size.
This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the begining and end part of the output signal.
input:
x: the input signal
window_len: the dimension of the smoothing window; should be an odd integer
window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
flat window will produce a moving average smoothing.
output:
the smoothed signal
example:
t=linspace(-2,2,0.1)
x=sin(t)+randn(len(t))*0.1
y=smooth(x)
see also:
numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve
scipy.signal.lfilter
TODO: the window parameter could be the window itself if an array instead of a string
NOTE: length(output) != length(input), to correct this: return y[(window_len/2-1):-(window_len/2)] instead of just y.
"""
if x.ndim != 1:
raise (ValueError, "smooth only accepts 1 dimension arrays.")
if x.size < window_len:
raise (ValueError, "Input vector needs to be bigger than window size.")
if window_len<3:
return x
if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
raise (ValueError, "Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")
s=np.r_[x[window_len-1:0:-1],x,x[-2:-window_len-1:-1]]
#print(len(s))
if window == 'flat': #moving average
w=np.ones(window_len,'d')
else:
w=eval('np.'+window+'(window_len)')
y=np.convolve(w/w.sum(),s,mode='valid')
return y[int(window_len/2):-int(window_len/2)]
def filter_by_confidence_and_thresh(x, fea, thresh):
if x['s_confidence'] > 0.2 and np.fabs(x[fea]) < thresh:
return x[fea]
else:
return np.NaN
def add_au_emotion(x, emotion,emotion_type,exp_type):
"""
computing individula emotion expressivity matrix
Args:
emotion: Action Unit
"""
error_reason = 'Pass'
if x['s_confidence'] > 0.8: #if using smooth, no need for 'success'
sum_r = 0
cnt = 0
for au in emotion:
au_c_label = " AU{:02d}_c".format(au)
au_r_label = " AU{:02d}_r".format(au)
if x[au_c_label]==1 and (not np.isnan(x[au_r_label])): #there are data with face in, but au_c=0
sum_r += x[au_r_label]
cnt += 6
if exp_type=='full' and x[au_c_label]==0: #Logic to compute emotion expressivity when all AU's are present
cnt = 0
break
if cnt > 0:
sum_r /= cnt
else:
sum_r = 0
v_emo = x[emotion_type] + sum_r
else:
v_emo = np.NaN
error_reason = 'confidence less than 80%'
return v_emo, error_reason
def add_au_occ(x, emotion,emotion_type):
"""
computing individula emotion presence
Args:
emotion: Action Unit
"""
au_pres = []
em_pres = 0
error_reason = 'Pass'
if x['s_confidence'] > 0.8: #if using smooth, no need for 'success'
for au in emotion:
au_c_label = " AU{:02d}_c".format(au)
if x[au_c_label]==1: #there are data with face in, but au_c=0
au_pres.append(1)
if len(au_pres) == len(emotion):
em_pres = 1
else:
em_pres = np.NaN
error_reason = 'confidence less than 80%'
return em_pres, error_reason
def emotion_exp(em_au,of,em_col,err_col):
"""
Computing individual emotion expressivity and adding it to dataframe
"""
for emotion in em_au:
of[[em_col[0],err_col]]=of.apply(add_au_emotion, args=(emotion,em_col[0],'partial',), axis=1, result_type='expand')
of[[em_col[1],err_col]]=of.apply(add_au_emotion, args=(emotion,em_col[1],'full',), axis=1, result_type='expand')
def emotion_pres(em_au,of,em_col,err_col):
"""
Computing individual emotion expressivity and adding it to dataframe
"""
for emotion in em_au:
of[[em_col,err_col]]=of.apply(add_au_occ, args=(emotion,em_col,), axis=1, result_type='expand')
def calc_of_for_video(of,face_cfg,fe_cfg):
"""
Creating dataframe for emotion expressivity
"""
new_cols = [fe_cfg.hap_exp,fe_cfg.sad_exp,fe_cfg.sur_exp,fe_cfg.fea_exp,fe_cfg.ang_exp,fe_cfg.dis_exp,fe_cfg.con_exp,
fe_cfg.pai_exp,fe_cfg.neg_exp,fe_cfg.pos_exp,fe_cfg.neu_exp,fe_cfg.com_lower_exp,fe_cfg.com_upper_exp,
fe_cfg.cai_exp,fe_cfg.com_exp,fe_cfg.happ_occ,fe_cfg.sad_occ,fe_cfg.sur_occ,fe_cfg.fea_occ,fe_cfg.ang_occ,
fe_cfg.dis_occ,fe_cfg.con_occ,fe_cfg.hap_exp_full,fe_cfg.sad_exp_full,fe_cfg.sur_exp_full,fe_cfg.fea_exp_full,
fe_cfg.ang_exp_full,fe_cfg.dis_exp_full,fe_cfg.con_exp_full,fe_cfg.pai_exp_full,fe_cfg.neg_exp_full,
fe_cfg.pos_exp_full,fe_cfg.neu_exp_full,fe_cfg.cai_exp_full,fe_cfg.com_lower_exp_full,fe_cfg.com_upper_exp_full,
fe_cfg.com_exp_full]
of[new_cols] = pd.DataFrame([[0] * len(new_cols)], index=of.index)
of[fe_cfg.err_reason] = 'Pass'
#Composite happiness expressivity
emotion_exp(face_cfg.happiness,of,[fe_cfg.hap_exp,fe_cfg.hap_exp_full],fe_cfg.err_reason)
#Composite sadness expressivity
emotion_exp(face_cfg.sadness,of,[fe_cfg.sad_exp,fe_cfg.sad_exp_full],fe_cfg.err_reason)
#Composite surprise expressivity
emotion_exp(face_cfg.surprise,of,[fe_cfg.sur_exp,fe_cfg.sur_exp_full],fe_cfg.err_reason)
#Composite fear expressivity
emotion_exp(face_cfg.fear,of,[fe_cfg.fea_exp,fe_cfg.fea_exp_full],fe_cfg.err_reason)
#Composite anger expressivity
emotion_exp(face_cfg.anger,of,[fe_cfg.ang_exp,fe_cfg.ang_exp_full],fe_cfg.err_reason)
#Composite disgust expressivity
emotion_exp(face_cfg.disgust,of,[fe_cfg.dis_exp,fe_cfg.dis_exp_full],fe_cfg.err_reason)
#Composite contempt expressivity
emotion_exp(face_cfg.contempt,of,[fe_cfg.con_exp,fe_cfg.con_exp_full],fe_cfg.err_reason)
#Composite Negative Expressivity
emotion_exp(face_cfg.NEG_ACTION_UNITS,of,[fe_cfg.neg_exp,fe_cfg.neg_exp_full],fe_cfg.err_reason)
#Composite Positive Expressivity
emotion_exp(face_cfg.POS_ACTION_UNITS,of,[fe_cfg.pos_exp,fe_cfg.pos_exp_full],fe_cfg.err_reason)
#Composite Neutral Expressivity
emotion_exp(face_cfg.NET_ACTION_UNITS,of,[fe_cfg.neu_exp,fe_cfg.neu_exp_full],fe_cfg.err_reason)
#Composite Activation Expressivity
emotion_exp(face_cfg.cai,of,[fe_cfg.cai_exp,fe_cfg.cai_exp_full],fe_cfg.err_reason)
#Composite Expressivity
emotion_exp(face_cfg.ACTION_UNITS,of,[fe_cfg.com_exp,fe_cfg.com_exp_full],fe_cfg.err_reason)
#Composite lower face expressivity
emotion_exp(face_cfg.LOWER_ACTION_UNITS,of,[fe_cfg.com_lower_exp,fe_cfg.com_lower_exp_full],fe_cfg.err_reason)
#Composite upper face Expressivity
emotion_exp(face_cfg.UPPER_ACTION_UNITS,of,[fe_cfg.com_upper_exp,fe_cfg.com_upper_exp_full],fe_cfg.err_reason)
#Composite pain expressivity
emotion_exp(face_cfg.pain,of,[fe_cfg.pai_exp,fe_cfg.pai_exp_full],fe_cfg.err_reason)
#AU happiness presence
emotion_pres(face_cfg.happiness,of,fe_cfg.happ_occ,fe_cfg.err_reason)
#AU Sad presence
emotion_pres(face_cfg.sadness,of,fe_cfg.sad_occ,fe_cfg.err_reason)
#AU Surprise presence
emotion_pres(face_cfg.surprise,of,fe_cfg.sur_occ,fe_cfg.err_reason)
#AU fear presence
emotion_pres(face_cfg.fear,of,fe_cfg.fea_occ,fe_cfg.err_reason)
#AU anger presence
emotion_pres(face_cfg.anger,of,fe_cfg.ang_occ,fe_cfg.err_reason)
#AU disgust presence
emotion_pres(face_cfg.disgust,of,fe_cfg.dis_occ,fe_cfg.err_reason)
#AU contempt presence
emotion_pres(face_cfg.contempt,of,fe_cfg.con_occ,fe_cfg.err_reason)