返回

PyTorch强化学习:不用Gym也能玩转实时游戏AI

Ai

不用 Gym 环境,玩转 PyTorch Sequential 强化学习模型

遇到一个挺有意思的挑战:想用 AI 来玩个游戏,还要是实时的。这直接把 Gym 环境给排除了。

我的想法是,截取游戏画面,预处理一下,然后塞给模型,再根据模型在游戏里的死活来更新它。下面详细聊聊整个过程以及遇到的坑。

问题分析: 为什么不用 Gym ?

Gym 挺好,但它更适合回合制、有明确“结束”状态的任务。实时游戏,比如赛车、射击,状态变化太快,Gym 处理起来有点麻烦:

  1. 实时性: Gym 的 step() 函数通常是等待动作执行完毕才返回,不够“实时”。
  2. 状态连续: 实时游戏的状态几乎是无限的,Gym 的离散状态空间不太适用。
  3. 自定义环境: 不需要那些框架

解决方案:截图 + PyTorch

既然 Gym 不顺手,那就自己动手,丰衣足食。主要思路如下:

  1. 截图:PIL 库的 ImageGrab 抓取屏幕。
  2. 预处理: 把截图变成模型能“看懂”的格式。
  3. 模型: PyTorch 走起,搭一个神经网络。
  4. 训练: 根据游戏反馈(比如是否“死亡”)更新模型。

下面一步步来:

1. 截图:获取游戏画面

from PIL import ImageGrab, Image
import numpy as np

_bbox=(800, 200, 1200, 900) # (L,T,R,B) 屏幕区域

def get_screenshot():
    screenshot = ImageGrab.grab(bbox=_bbox)

    # 检查是否“死亡”(根据游戏画面判断)
    # 例如,红色过多可能表示“死亡”
    pixel_data = np.array(screenshot)
    red_intensity = np.sum(pixel_data[:,:,0]) / (screenshot.size[0] * screenshot.size[1])
    green_intensity = np.sum(pixel_data[:,:,1]) / (screenshot.size[0] * screenshot.size[1])
    blue_intensity = np.sum(pixel_data[:,:,2])/(screenshot.size[0]*screenshot.size[1])

    is_dead = red_intensity >= 190 and green_intensity < 50 and blue_intensity < 15

    return screenshot, is_dead
  • 代码解释:
    • ImageGrab.grab(): 截取指定区域的屏幕。_bbox 定义了左上角和右下角的坐标。
    • “死亡”判断:这部分需要根据你的游戏来调整。这里只是一个例子,假设红色过多表示游戏结束。
  • 注意事项:
    • _bbox 要根据你的游戏窗口调整。
    • “死亡”判断也要根据具体游戏来写。

2. 预处理:让图片“规矩”起来

模型可不喜欢乱七八糟的图片,所以要“收拾”一下:

from torchvision import transforms

preprocess = transforms.Compose([
    transforms.Resize(256),         # 缩放到 256x256
    transforms.CenterCrop(256),     # 中心裁剪到 256x256
    transforms.ToTensor(),         # 转成 Tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # 标准化
])
  • 原理:
    • ResizeCenterCrop:统一图片大小,方便后续处理。
    • ToTensor:把 PIL Imagenumpy.ndarray 转成 PyTorch 的 Tensor
    • Normalize:标准化,加速模型训练,有时候还能提高精度。
  • 数值解释
    mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]。 这些值是 ImageNet 数据集的均值和标准差,用它们做标准化比较常见。如果你有自己数据集的均值和标准差,用自己的更好.

3. 模型:PyTorch 神经网络

import torch
from torch import nn

model = nn.Sequential(
          nn.Conv2d(3, 256, 5),    # 卷积层:3通道输入,256个卷积核,大小 5x5
          nn.ReLU(),              # ReLU 激活
          nn.MaxPool2d(2, 2),    # 最大池化:2x2 窗口
          nn.Conv2d(256, 512, 5), # 卷积层
          nn.MaxPool2d(2, 2),    # 最大池化
          nn.ReLU(),              # ReLU 激活
          nn.Flatten(),           # 展平
          nn.Linear(3721*512, 512), #这里3721是错的。  应该是根据图片大小算出来的, 可以看进阶使用技巧, 这里保留原始的错误,  可以算出来是多少。
          nn.LeakyReLU(),         # LeakyReLU 激活
          nn.Linear(512, 512),    # 全连接层
          nn.LeakyReLU(),         # LeakyReLU 激活
          nn.Linear(512, 3),      # 全连接层:输出3个动作的概率
          nn.Softmax(1)          # Softmax:把输出转成概率
        )
  • 原理:
    • 卷积层(Conv2d):提取图像特征。
    • ReLU、LeakyReLU:激活函数,增加非线性。
    • 最大池化(MaxPool2d):缩小图像尺寸,减少计算量。
    • Flatten:把多维数据“拍扁”成一维。
    • 全连接层(Linear):把特征映射到输出(动作)。
    • Softmax:把输出变成概率分布。
  • 进阶使用技巧
    • 上面模型的3721 是通过图片大小计算而来的, 具体怎么做?
      nn.Flatten()之后的线性层(nn.Linear)的输入大小, 你必须手动计算输入到全连接层的张量的扁平化大小. 根据您提供的模型和预处理步骤,我们可以逐步计算它:

      1. 初始图像大小 :经过transforms.Resize(256)transforms.CenterCrop(256) 之后,图像的大小为 256x256 像素,具有 3 个通道(RGB)。

      2. 第一个卷积层 (nn.Conv2d(3, 256, 5)) :

      • 输入通道:3
      • 输出通道:256
      • 卷积核大小:5x5
      • Stride(假设为1): (256 - 5) / 1 + 1 = 252
        所以,输出大小为 252x252x256
      1. 第一个最大池化层 (nn.MaxPool2d(2, 2)) :
      • 池化窗口:2x2
      • Stride:2
        输出大小: 252 / 2 = 126
        所以, 输出大小为: 126x126x256
      1. 第二个卷积层 (nn.Conv2d(256, 512, 5)) :
      • 输入通道:256
      • 输出通道:512
      • 卷积核大小: 5x5
      • Stride (假设为 1): (126-5)/1+1 = 122.
        输出大小:122x122x512
      1. 第二个最大池化层 (nn.MaxPool2d(2, 2)) :

        • 池化窗口: 2x2
        • Stride: 2
          输出大小: 122 / 2 = 61。
      2. 输出大小 : 61x61x512

      3. 扁平化层(nn.Flatten()) : 扁平化后的大小是 61 * 61 * 512 = 1896832. 所以是 1896832.
        修改后的代码为:

import torch
from torch import nn

model = nn.Sequential(
          nn.Conv2d(3, 256, 5),    # 卷积层:3通道输入,256个卷积核,大小 5x5
          nn.ReLU(),              # ReLU 激活
          nn.MaxPool2d(2, 2),    # 最大池化:2x2 窗口
          nn.Conv2d(256, 512, 5), # 卷积层
          nn.MaxPool2d(2, 2),    # 最大池化
          nn.ReLU(),              # ReLU 激活
          nn.Flatten(),           # 展平
          nn.Linear(1896832, 512), 
          nn.LeakyReLU(),         # LeakyReLU 激活
          nn.Linear(512, 512),    # 全连接层
          nn.LeakyReLU(),         # LeakyReLU 激活
          nn.Linear(512, 3),      # 全连接层:输出3个动作的概率
          nn.Softmax(1)          # Softmax:把输出转成概率
        )

4. 训练:边玩边学

import torch.optim as optim
from torch.distributions import Categorical

optimizer = optim.Adam(model.parameters(), lr=1e-2) # 优化器

def train_model(model, optimizer, num_episodes=100):
    for episode in range(num_episodes):
        log_probs = []
        rewards = []
        state, is_dead = get_screenshot() # 获得截图

        while not is_dead:
            input_tensor = preprocess(state).unsqueeze(0)  # 预处理 + 增加 batch 维度
            output = model(input_tensor)       # 模型预测
            m = Categorical(output)            #构建概率分布
            action = m.sample()                 # 根据概率采样
            log_prob = m.log_prob(action)      # 概率取对数

            # 执行动作 (这里需要根据你的游戏来写,比如用 pyautogui 控制鼠标键盘)
            # ...
            # 下面给出伪代码示例,
            # if action.item() == 0:  #假设为0时,代表无操作
            #         pass # do nothing
            #  elif action.item() == 1: # 左
            #         pyautogui.press('left') #或者其他的
            #  elif action.item() == 2:  # 右
            #     pyautogui.press('right')

            # 获取新的状态,和是否“死亡”
            next_state, is_dead = get_screenshot()
            
            reward = 1 if not is_dead else 0 # 活着就有奖励

            log_probs.append(log_prob)
            rewards.append(reward)

            state = next_state

        # 一局游戏结束,更新模型
        
        discounted_rewards = rewards #更复杂的策略可以用discounted_reward.

        policy_loss = []
        for log_prob, Gt in zip(log_probs, discounted_rewards):
            policy_loss.append(-log_prob * Gt)

        optimizer.zero_grad()              # 梯度清零
        policy_loss = torch.cat(policy_loss).sum()  # 损失求和
        policy_loss.backward()             # 反向传播
        optimizer.step()                # 更新参数
        print("Episode:", episode, " policy_loss:", policy_loss.item())
  • 代码解释:
    • optimizer: 优化器,这里用 Adam,学习率 0.01。
    • 循环:
      • 预处理截图,输入模型,得到动作概率。
      • 根据概率 采样 动作(Categorical 分布)。
      • 保存log probability.
      • 根据你的游戏执行这个动作,然后获得执行后的奖励(reward),存起来。
      • 重复,直到游戏结束(“死亡”)。
      • 根据获得的所有reward,以及对应的log probability 来进行反向传播,更新参数。
    • policy_loss:策略梯度损失, 我们的目标是让损失越小越好
    • optimizer.zero_grad():梯度清零,防止累积。
    • policy_loss.backward():反向传播,计算梯度。
    • optimizer.step():更新模型参数。
  • optimizer.step() 如何更新模型?
    在 PyTorch 中,optimizer.step() 和模型之间的联系是通过在模型参数上注册优化器来实现的,PyTorch 在底层做了这些事:
    1. 注册参数
      当你创建优化器时 (optimizer = optim.Adam(model.parameters(), lr=1e-2)), 你将模型的所有参数 (model.parameters()) 传递给优化器。这意味着优化器知道它需要优化哪些变量。
    2. 计算梯度
      调用 .backward()时, PyTorch 会自动计算损失相对于模型中每个参数的梯度,并将这些梯度存储在每个参数的 .grad 属性中。
    3. 更新权重 : 调用optimizer.step()的时候,根据当前使用的优化算法,用.grad对各个参数做更新。

安全建议

在制作实时AI的时候, 很容易用力过猛, 导致系统崩溃, 需要考虑给程序降降温.

  • 限制帧率: 不要让截图和模型预测过于频繁,可以用 time.sleep() 控制一下。
  • 检测系统资源消耗 : 时刻检测系统CPU, 显卡的使用情况. 按需调整.

总结

这个项目展示了如何在没有 Gym 的情况下,用 PyTorch 构建一个实时游戏的强化学习模型。核心在于自己处理游戏画面、构建模型、以及实现训练循环。

其中有很多根据具体项目需要修改的地方,例如:如何定义reward,如果根据项目特点调整模型架构。需要耐心和实践精神!