esp32是买的某宝模块
#设备采集音频代码
# 连接端口:SD->G21 WS->G22 SCK->G23 L/R-> 低电平
from machine import I2S,SPI
from machine import Pin
import os,utime
import network
import socket
def createWavHeader(sampleRate, bitsPerSample, num_channels, datasize):
    """Build the 44-byte RIFF/WAVE header for an uncompressed PCM stream.

    Args:
        sampleRate: Samples per second for each channel.
        bitsPerSample: Width of one sample in bits.
        num_channels: Number of interleaved channels.
        datasize: Size in bytes of the audio payload that follows the header.

    Returns:
        The header as a bytes object, with every numeric field encoded
        little-endian.
    """
    def le(value, width):
        # Encode an integer as a fixed-width little-endian field.
        return value.to_bytes(width, 'little')

    byte_rate = sampleRate * num_channels * bitsPerSample // 8
    block_align = num_channels * bitsPerSample // 8

    fields = (
        b"RIFF",                # marks the file as RIFF
        le(datasize + 36, 4),   # file size excluding this field and the marker
        b"WAVE",                # file type
        b"fmt ",                # format chunk marker
        le(16, 4),              # length of the format data that follows
        le(1, 2),               # format type (1 = PCM)
        le(num_channels, 2),
        le(sampleRate, 4),
        le(byte_rate, 4),       # bytes per second
        le(block_align, 2),     # bytes per frame
        le(bitsPerSample, 2),
        b"data",                # data chunk marker
        le(datasize, 4),        # payload size in bytes
    )
    return b"".join(fields)
# Sample rate of the audio file.
sampleRate=8000
# Number of bits per audio sample.
bitsPerSample=16
# Buffer size in bytes.
bufSize=32768
# Total number of bytes in one recording.
datasize=bufSize*4
# Pins required by I2S.
# Data clock (SCK on the INMP441 module).
sck_pin = Pin(23)
# Frame clock (WS on the INMP441 module).
ws_pin = Pin(22)
# Data line (SD on the INMP441 module).
sd_pin = Pin(21)
# IP address and port of the server.
server_ip = '192.168.3.11' # server IP
server_port = 12345 # server port
# Connect to WiFi.
def connect_to_wifi(ssid='HUAWEI-51EL07', password='123456', timeout_s=None):
    """Bring up the station interface and wait until it is connected.

    Args:
        ssid: Name of the access point to join.
        password: Passphrase for the access point.
        timeout_s: Maximum number of seconds to wait for the link. None
            (the default) waits indefinitely.

    Raises:
        OSError: If timeout_s is given and the link is not up in time.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    # Skip connect() when the interface is already associated (e.g. after a
    # soft reset), so a connect is not issued on a live link.
    if not wlan.isconnected():
        wlan.connect(ssid, password)
        poll_ms = 100
        waited_ms = 0
        while not wlan.isconnected():
            if timeout_s is not None and waited_ms >= timeout_s * 1000:
                raise OSError("WiFi connection timed out")
            # Sleep between polls instead of spinning at full CPU speed.
            utime.sleep_ms(poll_ms)
            waited_ms += poll_ms
    print("Connected to WiFi")
# Create the socket connection.
def create_socket():
    """Open a TCP connection to (server_ip, server_port).

    Returns:
        The connected socket, with a 5-second timeout applied, or None when
        the connection attempt fails.
    """
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_socket.settimeout(5)  # timeout in seconds
    try:
        client_socket.connect((server_ip, server_port))
    except OSError:
        # Close the failed socket: the caller retries in a loop, and every
        # socket left open here would otherwise be leaked.
        client_socket.close()
        return None
    return client_socket
# Connect to WiFi.
connect_to_wifi()
# Create the I2S object used for audio recording: receive mode, stereo
# format, with the sample width, sample rate and internal buffer size taken
# from the settings above.
audioInI2S = I2S(0,
sck=sck_pin, ws=ws_pin, sd=sd_pin,
mode=I2S.RX,
bits=bitsPerSample,
format=I2S.STEREO,
rate=sampleRate,
ibuf=bufSize)
# Buffer that audio data is read into.
readBuf = bytearray(bufSize)
# Enable (chip-select) pin for SPI 1.
# NOTE(review): cs and spi are not referenced again in the visible code;
# confirm whether they are still needed before removing them.
cs=Pin(14,Pin.OUT)
# Set up SPI.
spi=SPI(1,baudrate=8000000,sck=Pin(25),mosi=Pin(26),miso=Pin(27))
# Create the header of the audio file (two channels, datasize payload bytes).
# NOTE(review): headData is not sent or written anywhere in the visible code.
headData=createWavHeader(sampleRate, bitsPerSample, 2, datasize)
print("ready.......")
# Sleep for a short while.
utime.sleep(2.0)
# Compute the mean of the audio data.
def calculate_audio_mean(audio_data):
    """Return the arithmetic mean of the items in audio_data.

    NOTE(review): for a bytes-like buffer each item is a single byte
    (0-255); multi-byte samples are not decoded before averaging. Confirm
    that this is the intended loudness measure.

    Args:
        audio_data: A sized iterable of numbers, such as a bytearray.

    Returns:
        The mean as a float, or 0.0 for empty input (instead of raising
        ZeroDivisionError).
    """
    count = len(audio_data)
    if count == 0:
        return 0.0
    return sum(audio_data) / count
# Set the threshold.
# NOTE(review): sound_threshold is not read anywhere in the visible code.
sound_threshold = 1000
# Sleep for a short while.
utime.sleep(2.0)
# Record audio in a loop and stream it to the server.
client_socket = None
# Reusable view over the read buffer so the valid part of each read can be
# sliced out without copying.
readView = memoryview(readBuf)
try:
    while True:
        # (Re)establish the connection before capturing anything.
        if not client_socket:
            client_socket = create_socket()
            if not client_socket:
                print("Reconnecting...")
                utime.sleep(5)
                continue
        try:
            # Read audio data; readinto reports how many bytes it stored.
            currByteCount = audioInI2S.readinto(readBuf)
            if not currByteCount:
                continue
            # Only the first currByteCount bytes belong to this read.
            chunk = readView[:currByteCount]
            audio_mean = calculate_audio_mean(chunk)
            print("音频均值:", audio_mean)
            # Send the audio data. sendall keeps writing until the whole
            # chunk is out, whereas send() may transmit only part of it.
            client_socket.sendall(chunk)
        except Exception as e:
            print("发生异常:", e)
            print("连接断开,尝试重新连接")
            client_socket.close()
            client_socket = None
except Exception as e:
    print("发生异常:", e)
finally:
    # Release the socket and the I2S peripheral however the loop ends.
    if client_socket:
        client_socket.close()
    audioInI2S.deinit()
    print("录音结束")
```python
接收端代码, 把录音数据保存到磁盘
import socket
import wave
import time
import os
def create_wav_file(filename, audio_data, sample_rate, num_channels, bits_per_sample):
    """Write raw PCM bytes to filename as an uncompressed WAV file.

    Args:
        filename: Path of the WAV file to create or overwrite.
        audio_data: Bytes-like object holding the interleaved PCM frames.
        sample_rate: Frames per second.
        num_channels: Number of interleaved channels.
        bits_per_sample: Sample width in bits; stored as whole bytes.
    """
    sample_width = bits_per_sample // 8
    # Frame count 0 is a placeholder; the header is patched with the real
    # count once the frames have been written.
    wav_params = (num_channels, sample_width, sample_rate, 0, 'NONE', 'not compressed')
    with wave.open(filename, 'wb') as writer:
        writer.setparams(wav_params)
        writer.writeframes(audio_data)
def _save_segment(output_dir, index, frames, sample_rate, num_channels, bits_per_sample):
    """Write one numbered WAV segment into output_dir and return its path."""
    filename = os.path.join(output_dir, f"audio_{index}.wav")
    create_wav_file(filename, frames, sample_rate, num_channels, bits_per_sample)
    print(f"Audio data saved to {filename}")
    return filename


def start_server(host='0.0.0.0', port=12345, output_dir='E:/111'):
    """Accept one TCP client and store its audio stream as WAV segments.

    Incoming bytes are buffered and written to audio_<n>.wav in output_dir
    once at least file_duration seconds have passed since the previous
    segment. Each segment is cut on a whole-frame boundary; leftover bytes
    are carried into the next one. Whatever is still buffered when the
    connection ends is written as a final segment.

    Args:
        host: Address to bind the listening socket to.
        port: TCP port to listen on.
        output_dir: Directory that receives the WAV files; created when it
            does not exist.
    """
    # Audio parameters of the incoming stream.
    sample_rate = 8000
    num_channels = 2
    bits_per_sample = 16
    frame_size = num_channels * (bits_per_sample // 8)
    # Time between segment files, in seconds.
    file_duration = 30

    # Fail early, and do not lose the first segment, if the folder is missing.
    os.makedirs(output_dir, exist_ok=True)

    # The context manager closes the listening socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.bind((host, port))
        server_socket.listen(1)
        print(f"Listening on {host}:{port}...")
        conn, addr = server_socket.accept()
        print(f"Connected by {addr}")

        start_time = time.time()
        audio_data = bytearray()
        file_count = 0
        try:
            while True:
                data = conn.recv(4096)
                if not data:
                    break
                audio_data.extend(data)
                # Check whether the segment duration has been reached.
                if time.time() - start_time >= file_duration:
                    # Cut on a frame boundary so the next file does not
                    # start in the middle of a sample.
                    usable = len(audio_data) - len(audio_data) % frame_size
                    file_count += 1
                    _save_segment(output_dir, file_count, audio_data[:usable],
                                  sample_rate, num_channels, bits_per_sample)
                    # Reset for the next segment, keeping any partial frame.
                    start_time = time.time()
                    audio_data = audio_data[usable:]
        finally:
            try:
                # Keep the tail that arrived after the last segment was cut.
                usable = len(audio_data) - len(audio_data) % frame_size
                if usable:
                    file_count += 1
                    _save_segment(output_dir, file_count, audio_data[:usable],
                                  sample_rate, num_channels, bits_per_sample)
            finally:
                conn.close()


if __name__ == '__main__':
    start_server()
录制音频数据