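# Flask app for vowel pronunciation analysis: a recorded word is decoded and trimmed,
# a neural regressor locates the vowel, and a neural and/or a rule-based classifier
# predict which vowel was pronounced; the /upload route also returns pronunciation feedback.
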
import base64
import json
import math

import librosa
import numpy as np
import parselmouth
import torch
import torch.nn as nn
from flask import Flask, render_template, request, jsonify, send_from_directory, url_for, send_file
from joblib import load
from parselmouth import praat
from pydub import AudioSegment
from pydub.silence import detect_leading_silence
from scipy.io import wavfile
from skimage.transform import resize

from feedback import vowel_feedback, pron_hack

app = Flask(__name__)


# The neural regression model: predicts the start and end of the vowel (as fractions
# of the recording's duration) from a log-mel spectrogram plus metadata features.
class CNNRegressor(nn.Module):
    def __init__(self):
        super(CNNRegressor, self).__init__()
        self.cnn_layer1 = nn.Sequential(nn.Conv2d(1, 16, kernel_size=3, padding='valid'), nn.ReLU(), nn.Dropout(0.1),
                                        nn.BatchNorm2d(16), nn.MaxPool2d(kernel_size=2))
        self.cnn_layer2 = nn.Sequential(nn.Conv2d(16, 32, kernel_size=3, padding='valid'), nn.ReLU(), nn.Dropout(0.2),
                                        nn.BatchNorm2d(32), nn.MaxPool2d(kernel_size=2))
        self.linear_layer1 = nn.Linear(32 * 30 * 6 + 8, 128)
        self.dropout1 = nn.Dropout(0.5)
        self.activ1 = nn.ReLU()
        self.linear_layer_p = nn.Linear(128, 64)
        self.dropout_p = nn.Dropout(0.5)
        self.activ_p = nn.ReLU()
        self.linear_layer2 = nn.Linear(64, 2)
        self.activ2 = nn.Sigmoid()

    def forward(self, images, features):
        # Run the spectrogram through both convolutional blocks, flatten the result,
        # and concatenate it with the metadata features before the dense layers.
        cnn2 = self.cnn_layer2(self.cnn_layer1(images.unsqueeze(1)))
        cnn_vec = cnn2.reshape(cnn2.shape[0], -1)
        out = self.dropout1(self.activ1(self.linear_layer1(torch.cat((cnn_vec, features), dim=1))))
        return self.activ2(self.linear_layer2(self.activ_p(self.dropout_p(self.linear_layer_p(out)))))


# The neural classification model: predicts the vowel class from a narrower
# log-mel spectrogram plus metadata features.
class CNNClassifier(nn.Module):
    def __init__(self, num_classes=10):
        super(CNNClassifier, self).__init__()
        self.cnn_layer1 = nn.Sequential(nn.Conv2d(1, 16, kernel_size=3, padding='valid'), nn.ReLU(), nn.Dropout(0.1),
                                        nn.BatchNorm2d(16), nn.MaxPool2d(kernel_size=2))
        self.cnn_layer2 = nn.Sequential(nn.Conv2d(16, 32, kernel_size=3, padding='valid'), nn.ReLU(), nn.Dropout(0.2),
                                        nn.BatchNorm2d(32), nn.MaxPool2d(kernel_size=2))
        self.linear_layer1 = nn.Linear(32 * 30 * 3 + 7, 64)
        self.dropout1 = nn.Dropout(0.5)
        self.activ1 = nn.ReLU()
        self.linear_layer_p = nn.Linear(128, 64)
        self.dropout_p = nn.Dropout(0.5)
        self.activ_p = nn.ReLU()
        self.linear_layer2 = nn.Linear(64, num_classes)

    def forward(self, images, features):
        cnn2 = self.cnn_layer2(self.cnn_layer1(images.unsqueeze(1)))
        cnn_vec = cnn2.reshape(cnn2.shape[0], -1)
        out = self.dropout1(self.activ1(self.linear_layer1(torch.cat((cnn_vec, features), dim=1))))
        out2 = self.linear_layer2(self.activ_p(self.dropout_p(self.linear_layer_p(out))))
        return out2


# Load the trained models and the formant scaler
rule_clf = load('models/rule_based.joblib')  # The rule-based classifier (the 'lg' model)
nn_clf = torch.load('models/neural_classifier.pt', map_location=torch.device('cpu'))  # The neural classifier (the 'nn' model)
scaler = load('models/scaler.joblib')  # Standardizes the formants to zero mean and unit variance
regressor = torch.load('models/neural_regressor.pt', map_location=torch.device('cpu'))  # The vowel start/end detection model

idx2key = ['2', '9', 'a', 'a~', 'e', 'E', 'i', 'O', 'o', 'o~', 'u', 'U~+', 'y']  # All possible vowels
valid = [0, 1, 2, 4, 5, 6, 7, 8, 10, 12]  # Indices of the vowels considered here (depends on the classifier)
all_phonemes = ['l', 'm', 'p', 's', 't', 't1']  # Phonemes that can precede the vowel

# Paths of the temporary intermediate files
tmp_wav = 'tmp_process.wav'  # silence-trimmed recording
tmp_wav_2 = 'tmp_process_trimmed.wav'  # vowel-only segment

max_w = 31  # Spectrogram width (in frames) to resize to for the regressor
max_w_2 = 20  # Spectrogram width to resize to for the neural classifier
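

# Flask routes: /predict (the evaluation module) returns the probabilities from both
# classifiers, while /upload (the standard module) returns the rule-based prediction
# together with feedback on the desired vowel; the other routes serve media and pages.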
@app.route('/predict', methods=['POST'])
def predict():  # Evaluation module
    data = json.loads(request.data)
    pred = get_probabilities(data, model='all')  # Get the probabilities from both classifiers
    # On error, return the error message
    if isinstance(pred, str):
        return jsonify(error=pred)
    # Otherwise, return both sets of probabilities and the predicted vowel of each model
    pred_nn, pred_lg = pred
    return jsonify(
        probas_nn=pred_nn[0].tolist(),
        pred_nn=idx2key[valid[np.argmax(pred_nn)]],
        probas_lg=pred_lg[0].tolist(),
        pred_lg=idx2key[valid[np.argmax(pred_lg)]]
    )


@app.route('/audio/<path:filename>', methods=['GET'])
def read_audio(filename):  # Serve an audio file
    return send_from_directory('static/audio', filename, mimetype="audio/wav", as_attachment=False)


@app.route('/video/<path:filename>', methods=['GET'])
def read_video(filename):  # Serve a video file
    return send_from_directory('static/video', filename, mimetype="video/mp4", as_attachment=False)


@app.route('/upload', methods=['POST'])
def upload():  # Standard module
    data = json.loads(request.data)
    pred = get_probabilities(data)  # Get the probabilities from the rule-based classifier
    if isinstance(pred, str):
        return jsonify(error=pred)
    des_vowel = data['des_vowel']
    final_vowel = np.argmax(pred)
    final_confidence = pred[0][final_vowel]  # Best score
    final_vowel = idx2key[valid[final_vowel]]  # Actual prediction
    print('Vowel ', 'Confidence')  # Console output
    print('-' * 25)
    for i in range(len(valid)):
        vowel = idx2key[valid[i]]
        print(f'{vowel:<6} {pred[0][i]:.3f}', '=' * int(pred[0][i] * 100))
    print(f'Prediction: /{final_vowel}/, confidence: {final_confidence:.3f}')
    return jsonify(predicted_vowel=final_vowel,
                   confidence=float(final_confidence),
                   feedback=vowel_feedback(des_vowel, final_vowel),
                   add_feedback=pron_hack(des_vowel, final_vowel)
                   )
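

# Full analysis pipeline: decode and trim the recording, locate the vowel with the
# regressor, then classify it with the neural network ('nn'), the rule-based
# classifier ('lg', the default), or both ('all'). Returns an error string on failure.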
def get_probabilities(data, model='lg'):
    # Get metadata
    speaker_gender = 'f' if data['gender'].startswith('F') else 'm'
    audio = data['audio'][22:]  # Strip the data-URL prefix from the base64 payload
    previous_phoneme = data['prev_phoneme']
    word_ends_with_r = data['r_word']
    input_file = "input.wav"
    audio = base64.b64decode(audio)  # Decode the audio file
    with open(input_file, 'wb') as f:  # Write the audio to file
        f.write(audio)
    # Remove leading and trailing silences
    sound = AudioSegment.from_file(input_file)
    trim_leading_silence = lambda x: x[detect_leading_silence(x, silence_threshold=-20):]
    trimmed = trim_leading_silence(trim_leading_silence(sound).reverse()).reverse()
    trimmed.export(tmp_wav, format='wav', bitrate='768k')
    # Generate the log-mel spectrogram
    try:
        y, sr = librosa.load(tmp_wav)
    except ValueError:
        return 'The file is too silent to analyze! Try speaking louder.'
    mels = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128, n_fft=512, hop_length=512)
    mels = np.log(mels + 1e-9)  # Add a small constant to avoid log(0)
    # Rescale the mel spectrogram to an 8-bit image
    mels_std = (mels - mels.min()) / (mels.max() - mels.min())
    melspec = (mels_std * 255).astype(np.uint8)
    melspec = np.flip(melspec, axis=0)  # Put low frequencies at the bottom of the image
    melspec = 255 - melspec
    # Feed the log-mel spectrogram to the regression model to predict the start and end of the vowel
    melspec = resize(melspec, (melspec.shape[0], max_w), anti_aliasing=False)  # width expected by the regressor
    melspec2 = resize(melspec, (melspec.shape[0], max_w_2), anti_aliasing=False)  # width expected by the neural classifier
    input_tensor = torch.tensor(melspec).float()
    input_features = torch.tensor(
        [speaker_gender == 'f', not word_ends_with_r, *[previous_phoneme == x for x in all_phonemes]])
    pred = regressor(input_tensor.unsqueeze(0), input_features.unsqueeze(0))[0]
    vowel_start = pred[0].item()
    vowel_end = pred[1].item()
    if vowel_start >= vowel_end:
        return 'The model predicted that the vowel has negative duration! Try again.'
    # Trim the file at the predicted start and end so that only the vowel remains
    sample_rate, wave_data = wavfile.read(tmp_wav)
    duration = len(wave_data) / sample_rate
    start_sample = int(duration * vowel_start * sample_rate)
    end_sample = int(duration * vowel_end * sample_rate)
    wavfile.write(tmp_wav_2, sample_rate, wave_data[start_sample:end_sample])
    duration = len(wave_data[start_sample:end_sample]) / sample_rate
    if duration < 0.01:
        return 'The model predicted that the vowel is too short! Try speaking louder.'
    # Drop the r-word feature before classification (the classifiers take 7 features)
    input_features = torch.cat([input_features[0:1], input_features[2:]])
    if model != 'lg':
        # Neural network classifier
        input_tensor = torch.tensor(melspec2).float()
        pred_nn = nn.Softmax(dim=1)(nn_clf(input_tensor.unsqueeze(0), input_features.unsqueeze(0))).detach().numpy()
        if model == 'nn':
            return pred_nn
    if model != 'nn':
        # Extract formants with Praat (via parselmouth)
        sound = parselmouth.Sound(tmp_wav_2)
        point_process = praat.call(sound, "To PointProcess (periodic, cc)", math.ceil(3 / duration + 0.000001), 300)
        formants = praat.call(sound, "To Formant (burg)", 0, 5, 5000, 0.025, 50)
        num_points = praat.call(point_process, "Get number of points")
        f_lists = [[] for i in range(5)]
        for point in range(1, num_points + 1):
            t = praat.call(point_process, "Get time from index", point)
            for i in range(4):
                f_lists[i].append(praat.call(formants, "Get value at time", i + 1, t, 'Hertz', 'Linear'))
        f_lists = [[x for x in f_list if not math.isnan(x)] for f_list in f_lists]
        # Compute the average of each formant
        formants = []
        try:
            for i in range(4):
                formants.append(sum(f_lists[i]) / len(f_lists[i]))
        except ZeroDivisionError:
            return 'The file is too short/empty to analyze! Try speaking louder.'
        # Add the additional features (gender, previous phoneme)
        input_features = input_features.cpu()
        features = torch.cat((torch.tensor(formants), input_features)).numpy()
        # Rescale the formants
        features[:4] = scaler.transform(np.array(features[:4]).reshape(1, -1))[0]
        # Prediction with probabilities
        pred_lg = rule_clf.predict_proba([features])  # Probabilities
        if model == 'lg':
            return pred_lg
    return pred_nn, pred_lg


@app.route('/')
def index():
    return render_template("index.html")


@app.route('/eval')
def eval():
    return render_template("eval.html")


@app.route('/privacy.html')
def privacy():
    return render_template("privacy.html")


if __name__ == '__main__':
    app.run(debug=True)