目录
前言
上一章我们了解了如果准备一个可用的 ESP-01S 模块,如果有心的话顺便也了解了一些 AT 指令。现在就让我们尝试用 Pi Pico 和 MicroPython 来向 ESP 模块发送 AT 指令从而完成一些联网任务。
接线
Pico | GPIO | ESP-01S |
---|---|---|
1 | GP0 | RX |
2 | GP1 | TX |
3 | GND | GND |
36 | 3V3 | 3V3 |
36 | 3V3 | EN |
- 其他引脚置空。
- EN 脚作为使能控制引脚,3.3V 表示可用,低电平不可用。对于 01S 来说可以置空,不过 01 和 01S 排版差不多,所以我们直接都连上即可。
使用 machine.UART 收发 ESP 信息
如果上一章做的没错,ESP 模块已经被我们设置成 115200 的频率了。而我们创建串口对象时需要设置一下。
首先导入 machine 库 import machine
然后创建串口对象:
import machine
uart = machine.UART(0, 115200)
创建对象时,第一个参数为 “通道”,我无法解释的很清楚,但它指向了默认的通信引脚
- 通道 0 - 默认 TXD 为 GP0,RXD 为 GP1
- 通道 1 - 默认 TXD 为 GP4,RXD 为 GP5
更多参数请看下面演示代码的运行输出~
接下来我们就可以使用上面创建的 uart
对象来对 ESP 模块发送 AT 指令 uart.write(cmd + "\r\n")
注意要用回车加换行符结尾;或是读取 ESP 返回的数据和接收的数据 uart.read([num])
返回值是二进制数据,正确接收后还需要解码。
下面给出一个 MicroPython 代码,运行在 Pico 中假装是一个串口助手:
import machine
import utime
uart = machine.UART(0, 115200)
utime.sleep(2)
print(f"uart = {uart}")
uart.read()
# 发送指令后查看 ESP 返回的数据
def check_AT_command_output(cmd):
# 发送指令
uart.write(f"{cmd}\r\n")
utime.sleep(1)
# uart.any() 可以用于查看接收缓冲区字节数
print(f"any = {uart.any()}")
out = str()
# 不断读取数据直到获取 OK,因为任意指令正确运行后通常都会返回一些数据,并在最后返回一个 OK
# 有些数据可能会发送比较缓慢,所以需要一直循环,直到 OK 表示一个指令的返回结束
while True:
# read 不写参数表示读取缓冲区中的所有数据
rec = uart.read()
try:
# 如果数据为空则继续读取,不为空则加入 out 变量
if rec:
rec = rec.decode("utf-8")
out += rec
# 有时会接收到一些乱码,先跳过
except UnicodeError:
# out = "编码错误" + str(out)
pass
# 检测到末端的 OK 表示一个返回已经结束了
if out[-4:] == "OK\r\n": break
utime.sleep(1)
# 查看命令对应的返回数据,并将得到的数据返回
print(f"AT cmd [{cmd}] out: [\n{out}]")
return out
# 此函数就是单纯的读取缓冲区所有数据
def read_shuffal():
rec = uart.read()
try:
if rec: return rec.decode("utf-8")
except UnicodeError:
return "decode error"
return "nothing"
# 本程序用法:
# 输入 AT 命令,查看返回值
# 输入 check,查看缓冲区中是否有什么东西
# ctrl+C 停止程序
try:
while True:
send = input("say: ")
if send == "check": print("[\n" + read_shuffal() + "\n]")
else: check_AT_command_output(send)
except KeyboardInterrupt:
print("按键中断")
Web 服务器
虽然一开始就看到了 这篇文章 但是模块完全没调试好,导致这篇文章作为第一阶段的目标被一直搁置到我现在甚至完全理解了所有代码。总之代码参考的是那篇文章里的。
用到的 AT 指令
- AT - 测试 ESP 是否正常
- AT+CWMODE=1 - 设置 ESP 为站点模式(station)
- AT+CIPSTA? - 检查 ESP 的网络连接情况
- AT+CWJAP=”{ssid}”,”{password}” - 如果没有连接 WiFi 那么用此命令尝试连接
- AT+CIPSTATUS - 查询站点当前连接情况
- AT+CIPMUX=1 - 设置 ESP 连接控制为 “多连接”
- AT+CIPSERVER=1,80 - 开启 TCP 服务器,端口号设为 80
- AT+CIPSEND={IPDId},{sendLen} - 声明将要对指定 id 的链接发送数据的长度,并准备开始发送数据
- AT+CIPCLOSE={IPDId} - 关闭指定 id 的 TCP 链接
ESP 控制类
首先我们创建一个 ESP 控制类,方便进行常用的操作:
class ESP01S(object):
# thoroughfare - 通道默认为 0
# baudrate - 频率默认为 115200
def __init__(self, tf=0, br=115200):
self.u = machine.UART(tf, br)
# 发送指令,并获取返回数据
# 由于发送 Web 内容的过程没有返回数据,所以添加一个 noReturn 参数让我们可以选择跳过对返回数据的获取
def send(self, line, noReturn=False):
self.u.write(f"{line}\r\n")
if noReturn: return ""
out = str()
st = utime.time()
while True:
rec = self.u.read(1)
try:
if rec:
rec = rec.decode("utf-8")
out += rec
except UnicodeError:
# 编码错误的情况比较常见,由于影响不大,所以不进行记录或者输出提示
# out = "编码错误" + str(out)
pass
if out[-4:] == "OK\r\n": break
if utime.time() - st > 20:
out += "\n检测超时"
break
print(f"AT cmd [{line}] out: [\n{out}]")
return out
# 客户端向服务器发送请求时,Pico 不向 ESP 发送指令,因此设置一个读取缓存的方法,让我们可以手动查看
def read(self):
rec = self.u.read()
try:
if rec: rec = rec.decode("utf-8")
# else: rec = "没收到东西"
else: return ""
except UnicodeError:
rec = "编码错误" + str(out)
print(f"Some thing in shuffle: [\n{rec}]")
return rec
模块初始化
在这个函数中:
- 使用上面的类生成 esp 对象,并进行测试
- 连接无线网络,并进行测试,如果获取到 ip 为
0.0.0.0
表示当前没有连接到任何 AP,则尝试连接设定的 WiFi。尝试 5 次后报错并结束程序 - 设置 Web 服务器,首先检测连接情况,如果有链接(连接状态 statuId 为 3 )则遍历连接并全部关闭;如果没有连接或者连接状态不是 “服务器” 模式,则进行 TCP 服务器的设置,即
AT+CIPMUX=1; AT+CIPSERVER=1,80
- 最后将设定好的 esp 对象返回到主进程中
def init():
print("------------ ESP8266 Webserver ------------\n")
print(" - 准备ESP01S模块...")
esp = ESP01S()
for i in range(3):
rec = esp.send("AT")
if rec.strip("\r\n")[-2:] == "OK": break
else:
print("准备出错,程序结束")
return
ssid = "要让 ESP 连接的 WiFi 名"
pawd = "对应的 WiFi 密码"
print(" - 准备连接网络 {ssid}")
esp.send("AT+CWMODE=1")
# esp.send("AT+CWQAP")
ipInfo = dict()
for i in range(5):
rec = esp.send("AT+CIPSTA?")
lines = [line.strip('\r') for line in rec.split('\n') if line and line[0] == '+']
for line in lines:
k, w = line.replace("+CIPSTA:", "").split(':')
ipInfo[k] = w.strip("\"")
# print(ipInfo)
if ipInfo['ip'] == ipInfo['gateway'] == ipInfo['netmask'] == "0.0.0.0":
rec = esp.send(f'AT+CWJAP="{ssid}","{pawd}"')
else: break
else:
print("网络连接失败,程序结束")
return
print(" - 设置 Web 服务器")
rec = esp.send("AT+CIPSTATUS")
lines = [line.strip('\r') for line in rec.split('\n') if line]
linkType = 0
statuId = 0
for line in lines:
if "STATUS:" in line:
try:
statuId = int(line.split(':')[1])
except ValueError:
print(f"line = {line}")
if statuId == 3 and "+CIPSTATUS:" in line:
linkId = int(line.split(':')[1].split(',')[0])
esp.send(f"AT+CIPCLOSE={linkId}")
linkType = int(line.split(':')[1].split(',')[-1])
if linkType == 0:
esp.send("AT+CIPMUX=1")
esp.send("AT+CIPSERVER=1,80")
return esp
主循环函数
在这个函数中,很明显最主要的就是一个无限循环,which is 每个单片机程序所必须的部分:
- 不断使用
read
方法查询缓存,+IPD
就是客户端访问服务器时 ESP 会接受到的请求数据的开头,读到之后开始准备返回数据 - 解析请求数据,
+IPD
后面就是连接 id,ESP 根据这个就能对应的返回数据,它的第一行大概长这样+IPD,0,479:GET /favicon.ico HTTP/1.1
- 设定发送列表,之后我们可以写一个 HTML 文件,再通过 file 类来读取,不过这个暂且不提
- 获取要发送的数据长度,注意每一行后面都有一个
\r\n
所以是字符数+2*行数
- 发送将要发送数据的 AT 指令
- 等返回
>
右尖括号,表示 ESP 准备接收发送缓存,发送够指定长的数据后会自动停止发送 - 发送结束后关闭连接,等待下一次客户端的请求
def loop(esp):
while True:
shuffle = esp.read()
if "+IPD" in shuffle:
print(" - 获取到网络访问请求")
IPDId = int(shuffle[shuffle.find("+IPD"):][5])
print(" - 准备发送网页")
httpsends = [
"HTTP/1.1 200 OK",
"Content-Type: text/html",
"Connection: close",
"",
"<!DOCTYPE HTML>",
"<html>",
f"It Works! {utime.time()%1000}",
"</html>",
]
sendLen = len(''.join(httpsends)) + 2*len(httpsends)
esp.send(f"AT+CIPSEND={IPDId},{sendLen}", noReturn=True)
while '>' not in esp.read(): utime.sleep(1)
for line in httpsends:
esp.send(line, noReturn=True)
print(" - 发送结束")
utime.sleep(2)
esp.send(f"AT+CIPCLOSE={IPDId}")
主进程
当我们做好了一切准备工作,再看主进程的时候就会感到神清气爽~
- 执行
main
函数,但是要放在一个异常处理中,因为学习测试阶段我们需要手动停止程序 - 进入
main
函数,首先初始化有可能失败,我们尝试创建 3 次,如果一直不行就检查一下- 接线是否正确
- 连接线是否能用,确实是否能通电且没有接触不良
- 模块是否没有被烧,记得不要接高电压,必须要接 3.3V
- 初始化完成后,放进
loop
函数开始无限循环即可
def main():
for i in range(3):
esp = init()
if esp: break
else:
print("------------ 无法连接模块,请检查模块或连接情况 ------------")
return
loop(esp)
pass
try:
main()
except KeyboardInterrupt:
print("------------ 程序按键中断 ------------")
结尾
把上面的几段代码按顺序放在一起,然后在最上面导入需要的函数库 import machine, utime
即可正常工作。记得不要忘记设定自己的 WiFi。
我很高兴自己终于能够向真正的物联网项目迈进了一步,通过网页来控制单片机是一个非常重要的要求,因为其操作简单,不需要复杂的 APP 且能轻松移植 APP 和小程序,希望大家也都能成功。