-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinternal.py
248 lines (212 loc) · 5.91 KB
/
internal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
#handles audio config, buffers
#provides a bit more function than just sounddevice
#while hiding common stuff like sample rate and window
from com import *
import vis
import sounddevice as sd
#DEBUG= True
#if DEBUG:
sdqd= sd.query_devices()
print(sdqd)
#(out,in)
voicemeeter= False
#only useful for programs that cant rebind mic
for s in sdqd:
if 'Voicemeeter' in s:
voicemeeter= True
#todo fix
#todo is voicemeeter even necessary to output to programs requiring default mic?
for (i,s) in enumerate(sdqd):
if 'default' in s:
sd.default.device= (i,i)
DUPLEX= True
#todo i keep forgetting what the fuck is a duplex
#just guess until it works
#must be pure function, as async
from enum import Enum
class arity(Enum):
AMP_OUT=1
AMP_INOUT=2
FFT_OUT=3
FFT_INOUT=4
@dcls
class aop_t:
f: callable
arity:arity
audio_op= lambda arity: lambda f: aop_t(f,arity)
audio_op_aktiv= None
from numpy import complex64
from numpy import *
from numpy.fft import *
import scipy.fftpack as fftpack
sample_rate = sd.query_devices(sd.default.device if DUPLEX else sd.default.device[0], 'input')['default_samplerate']
#sample_rate= 44000
#fftlen= fftpack.next_fast_len(fftsize_calc(sample_rate))
#1:1 size of samples:fft
# otherwise rescaling is required, which is pessimum
#samples= zeros(fftlen)
#fft= zeros(fftlen)
def push_samples(arr):
global samples
o= arr.size
assert o<sample_count
samples[o:-1]= samples[0:-o-1]
samples[0:o]= arr
return samples#*hann(sample_count)*800
def hz_idx(t):
return (t*sample_count/sample_rate).astype(int)
frame= 0
t0= 0
last_amp= 0# prevents skipping on dropped frames
def audio_callback(indata, outdata, frames, time, status):
#ASYNC this function is invoked from another thread
if DEBUG:
print('______update')
if status:
print('STATUS'+str(status))
print('frames'+str(frames))
global frame
global data_p
global data_pp
global t0
stereo= indata.shape[1]==2
#if DEBUG:
#print('note_array.shape '+note_array.shape)
in_amp= indata[:,0].view()
if fin.instance!=None:
b= fin.instance.buf
end= frame+frames
if end<b.size:
in_amp+= b[frame:end]
#todo rate resampling
in_fft= rfft(in_amp)
out_amp= outdata[:,0].view()
out_frq= zeros(frames//2+1,dtype=complex64)
t1= t0+float(frames)
rate= sample_rate
if DEBUG:
print('interval '+str(t0-t1))
#DO NOT high latency ops such as allocation, because underflow
#all bulk ops should be as o[:]=... to prevent realloc
#todo theres a few copy ops which may or may not need eliminated
#i believe double buffering wouldnt aid much
#amplitude
#freq= resample(freq, int(freq.size*2.))
#freq= freq[:max(1600,fftsize)] #denoise
#freq= fft(freq, n=fftsize)
#a= ifft(roll(freq,200.), n=fftsize)
##a= irfft(a[::], n=frames) ????
#todo signal window functions
#optimize flywheel - elim allocations
visin= (in_amp.copy(),in_fft.copy())
_op= audio_op_aktiv
do_rfft= False #delaying allows postprocess
def _AMP_OUT():
out_amp[:]= _op.f( rate, linspace(t0,t1,frames,endpoint=False) )
out_frq[:]= rfft(out_amp)
def _AMP_INOUT():
out_amp[:]= _op.f( rate, in_amp )
out_frq[:]= rfft(out_amp)
def _FFT_OUT():
out_frq[:]= _op.f( rate, zeros(frames//2+1) )
nonlocal do_rfft
do_rfft= True
def _FFT_INOUT():
out_frq[:]= _op.f( rate, in_fft )
nonlocal do_rfft
do_rfft= True
{
arity.AMP_OUT: _AMP_OUT,
arity.AMP_INOUT: _AMP_INOUT,
arity.FFT_OUT: _FFT_OUT,
arity.FFT_INOUT: _FFT_INOUT,
}[_op.arity]()
#temp window function
#evited by using sine
# has slight low freq emission but prevents all popping
#o[:]*= pow( linspace(0,2,o.size)*linspace(2,0,o.size), 2.)
if do_rfft:
#lowpass
_lk= 200#half-frequency, reciprocal
out_frq*= square(_lk/arange(_lk,_lk+out_frq.size)) # 1 -> lim 0
out_amp[:]= irfft(out_frq, n=len(out_amp))
#out_amp[:]= irfft(out_frq*1j, n=len(out_amp))
# *1j does cosine->sine, to taper ends and mostly elim need for windowing
#todo n should not need specified
visout= (out_amp.copy(),out_frq.copy())
#transforms after here are not shown on graphs
if stereo:#chanel mirroring
outdata[:,1]= outdata[:,0]
vis.fifo.put_nowait((*visin, *visout))
#copy is unevitable since o==outdata are managed by outer scope
# here was determined to be the appropriate location to copy
# manually buffering would still require a copy
#todo i dispute my previous self and think i can do better
if fout.instance!=None:
fout.instance.buf.append(out_amp.copy())
t0= t1
frame+= frames
import soundfile
class fout:
instance= None
def __init__(self,file):
self.file= file
self.buf= []
fout.instance= self
def flush(self):
print('saving %s'%self.file)
buf= array(self.buf).flatten()
print(buf)
#fixme stereo flattens wrong
soundfile.write(self.file,buf,int(sample_rate))
class fin:
instance= None
def __init__(self, file):
self.buf= soundfile.read(file)[0]
print('soundfile loaded %s'%file)
print(self.buf)
fin.instance= self
aktiv= True
def quit():
global aktiv
aktiv= False
if fout.instance!=None:
fout.instance.flush()
import threading
import time
#parameters are for different threads
#dont let them fuck with eachother
def invoke(op, infile=None,outfile=None):
global audio_op_aktiv
audio_op_aktiv= op
if infile!=None:
if op.arity in (arity.AMP_OUT,arity.FFT_OUT):
raise 'file input provided to function which does not take input audio'
else:
None
if infile!=None:
fin(infile)
if outfile!=None:
fout(outfile)
#sounddevice's control flow is fucked, ergo dummy thread
strem= lambda: sd.Stream(
samplerate=sample_rate,
#blocksize=fftsize,
blocksize=2048,
#dtype=None, latency=None,
clip_off=True,
dither_off= True,
##never_drop_input= True,
channels=1,#mono input and output
#latency='high',
callback= audio_callback)
def thr():
try:
with strem():
while aktiv:
time.sleep(1.);
except Exception as e:
print("\nEXCEPT")
#always check if devices are configured via sd.default.device
raise e
threading.Thread(name='sounddevice dummy',target=thr).start()