PyTorch强化学习:不用Gym也能玩转实时游戏AI
2025-03-12 20:29:15
不用 Gym 环境,玩转 PyTorch Sequential 强化学习模型
遇到一个挺有意思的挑战:想用 AI 来玩个游戏,还要是实时的。这直接把 Gym 环境给排除了。
我的想法是,截取游戏画面,预处理一下,然后塞给模型,再根据模型在游戏里的死活来更新它。下面详细聊聊整个过程以及遇到的坑。
问题分析: 为什么不用 Gym ?
Gym 挺好,但它更适合回合制、有明确“结束”状态的任务。实时游戏,比如赛车、射击,状态变化太快,Gym 处理起来有点麻烦:
- 实时性: Gym 的
step()
函数通常是等待动作执行完毕才返回,不够“实时”。 - 状态连续: 实时游戏的状态几乎是无限的,Gym 的离散状态空间不太适用。
- 自定义环境: 不需要那些框架
解决方案:截图 + PyTorch
既然 Gym 不顺手,那就自己动手,丰衣足食。主要思路如下:
- 截图: 用
PIL
库的ImageGrab
抓取屏幕。 - 预处理: 把截图变成模型能“看懂”的格式。
- 模型: PyTorch 走起,搭一个神经网络。
- 训练: 根据游戏反馈(比如是否“死亡”)更新模型。
下面一步步来:
1. 截图:获取游戏画面
from PIL import ImageGrab, Image
import numpy as np
_bbox=(800, 200, 1200, 900) # (L,T,R,B) 屏幕区域
def get_screenshot():
screenshot = ImageGrab.grab(bbox=_bbox)
# 检查是否“死亡”(根据游戏画面判断)
# 例如,红色过多可能表示“死亡”
pixel_data = np.array(screenshot)
red_intensity = np.sum(pixel_data[:,:,0]) / (screenshot.size[0] * screenshot.size[1])
green_intensity = np.sum(pixel_data[:,:,1]) / (screenshot.size[0] * screenshot.size[1])
blue_intensity = np.sum(pixel_data[:,:,2])/(screenshot.size[0]*screenshot.size[1])
is_dead = red_intensity >= 190 and green_intensity < 50 and blue_intensity < 15
return screenshot, is_dead
- 代码解释:
ImageGrab.grab()
: 截取指定区域的屏幕。_bbox
定义了左上角和右下角的坐标。- “死亡”判断:这部分需要根据你的游戏来调整。这里只是一个例子,假设红色过多表示游戏结束。
- 注意事项:
_bbox
要根据你的游戏窗口调整。- “死亡”判断也要根据具体游戏来写。
2. 预处理:让图片“规矩”起来
模型可不喜欢乱七八糟的图片,所以要“收拾”一下:
from torchvision import transforms
preprocess = transforms.Compose([
transforms.Resize(256), # 缩放到 256x256
transforms.CenterCrop(256), # 中心裁剪到 256x256
transforms.ToTensor(), # 转成 Tensor
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), # 标准化
])
- 原理:
Resize
、CenterCrop
:统一图片大小,方便后续处理。ToTensor
:把PIL Image
或numpy.ndarray
转成 PyTorch 的Tensor
。Normalize
:标准化,加速模型训练,有时候还能提高精度。
- 数值解释
mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
。 这些值是 ImageNet 数据集的均值和标准差,用它们做标准化比较常见。如果你有自己数据集的均值和标准差,用自己的更好.
3. 模型:PyTorch 神经网络
import torch
from torch import nn
model = nn.Sequential(
nn.Conv2d(3, 256, 5), # 卷积层:3通道输入,256个卷积核,大小 5x5
nn.ReLU(), # ReLU 激活
nn.MaxPool2d(2, 2), # 最大池化:2x2 窗口
nn.Conv2d(256, 512, 5), # 卷积层
nn.MaxPool2d(2, 2), # 最大池化
nn.ReLU(), # ReLU 激活
nn.Flatten(), # 展平
nn.Linear(3721*512, 512), #这里3721是错的。 应该是根据图片大小算出来的, 可以看进阶使用技巧, 这里保留原始的错误, 可以算出来是多少。
nn.LeakyReLU(), # LeakyReLU 激活
nn.Linear(512, 512), # 全连接层
nn.LeakyReLU(), # LeakyReLU 激活
nn.Linear(512, 3), # 全连接层:输出3个动作的概率
nn.Softmax(1) # Softmax:把输出转成概率
)
- 原理:
- 卷积层(
Conv2d
):提取图像特征。 - ReLU、LeakyReLU:激活函数,增加非线性。
- 最大池化(
MaxPool2d
):缩小图像尺寸,减少计算量。 - Flatten:把多维数据“拍扁”成一维。
- 全连接层(
Linear
):把特征映射到输出(动作)。 - Softmax:把输出变成概率分布。
- 卷积层(
- 进阶使用技巧
-
上面模型的3721 是通过图片大小计算而来的, 具体怎么做?
在nn.Flatten()
之后的线性层(nn.Linear
)的输入大小, 你必须手动计算输入到全连接层的张量的扁平化大小. 根据您提供的模型和预处理步骤,我们可以逐步计算它:-
初始图像大小 :经过
transforms.Resize(256)
和transforms.CenterCrop(256)
之后,图像的大小为 256x256 像素,具有 3 个通道(RGB)。 -
第一个卷积层 (nn.Conv2d(3, 256, 5)) :
- 输入通道:3
- 输出通道:256
- 卷积核大小:5x5
- Stride(假设为1): (256 - 5) / 1 + 1 = 252
所以,输出大小为 252x252x256
- 第一个最大池化层 (nn.MaxPool2d(2, 2)) :
- 池化窗口:2x2
- Stride:2
输出大小: 252 / 2 = 126
所以, 输出大小为: 126x126x256
- 第二个卷积层 (nn.Conv2d(256, 512, 5)) :
- 输入通道:256
- 输出通道:512
- 卷积核大小: 5x5
- Stride (假设为 1): (126-5)/1+1 = 122.
输出大小:122x122x512
-
第二个最大池化层 (nn.MaxPool2d(2, 2)) :
- 池化窗口: 2x2
- Stride: 2
输出大小: 122 / 2 = 61。
-
输出大小 : 61x61x512
-
扁平化层(nn.Flatten()) : 扁平化后的大小是
61 * 61 * 512 = 1896832
. 所以是 1896832.
修改后的代码为:
-
-
import torch
from torch import nn
model = nn.Sequential(
nn.Conv2d(3, 256, 5), # 卷积层:3通道输入,256个卷积核,大小 5x5
nn.ReLU(), # ReLU 激活
nn.MaxPool2d(2, 2), # 最大池化:2x2 窗口
nn.Conv2d(256, 512, 5), # 卷积层
nn.MaxPool2d(2, 2), # 最大池化
nn.ReLU(), # ReLU 激活
nn.Flatten(), # 展平
nn.Linear(1896832, 512),
nn.LeakyReLU(), # LeakyReLU 激活
nn.Linear(512, 512), # 全连接层
nn.LeakyReLU(), # LeakyReLU 激活
nn.Linear(512, 3), # 全连接层:输出3个动作的概率
nn.Softmax(1) # Softmax:把输出转成概率
)
4. 训练:边玩边学
import torch.optim as optim
from torch.distributions import Categorical
optimizer = optim.Adam(model.parameters(), lr=1e-2) # 优化器
def train_model(model, optimizer, num_episodes=100):
for episode in range(num_episodes):
log_probs = []
rewards = []
state, is_dead = get_screenshot() # 获得截图
while not is_dead:
input_tensor = preprocess(state).unsqueeze(0) # 预处理 + 增加 batch 维度
output = model(input_tensor) # 模型预测
m = Categorical(output) #构建概率分布
action = m.sample() # 根据概率采样
log_prob = m.log_prob(action) # 概率取对数
# 执行动作 (这里需要根据你的游戏来写,比如用 pyautogui 控制鼠标键盘)
# ...
# 下面给出伪代码示例,
# if action.item() == 0: #假设为0时,代表无操作
# pass # do nothing
# elif action.item() == 1: # 左
# pyautogui.press('left') #或者其他的
# elif action.item() == 2: # 右
# pyautogui.press('right')
# 获取新的状态,和是否“死亡”
next_state, is_dead = get_screenshot()
reward = 1 if not is_dead else 0 # 活着就有奖励
log_probs.append(log_prob)
rewards.append(reward)
state = next_state
# 一局游戏结束,更新模型
discounted_rewards = rewards #更复杂的策略可以用discounted_reward.
policy_loss = []
for log_prob, Gt in zip(log_probs, discounted_rewards):
policy_loss.append(-log_prob * Gt)
optimizer.zero_grad() # 梯度清零
policy_loss = torch.cat(policy_loss).sum() # 损失求和
policy_loss.backward() # 反向传播
optimizer.step() # 更新参数
print("Episode:", episode, " policy_loss:", policy_loss.item())
- 代码解释:
optimizer
: 优化器,这里用 Adam,学习率 0.01。- 循环:
- 预处理截图,输入模型,得到动作概率。
- 根据概率 采样 动作(
Categorical
分布)。 - 保存log probability.
- 根据你的游戏执行这个动作,然后获得执行后的奖励(reward),存起来。
- 重复,直到游戏结束(“死亡”)。
- 根据获得的所有reward,以及对应的log probability 来进行反向传播,更新参数。
policy_loss
:策略梯度损失, 我们的目标是让损失越小越好optimizer.zero_grad()
:梯度清零,防止累积。policy_loss.backward()
:反向传播,计算梯度。optimizer.step()
:更新模型参数。
optimizer.step()
如何更新模型?
在 PyTorch 中,optimizer.step()
和模型之间的联系是通过在模型参数上注册优化器来实现的,PyTorch 在底层做了这些事:- 注册参数 :
当你创建优化器时 (optimizer = optim.Adam(model.parameters(), lr=1e-2)
), 你将模型的所有参数 (model.parameters()
) 传递给优化器。这意味着优化器知道它需要优化哪些变量。 - 计算梯度 :
调用.backward()
时, PyTorch 会自动计算损失相对于模型中每个参数的梯度,并将这些梯度存储在每个参数的.grad
属性中。 - 更新权重 : 调用
optimizer.step()
的时候,根据当前使用的优化算法,用.grad
对各个参数做更新。
- 注册参数 :
安全建议
在制作实时AI的时候, 很容易用力过猛, 导致系统崩溃, 需要考虑给程序降降温.
- 限制帧率: 不要让截图和模型预测过于频繁,可以用
time.sleep()
控制一下。 - 检测系统资源消耗 : 时刻检测系统CPU, 显卡的使用情况. 按需调整.
总结
这个项目展示了如何在没有 Gym 的情况下,用 PyTorch 构建一个实时游戏的强化学习模型。核心在于自己处理游戏画面、构建模型、以及实现训练循环。
其中有很多根据具体项目需要修改的地方,例如:如何定义reward,如果根据项目特点调整模型架构。需要耐心和实践精神!