# Scalar MMSE Examples

Building intuition: $\hat{x} = \mathbb{E}[x|y]$ for different distributions

## Setup and Imports

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
import ipywidgets as widgets
from IPython.display import display
import seaborn as sns

# Set styling
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

# Random seed for reproducibility
np.random.seed(42)

## MMSE for Different Joint Distributions

The MMSE estimator $\hat{x} = \mathbb{E}[x|y]$ depends on the joint distribution $p(x,y)$. For Gaussian: linear function. For uniform: piecewise linear. For exponential: nonlinear curve.

Linear MMSE $\hat{x} = ay + b$ always gives the best linear fit regardless of distribution.

In [None]:
class ScalarMMSE:
    """
    Scalar MMSE estimation for various distributions.
    
    Computes E[X|Y=y] analytically when possible,
    numerically otherwise.
    """
    
    def __init__(self, dist_type='gaussian', params=None):
        self.dist_type = dist_type
        self.params = params if params else self._default_params()
        
    def _default_params(self):
        """Default parameters for each distribution type"""
        if self.dist_type == 'gaussian':
            return {'rho': 0.7, 'sigma_x': 1.0, 'sigma_y': 1.0}
        elif self.dist_type == 'uniform':
            return {'a': 0.8}  # y = ax + noise
        elif self.dist_type == 'exponential':
            return {'lambda_x': 1.0, 'lambda_y': 1.5}
        elif self.dist_type == 'mixture':
            return {'p1': 0.3, 'sep': 3.0}
        else:
            return {}
    
    def generate_samples(self, n=1000):
        """Generate (X, Y) samples from joint distribution"""
        if self.dist_type == 'gaussian':
            # Bivariate Gaussian
            rho = self.params['rho']
            cov = [[1, rho], [rho, 1]]
            samples = np.random.multivariate_normal([0, 0], cov, n)
            return samples[:, 0], samples[:, 1]
            
        elif self.dist_type == 'uniform':
            # X uniform, Y = aX + uniform noise
            x = np.random.uniform(-2, 2, n)
            a = self.params['a']
            noise = np.random.uniform(-0.5, 0.5, n)
            y = a * x + noise
            return x, y
            
        elif self.dist_type == 'exponential':
            # Coupled exponentials via copula
            u1 = np.random.uniform(0, 1, n)
            u2 = np.random.uniform(0, 1, n)
            # Clayton copula for positive dependence
            theta = 2.0
            v = u1 * (u2**(-theta/(1+theta)) - 1) + 1
            v = v**(-1/theta)
            # Transform to exponentials
            x = -np.log(1 - u1) / self.params['lambda_x']
            y = -np.log(1 - v) / self.params['lambda_y']
            return x, y
            
        elif self.dist_type == 'mixture':
            # Gaussian mixture with two modes
            p1 = self.params['p1']
            sep = self.params['sep']
            n1 = int(n * p1)
            n2 = n - n1
            
            # Component 1: positive correlation
            cov1 = [[1, 0.8], [0.8, 1]]
            samples1 = np.random.multivariate_normal([-sep/2, -sep/2], cov1, n1)
            
            # Component 2: negative correlation
            cov2 = [[1, -0.8], [-0.8, 1]]
            samples2 = np.random.multivariate_normal([sep/2, sep/2], cov2, n2)
            
            samples = np.vstack([samples1, samples2])
            np.random.shuffle(samples)
            return samples[:, 0], samples[:, 1]
    
    def true_mmse(self, y):
        """Compute true E[X|Y=y] for given y values"""
        y = np.atleast_1d(y)
        
        if self.dist_type == 'gaussian':
            # Analytical: E[X|Y] = ρ(σ_x/σ_y)Y
            rho = self.params['rho']
            return rho * y
            
        elif self.dist_type == 'uniform':
            # For Y = aX + U(-0.5, 0.5), invert the relationship
            a = self.params['a']
            # E[X|Y=y] is piecewise linear
            x_est = y / a  # Central estimate
            # Bounds from uniform distribution
            x_min = (y - 0.5) / a
            x_max = (y + 0.5) / a
            # Clip to feasible region
            x_est = np.clip(x_est, -2, 2)
            return x_est
            
        elif self.dist_type == 'exponential':
            # Numerical approximation via kernel density
            x_samples, y_samples = self.generate_samples(10000)
            result = np.zeros_like(y)
            
            for i, y_val in enumerate(y):
                # Kernel weights
                bandwidth = 0.2
                weights = np.exp(-(y_samples - y_val)**2 / (2 * bandwidth**2))
                weights /= weights.sum()
                result[i] = np.sum(weights * x_samples)
            return result
            
        elif self.dist_type == 'mixture':
            # Numerical approximation
            x_samples, y_samples = self.generate_samples(10000)
            result = np.zeros_like(y)
            
            for i, y_val in enumerate(y):
                bandwidth = 0.3
                weights = np.exp(-(y_samples - y_val)**2 / (2 * bandwidth**2))
                weights /= weights.sum()
                result[i] = np.sum(weights * x_samples)
            return result
    
    def linear_mmse(self, x_samples, y_samples):
        """Compute best linear estimator from samples"""
        cov_xy = np.cov(x_samples, y_samples)[0, 1]
        var_y = np.var(y_samples)
        
        if var_y < 1e-10:
            return lambda y: np.full_like(y, np.mean(x_samples))
        
        a = cov_xy / var_y
        b = np.mean(x_samples) - a * np.mean(y_samples)
        
        return lambda y: a * y + b

## Temperature Sensor Example

From Lecture 3, Part 1: Sensor measures $Y = X + N$ where $X$ is true temperature, $N$ is noise. MMSE estimator depends on prior $p(x)$ and noise distribution $p(n)$.

Gaussian prior + Gaussian noise → Linear MMSE optimal. Non-Gaussian cases → Nonlinear MMSE better.

In [None]:
def temperature_sensor_demo():
    """
    Temperature sensor estimation under different noise models.
    """
    # True temperature distribution (prior)
    # Temperatures in Celsius, centered around 20°C
    n_samples = 2000
    
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    
    # Test three noise models
    noise_models = [
        ('Gaussian', lambda n: np.random.normal(0, 2, n)),
        ('Laplace', lambda n: np.random.laplace(0, 2, n)),
        ('Uniform', lambda n: np.random.uniform(-3, 3, n))
    ]
    
    for idx, (noise_name, noise_gen) in enumerate(noise_models):
        # Generate true temperatures (Gaussian prior)
        x_true = np.random.normal(20, 5, n_samples)  # Mean 20°C, std 5°C
        
        # Add noise
        noise = noise_gen(n_samples)
        y_observed = x_true + noise
        
        # Compute MMSE estimates
        # For Gaussian prior + any noise, use empirical conditional expectation
        y_grid = np.linspace(y_observed.min(), y_observed.max(), 100)
        mmse_estimates = np.zeros_like(y_grid)
        
        for i, y_val in enumerate(y_grid):
            # Kernel density estimation of E[X|Y=y]
            bandwidth = 1.0
            weights = np.exp(-(y_observed - y_val)**2 / (2 * bandwidth**2))
            if weights.sum() > 1e-10:
                weights /= weights.sum()
                mmse_estimates[i] = np.sum(weights * x_true)
            else:
                mmse_estimates[i] = 20  # Prior mean
        
        # Linear MMSE
        cov_xy = np.cov(x_true, y_observed)[0, 1]
        var_y = np.var(y_observed)
        a_linear = cov_xy / var_y
        b_linear = np.mean(x_true) - a_linear * np.mean(y_observed)
        linear_estimates = a_linear * y_grid + b_linear
        
        # Plot data
        ax = axes[0, idx]
        ax.scatter(y_observed[:500], x_true[:500], alpha=0.3, s=10)
        ax.plot(y_grid, mmse_estimates, 'r-', linewidth=2.5, label='MMSE')
        ax.plot(y_grid, linear_estimates, 'b--', linewidth=2, label='Linear MMSE')
        ax.plot(y_grid, y_grid, 'k:', alpha=0.5, label='No processing')
        ax.set_xlabel('Observed Y (°C)')
        ax.set_ylabel('True X (°C)')
        ax.set_title(f'{noise_name} Noise')
        ax.legend()
        ax.grid(True, alpha=0.3)
        
        # Error distributions
        ax = axes[1, idx]
        
        # Compute errors on test set
        test_idx = np.random.choice(n_samples, 500)
        
        # MMSE errors
        mmse_test = np.zeros(500)
        for i, idx_val in enumerate(test_idx):
            y_val = y_observed[idx_val]
            weights = np.exp(-(y_observed - y_val)**2 / (2 * bandwidth**2))
            if weights.sum() > 1e-10:
                weights /= weights.sum()
                mmse_test[i] = np.sum(weights * x_true)
            else:
                mmse_test[i] = 20
        
        mmse_errors = x_true[test_idx] - mmse_test
        linear_test = a_linear * y_observed[test_idx] + b_linear
        linear_errors = x_true[test_idx] - linear_test
        
        ax.hist(mmse_errors, bins=30, alpha=0.5, density=True, color='red', label='MMSE')
        ax.hist(linear_errors, bins=30, alpha=0.5, density=True, color='blue', label='Linear')
        ax.set_xlabel('Estimation Error (°C)')
        ax.set_ylabel('Density')
        ax.set_title(f'MSE: {np.mean(mmse_errors**2):.2f} vs {np.mean(linear_errors**2):.2f}')
        ax.legend()
        ax.grid(True, alpha=0.3)
    
    plt.suptitle('Temperature Sensor: MMSE vs Linear MMSE under Different Noise Models', fontsize=14)
    plt.tight_layout()
    plt.show()
    
    # Summary statistics
    print("Performance Summary:")
    print("Gaussian noise: Linear MMSE is optimal (same as MMSE)")
    print("Heavy-tailed noise: MMSE more robust to outliers")
    print("Bounded noise: MMSE exploits bounded support")

temperature_sensor_demo()

## Interactive Distribution Comparison

Explore how $\mathbb{E}[X|Y=y]$ changes with distribution type. Gaussian → linear. Uniform → piecewise linear. Mixture → multimodal.

Performance gap between MMSE and linear MMSE quantifies the price of restricting to linear estimators.

In [None]:
def interactive_distribution_comparison():
    """
    Interactive exploration of MMSE for different distributions.
    """
    
    # Widget controls
    dist_dropdown = widgets.Dropdown(
        options=['gaussian', 'uniform', 'exponential', 'mixture'],
        value='gaussian',
        description='Distribution:',
        style={'description_width': 'initial'}
    )
    
    param_slider = widgets.FloatSlider(
        value=0.7, min=-0.9, max=0.9, step=0.1,
        description='Correlation:',
        style={'description_width': 'initial'}
    )
    
    n_samples_slider = widgets.IntSlider(
        value=1000, min=500, max=3000, step=250,
        description='# Samples:',
        style={'description_width': 'initial'}
    )
    
    def update_plot(dist_type, correlation, n_samples):
        # Update parameters based on distribution
        if dist_type == 'gaussian':
            params = {'rho': correlation, 'sigma_x': 1.0, 'sigma_y': 1.0}
        elif dist_type == 'uniform':
            params = {'a': 1.5 + correlation}  # Map correlation to slope
        elif dist_type == 'exponential':
            params = {'lambda_x': 1.0, 'lambda_y': 1.5}
        elif dist_type == 'mixture':
            params = {'p1': 0.3 + 0.2*correlation, 'sep': 3.0}
        
        # Create estimator
        estimator = ScalarMMSE(dist_type=dist_type, params=params)
        
        # Generate samples
        x_samples, y_samples = estimator.generate_samples(n_samples)
        
        # Compute estimators
        y_grid = np.linspace(y_samples.min(), y_samples.max(), 200)
        mmse_curve = estimator.true_mmse(y_grid)
        linear_func = estimator.linear_mmse(x_samples, y_samples)
        linear_curve = linear_func(y_grid)
        
        # Create plots
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))
        
        # Joint distribution
        ax = axes[0, 0]
        ax.scatter(y_samples, x_samples, alpha=0.3, s=10)
        ax.set_xlabel('Y')
        ax.set_ylabel('X')
        ax.set_title(f'Joint Distribution: {dist_type}')
        ax.grid(True, alpha=0.3)
        
        # MMSE functions
        ax = axes[0, 1]
        ax.scatter(y_samples[:200], x_samples[:200], alpha=0.2, s=10, color='gray')
        ax.plot(y_grid, mmse_curve, 'r-', linewidth=2.5, label='MMSE: E[X|Y]')
        ax.plot(y_grid, linear_curve, 'b--', linewidth=2, label='Linear MMSE')
        ax.set_xlabel('Y')
        ax.set_ylabel('Estimate of X')
        ax.set_title('MMSE vs Linear MMSE')
        ax.legend()
        ax.grid(True, alpha=0.3)
        
        # Performance comparison
        ax = axes[0, 2]
        
        # Compute MSE for both estimators
        mmse_estimates = estimator.true_mmse(y_samples)
        linear_estimates = linear_func(y_samples)
        
        mse_mmse = np.mean((x_samples - mmse_estimates)**2)
        mse_linear = np.mean((x_samples - linear_estimates)**2)
        
        bars = ax.bar(['MMSE', 'Linear MMSE'], [mse_mmse, mse_linear], 
                      color=['red', 'blue'], alpha=0.7)
        ax.set_ylabel('MSE')
        ax.set_title('Performance Comparison')
        ax.grid(True, alpha=0.3, axis='y')
        
        for bar, val in zip(bars, [mse_mmse, mse_linear]):
            ax.text(bar.get_x() + bar.get_width()/2, val*1.05, 
                   f'{val:.4f}', ha='center', fontweight='bold')
        
        # Error scatter plot
        ax = axes[1, 0]
        mmse_errors = x_samples - mmse_estimates
        ax.scatter(y_samples, mmse_errors, alpha=0.3, s=10, color='red')
        ax.axhline(0, color='black', linestyle='-', linewidth=0.5)
        ax.set_xlabel('Y')
        ax.set_ylabel('MMSE Error')
        ax.set_title('MMSE Residuals')
        ax.grid(True, alpha=0.3)
        
        # Linear error scatter
        ax = axes[1, 1]
        linear_errors = x_samples - linear_estimates
        ax.scatter(y_samples, linear_errors, alpha=0.3, s=10, color='blue')
        ax.axhline(0, color='black', linestyle='-', linewidth=0.5)
        ax.set_xlabel('Y')
        ax.set_ylabel('Linear MMSE Error')
        ax.set_title('Linear MMSE Residuals')
        ax.grid(True, alpha=0.3)
        
        # Difference visualization
        ax = axes[1, 2]
        difference = mmse_curve - linear_curve
        ax.plot(y_grid, difference, 'g-', linewidth=2.5)
        ax.axhline(0, color='black', linestyle='--', alpha=0.5)
        ax.set_xlabel('Y')
        ax.set_ylabel('MMSE - Linear MMSE')
        ax.set_title('Nonlinearity: E[X|Y] - Linear Approximation')
        ax.grid(True, alpha=0.3)
        ax.fill_between(y_grid, 0, difference, alpha=0.3, color='green')
        
        plt.tight_layout()
        plt.show()
        
        # Print statistics
        improvement = (mse_linear - mse_mmse) / mse_linear * 100
        correlation = np.corrcoef(x_samples, y_samples)[0, 1]
        
        print(f"\n=== {dist_type.upper()} Distribution ===")
        print(f"Correlation ρ(X,Y): {correlation:.4f}")
        print(f"MMSE: {mse_mmse:.6f}")
        print(f"Linear MMSE: {mse_linear:.6f}")
        print(f"Improvement: {improvement:.2f}%")
        
        if dist_type == 'gaussian':
            print("Note: For Gaussian, MMSE = Linear MMSE (optimal)")
        elif dist_type == 'uniform':
            print("Note: Piecewise linear MMSE exploits bounded support")
        elif dist_type == 'mixture':
            print("Note: Multimodal structure requires nonlinear estimator")
    
    # Create interactive widget
    interactive_plot = widgets.interactive(
        update_plot,
        dist_type=dist_dropdown,
        correlation=param_slider,
        n_samples=n_samples_slider
    )
    
    display(interactive_plot)

# Run interactive demo
interactive_distribution_comparison()

## Outlier Effects

MMSE sensitive to outliers in heavy-tailed distributions. Linear MMSE uses all data equally. Robust estimators (median-based) handle outliers better.

Trade-off: Statistical efficiency vs robustness.

In [None]:
def outlier_analysis():
    """
    Compare MMSE robustness to outliers.
    """
    n_samples = 1000
    outlier_fractions = [0, 0.05, 0.1, 0.2]
    
    fig, axes = plt.subplots(2, len(outlier_fractions), figsize=(16, 8))
    
    mse_results = {'mmse': [], 'linear': [], 'robust': []}
    
    for idx, outlier_frac in enumerate(outlier_fractions):
        # Generate base Gaussian data
        x_clean = np.random.normal(0, 1, n_samples)
        noise = np.random.normal(0, 0.3, n_samples)
        y_clean = 0.8 * x_clean + noise
        
        # Add outliers
        n_outliers = int(n_samples * outlier_frac)
        if n_outliers > 0:
            outlier_idx = np.random.choice(n_samples, n_outliers, replace=False)
            y_clean[outlier_idx] += np.random.choice([-1, 1], n_outliers) * np.random.uniform(5, 10, n_outliers)
        
        # Compute estimators
        y_grid = np.linspace(y_clean.min(), y_clean.max(), 200)
        
        # Standard MMSE (kernel density)
        mmse_estimates = np.zeros_like(y_grid)
        for i, y_val in enumerate(y_grid):
            bandwidth = 0.5
            weights = np.exp(-(y_clean - y_val)**2 / (2 * bandwidth**2))
            if weights.sum() > 1e-10:
                weights /= weights.sum()
                mmse_estimates[i] = np.sum(weights * x_clean)
        
        # Linear MMSE
        cov_xy = np.cov(x_clean, y_clean)[0, 1]
        var_y = np.var(y_clean)
        a_linear = cov_xy / var_y if var_y > 1e-10 else 0
        b_linear = np.mean(x_clean) - a_linear * np.mean(y_clean)
        linear_estimates = a_linear * y_grid + b_linear
        
        # Robust estimator (median-based)
        robust_estimates = np.zeros_like(y_grid)
        for i, y_val in enumerate(y_grid):
            # Use robust kernel with bounded influence
            distances = np.abs(y_clean - y_val)
            threshold = np.percentile(distances, 75)
            weights = np.exp(-np.minimum(distances, threshold)**2 / (2 * bandwidth**2))
            if weights.sum() > 1e-10:
                weights /= weights.sum()
                robust_estimates[i] = np.sum(weights * x_clean)
        
        # Plot data
        ax = axes[0, idx]
        normal_idx = np.ones(n_samples, dtype=bool)
        if n_outliers > 0:
            normal_idx[outlier_idx] = False
            ax.scatter(y_clean[outlier_idx], x_clean[outlier_idx], 
                      alpha=0.7, s=30, color='red', marker='x', label='Outliers')
        ax.scatter(y_clean[normal_idx], x_clean[normal_idx], 
                  alpha=0.3, s=10, color='gray', label='Normal')
        ax.plot(y_grid, mmse_estimates, 'b-', linewidth=2, label='MMSE', alpha=0.8)
        ax.plot(y_grid, linear_estimates, 'g--', linewidth=2, label='Linear', alpha=0.8)
        ax.plot(y_grid, robust_estimates, 'r:', linewidth=2, label='Robust', alpha=0.8)
        ax.set_xlabel('Y')
        ax.set_ylabel('X')
        ax.set_title(f'{outlier_frac*100:.0f}% Outliers')
        ax.legend(loc='upper left')
        ax.grid(True, alpha=0.3)
        
        # MSE computation on clean test set
        test_x = np.random.normal(0, 1, 500)
        test_y = 0.8 * test_x + np.random.normal(0, 0.3, 500)
        
        # Interpolate estimators
        from scipy.interpolate import interp1d
        mmse_func = interp1d(y_grid, mmse_estimates, fill_value='extrapolate')
        linear_func = lambda y: a_linear * y + b_linear
        robust_func = interp1d(y_grid, robust_estimates, fill_value='extrapolate')
        
        mse_mmse = np.mean((test_x - mmse_func(test_y))**2)
        mse_linear = np.mean((test_x - linear_func(test_y))**2)
        mse_robust = np.mean((test_x - robust_func(test_y))**2)
        
        mse_results['mmse'].append(mse_mmse)
        mse_results['linear'].append(mse_linear)
        mse_results['robust'].append(mse_robust)
        
        # MSE bars
        ax = axes[1, idx]
        bars = ax.bar(['MMSE', 'Linear', 'Robust'], 
                      [mse_mmse, mse_linear, mse_robust],
                      color=['blue', 'green', 'red'], alpha=0.7)
        ax.set_ylabel('Test MSE')
        ax.set_title('Performance')
        ax.grid(True, alpha=0.3, axis='y')
        
        for bar, val in zip(bars, [mse_mmse, mse_linear, mse_robust]):
            ax.text(bar.get_x() + bar.get_width()/2, val*1.05,
                   f'{val:.4f}', ha='center', fontsize=9)
    
    plt.suptitle('Outlier Robustness Comparison', fontsize=14)
    plt.tight_layout()
    plt.show()
    
    # Summary plot
    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot([f*100 for f in outlier_fractions], mse_results['mmse'], 
           'b.-', linewidth=2, markersize=10, label='MMSE')
    ax.plot([f*100 for f in outlier_fractions], mse_results['linear'], 
           'g.--', linewidth=2, markersize=10, label='Linear MMSE')
    ax.plot([f*100 for f in outlier_fractions], mse_results['robust'], 
           'r.:', linewidth=2, markersize=10, label='Robust MMSE')
    ax.set_xlabel('Outlier Percentage (%)')
    ax.set_ylabel('Test MSE')
    ax.set_title('Degradation with Outliers')
    ax.legend()
    ax.grid(True, alpha=0.3)
    plt.show()

outlier_analysis()

## Key Takeaways

1. **MMSE = E[X|Y]** regardless of distribution
2. **Linear MMSE** optimal only for jointly Gaussian
3. **Performance gap** quantifies nonlinearity in E[X|Y]
4. **Robustness** matters for heavy-tailed distributions