这个难度有些大。PPO 里有两个 policy:一个负责更新策略,另一个负责提供数据,实际上这两个 policy 是同一个网络在不同时刻的参数。先用 policy1 跑出一组数据给新的 policy2 训练,然后 policy2 跑数据给新的 policy3 训练……直到 policy(N-1) 跑数据给新的 policyN 训练。过程感觉和 DQN 比较像,但是模型是 Actor-Critic 架构。PPO 通过重要性采样(新旧策略的概率比)把 on-policy 方法改造成可以对同一批数据重复更新多次的近似 off-policy 形式,并使用剪切(clip)策略来限制策略的更新幅度。这样做的好处是数据利用率高、策略更新快。PPO 的优化目标是最大化策略的期望回报,同时避免策略更新过大。
"""PPO (clipped surrogate objective) on CartPole-v1 with an optional pygame view.

The module defines a small actor network, a small critic network, a rollout
buffer and the PPO update step.  ``main()`` runs the training loop: one
episode is collected with the current policy, the networks are updated on
that episode, and the buffer is cleared.
"""
import sys
from collections import deque  # noqa: F401  (kept from the original file; unused here)

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim


class PolicyNetwork(nn.Module):
    """Actor: maps a 4-dimensional observation to probabilities over 2 actions."""

    def __init__(self):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(4, 2),
            nn.Tanh(),
            nn.Linear(2, 2),  # CartPole has 2 discrete actions
            nn.Softmax(dim=-1),
        )

    def forward(self, x):
        """Return action probabilities (softmax over the last dimension)."""
        return self.fc(x)


class ValueNetwork(nn.Module):
    """Critic: maps a 4-dimensional observation to a single state value."""

    def __init__(self):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(4, 2),
            nn.Tanh(),
            nn.Linear(2, 1),
        )

    def forward(self, x):
        """Return the value estimate; the last dimension has size 1."""
        return self.fc(x)


class RolloutBuffer:
    """Stores the transitions collected since the last policy update."""

    def __init__(self):
        self.states = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.log_probs = []

    def __len__(self):
        return len(self.states)

    def store(self, state, action, reward, done, log_prob):
        """Append one transition.

        ``log_prob`` may be a Python float or a one-element tensor; it is
        converted to a plain float when the batch is built.
        """
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)
        self.dones.append(done)
        self.log_probs.append(log_prob)

    def clear(self):
        """Drop every stored transition."""
        self.states = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.log_probs = []

    def get_batch(self):
        """Return ``(states, actions, rewards, dones, log_probs)`` as tensors.

        dtypes are float32, int64, float32, bool and float32 respectively.
        """
        # Stack into one ndarray first: building a tensor straight from a
        # list of ndarrays goes through a very slow element-wise path.
        states = torch.as_tensor(np.asarray(self.states, dtype=np.float32))
        actions = torch.tensor([int(a) for a in self.actions], dtype=torch.long)
        rewards = torch.tensor([float(r) for r in self.rewards], dtype=torch.float)
        dones = torch.tensor([bool(d) for d in self.dones], dtype=torch.bool)
        # float() detaches log-probs that were stored as graph-carrying
        # tensors, so the "old" log-probs are constants during the update.
        log_probs = torch.tensor(
            [float(lp) for lp in self.log_probs], dtype=torch.float
        )
        return states, actions, rewards, dones, log_probs


def _discounted_returns_and_gae(rewards, dones, values, gamma, gae_lambda):
    """Compute Monte-Carlo returns and GAE(lambda) advantages in one reverse pass.

    All sequence arguments are plain Python lists of equal length.
    ``dones[t]`` being true means transition ``t`` ended its episode, so
    nothing is bootstrapped or accumulated across that boundary.  If the
    last stored transition is not terminal, the value after it is taken to
    be 0.
    """
    count = len(rewards)
    returns = [0.0] * count
    advantages = [0.0] * count
    running_return = 0.0
    running_adv = 0.0
    next_value = 0.0
    for t in range(count - 1, -1, -1):
        keep = 0.0 if dones[t] else 1.0
        running_return = rewards[t] + gamma * running_return * keep
        # TD error uses the value of the *next* state, not the current one.
        delta = rewards[t] + gamma * next_value * keep - values[t]
        running_adv = delta + gamma * gae_lambda * running_adv * keep
        returns[t] = running_return
        advantages[t] = running_adv
        next_value = values[t]
    return returns, advantages


def ppo_update(policy_net, value_net, optimizer_policy, optimizer_value, buffer,
               epochs=10, gamma=0.99, clip_param=0.2, gae_lambda=0.95):
    """Run ``epochs`` PPO-clip updates on the transitions held in ``buffer``.

    Args:
        policy_net: actor returning action probabilities for a batch of states.
        value_net: critic returning one value per state (last dim of size 1).
        optimizer_policy: optimizer over ``policy_net`` parameters.
        optimizer_value: optimizer over ``value_net`` parameters.
        buffer: ``RolloutBuffer`` holding the rollout; it is not cleared here.
        epochs: number of gradient steps taken on the same batch.
        gamma: discount factor.
        clip_param: half-width of the probability-ratio clipping interval.
        gae_lambda: lambda of the generalized advantage estimator.

    Returns:
        None.  An empty buffer is a no-op.
    """
    if not buffer.states:
        return

    states, actions, rewards, dones, old_log_probs = buffer.get_batch()

    # The critic output is only a constant baseline for the advantages, so
    # no autograd graph is needed for this forward pass.
    with torch.no_grad():
        values = value_net(states).squeeze(-1)

    returns, advantages = _discounted_returns_and_gae(
        rewards.tolist(), dones.tolist(), values.tolist(), gamma, gae_lambda
    )
    returns = torch.tensor(returns, dtype=torch.float)
    advantages = torch.tensor(advantages, dtype=torch.float)
    # The unbiased std of a single element is NaN, so only normalize when
    # there are at least two samples.
    if advantages.numel() > 1:
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

    for _ in range(epochs):
        dist = torch.distributions.Categorical(policy_net(states))
        new_log_probs = dist.log_prob(actions)
        ratio = (new_log_probs - old_log_probs).exp()
        unclipped = ratio * advantages
        clipped = torch.clamp(ratio, 1.0 - clip_param, 1.0 + clip_param) * advantages
        actor_loss = -torch.min(unclipped, clipped).mean()

        optimizer_policy.zero_grad()
        actor_loss.backward()
        optimizer_policy.step()

        # squeeze(-1) keeps prediction and target both of shape [N]; without
        # it [N] - [N, 1] broadcasts to an [N, N] matrix of wrong residuals.
        value_loss = (returns - value_net(states).squeeze(-1)).pow(2).mean()

        optimizer_value.zero_grad()
        value_loss.backward()
        optimizer_value.step()


def main(num_episodes=10000, draw_after_steps=10000):
    """Train PPO on CartPole-v1, updating the networks after every episode.

    Rendering is switched on (and stays on) once a single episode lasts more
    than ``draw_after_steps`` steps.
    """
    # Imported here so the networks, buffer and update step above can be
    # imported without gym/pygame installed and without opening a window.
    import gym
    import pygame

    env = gym.make('CartPole-v1')
    policy_net = PolicyNetwork()
    value_net = ValueNetwork()
    optimizer_policy = optim.Adam(policy_net.parameters(), lr=3e-4)
    optimizer_value = optim.Adam(value_net.parameters(), lr=1e-3)
    buffer = RolloutBuffer()

    pygame.init()
    screen = pygame.display.set_mode((600, 400))
    clock = pygame.time.Clock()
    draw_on = False

    try:
        for episode in range(num_episodes):
            state, _ = env.reset()
            done = False
            step = 0
            while not done:
                step += 1
                state_tensor = torch.as_tensor(state, dtype=torch.float).unsqueeze(0)
                with torch.no_grad():
                    action_probs = policy_net(state_tensor)
                dist = torch.distributions.Categorical(action_probs)
                action = dist.sample()
                log_prob = dist.log_prob(action)

                # NOTE(review): only the first flag returned after the reward
                # ends the episode; the second one (time-limit truncation in
                # recent gym versions -- confirm) is ignored exactly as in the
                # original, which is what lets an episode exceed
                # ``draw_after_steps``.  A policy that never fails would
                # therefore never finish its episode.
                next_state, reward, done, _, _ = env.step(action.item())
                buffer.store(state, action.item(), reward, done, log_prob.item())
                state = next_state

                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        sys.exit()  # cleanup happens in the finally clause

                if draw_on:
                    screen.fill((0, 0, 0))
                    cart_x = int(state[0] * 100 + 300)  # cart position -> screen x
                    pygame.draw.rect(screen, (0, 128, 255), (cart_x, 300, 50, 30))
                    pole_tip = (
                        cart_x + 25 - int(50 * np.sin(state[2])),
                        300 - int(50 * np.cos(state[2])),
                    )
                    pygame.draw.line(screen, (255, 0, 0), (cart_x + 25, 300), pole_tip, 5)
                    pygame.display.flip()
                    clock.tick(600)

                if step > draw_after_steps:
                    draw_on = True

            ppo_update(policy_net, value_net, optimizer_policy, optimizer_value, buffer)
            buffer.clear()
            print(f'Episode {episode} completed {step}.')
    finally:
        env.close()
        pygame.quit()


if __name__ == "__main__":
    main()
运行效果