-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMain.py
73 lines (55 loc) · 2.22 KB
/
Main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from GPTConvo import *
from VoiceGenerator import *
from ScriptParser import *
from EC2Grabber import *
import threading
from LocalZipper import *
from StreamLabsClient import *
import os
from pathlib import Path
# Conversation client; the API key is read from the OPEN_AI_API_KEY
# environment variable (os.getenv yields None when it is unset).
gptConvo = GPTConvo(os.getenv("OPEN_AI_API_KEY"))

# One VoiceGenerator is created per endpoint, in the order listed here.
_VOICE_ENDPOINTS = (
    "https://956fff992f57c85934.gradio.live",
    "https://181f7a2a6f70f2f1d1.gradio.live",
    # Add more voice generators if needed
)
voiceGens = [VoiceGenerator(endpoint) for endpoint in _VOICE_ENDPOINTS]

# The manager is handed the full list of generators.
voiceDispatcher = VoiceGeneratorManager(voiceGens)
#====================================================================
# Both locations are relative to the current user's home directory.
pem_path = Path('Documents/GPT/wizPassword.pem')
local_dir = Path('Documents/AI INPUTS/temp')

_home_dir = Path.home()
_pem_file = str(_home_dir.joinpath(pem_path))      # where the password file is
_download_dir = str(_home_dir.joinpath(local_dir))  # temp location for files grabbed from EC2

ec2 = EC2Grabber(
    'ec2-52-53-212-50.us-west-1.compute.amazonaws.com',
    _pem_file,
    '/home/ubuntu/gptconvo/ai-voice-cloning/results/',
    _download_dir,
)
import time
from concurrent.futures import ThreadPoolExecutor
def generate_scripts(queue):
    """Produce scripts forever and push them onto *queue*.

    Each iteration requests a one-off script from ``gptConvo``, parses it
    with ``ScriptParser`` (using ``gptConvo.scriptBuilder.charNames``),
    prints the resulting Unity script and enqueues the tuple
    ``(parser.lines, unityScript)``, then pauses for 10 seconds.

    Args:
        queue: Queue-like object providing ``qsize()`` and ``put()``.

    This function never returns. Any ``Exception`` raised during an
    iteration is logged with its traceback and the loop retries after
    one second.
    """
    import logging

    log = logging.getLogger(__name__)

    while True:
        try:
            # Throttle when more than two scripts are already waiting.
            # NOTE(review): after the pause a script is still generated even
            # if the backlog has not shrunk, so this slows the producer but
            # does not bound the queue -- confirm that is intended.
            if queue.qsize() > 2:
                time.sleep(30)

            script = gptConvo.callGPTForOneOffScript()
            parser = ScriptParser(script, gptConvo.scriptBuilder.charNames)
            unityScript = parser.getUnityScript()
            print(unityScript)

            # Hand the parsed lines and the Unity script to the consumer.
            queue.put((parser.lines, unityScript))
            time.sleep(10)
        except Exception:
            # Previously failures were swallowed silently; record them so a
            # persistent error (bad API key, parse failure, ...) is visible.
            log.exception("Script generation failed; retrying in 1 second")
            time.sleep(1)
def process_scripts(queue, results_dir=None):
    """Consume scripts from *queue*, voice them and package the results.

    For every ``(lines, unityScript)`` tuple taken from the queue this runs
    ``voiceDispatcher.run(lines)``, calls ``ec2.getVoiceResults()`` and then
    ``zip_files_in_dir`` on the results directory, passing the Unity script
    and the ``character`` attribute of each line.

    Args:
        queue: Queue-like object providing a blocking ``get()``.
        results_dir: Directory handed to ``zip_files_in_dir``. Defaults to
            ``Path.home() / local_dir``, the same location the module-level
            ``ec2`` grabber is configured with, instead of a path tied to
            one specific user account.

    This function never returns. A failure while handling one script is
    logged with its traceback and the loop moves on to the next script.
    """
    import logging

    log = logging.getLogger(__name__)

    if results_dir is None:
        results_dir = str(Path.home() / local_dir)

    while True:
        # Blocking get() waits for the next script without polling.
        item = queue.get()
        try:
            lines, unityScript = item
            voiceDispatcher.run(lines)
            ec2.getVoiceResults()
            zip_files_in_dir(
                results_dir,
                unityScript,
                [line.character for line in lines],
            )
        except Exception:
            # Without this the consumer thread would die silently while the
            # producer keeps filling the queue.
            log.exception("Processing a script failed; continuing with the next one")
            time.sleep(1)
# Explicit imports: the names used below must not depend on whatever the
# wildcard imports at the top of the file happen to export.
import logging
from concurrent.futures import as_completed
from queue import Queue

# Use a thread-safe queue to hold the scripts passed from producer to consumer
queue = Queue()

# Use a ThreadPoolExecutor to run the two tasks in separate threads
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = {
        executor.submit(generate_scripts, queue): "generate_scripts",
        executor.submit(process_scripts, queue): "process_scripts",
    }
    # Both workers loop forever, so a future only completes when its worker
    # has stopped. Log the reason here; otherwise the exception stays hidden
    # inside the discarded Future object.
    for future in as_completed(futures):
        error = future.exception()
        if error is not None:
            logging.getLogger(__name__).error(
                "Worker %s stopped unexpectedly", futures[future], exc_info=error
            )