Python 檢測音頻中的靜音

#-*- coding: utf-8 -*-
import os
import wave
from time import sleep
import numpy as np

# Status codes for file-loading results (FileParser.read_file returns FAIL
# when the requested file does not exist).
SUCCESS = 0
FAIL = 1

# TODO: add a recording mutual-exclusion feature so that recording is
# temporarily suspended while certain other features are active.
def ZCR(curFrame):
    """Return the zero-crossing count of one frame.

    A pair of adjacent samples is counted when their product is <= 0 (the
    sign changes or one of them is zero) and the earlier sample exceeds the
    later one by more than 0.02. Only downward steps therefore contribute.

    ``curFrame`` must support element-wise arithmetic (a NumPy array).
    """
    earlier, later = curFrame[:-1], curFrame[1:]
    crosses_zero = earlier * later <= 0
    # The 0.02 margin suppresses crossings caused by tiny fluctuations.
    drops_enough = earlier - later > 0.02
    return np.sum(crosses_zero & drops_enough)


def STE(curFrame):
    """Return the short-time energy measure of one frame.

    The measure is the sum of the absolute sample values (magnitudes, not
    squares).
    """
    return np.abs(curFrame).sum()


class Vad(object):
    """Dual-threshold voice-activity detector (energy + zero-crossing count).

    Raw audio bytes are queued with ``add``. ``run`` then consumes the queue
    one block at a time, classifies every analysis window with
    ``speech_status`` and stops as soon as enough speech windows were seen,
    the end-of-stream marker is reached, or ``stop`` was called.
    """

    # Marker appended to ``cache_frames`` by ``add(..., wait=False)`` to
    # signal that no more data will arrive.
    _END_OF_STREAM = -1

    def __init__(self):
        # Initial short-time-energy high threshold.
        self.amp1 = 140
        # Initial short-time-energy low threshold.
        self.amp2 = 120
        # Initial zero-crossing high threshold.
        self.zcr1 = 10
        # Initial zero-crossing low threshold.
        self.zcr2 = 5
        # Maximum tolerated silence, counted in speech_status() calls.
        self.maxsilence = 100
        # Minimum speech length, counted in speech_status() calls.
        self.minlen = 40
        # Number of blocks kept as leading / trailing context.
        self.offsets = 40
        self.offsete = 40
        # Divisor used by run() to scale int16 samples.
        self.max_en = 20000
        # Initial state is silence.
        self.status = 0
        self.count = 0
        self.silence = 0
        self.frame_len = 256
        self.frame_inc = 128
        self.cur_status = 0
        # Bytes left over from the previous add(); b"" equals "" on Python 2
        # and matches the bytes returned by wave on Python 3.
        self.cache = b""
        self.on = True
        self.callback = None
        self.callback_res = []
        self.callback_kwargs = {}
        # Frame buffers and flags share their initial values with clean().
        self.clean()

    def clean(self):
        """Reset the frame buffers and the end/wait flags.

        Thresholds, the classifier counters and the ``cache`` remainder are
        left untouched.
        """
        self.frames = []
        # Leading-context buffer (blocks seen before speech started).
        self.frames_start = []
        self.frames_start_num = 0
        # Trailing-context buffer (blocks seen after speech ended).
        self.frames_end = []
        self.frames_end_num = 0
        # Queue of raw blocks waiting to be analysed by run().
        self.cache_frames = []
        self.cache_frames_num = 0
        self.end_flag = False
        self.wait_flag = False

    def go(self):
        """Resume processing in run() after wait()."""
        self.wait_flag = False

    def wait(self):
        """Pause processing in run() until go() is called."""
        self.wait_flag = True

    def stop(self):
        """Ask run() to leave its loop at the next iteration."""
        self.on = False

    def add(self, frame, wait=True):
        """Split raw audio bytes into ``frame_len``-byte blocks and queue them.

        Args:
            frame: raw audio bytes.
            wait: when true, more data is expected: the remainder kept from
                the previous call is prepended and the new remainder is kept
                for the next call. When false, this is the final chunk: the
                remainder is dropped and the end-of-stream marker is queued.

        A block is queued only while MORE than ``frame_len`` bytes remain, so
        the remainder holds between 1 and ``frame_len`` bytes for non-empty
        input.
        """
        if wait:
            print('wait')
            frame = self.cache + frame

        total = len(frame)
        # Same block count as "while len(frame) > frame_len", but without
        # re-copying the remaining buffer on every iteration.
        n_blocks = (total - 1) // self.frame_len if total else 0
        for index in range(n_blocks):
            start = index * self.frame_len
            self.cache_frames.append(frame[start:start + self.frame_len])
        remainder = frame[n_blocks * self.frame_len:]

        if wait:
            self.cache = remainder
        else:
            self.cache = b""
            self.cache_frames.append(self._END_OF_STREAM)

    def _at_end_of_stream(self):
        """Return True if the end marker is one of the next two queue items."""
        return any(
            isinstance(block, int) and block == self._END_OF_STREAM
            for block in self.cache_frames[:2]
        )

    def run(self, hasNum):
        """Consume queued blocks until speech is found or the input ends.

        Args:
            hasNum: starting value of the speech-window counter; detection
                succeeds once the counter exceeds 10.

        Returns:
            True when speech was detected; False when the end-of-stream
            marker was reached first or stop() was called.
        """
        print("開始執行音頻端點檢測")
        while self.on:
            if self.wait_flag:
                sleep(1)
                continue
            # Check the marker before the length test, otherwise an input
            # shorter than two blocks would make this loop spin forever.
            if self._at_end_of_stream():
                print('----------------沒有聲音--------------')
                return False
            if len(self.cache_frames) < 2:
                sleep(0.05)
                continue

            # Two consecutive byte blocks form one analysis window.
            record_stream = b"".join(self.cache_frames[:2])
            wave_data = np.frombuffer(record_stream, dtype=np.int16)
            data = (wave_data * 1.0 / self.max_en)[:self.frame_len]
            speech_data = self.cache_frames.pop(0)

            zcr = ZCR(data)
            # Energy measure, squared to widen the gap between classes.
            amp = STE(data) ** 2
            res = self.speech_status(amp, zcr)

            if res == 2:
                hasNum += 1
            if hasNum > 10:
                print('+++++++++++++++++++++++++有聲音++++++++++++++++++++++++')
                return True

            # Keep a sliding window of recent blocks as leading context.
            self.frames_start.append(speech_data)
            self.frames_start_num += 1
            if self.frames_start_num == self.offsets:
                self.frames_start.pop(0)
                self.frames_start_num -= 1

            if self.end_flag:
                # Speech has ended: collect trailing context.
                self.frames_end_num += 1
                # Flush when the next utterance starts or enough trailing
                # blocks were collected.
                if res == 2 or self.frames_end_num == self.offsete:
                    speech_stream = b"".join(self.frames + self.frames_end)
                    # callback defaults to None; only invoke it when set.
                    if self.callback is not None:
                        self.callback_res.append(
                            self.callback(speech_stream,
                                          **self.callback_kwargs))
                    self.end_flag = False
                    self.frames = []
                    self.frames_end_num = 0
                    self.frames_end = []
                self.frames_end.append(speech_data)

            if res == 2:
                if self.cur_status in [0, 1]:
                    # Speech just started: prepend the leading context.
                    # NOTE(review): frames_start already contains the current
                    # block, so it is stored twice here - confirm intent.
                    self.frames.append(b"".join(self.frames_start))
                self.frames.append(speech_data)
            if res == 3:
                print('檢測音頻結束')
                self.frames.append(speech_data)
                # Start collecting trailing context on the next iterations.
                self.end_flag = True

            self.cur_status = res
        return False

    def speech_status(self, amp, zcr):
        """Classify one analysis window given the previous state.

        Args:
            amp: energy measure of the window.
            zcr: zero-crossing count of the window.

        Returns:
            0 = silence, 1 = possible speech start, 2 = speech,
            3 = speech just ended. When ``cur_status`` is 3 no branch
            applies and 0 is returned.
        """
        status = 0
        if self.cur_status in [0, 1]:
            if amp > self.amp1:
                # Definitely inside speech.
                status = 2
                self.silence = 0
                self.count += 1
            elif amp > self.amp2 or zcr > self.zcr2:
                # Possibly inside speech.
                status = 1
                self.count += 1
            else:
                # Silence.
                status = 0
                self.count = 0
        elif self.cur_status == 2:
            if amp > self.amp2 or zcr > self.zcr2:
                # Still inside speech.
                self.count += 1
                status = 2
            else:
                self.silence += 1
                if self.silence < self.maxsilence:
                    # Pause is still short: speech has not ended yet.
                    self.count += 1
                    status = 2
                elif self.count < self.minlen:
                    # Too short to be speech: treat it as noise.
                    status = 0
                    self.silence = 0
                    self.count = 0
                else:
                    # Speech ended.
                    status = 3
                    self.silence = 0
                    self.count = 0
        return status


def read_file_data(filename):
    """Read a whole WAV file.

    Args:
        filename: path of the WAV file to read.

    Returns:
        Tuple ``(nchannels, sampwidth, framerate, data)`` where ``data`` is
        the raw frame bytes of the entire file.
    """
    read_file = wave.open(filename, "rb")
    # try/finally instead of "with": Wave_read is not a context manager on
    # Python 2, and the handle must be closed even if reading fails.
    try:
        nchannels, sampwidth, framerate, nframes = read_file.getparams()[:4]
        data = read_file.readframes(nframes)
    finally:
        read_file.close()
    return nchannels, sampwidth, framerate, data

class FileParser(Vad):
    """Vad variant that queues the contents of a WAV file in one go."""

    def __init__(self):
        self.block_size = 256
        Vad.__init__(self)

    def read_file(self, filename):
        """Load ``filename`` and queue all of its audio data as final input.

        Returns:
            FAIL when the file does not exist, SUCCESS otherwise.
        """
        if not os.path.isfile(filename):
            print("文件%s不存在" % filename)
            return FAIL
        # Only the raw frame bytes (last tuple element) are needed.
        datas = read_file_data(filename)[-1]
        # wait=False: this is the complete input, so the end-of-stream
        # marker is queued right after the data.
        self.add(datas, False)
        return SUCCESS

if __name__ == "__main__":
    # Demo: run the detector over a hard-coded WAV file in the working
    # directory; skip detection when the file could not be loaded.
    parser = FileParser()
    wav_path = 'test1566606924822.wav'
    if parser.read_file(wav_path) != FAIL:
        parser.run(0)

轉載自網絡,版權歸原作者所有。

相關文章
相關標籤/搜索