#-*- coding: utf-8 -*- import os import wave from time import sleep import numpy as np SUCCESS = 0 FAIL = 1 # 須要添加錄音互斥功能能,某些功能開啓的時候錄音暫時關閉 def ZCR(curFrame): # 過零率 tmp1 = curFrame[:-1] tmp2 = curFrame[1:] sings = (tmp1 * tmp2 <= 0) diffs = (tmp1 - tmp2) > 0.02 zcr = np.sum(sings * diffs) return zcr def STE(curFrame): # 短時能量 amp = np.sum(np.abs(curFrame)) return amp class Vad(object): def __init__(self): # 初始短時能量高門限 self.amp1 = 140 # 初始短時能量低門限 self.amp2 = 120 # 初始短時過零率高門限 self.zcr1 = 10 # 初始短時過零率低門限 self.zcr2 = 5 # 容許最大靜音長度 self.maxsilence = 100 # 語音的最短長度 self.minlen = 40 # 偏移值 self.offsets = 40 self.offsete = 40 # 能量最大值 self.max_en = 20000 # 初始狀態爲靜音 self.status = 0 self.count = 0 self.silence = 0 self.frame_len = 256 self.frame_inc = 128 self.cur_status = 0 self.frames = [] # 數據開始偏移 self.frames_start = [] self.frames_start_num = 0 # 數據結束偏移 self.frames_end = [] self.frames_end_num = 0 # 緩存數據 self.cache_frames = [] self.cache = "" # 最大緩存長度 self.cache_frames_num = 0 self.end_flag = False self.wait_flag = False self.on = True self.callback = None self.callback_res = [] self.callback_kwargs = {} def clean(self): self.frames = [] # 數據開始偏移 self.frames_start = [] self.frames_start_num = 0 # 數據結束偏移 self.frames_end = [] self.frames_end_num = 0 # 緩存數據 self.cache_frames = [] # 最大緩存長度 self.cache_frames_num = 0 self.end_flag = False self.wait_flag = False def go(self): self.wait_flag = False def wait(self): self.wait_flag = True def stop(self): self.on = False def add(self, frame, wait=True): if wait: print 'wait' frame = self.cache + frame while len(frame) > self.frame_len: frame_block = frame[:self.frame_len] self.cache_frames.append(frame_block) frame = frame[self.frame_len:] if wait: self.cache = frame else: self.cache = "" self.cache_frames.append(-1) def run(self,hasNum): print "開始執行音頻端點檢測" step = self.frame_len - self.frame_inc num = 0 while 1: # 開始端點 # 得到音頻文件數字信號 if self.wait_flag: sleep(1) continue if len(self.cache_frames) < 2: sleep(0.05) continue if self.cache_frames[1] == -1: print '----------------沒有聲音--------------' break # 從緩存中讀取音頻數據 record_stream = "".join(self.cache_frames[:2]) wave_data = np.fromstring(record_stream, dtype=np.int16) wave_data = wave_data * 1.0 / self.max_en data = wave_data[np.arange(0, self.frame_len)] speech_data = self.cache_frames.pop(0) # 得到音頻過零率 zcr = ZCR(data) # 得到音頻的短時能量, 平方放大 amp = STE(data) ** 2 # 返回當前音頻數據狀態 res = self.speech_status(amp, zcr) if res == 2: hasNum += 1 if hasNum > 10: print '+++++++++++++++++++++++++有聲音++++++++++++++++++++++++' break num = num + 1 # 一段一段進行檢測 self.frames_start.append(speech_data) self.frames_start_num += 1 if self.frames_start_num == self.offsets: # 開始音頻開始的緩存部分 self.frames_start.pop(0) self.frames_start_num -= 1 if self.end_flag: # 當音頻結束後進行後部緩存 self.frames_end_num += 1 # 下一段語音開始,或達到緩存閥值 if res == 2 or self.frames_end_num == self.offsete: speech_stream = b"".join(self.frames + self.frames_end) self.callback_res.append(self.callback(speech_stream, **self.callback_kwargs)) # 數據環境初始化 # self.clean() self.end_flag = False self.frames = [] self.frames_end_num = 0 self.frames_end = [] self.frames_end.append(speech_data) if res == 2: if self.cur_status in [0, 1]: # 添加開始偏移數據到數據緩存 self.frames.append(b"".join(self.frames_start)) # 添加當前的語音數據 self.frames.append(speech_data) if res == 3: print '檢測音頻結束' self.frames.append(speech_data) # 開啓音頻結束標誌 self.end_flag = True self.cur_status = res # return self.callback_res def speech_status(self, amp, zcr): status = 0 # 0= 靜音, 1= 可能開始, 2=肯定進入語音段 if self.cur_status in [0, 1]: # 肯定進入語音段 if amp > self.amp1: status = 2 self.silence = 0 self.count += 1 # 可能處於語音段 elif amp > self.amp2 or zcr > self.zcr2: status = 1 self.count += 1 # 靜音狀態 else: status = 0 self.count = 0 self.count = 0 # 2 = 語音段 elif self.cur_status == 2: # 保持在語音段 if amp > self.amp2 or zcr > self.zcr2: self.count += 1 status = 2 # 語音將結束 else: # 靜音還不夠長,還沒有結束 self.silence += 1 if self.silence < self.maxsilence: self.count += 1 status = 2 # 語音長度過短認爲是噪聲 elif self.count < self.minlen: status = 0 self.silence = 0 self.count = 0 # 語音結束 else: status = 3 self.silence = 0 self.count = 0 return status def read_file_data(filename): """ 輸入:須要讀取的文件名 返回:(聲道,量化位數,採樣率,數據) """ read_file = wave.open(filename, "r") params = read_file.getparams() nchannels, sampwidth, framerate, nframes = params[:4] data = read_file.readframes(nframes) return nchannels, sampwidth, framerate, data class FileParser(Vad): def __init__(self): self.block_size = 256 Vad.__init__(self) def read_file(self, filename): if not os.path.isfile(filename): print "文件%s不存在" % filename return FAIL datas = read_file_data(filename)[-1] self.add(datas, False) if __name__ == "__main__": stream_test = FileParser() filename = 'test1566606924822.wav' result = stream_test.read_file(filename) if result != FAIL: stream_test.run(0)
轉載自網絡,版權歸原做者全部緩存