-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathaudio.py
219 lines (186 loc) · 7.67 KB
/
audio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import customtkinter # Only imported for typing
import wave
import pyaudio
from pydub import AudioSegment
from pydub.effects import normalize
import numpy as np
import tkinter.messagebox as msgbox
class AudioManager:
    """Record, load, and play back audio for a customtkinter application.

    Recording captures audio with the class-level ``FORMAT``/``CHANNELS``/
    ``RATE`` settings (16-bit, mono, 44.1 kHz) and stores it as a WAV file.
    Playback streams the WAV file at ``filePath`` through PyAudio in
    ``CHUNK``-frame writes.  All positions are expressed in seconds.
    """

    CHUNK = 1024  # frames per buffer for both capture and playback
    FORMAT = pyaudio.paInt16  # capture sample format
    CHANNELS = 1  # capture channel count
    RATE = 44100  # capture sample rate in Hz
    filePath = "session_output.wav"  # WAV file used by play/seek/stop
    playing = False  # True while the loop in play() should keep running
    isRecording = False  # True while the loop in record() should keep running
    paused = True  # True when playback is not actively writing audio

    def __init__(self, root: customtkinter.CTk):
        """Create the manager and open a PyAudio session.

        Args:
            root: Application window.  ``record`` pumps its event loop and
                schedules error dialogs on it.
        """
        self.root = root
        self.p = pyaudio.PyAudio()
        self.out_stream = None
        self.wf = None
        # Initialised up front so stop() before record(), or play() before
        # any seek(), does not fail with AttributeError.
        self.frames = []
        self.current_position = 0
        self.audioExists = False

    def record(self):
        """Capture audio into ``self.frames`` until ``isRecording`` is cleared.

        The call blocks; after every chunk it runs ``self.root.update()`` so
        UI callbacks (for example one that calls ``stop``) can execute.

        Raises:
            OSError: Any PyAudio error other than the "no device" codes
                (-9996, -9999), which are reported through a dialog instead.
        """
        self.filePath = "session_output.wav"
        self.isRecording = True
        self.frames = []
        try:
            stream = self.p.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK,
            )
            try:
                while self.isRecording:
                    data = stream.read(self.CHUNK)
                    self.frames.append(data)
                    self.root.update()
            finally:
                # Release the input device even when read() fails mid-capture.
                stream.close()
        except OSError as e:
            if e.errno == -9996 or e.errno == -9999:
                print("Warning: No default output device available.")
                self.root.after(0, lambda: msgbox.showerror("Audio Error", "No default audio device available. Please check your audio settings."))
            else:
                raise
        finally:
            # Never leave the flag set once the capture loop is no longer running.
            self.isRecording = False

    def stop(self):
        """End recording, write the captured frames to disk, and load the waveform.

        Returns:
            A ``(filePath, time, signal)`` tuple, where ``time`` and ``signal``
            come from ``createWaveformFile``.
        """
        self.isRecording = False
        self.saveAudioFile(self.filePath)
        time, signal = self.createWaveformFile()
        return (self.filePath, time, signal)

    def play(self, startPosition=None):
        """Play ``filePath`` from ``startPosition`` (seconds) until stopped or finished.

        The call blocks for the duration of playback, so it is presumably run
        on a worker thread (confirm against the caller).  Playback ends when
        the file is exhausted, when ``playing`` is set to False, or on error;
        in every case ``stopPlayback`` runs so state and resources are reset.

        Args:
            startPosition: Offset in seconds.  When None, playback resumes
                from ``self.current_position``.
        """
        # Imported locally so the module-level import block stays untouched.
        from time import sleep

        # Ensure PyAudio is initialized
        if not self.p:
            self.p = pyaudio.PyAudio()
        # Ensure filePath exists
        if not self.filePath or not isinstance(self.filePath, str):
            msgbox.showerror("Playback Error", "No audio file selected or invalid file path.")
            return
        if startPosition is not None:
            self.current_position = startPosition
        try:
            # Open the wave file if not already opened
            if not self.wf:
                self.wf = wave.open(self.filePath, "rb")
            # Clamp the start frame: setpos() raises for out-of-range values.
            start_frame = int(self.current_position * self.wf.getframerate())
            start_frame = max(0, min(start_frame, self.wf.getnframes()))
            self.wf.setpos(start_frame)
            # Initialize output stream if not already initialized
            if not self.out_stream:
                self.out_stream = self.p.open(
                    format=self.p.get_format_from_width(self.wf.getsampwidth()),
                    channels=self.wf.getnchannels(),
                    rate=self.wf.getframerate(),
                    output=True,
                    frames_per_buffer=self.CHUNK,
                )
            self.playing = True
            self.paused = False
            # Playback loop
            while self.playing:
                if self.paused:
                    # Yield the CPU instead of spinning while paused.
                    sleep(0.01)
                    continue
                data = self.wf.readframes(self.CHUNK)
                if not data:
                    break
                self.out_stream.write(data)
        except Exception as e:
            print(f"Error during playback: {e}")
            msgbox.showerror("Playback Error", f"An error occurred: {e}")
        finally:
            # Always reset: previously a natural end-of-file (or a mid-loop
            # error) left `playing` True and the stream/file handles open.
            self.stopPlayback()

    def pause(self):
        """Toggle the paused flag and return its new value."""
        self.paused = not self.paused
        return self.paused

    def upload(self, filename: str):
        """Convert an mp3/wav file to ``export.wav`` and make it the current file.

        Args:
            filename: Path to the file to load; the extension selects the
                decoder and must be ``mp3`` or ``wav`` (case-insensitive).

        Returns:
            ``(time, signal)`` from ``createWaveformFile`` on success, or
            ``(None, None)`` after showing an error dialog on failure.  On
            failure ``filePath`` keeps its previous value.
        """
        try:
            # rpartition never raises on a missing dot, unlike unpacking rsplit.
            _, dot, extension = filename.rpartition(".")
            extension = extension.lower()
            if not dot or extension not in ("mp3", "wav"):
                raise ValueError("Unsupported file format")
            # Close any previously opened wave file
            if self.wf:
                self.wf.close()
                self.wf = None
            # Open and process the file
            segment = AudioSegment.from_file(filename, format=extension)
            segment.export("export.wav", format="wav")
            # Only switch to the new file once the conversion has succeeded.
            self.filePath = "export.wav"
            # Open the wave file
            self.wf = wave.open(self.filePath, "rb")
            time, signal = self.createWaveformFile()
            return (time, signal)
        except Exception as e:
            print(f"Error uploading file: {e}")
            msgbox.showerror("Upload Error", f"Failed to upload file: {e}")
            # Ensure return values are consistent
            return None, None

    def normalizeUploadedFile(self):
        """Normalize the current WAV file in place with pydub and return its path."""
        print("The audio file is attempting to be normalized")
        pre_normalized_audio = AudioSegment.from_file(self.filePath, format="wav")
        normalized_audio = normalize(pre_normalized_audio)
        normalized_audio.export(out_f=self.filePath, format="wav")
        return self.filePath

    def createWaveformFile(self):
        """Read ``filePath`` and return ``(time, signal)`` arrays for plotting.

        Returns:
            ``signal`` is the raw sample data viewed as int16 (channels stay
            interleaved); ``time`` has one entry per element of ``signal``
            and spans 0 to the file's duration in seconds.
        """
        with wave.open(self.filePath, "rb") as raw:
            f_rate = raw.getframerate()
            frame_size = raw.getsampwidth() * raw.getnchannels()
            data = raw.readframes(raw.getnframes())
        # NOTE(review): int16 is assumed regardless of the file's sample width.
        signal = np.frombuffer(data, dtype="int16")
        # Duration comes from frames, not samples, so interleaved
        # multi-channel data does not stretch the time axis.
        duration = (len(data) // frame_size) / f_rate
        time = np.linspace(0, duration, num=len(signal))
        # Flag is raised only after the file was read successfully.
        self.audioExists = True
        return (time, signal)

    def saveAudioFile(self, filename: str):
        """Write ``self.frames`` to ``filename`` as a WAV file using the capture settings."""
        # The context manager closes the writer even if a write fails.
        with wave.open(filename, "wb") as wf:
            wf.setnchannels(self.CHANNELS)
            wf.setsampwidth(self.p.get_sample_size(self.FORMAT))
            wf.setframerate(self.RATE)
            wf.writeframes(b"".join(self.frames))

    def getAudioDuration(self, filename=None):
        """Return the duration in seconds of ``filename`` (default: ``filePath``)."""
        if filename is None:
            filename = self.filePath
        audio = AudioSegment.from_file(filename)
        return len(audio) / 1000.0  # Return duration in seconds

    def setPlaybackPosition(self, position):
        """Alias for ``seek``."""
        self.seek(position)

    def seek(self, position):
        """Move the playback position to ``position`` seconds, clamped to the file.

        Opens the wave file if needed.  Errors are reported through a dialog
        rather than raised.  If playback is running, it is un-paused.
        """
        # Ensure wave file is initialized
        if not self.wf:
            try:
                self.wf = wave.open(self.filePath, "rb")
            except Exception as e:
                print(f"Error reopening wave file: {e}")
                msgbox.showerror("Error", "Cannot seek: Unable to reopen wave file.")
                return
        try:
            # Derive the bounds from the open wave handle: it avoids decoding
            # the whole file on every seek, and a millisecond-rounded duration
            # could map past the last frame and make setpos() raise.
            frame_rate = self.wf.getframerate()
            total_frames = self.wf.getnframes()
            duration = total_frames / frame_rate
            # Clamp the position to valid bounds
            self.current_position = max(0, min(position, duration))
            # Seek to the appropriate frame
            frame = min(int(self.current_position * frame_rate), total_frames)
            self.wf.setpos(frame)
            print(f"Seeked to {self.current_position} seconds.")
            # Update playback if currently playing
            if self.playing and self.out_stream:
                self.paused = False  # Resume playback
        except Exception as e:
            print(f"Error during seek: {e}")
            msgbox.showerror("Seek Error", f"An error occurred while seeking: {e}")

    def stopPlayback(self):
        """Stop playback, release the stream and file, and restart PyAudio."""
        self.playing = False
        self.paused = True  # Ensure paused is True so playback can reset correctly
        # Detach the handles before closing them so a second call (for
        # example from play()'s cleanup) finds nothing left to close.
        stream, self.out_stream = self.out_stream, None
        wave_file, self.wf = self.wf, None
        # Stop and close the output stream
        if stream:
            # Attempt close() even when stop_stream() fails.
            for release in (stream.stop_stream, stream.close):
                try:
                    release()
                except OSError as e:
                    print(f"Error stopping/closing stream: {e}")
        # Close the wave file
        if wave_file:
            wave_file.close()
        # Terminate PyAudio if active
        if self.p:
            self.p.terminate()
            self.p = None
        # Reinitialize PyAudio to prepare for future playback
        self.p = pyaudio.PyAudio()