⏱️ 90 min

Reinforcement Learning Basics

Train agents to make decisions using RL

Introduction to Reinforcement Learning

RL is learning what to do, that is, how to map situations to actions, so as to maximize a numerical reward signal.

**Key Concepts:**
- **Agent**: The learner/decision maker
- **Environment**: What the agent interacts with
- **State (s)**: Current situation
- **Action (a)**: What the agent can do
- **Reward (r)**: Feedback signal (positive/negative)
- **Policy (π)**: Agent's strategy (state → action)

**RL vs Supervised Learning:**
- No labeled data, only rewards
- Actions affect future states
- Delayed rewards (temporal credit assignment)
- Exploration vs exploitation tradeoff

**Applications:**
- Game playing (AlphaGo, Atari)
- Robotics
- Autonomous vehicles
- Resource management
- Recommendation systems
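The agent-environment loop behind these concepts can be sketched in a few lines. This toy one-dimensional example (the `step` and `policy` functions are illustrative, not from any library) shows state, action, reward, and policy working together:

```python
# Toy environment: the agent stands at an integer position and wants to reach 3.
def step(state, action):
    """action is +1 or -1. Reward is +10 at the goal, -1 per step otherwise."""
    next_state = state + action
    if next_state == 3:
        return next_state, 10, True   # reached the goal, episode ends
    return next_state, -1, False

# A trivial policy: always move right.
def policy(state):
    return +1

state, done, total_reward = 0, False, 0
while not done:
    action = policy(state)                    # policy maps state -> action
    state, reward, done = step(state, action) # environment returns next state + reward
    total_reward += reward                    # reward is the feedback signal

print(total_reward)  # 3 steps: -1 - 1 + 10 = 8
```

The RL algorithms below all follow this same loop; what changes is how the policy is learned from the rewards.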

Q-Learning Algorithm

Implement Q-learning for a simple grid world:

python
import numpy as np
import random

class GridWorld:
    """Simple 4x4 grid world environment"""
    def __init__(self):
        self.grid_size = 4
        self.state = 0  # Start position
        self.goal = 15  # Goal position
        self.obstacles = [5, 7, 11]  # Obstacle positions
        
    def reset(self):
        self.state = 0
        return self.state
    
    def step(self, action):
        """
        Actions: 0=up, 1=right, 2=down, 3=left
        Returns: next_state, reward, done
        """
        row, col = divmod(self.state, self.grid_size)
        
        # Move based on action
        if action == 0 and row > 0:  # Up
            row -= 1
        elif action == 1 and col < self.grid_size - 1:  # Right
            col += 1
        elif action == 2 and row < self.grid_size - 1:  # Down
            row += 1
        elif action == 3 and col > 0:  # Left
            col -= 1
        
        next_state = row * self.grid_size + col
        
        # Check if hit obstacle
        if next_state in self.obstacles:
            reward = -10
            done = False
            next_state = self.state  # Stay in place
        # Check if reached goal
        elif next_state == self.goal:
            reward = 100
            done = True
        else:
            reward = -1  # Small penalty for each step
            done = False
        
        self.state = next_state
        return next_state, reward, done

class QLearningAgent:
    def __init__(self, n_states=16, n_actions=4, lr=0.1, gamma=0.99, epsilon=1.0):
        self.n_actions = n_actions
        self.Q = np.zeros((n_states, n_actions))
        self.lr = lr              # Learning rate
        self.gamma = gamma        # Discount factor
        self.epsilon = epsilon    # Exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        
    def get_action(self, state):
        """Epsilon-greedy action selection"""
        if random.random() < self.epsilon:
            return random.randint(0, self.n_actions - 1)  # Explore
        else:
            return np.argmax(self.Q[state])  # Exploit
    
    def update(self, state, action, reward, next_state, done):
        """Q-learning update rule"""
        if done:
            target = reward
        else:
            target = reward + self.gamma * np.max(self.Q[next_state])
        
        # Q(s,a) ← Q(s,a) + α[r + γ max Q(s',a') - Q(s,a)]
        self.Q[state, action] += self.lr * (target - self.Q[state, action])
    
    def decay_epsilon(self):
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

# Training
env = GridWorld()
agent = QLearningAgent()

episodes = 500
rewards_history = []

print("Training Q-Learning Agent...")
for episode in range(episodes):
    state = env.reset()
    total_reward = 0
    done = False
    steps = 0
    
    while not done and steps < 100:
        action = agent.get_action(state)
        next_state, reward, done = env.step(action)
        agent.update(state, action, reward, next_state, done)
        
        state = next_state
        total_reward += reward
        steps += 1
    
    agent.decay_epsilon()
    rewards_history.append(total_reward)
    
    if (episode + 1) % 100 == 0:
        avg_reward = np.mean(rewards_history[-100:])
        print(f"Episode {episode + 1}: Avg Reward = {avg_reward:.2f}, Epsilon = {agent.epsilon:.3f}")

# Test learned policy
print(f"\nTesting learned policy...")
state = env.reset()
path = [state]
done = False
total_reward = 0

while not done and len(path) < 20:
    action = np.argmax(agent.Q[state])
    state, reward, done = env.step(action)
    path.append(state)
    total_reward += reward

print(f"Path taken: {path}")
print(f"Total reward: {total_reward}")
print(f"Reached goal: {done}")

# Display Q-values for start state
print(f"\nQ-values for start state (0):")
actions = ['Up', 'Right', 'Down', 'Left']
for i, action in enumerate(actions):
    print(f"  {action}: {agent.Q[0, i]:.2f}")
Output:
Training Q-Learning Agent...
Episode 100: Avg Reward = -45.23, Epsilon = 0.606
Episode 200: Avg Reward = -15.67, Epsilon = 0.367
Episode 300: Avg Reward = 52.34, Epsilon = 0.223
Episode 400: Avg Reward = 78.91, Epsilon = 0.135
Episode 500: Avg Reward = 85.45, Epsilon = 0.082

Testing learned policy...
Path taken: [0, 1, 2, 6, 10, 14, 15]
Total reward: 95
Reached goal: True

Q-values for start state (0):
  Up: -0.52
  Right: 85.34
  Down: 12.45
  Left: -0.89
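Once training converges, the greedy policy can be read directly off the Q-table with an argmax per state. Here is a small sketch of rendering it as an arrow grid; the `render_policy` helper and the toy Q-table are illustrative, and with the trained agent you would pass `agent.Q` instead:

```python
import numpy as np

def render_policy(Q, grid_size=4, goal=15, obstacles=(5, 7, 11)):
    """Render the greedy policy of a Q-table as rows of arrows."""
    arrows = ['↑', '→', '↓', '←']  # matches the action encoding 0..3 above
    cells = []
    for s in range(grid_size * grid_size):
        if s == goal:
            cells.append('G')
        elif s in obstacles:
            cells.append('X')
        else:
            cells.append(arrows[int(np.argmax(Q[s]))])
    return [' '.join(cells[r * grid_size:(r + 1) * grid_size])
            for r in range(grid_size)]

# Toy Q-table standing in for agent.Q: "right" is best everywhere,
# except a few states that prefer "down" (hypothetical values).
toy_Q = np.zeros((16, 4))
toy_Q[:, 1] = 1.0
toy_Q[[3, 6, 10], 2] = 2.0
for row in render_policy(toy_Q):
    print(row)
```

A grid like this makes it easy to spot states where exploration was insufficient and the greedy action still points into a wall or an obstacle.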

Deep Q-Network (DQN)

Use neural networks to approximate Q-values:

python
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(DQN, self).__init__()
        
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )
    
    def forward(self, x):
        return self.network(x)

class ReplayBuffer:
    """Experience replay buffer"""
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return (torch.FloatTensor(states),
                torch.LongTensor(actions),
                torch.FloatTensor(rewards),
                torch.FloatTensor(next_states),
                torch.FloatTensor(dones))
    
    def __len__(self):
        return len(self.buffer)

class DQNAgent:
    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        
        # Q-network and target network
        self.q_network = DQN(state_dim, action_dim)
        self.target_network = DQN(state_dim, action_dim)
        self.target_network.load_state_dict(self.q_network.state_dict())
        
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.memory = ReplayBuffer()
        
    def get_action(self, state, training=True):
        if training and random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)
        
        with torch.no_grad():
            state = torch.FloatTensor(state).unsqueeze(0)
            q_values = self.q_network(state)
            return q_values.argmax().item()
    
    def train(self, batch_size=32):
        if len(self.memory) < batch_size:
            return
        
        states, actions, rewards, next_states, dones = self.memory.sample(batch_size)
        
        # Current Q-values
        current_q = self.q_network(states).gather(1, actions.unsqueeze(1))
        
        # Target Q-values (using target network)
        with torch.no_grad():
            next_q = self.target_network(next_states).max(1)[0]
            target_q = rewards + (1 - dones) * self.gamma * next_q
        
        # Compute loss
        loss = nn.MSELoss()(current_q.squeeze(), target_q)
        
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()
    
    def update_target_network(self):
        self.target_network.load_state_dict(self.q_network.state_dict())

# Example setup
state_dim = 4  # e.g., CartPole observation
action_dim = 2  # e.g., left or right

agent = DQNAgent(state_dim, action_dim)

print(f"DQN Agent initialized")
print(f"Q-Network parameters: {sum(p.numel() for p in agent.q_network.parameters()):,}")

# Simulate training step
dummy_state = [0.1, 0.2, -0.1, 0.3]
action = agent.get_action(dummy_state)
print(f"\nSample action: {action}")

# Add experience to replay buffer
agent.memory.push(dummy_state, action, 1.0, dummy_state, False)
print(f"Replay buffer size: {len(agent.memory)}")

# Simulate batch training
for _ in range(100):
    agent.memory.push(
        [random.random() for _ in range(4)],
        random.randint(0, 1),
        random.random(),
        [random.random() for _ in range(4)],
        random.random() > 0.9
    )

loss = agent.train(batch_size=32)
print(f"\nTraining loss: {loss:.4f}")
print(f"\n✓ DQN agent ready for training!")
Output:
DQN Agent initialized
Q-Network parameters: 17,410

Sample action: 1
Replay buffer size: 1

Training loss: 0.3456

✓ DQN agent ready for training!
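The demo above never calls `update_target_network`; in a full run you would sync the target network every N steps. An alternative seen in many implementations is a soft (Polyak) update, which blends the target toward the online network every step. A sketch, where `tau` is an illustrative value:

```python
import torch
import torch.nn as nn

def soft_update(q_network, target_network, tau=0.005):
    """Polyak averaging: target <- (1 - tau) * target + tau * online."""
    with torch.no_grad():
        for p, tp in zip(q_network.parameters(), target_network.parameters()):
            tp.mul_(1.0 - tau).add_(tau * p)

# Stand-in networks for demonstration (the real agent would pass
# agent.q_network and agent.target_network).
net = nn.Linear(4, 2)
target = nn.Linear(4, 2)
soft_update(net, target)
```

With a hard copy the targets jump every N steps; with a soft update they drift smoothly, which some practitioners find stabilizes training at the cost of one more hyperparameter.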

RL Algorithms Overview

**Value-Based Methods:**
- **Q-Learning**: Learn the action-value function
- **DQN**: Deep Q-network with experience replay
- **Double DQN**: Reduces overestimation bias
- **Dueling DQN**: Separate value and advantage streams

**Policy-Based Methods:**
- **REINFORCE**: Direct policy optimization
- **Actor-Critic**: Combine value and policy learning
- **A3C**: Asynchronous advantage actor-critic
- **PPO**: Proximal policy optimization (most popular)

**Model-Based Methods:**
- Learn a model of the environment
- Plan using the model
- Examples: AlphaZero, MuZero

**When to Use RL:**
✓ Clear reward signal
✓ Agent can interact with the environment
✓ Sequential decision making
✗ Need safety guarantees (RL can be unstable)
✗ Limited interaction budget
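Of the variants listed above, Double DQN is the smallest change to implement: the online network *selects* the next action and the target network *evaluates* it, instead of the target network doing both. A sketch of just the target computation, reusing the naming from the `DQNAgent` above (the `nn.Linear` networks here are stand-ins for real Q-networks):

```python
import torch
import torch.nn as nn

def double_dqn_target(q_network, target_network, rewards, next_states, dones,
                      gamma=0.99):
    """Double DQN target: select action with the online net, evaluate with the target net."""
    with torch.no_grad():
        next_actions = q_network(next_states).argmax(1, keepdim=True)            # select
        next_q = target_network(next_states).gather(1, next_actions).squeeze(1)  # evaluate
        return rewards + (1 - dones) * gamma * next_q

# Stand-in networks and a random batch for demonstration.
q_net = nn.Linear(4, 2)
t_net = nn.Linear(4, 2)
next_states = torch.randn(8, 4)
rewards = torch.ones(8)
dones = torch.zeros(8)
target = double_dqn_target(q_net, t_net, rewards, next_states, dones)
print(target.shape)  # torch.Size([8])
```

Compare with the standard DQN target in `DQNAgent.train`, which uses `target_network(next_states).max(1)[0]` for both selection and evaluation; that coupling is the source of the overestimation bias Double DQN reduces.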
