agents/bc_cvae.py:

# Copyright 2022 Twitter, Inc.
# SPDX-License-Identifier: Apache-2.0
import copy

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Distribution, Normal

from utils.logger import logger

LOG_SIG_MAX = 2
LOG_SIG_MIN = -20


# Vanilla Variational Auto-Encoder
class VAE(nn.Module):
    def __init__(self, state_dim, action_dim, latent_dim, max_action, device, hidden_dim=256):
        super(VAE, self).__init__()
        # Encoder: (state, action) -> parameters of a diagonal Gaussian over the latent
        self.e1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.e2 = nn.Linear(hidden_dim, hidden_dim)

        self.mean = nn.Linear(hidden_dim, latent_dim)
        self.log_std = nn.Linear(hidden_dim, latent_dim)

        # Decoder: (state, latent) -> reconstructed action
        self.d1 = nn.Linear(state_dim + latent_dim, hidden_dim)
        self.d2 = nn.Linear(hidden_dim, hidden_dim)
        self.d3 = nn.Linear(hidden_dim, action_dim)

        self.max_action = max_action
        self.latent_dim = latent_dim
        self.device = device

    def forward(self, state, action):
        z = F.relu(self.e1(torch.cat([state, action], 1)))
        z = F.relu(self.e2(z))

        mean = self.mean(z)
        # Clamped for numerical stability
        log_std = self.log_std(z).clamp(-4, 15)
        std = torch.exp(log_std)
        # Reparameterization trick: z = mean + std * eps, with eps ~ N(0, I)
        z = mean + std * torch.randn_like(std)

        u = self.decode(state, z)

        return u, mean, std

    def decode(self, state, z=None):
        # When sampling from the VAE, the latent vector is clipped to [-0.5, 0.5]
        if z is None:
            z = torch.randn((state.shape[0], self.latent_dim)).to(self.device).clamp(-0.5, 0.5)

        a = F.relu(self.d1(torch.cat([state, z], 1)))
        a = F.relu(self.d2(a))
        return self.max_action * torch.tanh(self.d3(a))

    def sample(self, state):
        return self.decode(state)


class BC_CVAE(object):
    def __init__(self, state_dim, action_dim, max_action, device, discount, tau, lr=3e-4):
        latent_dim = action_dim * 2
        self.vae = VAE(state_dim, action_dim, latent_dim, max_action, device).to(device)
        self.vae_optimizer = torch.optim.Adam(self.vae.parameters(), lr=lr)

        self.max_action = max_action
        self.action_dim = action_dim
        self.discount = discount
        self.tau = tau
        self.device = device

    def sample_action(self, state):
        with torch.no_grad():
            state = torch.FloatTensor(state.reshape(1, -1)).to(self.device)
            action = self.vae.sample(state)
        return action.cpu().data.numpy().flatten()

    def train(self, replay_buffer, iterations, batch_size=100):
        for it in range(iterations):
            # Sample replay buffer / batch
            state, action, next_state, reward, not_done = replay_buffer.sample(batch_size)

            # Variational Auto-Encoder training
            recon, mean, std = self.vae(state, action)
            recon_loss = F.mse_loss(recon, action)
            # Closed-form KL between N(mean, std^2) and the unit Gaussian prior,
            # averaged over both the batch and latent dimensions
            KL_loss = -0.5 * (1 + torch.log(std.pow(2)) - mean.pow(2) - std.pow(2)).mean()
            vae_loss = recon_loss + 0.5 * KL_loss

            self.vae_optimizer.zero_grad()
            vae_loss.backward()
            self.vae_optimizer.step()

            logger.record_tabular('VAE Loss', vae_loss.cpu().data.numpy())

    def save_model(self, dir):
        torch.save(self.vae.state_dict(), f'{dir}/vae.pth')

    def load_model(self, dir):
        self.vae.load_state_dict(torch.load(f'{dir}/vae.pth'))
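
For reference, the KL_loss line implements the standard closed-form KL divergence between the encoder's diagonal Gaussian q(z|s, a) = N(mean, std^2) and the unit Gaussian prior N(0, I). Per latent dimension:

    D_{KL}\big(\mathcal{N}(\mu, \sigma^2)\,\|\,\mathcal{N}(0, 1)\big) = -\tfrac{1}{2}\left(1 + \log \sigma^2 - \mu^2 - \sigma^2\right)

Note that the code averages this quantity over both the batch and latent dimensions (.mean()) rather than summing over latent dimensions as in the textbook ELBO, which rescales the term by 1/latent_dim; the additional 0.5 weight in vae_loss further down-weights prior matching relative to reconstruction.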
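
A minimal usage sketch follows, assuming a replay buffer whose sample(batch_size) returns the five-tuple of tensors that train() unpacks, and that utils.logger is importable. ToyBuffer, the dimensions, and the hyperparameter values below are hypothetical stand-ins, not part of this module.

    import numpy as np
    import torch

    from agents.bc_cvae import BC_CVAE

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    state_dim, action_dim, max_action = 17, 6, 1.0  # illustrative continuous-control dims

    agent = BC_CVAE(state_dim, action_dim, max_action, device,
                    discount=0.99, tau=0.005)

    class ToyBuffer:
        """Stand-in replay buffer with random (state, action) pairs."""
        def __init__(self, n=10_000):
            self.s = torch.randn(n, state_dim)
            self.a = torch.rand(n, action_dim) * 2 - 1  # actions in [-1, 1]
        def sample(self, batch_size):
            idx = torch.randint(len(self.s), (batch_size,))
            dummy = torch.zeros(batch_size, 1)
            # next_state, reward, not_done are unused by BC_CVAE.train
            return (self.s[idx].to(device), self.a[idx].to(device),
                    self.s[idx].to(device), dummy.to(device), dummy.to(device))

    agent.train(ToyBuffer(), iterations=200, batch_size=100)
    action = agent.sample_action(np.zeros(state_dim, dtype=np.float32))

Since train() only fits the VAE to reconstruct dataset actions, this agent is pure behavior cloning: sample_action draws a clipped latent from the prior and decodes it, with no value function involved (discount and tau are stored but unused here).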