项目通过 ESP32 联网获取心知天气的实时数据,将文本信息经 Web 服务器转发至 PC 端,并调用科大讯飞 TTS 接口生成语音文件,最终由 ESP32 下载音频并通过 I2S + DAC 播放出来。
整个实现过程横跨 嵌入式联网、HTTP 通信、JSON 解析、服务器文件操作以及音频播放链路,并不是单一模块的堆砌,而是一次从“数据获取”到“语音输出”的完整系统实践。本文将围绕整体设计思路、关键代码实现以及实际踩坑经验,对该项目进行一次较为完整的记录与复盘。
本次参加的活动,所实现的是一个基于基于esp32boxlite的,使用micropython编程的天气信息语音播报系统。同时搭建了一个小的文件操作服务器,同时使用了心知天气和讯飞的TTS合成这两个端口,从而实现一个简单的天气语音播报系统。
本次设计思路基本上是一个硬件结合一个软件部分,具体的一个详细过程如下:
1)首先,我们需要让ESP32连接到Wi-Fi网络。使用ESP32的Wi-Fi联网,可以在代码中设置Wi-Fi连接所需的SSID和密码,然后调用连接函数进行连接。连接成功后,ESP32就可以通过互联网访问其他网络资源。
2)连接成功后,我们可以使用ESP32访问http的功能,向心知天气的API发送HTTP请求,获取天气信息。心知天气的API会返回一个JSON格式的数据,其中包含天气信息如温度、天气状况等。
3)获取到天气信息后,我们需要将这些信息发送给Web服务器。可以使用ESP32的HTTP,将天气信息打包成HTTP POST请求,并发送到Web服务器的特定端点。
4)Web服务器接收到天气信息后,可以使用Python的文件操作,将天气信息写入input.txt文件。可以使用Python的open()函数打开文件,然后使用write()函数将天气信息写入文件中。
5)接下来,Web服务器需要使用讯飞API进行文本转语音(TTS)操作。可以通过向讯飞API发送HTTP POST请求,将文本内容发送给API,并收到返回的音频文件。
6)Web服务器将生成的音频文件存储在特定的位置,供ESP32访问。可以在服务器上设置一个特定的URL来提供音频文件的下载。
7)ESP32连接到Web服务器,通过HTTP请求下载音频文件。下载后的音频文件将保存在ESP32的本地存储中,准备进行音频播放。
8)为了实现音频播放,需要将ESP32与DAC芯片进行连接。根据硬件连接,设置相应的引脚作为I2S总线的时钟、字时钟和数据输出引脚。
9)ESP32将从本地存储中读取音频数据,并通过I2S协议将数据发送给DAC芯片。DAC芯片将数字信号转换为模拟信号,并将其输出到功放电路。
10)通过连接到喇叭或扬声器,功放电路将模拟音频信号转换为声音,从喇叭中播放出来。这样就实现了将天气信息以音频形式播放出来的功能。
总体的过程就是这么一个过程。
硬件主要就是esp32的板卡,然后基于他的esp32s3,结合外部的DAC模块和音频功放,也就是ES8156和NS4150。然后还有一个喇叭,我这个功能比较简单,是一个20s自动获取一次天气信息,自动播报一次的功能,所以我没有添加按键的功能,也没有驱动屏幕,也就是st7789的功能。然后这个板卡的硬件还是很丰富的,比如两个麦克风,可以采集音频数据,然后还有外扩的IO口,比如一些串口等等。同时比较有意思的是这个ADC的按键功能,三个按键根据一个adc的电压输入从而去实现不同的按键功能,这个操作我确实是第一次见,虽然很可惜,这一次我没有去完成按键播报的这么一个功能,但是各位都可以试一试这个。


1.流程图(见图)
2.主要代码片段说明(先说一下,代码都有注释,而且我是各个直接的功能单独实现的,可以参考各个功能的文件)
1)连接WIFI。
#WIFI配置(这里配置成自己的wifi连接)# ssid = 'zhao'# password ='66666666'#WIFI连接函数def ConnectNet(ssid ,password): mynetwork=network.WLAN(network.STA_IF)#先配置 mynetwork.active(False)#先关闭WiFi mynetwork.active(True)#再打开WiFi mynetwork.connect(ssid,password)#上诉没有的话,会导致报错,也就是wifi无法识别和连接 while True: if(mynetwork.isconnected()): break else : time.sleep(1) #print(mynetwork.ifconfig()) print("WIFI is connect")#串口打印文本监测
2)访问心知天气获取信息和解码json——这个打印出来的就是天气信息了。
result1=urequests.get('https://api.seniverse.com/v3/weather/now.json?key=这里用自己的心知天气密钥,参考他们的官方温度&location=nanjing(这里是城市,比如你想要南京的就是nanjing,北京就是beijing)&language=zh-Hans&unit=c')#获取api的数据,即为天气的json j1=ujson.loads(result1.text)#解析# print(j1['results'][0]['location']['name'],end=' ')#城市名字# print(j1['results'][0]['now']['text'],end=' ')#天气情况# print(j1['results'][0]['now']['temperature'],end='℃ ')#温度# print(j1['results'][0]['last_update'])#数据更新时间 city = j1['results'][0]['location']['name'] weather = j1['results'][0]['now']['text'] temp = j1['results'][0]['now']['temperature'] fresh_time = j1['results'][0]['last_update'] print(city) print(weather) print(temp,"度") print(fresh_time)
3)向服务器发送文本和从服务器下载音频数据。
def send_content_to_server(content): url = "http://IP地址:端口号/receive_content" # 请替换为你电脑的IP和端口号 content_utf8 = content.encode('utf-8')#编码 headers = {'Content-Type': 'text/plain'} response = urequests.post(url, data=content_utf8, headers=headers) print("Content sent to server. Response status:", response.status_code) response.close()def download_wav_file(url, save_path): response = urequests.get(url) with open(save_path, 'wb') as file: file.write(response.content)# 服务器端存放WAV文件的URLserver_wav_url = 'http://跟前面那个一样/get_wav_file'# ESP32端存放WAV文件的路径,请根据实际情况修改esp32_wav_path = '/output.wav'
4)I2S的初始化和DAC的配置。
sck_pin = Pin(17) # 串行时钟输出,I2S_SCLKws_pin = Pin(47) # 字时钟,I2S_LRCKsd_pin = Pin(15) # 串行数据输出,GPIO15=I2S_DAC_SDINpa_pin = Pin(46, Pin.OUT) # 创建用于控制功率放大器的Pin对象pa_pin.value(1) # 将引脚的值设置为1(高电平)以启用放大器print("ADC IS OK")#不用管,删掉,这是我调试数据的,所有的串口打印都是为了调试看的audio_out = I2S(0,sck=sck_Pin, ws=ws_pin, sd=sd_pin, mode=I2S.TX, bits=16, format=1, rate=16000, ibuf=20000)print("IIS IS OK")#不用管,删掉,这是我调试数据的,所有的串口打印都是为了调试看的
5)主函数的解析。
result1=urequests.get('https://api.seniverse.com/v3/weather/now.json?key=密钥location=城市&language=zh-Hans&unit=c')#获取api的数据,即为天气的json j1=ujson.loads(result1.text)#解析# print(j1['results'][0]['location']['name'],end=' ')#城市名字# print(j1['results'][0]['now']['text'],end=' ')#天气情况# print(j1['results'][0]['now']['temperature'],end='℃ ')#温度# print(j1['results'][0]['last_update'])#数据更新时间 city = j1['results'][0]['location']['name'] weather = j1['results'][0]['now']['text'] temp = j1['results'][0]['now']['temperature'] fresh_time = j1['results'][0]['last_update'] print(city) print(weather) print(temp,"度") print(fresh_time) # 获取要发送的内容,读取 content = city + " " + weather + " " + str(temp) + "度 " print(content) send_content_to_server(content)#向服务器的input写入这个文本 # 下载WAV文件并保存到ESP32 download_wav_file(server_wav_url, esp32_wav_path) #uart_test print("download is ok")#下载成功 wavtempfile = "output.wav" wav = open(wavtempfile,'rb') print('播放音频') # 播放开始时间 start_time = time.ticks_us() # 读取音频文件的二进制数据 buf = wav.read() # wav文件的头部数据,不是实际的音频数据,是文件信息,所以我们要丢弃这部分数据 bufhead = 44 # 实际音频数据大小,等于总大小减去头部信息的大小 bufsize = len(buf) - bufhead # 缓冲区大小,前面初始化的时候设置的最后一个参数 bufcap = 4096 # 下面的0.032得来的方法:16000(采样率) x 16(采样位宽,我用的是16位音频,单位bit) x1(通道数,单声道1,立体声n) ÷ 8(1字节=8bit) ÷ 1000000(秒换算成微秒) # 音频总时长 us(微秒) all_time = bufsize / 0.032 # 写入DAC的次数 bunum = 1 # 要写入的数据 开始位置 bufstart = bufhead # 循环读取 while bufsize: # 读取结束位置,等于读取次数*缓冲区大小 bufend = bufcap * bunum # 当结束位置大于总数据长度的时候,结束位置等于数据最后一位 if bufend > len(buf): bufend = len(buf) # 要写入的数据 bufwrite = buf[bufstart:bufend] # 写入数据 num_written = audio_out.write(bufwrite) # 总大小减去每次写入的大小 bufsize -= len(bufwrite) # 重新设置读取位置,为上次结束后一位 bufstart = bufend # 读取次数 bunum = bunum + 1 # 等待音频播放完 while 1: # 播放结束时间 end_time = time.ticks_us() # 如果当前时间减去开始播放的时间大于音频时长 if (end_time - start_time) > all_time: # 取消初始化 I2S 总线 audio_out.deinit() # 停止等待 break # 播放完毕 # 关闭文件 wav.close() print("OVER")#音频播放完毕,进入下一个天气播报循环 time.sleep(20)
6)服务器内容和科大讯飞的TTS功能,这一个是要在pycharm上面运行的。
# -*- coding:utf-8 -*-## author: iflytek## 本demo测试时运行的环境为:Windows + Python3.7# 本demo测试成功运行时所安装的第三方库及其版本如下:# cffi==1.12.3# gevent==1.4.0# greenlet==0.4.15# pycparser==2.19# six==1.12.0# websocket==0.2.1# websocket-client==0.56.0# 合成小语种需要传输小语种文本、使用小语种发音人vcn、tte=unicode以及修改文本编码方式# 错误码链接:https://www.xfyun.cn/document/error-code (code返回错误码时必看)# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #import websocketimport datetimeimport hashlibimport base64import hmacimport jsonfrom urllib.parse import urlencodeimport timeimport sslfrom wsgiref.handlers import format_date_timefrom datetime import datetimefrom time import mktimeimport _thread as threadimport os#pcm转waveimport waveimport structSTATUS_FIRST_FRAME = 0 # 第一帧的标识STATUS_CONTINUE_FRAME = 1 # 中间帧标识STATUS_LAST_FRAME = 2 # 最后一帧的标识class Ws_Param(object): # 初始化 def __init__(self, APPID, APIKey, APISecret, Text): self.APPID = APPID self.APIKey = APIKey self.APISecret = APISecret self.Text = Text # 公共参数(common) self.CommonArgs = {"app_id": self.APPID} # 业务参数(business),更多个性化参数可在官网查看 self.BusinessArgs = {"aue": "raw", "auf": "audio/L16;rate=16000", "vcn": "x3_xiaoyue","speed":30, "tte": "utf8"} #聆小璇-温柔 x4_lingxiaoxuan_en,聆小瑶-情感 x4_lingxiaoyao_em,四川话x3_yezi_sc,粤语x3_xiaoyue。 self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-8')), "UTF8")} #使用小语种须使用以下方式,此处的unicode指的是 utf16小端的编码方式,即"UTF-16LE"” #self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-16')), "UTF8")} # 生成url def create_url(self): url = 'wss://tts-api.xfyun.cn/v2/tts' # 生成RFC1123格式的时间戳 now = datetime.now() date = format_date_time(mktime(now.timetuple())) # 拼接字符串 signature_origin = "host: " + "ws-api.xfyun.cn" + "\n" signature_origin += "date: " + date + "\n" signature_origin += "GET " + "/v2/tts " + "HTTP/1.1" # 进行hmac-sha256进行加密 signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'), digestmod=hashlib.sha256).digest() signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8') authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % ( self.APIKey, "hmac-sha256", "host date request-line", signature_sha) authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8') # 将请求的鉴权参数组合为字典 v = { "authorization": authorization, "date": date, "host": "ws-api.xfyun.cn" } # 拼接鉴权参数,生成url url = url + '?' + urlencode(v) # print("date: ",date) # print("v: ",v) # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致 # print('websocket url :', url) return urldef on_message(ws, message): try: message =json.loads(message) code = message["code"] sid = message["sid"] audio = message["data"]["audio"] audio = base64.b64decode(audio) status = message["data"]["status"] print(message) if status == 2: print("ws is closed") ws.close() if code != 0: errMsg = message["message"] print("sid:%s call error:%s code is:%s" % (sid, errMsg, code)) else: with open('./demo.pcm', 'ab') as f: f.write(audio) except Exception as e: print("receive msg,but parse exception:", e) #读取文件def openreadtxt(file_name): data = [] with open(file_name, 'r') as file: for row in file: # 去掉换行符并合并为一个字符串 row_data = ' '.join(row.strip().split()) data.append(row_data) return data# 收到websocket错误的处理def on_error(ws, error): print("### error:", error)# 收到websocket关闭的处理def on_close(ws, close_status, close_msg): print("### closed ###") print("Close status:", close_status) print("Close message:", close_msg)# 收到websocket连接建立的处理def on_open(ws): def run(*args): d = {"common": wsParam.CommonArgs, "business": wsParam.BusinessArgs, "data": wsParam.Data, } d = json.dumps(d) print("------>开始发送文本数据") ws.send(d) if os.path.exists('./demo.pcm'): os.remove('./demo.pcm') thread.start_new_thread(run, ())#转换wavdef pcm_to_wav(pcm_file, wav_file, channels=1, sample_width=2, frame_rate=16000): with open(pcm_file, 'rb') as pcm_data: pcm_data = pcm_data.read() wav = wave.open(wav_file, 'wb') wav.setnchannels(channels) wav.setsampwidth(sample_width) wav.setframerate(frame_rate) wav.writeframes(pcm_data) wav.close()if __name__ == "__main__": # 测试时候在此处正确填写相关信息即可运行 data = openreadtxt('input.txt') text = ''.join(data) # 将列表中的元素连接成一个字符串 print(text) print(data) wsParam = Ws_Param(APPID=' ', APIKey=' ', APISecret=' ', Text=text) websocket.enableTrace(False) wsUrl = wsParam.create_url() ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close) ws.on_open = on_open ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}) pcm_file_path = './demo.pcm' # 你的 PCM 文件路径 wav_file_path = './output.wav' # 保存为 WAV 文件的路径 pcm_to_wav(pcm_file_path, wav_file_path)
1.wifi连接
本地串口的信息显示,可以看到显示wifi成功连接,然后也显示了天气信息,然后是服务器的数据发送返回,然后是下载音频,下载成功,播放音频,播放音频之后会完毕wav文件,然后打印OVER。
2.服务器的访问
这里可以看出,服务器从esp32处获取了一次文件读写,然后又进行了一次音频下载。

3.写入文本
在电脑端口的TTS,讯飞进行的操作,可以看到成功写入了文本,并且音频数据也改变了。
4.附带讯飞端口的TTS执行
5.具体的实物操作
因为是音频,而且我没用这个按键和显示屏幕,所以还是得看视频,见谅。
1)本次因为乐鑫的idf不是很好配置,使用c编程的时候,挺麻烦的,不过有广大群友和大佬们进行了帮助,所以这个问题不是很大。
2)个人原因:不知道为什么我配完IDF,然后配置vscode的时候,怎么样都不成功,反而是elicpse的那个编译器可以正常工作,但是命令行的操作也会报错,最后我也只是跑通了连接wifi,获取心知天气的天气信息,然后在乐鑫官方的TTS例程那里卡住了。
3)因为临时卡住了,加上我使用过arduino来编程,然后学过python,就突发奇想想试一试micropyhon,结果这个确实要方便一些,但是对于很多细节的操作还是不够,可以的话还是建议使用乐鑫的idf和vscode配置去编程,命令行操作也很不错。
4)本次学习收获最大的还是web服务器,因为我的思路,导致了要在服务器上面进行操作,首先声明我不确定乐鑫的TTS是怎么实现的,但是理论上来讲,确实可以把讯飞在pycharm的移植到esp32的micropython里面,但是我没有去做,然后我也不确定行不行得通。
5)web服务器的知识还是很多的,我说实在没办法在文章中三言两语说清楚,而且我之前学习python是做课设,只会使用一些基本的数据处理,像这一次的服务器的知识我是现学的,同时这一次的技术点也在这,学会搭建了服务器之后,就可以实现esp32和pc端直接的互相操作,进而完成一个完整的项目。
6)不建议大家学习micropyhon去编程esp32,虽然乐鑫环境确实不太友好,但是那个是从原理出发的,这个micropython很多东西是做不到像c那样的,还是建议学习乐鑫的官方环境
叠甲:上述全是个人主观感受,不代表我吹什么什么好,也不是我说什么什么不行。每个人感觉不一样,同时这一次活动有很多大佬,他们所做的项目都比我要高级和精细,我只是一个刚开始使用乐鑫idf的一个新手,很多东西都做的不够好,还有很多要学习的地方,希望各位体谅。