Train agents to make decisions using RL
RL is learning what to do—how to map situations to actions—so as to maximize a numerical reward signal.

**Key Concepts:**
- **Agent**: The learner/decision maker
- **Environment**: What the agent interacts with
- **State (s)**: Current situation
- **Action (a)**: What the agent can do
- **Reward (r)**: Feedback signal (positive/negative)
- **Policy (π)**: Agent's strategy (state → action)

**RL vs Supervised Learning:**
- No labeled data, only rewards
- Actions affect future states
- Delayed rewards (temporal credit assignment)
- Exploration vs exploitation tradeoff

**Applications:**
- Game playing (AlphaGo, Atari)
- Robotics
- Autonomous vehicles
- Resource management
- Recommendation systems
Implement Q-learning for a simple grid world:
import numpy as np
import random
class GridWorld:
    """A 4x4 grid world: start at cell 0, goal at cell 15, three obstacles.

    States are flat row-major indices 0..15. Stepping into an obstacle
    keeps the agent in place with a -10 reward; reaching the goal ends the
    episode with +100; every other step costs -1.
    """

    def __init__(self):
        self.grid_size = 4
        self.state = 0               # start cell (top-left)
        self.goal = 15               # goal cell (bottom-right)
        self.obstacles = [5, 7, 11]  # cells the agent may not enter

    def reset(self):
        """Put the agent back at the start cell and return that state."""
        self.state = 0
        return self.state

    def step(self, action):
        """Apply one action and return (next_state, reward, done).

        Actions: 0=up, 1=right, 2=down, 3=left. A move that would leave
        the grid is a no-op (the agent stays on the edge).
        """
        row, col = divmod(self.state, self.grid_size)
        edge = self.grid_size - 1
        # Clamp-at-edge movement: shift only while staying on the grid.
        if action == 0:
            row = max(row - 1, 0)        # up
        elif action == 1:
            col = min(col + 1, edge)     # right
        elif action == 2:
            row = min(row + 1, edge)     # down
        elif action == 3:
            col = max(col - 1, 0)        # left
        candidate = row * self.grid_size + col
        if candidate in self.obstacles:
            # Bounce off obstacles: stay put and take a large penalty.
            reward, done, candidate = -10, False, self.state
        elif candidate == self.goal:
            reward, done = 100, True
        else:
            reward, done = -1, False     # small per-step penalty
        self.state = candidate
        return candidate, reward, done
class QLearningAgent:
    """Tabular Q-learning with epsilon-greedy exploration.

    Args:
        n_states: number of discrete states (rows of the Q-table).
        n_actions: number of discrete actions (columns of the Q-table).
        lr: learning rate (alpha) for the Q-update.
        gamma: discount factor for future rewards.
        epsilon: initial exploration probability; decays via decay_epsilon().
    """

    def __init__(self, n_states=16, n_actions=4, lr=0.1, gamma=0.99, epsilon=1.0):
        # Bug fix: remember n_actions so exploration covers every action.
        # The original hard-coded randint(0, 3), which silently explored
        # only 4 actions regardless of the n_actions argument.
        self.n_actions = n_actions
        self.Q = np.zeros((n_states, n_actions))
        self.lr = lr                  # learning rate
        self.gamma = gamma            # discount factor
        self.epsilon = epsilon        # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995

    def get_action(self, state):
        """Epsilon-greedy: random action w.p. epsilon, else greedy w.r.t. Q."""
        if random.random() < self.epsilon:
            return random.randint(0, self.n_actions - 1)  # explore
        return int(np.argmax(self.Q[state]))              # exploit

    def update(self, state, action, reward, next_state, done):
        """One Q-learning step: Q(s,a) += lr * (target - Q(s,a)).

        On terminal transitions the target is just the reward (no bootstrap).
        """
        if done:
            target = reward
        else:
            target = reward + self.gamma * np.max(self.Q[next_state])
        # Q(s,a) <- Q(s,a) + alpha * [r + gamma * max_a' Q(s',a') - Q(s,a)]
        self.Q[state, action] += self.lr * (target - self.Q[state, action])

    def decay_epsilon(self):
        """Multiplicatively decay epsilon, floored at epsilon_min."""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
# ---- Training loop: tabular Q-learning on the grid world ----
env = GridWorld()
agent = QLearningAgent()

episodes = 500
rewards_history = []

print("Training Q-Learning Agent...")
for episode in range(episodes):
    state = env.reset()
    done = False
    total_reward = 0
    steps = 0
    # Cap each episode at 100 steps so a lost agent cannot loop forever.
    while not done and steps < 100:
        action = agent.get_action(state)
        next_state, reward, done = env.step(action)
        agent.update(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward
        steps += 1
    agent.decay_epsilon()
    rewards_history.append(total_reward)
    # Progress report every 100 episodes: trailing-100 average reward.
    if (episode + 1) % 100 == 0:
        avg_reward = np.mean(rewards_history[-100:])
        print(f"Episode {episode + 1}: Avg Reward = {avg_reward:.2f}, Epsilon = {agent.epsilon:.3f}")
# ---- Greedy rollout of the learned policy (no exploration) ----
print(f"\nTesting learned policy...")
state = env.reset()
path = [state]
done = False
total_reward = 0
# Follow argmax actions only; cap the path at 20 states as a safety net.
while not done and len(path) < 20:
    action = np.argmax(agent.Q[state])
    state, reward, done = env.step(action)
    path.append(state)
    total_reward += reward
print(f"Path taken: {path}")
print(f"Total reward: {total_reward}")
print(f"Reached goal: {done}")

# ---- Inspect the learned Q-values at the start state ----
print(f"\nQ-values for start state (0):")
actions = ['Up', 'Right', 'Down', 'Left']
for i, action in enumerate(actions):
    print(f" {action}: {agent.Q[0, i]:.2f}")
# Example output (one run):
#   Episode 500: Avg Reward = 85.45, Epsilon = 0.082
#   Path taken: [0, 1, 2, 3, 7, 11, 15]  Total reward: 94  Reached goal: True
#   Q-values for start state (0): Up: -0.52  Right: 85.34  Down: 12.45  Left: -0.89
Use neural networks to approximate Q-values:
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
class DQN(nn.Module):
    """MLP that maps a state vector to one Q-value per action.

    Architecture: two hidden ReLU layers of `hidden_dim` units each,
    then a linear readout of size `action_dim`.
    """

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(DQN, self).__init__()
        layers = [
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
        ]
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Return Q-values of shape (..., action_dim) for input states x."""
        return self.network(x)
class ReplayBuffer:
    """Fixed-capacity FIFO buffer of (s, a, r, s', done) transitions.

    Backed by a deque, so once `capacity` is reached the oldest
    transitions are evicted automatically.
    """

    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Store one transition tuple."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Sample a random minibatch and return it as stacked tensors.

        Returns (states, actions, rewards, next_states, dones) with dtypes
        (float32, int64, float32, float32, float32). `dones` is float so it
        can be used directly in a (1 - dones) bootstrap mask.
        """
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        # Fix: stack into ndarrays first. torch.FloatTensor(list_of_lists)
        # builds element-by-element (very slow, and newer PyTorch warns);
        # tensor-from-ndarray is a single bulk copy.
        return (torch.as_tensor(np.array(states), dtype=torch.float32),
                torch.as_tensor(np.array(actions), dtype=torch.int64),
                torch.as_tensor(np.array(rewards), dtype=torch.float32),
                torch.as_tensor(np.array(next_states), dtype=torch.float32),
                torch.as_tensor(np.array(dones), dtype=torch.float32))

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)
class DQNAgent:
    """DQN agent: epsilon-greedy policy, experience replay, target network.

    Args:
        state_dim: dimensionality of the observation vector.
        action_dim: number of discrete actions.
        lr: Adam learning rate for the online Q-network.
        gamma: discount factor for bootstrapped targets.
    """

    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = 1.0            # exploration rate (caller decays it)
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        # The online network is trained; the target network provides stable
        # bootstrap targets and is synced via update_target_network().
        self.q_network = DQN(state_dim, action_dim)
        self.target_network = DQN(state_dim, action_dim)
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.memory = ReplayBuffer()

    def get_action(self, state, training=True):
        """Epsilon-greedy during training; pure greedy when training=False."""
        if training and random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)
        with torch.no_grad():
            state = torch.FloatTensor(state).unsqueeze(0)  # add batch dim
            q_values = self.q_network(state)
            return q_values.argmax().item()

    def train(self, batch_size=32):
        """Run one gradient step on a sampled minibatch.

        Returns the scalar loss, or None if the buffer holds fewer than
        batch_size transitions (no training step is taken).
        """
        if len(self.memory) < batch_size:
            return
        states, actions, rewards, next_states, dones = self.memory.sample(batch_size)
        # Q(s, a) for the actions actually taken -> shape (batch, 1).
        current_q = self.q_network(states).gather(1, actions.unsqueeze(1))
        # Targets bootstrap from the frozen target network; (1 - dones)
        # zeroes the bootstrap term on terminal transitions.
        with torch.no_grad():
            next_q = self.target_network(next_states).max(1)[0]
            target_q = rewards + (1 - dones) * self.gamma * next_q
        # Bug fix: squeeze(1), not squeeze(). A bare squeeze() collapses a
        # (1, 1) tensor to a 0-d scalar when batch_size == 1, and MSELoss
        # would then broadcast it against target_q's shape (1,) incorrectly.
        loss = nn.MSELoss()(current_q.squeeze(1), target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        self.target_network.load_state_dict(self.q_network.state_dict())
# ---- Smoke test: build a DQN agent and run one training step ----
state_dim = 4   # e.g., CartPole observation
action_dim = 2  # e.g., left or right

agent = DQNAgent(state_dim, action_dim)
print(f"DQN Agent initialized")
print(f"Q-Network parameters: {sum(p.numel() for p in agent.q_network.parameters()):,}")

# Pick an action for a dummy observation (epsilon starts at 1.0, so random).
dummy_state = [0.1, 0.2, -0.1, 0.3]
action = agent.get_action(dummy_state)
print(f"\nSample action: {action}")

# Store a single hand-made transition.
agent.memory.push(dummy_state, action, 1.0, dummy_state, False)
print(f"Replay buffer size: {len(agent.memory)}")

# Fill the buffer with random transitions so a minibatch can be drawn,
# then take one gradient step.
for _ in range(100):
    transition = (
        [random.random() for _ in range(4)],
        random.randint(0, 1),
        random.random(),
        [random.random() for _ in range(4)],
        random.random() > 0.9,
    )
    agent.memory.push(*transition)

loss = agent.train(batch_size=32)
print(f"\nTraining loss: {loss:.4f}")
print(f"\n✓ DQN agent ready for training!")
# Example output (one run):
#   DQN Agent initialized / Q-Network parameters: 17,282
#   Sample action: 1 / Replay buffer size: 1 / Training loss: 0.3456
**Value-Based Methods:**
- **Q-Learning**: Learn the action-value function
- **DQN**: Deep Q-Network with experience replay
- **Double DQN**: Reduce overestimation bias
- **Dueling DQN**: Separate value and advantage streams

**Policy-Based Methods:**
- **REINFORCE**: Direct policy optimization
- **Actor-Critic**: Combine value and policy learning
- **A3C**: Asynchronous advantage actor-critic
- **PPO**: Proximal policy optimization (most popular)

**Model-Based Methods:**
- Learn a model of the environment
- Plan using the learned model
- Examples: AlphaZero, MuZero

**When to Use RL:**
✓ Clear reward signal
✓ Agent can interact with the environment
✓ Sequential decision making
✗ Need safety guarantees (RL can be unstable)
✗ Limited interaction budget