EE 541 - Unit 2
Fall 2025
A Paradox
Python is 10-100× slower than C/C++ for raw computation, yet dominates scientific computing. Why?
Design Philosophy
The Two-Language Problem
Traditional approach: prototype in MATLAB/Python, rewrite in C++ for production
Python’s approach: Python for control flow, C for computation
Everything in Python is an object:
Every Object Contains:
Consequences:
An integer such as 5 takes 28 bytes (vs 4 bytes for an int in C).
import sys
# Simple integer
x = 42
print(f"Integer 42:")
print(f" Size: {sys.getsizeof(x)} bytes")
print(f" ID: {id(x):#x}")
print(f" Type: {type(x)}")
# List of integers
lst = [1, 2, 3, 4, 5]
print(f"\nList [1,2,3,4,5]:")
print(f" List object: {sys.getsizeof(lst)} bytes")
print(f" Plus 5 integers: {5 * sys.getsizeof(1)} bytes")
print(f" Total: ~{sys.getsizeof(lst) + 5*28} bytes")
# C equivalent would be 20 bytes (5 × 4)
Integer 42:
Size: 28 bytes
ID: 0x10284f4d0
Type: <class 'int'>
List [1,2,3,4,5]:
List object: 104 bytes
Plus 5 integers: 140 bytes
Total: ~244 bytes
This overhead is why NumPy exists: it stores raw C arrays behind a thin Python wrapper.
Variables in Python are not storage locations.
Variables are names that refer to objects. Assignment creates a reference, not a copy.
x = [1, 2, 3] # x refers to a list object
y = x # y refers to THE SAME object
y.append(4) # Modifying through y
print(f"x = {x}") # x sees the change!
# To create a copy:
z = x.copy() # or x[:] or list(x)
z.append(5)
print(f"x = {x}, z = {z}") # x unchanged
x = [1, 2, 3, 4]
x = [1, 2, 3, 4], z = [1, 2, 3, 4, 5]
Python determines types at runtime, not compile time.
Static Typing (C/Java)
Compiler knows:
Dynamic Typing (Python)
Runtime must: check the types of a and b, then look up and dispatch the appropriate __add__ method.
Performance Impact
import timeit
import operator
# Python function call overhead
def python_add(a, b):
return a + b
# Direct operator
op_add = operator.add
a, b = 5, 10
n = 1000000
t1 = timeit.timeit(lambda: a + b, number=n)
t2 = timeit.timeit(lambda: op_add(a, b), number=n)
t3 = timeit.timeit(lambda: python_add(a, b), number=n)
print(f"Direct +: {t1:.3f}s")
print(f"operator.add: {t2:.3f}s ({t2/t1:.1f}x slower)")
print(f"Function: {t3:.3f}s ({t3/t1:.1f}x slower)")
Direct +: 0.018s
operator.add: 0.023s (1.3x slower)
Function: 0.033s (1.8x slower)
Every for loop in Python uses the iterator protocol, not index-based access.
What Really Happens
Translates to:
iterator = iter(sequence)
while True:
try:
item = next(iterator)
process(item)
except StopIteration:
break
This Enables:
# Create a custom iterator
class Fibonacci:
def __init__(self, max_n):
self.max_n = max_n
self.n = 0
self.current = 0
self.next_val = 1
def __iter__(self):
return self
def __next__(self):
if self.n >= self.max_n:
raise StopIteration
result = self.current
self.current, self.next_val = \
self.next_val, self.current + self.next_val
self.n += 1
return result
# Use like any sequence
for f in Fibonacci(10):
print(f, end=' ')
print()
# This is why range() is memory efficient
print(f"\nrange(1000000): {sys.getsizeof(range(1000000))} bytes")
0 1 1 2 3 5 8 13 21 34
range(1000000): 48 bytes
Python functions are objects with attributes, not just code blocks.
Functions Are Objects
Functions carry attributes (e.g. __name__, __doc__) and can gain new ones at runtime.
This Enables:
def make_multiplier(n):
"""Factory function creating closures."""
def multiplier(x):
return x * n # n is captured
# Function is an object we can modify
multiplier.factor = n
multiplier.__name__ = f'times_{n}'
return multiplier
times_2 = make_multiplier(2)
times_10 = make_multiplier(10)
print(f"{times_2.__name__}: {times_2(5)}")
print(f"{times_10.__name__}: {times_10(5)}")
print(f"Factors: {times_2.factor}, {times_10.factor}")
# Inspect closure
import inspect
closure_vars = inspect.getclosurevars(times_2)
print(f"Captured: {closure_vars.nonlocals}")
times_2: 10
times_10: 50
Factors: 2, 10
Captured: {'n': 2}
Functions remember their creation environment.
Python resolves names through a hierarchy of namespace dictionaries.
x = "global" # Global namespace
def outer():
x = "enclosing" # Enclosing namespace
def inner():
print(f"Reading x: {x}") # LEGB search finds enclosing
def inner_local():
x = "local" # Creates local binding
print(f"Local x: {x}")
inner()
inner_local()
print(f"Enclosing x: {x}")
outer()
print(f"Global x: {x}")
# Namespace modification
def show_namespace_dict():
local_var = 42
print(f"locals(): {locals()}")
print(f"'local_var' in locals(): {'local_var' in locals()}")
show_namespace_dict()
Reading x: enclosing
Local x: local
Enclosing x: enclosing
Global x: global
locals(): {'local_var': 42}
'local_var' in locals(): True
Resolution Rules:
Binding Behavior:
The global keyword binds a name to the global namespace; nonlocal binds it to the enclosing namespace.
# UnboundLocalError example
count = 0
def increment():
# Python sees assignment, creates local slot
# But referenced before assignment
try:
count += 1 # UnboundLocalError
except UnboundLocalError as e:
print(f"Error: {e}")
increment()
Error: cannot access local variable 'count' where it is not associated with a value
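The fix is to declare the binding explicitly: global rebinds the module-level name, and nonlocal rebinds a name in an enclosing function. A minimal sketch:
count = 0
def increment_fixed():
    global count          # bind to the module-level name
    count += 1
    return count
def make_counter():
    total = 0
    def increment():
        nonlocal total    # bind to the enclosing function's name
        total += 1
        return total
    return increment
print(increment_fixed())      # 1
inc = make_counter()
print(inc(), inc())           # 1 2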
Python lists and NumPy arrays have different memory layouts.
import numpy as np
# Memory comparison
py_list = [1, 2, 3, 4, 5]
np_array = np.array([1, 2, 3, 4, 5])
print(f"Python list overhead: {sys.getsizeof(py_list)} bytes")
print(f"NumPy array overhead: {sys.getsizeof(np_array)} bytes")
print(f"NumPy data buffer: {np_array.nbytes} bytes")
# Performance impact
import timeit
lst = list(range(1000))
arr = np.arange(1000)
t_list = timeit.timeit(lambda: sum(lst), number=10000)
t_numpy = timeit.timeit(lambda: arr.sum(), number=10000)
print(f"\nSum 1000 elements:")
print(f"Python list: {t_list:.4f}s")
print(f"NumPy array: {t_numpy:.4f}s ({t_list/t_numpy:.1f}x faster)")
Python list overhead: 104 bytes
NumPy array overhead: 152 bytes
NumPy data buffer: 40 bytes
Sum 1000 elements:
Python list: 0.0297s
NumPy array: 0.0071s (4.2x faster)
Python’s distinction between mutable and immutable types affects program design.
Immutable Types
Mutable Types
# Immutable: operations create new objects
s = "hello"
print(f"Original id: {id(s):#x}")
s = s + " world" # Creates new object
print(f"After +=: {id(s):#x}")
# Mutable: operations modify in place
lst = [1, 2, 3]
original_id = id(lst)
lst.append(4) # Modifies in place
print(f"\nList id before: {original_id:#x}")
print(f"List id after: {id(lst):#x}")
print(f"Same object: {id(lst) == original_id}")
# The danger of mutable defaults
def add_to_list(item, target=[]): # Mutable default argument
target.append(item)
return target
list1 = add_to_list(1)
list2 = add_to_list(2) # Shares same default list
print(f"\nlist1: {list1}")
print(f"list2: {list2}")
print(f"Same object: {list1 is list2}")
Original id: 0x10580ccf0
After +=: 0x13493e0b0
List id before: 0x134a03a40
List id after: 0x134a03a40
Same object: True
list1: [1, 2]
list2: [1, 2]
Same object: True
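The standard remedy is a None sentinel so each call allocates a fresh list. A minimal sketch:
def add_to_list_fixed(item, target=None):
    if target is None:    # new list per call, no shared default
        target = []
    target.append(item)
    return target
print(add_to_list_fixed(1))   # [1]
print(add_to_list_fixed(2))   # [2] -- no state shared between calls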
Exceptions are Python’s control flow mechanism for exceptional cases.
Exceptions Are:
EAFP (Easier to Ask Forgiveness than Permission) vs LBYL (Look Before You Leap)
Python prefers EAFP:
import timeit
# EAFP vs LBYL performance
d = {str(i): i for i in range(100)}
def eafp_found():
try:
return d['50']
except KeyError:
return None
def lbyl_found():
if '50' in d:
return d['50']
return None
def eafp_not_found():
try:
return d['200']
except KeyError:
return None
def lbyl_not_found():
if '200' in d:
return d['200']
return None
n = 100000
print("Key exists:")
print(f" EAFP: {timeit.timeit(eafp_found, number=n):.3f}s")
print(f" LBYL: {timeit.timeit(lbyl_found, number=n):.3f}s")
print("Key missing:")
print(f" EAFP: {timeit.timeit(eafp_not_found, number=n):.3f}s")
print(f" LBYL: {timeit.timeit(lbyl_not_found, number=n):.3f}s")
Key exists:
EAFP: 0.002s
LBYL: 0.004s
Key missing:
EAFP: 0.009s
LBYL: 0.002s
Understanding Python’s overhead helps write efficient code.
Optimization Strategies:
Cache attribute lookups, e.g. append = lst.append
# Example: Method lookup caching
data = []
n = 100000
# Slow: lookup append each time
start = timeit.default_timer()
for i in range(n):
data.append(i)
slow_time = timeit.default_timer() - start
# Fast: cache the method
data = []
append = data.append # Cache method lookup
start = timeit.default_timer()
for i in range(n):
append(i)
fast_time = timeit.default_timer() - start
print(f"Normal: {slow_time:.4f}s")
print(f"Cached: {fast_time:.4f}s ({slow_time/fast_time:.2f}x faster)")
Normal: 0.0031s
Cached: 0.0032s (0.99x faster)
Python provides three sequence types with different trade-offs.
Design Consequences:
import sys
# Memory comparison
lst = [1, 2, 3, 4, 5]
tup = (1, 2, 3, 4, 5)
string = "12345"
print("Memory usage:")
print(f" List: {sys.getsizeof(lst)} bytes")
print(f" Tuple: {sys.getsizeof(tup)} bytes")
print(f" String: {sys.getsizeof(string)} bytes")
# Hashability determines dict key eligibility
print("\nCan be dict key:")
for obj, name in [(lst, 'List'), (tup, 'Tuple'), (string, 'String')]:
try:
d = {obj: 'value'}
print(f" {name}: Yes")
except TypeError:
print(f" {name}: No")
Memory usage:
List: 104 bytes
Tuple: 80 bytes
String: 54 bytes
Can be dict key:
List: No
Tuple: Yes
String: Yes
Lists allocate extra space to amortize the cost of growth.
# Observe allocation behavior
import sys
lst = []
sizes_and_caps = []
for i in range(20):
lst.append(i)
# Size includes list object overhead
total_size = sys.getsizeof(lst)
# Approximate capacity from size
capacity = (total_size - sys.getsizeof([])) // 8
sizes_and_caps.append((len(lst), capacity))
# Show growth points
print("Size → Capacity:")
last_cap = 0
for size, cap in sizes_and_caps:
if cap != last_cap:
print(f" {size:2d} → {cap:2d} (grew by {cap - last_cap})")
last_cap = cap
Size → Capacity:
1 → 4 (grew by 4)
5 → 8 (grew by 4)
9 → 16 (grew by 8)
17 → 24 (grew by 8)
Growth Strategy:
When a list exceeds capacity, Python:
Cost Analysis:
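The amortized cost can be estimated by counting reallocations. A rough sketch (it detects a reallocation indirectly, as a change in sys.getsizeof; exact counts vary by CPython version):
import sys
lst = []
reallocs = 0
last = sys.getsizeof(lst)
n = 1_000_000
for i in range(n):
    lst.append(i)
    size = sys.getsizeof(lst)
    if size != last:      # allocated capacity changed -> buffer was reallocated
        reallocs += 1
        last = size
print(f"{n} appends -> ~{reallocs} reallocations")
# Growth is roughly geometric, so reallocations number on the order of a
# hundred here, and the amortized cost per append stays O(1).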
Different operations have different costs based on internal structure.
# Demonstrating operation costs
import time
def measure_operation(lst, operation, description):
"""Measure single operation time."""
start = time.perf_counter()
operation(lst)
elapsed = time.perf_counter() - start
return elapsed * 1e6 # Convert to microseconds
# Large list for measurable times
n = 100000
test_list = list(range(n))
# Operations
times = {}
times['append'] = measure_operation(test_list.copy(),
lambda l: l.append(999),
"Append")
times['insert_0'] = measure_operation(test_list.copy(),
lambda l: l.insert(0, 999),
"Insert at 0")
times['pop'] = measure_operation(test_list.copy(),
lambda l: l.pop(),
"Pop from end")
times['pop_0'] = measure_operation(test_list.copy(),
lambda l: l.pop(0),
"Pop from start")
print(f"Operation times (n={n}):")
for op, time in times.items():
print(f" {op:12s}: {time:8.2f} μs")
Operation times (n=100000):
append : 0.62 μs
insert_0 : 30.54 μs
pop : 0.25 μs
pop_0 : 13.92 μs
Dictionaries use open addressing with perturbation-based (pseudo-random) probing for collision resolution.
# Dictionary internals
d = {'a': 1, 'b': 2, 'c': 3}
# Show hash values
for key in d:
print(f"hash('{key}') = {hash(key):20d}")
# Dictionary growth
import sys
sizes = []
for i in range(20):
d = {str(j): j for j in range(i)}
sizes.append((i, sys.getsizeof(d)))
print("\nDict size growth:")
last_size = 0
for n, size in sizes:
if size != last_size:
print(f" {n:2d} items: {size:4d} bytes")
last_size = size
hash('a') = -5286209556170500060
hash('b') = -1093043700687098929
hash('c') = 4292675871619449719
Dict size growth:
0 items: 64 bytes
1 items: 184 bytes
6 items: 272 bytes
11 items: 464 bytes
Design Choices:
Consequences:
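Two consequences worth remembering: dictionaries preserve insertion order (guaranteed since Python 3.7), and keys must be hashable. A quick check:
d = {}
d['b'] = 2
d['a'] = 1
d['c'] = 3
print(list(d))            # ['b', 'a', 'c'] -- insertion order preserved
try:
    d[[1, 2]] = 'oops'    # lists are unhashable, so they cannot be keys
except TypeError as e:
    print(f"TypeError: {e}")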
Understanding how dictionaries resolve collisions and maintain performance.
# Collision demonstration
class SimpleDict:
"""Simplified dict to show collision handling."""
def __init__(self, size=8):
self.size = size
self.keys = [None] * size
self.values = [None] * size
self.used = 0
def _find_slot(self, key):
slot = hash(key) % self.size
perturb = hash(key)
while self.keys[slot] is not None:
if self.keys[slot] == key:
return slot # Found existing
print(f" Collision at slot {slot}")
# Simplified probing
slot = (5 * slot + 1 + perturb) % self.size
perturb >>= 5
return slot # Found empty
def insert(self, key, value):
print(f"Inserting '{key}':")
slot = self._find_slot(key)
print(f" Using slot {slot}")
if self.keys[slot] is None:
self.used += 1
self.keys[slot] = key
self.values[slot] = value
# Demonstrate collisions
d = SimpleDict(size=8)
d.insert('a', 1)
d.insert('i', 2) # May collide depending on hash
Inserting 'a':
Using slot 4
Inserting 'i':
Using slot 6
Strings optimize for immutability with clever memory management.
# String interning behavior
a = "hello"
b = "hello"
c = "hel" + "lo" # Compile-time concatenation
print("Interning of literals:")
print(f" a is b: {a is b}")
print(f" a is c: {a is c}")
# Dynamic strings not automatically interned
d = "hel"
e = d + "lo" # Runtime concatenation
print(f" a is e: {a is e}")
# Force interning
f = sys.intern(e)
print(f" a is f: {a is f}")
# Memory for different encodings
ascii_str = "hello"
unicode_str = "hello 世界"
print(f"\nMemory usage:")
print(f" ASCII: {sys.getsizeof(ascii_str)} bytes")
print(f" Unicode: {sys.getsizeof(unicode_str)} bytes")
Interning of literals:
a is b: True
a is c: True
a is e: False
a is f: True
Memory usage:
ASCII: 54 bytes
Unicode: 90 bytes
Interning Rules:
Python automatically interns:
Benefits:
String Storage:
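Since PEP 393, CPython stores a string with 1, 2, or 4 bytes per character, chosen by the widest code point it contains. A quick check (exact byte counts vary by interpreter version):
import sys
samples = {
    'ASCII (1 byte/char)':    'abcdef',
    'Latin-1 (1 byte/char)':  'abcdé',
    'BMP (2 bytes/char)':     'abc世界',
    'Non-BMP (4 bytes/char)': 'abc😀',
}
for label, s in samples.items():
    print(f"{label:25s} len={len(s)}  {sys.getsizeof(s)} bytes")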
Building strings efficiently requires understanding concatenation costs.
import io
import timeit
n = 1000
chunk = "x" * 10
# Method 1: Repeated concatenation (BAD)
def build_concat():
s = ""
for _ in range(n):
s += chunk # Creates new string each time
return s
# Method 2: Join (GOOD)
def build_join():
parts = []
for _ in range(n):
parts.append(chunk)
return "".join(parts)
# Method 3: StringIO (GOOD)
def build_io():
buffer = io.StringIO()
for _ in range(n):
buffer.write(chunk)
return buffer.getvalue()
# Compare performance
t1 = timeit.timeit(build_concat, number=100)
t2 = timeit.timeit(build_join, number=100)
t3 = timeit.timeit(build_io, number=100)
print(f"Building {n}-chunk string (100 iterations):")
print(f" Concatenation: {t1:.4f}s")
print(f" Join: {t2:.4f}s ({t1/t2:.1f}x faster)")
print(f" StringIO: {t3:.4f}s ({t1/t3:.1f}x faster)")
Building 1000-chunk string (100 iterations):
Concatenation: 0.0037s
Join: 0.0016s (2.3x faster)
StringIO: 0.0028s (1.3x faster)
Sets use the same hash table technology as dictionaries, optimized for membership testing.
# Set performance demonstration
import random
# Create test data
numbers = list(range(10000))
random.shuffle(numbers)
test_list = numbers[:5000]
test_set = set(test_list)
# Membership testing
lookups = random.sample(range(10000), 100)
import time
# List lookup
start = time.perf_counter()
for x in lookups:
_ = x in test_list
list_time = time.perf_counter() - start
# Set lookup
start = time.perf_counter()
for x in lookups:
_ = x in test_set
set_time = time.perf_counter() - start
print(f"100 lookups in collection of 5000:")
print(f" List: {list_time*1000:.3f} ms")
print(f" Set: {set_time*1000:.3f} ms")
print(f" Speedup: {list_time/set_time:.1f}x")
# Set operations
a = {1, 2, 3, 4, 5}
b = {4, 5, 6, 7, 8}
print(f"\nSet operations:")
print(f" a & b = {a & b}") # Intersection
print(f" a | b = {a | b}") # Union
print(f" a - b = {a - b}") # Difference
print(f" a ^ b = {a ^ b}") # Symmetric difference
100 lookups in collection of 5000:
List: 2.365 ms
Set: 0.023 ms
Speedup: 100.8x
Set operations:
a & b = {4, 5}
a | b = {1, 2, 3, 4, 5, 6, 7, 8}
a - b = {1, 2, 3}
a ^ b = {1, 2, 3, 6, 7, 8}
Set Implementation:
When to use sets:
Memory overhead:
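A quick look at that overhead (exact numbers vary by Python version):
import sys
items = list(range(1000))
print(f"list of 1000 ints:  {sys.getsizeof(items):,} bytes")
print(f"tuple of 1000 ints: {sys.getsizeof(tuple(items)):,} bytes")
print(f"set of 1000 ints:   {sys.getsizeof(set(items)):,} bytes")
# The set is several times larger: its hash table keeps spare slots
# so collisions stay rare and membership tests stay O(1).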
Python’s default copying behavior can lead to unexpected aliasing.
import copy
# Create nested structure
original = [[1, 2], [3, 4], [5, 6]]
# Different copy methods
reference = original
shallow = original.copy() # or list(original) or original[:]
deep = copy.deepcopy(original)
# Modify nested object
original[0].append(3)
print("After modifying original[0]:")
print(f" original: {original}")
print(f" reference: {reference}") # Changed
print(f" shallow: {shallow}") # Changed!
print(f" deep: {deep}") # Unchanged
# Identity checks
print("\nIdentity comparisons:")
print(f" original is reference: {original is reference}")
print(f" original is shallow: {original is shallow}")
print(f" original[0] is shallow[0]: {original[0] is shallow[0]}") # True!
print(f" original[0] is deep[0]: {original[0] is deep[0]}") # False
After modifying original[0]:
original: [[1, 2, 3], [3, 4], [5, 6]]
reference: [[1, 2, 3], [3, 4], [5, 6]]
shallow: [[1, 2, 3], [3, 4], [5, 6]]
deep: [[1, 2], [3, 4], [5, 6]]
Identity comparisons:
original is reference: True
original is shallow: False
original[0] is shallow[0]: True
original[0] is deep[0]: False
import timeit
def add_function(x, y):
return x + y
add_lambda = lambda x, y: x + y
# Direct operation
def test_inline():
total = 0
for i in range(1000):
total += i + 1
return total
# Function call
def test_function():
total = 0
for i in range(1000):
total += add_function(i, 1)
return total
# Lambda call
def test_lambda():
total = 0
for i in range(1000):
total += add_lambda(i, 1)
return total
n = 10000
t_inline = timeit.timeit(test_inline, number=n)
t_func = timeit.timeit(test_function, number=n)
t_lambda = timeit.timeit(test_lambda, number=n)
print(f"1000 additions, {n} iterations:")
print(f" Inline: {t_inline:.3f}s")
print(f" Function: {t_func:.3f}s ({t_func/t_inline:.1f}x)")
print(f" Lambda: {t_lambda:.3f}s ({t_lambda/t_inline:.1f}x)")
1000 additions, 10000 iterations:
Inline: 0.211s
Function: 0.332s (1.6x)
Lambda: 0.346s (1.6x)
Call overhead includes:
Optimization strategies:
Use a JIT compiler (e.g. Numba's @jit) for numerical code.
def make_counter(initial=0):
count = initial
def increment(step=1):
nonlocal count
count += step
return count
def get_value():
return count
# Functions share the closure
increment.get = get_value
return increment
# Create closures
counter1 = make_counter()
counter2 = make_counter(100)
print("Separate closures:")
print(f" counter1(): {counter1()}")
print(f" counter1(): {counter1()}")
print(f" counter2(): {counter2()}")
# Inspect closure
import inspect
closure = inspect.getclosurevars(counter1)
print(f"\nClosure variables: {closure.nonlocals}")
# Shared state demonstration
def make_accumulator():
data = []
def add(x):
data.append(x)
return sum(data)
def reset():
data.clear()
add.reset = reset
return add
acc = make_accumulator()
print(f"\nacc(5): {acc(5)}")
print(f"acc(3): {acc(3)}")
Separate closures:
counter1(): 1
counter1(): 2
counter2(): 101
Closure variables: {'count': 2}
acc(5): 5
acc(3): 8
# Performance impact of closures
import timeit
# Global state
global_counter = 0
def increment_global():
global global_counter
global_counter += 1
return global_counter
# Closure state
def make_closure_counter():
counter = 0
def increment():
nonlocal counter
counter += 1
return counter
return increment
closure_inc = make_closure_counter()
# Class state
class ClassCounter:
def __init__(self):
self.counter = 0
def increment(self):
self.counter += 1
return self.counter
class_counter = ClassCounter()
# Measure
n = 100000
t_global = timeit.timeit(increment_global, number=n)
t_closure = timeit.timeit(closure_inc, number=n)
t_class = timeit.timeit(class_counter.increment, number=n)
print(f"\n{n} increments:")
print(f" Global: {t_global:.3f}s")
print(f" Closure: {t_closure:.3f}s")
print(f" Class: {t_class:.3f}s")
100000 increments:
Global: 0.003s
Closure: 0.003s
Class: 0.003s
import time
import functools
def timer(func):
"""Decorator to measure execution time."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
print(f"{func.__name__}: {elapsed:.4f}s")
return result
return wrapper
def cache(func):
"""Simple memoization decorator."""
memo = {}
@functools.wraps(func)
def wrapper(*args):
if args in memo:
return memo[args]
result = func(*args)
memo[args] = result
return result
wrapper.cache = memo # Expose cache
return wrapper
@timer
@cache
def fibonacci(n):
if n < 2:
return n
return fibonacci(n-1) + fibonacci(n-2)
# First call builds cache
result = fibonacci(10)
print(f"fibonacci(10) = {result}")
# Check cache
print(f"Cache size: {len(fibonacci.cache)}")
fibonacci: 0.0000s
fibonacci: 0.0000s
fibonacci: 0.0000s
fibonacci: 0.0000s
fibonacci: 0.0000s
fibonacci: 0.0000s
fibonacci: 0.0000s
fibonacci: 0.0000s
fibonacci: 0.0000s
fibonacci: 0.0000s
fibonacci: 0.0001s
fibonacci: 0.0000s
fibonacci: 0.0001s
fibonacci: 0.0000s
fibonacci: 0.0001s
fibonacci: 0.0000s
fibonacci: 0.0001s
fibonacci: 0.0000s
fibonacci: 0.0001s
fibonacci(10) = 55
Cache size: 11
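The standard library provides the same memoization via functools.lru_cache, so a hand-rolled cache decorator is rarely needed. A minimal sketch:
import functools
@functools.lru_cache(maxsize=None)    # unbounded memoization cache
def fib(n):
    return n if n < 2 else fib(n - 1) + fib(n - 2)
print(fib(30))              # 832040
print(fib.cache_info())     # hit/miss/size statistics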
# Parameterized decorator
def validate_range(min_val, max_val):
def decorator(func):
@functools.wraps(func)
def wrapper(x):
if not min_val <= x <= max_val:
raise ValueError(f"{x} not in [{min_val}, {max_val}]")
return func(x)
return wrapper
return decorator
@validate_range(0, 1)
def probability(p):
return f"P = {p}"
print(probability(0.5))
try:
probability(1.5)
except ValueError as e:
print(f"Error: {e}")
# Stack decorators for composition
def logged(func):
@functools.wraps(func)
def wrapper(*args):
print(f"Calling {func.__name__}{args}")
return func(*args)
return wrapper
@logged
@validate_range(-1, 1)
def cosine(x):
import math
return math.cos(math.pi * x)
result = cosine(0.5)
print(f"cos(0.5π) = {result}")
P = 0.5
Error: 1.5 not in [0, 1]
Calling cosine(0.5,)
cos(0.5π) = 6.123233995736766e-17
import numpy as np
import timeit
# Three abstraction levels
def python_dot(a, b):
"""Pure Python dot product."""
total = 0
for i in range(len(a)):
total += a[i] * b[i]
return total
def numpy_dot(a, b):
"""NumPy dot product."""
return np.dot(a, b)
# Attempt to import numba (may not be available)
try:
from numba import jit
@jit(nopython=True)
def numba_dot(a, b):
"""JIT-compiled dot product."""
total = 0.0
for i in range(len(a)):
total += a[i] * b[i]
return total
has_numba = True
except ImportError:
has_numba = False
numba_dot = None
# Test different sizes
for size in [10, 100, 1000, 10000]:
list_a = list(range(size))
list_b = list(range(size))
arr_a = np.array(list_a, dtype=float)
arr_b = np.array(list_b, dtype=float)
t_python = timeit.timeit(
lambda: python_dot(list_a, list_b), number=100
)
t_numpy = timeit.timeit(
lambda: numpy_dot(arr_a, arr_b), number=100
)
print(f"Size {size:5d}:")
print(f" Python: {t_python*10:.3f} ms")
print(f" NumPy: {t_numpy*10:.3f} ms ({t_python/t_numpy:.0f}x)")
if has_numba and size <= 1000:
t_numba = timeit.timeit(
lambda: numba_dot(arr_a, arr_b), number=100
)
print(f" Numba: {t_numba*10:.3f} ms ({t_python/t_numba:.0f}x)")
Size 10:
Python: 0.000 ms
NumPy: 0.000 ms (1x)
Size 100:
Python: 0.002 ms
NumPy: 0.001 ms (2x)
Size 1000:
Python: 0.026 ms
NumPy: 0.000 ms (57x)
Size 10000:
Python: 0.281 ms
NumPy: 0.002 ms (141x)
Abstraction trade-offs:
Each layer adds overhead but provides:
When to drop down a level:
Vectorization threshold:
import sys
# Memory comparison
def list_squares(n):
"""Returns list of squares."""
return [x**2 for x in range(n)]
def gen_squares(n):
"""Yields squares one at a time."""
for x in range(n):
yield x**2
# Create both
n = 1000000
squares_list = list_squares(n)
squares_gen = gen_squares(n)
print(f"Memory usage:")
print(f" List: {sys.getsizeof(squares_list):,} bytes")
print(f" Gen: {sys.getsizeof(squares_gen):,} bytes")
# Generator expressions
gen_expr = (x**2 for x in range(n))
print(f" Expr: {sys.getsizeof(gen_expr):,} bytes")
# Pipeline example
def read_numbers(filename):
"""Simulate reading numbers from file."""
for i in range(100):
yield i
def process_pipeline():
"""Chain generators for processing."""
numbers = read_numbers("data.txt")
squared = (x**2 for x in numbers)
filtered = (x for x in squared if x % 2 == 0)
result = sum(filtered) # Only here we consume
return result
result = process_pipeline()
print(f"\nPipeline result: {result}")
Memory usage:
List: 8,448,728 bytes
Gen: 208 bytes
Expr: 208 bytes
Pipeline result: 161700
# Infinite sequences
def fibonacci():
"""Infinite Fibonacci generator."""
a, b = 0, 1
while True:
yield a
a, b = b, a + b
# Take only what's needed
fib = fibonacci()
first_10 = [next(fib) for _ in range(10)]
print(f"First 10 Fibonacci: {first_10}")
# Generator with state
def running_average():
"""Maintains running average."""
total = 0
count = 0
while True:
value = yield total / count if count else 0
if value is not None:
total += value
count += 1
avg = running_average()
next(avg) # Prime the generator
print(f"\nRunning average:")
for val in [10, 20, 30, 40]:
result = avg.send(val)
print(f" Added {val}, average: {result:.1f}")
# Memory-efficient file processing
def process_large_file(filename):
"""Process file without loading it all."""
with open(filename, 'r') as f:
for line in f: # Generator, not list
if line.strip():
yield line.strip().upper()
First 10 Fibonacci: [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
Running average:
Added 10, average: 10.0
Added 20, average: 15.0
Added 30, average: 20.0
Added 40, average: 25.0
# Inheritance approach
class Vector(list):
"""Vector inherits from list."""
def __init__(self, components):
super().__init__(components)
def magnitude(self):
return sum(x**2 for x in self) ** 0.5
def dot(self, other):
return sum(a*b for a, b in zip(self, other))
# Problems with inheritance
v = Vector([3, 4])
print(f"Vector: {v}")
print(f"Magnitude: {v.magnitude()}")
# But inherits unwanted methods
v.reverse() # Modifies in place!
print(f"After reverse: {v}")
v.append(5) # Now 3D?
print(f"After append: {v}")
Vector: [3, 4]
Magnitude: 5.0
After reverse: [4, 3]
After append: [4, 3, 5]
# Composition approach
class Signal:
"""Signal uses composition."""
def __init__(self, data, sample_rate):
self._data = np.array(data)
self.fs = sample_rate
self._filter = None
def __len__(self):
return len(self._data)
def __getitem__(self, idx):
return self._data[idx]
@property
def duration(self):
return len(self._data) / self.fs
def fft(self):
"""Delegate to NumPy."""
return np.fft.fft(self._data)
def resample(self, new_rate):
"""Return new Signal."""
ratio = new_rate / self.fs
new_length = int(len(self._data) * ratio)
new_data = np.interp(
np.linspace(0, len(self._data), new_length),
np.arange(len(self._data)),
self._data
)
return Signal(new_data, new_rate)
# Clean interface
sig = Signal([1, 2, 3, 4], sample_rate=100)
print(f"Duration: {sig.duration}s")
print(f"FFT shape: {sig.fft().shape}")
Duration: 0.04s
FFT shape: (4,)
class Vector:
"""2D vector with operator overloading."""
def __new__(cls, x, y):
"""Called before __init__ to create instance."""
print(f"__new__: Creating {cls.__name__}")
instance = super().__new__(cls)
return instance
def __init__(self, x, y):
"""Initialize after instance exists."""
print(f"__init__: Setting x={x}, y={y}")
self.x = x
self.y = y
def __repr__(self):
"""Developer representation."""
return f"Vector({self.x}, {self.y})"
def __str__(self):
"""User-friendly string."""
return f"<{self.x}, {self.y}>"
# Creation process
v = Vector(3, 4)
print(f"\nCreated: {v}")
print(f"repr: {repr(v)}")
print(f"str: {str(v)}")
# Instance dictionary
print(f"\nInstance dict: {v.__dict__}")
print(f"Class: {v.__class__.__name__}")
__new__: Creating Vector
__init__: Setting x=3, y=4
Created: <3, 4>
repr: Vector(3, 4)
str: <3, 4>
Instance dict: {'x': 3, 'y': 4}
Class: Vector
# Method resolution demonstration
class Base:
def method(self):
return "Base"
def override_me(self):
return "Base version"
class Derived(Base):
def override_me(self):
return "Derived version"
def method(self):
# Call parent version
parent = super().method()
return f"{parent} → Derived"
d = Derived()
print(f"d.override_me(): {d.override_me()}")
print(f"d.method(): {d.method()}")
# MRO (Method Resolution Order)
print(f"\nMRO: {[cls.__name__ for cls in Derived.__mro__]}")
# Bound vs unbound methods
print(f"\nBound method: {d.method}")
print(f"Unbound: {Derived.method}")
print(f"Same function: {d.method.__func__ is Derived.method}")
d.override_me(): Derived version
d.method(): Base → Derived
MRO: ['Derived', 'Base', 'object']
Bound method: <bound method Derived.method of <__main__.Derived object at 0x327e2ee10>>
Unbound: <function Derived.method at 0x327f06480>
Same function: True
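With multiple inheritance, the MRO is computed by C3 linearization, and super() follows the MRO rather than a single parent. A small diamond-inheritance sketch (class names here are illustrative):
class A:
    def who(self):
        return "A"
class B(A):
    def who(self):
        return "B→" + super().who()
class C(A):
    def who(self):
        return "C→" + super().who()
class D(B, C):
    def who(self):
        return "D→" + super().who()
print([cls.__name__ for cls in D.__mro__])   # ['D', 'B', 'C', 'A', 'object']
print(D().who())                             # D→B→C→A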
class Vector:
def __init__(self, x, y):
self.x = x
self.y = y
def __repr__(self):
return f"Vector({self.x}, {self.y})"
# Arithmetic operators
def __add__(self, other):
if isinstance(other, Vector):
return Vector(self.x + other.x, self.y + other.y)
return NotImplemented
def __mul__(self, scalar):
if isinstance(scalar, (int, float)):
return Vector(self.x * scalar, self.y * scalar)
return NotImplemented
def __rmul__(self, scalar):
"""Right multiplication: scalar * vector"""
return self.__mul__(scalar)
def __neg__(self):
return Vector(-self.x, -self.y)
def __abs__(self):
return (self.x**2 + self.y**2) ** 0.5
# Comparison operators
def __eq__(self, other):
if not isinstance(other, Vector):
return False
return self.x == other.x and self.y == other.y
def __lt__(self, other):
"""Compare by magnitude."""
return abs(self) < abs(other)
# Usage
a = Vector(3, 4)
b = Vector(1, 2)
print(f"a = {a}")
print(f"b = {b}")
print(f"a + b = {a + b}")
print(f"2 * a = {2 * a}")
print(f"-a = {-a}")
print(f"|a| = {abs(a)}")
print(f"a == b: {a == b}")
print(f"a > b: {a > b}")
a = Vector(3, 4)
b = Vector(1, 2)
a + b = Vector(4, 6)
2 * a = Vector(6, 8)
-a = Vector(-3, -4)
|a| = 5.0
a == b: False
a > b: True
# In-place operators
class Matrix:
def __init__(self, data):
self.data = np.array(data, dtype=float)
def __repr__(self):
return f"Matrix({self.data.tolist()})"
def __iadd__(self, other):
"""In-place addition: m += other"""
if isinstance(other, Matrix):
self.data += other.data
elif isinstance(other, (int, float)):
self.data += other
else:
return NotImplemented
return self # Must return self
def __matmul__(self, other):
"""Matrix multiplication: m @ other"""
if isinstance(other, Matrix):
return Matrix(self.data @ other.data)
return NotImplemented
def __getitem__(self, key):
"""Indexing: m[i, j]"""
return self.data[key]
def __setitem__(self, key, value):
"""Assignment: m[i, j] = value"""
self.data[key] = value
# Usage
m1 = Matrix([[1, 2], [3, 4]])
m2 = Matrix([[5, 6], [7, 8]])
print(f"m1 = {m1}")
print(f"m2 = {m2}")
print(f"m1 @ m2 = {m1 @ m2}")
print(f"m1[0, 1] = {m1[0, 1]}")
m1 += 10
print(f"After m1 += 10: {m1}")
m1 = Matrix([[1.0, 2.0], [3.0, 4.0]])
m2 = Matrix([[5.0, 6.0], [7.0, 8.0]])
m1 @ m2 = Matrix([[19.0, 22.0], [43.0, 50.0]])
m1[0, 1] = 2.0
After m1 += 10: Matrix([[11.0, 12.0], [13.0, 14.0]])
class Temperature:
"""Temperature with Celsius/Fahrenheit properties."""
def __init__(self, celsius=0):
self._celsius = celsius
@property
def celsius(self):
"""Getter for Celsius."""
return self._celsius
@celsius.setter
def celsius(self, value):
"""Setter with validation."""
if value < -273.15:
raise ValueError(f"Temperature below absolute zero")
self._celsius = value
@property
def fahrenheit(self):
"""Computed Fahrenheit property."""
return self._celsius * 9/5 + 32
@fahrenheit.setter
def fahrenheit(self, value):
"""Set via Fahrenheit."""
self.celsius = (value - 32) * 5/9
@property
def kelvin(self):
"""Read-only Kelvin property."""
return self._celsius + 273.15
# Usage
temp = Temperature(25)
print(f"Celsius: {temp.celsius}°C")
print(f"Fahrenheit: {temp.fahrenheit}°F")
print(f"Kelvin: {temp.kelvin}K")
temp.fahrenheit = 86
print(f"\nAfter setting 86°F:")
print(f"Celsius: {temp.celsius}°C")
try:
temp.celsius = -300
except ValueError as e:
print(f"\nValidation error: {e}")
Celsius: 25°C
Fahrenheit: 77.0°F
Kelvin: 298.15K
After setting 86°F:
Celsius: 30.0°C
Validation error: Temperature below absolute zero
class ValidatedProperty:
"""Descriptor for validated attributes."""
def __init__(self, min_value=None, max_value=None):
self.min_value = min_value
self.max_value = max_value
def __set_name__(self, owner, name):
self.name = f'_{name}'
def __get__(self, obj, objtype=None):
if obj is None:
return self
return getattr(obj, self.name, None)
def __set__(self, obj, value):
if self.min_value is not None and value < self.min_value:
raise ValueError(f"{value} below minimum {self.min_value}")
if self.max_value is not None and value > self.max_value:
raise ValueError(f"{value} above maximum {self.max_value}")
setattr(obj, self.name, value)
class Signal:
"""Signal with validated properties."""
amplitude = ValidatedProperty(min_value=0, max_value=1)
frequency = ValidatedProperty(min_value=0, max_value=20000)
def __init__(self, amp=0.5, freq=440):
self.amplitude = amp
self.frequency = freq
# Usage
sig = Signal()
print(f"Default: amp={sig.amplitude}, freq={sig.frequency}")
sig.amplitude = 0.8
print(f"Updated: amp={sig.amplitude}")
try:
sig.amplitude = 1.5 # Too high
except ValueError as e:
print(f"Error: {e}")
Default: amp=0.5, freq=440
Updated: amp=0.8
Error: 1.5 above maximum 1
class Counter:
"""Demonstrates class vs instance attributes."""
# Class attribute (shared)
total_instances = 0
default_step = 1
def __init__(self, initial=0):
# Instance attributes (unique)
self.count = initial
Counter.total_instances += 1
def increment(self, step=None):
# Use class default if not specified
if step is None:
step = self.default_step
self.count += step
@classmethod
def get_total(cls):
"""Class method accesses class attributes."""
return cls.total_instances
@staticmethod
def validate_step(step):
"""Static method - no access to instance or class."""
return step > 0
# Create instances
c1 = Counter()
c2 = Counter(10)
c3 = Counter(20)
print(f"Total instances: {Counter.total_instances}")
print(f"c1.count: {c1.count}")
print(f"c2.count: {c2.count}")
# Modify class attribute
Counter.default_step = 5
c1.increment()
c2.increment()
print(f"\nAfter increment with step=5:")
print(f"c1.count: {c1.count}")
print(f"c2.count: {c2.count}")
# Instance shadows class attribute
c1.default_step = 10
print(f"\nc1.default_step: {c1.default_step}")
print(f"c2.default_step: {c2.default_step}")
Total instances: 3
c1.count: 0
c2.count: 10
After increment with step=5:
c1.count: 5
c2.count: 15
c1.default_step: 10
c2.default_step: 5
# Attribute access performance
import timeit
class DataHolder:
class_data = [1, 2, 3, 4, 5]
def __init__(self):
self.instance_data = [1, 2, 3, 4, 5]
def access_instance(self):
return self.instance_data[2]
def access_class(self):
return self.class_data[2]
@classmethod
def access_via_classmethod(cls):
return cls.class_data[2]
obj = DataHolder()
# Measure access times
n = 1000000
t_inst = timeit.timeit(obj.access_instance, number=n)
t_class = timeit.timeit(obj.access_class, number=n)
t_classmethod = timeit.timeit(DataHolder.access_via_classmethod, number=n)
print(f"Access times ({n} iterations):")
print(f" Instance attr: {t_inst:.3f}s")
print(f" Class attr: {t_class:.3f}s")
print(f" Via @classmethod: {t_classmethod:.3f}s")
# Memory implications
import sys
print(f"\nMemory usage:")
print(f" Instance dict: {sys.getsizeof(obj.__dict__)} bytes")
print(f" Class dict: {sys.getsizeof(DataHolder.__dict__)} bytes")
Access times (1000000 iterations):
Instance attr: 0.020s
Class attr: 0.026s
Via @classmethod: 0.024s
Memory usage:
Instance dict: 296 bytes
Class dict: 40 bytes
from abc import ABC, abstractmethod
import math
class Shape(ABC):
"""Abstract base class for shapes."""
@abstractmethod
def area(self):
"""Must be implemented by subclasses."""
pass
@abstractmethod
def perimeter(self):
"""Must be implemented by subclasses."""
pass
def describe(self):
"""Concrete method using abstract methods."""
return f"Area: {self.area():.2f}, Perimeter: {self.perimeter():.2f}"
class Circle(Shape):
def __init__(self, radius):
self.radius = radius
def area(self):
return math.pi * self.radius ** 2
def perimeter(self):
return 2 * math.pi * self.radius
class Rectangle(Shape):
def __init__(self, width, height):
self.width = width
self.height = height
def area(self):
return self.width * self.height
def perimeter(self):
return 2 * (self.width + self.height)
# Cannot instantiate ABC
try:
shape = Shape() # Error!
except TypeError as e:
print(f"Error: {e}")
# Concrete classes work
circle = Circle(5)
rect = Rectangle(3, 4)
print(f"\nCircle: {circle.describe()}")
print(f"Rectangle: {rect.describe()}")
# Check inheritance
print(f"\nisinstance(circle, Shape): {isinstance(circle, Shape)}")
Error: Can't instantiate abstract class Shape with abstract methods area, perimeter
Circle: Area: 78.54, Perimeter: 31.42
Rectangle: Area: 12.00, Perimeter: 14.00
isinstance(circle, Shape): True
from typing import Protocol, runtime_checkable
@runtime_checkable
class Drawable(Protocol):
"""Protocol - no inheritance needed."""
def draw(self) -> None: ...
@runtime_checkable
class Measurable(Protocol):
"""Protocol with property."""
@property
def size(self) -> float: ...
# Classes that match protocol (no inheritance!)
class Square:
def __init__(self, side):
self.side = side
def draw(self):
print(f"Drawing {self.side}x{self.side} square")
@property
def size(self):
return self.side * self.side
class Text:
def __init__(self, content):
self.content = content
def draw(self):
print(f"Drawing: {self.content}")
# Check protocol conformance
sq = Square(5)
txt = Text("Hello")
print("Protocol checks:")
print(f"Square is Drawable: {isinstance(sq, Drawable)}")
print(f"Text is Drawable: {isinstance(txt, Drawable)}")
print(f"Square is Measurable: {isinstance(sq, Measurable)}")
print(f"Text is Measurable: {isinstance(txt, Measurable)}")
# Use in type hints
def render(items: list[Drawable]) -> None:
for item in items:
item.draw()
render([sq, txt]) # Both work!
Protocol checks:
Square is Drawable: True
Text is Drawable: True
Square is Measurable: True
Text is Measurable: False
Drawing 5x5 square
Drawing: Hello
from dataclasses import dataclass, field
from typing import List
import numpy as np
@dataclass
class DataPoint:
"""Simple data container."""
x: float
y: float
label: str = "unlabeled"
metadata: dict = field(default_factory=dict)
def distance_to(self, other):
"""Euclidean distance."""
return ((self.x - other.x)**2 +
(self.y - other.y)**2) ** 0.5
@dataclass(frozen=True)
class ImmutableVector:
"""Frozen dataclass - hashable."""
x: float
y: float
z: float
def magnitude(self):
return (self.x**2 + self.y**2 + self.z**2) ** 0.5
def __add__(self, other):
"""Return new vector (immutable)."""
return ImmutableVector(
self.x + other.x,
self.y + other.y,
self.z + other.z
)
# Usage
p1 = DataPoint(3, 4)
p2 = DataPoint(6, 8, "target")
print(f"p1: {p1}")
print(f"p2: {p2}")
print(f"Distance: {p1.distance_to(p2):.2f}")
# Frozen vector
v1 = ImmutableVector(1, 2, 3)
v2 = ImmutableVector(4, 5, 6)
v3 = v1 + v2
print(f"\nv1 + v2 = {v3}")
print(f"Hashable: {hash(v1)}")
# Can use as dict key
vector_dict = {v1: "first", v2: "second"}
p1: DataPoint(x=3, y=4, label='unlabeled', metadata={})
p2: DataPoint(x=6, y=8, label='target', metadata={})
Distance: 5.00
v1 + v2 = ImmutableVector(x=5, y=7, z=9)
Hashable: 529344067295497451
@dataclass(order=True)
class Measurement:
"""Sortable measurement with validation."""
timestamp: float = field(compare=True)
value: float = field(compare=False)
sensor_id: str = field(default="unknown", compare=False)
def __post_init__(self):
"""Validation after initialization."""
if self.value < 0:
raise ValueError(f"Negative value: {self.value}")
if self.timestamp < 0:
raise ValueError(f"Invalid timestamp: {self.timestamp}")
# Sorting by timestamp
measurements = [
Measurement(100.5, 23.4, "A1"),
Measurement(99.2, 25.1, "A2"),
Measurement(101.3, 22.8, "A1"),
]
sorted_measurements = sorted(measurements)
print("Sorted by timestamp:")
for m in sorted_measurements:
print(f" t={m.timestamp}: {m.value}")
# Performance comparison
from dataclasses import dataclass
import sys
@dataclass(slots=True)
class SlottedPoint:
x: float
y: float
@dataclass
class RegularPoint:
x: float
y: float
sp = SlottedPoint(1, 2)
rp = RegularPoint(1, 2)
print(f"\nMemory usage:")
print(f" With slots: {sys.getsizeof(sp)} bytes")
print(f" Regular: {sys.getsizeof(rp) + sys.getsizeof(rp.__dict__)} bytes")
Sorted by timestamp:
t=99.2: 25.1
t=100.5: 23.4
t=101.3: 22.8
Memory usage:
With slots: 48 bytes
Regular: 352 bytes
NumPy addresses a fundamental Python limitation: every element-wise operation in pure Python triggers expensive interpreter overhead.
import numpy as np
import timeit
# Demonstrate the performance difference
def python_add(list1, list2):
"""Pure Python element-wise addition."""
result = []
for i in range(len(list1)):
result.append(list1[i] + list2[i])
return result
def numpy_add(arr1, arr2):
"""NumPy vectorized addition."""
return arr1 + arr2
# Test data
size = 100000
py_list1 = list(range(size))
py_list2 = list(range(size, size*2))
np_arr1 = np.array(py_list1)
np_arr2 = np.array(py_list2)
# Benchmark
t_python = timeit.timeit(
lambda: python_add(py_list1, py_list2),
number=10
)
t_numpy = timeit.timeit(
lambda: numpy_add(np_arr1, np_arr2),
number=10
)
print(f"Adding {size:,} numbers (10 iterations):")
print(f" Python lists: {t_python:.3f}s")
print(f" NumPy arrays: {t_numpy:.3f}s")
print(f" Speedup: {t_python/t_numpy:.0f}x")
# Show function call overhead
python_ops = size * 4 # loop, 2 indexes, 1 addition per iteration
numpy_ops = 1 # single function call
print(f"\nPython operations: {python_ops:,}")
print(f"NumPy operations: {numpy_ops}")
print(f"Overhead reduction: {python_ops/numpy_ops:.0f}x")
Adding 100,000 numbers (10 iterations):
Python lists: 0.024s
NumPy arrays: 0.000s
Speedup: 62x
Python operations: 400,000
NumPy operations: 1
Overhead reduction: 400000x
The NumPy Solution:
Cost of flexibility:
When NumPy wins:
NumPy arrays are sophisticated objects that separate metadata from data, enabling powerful features like views and broadcasting.
import numpy as np
import sys
# Examine array internals
arr = np.arange(12, dtype=np.float64).reshape(3, 4)
print("Array structure analysis:")
print(f" Array object size: {sys.getsizeof(arr)} bytes")
print(f" Data buffer size: {arr.nbytes} bytes")
print(f" Shape: {arr.shape}")
print(f" Strides: {arr.strides}") # Bytes to next element in each dimension
print(f" Data type: {arr.dtype}")
print(f" Item size: {arr.itemsize} bytes")
# Memory layout inspection
print(f"\nMemory layout:")
print(f" C-contiguous: {arr.flags['C_CONTIGUOUS']}")
print(f" F-contiguous: {arr.flags['F_CONTIGUOUS']}")
print(f" Owns data: {arr.flags['OWNDATA']}")
# Compare with Python list
py_list = arr.tolist()
print(f"\nMemory comparison for 12 elements:")
print(f" NumPy array: {sys.getsizeof(arr) + arr.nbytes} bytes total")
print(f" Python list: {sys.getsizeof(py_list)} bytes (list)")
# Calculate Python list element overhead
element_overhead = sum(sys.getsizeof(x) for row in py_list for x in row)
print(f" Python objects: {element_overhead} bytes (elements)")
print(f" Python total: {sys.getsizeof(py_list) + element_overhead} bytes")
# Show memory addressing
print(f"\nMemory addressing:")
print(f" Base address: {arr.__array_interface__['data'][0]:#x}")
print(f" Element [0,0] offset: 0 bytes")
print(f" Element [0,1] offset: {arr.strides[1]} bytes")
print(f" Element [1,0] offset: {arr.strides[0]} bytes")
# Demonstrate stride calculation
row, col = 1, 2
offset = row * arr.strides[0] + col * arr.strides[1]
print(f" Element [1,2] offset: {offset} bytes → value {arr[1,2]}")
Array structure analysis:
Array object size: 128 bytes
Data buffer size: 96 bytes
Shape: (3, 4)
Strides: (32, 8)
Data type: float64
Item size: 8 bytes
Memory layout:
C-contiguous: True
F-contiguous: False
Owns data: False
Memory comparison for 12 elements:
NumPy array: 224 bytes total
Python list: 80 bytes (list)
Python objects: 288 bytes (elements)
Python total: 368 bytes
Memory addressing:
Base address: 0x600003cdc1e0
Element [0,0] offset: 0 bytes
Element [0,1] offset: 8 bytes
Element [1,0] offset: 32 bytes
Element [1,2] offset: 48 bytes → value 6.0
Array metadata enables:
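One consequence: reshaping and transposing rewrite only the metadata (shape and strides) while the underlying buffer is shared. A minimal sketch:
import numpy as np
a = np.arange(6, dtype=np.int64)
b = a.reshape(2, 3)       # new shape/strides, same data buffer
c = b.T                   # transpose: strides swapped, no data moved
print(b.strides, c.strides)       # (24, 8) vs (8, 24)
print(np.shares_memory(a, c))     # True -- one buffer behind all three
a[0] = 99
print(c[0, 0])                    # 99, visible through every view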
Array element ordering in memory affects performance significantly due to CPU cache behavior.
import numpy as np
import timeit
# Create arrays with different memory layouts
size = 1000
c_array = np.random.rand(size, size) # C-order (default)
f_array = np.asfortranarray(c_array) # F-order copy
print("Memory layout comparison:")
print(f"C-order shape: {c_array.shape}, strides: {c_array.strides}")
print(f"F-order shape: {f_array.shape}, strides: {f_array.strides}")
print(f"C-order flags: C={c_array.flags.c_contiguous}, F={c_array.flags.f_contiguous}")
print(f"F-order flags: C={f_array.flags.c_contiguous}, F={f_array.flags.f_contiguous}")
# Row access performance (C-order wins)
def sum_rows(arr):
return [np.sum(arr[i, :]) for i in range(100)]
# Column access performance (F-order wins)
def sum_cols(arr):
return [np.sum(arr[:, j]) for j in range(100)]
t_c_rows = timeit.timeit(lambda: sum_rows(c_array), number=100)
t_f_rows = timeit.timeit(lambda: sum_rows(f_array), number=100)
t_c_cols = timeit.timeit(lambda: sum_cols(c_array), number=100)
t_f_cols = timeit.timeit(lambda: sum_cols(f_array), number=100)
print(f"\nRow access performance (100 iterations):")
print(f" C-order: {t_c_rows:.3f}s")
print(f" F-order: {t_f_rows:.3f}s ({t_f_rows/t_c_rows:.1f}x slower)")
print(f"\nColumn access performance (100 iterations):")
print(f" C-order: {t_c_cols:.3f}s")
print(f" F-order: {t_f_cols:.3f}s ({t_c_cols/t_f_cols:.1f}x faster)")
Memory layout comparison:
C-order shape: (1000, 1000), strides: (8000, 8)
F-order shape: (1000, 1000), strides: (8, 8000)
C-order flags: C=True, F=False
F-order flags: C=False, F=True
Row access performance (100 iterations):
C-order: 0.015s
F-order: 0.017s (1.1x slower)
Column access performance (100 iterations):
C-order: 0.016s
F-order: 0.014s (1.1x faster)
Memory layout matters because:
C-order (row-major):
F-order (column-major):
Performance implications:
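In either layout the byte offset of element [i, j] is i*strides[0] + j*strides[1]; the layouts differ only in which stride is large. A short check:
import numpy as np
c = np.zeros((1000, 1000))        # C-order: rows contiguous, strides (8000, 8)
f = np.asfortranarray(c)          # F-order: columns contiguous, strides (8, 8000)
def offset(arr, i, j):
    """Byte offset of element [i, j] from the base pointer."""
    return i * arr.strides[0] + j * arr.strides[1]
print("C-order [2, 3] offset:", offset(c, 2, 3))   # 2*8000 + 3*8 = 16024
print("F-order [2, 3] offset:", offset(f, 2, 3))   # 2*8 + 3*8000 = 24016
# Stepping along a row moves 8 bytes in C-order (cache friendly) but
# 8000 bytes in F-order; the reverse holds for stepping down a column.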
NumPy operations can either create views (sharing data) or copies (duplicating data). Understanding the difference prevents bugs and optimizes memory usage.
import numpy as np
# Create original array
original = np.arange(12).reshape(3, 4)
print("Original array:")
print(original)
# View operations (share data)
view_reshaped = original.reshape(4, 3)
view_slice = original[1:, :]
view_transpose = original.T
print(f"\nView operations (share data):")
print(f" original.base is None: {original.base is None}")
print(f" view_reshaped.base is original: {view_reshaped.base is original}")
print(f" view_slice.base is original: {view_slice.base is original}")
print(f" view_transpose.base is original: {view_transpose.base is original}")
# Test sharing - modify through view
original[0, 0] = 999
print(f"\nAfter setting original[0,0] = 999:")
print(f" original[0,0] = {original[0,0]}")
print(f" view_reshaped[0,0] = {view_reshaped[0,0]}") # Same memory!
# Copy operations (independent data)
copy_array = original.copy()
copy_flatten = original.flatten() # Unlike ravel(), always copies
print(f"\nCopy operations (independent data):")
print(f" copy_array.base is None: {copy_array.base is None}")
print(f" copy_flatten.base is None: {copy_flatten.base is None}")
# Test independence
original[1, 1] = 888
print(f"\nAfter setting original[1,1] = 888:")
print(f" original[1,1] = {original[1,1]}")
print(f" copy_array[1,1] = {copy_array[1,1]}") # Independent!
# Memory usage comparison
import sys
print(f"\nMemory usage:")
print(f" Original: {original.nbytes} bytes")
print(f" View: {view_reshaped.nbytes} bytes data (shares with original)")
print(f" Copy: {copy_array.nbytes} bytes (independent)")
print(f" Total with view: ~{original.nbytes} bytes")
print(f" Total with copy: ~{original.nbytes + copy_array.nbytes} bytes")
# Dangerous view scenario
def demonstrate_view_trap():
"""Common mistake with views."""
data = np.arange(10)
subset = data[::2] # Every other element (view)
# Modify original
data.fill(0)
return subset # Returns modified data!
result = demonstrate_view_trap()
print(f"\nView trap result: {result}") # All zeros, not original values!
Original array:
[[ 0 1 2 3]
[ 4 5 6 7]
[ 8 9 10 11]]
View operations (share data):
original.base is None: False
view_reshaped.base is original: False
view_slice.base is original: False
view_transpose.base is original: False
After setting original[0,0] = 999:
original[0,0] = 999
view_reshaped[0,0] = 999
Copy operations (independent data):
copy_array.base is None: True
copy_flatten.base is None: True
After setting original[1,1] = 888:
original[1,1] = 888
copy_array[1,1] = 5
Memory usage:
Original: 96 bytes
View: 96 bytes data (shares with original)
Copy: 96 bytes (independent)
Total with view: ~96 bytes
Total with copy: ~192 bytes
View trap result: [0 0 0 0 0]
Note: the .base checks above print False because original is itself a view of the arange result, and NumPy reports .base as the array that actually owns the memory; the modification test still shows that the views share data.
View operations (share data):
arr[start:stop]
arr.reshape()
arr.T
arr[::2]
Copy operations (new data):
arr.copy()
arr[[1,3,5]]
arr[arr > 0]
flatten()
ravel() (copies only when it must)
Broadcasting enables operations between arrays of different shapes without explicit loops or memory duplication.
import numpy as np
# Broadcasting examples with shape analysis
def show_broadcast(a, b, operation_name):
"""Demonstrate broadcasting between two arrays."""
print(f"\n{operation_name}:")
print(f" Array A shape: {a.shape}")
# Handle scalar case
if np.isscalar(b):
print(f" Array B: scalar ({b})")
b_bytes = 8 # Approximate scalar size
else:
print(f" Array B shape: {b.shape}")
b_bytes = b.nbytes
try:
result = a + b
print(f" Result shape: {result.shape}")
print(f" Memory usage: A={a.nbytes}B, B={b_bytes}B, Result={result.nbytes}B")
return result
except ValueError as e:
print(f" Error: {e}")
return None
# Example 1: Scalar broadcasting
a1 = np.array([[1, 2, 3], [4, 5, 6]])
b1 = 10
result1 = show_broadcast(a1, b1, "Matrix + Scalar")
# Example 2: Vector to matrix
a2 = np.array([[1, 2, 3, 4],
[5, 6, 7, 8],
[9, 10, 11, 12]])
b2 = np.array([100, 200, 300, 400])
result2 = show_broadcast(a2, b2, "Matrix + Row Vector")
# Example 3: Column vector to matrix
a3 = a2 # Same matrix
b3 = np.array([[10], [20], [30]]) # Column vector
result3 = show_broadcast(a3, b3, "Matrix + Column Vector")
# Example 4: Two vectors creating matrix
a4 = np.array([[1], [2], [3]]) # (3,1)
b4 = np.array([10, 20, 30, 40]) # (4,)
result4 = show_broadcast(a4, b4, "Column Vector + Row Vector")
# Example 5: Incompatible shapes
a5 = np.array([[1, 2, 3], [4, 5, 6]]) # (2,3)
b5 = np.array([1, 2]) # (2,)
result5 = show_broadcast(a5, b5, "Incompatible Shapes")
# Memory efficiency demonstration
print(f"\nMemory efficiency:")
large_matrix = np.random.rand(1000, 1000)
small_vector = np.random.rand(1000)
# Without broadcasting (manual)
manual_result = np.zeros_like(large_matrix)
for i in range(1000):
manual_result[i, :] = large_matrix[i, :] + small_vector
# With broadcasting
broadcast_result = large_matrix + small_vector
print(f" Large matrix: {large_matrix.nbytes / 1024**2:.1f} MB")
print(f" Small vector: {small_vector.nbytes / 1024:.1f} KB")
print(f" Broadcasting creates no intermediate arrays")
print(f" Manual approach would need temporary: {manual_result.nbytes / 1024**2:.1f} MB")
# Performance comparison
import timeit
t_manual = timeit.timeit(lambda: np.array([large_matrix[i, :] + small_vector for i in range(10)]), number=100)
t_broadcast = timeit.timeit(lambda: large_matrix[:10, :] + small_vector, number=100)
print(f"\nPerformance (10 rows, 100 iterations):")
print(f" Manual loop: {t_manual:.3f}s")
print(f" Broadcasting: {t_broadcast:.3f}s ({t_manual/t_broadcast:.0f}x faster)")
Matrix + Scalar:
Array A shape: (2, 3)
Array B: scalar (10)
Result shape: (2, 3)
Memory usage: A=48B, B=8B, Result=48B
Matrix + Row Vector:
Array A shape: (3, 4)
Array B shape: (4,)
Result shape: (3, 4)
Memory usage: A=96B, B=32B, Result=96B
Matrix + Column Vector:
Array A shape: (3, 4)
Array B shape: (3, 1)
Result shape: (3, 4)
Memory usage: A=96B, B=24B, Result=96B
Column Vector + Row Vector:
Array A shape: (3, 1)
Array B shape: (4,)
Result shape: (3, 4)
Memory usage: A=24B, B=32B, Result=96B
Incompatible Shapes:
Array A shape: (2, 3)
Array B shape: (2,)
Error: operands could not be broadcast together with shapes (2,3) (2,)
Memory efficiency:
Large matrix: 7.6 MB
Small vector: 7.8 KB
Broadcasting creates no intermediate arrays
Manual approach would need temporary: 7.6 MB
Performance (10 rows, 100 iterations):
Manual loop: 0.001s
Broadcasting: 0.000s (2x faster)
Broadcasting advantages:
Common broadcasting patterns:
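One very common pattern is standardizing the columns of a data matrix by broadcasting per-column statistics. A minimal sketch (the random data is illustrative):
import numpy as np
rng = np.random.default_rng(0)
X = rng.normal(loc=5.0, scale=2.0, size=(1000, 3))   # (1000, 3) data matrix
mean = X.mean(axis=0)         # shape (3,)
std = X.std(axis=0)           # shape (3,)
Z = (X - mean) / std          # (1000, 3) op (3,) broadcasts across rows
print(Z.mean(axis=0).round(6))    # ~[0. 0. 0.]
print(Z.std(axis=0).round(6))     # ~[1. 1. 1.]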
Vectorization replaces Python loops with single operations on entire arrays, dramatically improving performance.
import numpy as np
import timeit
# Demonstrate vectorization impact
def element_wise_python(a, b):
"""Element-wise operations in Python."""
result = []
for i in range(len(a)):
result.append(a[i] * b[i] + 1.0)
return result
def element_wise_vectorized(a, b):
"""Vectorized operations."""
return a * b + 1.0
def mathematical_function_python(x):
"""Complex math in Python loop."""
result = []
for val in x:
result.append(val**2 + np.sin(val) + np.exp(-val))
return result
def mathematical_function_vectorized(x):
"""Complex math vectorized."""
return x**2 + np.sin(x) + np.exp(-x)
# Performance comparison
sizes = [100, 1000, 10000, 100000]
print("Vectorization performance scaling:")
print("Size | Python | NumPy | Speedup")
print("-" * 40)
for size in sizes:
# Create test data
a_list = [float(i) for i in range(size)]
b_list = [float(i+1) for i in range(size)]
a_array = np.array(a_list)
b_array = np.array(b_list)
# Reduce iterations for larger arrays
iterations = max(1, 1000 // size)
# Time Python version
t_python = timeit.timeit(
lambda: element_wise_python(a_list, b_list),
number=iterations
)
# Time NumPy version
t_numpy = timeit.timeit(
lambda: element_wise_vectorized(a_array, b_array),
number=iterations
)
speedup = t_python / t_numpy if t_numpy > 0 else float('inf')
print(f"{size:5d} | {t_python:.4f} | {t_numpy:.4f} | {speedup:5.0f}x")
# Complex mathematical operations
print(f"\nComplex mathematical functions (size 10,000):")
x_list = [i * 0.01 for i in range(10000)]
x_array = np.array(x_list)
t_math_python = timeit.timeit(
lambda: mathematical_function_python(x_list),
number=10
)
t_math_numpy = timeit.timeit(
lambda: mathematical_function_vectorized(x_array),
number=10
)
print(f" Python loop: {t_math_python:.3f}s")
print(f" Vectorized: {t_math_numpy:.3f}s")
print(f" Speedup: {t_math_python/t_math_numpy:.0f}x")
# Memory allocation overhead
def analyze_memory_pattern():
"""Show memory allocation in vectorized operations."""
x = np.arange(1000000)
# Multiple operations - intermediate arrays
result1 = x * 2 # Creates intermediate array
result2 = result1 + 1 # Creates another intermediate
result3 = result2 ** 2 # Creates final result
# Combined operation - fewer intermediates
result_combined = (x * 2 + 1) ** 2
return len([result1, result2, result3, result_combined])
print(f"\nMemory efficiency:")
print(f" Intermediate arrays created in vectorized operations")
print(f" Combine operations when possible: (x * 2 + 1) ** 2")
print(f" vs separate: temp1 = x * 2; temp2 = temp1 + 1; result = temp2 ** 2")
# Function overhead demonstration
def tiny_function(x):
return x + 1
x = np.arange(100000)
# Apply function element-wise (slow)
t_apply = timeit.timeit(
lambda: np.array([tiny_function(val) for val in x]),
number=10
)
# Vectorized equivalent (fast)
t_vector = timeit.timeit(
lambda: x + 1,
number=10
)
print(f"\nFunction call overhead (100,000 elements):")
print(f" Element-wise function: {t_apply:.3f}s")
print(f" Vectorized operation: {t_vector:.3f}s")
print(f" Overhead cost: {t_apply/t_vector:.0f}x slower")
Vectorization performance scaling:
Size | Python | NumPy | Speedup
----------------------------------------
100 | 0.0000 | 0.0000 | 2x
1000 | 0.0000 | 0.0000 | 6x
10000 | 0.0003 | 0.0003 | 1x
100000 | 0.0028 | 0.0002 | 12x
Complex mathematical functions (size 10,000):
Python loop: 0.067s
Vectorized: 0.001s
Speedup: 93x
Memory efficiency:
Intermediate arrays created in vectorized operations
Combine operations when possible: (x * 2 + 1) ** 2
vs separate: temp1 = x * 2; temp2 = temp1 + 1; result = temp2 ** 2
Function call overhead (100,000 elements):
Element-wise function: 0.072s
Vectorized operation: 0.000s
Overhead cost: 466x slower
Vectorization principles:
Arithmetic operators (+, *, **) work element-wise.
Mathematical functions such as np.sin() and np.exp() are vectorized (ufuncs).
NumPy provides sophisticated indexing mechanisms that go beyond basic slicing, enabling powerful data selection and manipulation.
import numpy as np
import timeit
# Advanced indexing demonstration
data = np.arange(20).reshape(4, 5)
print("Original array:")
print(data)
# Boolean indexing
print("\nBoolean indexing (data > 10):")
mask = data > 10
print(f"Mask shape: {mask.shape}")
print(f"Result: {data[mask]}") # Returns 1D array
print(f"Result shape: {data[mask].shape}")
# Fancy indexing with lists
print("\nFancy indexing with row selection:")
row_indices = [0, 2, 3]
selected_rows = data[row_indices]
print(f"Selected rows {row_indices}:")
print(selected_rows)
# 2D fancy indexing
print("\nFancy indexing for specific elements:")
rows = [0, 1, 2]
cols = [1, 3, 4]
elements = data[rows, cols] # Selects (0,1), (1,3), (2,4)
print(f"Elements at (0,1), (1,3), (2,4): {elements}")
# Mixed indexing
print("\nMixed indexing (slice + fancy):")
mixed = data[:, [1, 3]] # All rows, columns 1 and 3
print(mixed)
# Where function for conditional selection
print("\nUsing np.where for conditional operations:")
result = np.where(data > 10, data, 0) # Replace values ≤10 with 0
print("Values > 10 kept, others become 0:")
print(result)
# Performance analysis
def compare_indexing_performance():
"""Compare different indexing approaches."""
size = 100000
arr = np.random.randint(0, 100, size)
# Boolean indexing
t_boolean = timeit.timeit(
lambda: arr[arr > 50],
number=100
)
# Fancy indexing
indices = np.where(arr > 50)[0]
t_fancy = timeit.timeit(
lambda: arr[indices],
number=100
)
# Basic slicing (for comparison)
t_slice = timeit.timeit(
lambda: arr[10000:20000],
number=100
)
return t_boolean, t_fancy, t_slice
t_bool, t_fancy, t_slice = compare_indexing_performance()
print(f"\nIndexing performance (100,000 elements, 100 iterations):")
print(f" Basic slicing: {t_slice:.3f}s (creates view)")
print(f" Boolean indexing: {t_bool:.3f}s ({t_bool/t_slice:.0f}x slower)")
print(f" Fancy indexing: {t_fancy:.3f}s ({t_fancy/t_slice:.0f}x slower)")
# Memory implications
large_array = np.random.rand(10000, 1000)
bool_mask = large_array > 0.5
print(f"\nMemory implications:")
print(f" Original array: {large_array.nbytes / 1024**2:.1f} MB")
print(f" Boolean mask: {bool_mask.nbytes / 1024**2:.1f} MB")
print(f" Boolean result: {large_array[bool_mask].nbytes / 1024**2:.1f} MB")
# Advanced boolean operations
print(f"\nComplex boolean operations:")
arr = np.random.randint(0, 20, (5, 5))
print("Array:")
print(arr)
# Multiple conditions
complex_mask = (arr > 5) & (arr < 15) # Element-wise AND
print(f"\nElements between 5 and 15: {arr[complex_mask]}")
# Any/all operations
print(f"Any element > 15: {np.any(arr > 15)}")
print(f"All elements > 0: {np.all(arr > 0)}")
# Axis-specific operations
print(f"Rows with any element > 15: {np.any(arr > 15, axis=1)}")
print(f"Columns with all elements > 5: {np.all(arr > 5, axis=0)}")
Original array:
[[ 0 1 2 3 4]
[ 5 6 7 8 9]
[10 11 12 13 14]
[15 16 17 18 19]]
Boolean indexing (data > 10):
Mask shape: (4, 5)
Result: [11 12 13 14 15 16 17 18 19]
Result shape: (9,)
Fancy indexing with row selection:
Selected rows [0, 2, 3]:
[[ 0 1 2 3 4]
[10 11 12 13 14]
[15 16 17 18 19]]
Fancy indexing for specific elements:
Elements at (0,1), (1,3), (2,4): [ 1 8 14]
Mixed indexing (slice + fancy):
[[ 1 3]
[ 6 8]
[11 13]
[16 18]]
Using np.where for conditional operations:
Values > 10 kept, others become 0:
[[ 0 0 0 0 0]
[ 0 0 0 0 0]
[ 0 11 12 13 14]
[15 16 17 18 19]]
Indexing performance (100,000 elements, 100 iterations):
Basic slicing: 0.000s (creates view)
Boolean indexing: 0.026s (2916x slower)
Fancy indexing: 0.002s (276x slower)
Memory implications:
Original array: 76.3 MB
Boolean mask: 9.5 MB
Boolean result: 38.1 MB
Complex boolean operations:
Array:
[[ 6 9 4 7 18]
[ 6 17 18 8 1]
[15 0 17 7 16]
[ 0 18 1 19 13]
[ 0 10 17 2 0]]
Elements between 5 and 15: [ 6 9 7 6 8 7 13 10]
Any element > 15: True
All elements > 0: False
Rows with any element > 15: [ True True True True True]
Columns with all elements > 5: [False False False False False]
Indexing creates views vs copies: basic slicing returns a view of the original data, while boolean and fancy indexing always return new copies (a short check appears in the sketch below).
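A minimal sketch, separate from the timing code above, that verifies the view-versus-copy behavior with np.shares_memory; the small array a is illustrative.
import numpy as np

a = np.arange(10)

view = a[2:6]              # basic slice -> view (shares memory)
copy_bool = a[a > 4]       # boolean indexing -> copy
copy_fancy = a[[0, 3, 7]]  # fancy indexing -> copy

print(np.shares_memory(a, view))        # True
print(np.shares_memory(a, copy_bool))   # False
print(np.shares_memory(a, copy_fancy))  # False

# Writing through a view changes the original; writing to a copy does not
view[0] = 99
print(a[2])  # 99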
Structured arrays enable heterogeneous data with named fields, similar to database records or C structs.
import numpy as np
# Define structured data type
person_dtype = np.dtype([
('name', 'U20'), # Unicode string, max 20 chars
('age', 'i4'), # 32-bit integer
('height', 'f4'), # 32-bit float
('salary', 'f8'), # 64-bit float
('active', '?') # Boolean
])
print("Structured array data type:")
print(f" Field names: {person_dtype.names}")
print(f" Item size: {person_dtype.itemsize} bytes")
# Create structured array
people = np.array([
('Alice', 25, 165.5, 75000.0, True),
('Bob', 30, 180.2, 82000.0, False),
('Charlie', 35, 172.8, 95000.0, True),
('Diana', 28, 158.3, 68000.0, True)
], dtype=person_dtype)
print(f"\nStructured array:")
print(people)
# Access by field name
print(f"\nField access:")
print(f" Names: {people['name']}")
print(f" Ages: {people['age']}")
print(f" Average salary: ${people['salary'].mean():,.2f}")
# Boolean indexing with structured arrays
active_people = people[people['active']]
print(f"\nActive employees:")
for person in active_people:
print(f" {person['name']}: ${person['salary']:,.0f}")
# Complex queries
high_earners = people[people['salary'] > 80000]
young_tall = people[(people['age'] < 30) & (people['height'] > 160)]
print(f"\nHigh earners (>$80k): {high_earners['name']}")
print(f"Young & tall (<30, >160cm): {young_tall['name']}")
# Field modification
people_copy = people.copy()
people_copy['salary'] *= 1.05 # 5% raise
print(f"\nAfter 5% raise:")
for name, old_sal, new_sal in zip(people['name'], people['salary'], people_copy['salary']):
print(f" {name}: ${old_sal:,.0f} → ${new_sal:,.0f}")
# Record arrays - enhanced interface
rec_people = people.view(np.recarray)
print(f"\nRecord array access (attribute style):")
print(f" First person: {rec_people[0].name}, age {rec_people[0].age}")
print(f" Average height: {rec_people.height.mean():.1f}cm")
# Performance comparison with regular arrays
def compare_structured_performance():
"""Compare structured vs separate arrays."""
size = 100000
# Structured approach
structured_data = np.zeros(size, dtype=[
('x', 'f8'), ('y', 'f8'), ('z', 'f8')
])
# Separate arrays approach
x = np.zeros(size)
y = np.zeros(size)
z = np.zeros(size)
# Memory usage
struct_memory = structured_data.nbytes
separate_memory = x.nbytes + y.nbytes + z.nbytes
# Access performance
import timeit
t_struct = timeit.timeit(
lambda: structured_data['x'].sum(),
number=100
)
t_separate = timeit.timeit(
lambda: x.sum(),
number=100
)
return struct_memory, separate_memory, t_struct, t_separate
struct_mem, sep_mem, t_struct, t_sep = compare_structured_performance()
print(f"\nPerformance comparison (100k elements):")
print(f" Memory usage:")
print(f" Structured: {struct_mem / 1024**2:.1f} MB")
print(f" Separate: {sep_mem / 1024**2:.1f} MB")
print(f" Access time:")
print(f" Structured: {t_struct:.3f}s")
print(f" Separate: {t_sep:.3f}s ({t_struct/t_sep:.1f}x slower)")
# Nested structures
nested_dtype = np.dtype([
('person', [('name', 'U10'), ('age', 'i4')]),
('location', [('city', 'U15'), ('country', 'U15')]),
('scores', 'f4', (3,)) # Array field
])
nested_data = np.array([
(('John', 25), ('NYC', 'USA'), [85.5, 92.0, 78.5]),
(('Marie', 30), ('Paris', 'France'), [90.0, 88.5, 95.0])
], dtype=nested_dtype)
print(f"\nNested structures:")
print(f" First person: {nested_data[0]['person']['name']}")
print(f" Location: {nested_data[0]['location']['city']}")
print(f" Scores: {nested_data[0]['scores']}")
# Real-world example: Time series data
timeseries_dtype = np.dtype([
('timestamp', 'datetime64[us]'),
('price', 'f8'),
('volume', 'i8'),
('bid', 'f8'),
('ask', 'f8')
])
# Create sample financial data
n_points = 5
timestamps = np.datetime64('2024-01-01') + np.arange(n_points) * np.timedelta64(1, 'h')
prices = 100.0 + np.random.randn(n_points) * 2
volumes = np.random.randint(1000, 10000, n_points)
spreads = np.random.uniform(0.01, 0.05, n_points)
financial_data = np.zeros(n_points, dtype=timeseries_dtype)
financial_data['timestamp'] = timestamps
financial_data['price'] = prices
financial_data['volume'] = volumes
financial_data['bid'] = prices - spreads/2
financial_data['ask'] = prices + spreads/2
print(f"\nFinancial time series:")
for row in financial_data:
print(f" {row['timestamp']}: ${row['price']:.2f} (vol: {row['volume']:,})")
Structured array data type:
Field names: ('name', 'age', 'height', 'salary', 'active')
Item size: 97 bytes
Structured array:
[('Alice', 25, 165.5, 75000., True) ('Bob', 30, 180.2, 82000., False)
('Charlie', 35, 172.8, 95000., True) ('Diana', 28, 158.3, 68000., True)]
Field access:
Names: ['Alice' 'Bob' 'Charlie' 'Diana']
Ages: [25 30 35 28]
Average salary: $80,000.00
Active employees:
Alice: $75,000
Charlie: $95,000
Diana: $68,000
High earners (>$80k): ['Bob' 'Charlie']
Young & tall (<30, >160cm): ['Alice']
After 5% raise:
Alice: $75,000 → $78,750
Bob: $82,000 → $86,100
Charlie: $95,000 → $99,750
Diana: $68,000 → $71,400
Record array access (attribute style):
First person: Alice, age 25
Average height: 169.2cm
Performance comparison (100k elements):
Memory usage:
Structured: 2.3 MB
Separate: 2.3 MB
Access time:
Structured: 0.002s
Separate: 0.001s (1.6x slower)
Nested structures:
First person: John
Location: NYC
Scores: [85.5 92. 78.5]
Financial time series:
2024-01-01T00:00:00.000000: $99.18 (vol: 6,628)
2024-01-01T01:00:00.000000: $97.38 (vol: 4,316)
2024-01-01T02:00:00.000000: $99.86 (vol: 1,429)
2024-01-01T03:00:00.000000: $100.57 (vol: 9,512)
2024-01-01T04:00:00.000000: $97.14 (vol: 9,863)
Structured arrays advantages: named field access, mixed data types stored in one contiguous array, and a direct mapping to C structs and fixed-size binary records.
Use cases: record-style tabular data, binary file I/O, and time series with heterogeneous fields, as in the financial example above (a field-sorting sketch follows below).
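As a small follow-up sketch (assuming the people structured array defined above is still in scope), np.sort accepts an order argument naming the field(s) by which to sort structured records.
import numpy as np

# Sort records by a named field
by_age = np.sort(people, order='age')
print(by_age['name'])                    # names in ascending order of age

# Descending salary: sort ascending, then reverse the view
by_salary = np.sort(people, order='salary')[::-1]
print(by_salary[['name', 'salary']])     # multi-field view of the sorted records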
Memory mapping allows working with datasets larger than available RAM by treating disk files as virtual memory.
import numpy as np
import os
import tempfile
# Create a large dataset on disk
def create_large_dataset():
"""Create a large array saved to disk."""
# Create temporary file
temp_dir = tempfile.mkdtemp()
filename = os.path.join(temp_dir, 'large_data.dat')
# Create large array (1M x 100 = 800MB as float64)
print("Creating large dataset...")
shape = (1000000, 100)
large_array = np.random.randn(*shape).astype(np.float64)
# Save to disk in binary format
large_array.tofile(filename)
size_mb = os.path.getsize(filename) / (1024**2)
print(f" Created file: {size_mb:.0f} MB")
print(f" Location: {filename}")
return filename, shape
# Memory mapping demonstration
filename, shape = create_large_dataset()
# Memory map the file
print(f"\nMemory mapping file...")
mmap_array = np.memmap(filename, dtype=np.float64, mode='r+', shape=shape)
print(f" Memory mapped array shape: {mmap_array.shape}")
print(f" Data type: {mmap_array.dtype}")
print(f" Memory mapping mode: {mmap_array.mode}")
# Work with memory mapped data
print(f"\nWorking with memory mapped data:")
print(f" Mean of first 1000 rows: {mmap_array[:1000].mean():.4f}")
print(f" Max of column 0: {mmap_array[:, 0].max():.4f}")
# Modify data through memory map
original_value = mmap_array[0, 0]
mmap_array[0, 0] = 999.0
print(f" Modified mmap_array[0,0]: {original_value:.4f} → {mmap_array[0, 0]:.4f}")
# Changes are written to disk
mmap_array.flush() # Force write to disk
# Verify persistence by creating new memory map
mmap_array2 = np.memmap(filename, dtype=np.float64, mode='r', shape=shape)
print(f" Value persisted to disk: {mmap_array2[0, 0]:.4f}")
# Performance comparison: memory map vs loading into RAM
def compare_memory_usage():
"""Compare memory usage approaches."""
import psutil
import gc
process = psutil.Process()
# Memory usage before
gc.collect()
mem_before = process.memory_info().rss / (1024**2)
# Load entire array into memory
loaded_array = np.fromfile(filename, dtype=np.float64).reshape(shape)
mem_after_load = process.memory_info().rss / (1024**2)
# Delete loaded array
del loaded_array
gc.collect()
mem_after_delete = process.memory_info().rss / (1024**2)
# Create memory map
mmap_test = np.memmap(filename, dtype=np.float64, mode='r', shape=shape)
mem_after_mmap = process.memory_info().rss / (1024**2)
return {
'baseline': mem_before,
'after_load': mem_after_load,
'after_delete': mem_after_delete,
'after_mmap': mem_after_mmap
}
memory_usage = compare_memory_usage()
print(f"\nMemory usage comparison:")
print(f" Baseline: {memory_usage['baseline']:.0f} MB")
print(f" After loading to RAM: {memory_usage['after_load']:.0f} MB (+{memory_usage['after_load'] - memory_usage['baseline']:.0f} MB)")
print(f" After deleting array: {memory_usage['after_delete']:.0f} MB")
print(f" After memory mapping: {memory_usage['after_mmap']:.0f} MB (+{memory_usage['after_mmap'] - memory_usage['baseline']:.0f} MB)")
# Processing large datasets in chunks
def process_in_chunks(mmap_array, chunk_size=10000):
"""Process large array in chunks to control memory usage."""
results = []
n_rows = mmap_array.shape[0]
print(f"\nProcessing {n_rows:,} rows in chunks of {chunk_size:,}:")
for start in range(0, n_rows, chunk_size):
end = min(start + chunk_size, n_rows)
chunk = mmap_array[start:end]
# Process chunk (example: compute statistics)
chunk_mean = chunk.mean()
chunk_std = chunk.std()
results.append((start, end, chunk_mean, chunk_std))
if len(results) <= 3: # Show first few chunks
print(f" Chunk {len(results)}: rows {start:,}-{end:,}, mean={chunk_mean:.4f}, std={chunk_std:.4f}")
return results
chunk_results = process_in_chunks(mmap_array, chunk_size=100000)
# Create read-only vs read-write memory maps
print(f"\nMemory map access modes:")
# Read-only (safe for concurrent access)
readonly_mmap = np.memmap(filename, dtype=np.float64, mode='r', shape=shape)
print(f" Read-only: mode='{readonly_mmap.mode}', writable={readonly_mmap.flags.writeable}")
# Read-write (can modify data)
readwrite_mmap = np.memmap(filename, dtype=np.float64, mode='r+', shape=shape)
print(f" Read-write: mode='{readwrite_mmap.mode}', writable={readwrite_mmap.flags.writeable}")
# Write mode (creates new file, overwrites existing)
new_filename = filename.replace('.dat', '_new.dat')
write_mmap = np.memmap(new_filename, dtype=np.float64, mode='w+', shape=(1000, 50))
print(f" Write mode: created new file with shape {write_mmap.shape}")
# Multi-dimensional memory mapping with structured data
structured_filename = filename.replace('.dat', '_structured.dat')
record_dtype = np.dtype([
('timestamp', 'i8'),
('value', 'f8'),
('quality', 'i4')
])
# Create structured memory mapped file
n_records = 100000
structured_mmap = np.memmap(structured_filename, dtype=record_dtype, mode='w+', shape=(n_records,))
# Populate with sample data
structured_mmap['timestamp'] = np.arange(n_records)
structured_mmap['value'] = np.random.randn(n_records)
structured_mmap['quality'] = np.random.randint(0, 3, n_records)
print(f"\nStructured memory map:")
print(f" Shape: {structured_mmap.shape}")
print(f" Fields: {structured_mmap.dtype.names}")
print(f" Record size: {structured_mmap.dtype.itemsize} bytes")
print(f" First few records:")
for i in range(3):
record = structured_mmap[i]
print(f" {record['timestamp']}: value={record['value']:.4f}, quality={record['quality']}")
# Clean up temporary files
import shutil
temp_dir = os.path.dirname(filename)
print(f"\nCleaning up temporary files in: {temp_dir}")
try:
shutil.rmtree(temp_dir)
print(" Cleanup successful")
except OSError:
print(" Cleanup failed - files may still exist")
Creating large dataset...
Created file: 763 MB
Location: /var/folders/r2/vnnxf5sn7s96swzszkfdy9sr0000gn/T/tmpbplzro5i/large_data.dat
Memory mapping file...
Memory mapped array shape: (1000000, 100)
Data type: float64
Memory mapping mode: r+
Working with memory mapped data:
Mean of first 1000 rows: 0.0063
Max of column 0: 4.9683
Modified mmap_array[0,0]: -1.3707 → 999.0000
Value persisted to disk: 999.0000
Memory usage comparison:
Baseline: 1765 MB
After loading to RAM: 2528 MB (+763 MB)
After deleting array: 1765 MB
After memory mapping: 1765 MB (+0 MB)
Processing 1,000,000 rows in chunks of 100,000:
Chunk 1: rows 0-100,000, mean=-0.0000, std=1.0485
Chunk 2: rows 100,000-200,000, mean=0.0000, std=1.0003
Chunk 3: rows 200,000-300,000, mean=-0.0002, std=0.9996
Memory map access modes:
Read-only: mode='r', writable=False
Read-write: mode='r+', writable=True
Write mode: created new file with shape (1000, 50)
Structured memory map:
Shape: (100000,)
Fields: ('timestamp', 'value', 'quality')
Record size: 20 bytes
First few records:
0: value=1.8753, quality=2
1: value=1.1751, quality=0
2: value=-0.2225, quality=2
Cleaning up temporary files in: /var/folders/r2/vnnxf5sn7s96swzszkfdy9sr0000gn/T/tmpbplzro5i
Cleanup successful
Memory mapping advantages: only the pages actually touched are read into RAM, modifications can be flushed back to disk, and read-only maps can be shared safely across processes.
Use cases: datasets larger than available memory, chunked batch processing, and sharing large read-only arrays between processes.
Performance considerations: sequential, chunked access is much faster than scattered random access, since every touched page costs disk I/O; for arrays saved with np.save, see the np.load memory-map sketch below.
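For arrays saved in .npy format, np.load with mmap_mode returns a memory map without the manual dtype/shape bookkeeping used above. A minimal sketch with an illustrative filename:
import numpy as np

# Save an array in .npy format (the header records dtype and shape)
arr = np.random.default_rng(0).random((100_000, 10))
np.save('large.npy', arr)                     # illustrative filename

# Reopen as a memory map: dtype and shape come from the file header
mm = np.load('large.npy', mmap_mode='r')
print(mm.shape, mm.dtype)                     # (100000, 10) float64
print(mm[:1000].mean())                       # reads only the pages it touches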
NumPy provides numerous ways to create arrays, each optimized for different use cases and data patterns.
import numpy as np
# Basic array creation
print("=== Basic Creation ===")
# From lists
arr_1d = np.array([1, 2, 3, 4, 5])
arr_2d = np.array([[1, 2, 3], [4, 5, 6]])
print(f"From list: {arr_1d}")
print(f"2D array:\n{arr_2d}")
# Specify data types explicitly
float_arr = np.array([1, 2, 3], dtype=np.float64)
complex_arr = np.array([1, 2, 3], dtype=complex)
print(f"Float64: {float_arr} ({float_arr.dtype})")
print(f"Complex: {complex_arr} ({complex_arr.dtype})")
# Range-based creation
print("\n=== Range-Based Creation ===")
arange_arr = np.arange(0, 10, 2) # start, stop, step
linspace_arr = np.linspace(0, 1, 5) # start, stop, num_points
logspace_arr = np.logspace(0, 2, 5) # 10^0 to 10^2, 5 points
print(f"arange(0, 10, 2): {arange_arr}")
print(f"linspace(0, 1, 5): {linspace_arr}")
print(f"logspace(0, 2, 5): {logspace_arr}")
# Geometric progressions
geom_space = np.geomspace(1, 1000, 4) # geometric spacing
print(f"geomspace(1, 1000, 4): {geom_space}")
# Special arrays
print("\n=== Special Arrays ===")
zeros_arr = np.zeros((2, 3))
ones_arr = np.ones((2, 3))
full_arr = np.full((2, 3), 7.5)
identity = np.eye(3) # identity matrix
diag_arr = np.diag([1, 2, 3, 4]) # diagonal matrix
print(f"Zeros (2,3):\n{zeros_arr}")
print(f"Ones (2,3):\n{ones_arr}")
print(f"Full 7.5 (2,3):\n{full_arr}")
print(f"Identity (3,3):\n{identity}")
print(f"Diagonal:\n{diag_arr}")
# Array-like creation (same shape as existing)
base = np.array([[1, 2], [3, 4]])
zeros_like = np.zeros_like(base)
ones_like = np.ones_like(base)
empty_like = np.empty_like(base) # uninitialized
print(f"\nBase array:\n{base}")
print(f"zeros_like:\n{zeros_like}")
print(f"ones_like:\n{ones_like}")
# Grid creation for mathematical functions
x = np.linspace(-2, 2, 5)
y = np.linspace(-1, 1, 3)
X, Y = np.meshgrid(x, y)
print(f"\n=== Grid Creation ===")
print(f"x: {x}")
print(f"y: {y}")
print(f"X grid:\n{X}")
print(f"Y grid:\n{Y}")
# Using grids for function evaluation
Z = X**2 + Y**2
print(f"Z = X² + Y²:\n{Z}")
# Random array creation (using modern interface)
rng = np.random.default_rng(seed=42)
print("\n=== Random Creation ===")
random_uniform = rng.random((2, 3)) # uniform [0, 1)
random_normal = rng.normal(0, 1, (2, 3)) # normal(mean, std)
random_int = rng.integers(0, 10, (2, 3)) # integers [0, 10)
print(f"Uniform [0,1):\n{random_uniform}")
print(f"Normal(0,1):\n{random_normal}")
print(f"Integers [0,10):\n{random_int}")
# Structured arrays for heterogeneous data
dt = np.dtype([('name', 'U10'), ('age', 'i4'), ('height', 'f4')])
structured = np.array([('Alice', 25, 165.5), ('Bob', 30, 180.2)], dtype=dt)
print(f"\nStructured array:\n{structured}")
print(f"Names: {structured['name']}")
# Memory layout control
c_order = np.ones((1000, 1000), order='C') # C-contiguous (row-major)
f_order = np.ones((1000, 1000), order='F') # Fortran-contiguous (column-major)
print(f"\n=== Memory Layout ===")
print(f"C-order contiguous: {c_order.flags.c_contiguous}")
print(f"F-order contiguous: {f_order.flags.f_contiguous}")
=== Basic Creation ===
From list: [1 2 3 4 5]
2D array:
[[1 2 3]
[4 5 6]]
Float64: [1. 2. 3.] (float64)
Complex: [1.+0.j 2.+0.j 3.+0.j] (complex128)
=== Range-Based Creation ===
arange(0, 10, 2): [0 2 4 6 8]
linspace(0, 1, 5): [0. 0.25 0.5 0.75 1. ]
logspace(0, 2, 5): [ 1. 3.16227766 10. 31.6227766 100. ]
geomspace(1, 1000, 4): [ 1. 10. 100. 1000.]
=== Special Arrays ===
Zeros (2,3):
[[0. 0. 0.]
[0. 0. 0.]]
Ones (2,3):
[[1. 1. 1.]
[1. 1. 1.]]
Full 7.5 (2,3):
[[7.5 7.5 7.5]
[7.5 7.5 7.5]]
Identity (3,3):
[[1. 0. 0.]
[0. 1. 0.]
[0. 0. 1.]]
Diagonal:
[[1 0 0 0]
[0 2 0 0]
[0 0 3 0]
[0 0 0 4]]
Base array:
[[1 2]
[3 4]]
zeros_like:
[[0 0]
[0 0]]
ones_like:
[[1 1]
[1 1]]
=== Grid Creation ===
x: [-2. -1. 0. 1. 2.]
y: [-1. 0. 1.]
X grid:
[[-2. -1. 0. 1. 2.]
[-2. -1. 0. 1. 2.]
[-2. -1. 0. 1. 2.]]
Y grid:
[[-1. -1. -1. -1. -1.]
[ 0. 0. 0. 0. 0.]
[ 1. 1. 1. 1. 1.]]
Z = X² + Y²:
[[5. 2. 1. 2. 5.]
[4. 1. 0. 1. 4.]
[5. 2. 1. 2. 5.]]
=== Random Creation ===
Uniform [0,1):
[[0.77395605 0.43887844 0.85859792]
[0.69736803 0.09417735 0.97562235]]
Normal(0,1):
[[ 0.1278404 -0.31624259 -0.01680116]
[-0.85304393 0.87939797 0.77779194]]
Integers [0,10):
[[7 6 4]
[8 5 4]]
Structured array:
[('Alice', 25, 165.5) ('Bob', 30, 180.2)]
Names: ['Alice' 'Bob']
=== Memory Layout ===
C-order contiguous: True
F-order contiguous: True
# Advanced creation patterns
print("=== Advanced Patterns ===")
# Concatenating during creation
block1 = np.ones((2, 2))
block2 = np.zeros((2, 2))
block3 = 2 * np.ones((2, 2))
block4 = 3 * np.ones((2, 2))
# Block matrix creation
block_matrix = np.block([[block1, block2],
[block3, block4]])
print(f"Block matrix:\n{block_matrix}")
# Creating arrays from functions
def gaussian_2d(x, y, sigma=1.0):
return np.exp(-(x**2 + y**2) / (2 * sigma**2))
x = np.linspace(-3, 3, 7)
y = np.linspace(-3, 3, 7)
X, Y = np.meshgrid(x, y)
gaussian = gaussian_2d(X, Y)
print(f"\n2D Gaussian:\n{gaussian}")
# Creating arrays from mathematical sequences
n = 10
fibonacci = np.zeros(n, dtype=int)
fibonacci[0], fibonacci[1] = 0, 1
for i in range(2, n):
fibonacci[i] = fibonacci[i-1] + fibonacci[i-2]
print(f"\nFibonacci sequence: {fibonacci}")
# Trigonometric arrays
t = np.linspace(0, 2*np.pi, 8)
sin_cos_matrix = np.column_stack([np.sin(t), np.cos(t)])
print(f"\nSin/Cos matrix:\n{sin_cos_matrix}")
# Polynomial coefficient arrays
# Represents: 3x³ + 2x² - x + 5
poly_coeffs = np.array([3, 2, -1, 5]) # highest degree first
x_vals = np.linspace(-1, 1, 5)
poly_vals = np.polyval(poly_coeffs, x_vals)
print(f"\nPolynomial values: {poly_vals}")
# Broadcasting-aware creation
data_points = np.array([1, 2, 3, 4, 5])[:, np.newaxis] # column vector
weights = np.array([0.1, 0.2, 0.3]) # row vector
weighted_data = data_points * weights # broadcasts to (5, 3)
print(f"\nData points shape: {data_points.shape}")
print(f"Weights shape: {weights.shape}")
print(f"Weighted data shape: {weighted_data.shape}")
print(f"Weighted data:\n{weighted_data}")
=== Advanced Patterns ===
Block matrix:
[[1. 1. 0. 0.]
[1. 1. 0. 0.]
[2. 2. 3. 3.]
[2. 2. 3. 3.]]
2D Gaussian:
[[1.23409804e-04 1.50343919e-03 6.73794700e-03 1.11089965e-02
6.73794700e-03 1.50343919e-03 1.23409804e-04]
[1.50343919e-03 1.83156389e-02 8.20849986e-02 1.35335283e-01
8.20849986e-02 1.83156389e-02 1.50343919e-03]
[6.73794700e-03 8.20849986e-02 3.67879441e-01 6.06530660e-01
3.67879441e-01 8.20849986e-02 6.73794700e-03]
[1.11089965e-02 1.35335283e-01 6.06530660e-01 1.00000000e+00
6.06530660e-01 1.35335283e-01 1.11089965e-02]
[6.73794700e-03 8.20849986e-02 3.67879441e-01 6.06530660e-01
3.67879441e-01 8.20849986e-02 6.73794700e-03]
[1.50343919e-03 1.83156389e-02 8.20849986e-02 1.35335283e-01
8.20849986e-02 1.83156389e-02 1.50343919e-03]
[1.23409804e-04 1.50343919e-03 6.73794700e-03 1.11089965e-02
6.73794700e-03 1.50343919e-03 1.23409804e-04]]
Fibonacci sequence: [ 0 1 1 2 3 5 8 13 21 34]
Sin/Cos matrix:
[[ 0.00000000e+00 1.00000000e+00]
[ 7.81831482e-01 6.23489802e-01]
[ 9.74927912e-01 -2.22520934e-01]
[ 4.33883739e-01 -9.00968868e-01]
[-4.33883739e-01 -9.00968868e-01]
[-9.74927912e-01 -2.22520934e-01]
[-7.81831482e-01 6.23489802e-01]
[-2.44929360e-16 1.00000000e+00]]
Polynomial values: [5. 5.625 5. 5.375 9. ]
Data points shape: (5, 1)
Weights shape: (3,)
Weighted data shape: (5, 3)
Weighted data:
[[0.1 0.2 0.3]
[0.2 0.4 0.6]
[0.3 0.6 0.9]
[0.4 0.8 1.2]
[0.5 1. 1.5]]
Array creation decision matrix:
| Use Case | Method | Example |
|---|---|---|
| Known values | np.array() | np.array([1, 2, 3]) |
| Regular spacing | np.linspace() | np.linspace(0, 10, 100) |
| Integer sequence | np.arange() | np.arange(0, 10, 2) |
| Logarithmic spacing | np.logspace() | np.logspace(0, 3, 100) |
| Matrix operations | np.eye(), np.diag() | np.eye(5) |
| Initialize with value | np.full() | np.full((3, 3), 7.5) |
| Grid evaluation | np.meshgrid() | X, Y = np.meshgrid(x, y) |
| Random data | rng.random() | rng.normal(0, 1, (100, 50)) |
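One additional creation pattern worth knowing, shown here as a brief sketch: np.fromfunction builds an array by evaluating a callable on index grids (much like meshgrid).
import numpy as np

# Multiplication table from the row/column indices
mult_table = np.fromfunction(lambda i, j: (i + 1) * (j + 1), (5, 5), dtype=int)
print(mult_table)

# Distance-from-center map, handy for circular masks and filters
n = 7
dist = np.fromfunction(lambda i, j: np.sqrt((i - n // 2)**2 + (j - n // 2)**2), (n, n))
print(np.round(dist, 2))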
Universal functions (ufuncs) operate element-wise on arrays and are the foundation of NumPy’s mathematical capabilities.
import numpy as np
# Create test data
x = np.array([0, 0.5, 1.0, 1.5, 2.0])
y = np.array([1, 2, 3, 4, 5])
matrix = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
print("=== Basic Arithmetic ===")
print(f"x: {x}")
print(f"y: {y}")
print(f"x + y: {x + y}")
print(f"x * y: {x * y}")
print(f"x ** 2: {x ** 2}")
print(f"y / 2: {y / 2}")
print(f"y // 2: {y // 2}") # floor division
print(f"y % 3: {y % 3}") # modulo
# Array-scalar operations (broadcasting)
print(f"\nArray-scalar operations:")
print(f"matrix + 10:\n{matrix + 10}")
print(f"matrix * 2:\n{matrix * 2}")
print("\n=== Trigonometric Functions ===")
angles = np.array([0, np.pi/6, np.pi/4, np.pi/3, np.pi/2])
print(f"angles: {angles}")
print(f"sin: {np.sin(angles)}")
print(f"cos: {np.cos(angles)}")
print(f"tan: {np.tan(angles)}")
# Inverse trig functions
sin_vals = np.array([0, 0.5, np.sqrt(2)/2, np.sqrt(3)/2, 1])
print(f"\nInverse trig:")
print(f"arcsin: {np.arcsin(sin_vals)}")
print(f"arccos: {np.arccos(sin_vals)}")
# Hyperbolic functions
print(f"\nHyperbolic:")
print(f"sinh: {np.sinh([0, 1, 2])}")
print(f"cosh: {np.cosh([0, 1, 2])}")
print(f"tanh: {np.tanh([0, 1, 2])}")
print("\n=== Exponential and Logarithmic ===")
vals = np.array([0, 1, 2, 10])
print(f"values: {vals}")
print(f"exp: {np.exp(vals)}")
print(f"exp2 (2^x): {np.exp2(vals)}")
print(f"expm1 (exp(x)-1): {np.expm1([1e-8, 1e-4, 1e-2])}") # accurate for small x
log_vals = np.array([1, np.e, 10, 100])
print(f"\nlog values: {log_vals}")
print(f"log (natural): {np.log(log_vals)}")
print(f"log10: {np.log10(log_vals)}")
print(f"log2: {np.log2([1, 2, 4, 8, 16])}")
print(f"log1p (log(1+x)): {np.log1p([1e-8, 1e-4, 1e-2])}") # accurate for small x
print("\n=== Power and Root Functions ===")
base_vals = np.array([1, 4, 9, 16, 25])
print(f"values: {base_vals}")
print(f"sqrt: {np.sqrt(base_vals)}")
print(f"cbrt (cube root): {np.cbrt([1, 8, 27, 64, 125])}")
print(f"square: {np.square([1, 2, 3, 4, 5])}")
print(f"power(x, 3): {np.power([1, 2, 3, 4, 5], 3)}")
print(f"reciprocal: {np.reciprocal([1, 2, 4, 5, 10], dtype=float)}")
print("\n=== Rounding and Truncation ===")
decimals = np.array([-2.7, -1.3, 0.8, 1.2, 2.6])
print(f"decimals: {decimals}")
print(f"round: {np.round(decimals)}")
print(f"floor: {np.floor(decimals)}")
print(f"ceil: {np.ceil(decimals)}")
print(f"trunc: {np.trunc(decimals)}")
print(f"rint (round to int): {np.rint(decimals)}")
# Precision rounding
precise_vals = np.array([3.14159, 2.71828, 1.41421])
print(f"\nPrecision rounding:")
print(f"original: {precise_vals}")
print(f"round(2): {np.round(precise_vals, 2)}")
print(f"round(-1): {np.round([123, 456, 789], -1)}") # round to nearest 10
print("\n=== Comparison Operations ===")
a = np.array([1, 2, 3, 4, 5])
b = np.array([1, 1, 3, 5, 4])
print(f"a: {a}")
print(f"b: {b}")
print(f"a == b: {a == b}")
print(f"a > b: {a > b}")
print(f"a >= b: {a >= b}")
# Logical operations on boolean arrays
bool_a = np.array([True, False, True, False])
bool_b = np.array([True, True, False, False])
print(f"\nLogical operations:")
print(f"bool_a: {bool_a}")
print(f"bool_b: {bool_b}")
print(f"logical_and: {np.logical_and(bool_a, bool_b)}")
print(f"logical_or: {np.logical_or(bool_a, bool_b)}")
print(f"logical_xor: {np.logical_xor(bool_a, bool_b)}")
print(f"logical_not: {np.logical_not(bool_a)}")
print("\n=== Complex Number Operations ===")
complex_vals = np.array([1+2j, 3-4j, 0+5j, -2+0j])
print(f"complex values: {complex_vals}")
print(f"real part: {np.real(complex_vals)}")
print(f"imaginary part: {np.imag(complex_vals)}")
print(f"absolute value: {np.abs(complex_vals)}")
print(f"angle (radians): {np.angle(complex_vals)}")
print(f"conjugate: {np.conj(complex_vals)}")
=== Basic Arithmetic ===
x: [0. 0.5 1. 1.5 2. ]
y: [1 2 3 4 5]
x + y: [1. 2.5 4. 5.5 7. ]
x * y: [ 0. 1. 3. 6. 10.]
x ** 2: [0. 0.25 1. 2.25 4. ]
y / 2: [0.5 1. 1.5 2. 2.5]
y // 2: [0 1 1 2 2]
y % 3: [1 2 0 1 2]
Array-scalar operations:
matrix + 10:
[[11 12 13]
[14 15 16]
[17 18 19]]
matrix * 2:
[[ 2 4 6]
[ 8 10 12]
[14 16 18]]
=== Trigonometric Functions ===
angles: [0. 0.52359878 0.78539816 1.04719755 1.57079633]
sin: [0. 0.5 0.70710678 0.8660254 1. ]
cos: [1.00000000e+00 8.66025404e-01 7.07106781e-01 5.00000000e-01
6.12323400e-17]
tan: [0.00000000e+00 5.77350269e-01 1.00000000e+00 1.73205081e+00
1.63312394e+16]
Inverse trig:
arcsin: [0. 0.52359878 0.78539816 1.04719755 1.57079633]
arccos: [1.57079633 1.04719755 0.78539816 0.52359878 0. ]
Hyperbolic:
sinh: [0. 1.17520119 3.62686041]
cosh: [1. 1.54308063 3.76219569]
tanh: [0. 0.76159416 0.96402758]
=== Exponential and Logarithmic ===
values: [ 0 1 2 10]
exp: [1.00000000e+00 2.71828183e+00 7.38905610e+00 2.20264658e+04]
exp2 (2^x): [1.000e+00 2.000e+00 4.000e+00 1.024e+03]
expm1 (exp(x)-1): [1.00000001e-08 1.00005000e-04 1.00501671e-02]
log values: [ 1. 2.71828183 10. 100. ]
log (natural): [0. 1. 2.30258509 4.60517019]
log10: [0. 0.43429448 1. 2. ]
log2: [0. 1. 2. 3. 4.]
log1p (log(1+x)): [9.99999995e-09 9.99950003e-05 9.95033085e-03]
=== Power and Root Functions ===
values: [ 1 4 9 16 25]
sqrt: [1. 2. 3. 4. 5.]
cbrt (cube root): [1. 2. 3. 4. 5.]
square: [ 1 4 9 16 25]
power(x, 3): [ 1 8 27 64 125]
reciprocal: [1. 0.5 0.25 0.2 0.1 ]
=== Rounding and Truncation ===
decimals: [-2.7 -1.3 0.8 1.2 2.6]
round: [-3. -1. 1. 1. 3.]
floor: [-3. -2. 0. 1. 2.]
ceil: [-2. -1. 1. 2. 3.]
trunc: [-2. -1. 0. 1. 2.]
rint (round to int): [-3. -1. 1. 1. 3.]
Precision rounding:
original: [3.14159 2.71828 1.41421]
round(2): [3.14 2.72 1.41]
round(-1): [120 460 790]
=== Comparison Operations ===
a: [1 2 3 4 5]
b: [1 1 3 5 4]
a == b: [ True False True False False]
a > b: [False True False False True]
a >= b: [ True True True False True]
Logical operations:
bool_a: [ True False True False]
bool_b: [ True True False False]
logical_and: [ True False False False]
logical_or: [ True True True False]
logical_xor: [False True True False]
logical_not: [False True False True]
=== Complex Number Operations ===
complex values: [ 1.+2.j 3.-4.j 0.+5.j -2.+0.j]
real part: [ 1. 3. 0. -2.]
imaginary part: [ 2. -4. 5. 0.]
absolute value: [2.23606798 5. 5. 2. ]
angle (radians): [ 1.10714872 -0.92729522 1.57079633 3.14159265]
conjugate: [ 1.-2.j 3.+4.j 0.-5.j -2.-0.j]
# Advanced mathematical functions
print("=== Special Mathematical Functions ===")
# Sign and absolute value
mixed_vals = np.array([-3, -1, 0, 2, 5])
print(f"values: {mixed_vals}")
print(f"sign: {np.sign(mixed_vals)}")
print(f"absolute: {np.abs(mixed_vals)}")
# Min/max with arrays
arr1 = np.array([1, 5, 2, 8, 3])
arr2 = np.array([2, 3, 6, 1, 9])
print(f"\nElement-wise min/max:")
print(f"arr1: {arr1}")
print(f"arr2: {arr2}")
print(f"minimum: {np.minimum(arr1, arr2)}")
print(f"maximum: {np.maximum(arr1, arr2)}")
# Clipping values
values = np.array([-5, -2, 0, 3, 8, 12])
print(f"\nClipping:")
print(f"original: {values}")
print(f"clip(0, 10): {np.clip(values, 0, 10)}")
print(f"clip(2, None): {np.clip(values, 2, None)}") # only lower bound
# Mathematical constants
print(f"\n=== Mathematical Constants ===")
print(f"π: {np.pi}")
print(f"e: {np.e}")
print(f"∞: {np.inf}")
print(f"NaN: {np.nan}")
# Working with special values
special_vals = np.array([1, np.inf, -np.inf, np.nan, 0])
print(f"\nSpecial value detection:")
print(f"values: {special_vals}")
print(f"isnan: {np.isnan(special_vals)}")
print(f"isinf: {np.isinf(special_vals)}")
print(f"isfinite: {np.isfinite(special_vals)}")
# NaN-safe operations
data_with_nan = np.array([1, 2, np.nan, 4, 5])
print(f"\nNaN-safe operations:")
print(f"data: {data_with_nan}")
print(f"nanmean: {np.nanmean(data_with_nan)}")
print(f"nanstd: {np.nanstd(data_with_nan)}")
print(f"nanmin: {np.nanmin(data_with_nan)}")
print(f"nanmax: {np.nanmax(data_with_nan)}")
# Custom ufuncs
def gaussian(x, mu=0, sigma=1):
"""Gaussian function."""
return np.exp(-0.5 * ((x - mu) / sigma)**2) / (sigma * np.sqrt(2 * np.pi))
# Vectorize custom function
x_vals = np.linspace(-3, 3, 7)
gaussian_vals = gaussian(x_vals, mu=0, sigma=1)
print(f"\nCustom function:")
print(f"x: {x_vals}")
print(f"gaussian: {gaussian_vals}")
# Performance: ufunc vs Python loop
large_array = np.random.default_rng(42).random(1000000)
import timeit
# Time ufunc
ufunc_time = timeit.timeit(lambda: np.sin(large_array), number=100)
# Time Python loop (small sample for timing)
small_array = large_array[:1000]
loop_time = timeit.timeit(lambda: [np.sin(x) for x in small_array], number=100)
print(f"\n=== Performance Comparison ===")
print(f"Array size: {len(large_array):,}")
print(f"ufunc time (100 iterations): {ufunc_time:.4f}s")
print(f"Loop time (1000 elements, 100 iterations): {loop_time:.4f}s")
print(f"Estimated loop time for full array: {loop_time * len(large_array) / len(small_array):.1f}s")
print(f"Speedup: ~{(loop_time * len(large_array) / len(small_array)) / ufunc_time:.0f}x")
=== Special Mathematical Functions ===
values: [-3 -1 0 2 5]
sign: [-1 -1 0 1 1]
absolute: [3 1 0 2 5]
Element-wise min/max:
arr1: [1 5 2 8 3]
arr2: [2 3 6 1 9]
minimum: [1 3 2 1 3]
maximum: [2 5 6 8 9]
Clipping:
original: [-5 -2 0 3 8 12]
clip(0, 10): [ 0 0 0 3 8 10]
clip(2, None): [ 2 2 2 3 8 12]
=== Mathematical Constants ===
π: 3.141592653589793
e: 2.718281828459045
∞: inf
NaN: nan
Special value detection:
values: [ 1. inf -inf nan 0.]
isnan: [False False False True False]
isinf: [False True True False False]
isfinite: [ True False False False True]
NaN-safe operations:
data: [ 1. 2. nan 4. 5.]
nanmean: 3.0
nanstd: 1.5811388300841898
nanmin: 1.0
nanmax: 5.0
Custom function:
x: [-3. -2. -1. 0. 1. 2. 3.]
gaussian: [0.00443185 0.05399097 0.24197072 0.39894228 0.24197072 0.05399097
0.00443185]
=== Performance Comparison ===
Array size: 1,000,000
ufunc time (100 iterations): 0.3559s
Loop time (1000 elements, 100 iterations): 0.0315s
Estimated loop time for full array: 31.5s
Speedup: ~89x
Universal function properties: ufuncs operate element-wise, support broadcasting and automatic type promotion, accept an out= argument for in-place results, and expose methods such as reduce, accumulate, and outer (see the sketch below).
Common ufunc patterns:
- Sampled sinusoid: np.sin(np.pi * x)
- Conditional selection: np.where(condition, x, y)
- Damped oscillation: A * np.exp(-t/tau) * np.cos(omega*t)
- Windowed FFT: np.fft.fft(np.hamming(n) * signal)
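The ufunc methods mentioned above can be illustrated with a short sketch; every binary ufunc such as np.add and np.multiply carries reduce, accumulate, and outer.
import numpy as np

x = np.array([1, 2, 3, 4, 5])

print(np.add.reduce(x))       # 15, equivalent to x.sum()
print(np.add.accumulate(x))   # [ 1  3  6 10 15], equivalent to np.cumsum(x)
print(np.multiply.reduce(x))  # 120, equivalent to np.prod(x)

# outer applies the ufunc to every pair of elements from two arrays
print(np.multiply.outer(np.arange(1, 4), np.arange(1, 5)))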
Arrays frequently need reshaping, splitting, or combining operations during numerical computations.
import numpy as np
# Create test arrays
arr_1d = np.arange(12)
arr_2d = np.arange(12).reshape(3, 4)
arr_3d = np.arange(24).reshape(2, 3, 4)
print("=== Original Arrays ===")
print(f"1D array: {arr_1d}")
print(f"2D array:\n{arr_2d}")
print(f"3D array shape: {arr_3d.shape}")
print("\n=== Reshaping Operations ===")
# Reshape (creates view when possible)
reshaped_2x6 = arr_1d.reshape(2, 6)
reshaped_4x3 = arr_1d.reshape(4, 3)
print(f"1D → 2×6:\n{reshaped_2x6}")
print(f"1D → 4×3:\n{reshaped_4x3}")
# Automatic dimension inference
auto_reshape = arr_1d.reshape(3, -1) # -1 means "figure it out"
print(f"reshape(3, -1):\n{auto_reshape}")
# Flatten operations
flat_array = arr_2d.flatten() # always copies
ravel_array = arr_2d.ravel() # returns view when possible
print(f"flatten(): {flat_array}")
print(f"ravel(): {ravel_array}")
print(f"flatten creates copy: {not np.shares_memory(arr_2d, flat_array)}")
print(f"ravel creates view: {np.shares_memory(arr_2d, ravel_array)}")
# Transpose operations
print(f"\n=== Transpose Operations ===")
print(f"Original 2D:\n{arr_2d}")
print(f"Transpose:\n{arr_2d.T}")
# Multi-dimensional transpose
print(f"3D shape: {arr_3d.shape}")
transposed_3d = arr_3d.transpose(2, 0, 1) # move axes
print(f"After transpose(2,0,1): {transposed_3d.shape}")
# Swap specific axes
swapped = np.swapaxes(arr_3d, 0, 2)
print(f"swapaxes(0, 2): {swapped.shape}")
print("\n=== Adding and Removing Dimensions ===")
# Add new axes
expanded = arr_1d[:, np.newaxis] # add axis as column
print(f"Original shape: {arr_1d.shape}")
print(f"After [:, newaxis]: {expanded.shape}")
print(f"Values:\n{expanded}")
# Multiple new axes
multi_expand = arr_1d[np.newaxis, :, np.newaxis]
print(f"Multiple newaxis: {multi_expand.shape}")
# Using expand_dims
expand_axis0 = np.expand_dims(arr_1d, axis=0)
expand_axis1 = np.expand_dims(arr_1d, axis=1)
print(f"expand_dims axis=0: {expand_axis0.shape}")
print(f"expand_dims axis=1: {expand_axis1.shape}")
# Remove single-dimensional entries
squeezed = np.squeeze(expand_axis0)
print(f"After squeeze: {squeezed.shape}")
print("\n=== Concatenation Operations ===")
a = np.array([[1, 2], [3, 4]])
b = np.array([[5, 6], [7, 8]])
c = np.array([[9, 10]])
print(f"Array a:\n{a}")
print(f"Array b:\n{b}")
print(f"Array c:\n{c}")
# Concatenate along different axes
concat_rows = np.concatenate([a, b], axis=0) # stack vertically
concat_cols = np.concatenate([a, b], axis=1) # stack horizontally
print(f"Concatenate rows (axis=0):\n{concat_rows}")
print(f"Concatenate columns (axis=1):\n{concat_cols}")
# Convenience functions
vstack_result = np.vstack([a, b, c]) # vertical stack
hstack_result = np.hstack([a, b]) # horizontal stack
print(f"vstack:\n{vstack_result}")
print(f"hstack:\n{hstack_result}")
# Stack with new dimension
stack_axis0 = np.stack([a, b], axis=0) # new axis at position 0
stack_axis2 = np.stack([a, b], axis=2) # new axis at position 2
print(f"stack axis=0 shape: {stack_axis0.shape}")
print(f"stack axis=2 shape: {stack_axis2.shape}")
print("\n=== Splitting Operations ===")
large_array = np.arange(16).reshape(4, 4)
print(f"Array to split:\n{large_array}")
# Split into equal parts
split_rows = np.split(large_array, 2, axis=0) # split into 2 parts along rows
split_cols = np.split(large_array, 4, axis=1) # split into 4 parts along columns
print(f"Split into 2 row groups:")
for i, part in enumerate(split_rows):
print(f" Part {i}:\n{part}")
print(f"Split into 4 column groups:")
for i, part in enumerate(split_cols):
print(f" Part {i}:\n{part}")
# Split at specific indices
split_indices = np.split(large_array, [1, 3], axis=0) # split at rows 1 and 3
print(f"Split at indices [1, 3]:")
for i, part in enumerate(split_indices):
print(f" Part {i}:\n{part}")
# Array splitting convenience functions
vsplit_result = np.vsplit(large_array, 2) # vertical split
hsplit_result = np.hsplit(large_array, 2) # horizontal split
print(f"vsplit into 2 parts:")
for i, part in enumerate(vsplit_result):
print(f" Part {i}:\n{part}")
print("\n=== Tiling and Repeating ===")
small = np.array([[1, 2], [3, 4]])
print(f"Small array:\n{small}")
# Tile array
tiled = np.tile(small, (2, 3)) # repeat 2 times vertically, 3 times horizontally
print(f"Tiled (2, 3):\n{tiled}")
# Repeat elements
arr = np.array([1, 2, 3])
repeat_each = np.repeat(arr, 3) # repeat each element 3 times
repeat_diff = np.repeat(arr, [1, 2, 3]) # repeat different amounts
print(f"Original: {arr}")
print(f"Repeat each 3 times: {repeat_each}")
print(f"Repeat [1,2,3] times: {repeat_diff}")
# Repeat along axis
arr_2d_small = np.array([[1, 2], [3, 4]])
repeat_axis0 = np.repeat(arr_2d_small, 2, axis=0)
repeat_axis1 = np.repeat(arr_2d_small, [1, 3], axis=1)
print(f"\nOriginal 2D:\n{arr_2d_small}")
print(f"Repeat along axis=0:\n{repeat_axis0}")
print(f"Repeat along axis=1:\n{repeat_axis1}")
=== Original Arrays ===
1D array: [ 0 1 2 3 4 5 6 7 8 9 10 11]
2D array:
[[ 0 1 2 3]
[ 4 5 6 7]
[ 8 9 10 11]]
3D array shape: (2, 3, 4)
=== Reshaping Operations ===
1D → 2×6:
[[ 0 1 2 3 4 5]
[ 6 7 8 9 10 11]]
1D → 4×3:
[[ 0 1 2]
[ 3 4 5]
[ 6 7 8]
[ 9 10 11]]
reshape(3, -1):
[[ 0 1 2 3]
[ 4 5 6 7]
[ 8 9 10 11]]
flatten(): [ 0 1 2 3 4 5 6 7 8 9 10 11]
ravel(): [ 0 1 2 3 4 5 6 7 8 9 10 11]
flatten creates copy: True
ravel creates view: True
=== Transpose Operations ===
Original 2D:
[[ 0 1 2 3]
[ 4 5 6 7]
[ 8 9 10 11]]
Transpose:
[[ 0 4 8]
[ 1 5 9]
[ 2 6 10]
[ 3 7 11]]
3D shape: (2, 3, 4)
After transpose(2,0,1): (4, 2, 3)
swapaxes(0, 2): (4, 3, 2)
=== Adding and Removing Dimensions ===
Original shape: (12,)
After [:, newaxis]: (12, 1)
Values:
[[ 0]
[ 1]
[ 2]
[ 3]
[ 4]
[ 5]
[ 6]
[ 7]
[ 8]
[ 9]
[10]
[11]]
Multiple newaxis: (1, 12, 1)
expand_dims axis=0: (1, 12)
expand_dims axis=1: (12, 1)
After squeeze: (12,)
=== Concatenation Operations ===
Array a:
[[1 2]
[3 4]]
Array b:
[[5 6]
[7 8]]
Array c:
[[ 9 10]]
Concatenate rows (axis=0):
[[1 2]
[3 4]
[5 6]
[7 8]]
Concatenate columns (axis=1):
[[1 2 5 6]
[3 4 7 8]]
vstack:
[[ 1 2]
[ 3 4]
[ 5 6]
[ 7 8]
[ 9 10]]
hstack:
[[1 2 5 6]
[3 4 7 8]]
stack axis=0 shape: (2, 2, 2)
stack axis=2 shape: (2, 2, 2)
=== Splitting Operations ===
Array to split:
[[ 0 1 2 3]
[ 4 5 6 7]
[ 8 9 10 11]
[12 13 14 15]]
Split into 2 row groups:
Part 0:
[[0 1 2 3]
[4 5 6 7]]
Part 1:
[[ 8 9 10 11]
[12 13 14 15]]
Split into 4 column groups:
Part 0:
[[ 0]
[ 4]
[ 8]
[12]]
Part 1:
[[ 1]
[ 5]
[ 9]
[13]]
Part 2:
[[ 2]
[ 6]
[10]
[14]]
Part 3:
[[ 3]
[ 7]
[11]
[15]]
Split at indices [1, 3]:
Part 0:
[[0 1 2 3]]
Part 1:
[[ 4 5 6 7]
[ 8 9 10 11]]
Part 2:
[[12 13 14 15]]
vsplit into 2 parts:
Part 0:
[[0 1 2 3]
[4 5 6 7]]
Part 1:
[[ 8 9 10 11]
[12 13 14 15]]
=== Tiling and Repeating ===
Small array:
[[1 2]
[3 4]]
Tiled (2, 3):
[[1 2 1 2 1 2]
[3 4 3 4 3 4]
[1 2 1 2 1 2]
[3 4 3 4 3 4]]
Original: [1 2 3]
Repeat each 3 times: [1 1 1 2 2 2 3 3 3]
Repeat [1,2,3] times: [1 2 2 3 3 3]
Original 2D:
[[1 2]
[3 4]]
Repeat along axis=0:
[[1 2]
[1 2]
[3 4]
[3 4]]
Repeat along axis=1:
[[1 2 2 2]
[3 4 4 4]]
# Advanced manipulation patterns
print("=== Advanced Manipulation Patterns ===")
# Roll elements (circular shift)
data = np.array([1, 2, 3, 4, 5])
rolled = np.roll(data, 2) # shift right by 2
rolled_left = np.roll(data, -1) # shift left by 1
print(f"Original: {data}")
print(f"Roll right 2: {rolled}")
print(f"Roll left 1: {rolled_left}")
# Roll in 2D
arr_2d = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
roll_rows = np.roll(arr_2d, 1, axis=0)
roll_cols = np.roll(arr_2d, -1, axis=1)
print(f"\nOriginal 2D:\n{arr_2d}")
print(f"Roll rows:\n{roll_rows}")
print(f"Roll columns:\n{roll_cols}")
# Flip arrays
print(f"\n=== Flipping Operations ===")
fliplr_result = np.fliplr(arr_2d) # flip left-right
flipud_result = np.flipud(arr_2d) # flip up-down
print(f"Flip left-right:\n{fliplr_result}")
print(f"Flip up-down:\n{flipud_result}")
# General flip along any axis
flip_axis0 = np.flip(arr_2d, axis=0)
flip_axis1 = np.flip(arr_2d, axis=1)
flip_both = np.flip(arr_2d) # flip all axes
print(f"Flip axis=0:\n{flip_axis0}")
print(f"Flip axis=1:\n{flip_axis1}")
print(f"Flip both:\n{flip_both}")
# Rotation (90-degree multiples)
rot90_once = np.rot90(arr_2d)
rot90_twice = np.rot90(arr_2d, k=2)
rot90_ccw = np.rot90(arr_2d, k=-1) # clockwise
print(f"\nRotate 90° CCW:\n{rot90_once}")
print(f"Rotate 180°:\n{rot90_twice}")
print(f"Rotate 90° CW:\n{rot90_ccw}")
# Block operations for creating larger arrays
print(f"\n=== Block Assembly ===")
A = np.ones((2, 2))
B = 2 * np.ones((2, 2))
C = 3 * np.ones((2, 2))
D = 4 * np.ones((2, 2))
# Create block matrix
block_matrix = np.block([[A, B], [C, D]])
print(f"Block matrix:\n{block_matrix}")
# More complex blocking - ensure dimension compatibility
# First create a 4x4 block from A,B,C,D
block_top = np.block([[A, B], [C, D]]) # (4, 4)
E = 5 * np.ones((4, 2)) # Match height of block_top
# For bottom row: F and G together must match width of top row (4+2=6)
F = 6 * np.ones((2, 4)) # Match width of block_top
G = 7 * np.ones((2, 2)) # Match width of E
# Now create properly dimensioned complex block
complex_block = np.block([[block_top, E], [F, G]])
print(f"Complex block shape: {complex_block.shape}")
print(f"Complex block:\n{complex_block}")
# Alternative: hierarchical blocking
print(f"\n=== Hierarchical Blocking ===")
# Create smaller blocks first
top_left = np.block([[A, B], [C, D]]) # (4, 4)
top_right = 8 * np.ones((4, 3)) # (4, 3)
bottom_left = 9 * np.ones((3, 4)) # (3, 4)
bottom_right = 10 * np.ones((3, 3)) # (3, 3)
hierarchical = np.block([[top_left, top_right],
[bottom_left, bottom_right]])
print(f"Hierarchical shape: {hierarchical.shape}")
print(f"Total elements: {hierarchical.size}")
# Array insertion and deletion
print(f"\n=== Insert and Delete ===")
original = np.array([1, 2, 3, 4, 5])
inserted = np.insert(original, 2, [99, 100]) # insert at index 2
deleted = np.delete(original, [1, 3]) # delete indices 1 and 3
print(f"Original: {original}")
print(f"Insert [99, 100] at index 2: {inserted}")
print(f"Delete indices [1, 3]: {deleted}")
# 2D insertion/deletion
arr_2d = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
insert_row = np.insert(arr_2d, 1, [99, 99, 99], axis=0) # insert row
insert_col = np.insert(arr_2d, 2, [99, 99, 99], axis=1) # insert column
delete_row = np.delete(arr_2d, 1, axis=0) # delete middle row
print(f"\nOriginal 2D:\n{arr_2d}")
print(f"Insert row at index 1:\n{insert_row}")
print(f"Delete row at index 1:\n{delete_row}")
# Memory considerations
print(f"\n=== Memory Efficiency ===")
large_arr = np.random.default_rng(42).random((1000, 1000))
# View vs copy operations
view_reshape = large_arr.reshape(-1) # creates view
copy_flatten = large_arr.flatten() # creates copy
print(f"Original size: {large_arr.nbytes / 1024**2:.1f} MB")
print(f"Reshape creates view: {np.shares_memory(large_arr, view_reshape)}")
print(f"Flatten creates copy: {not np.shares_memory(large_arr, copy_flatten)}")
print(f"Total memory with copy: {(large_arr.nbytes + copy_flatten.nbytes) / 1024**2:.1f} MB")
=== Advanced Manipulation Patterns ===
Original: [1 2 3 4 5]
Roll right 2: [4 5 1 2 3]
Roll left 1: [2 3 4 5 1]
Original 2D:
[[1 2 3]
[4 5 6]
[7 8 9]]
Roll rows:
[[7 8 9]
[1 2 3]
[4 5 6]]
Roll columns:
[[2 3 1]
[5 6 4]
[8 9 7]]
=== Flipping Operations ===
Flip left-right:
[[3 2 1]
[6 5 4]
[9 8 7]]
Flip up-down:
[[7 8 9]
[4 5 6]
[1 2 3]]
Flip axis=0:
[[7 8 9]
[4 5 6]
[1 2 3]]
Flip axis=1:
[[3 2 1]
[6 5 4]
[9 8 7]]
Flip both:
[[9 8 7]
[6 5 4]
[3 2 1]]
Rotate 90° CCW:
[[3 6 9]
[2 5 8]
[1 4 7]]
Rotate 180°:
[[9 8 7]
[6 5 4]
[3 2 1]]
Rotate 90° CW:
[[7 4 1]
[8 5 2]
[9 6 3]]
=== Block Assembly ===
Block matrix:
[[1. 1. 2. 2.]
[1. 1. 2. 2.]
[3. 3. 4. 4.]
[3. 3. 4. 4.]]
Complex block shape: (6, 6)
Complex block:
[[1. 1. 2. 2. 5. 5.]
[1. 1. 2. 2. 5. 5.]
[3. 3. 4. 4. 5. 5.]
[3. 3. 4. 4. 5. 5.]
[6. 6. 6. 6. 7. 7.]
[6. 6. 6. 6. 7. 7.]]
=== Hierarchical Blocking ===
Hierarchical shape: (7, 7)
Total elements: 49
=== Insert and Delete ===
Original: [1 2 3 4 5]
Insert [99, 100] at index 2: [ 1 2 99 100 3 4 5]
Delete indices [1, 3]: [1 3 5]
Original 2D:
[[1 2 3]
[4 5 6]
[7 8 9]]
Insert row at index 1:
[[ 1 2 3]
[99 99 99]
[ 4 5 6]
[ 7 8 9]]
Delete row at index 1:
[[1 2 3]
[7 8 9]]
=== Memory Efficiency ===
Original size: 7.6 MB
Reshape creates view: True
Flatten creates copy: True
Total memory with copy: 15.3 MB
Operation memory behavior:
| Operation | Creates View | Creates Copy | Notes |
|---|---|---|---|
| reshape() | Usually | Sometimes | Copy if not C-contiguous |
| ravel() | Usually | Sometimes | Copy if not contiguous |
| flatten() | Never | Always | Always copies |
| transpose() | Always | Never | View with different strides |
| concatenate() | Never | Always | New array required |
| split() | Always | Never | Views of original |
| squeeze() | Always | Never | Remove size-1 dimensions |
Common manipulation workflows:
- Change shape: reshape(), transpose()
- Combine arrays: concatenate(), stack(), block()
- Divide arrays: split(), array slicing
- Add dimensions: newaxis, expand_dims()

A combined framing example follows below.
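The combined example referenced above is a minimal sketch (signal and frame_len are illustrative): reshape frames a 1-D signal into fixed-size blocks, and stack adds a channel axis.
import numpy as np

signal = np.arange(20)
frame_len = 5                                  # illustrative frame size
frames = signal.reshape(-1, frame_len)         # (4, 5): one row per frame
print(frames)

# Pair the signal with a second channel along a new leading axis
other = signal[::-1].reshape(-1, frame_len)
stacked = np.stack([frames, other], axis=0)    # shape (2, 4, 5)
print(stacked.shape)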
Essential matrix operations for numerical computing
import numpy as np
import time
# Matrix creation and basic properties
A = np.array([[1, 2, 3],
[4, 5, 6],
[7, 8, 10]]) # Note: singular if 10→9
B = np.array([[2, 0, 1],
[1, 3, 0],
[0, 1, 2]])
print("=== Matrix Properties ===")
print(f"A determinant: {np.linalg.det(A):.3f}")
print(f"A condition number: {np.linalg.cond(A):.3f}")
print(f"A rank: {np.linalg.matrix_rank(A)}")
print(f"A trace: {np.trace(A)}")
print(f"Frobenius norm: {np.linalg.norm(A, 'fro'):.3f}")
print(f"\n=== Matrix Products ===")
# Element-wise vs matrix multiplication
elementwise = A * B # Hadamard product
matmul = A @ B # Matrix multiplication (preferred)
matmul_alt = np.dot(A, B) # Alternative syntax
print(f"A * B (element-wise) sum: {np.sum(elementwise)}")
print(f"A @ B (matrix mult) trace: {np.trace(matmul)}")
print(f"Results identical: {np.allclose(matmul, matmul_alt)}")
# Performance comparison for large matrices
n = 1000
large_A = np.random.default_rng(42).random((n, n))
large_B = np.random.default_rng(43).random((n, n))
start = time.time()
result_at = large_A @ large_B
time_at = time.time() - start
start = time.time()
result_dot = np.dot(large_A, large_B)
time_dot = time.time() - start
print(f"\n=== Performance (n={n}) ===")
print(f"@ operator: {time_at:.4f} seconds")
print(f"np.dot(): {time_dot:.4f} seconds")
print(f"Results match: {np.allclose(result_at, result_dot)}")
=== Matrix Properties ===
A determinant: -3.000
A condition number: 88.448
A rank: 3
A trace: 16
Frobenius norm: 17.436
=== Matrix Products ===
A * B (element-wise) sum: 52
A @ B (matrix mult) trace: 52
Results identical: True
=== Performance (n=1000) ===
@ operator: 0.0114 seconds
np.dot(): 0.0087 seconds
Results match: True
print("=== Linear Systems ===")
# Solve Ax = b
b = np.array([6, 15, 25])
x_solve = np.linalg.solve(A, b)
print(f"Solution x: {x_solve}")
print(f"Verification Ax: {A @ x_solve}")
print(f"Residual |Ax - b|: {np.linalg.norm(A @ x_solve - b):.2e}")
# Matrix inverse (use sparingly!)
A_inv = np.linalg.inv(A)
x_inv = A_inv @ b
print(f"Solution via inverse: {x_inv}")
print(f"Solutions match: {np.allclose(x_solve, x_inv)}")
# Condition number impact
print(f"\n=== Numerical Stability ===")
ill_conditioned = np.array([[1, 1], [1, 1.0001]]) # Nearly singular
well_conditioned = np.array([[2, 1], [1, 2]])
print(f"Ill-conditioned cond: {np.linalg.cond(ill_conditioned):.2e}")
print(f"Well-conditioned cond: {np.linalg.cond(well_conditioned):.2e}")
# Demonstrate stability
b_small = np.array([2, 2.0001])
try:
x_ill = np.linalg.solve(ill_conditioned, b_small)
x_well = np.linalg.solve(well_conditioned, b_small)
print(f"Ill-conditioned solution: {x_ill}")
print(f"Well-conditioned solution: {x_well}")
except np.linalg.LinAlgError as e:
print(f"Numerical error: {e}")
=== Linear Systems ===
Solution x: [1. 1. 1.]
Verification Ax: [ 6. 15. 25.]
Residual |Ax - b|: 0.00e+00
Solution via inverse: [1. 1. 1.]
Solutions match: True
=== Numerical Stability ===
Ill-conditioned cond: 4.00e+04
Well-conditioned cond: 3.00e+00
Ill-conditioned solution: [1. 1.]
Well-conditioned solution: [0.66663333 0.66673333]
print("=== Eigenvalue Analysis ===")
# Symmetric matrix for real eigenvalues
S = (A + A.T) / 2 # Make symmetric
eigenvals, eigenvecs = np.linalg.eig(S)
# Sort by eigenvalue magnitude
idx = np.argsort(np.abs(eigenvals))[::-1]
eigenvals = eigenvals[idx]
eigenvecs = eigenvecs[:, idx]
print(f"Eigenvalues: {eigenvals}")
print(f"Largest eigenvalue: {eigenvals[0]:.3f}")
print(f"Spectral radius: {np.max(np.abs(eigenvals)):.3f}")
# Verify eigenvector property: Av = λv
v1 = eigenvecs[:, 0]
lambda1 = eigenvals[0]
Av1 = S @ v1
lambda_v1 = lambda1 * v1
print(f"Av₁: {Av1}")
print(f"λ₁v₁: {lambda_v1}")
print(f"Eigenvalue error: {np.linalg.norm(Av1 - lambda_v1):.2e}")
# Principal component direction
print(f"Principal eigenvector: {v1}")
print(f"Normalized: {v1 / np.linalg.norm(v1)}")
print(f"\n=== Matrix Decompositions ===")
# QR decomposition
Q, R = np.linalg.qr(A)
print(f"QR reconstruction error: {np.linalg.norm(Q @ R - A):.2e}")
print(f"Q orthogonality: {np.linalg.norm(Q.T @ Q - np.eye(3)):.2e}")
print(f"R upper triangular: {np.allclose(R, np.triu(R))}")
# SVD decomposition
U, s, Vt = np.linalg.svd(A)
print(f"SVD singular values: {s}")
print(f"SVD reconstruction error: {np.linalg.norm(U @ np.diag(s) @ Vt - A):.2e}")
print(f"Matrix rank (SVD): {np.sum(s > 1e-10)}")
=== Eigenvalue Analysis ===
Eigenvalues: [17.04241624 -1.23280239 0.19038615]
Largest eigenvalue: 17.042
Spectral radius: 17.042
Av₁: [ 5.81114918 9.10875999 13.17971881]
λ₁v₁: [ 5.81114918 9.10875999 13.17971881]
Eigenvalue error: 7.59e-15
Principal eigenvector: [0.34098153 0.53447585 0.77334802]
Normalized: [0.34098153 0.53447585 0.77334802]
=== Matrix Decompositions ===
QR reconstruction error: 5.79e-15
Q orthogonality: 5.24e-16
R upper triangular: True
SVD singular values: [17.41250517 0.87516135 0.19686652]
SVD reconstruction error: 1.16e-14
Matrix rank (SVD): 3
Matrix operation decision matrix:
| Problem | Recommended Method | Alternative | Avoid |
|---|---|---|---|
| Linear system Ax=b | solve(A, b) | LU decomposition | inv(A) @ b |
| Matrix-vector product | A @ v | dot(A, v) | Element-wise * |
| Eigenvalues only | eigvals(A) | eig(A)[0] | Power iteration |
| Least squares | lstsq(A, b) | Normal equations | inv(A.T @ A) |
| Matrix rank | matrix_rank(A) | SVD + threshold | det(A) != 0 |
| Condition number | cond(A) | s_max/s_min | Manual calculation |
Performance characteristics: dense products, factorizations, and solves all scale as O(n³), but NumPy dispatches them to optimized BLAS/LAPACK routines; calling solve() or lstsq() directly is both faster and more numerically stable than forming an explicit inverse (a least-squares sketch follows below).
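As a brief sketch of the lstsq recommendation in the decision matrix (the data here is synthetic and illustrative), fitting a straight line to noisy samples:
import numpy as np

rng = np.random.default_rng(0)
t = np.linspace(0, 1, 50)
y = 2.0 * t + 1.0 + 0.1 * rng.standard_normal(t.size)    # noisy line: slope 2, intercept 1

A_design = np.column_stack([t, np.ones_like(t)])          # design matrix [t, 1]
coeffs, residuals, rank, sv = np.linalg.lstsq(A_design, y, rcond=None)
print(f"slope ≈ {coeffs[0]:.3f}, intercept ≈ {coeffs[1]:.3f}")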
Comprehensive data analysis with NumPy statistical functions
import numpy as np
import matplotlib.pyplot as plt
# Generate sample dataset for analysis
data_2d = np.random.default_rng(42).normal(100, 15, (1000, 5)) # 1000 samples, 5 features
data_2d[:, 1] *= 1.5 # Scale second column
data_2d[:, 2] += np.sin(np.arange(1000) * 0.01) * 10 # Add pattern to third column
print("=== Basic Statistics ===")
print(f"Data shape: {data_2d.shape}")
print(f"Mean (all): {np.mean(data_2d):.2f}")
print(f"Std (all): {np.std(data_2d):.2f}")
print(f"Min: {np.min(data_2d):.2f}, Max: {np.max(data_2d):.2f}")
print(f"Median: {np.median(data_2d):.2f}")
# Per-column statistics
print(f"\n=== Column-wise Statistics ===")
col_means = np.mean(data_2d, axis=0)
col_stds = np.std(data_2d, axis=0)
col_vars = np.var(data_2d, axis=0)
print(f"Column means: {col_means}")
print(f"Column stds: {col_stds}")
print(f"Column variance: {col_vars}")
# Percentiles and quartiles
print(f"\n=== Percentiles ===")
percentiles = [10, 25, 50, 75, 90]
perc_values = np.percentile(data_2d, percentiles, axis=0)
print(f"Percentiles {percentiles}:")
for i, p in enumerate(percentiles):
print(f" {p:2d}%: {perc_values[i]}")
# Distribution analysis
print(f"\n=== Distribution Properties ===")
from scipy import stats as sp_stats
skewness = sp_stats.skew(data_2d, axis=0)
kurtosis = sp_stats.kurtosis(data_2d, axis=0)
print(f"Skewness: {skewness}")
print(f"Kurtosis: {kurtosis}")
print(f"Normality tests (col 0 p-value): {sp_stats.normaltest(data_2d[:, 0])[1]:.3f}")
=== Basic Statistics ===
Data shape: (1000, 5)
Mean (all): 110.08
Std (all): 26.41
Min: 45.27, Max: 227.72
Median: 104.70
=== Column-wise Statistics ===
Column means: [ 99.31481844 150.20615594 101.8884911 99.63382608 99.37642542]
Column stds: [15.00012517 23.29567561 16.0246545 15.27507478 14.5981574 ]
Column variance: [225.00375504 542.6885021 256.78955195 233.32790948 213.10619938]
=== Percentiles ===
Percentiles [10, 25, 50, 75, 90]:
10%: [ 80.01921498 120.24945756 80.89358993 80.13092701 80.32894777]
25%: [ 89.09368428 134.23864293 91.68213802 89.40103361 89.51094201]
50%: [100.0330224 150.95034715 102.02971994 99.43309766 99.36446098]
75%: [109.19724573 165.74404331 113.21666008 109.80130927 108.8348503 ]
90%: [118.39089722 179.43178042 121.44047056 119.22377475 117.70651899]
=== Distribution Properties ===
Skewness: [-0.04445004 -0.02798872 -0.0645359 0.0391488 0.05162126]
Kurtosis: [-0.17662822 0.2025187 0.19100294 -0.15181094 0.17409964]
Normality tests (col 0 p-value): 0.438
print("=== Advanced Aggregations ===")
# Multiple statistics at once
def describe_array(arr, axis=None):
"""Comprehensive statistics like pandas describe"""
stats = {
'count': arr.size if axis is None else arr.shape[axis],
'mean': np.mean(arr, axis=axis),
'std': np.std(arr, axis=axis),
'min': np.min(arr, axis=axis),
'25%': np.percentile(arr, 25, axis=axis),
'50%': np.percentile(arr, 50, axis=axis),
'75%': np.percentile(arr, 75, axis=axis),
'max': np.max(arr, axis=axis)
}
return stats
# Apply to our data
column_stats = describe_array(data_2d, axis=0)
print("Column 0 statistics:")
for stat, value in column_stats.items():
if isinstance(value, np.ndarray):
print(f" {stat}: {value[0]:.2f}")
else:
print(f" {stat}: {value}")
# Correlation analysis
print(f"\n=== Correlation Matrix ===")
correlation_matrix = np.corrcoef(data_2d.T) # Transpose for variable correlation
print(f"Correlation matrix shape: {correlation_matrix.shape}")
print("Column 0 correlations with others:")
for i in range(5):
print(f" Col 0 vs Col {i}: {correlation_matrix[0, i]:.3f}")
# Find highest correlations
max_corr_idx = np.unravel_index(
np.argmax(np.abs(correlation_matrix - np.eye(5))),
correlation_matrix.shape
)
print(f"Highest correlation: Col {max_corr_idx[0]} vs Col {max_corr_idx[1]}: {correlation_matrix[max_corr_idx]:.3f}")
print(f"\n=== Robust Statistics ===")
# Robust alternatives to mean/std
median_vals = np.median(data_2d, axis=0)
mad_vals = np.median(np.abs(data_2d - median_vals), axis=0) # Median Absolute Deviation
iqr_vals = np.percentile(data_2d, 75, axis=0) - np.percentile(data_2d, 25, axis=0)
print(f"Medians: {median_vals}")
print(f"MAD: {mad_vals}")
print(f"IQR: {iqr_vals}")
=== Advanced Aggregations ===
Column 0 statistics:
count: 1000
mean: 99.31
std: 15.00
min: 54.04
25%: 89.09
50%: 100.03
75%: 109.20
max: 139.52
=== Correlation Matrix ===
Correlation matrix shape: (5, 5)
Column 0 correlations with others:
Col 0 vs Col 0: 1.000
Col 0 vs Col 1: 0.049
Col 0 vs Col 2: -0.037
Col 0 vs Col 3: 0.029
Col 0 vs Col 4: 0.021
Highest correlation: Col 0 vs Col 1: 0.049
=== Robust Statistics ===
Medians: [100.0330224 150.95034715 102.02971994 99.43309766 99.36446098]
MAD: [10.0874853 15.57187442 10.90789664 10.30672496 9.68430676]
IQR: [20.10356145 31.50540039 21.53452206 20.40027565 19.3239083 ]
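A short sketch (assuming data_2d from above is still in scope) turning the MAD into a robust z-score; the 1.4826 factor scales MAD so it estimates the standard deviation for normally distributed data.
import numpy as np

col = data_2d[:, 0]
med = np.median(col)
mad = np.median(np.abs(col - med))

# Robust z-score: MAD * 1.4826 approximates sigma under normality
robust_z = (col - med) / (1.4826 * mad)
outliers = np.abs(robust_z) > 3
print(f"Flagged {outliers.sum()} potential outliers out of {col.size}")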
print("=== Performance Optimizations ===")
import time
# Large array for performance testing
large_data = np.random.default_rng(42).normal(0, 1, (10000, 100))
# Compare different aggregation approaches
def time_operation(func, data, name):
start = time.time()
result = func(data)
elapsed = time.time() - start
print(f" {name}: {elapsed:.4f}s")
return result, elapsed
print("Timing comparisons (10k×100 array):")
# Different axis specifications
_, t1 = time_operation(lambda x: np.mean(x), large_data, "Mean (all)")
_, t2 = time_operation(lambda x: np.mean(x, axis=0), large_data, "Mean (axis=0)")
_, t3 = time_operation(lambda x: np.mean(x, axis=1), large_data, "Mean (axis=1)")
# Memory-efficient vs memory-intensive
print(f"\nMemory considerations:")
small_chunk = large_data[:1000, :50]
full_stats = np.array([np.mean(small_chunk), np.std(small_chunk), np.median(small_chunk)])
# Incremental computation
running_mean = np.zeros(small_chunk.shape[1])
for i in range(0, small_chunk.shape[0], 100):
chunk = small_chunk[i:i+100, :]
running_mean += np.mean(chunk, axis=0)
running_mean /= (small_chunk.shape[0] // 100)
print(f"Batch mean: {np.mean(running_mean):.4f}")
print(f"Direct mean: {np.mean(small_chunk):.4f}")
print(f"Difference: {np.abs(np.mean(running_mean) - np.mean(small_chunk)):.2e}")
print(f"\n=== Specialized Functions ===")
# Binning and histograms
hist, edges = np.histogram(data_2d[:, 0], bins=20)
print(f"Histogram bins: {len(hist)}")
print(f"Most frequent bin: {np.argmax(hist)} ({hist[np.argmax(hist)]} counts)")
# Digital signal processing statistics
signal = data_2d[:100, 0] # First 100 points
gradient = np.gradient(signal)
print(f"Signal mean gradient: {np.mean(gradient):.4f}")
print(f"Signal RMS: {np.sqrt(np.mean(signal**2)):.2f}")
# Weighted statistics
weights = np.exp(-0.001 * np.arange(1000)) # Exponential decay weights
weighted_mean = np.average(data_2d[:, 0], weights=weights)
print(f"Weighted mean (decay): {weighted_mean:.2f}")
print(f"Regular mean: {np.mean(data_2d[:, 0]):.2f}")
=== Performance Optimizations ===
Timing comparisons (10k×100 array):
Mean (all): 0.0002s
Mean (axis=0): 0.0002s
Mean (axis=1): 0.0002s
Memory considerations:
Batch mean: -0.0107
Direct mean: -0.0107
Difference: 1.73e-18
=== Specialized Functions ===
Histogram bins: 20
Most frequent bin: 11 (116 counts)
Signal mean gradient: -0.0371
Signal RMS: 100.55
Weighted mean (decay): 99.27
Regular mean: 99.31
Statistical function categories and performance:
| Category | Functions | Use Case | Performance Notes |
|---|---|---|---|
| Basic | mean, std, var, sum | Quick summaries | Optimized, use axis parameter |
| Percentiles | median, percentile, quantile | Distribution analysis | O(n log n), consider nanpercentile (sketch below) |
| Extremes | min, max, argmin, argmax | Outlier detection | O(n), very fast |
| Correlation | corrcoef, cov | Relationship analysis | O(n²) for features, memory intensive |
| Advanced | histogram, bincount | Distribution shape | Binning strategy affects performance |
| Robust | median, MAD | Outlier-resistant | Slower than mean/std but more reliable |
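Several entries in the table have NaN-aware counterparts (nanmean, nanstd, nanpercentile, ...) that skip missing values instead of propagating them. A minimal sketch with made-up data, as referenced in the Percentiles row:

```python
import numpy as np

arr = np.array([1.0, 2.0, np.nan, 4.0, 5.0])
print(np.mean(arr))               # nan - one missing value poisons the result
print(np.nanmean(arr))            # 3.0 - NaNs are ignored
print(np.nanpercentile(arr, 50))  # 3.0 - median of the non-NaN values
```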
Memory-efficient patterns:
# Incremental statistics for large datasets
def incremental_stats(data, chunk_size=1000):
"""Compute statistics without loading full array"""
n_chunks = len(data) // chunk_size
running_sum = 0
running_sq_sum = 0
for i in range(n_chunks):
chunk = data[i*chunk_size:(i+1)*chunk_size]
running_sum += np.sum(chunk)
running_sq_sum += np.sum(chunk**2)
mean = running_sum / (n_chunks * chunk_size)
variance = running_sq_sum / (n_chunks * chunk_size) - mean**2
return mean, np.sqrt(variance)
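A quick usage check of incremental_stats against the direct computation (test data and seed are my own). Note that the sum-of-squares formula above can lose precision when the mean is large relative to the spread; Welford's algorithm is the numerically stable alternative.

```python
import numpy as np

rng = np.random.default_rng(0)
stream = rng.normal(10, 2, 100_000)          # length divisible by the chunk size

inc_mean, inc_std = incremental_stats(stream, chunk_size=1000)
print(f"Incremental: mean={inc_mean:.4f}, std={inc_std:.4f}")
print(f"Direct:      mean={np.mean(stream):.4f}, std={np.std(stream):.4f}")
```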
Real-world applications demonstrating NumPy’s computational power
import numpy as np
import matplotlib.pyplot as plt
import time
print("=== Signal Processing Example ===")
# Generate synthetic signal with noise
fs = 1000 # Sampling frequency
t = np.linspace(0, 1, fs)
signal_clean = np.sin(2 * np.pi * 50 * t) + 0.5 * np.sin(2 * np.pi * 120 * t)
noise = np.random.default_rng(42).normal(0, 0.3, len(t))
signal_noisy = signal_clean + noise
# Digital filtering using convolution
def moving_average_filter(signal, window_size):
"""Simple moving average filter"""
kernel = np.ones(window_size) / window_size
return np.convolve(signal, kernel, mode='same')
# Apply filters of different sizes
filtered_5 = moving_average_filter(signal_noisy, 5)
filtered_15 = moving_average_filter(signal_noisy, 15)
filtered_25 = moving_average_filter(signal_noisy, 25)
# Calculate signal-to-noise ratio improvement
def snr_db(clean, noisy):
"""Calculate SNR in dB"""
signal_power = np.mean(clean**2)
noise_power = np.mean((noisy - clean)**2)
return 10 * np.log10(signal_power / noise_power)
print(f"Original SNR: {snr_db(signal_clean, signal_noisy):.2f} dB")
print(f"Filtered SNR (5-pt): {snr_db(signal_clean, filtered_5):.2f} dB")
print(f"Filtered SNR (15-pt): {snr_db(signal_clean, filtered_15):.2f} dB")
print(f"Filtered SNR (25-pt): {snr_db(signal_clean, filtered_25):.2f} dB")
# Frequency domain analysis
fft_signal = np.fft.fft(signal_noisy)
freqs = np.fft.fftfreq(len(t), 1/fs)
dominant_freq = freqs[np.argmax(np.abs(fft_signal[:len(fft_signal)//2]))]
print(f"Dominant frequency: {dominant_freq:.1f} Hz (expected: 50 Hz)")
print(f"\n=== Performance: {len(t)} samples ===")
start = time.time()
_ = moving_average_filter(signal_noisy, 15)
filter_time = time.time() - start
start = time.time()
_ = np.fft.fft(signal_noisy)
fft_time = time.time() - start
print(f"Moving average: {filter_time:.4f}s")
print(f"FFT: {fft_time:.4f}s")
=== Signal Processing Example ===
Original SNR: 8.51 dB
Filtered SNR (5-pt): 10.63 dB
Filtered SNR (15-pt): 1.92 dB
Filtered SNR (25-pt): -1.18 dB
Dominant frequency: 50.0 Hz (expected: 50 Hz)
=== Performance: 1000 samples ===
Moving average: 0.0000s
FFT: 0.0000s
print("=== Monte Carlo Simulation ===")
# Option pricing using Black-Scholes Monte Carlo
def monte_carlo_option_price(S0, K, T, r, sigma, n_paths=10000):
"""
Price European call option using Monte Carlo
S0: Initial stock price, K: Strike price, T: Time to expiration
r: Risk-free rate, sigma: Volatility
"""
dt = T / 252 # Daily time steps
paths = np.zeros((252, n_paths))
paths[0] = S0
# Generate random walks
random_shocks = np.random.default_rng(42).normal(0, 1, (251, n_paths))
for t in range(1, 252):
paths[t] = paths[t-1] * np.exp(
(r - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * random_shocks[t-1]
)
# Calculate payoffs
final_prices = paths[-1]
payoffs = np.maximum(final_prices - K, 0)
option_price = np.exp(-r * T) * np.mean(payoffs)
return option_price, paths, payoffs
# Price options with different parameters
S0, K, T, r = 100, 105, 1.0, 0.05
sigma_low, sigma_high = 0.2, 0.4
price_low, paths_low, payoffs_low = monte_carlo_option_price(S0, K, T, r, sigma_low)
price_high, paths_high, payoffs_high = monte_carlo_option_price(S0, K, T, r, sigma_high)
print(f"Option prices (S0={S0}, K={K}, T={T}):")
print(f"Low volatility (σ={sigma_low}): ${price_low:.2f}")
print(f"High volatility (σ={sigma_high}): ${price_high:.2f}")
print(f"Volatility premium: ${price_high - price_low:.2f}")
# Convergence analysis
path_counts = [1000, 2000, 5000, 10000, 20000]
prices = []
times = []
for n in path_counts:
start = time.time()
price, _, _ = monte_carlo_option_price(S0, K, T, r, sigma_low, n)
elapsed = time.time() - start
prices.append(price)
times.append(elapsed)
print(f"\nConvergence analysis:")
for i, (n, p, t) in enumerate(zip(path_counts, prices, times)):
print(f" {n:5d} paths: ${p:.3f} ({t:.3f}s)")
# Error estimation
final_price = prices[-1]
errors = [abs(p - final_price) for p in prices[:-1]]
print(f"Price errors vs 20k paths: {errors}")
=== Monte Carlo Simulation ===
Option prices (S0=100, K=105, T=1.0):
Low volatility (σ=0.2): $8.14
High volatility (σ=0.4): $16.17
Volatility premium: $8.04
Convergence analysis:
1000 paths: $8.114 (0.003s)
2000 paths: $8.107 (0.006s)
5000 paths: $8.318 (0.013s)
10000 paths: $8.138 (0.028s)
20000 paths: $8.157 (0.057s)
Price errors vs 20k paths: [0.043087504099002416, 0.0497754051377548, 0.16126274836529397, 0.01948377032524995]
print("=== Image Processing Pipeline ===")
# Create synthetic 2D data representing an image
np.random.seed(42)
image_size = 100
x, y = np.meshgrid(np.linspace(-2, 2, image_size), np.linspace(-2, 2, image_size))
# Generate synthetic image with features
clean_image = (
np.exp(-(x**2 + y**2) / 0.5) + # Gaussian blob
0.5 * np.exp(-((x-1)**2 + (y-1)**2) / 0.3) + # Smaller blob
0.3 * np.cos(x * 3) * np.sin(y * 3) # Pattern
)
# Add noise
noisy_image = clean_image + np.random.default_rng(42).normal(0, 0.1, clean_image.shape)
# Image processing operations
def gaussian_kernel(size, sigma):
"""Generate 2D Gaussian kernel"""
ax = np.arange(-size // 2 + 1., size // 2 + 1.)
xx, yy = np.meshgrid(ax, ax)
kernel = np.exp(-(xx**2 + yy**2) / (2 * sigma**2))
return kernel / np.sum(kernel)
def convolve2d(image, kernel):
"""2D convolution using numpy"""
# Pad image
pad_h, pad_w = kernel.shape[0] // 2, kernel.shape[1] // 2
padded = np.pad(image, ((pad_h, pad_h), (pad_w, pad_w)), mode='edge')
# Convolve
result = np.zeros_like(image)
for i in range(image.shape[0]):
for j in range(image.shape[1]):
result[i, j] = np.sum(padded[i:i+kernel.shape[0], j:j+kernel.shape[1]] * kernel)
return result
# Apply different filters
kernel_small = gaussian_kernel(5, 0.8)
kernel_large = gaussian_kernel(11, 2.0)
smoothed_small = convolve2d(noisy_image, kernel_small)
smoothed_large = convolve2d(noisy_image, kernel_large)
# Edge detection using gradient
grad_x = np.gradient(smoothed_small, axis=1)
grad_y = np.gradient(smoothed_small, axis=0)
edge_magnitude = np.sqrt(grad_x**2 + grad_y**2)
print(f"Image processing results:")
print(f"Original image stats: mean={np.mean(clean_image):.3f}, std={np.std(clean_image):.3f}")
print(f"Noisy image stats: mean={np.mean(noisy_image):.3f}, std={np.std(noisy_image):.3f}")
print(f"Smoothed (small) stats: mean={np.mean(smoothed_small):.3f}, std={np.std(smoothed_small):.3f}")
print(f"Edge magnitude max: {np.max(edge_magnitude):.3f}")
# Performance comparison
print(f"\nPerformance comparison:")
start = time.time()
_ = convolve2d(noisy_image, kernel_small)
conv_time = time.time() - start
start = time.time()
_ = np.gradient(noisy_image)
grad_time = time.time() - start
print(f"2D convolution: {conv_time:.4f}s")
print(f"Gradient computation: {grad_time:.4f}s")
# Memory usage analysis
print(f"\nMemory usage:")
print(f"Image size: {noisy_image.nbytes / 1024:.1f} KB")
print(f"Kernel size: {kernel_small.nbytes} bytes")
print(f"Gradient arrays: {(grad_x.nbytes + grad_y.nbytes) / 1024:.1f} KB")
=== Image Processing Pipeline ===
Image processing results:
Original image stats: mean=0.125, std=0.254
Noisy image stats: mean=0.124, std=0.274
Smoothed (small) stats: mean=0.124, std=0.256
Edge magnitude max: 0.148
Performance comparison:
2D convolution: 0.0211s
Gradient computation: 0.0001s
Memory usage:
Image size: 78.1 KB
Kernel size: 200 bytes
Gradient arrays: 156.2 KB
Numerical computing best practices:
| Application Domain | Key NumPy Features | Performance Tips |
|---|---|---|
| Signal Processing | FFT, convolution, filtering | Use scipy.fft for large transforms |
| Monte Carlo | Random generation, statistics | Vectorize paths, use numba for speed |
| Image Processing | 2D operations, gradients | Consider scipy.ndimage for advanced ops |
| Linear Systems | Matrix operations, decomposition | Profile memory usage, use views |
| Optimization | Gradient computation, line search | Minimize temporary arrays |
| Machine Learning | Broadcasting, dot products | Batch operations, avoid Python loops |
Memory optimization strategies:
# In-place operations save memory
arr += 1 # instead of arr = arr + 1
# Pre-allocate arrays when possible
result = np.empty((n, m)) # faster than appending
# Use views instead of copies
view = arr[::2, ::2] # every other element
# Memory-mapped arrays for large datasets
mmap_arr = np.memmap('data.dat', dtype='float32', mode='r+', shape=(large_n,))
Computational complexity awareness:
# Simple LCG implementation
class SimpleLCG:
"""Linear Congruential Generator."""
def __init__(self, seed=None):
# Numerical Recipes parameters
self.a = 1664525
self.c = 1013904223
self.m = 2**32
self.state = seed if seed else 12345
def next_int(self):
"""Generate next integer."""
self.state = (self.a * self.state + self.c) % self.m
return self.state
def random(self):
"""Generate float in [0, 1)."""
return self.next_int() / self.m
def randint(self, low, high):
"""Generate integer in [low, high)."""
return low + self.next_int() % (high - low)
# Test generator
lcg = SimpleLCG(seed=42)
print("First 10 random floats:")
for _ in range(10):
print(f" {lcg.random():.6f}")
# Period detection (capped search: with these parameters the full period is
# 2**32, far beyond the 1,000,000-step limit, so the loop stops at the cap)
lcg = SimpleLCG(seed=1)
initial = lcg.state
period = 0
while True:
lcg.next_int()
period += 1
if lcg.state == initial or period > 1000000:
break
print(f"\nPeriod detected: {period:,}")
First 10 random floats:
0.252345
0.088125
0.577281
0.222554
0.375660
0.025664
0.447281
0.118460
0.873814
0.994634
Period detected: 1,000,001
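The search above hits its 1,000,000-step cap rather than the true period, but even a full-period LCG can show statistical weaknesses. As a crude uniformity check, a minimal sketch comparing SimpleLCG with NumPy's default PCG64 via a chi-square goodness-of-fit test (the seed and bin count are arbitrary choices):

```python
import numpy as np
from scipy import stats as sp_stats

lcg = SimpleLCG(seed=7)                      # class defined above
lcg_samples = np.array([lcg.random() for _ in range(10_000)])
pcg_samples = np.random.default_rng(7).random(10_000)

for name, samples in [("SimpleLCG", lcg_samples), ("PCG64", pcg_samples)]:
    counts, _ = np.histogram(samples, bins=20, range=(0.0, 1.0))
    chi2, p = sp_stats.chisquare(counts)     # uniform expected counts by default
    print(f"{name:10s}: chi2={chi2:.1f}, p-value={p:.3f}")
```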
# Modern generators comparison
import numpy as np
import time
# Different PRNG algorithms
generators = {
'PCG64': np.random.PCG64,
'MT19937': np.random.MT19937, # Mersenne Twister
'Philox': np.random.Philox,
'SFC64': np.random.SFC64,
}
print("Generator properties:")
for name, gen_class in generators.items():
gen = np.random.Generator(gen_class(seed=42))
# Generate samples for timing
start = time.perf_counter()
samples = gen.random(1000000)
elapsed = time.perf_counter() - start
print(f"\n{name}:")
print(f" Time for 1M: {elapsed:.3f}s")
print(f" Mean: {samples.mean():.6f}")
print(f" Std: {samples.std():.6f}")
# State size comparison
mt = np.random.MT19937(seed=42)
pcg = np.random.PCG64(seed=42)
print(f"\nState size:")
print(f" MT19937: {len(mt.state['state']['key'])} × 32 bits")
print(f" PCG64: 2 × 64 bits")
Generator properties:
PCG64:
Time for 1M: 0.004s
Mean: 0.500026
Std: 0.288635
MT19937:
Time for 1M: 0.003s
Mean: 0.499778
Std: 0.288707
Philox:
Time for 1M: 0.004s
Mean: 0.499806
Std: 0.288525
SFC64:
Time for 1M: 0.003s
Mean: 0.499989
Std: 0.288734
State size:
MT19937: 624 × 32 bits
PCG64: 2 × 64 bits
import numpy as np
# Modern NumPy random (v1.17+)
rng = np.random.default_rng(seed=42)
# Basic generation
print("Uniform [0, 1):", rng.random(5))
print("Integers [0, 10):", rng.integers(0, 10, size=5))
print("Normal(0, 1):", rng.normal(0, 1, size=5))
# Generator state management
initial_state = rng.bit_generator.state
# Generate some numbers
vals1 = rng.random(3)
print(f"\nFirst draw: {vals1}")
# Restore state
rng.bit_generator.state = initial_state
vals2 = rng.random(3)
print(f"After restore: {vals2}")
print(f"Same values: {np.array_equal(vals1, vals2)}")
# Spawn independent streams
parent_rng = np.random.default_rng(seed=42)
child_rngs = parent_rng.spawn(3)
print("\nIndependent streams:")
for i, child in enumerate(child_rngs):
print(f" Stream {i}: {child.random(3)}")
Uniform [0, 1): [0.77395605 0.43887844 0.85859792 0.69736803 0.09417735]
Integers [0, 10): [5 9 7 7 7]
Normal(0, 1): [-0.01680116 -0.85304393 0.87939797 0.77779194 0.0660307 ]
First draw: [0.82276161 0.4434142 0.22723872]
After restore: [0.82276161 0.4434142 0.22723872]
Same values: True
Independent streams:
Stream 0: [0.91674416 0.91098667 0.8765925 ]
Stream 1: [0.46749078 0.0464489 0.59551001]
Stream 2: [0.0712392 0.71015972 0.07180046]
# Performance: vectorized vs loop
import time
rng = np.random.default_rng(seed=42)
n = 1000000
# Vectorized generation
start = time.perf_counter()
vec_samples = rng.normal(0, 1, size=n)
vec_time = time.perf_counter() - start
# Loop generation
start = time.perf_counter()
loop_samples = [rng.normal(0, 1) for _ in range(n)]
loop_time = time.perf_counter() - start
print(f"Generation of {n:,} normals:")
print(f" Vectorized: {vec_time:.3f}s")
print(f" Loop: {loop_time:.3f}s")
print(f" Speedup: {loop_time/vec_time:.1f}x")
# Memory layout matters
print(f"\nMemory layout:")
print(f" Vectorized: {vec_samples.flags['C_CONTIGUOUS']}")
print(f" Array strides: {vec_samples.strides}")
# Custom distribution via inverse transform
def exponential_inverse_transform(rng, rate, size):
"""Generate exponential via inverse CDF."""
u = rng.random(size)
return -np.log(1 - u) / rate
custom_exp = exponential_inverse_transform(rng, rate=2.0, size=1000)
numpy_exp = rng.exponential(scale=0.5, size=1000)
print(f"\nCustom vs NumPy exponential:")
print(f" Custom mean: {custom_exp.mean():.3f}")
print(f" NumPy mean: {numpy_exp.mean():.3f}")
print(f" Theoretical: {0.5:.3f}")
Generation of 1,000,000 normals:
Vectorized: 0.005s
Loop: 0.310s
Speedup: 67.1x
Memory layout:
Vectorized: True
Array strides: (8,)
Custom vs NumPy exponential:
Custom mean: 0.507
NumPy mean: 0.497
Theoretical: 0.500
import numpy as np
from scipy import stats
rng = np.random.default_rng(seed=42)
# Sampling from various distributions
n = 1000
# Continuous distributions
uniform = rng.uniform(low=0, high=10, size=n)
normal = rng.normal(loc=100, scale=15, size=n)
exponential = rng.exponential(scale=2.0, size=n)
gamma = rng.gamma(shape=2, scale=2, size=n)
# Discrete distributions
poisson = rng.poisson(lam=3, size=n)
binomial = rng.binomial(n=10, p=0.3, size=n)
print("Sample statistics:")
print(f"Uniform[0,10]: μ={uniform.mean():.2f}, σ={uniform.std():.2f}")
print(f"Normal(100,15): μ={normal.mean():.2f}, σ={normal.std():.2f}")
print(f"Exponential(2): μ={exponential.mean():.2f}, σ={exponential.std():.2f}")
print(f"Poisson(3): μ={poisson.mean():.2f}, σ={poisson.std():.2f}")
# Multivariate distributions
mean = [0, 0]
cov = [[1, 0.5], [0.5, 1]]
multivariate = rng.multivariate_normal(mean, cov, size=1000)
print(f"\nMultivariate normal:")
print(f" Shape: {multivariate.shape}")
print(f" Correlation: {np.corrcoef(multivariate.T)[0,1]:.3f}")
Sample statistics:
Uniform[0,10]: μ=4.97, σ=2.91
Normal(100,15): μ=98.83, σ=15.21
Exponential(2): μ=1.97, σ=1.96
Poisson(3): μ=2.97, σ=1.75
Multivariate normal:
Shape: (1000, 2)
Correlation: 0.459
# Transformation methods
rng = np.random.default_rng(seed=42)
# Box-Muller transform for normal
def box_muller(rng, size):
"""Generate normal(0,1) from uniform."""
u1 = rng.random(size)
u2 = rng.random(size)
z0 = np.sqrt(-2 * np.log(u1)) * np.cos(2 * np.pi * u2)
z1 = np.sqrt(-2 * np.log(u1)) * np.sin(2 * np.pi * u2)
return z0, z1
z0, z1 = box_muller(rng, 5000)
print(f"Box-Muller normal:")
print(f" Mean: {z0.mean():.3f}, {z1.mean():.3f}")
print(f" Std: {z0.std():.3f}, {z1.std():.3f}")
# Rejection sampling for arbitrary distribution
def rejection_sample(pdf, bounds, rng, n_samples):
"""Sample from arbitrary PDF via rejection."""
samples = []
low, high = bounds
# Find maximum of PDF for efficiency
x_test = np.linspace(low, high, 1000)
max_pdf = np.max([pdf(x) for x in x_test])
while len(samples) < n_samples:
x = rng.uniform(low, high)
y = rng.uniform(0, max_pdf)
if y <= pdf(x):
samples.append(x)
return np.array(samples)
# Sample from beta-like distribution
pdf = lambda x: 2 * x if 0 <= x <= 1 else 0
samples = rejection_sample(pdf, (0, 1), rng, 1000)
print(f"\nRejection sampling:")
print(f" Mean: {samples.mean():.3f} (theory: 0.667)")
Box-Muller normal:
Mean: -0.005, 0.000
Std: 1.002, 1.005
Rejection sampling:
Mean: 0.666 (theory: 0.667)
import numpy as np
def monte_carlo_integrate(func, bounds, n_samples, rng=None):
"""Basic Monte Carlo integration."""
if rng is None:
rng = np.random.default_rng()
a, b = bounds
# Sample uniformly in domain
x = rng.uniform(a, b, n_samples)
# Evaluate function
y = func(x)
# MC estimate: (b-a) * mean(f)
integral = (b - a) * np.mean(y)
std_error = (b - a) * np.std(y) / np.sqrt(n_samples)
return integral, std_error
# Example: ∫₀¹ sin(πx) dx = 2/π
rng = np.random.default_rng(seed=42)
for n in [100, 1000, 10000, 100000]:
integral, error = monte_carlo_integrate(
lambda x: np.sin(np.pi * x),
bounds=(0, 1),
n_samples=n,
rng=rng
)
true_value = 2/np.pi
print(f"n={n:6d}: {integral:.5f} ± {error:.5f} (true: {true_value:.5f})")
# Multidimensional integration
def monte_carlo_nd(func, bounds, n_samples, rng=None):
"""N-dimensional Monte Carlo integration."""
if rng is None:
rng = np.random.default_rng()
ndim = len(bounds)
volume = np.prod([b - a for a, b in bounds])
# Sample in hypercube
samples = np.zeros((n_samples, ndim))
for i, (a, b) in enumerate(bounds):
samples[:, i] = rng.uniform(a, b, n_samples)
# Evaluate function
values = np.array([func(*s) for s in samples])
integral = volume * np.mean(values)
std_error = volume * np.std(values) / np.sqrt(n_samples)
return integral, std_error
# 2D example: ∫∫ exp(-(x²+y²)) dx dy over unit square
def gaussian_2d(x, y):
return np.exp(-(x**2 + y**2))
integral_2d, error_2d = monte_carlo_nd(
gaussian_2d,
bounds=[(0, 1), (0, 1)],
n_samples=10000,
rng=rng
)
print(f"\n2D integral: {integral_2d:.5f} ± {error_2d:.5f}")
n= 100: 0.67226 ± 0.02825 (true: 0.63662)
n= 1000: 0.63032 ± 0.00972 (true: 0.63662)
n= 10000: 0.63801 ± 0.00308 (true: 0.63662)
n=100000: 0.63706 ± 0.00097 (true: 0.63662)
2D integral: 0.55976 ± 0.00217
# Variance reduction: stratified sampling
def stratified_monte_carlo(func, bounds, n_strata, samples_per_stratum, rng=None):
"""Stratified Monte Carlo for variance reduction."""
if rng is None:
rng = np.random.default_rng()
a, b = bounds
stratum_width = (b - a) / n_strata
estimates = []
for i in range(n_strata):
# Sample within stratum
stratum_a = a + i * stratum_width
stratum_b = a + (i + 1) * stratum_width
x = rng.uniform(stratum_a, stratum_b, samples_per_stratum)
y = func(x)
# Stratum estimate
estimates.append(np.mean(y))
# Combine strata
integral = (b - a) * np.mean(estimates)
std_error = (b - a) * np.std(estimates) / np.sqrt(n_strata)
return integral, std_error
# Compare standard vs stratified
n_total = 10000
# Standard MC
standard_int, standard_err = monte_carlo_integrate(
lambda x: np.sin(np.pi * x),
bounds=(0, 1),
n_samples=n_total,
rng=rng
)
# Stratified MC
n_strata = 100
stratified_int, stratified_err = stratified_monte_carlo(
lambda x: np.sin(np.pi * x),
bounds=(0, 1),
n_strata=n_strata,
samples_per_stratum=n_total // n_strata,
rng=rng
)
print(f"Variance reduction comparison ({n_total} samples):")
print(f" Standard: {standard_int:.5f} ± {standard_err:.5f}")
print(f" Stratified: {stratified_int:.5f} ± {stratified_err:.5f}")
print(f" Error reduction: {standard_err/stratified_err:.2f}x")
Variance reduction comparison (10000 samples):
Standard: 0.63776 ± 0.00306
Stratified: 0.63655 ± 0.03078
Error reduction: 0.10x
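The apparent 0.10× "error reduction" is an artifact of how stratified_monte_carlo estimates its error: np.std(estimates) measures the spread of the per-stratum means, which is dominated by the genuine variation of sin(πx) across strata rather than by sampling noise. A proper stratified standard error combines the within-stratum variances, as in this minimal sketch (the helper below is my own variant, not lecture code):

```python
import numpy as np

def stratified_mc_proper_error(func, bounds, n_strata, samples_per_stratum, rng=None):
    """Stratified MC with a standard error built from within-stratum variances."""
    if rng is None:
        rng = np.random.default_rng()
    a, b = bounds
    width = (b - a) / n_strata               # equal-width strata, equal weights
    integral, variance = 0.0, 0.0
    for i in range(n_strata):
        x = rng.uniform(a + i * width, a + (i + 1) * width, samples_per_stratum)
        y = func(x)
        integral += width * np.mean(y)
        # variance of this stratum's contribution: width^2 * Var(y) / n_i
        variance += width**2 * np.var(y) / samples_per_stratum
    return integral, np.sqrt(variance)

est, err = stratified_mc_proper_error(
    lambda x: np.sin(np.pi * x), (0, 1),
    n_strata=100, samples_per_stratum=100,
    rng=np.random.default_rng(0)
)
print(f"Stratified (within-stratum SE): {est:.5f} ± {err:.5f}")
```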
import numpy as np
class RandomWalk:
"""1D and 2D random walk simulator."""
def __init__(self, seed=None):
self.rng = np.random.default_rng(seed)
def walk_1d(self, n_steps, p_up=0.5):
"""1D random walk with bias."""
steps = self.rng.choice(
[-1, 1],
size=n_steps,
p=[1-p_up, p_up]
)
return np.cumsum(steps)
def walk_2d(self, n_steps):
"""2D random walk on lattice."""
# Possible moves: up, down, left, right
moves = np.array([[0, 1], [0, -1], [-1, 0], [1, 0]])
choices = self.rng.integers(0, 4, size=n_steps)
steps = moves[choices]
path = np.cumsum(steps, axis=0)
return path[:, 0], path[:, 1]
def first_passage_time(self, barrier, max_steps=10000):
"""Time to first reach barrier."""
position = 0
for step in range(max_steps):
position += self.rng.choice([-1, 1])
if abs(position) >= barrier:
return step + 1
return None
# Analyze random walk properties
walker = RandomWalk(seed=42)
# First passage times
barriers = [10, 20, 50]
n_trials = 1000
for barrier in barriers:
times = []
for _ in range(n_trials):
t = walker.first_passage_time(barrier)
if t is not None:
times.append(t)
mean_time = np.mean(times)
theoretical = barrier ** 2 # E[T] = n² for symmetric walk
print(f"Barrier ±{barrier}:")
print(f" Mean time: {mean_time:.1f}")
print(f" Theory: {theoretical:.1f}")
print(f" Success rate: {len(times)/n_trials:.1%}")
Barrier ±10:
Mean time: 99.2
Theory: 100.0
Success rate: 100.0%
Barrier ±20:
Mean time: 405.9
Theory: 400.0
Success rate: 100.0%
Barrier ±50:
Mean time: 2384.1
Theory: 2500.0
Success rate: 99.3%
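first_passage_time advances one step at a time in a Python loop; when many trials are needed, all walks can be simulated at once as a (trials × steps) array, trading memory for speed. A minimal sketch (function name and parameters are mine):

```python
import numpy as np

def first_passage_times_vectorized(barrier, n_trials, max_steps=5000, seed=0):
    """Vectorized first-passage times for a symmetric ±1 walk."""
    rng = np.random.default_rng(seed)
    steps = rng.choice([-1, 1], size=(n_trials, max_steps))   # n_trials x max_steps
    positions = np.cumsum(steps, axis=1)
    hit = np.abs(positions) >= barrier
    # argmax returns the index of the first True; walks that never hit get NaN
    return np.where(hit.any(axis=1), hit.argmax(axis=1) + 1, np.nan)

times = first_passage_times_vectorized(barrier=20, n_trials=1000)
print(f"Mean first-passage time: {np.nanmean(times):.1f} (theory: {20**2})")
```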
# Stochastic differential equations
def euler_maruyama(sde_drift, sde_diffusion, x0, t_span, dt, rng=None):
"""Solve SDE using Euler-Maruyama method."""
if rng is None:
rng = np.random.default_rng()
t = np.arange(t_span[0], t_span[1], dt)
n = len(t)
x = np.zeros(n)
x[0] = x0
# Generate all random increments at once
dW = rng.normal(0, np.sqrt(dt), n-1)
for i in range(n-1):
drift = sde_drift(x[i], t[i])
diffusion = sde_diffusion(x[i], t[i])
x[i+1] = x[i] + drift*dt + diffusion*dW[i]
return t, x
# Ornstein-Uhlenbeck process (mean-reverting)
def ou_drift(x, t, theta=1.0, mu=0.0):
return theta * (mu - x)
def ou_diffusion(x, t, sigma=0.3):
return sigma
rng = np.random.default_rng(seed=42)
# Simulate multiple paths
print("Ornstein-Uhlenbeck process:")
for x0 in [2.0, 0.0, -2.0]:
t, x = euler_maruyama(
ou_drift, ou_diffusion,
x0=x0, t_span=(0, 10), dt=0.01,
rng=rng
)
print(f" Start: {x0:+.1f}, End: {x[-1]:+.2f}, Mean: {x[len(x)//2:].mean():+.2f}")
# Geometric Brownian Motion
def gbm_drift(S, t, mu=0.05):
return mu * S
def gbm_diffusion(S, t, sigma=0.2):
return sigma * S
t, S = euler_maruyama(
gbm_drift, gbm_diffusion,
x0=100, t_span=(0, 1), dt=0.001,
rng=rng
)
print(f"\nGBM Stock simulation:")
print(f" Initial: ${S[0]:.2f}")
print(f" Final: ${S[-1]:.2f}")
print(f" Return: {(S[-1]/S[0] - 1)*100:.1f}%")
Ornstein-Uhlenbeck process:
Start: +2.0, End: +0.05, Mean: -0.15
Start: +0.0, End: -0.57, Mean: -0.38
Start: -2.0, End: +0.19, Mean: +0.13
GBM Stock simulation:
Initial: $100.00
Final: $104.77
Return: 4.8%
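GBM also has a closed-form solution, S_t = S_0 exp((μ − σ²/2)t + σW_t), which gives a direct check on the Euler-Maruyama discretization when both are driven by the same Brownian increments. A minimal sketch with the same μ, σ, and S_0 as above (seed and variable names are mine):

```python
import numpy as np

rng = np.random.default_rng(seed=7)
mu, sigma, S0, dt, n = 0.05, 0.2, 100.0, 0.001, 1000

dW = rng.normal(0, np.sqrt(dt), n)
W = np.cumsum(dW)
t = dt * np.arange(1, n + 1)

# Exact solution evaluated on the same Brownian path
S_exact = S0 * np.exp((mu - 0.5 * sigma**2) * t + sigma * W)

# Euler-Maruyama on the same increments
S_em = np.empty(n + 1)
S_em[0] = S0
for i in range(n):
    S_em[i + 1] = S_em[i] + mu * S_em[i] * dt + sigma * S_em[i] * dW[i]

print(f"Exact final:    {S_exact[-1]:.2f}")
print(f"Euler final:    {S_em[-1]:.2f}")
print(f"Max path error: {np.max(np.abs(S_em[1:] - S_exact)):.4f}")
```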
import numpy as np
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
def monte_carlo_pi(n_samples, seed):
"""Estimate π using Monte Carlo."""
rng = np.random.default_rng(seed)
# Generate points in unit square
x = rng.random(n_samples)
y = rng.random(n_samples)
# Count points inside circle
inside = (x**2 + y**2) <= 1
pi_estimate = 4 * inside.sum() / n_samples
return pi_estimate
# Serial execution
rng_main = np.random.default_rng(seed=42)
n_trials = 4
n_samples = 1000000
print("Serial execution:")
serial_results = []
for i in range(n_trials):
# Use different seed for each trial
seed = rng_main.integers(0, 2**32)
result = monte_carlo_pi(n_samples, seed)
serial_results.append(result)
print(f" Trial {i}: π ≈ {result:.5f}")
print(f" Mean: {np.mean(serial_results):.5f}")
print(f" Std: {np.std(serial_results):.5f}")
# Parallel execution with spawned generators
def parallel_monte_carlo():
    """Parallel MC with independent streams."""
    # SeedSequence.spawn() yields statistically independent child sequences
    parent_seq = np.random.SeedSequence(42)
    n_workers = mp.cpu_count()
    child_seqs = parent_seq.spawn(n_workers)
    # Create tasks with independent seeds
    tasks = []
    for i in range(n_workers):
        # Extract an integer seed value from each child SeedSequence
        seed_value = child_seqs[i].generate_state(1)[0]
        tasks.append((n_samples, seed_value))
    return tasks
# Note: ProcessPoolExecutor not shown due to execution environment
print(f"\nParallel setup for {mp.cpu_count()} cores created")
Serial execution:
Trial 0: π ≈ 3.14089
Trial 1: π ≈ 3.14045
Trial 2: π ≈ 3.14242
Trial 3: π ≈ 3.14100
Mean: 3.14119
Std: 0.00074
Parallel setup for 16 cores created
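The pool itself is omitted above; a minimal sketch of how the prepared (n_samples, seed) tasks could be dispatched with ProcessPoolExecutor. This assumes monte_carlo_pi is importable at module level (required for pickling) and, as a script, would normally sit under an if __name__ == "__main__": guard:

```python
import numpy as np
from concurrent.futures import ProcessPoolExecutor

def run_parallel(tasks):
    """Run (n_samples, seed) tasks across processes and pool the estimates."""
    with ProcessPoolExecutor() as pool:
        futures = [pool.submit(monte_carlo_pi, n, seed) for n, seed in tasks]
        estimates = [f.result() for f in futures]
    return np.mean(estimates), np.std(estimates)

# tasks = parallel_monte_carlo()
# pi_mean, pi_std = run_parallel(tasks)
```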
# Reproducibility patterns
class RandomExperiment:
"""Reproducible random experiment."""
def __init__(self, master_seed=None):
self.master_seed = master_seed
self.master_rng = np.random.default_rng(master_seed)
self.checkpoints = {}
def get_generator(self, name):
"""Get named generator for reproducibility."""
if name not in self.checkpoints:
# Create deterministic seed from name
seed = self.master_rng.integers(0, 2**32)
self.checkpoints[name] = {
'seed': seed,
'rng': np.random.default_rng(seed)
}
return self.checkpoints[name]['rng']
def reset_generator(self, name):
"""Reset named generator to initial state."""
if name in self.checkpoints:
seed = self.checkpoints[name]['seed']
self.checkpoints[name]['rng'] = np.random.default_rng(seed)
def run_trial(self, trial_id):
"""Run reproducible trial."""
# Each component gets its own stream
data_rng = self.get_generator(f'data_{trial_id}')
noise_rng = self.get_generator(f'noise_{trial_id}')
# Generate data
signal = np.sin(np.linspace(0, 2*np.pi, 100))
noise = noise_rng.normal(0, 0.1, 100)
data = signal + noise
# Subsample
indices = data_rng.choice(100, 20, replace=False)
sample = data[indices]
return sample.mean(), sample.std()
# Run experiment
exp = RandomExperiment(master_seed=12345)
print("First run:")
for i in range(3):
mean, std = exp.run_trial(i)
print(f" Trial {i}: μ={mean:+.3f}, σ={std:.3f}")
# Reset and verify reproducibility
exp = RandomExperiment(master_seed=12345)
print("\nSecond run (same seed):")
for i in range(3):
mean, std = exp.run_trial(i)
print(f" Trial {i}: μ={mean:+.3f}, σ={std:.3f}")
First run:
Trial 0: μ=+0.073, σ=0.678
Trial 1: μ=+0.207, σ=0.638
Trial 2: μ=+0.149, σ=0.752
Second run (same seed):
Trial 0: μ=+0.073, σ=0.678
Trial 1: μ=+0.207, σ=0.638
Trial 2: μ=+0.149, σ=0.752
# Lambda basics
square = lambda x: x**2
add = lambda x, y: x + y
magnitude = lambda v: sum(x**2 for x in v) ** 0.5
print(f"square(5) = {square(5)}")
print(f"add(3, 4) = {add(3, 4)}")
print(f"magnitude([3, 4]) = {magnitude([3, 4])}")
# Lambda limitations - the body must be a single expression.
# A statement assignment (y = x + 1) inside a lambda is a SyntaxError at
# compile time, so it cannot be caught with a runtime try/except. The walrus
# operator := is an expression, however, so this lambda is perfectly legal:
walrus_lambda = lambda x: (y := x + 1, y * 2)  # returns the tuple (x+1, 2*(x+1))
# Workaround using expression forms
result = (lambda x: (x + 1) * 2)(5)
print(f"Workaround result: {result}")
# Lambda with conditional expression
abs_val = lambda x: x if x >= 0 else -x
sign = lambda x: 1 if x > 0 else (-1 if x < 0 else 0)
print(f"\nabs_val(-5) = {abs_val(-5)}")
print(f"sign(-5) = {sign(-5)}")
print(f"sign(0) = {sign(0)}")
print(f"sign(5) = {sign(5)}")
square(5) = 25
add(3, 4) = 7
magnitude([3, 4]) = 5.0
Workaround result: 12
abs_val(-5) = 5
sign(-5) = -1
sign(0) = 0
sign(5) = 1
# Lambdas in data processing
import numpy as np
# Sorting with custom key
points = [(3, 4), (1, 2), (5, 1), (2, 3)]
# Sort by distance from origin
by_distance = sorted(points, key=lambda p: (p[0]**2 + p[1]**2)**0.5)
print(f"By distance: {by_distance}")
# Sort by second coordinate
by_y = sorted(points, key=lambda p: p[1])
print(f"By y-coord: {by_y}")
# Complex sorting - primary and secondary keys
data = [('Alice', 85), ('Bob', 92), ('Charlie', 85), ('David', 78)]
sorted_data = sorted(data, key=lambda x: (-x[1], x[0]))
print(f"\nBy score desc, then name:")
for name, score in sorted_data:
print(f" {name}: {score}")
# Lambda in numerical operations
vectors = np.array([[1, 2], [3, 4], [5, 6]])
# Apply transformation
transform = lambda v: v / np.linalg.norm(v)
normalized = np.apply_along_axis(transform, 1, vectors)
print(f"\nNormalized vectors:")
print(normalized)
By distance: [(1, 2), (2, 3), (3, 4), (5, 1)]
By y-coord: [(5, 1), (1, 2), (2, 3), (3, 4)]
By score desc, then name:
Bob: 92
Alice: 85
Charlie: 85
David: 78
Normalized vectors:
[[0.4472136 0.89442719]
[0.6 0.8 ]
[0.6401844 0.76822128]]
import timeit
# Performance comparison
def regular_square(x):
return x * x
lambda_square = lambda x: x * x
# Built-in operator
from operator import mul
partial_square = lambda x: mul(x, x)
# Test data
data = list(range(1000))
# Time different approaches
n = 10000
t_regular = timeit.timeit(
lambda: [regular_square(x) for x in data],
number=n
)
t_lambda = timeit.timeit(
lambda: [(lambda x: x * x)(x) for x in data],
number=n
)
t_map_lambda = timeit.timeit(
lambda: list(map(lambda x: x * x, data)),
number=n
)
t_builtin = timeit.timeit(
lambda: [x * x for x in data],
number=n
)
print(f"Squaring 1000 numbers ({n} iterations):")
print(f" Regular function: {t_regular:.3f}s")
print(f" Lambda in loop: {t_lambda:.3f}s ({t_lambda/t_regular:.2f}x)")
print(f" Map with lambda: {t_map_lambda:.3f}s ({t_map_lambda/t_regular:.2f}x)")
print(f" Direct operator: {t_builtin:.3f}s ({t_builtin/t_regular:.2f}x)")
Squaring 1000 numbers (10000 iterations):
Regular function: 0.244s
Lambda in loop: 0.533s (2.18x)
Map with lambda: 0.276s (1.13x)
Direct operator: 0.127s (0.52x)
# Closure pitfall demonstration
print("Closure problem:")
funcs_bad = []
for i in range(3):
funcs_bad.append(lambda x: x + i)
# All functions use final value of i
for j, f in enumerate(funcs_bad):
print(f" func[{j}](10) = {f(10)}") # All return 12!
print("\nFixed with default argument:")
funcs_good = []
for i in range(3):
funcs_good.append(lambda x, i=i: x + i)
for j, f in enumerate(funcs_good):
print(f" func[{j}](10) = {f(10)}") # Correct: 10, 11, 12
# Alternative: use functools.partial
from functools import partial
def add(x, y):
return x + y
print("\nUsing partial:")
funcs_partial = [partial(add, y=i) for i in range(3)]
for j, f in enumerate(funcs_partial):
print(f" func[{j}](10) = {f(10)}")
# Memory implications
import sys
print(f"\nMemory usage:")
print(f" Lambda: {sys.getsizeof(lambda x: x**2)} bytes")
print(f" Function: {sys.getsizeof(regular_square)} bytes")
print(f" Partial: {sys.getsizeof(partial(mul, 2))} bytes")
Closure problem:
func[0](10) = 12
func[1](10) = 12
func[2](10) = 12
Fixed with default argument:
func[0](10) = 10
func[1](10) = 11
func[2](10) = 12
Using partial:
func[0](10) = 10
func[1](10) = 11
func[2](10) = 12
Memory usage:
Lambda: 152 bytes
Function: 152 bytes
Partial: 80 bytes
from functools import reduce
import operator
data = [1, 2, 3, 4, 5]
# Map: transform each element
squared = list(map(lambda x: x**2, data))
print(f"Original: {data}")
print(f"Squared: {squared}")
# Filter: select elements
evens = list(filter(lambda x: x % 2 == 0, data))
odds = list(filter(lambda x: x % 2 == 1, data))
print(f"\nEvens: {evens}")
print(f"Odds: {odds}")
# Reduce: accumulate to single value
sum_all = reduce(operator.add, data)
product = reduce(operator.mul, data)
max_val = reduce(lambda a, b: a if a > b else b, data)
print(f"\nSum: {sum_all}")
print(f"Product: {product}")
print(f"Max: {max_val}")
# Combining operations
result = reduce(
operator.add,
filter(
lambda x: x > 10,
map(lambda x: x**2, data)
)
)
print(f"\nSum of squares > 10: {result}")
# Functional pipeline
def pipeline(data, *functions):
"""Apply functions in sequence."""
for func in functions:
data = func(data)
return data
result = pipeline(
range(10),
lambda x: map(lambda n: n**2, x),
lambda x: filter(lambda n: n % 2 == 0, x),
list
)
print(f"\nPipeline result: {result}")
Original: [1, 2, 3, 4, 5]
Squared: [1, 4, 9, 16, 25]
Evens: [2, 4]
Odds: [1, 3, 5]
Sum: 15
Product: 120
Max: 5
Sum of squares > 10: 41
Pipeline result: [0, 4, 16, 36, 64]
# Performance comparison
import timeit
import numpy as np
data = list(range(1000))
# Map alternatives
t_map = timeit.timeit(
lambda: list(map(lambda x: x**2, data)),
number=1000
)
t_comp = timeit.timeit(
lambda: [x**2 for x in data],
number=1000
)
t_numpy = timeit.timeit(
lambda: np.array(data)**2,
number=1000
)
print(f"Squaring 1000 numbers:")
print(f" map(): {t_map:.3f}s")
print(f" List comp: {t_comp:.3f}s ({t_map/t_comp:.2f}x)")
print(f" NumPy: {t_numpy:.3f}s ({t_map/t_numpy:.1f}x)")
# Memory efficiency
print(f"\nMemory behavior:")
# Map returns iterator (lazy)
mapped = map(lambda x: x**2, range(1000000))
print(f" map object: {sys.getsizeof(mapped)} bytes")
# List comprehension creates full list
comp = [x**2 for x in range(100000)] # Smaller for memory
print(f" List comp (100k): {sys.getsizeof(comp):,} bytes")
# Generator expression (lazy like map)
gen = (x**2 for x in range(1000000))
print(f" Generator expr: {sys.getsizeof(gen)} bytes")
# Practical reduce example
from itertools import accumulate
# Running statistics
values = [3, 1, 4, 1, 5, 9, 2, 6]
running_sum = list(accumulate(values))
running_max = list(accumulate(values, max))
print(f"\nValues: {values}")
print(f"Running sum: {running_sum}")
print(f"Running max: {running_max}")
Squaring 1000 numbers:
map(): 0.035s
List comp: 0.019s (1.80x)
NumPy: 0.027s (1.3x)
Memory behavior:
map object: 48 bytes
List comp (100k): 800,984 bytes
Generator expr: 208 bytes
Values: [3, 1, 4, 1, 5, 9, 2, 6]
Running sum: [3, 4, 8, 9, 14, 23, 25, 31]
Running max: [3, 3, 4, 4, 5, 9, 9, 9]
import numpy as np
from functools import reduce
# Signal processing pipeline
def process_signals(signals):
"""Functional signal processing pipeline."""
# Filter: remove invalid signals
valid = filter(lambda s: not np.any(np.isnan(s)), signals)
# Map: normalize each signal
normalized = map(lambda s: (s - s.mean()) / s.std(), valid)
# Map: compute features
features = map(lambda s: {
'energy': np.sum(s**2),
'peak': np.max(np.abs(s)),
'zero_crossings': np.sum(np.diff(np.sign(s)) != 0)
}, normalized)
return list(features)
# Generate test signals
np.random.seed(42)
signals = [
np.sin(np.linspace(0, 2*np.pi, 100)) + 0.1*np.random.randn(100),
np.random.randn(100),
np.array([np.nan] * 100), # Invalid
np.cos(np.linspace(0, 4*np.pi, 100)),
]
results = process_signals(signals)
print("Signal features:")
for i, feat in enumerate(results):
print(f" Signal {i}: energy={feat['energy']:.1f}, "
f"peak={feat['peak']:.2f}, crossings={feat['zero_crossings']}")
# Matrix operations with functional approach
def matrix_pipeline(matrix):
"""Process matrix rows functionally."""
# Filter rows by condition
filtered_rows = filter(
lambda row: np.sum(row) > 0,
matrix
)
# Normalize each row
normalized = map(
lambda row: row / np.linalg.norm(row),
filtered_rows
)
# Reduce to summary statistics
row_list = list(normalized)
if row_list:
mean_row = reduce(
lambda a, b: a + b,
row_list
) / len(row_list)
return mean_row
return None
# Test matrix
matrix = np.random.randn(10, 5)
matrix[2] = -np.abs(matrix[2]) # Make one row negative sum
result = matrix_pipeline(matrix)
print(f"\nMean normalized row: {result}")
Signal features:
Signal 0: energy=100.0, peak=1.80, crossings=3
Signal 1: energy=100.0, peak=2.84, crossings=54
Signal 2: energy=100.0, peak=1.42, crossings=4
Mean normalized row: [ 0.01159341 0.05561288 -0.05330941 0.1654067 0.35741119]
# Parallel map for expensive operations
from multiprocessing import Pool
import time
def expensive_computation(x):
"""Simulate expensive computation."""
# In real code, this might be optimization, simulation, etc.
result = 0
for i in range(100000):
result += x * np.sin(i) * np.cos(i)
return result / 100000
# Sequential map
data = list(range(10))
start = time.perf_counter()
sequential_results = list(map(expensive_computation, data))
seq_time = time.perf_counter() - start
print(f"Sequential map: {seq_time:.3f}s")
# Parallel map (simplified for demonstration)
# Note: In Jupyter/interactive environments, use:
# from multiprocessing.dummy import Pool # Thread pool
# Or restructure for subprocess pool
def parallel_map(func, data, n_workers=4):
"""Simple parallel map implementation."""
# Simulate parallel execution
# In production, use multiprocessing.Pool
results = []
chunk_size = len(data) // n_workers
for i in range(0, len(data), chunk_size):
chunk = data[i:i+chunk_size]
results.extend(map(func, chunk))
return results
# Lazy evaluation for large datasets
def lazy_pipeline(data_generator):
"""Process infinite/large data streams."""
# Chain of transformations (all lazy)
transformed = map(lambda x: x**2, data_generator)
filtered = filter(lambda x: x % 2 == 0, transformed)
processed = map(lambda x: x // 2, filtered)
# Only compute what's needed
return processed
# Infinite generator
def fibonacci():
a, b = 0, 1
while True:
yield a
a, b = b, a + b
# Process only first 10 values
pipeline = lazy_pipeline(fibonacci())
first_10 = [next(pipeline) for _ in range(10)]
print(f"\nFirst 10 processed Fibonacci: {first_10}")
# Memory comparison
import sys
eager_list = [x**2 for x in range(100000)]
lazy_gen = (x**2 for x in range(100000))
print(f"\nMemory usage:")
print(f" Eager list: {sys.getsizeof(eager_list):,} bytes")
print(f" Lazy generator: {sys.getsizeof(lazy_gen):,} bytes")
Sequential map: 0.963s
First 10 processed Fibonacci: [0, 2, 32, 578, 10368, 186050, 3338528, 59907458, 1074995712, 19290015362]
Memory usage:
Eager list: 800,984 bytes
Lazy generator: 208 bytes
from functools import partial, reduce
import operator
# Partial function application
def power(base, exponent):
return base ** exponent
# Create specialized functions
square = partial(power, exponent=2)
cube = partial(power, exponent=3)
two_to_power = partial(power, base=2) # 2^x
print("Partial functions:")
print(f" square(5) = {square(5)}")
print(f" cube(5) = {cube(5)}")
print(f" 2^10 = {two_to_power(exponent=10)}")
# Partial with multiple arguments
def integrate(func, lower, upper, method='trapezoid', n_points=100):
"""Numerical integration."""
x = np.linspace(lower, upper, n_points)
y = func(x)
if method == 'trapezoid':
return np.trapz(y, x)
elif method == 'simpson':
from scipy.integrate import simpson
        return simpson(y, x=x)
else:
raise ValueError(f"Unknown method: {method}")
# Create specialized integrators
simpson_integrate = partial(integrate, method='simpson')
unit_integrate = partial(integrate, lower=0, upper=1)
precise_integrate = partial(integrate, n_points=1000)
# Use specialized versions
result1 = simpson_integrate(np.sin, 0, np.pi)
result2 = unit_integrate(lambda x: x**2)
result3 = precise_integrate(np.exp, 0, 1)
print(f"\nIntegration results:")
print(f" ∫sin(x)dx [0,π] = {result1:.4f}")
print(f" ∫x²dx [0,1] = {result2:.4f}")
print(f" ∫e^x dx [0,1] = {result3:.4f}")
Partial functions:
square(5) = 25
cube(5) = 125
2^10 = 1024
Integration results:
∫sin(x)dx [0,π] = 2.0000
∫x²dx [0,1] = 0.3334
∫e^x dx [0,1] = 1.7183
def curry(func):
def curried(*args, **kwargs):
if len(args) + len(kwargs) >= func.__code__.co_argcount:
return func(*args, **kwargs)
def new_curried(*new_args, **new_kwargs):
return curried(*(args + new_args), **{**kwargs, **new_kwargs})
return new_curried
return curried
@curry
def add3(x, y, z):
return x + y + z
# Sequential application
f1 = add3(1) # Returns partial function
f2 = f1(2) # Returns partial function
result = f2(3) # Returns final result
print(f"Curried addition: {result}")
# Can also call directly
print(f"Direct call: {add3(1, 2, 3)}")
# Practical currying example
@curry
def filter_and_transform(predicate, transform, data):
"""Filter then transform data."""
return [transform(x) for x in data if predicate(x)]
# Create specialized processors
positive_squares = filter_and_transform(
lambda x: x > 0,
lambda x: x**2
)
even_doubles = filter_and_transform(
lambda x: x % 2 == 0,
lambda x: x * 2
)
data = [-2, -1, 0, 1, 2, 3, 4, 5]
print(f"\nOriginal: {data}")
print(f"Positive squares: {positive_squares(data)}")
print(f"Even doubles: {even_doubles(data)}")
# Partial for configuration
class DataProcessor:
def __init__(self):
self.operations = []
def add_operation(self, func, *args, **kwargs):
"""Add partially applied operation."""
if args or kwargs:
func = partial(func, *args, **kwargs)
self.operations.append(func)
def process(self, data):
"""Apply all operations in sequence."""
return reduce(lambda d, op: op(d), self.operations, data)
processor = DataProcessor()
processor.add_operation(np.multiply, 2) # Multiply by 2
processor.add_operation(np.add, 10) # Add 10
data = np.array([1, 2, 3, 4, 5])
result = processor.process(data)
print(f"\nPipeline result: {result}")
Curried addition: 6
Direct call: 6
Original: [-2, -1, 0, 1, 2, 3, 4, 5]
Positive squares: [1, 4, 9, 16, 25]
Even doubles: [-4, 0, 4, 8]
Pipeline result: [12 14 16 18 20]
import time
import functools
from collections import defaultdict
# Timing decorator with statistics
class TimingStats:
def __init__(self):
self.times = defaultdict(list)
def timer(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
self.times[func.__name__].append(elapsed)
# Alert on slow calls
if elapsed > 0.1:
print(f"⚠️ Slow call: {func.__name__} took {elapsed:.3f}s")
return result
wrapper.stats = lambda: self.get_stats(func.__name__)
return wrapper
def get_stats(self, func_name):
times = self.times[func_name]
if not times:
return None
return {
'count': len(times),
'total': sum(times),
'mean': np.mean(times),
'std': np.std(times),
'min': min(times),
'max': max(times)
}
stats = TimingStats()
@stats.timer
def slow_function(n):
"""Simulate variable-time function."""
time.sleep(n * 0.01)
return n ** 2
# Execute multiple times
results = [slow_function(i) for i in [1, 2, 10, 3, 1]]
# Get statistics
print("Timing statistics:")
for key, value in slow_function.stats().items():
if isinstance(value, float):
print(f" {key}: {value:.4f}")
else:
print(f" {key}: {value}")
⚠️ Slow call: slow_function took 0.105s
Timing statistics:
count: 5
total: 0.1851
mean: 0.0370
std: 0.0351
min: 0.0103
max: 0.1050
# Validation decorator with type checking
def validate_inputs(**validators):
"""Decorator factory for input validation."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Get function signature
import inspect
sig = inspect.signature(func)
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
# Validate each parameter
for param_name, validator in validators.items():
if param_name in bound.arguments:
value = bound.arguments[param_name]
# Type checking
if isinstance(validator, type):
if not isinstance(value, validator):
raise TypeError(
f"{param_name} must be {validator.__name__}, "
f"got {type(value).__name__}"
)
# Custom validation function
elif callable(validator):
if not validator(value):
raise ValueError(
f"{param_name}={value} failed validation"
)
# Range validation
elif isinstance(validator, tuple):
min_val, max_val = validator
if not min_val <= value <= max_val:
raise ValueError(
f"{param_name}={value} not in range "
f"[{min_val}, {max_val}]"
)
return func(*args, **kwargs)
return wrapper
return decorator
@validate_inputs(
x=(0, 100), # Range
y=float, # Type
z=lambda v: v > 0 # Custom
)
def compute(x, y, z=1.0):
"""Function with validated inputs."""
return (x * y) / z
# Test validation
print("Valid call:", compute(50, 2.5, z=2.0))
try:
compute(150, 2.5) # x out of range
except ValueError as e:
print(f"Validation error: {e}")
try:
compute(50, "2.5") # y wrong type
except TypeError as e:
print(f"Type error: {e}")
Valid call: 62.5
Validation error: x=150 not in range [0, 100]
Type error: y must be float, got str
# Parameterized class-based decorator
class Retry:
"""Decorator to retry failed operations."""
def __init__(self, max_attempts=3, delay=1.0, exceptions=(Exception,)):
self.max_attempts = max_attempts
self.delay = delay
self.exceptions = exceptions
self.attempt_count = defaultdict(int)
def __call__(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(self.max_attempts):
try:
result = func(*args, **kwargs)
# Reset counter on success
self.attempt_count[func.__name__] = 0
return result
except self.exceptions as e:
last_exception = e
self.attempt_count[func.__name__] += 1
if attempt < self.max_attempts - 1:
print(f"Attempt {attempt + 1} failed: {e}")
print(f"Retrying in {self.delay}s...")
time.sleep(self.delay)
else:
print(f"All {self.max_attempts} attempts failed")
raise last_exception
# Add method to check statistics
wrapper.get_stats = lambda: {
'total_attempts': self.attempt_count[func.__name__],
'max_attempts': self.max_attempts
}
return wrapper
# Use parameterized decorator
@Retry(max_attempts=3, delay=0.1)
def unreliable_operation():
"""Simulates operation that might fail."""
import random
if random.random() < 0.7: # 70% failure rate
raise ConnectionError("Network error")
return "Success!"
# Test retry behavior (note: seeding NumPy's global generator does not affect
# Python's `random` module used inside unreliable_operation, so retries vary run to run)
np.random.seed(42)
try:
result = unreliable_operation()
print(f"Result: {result}")
except ConnectionError as e:
print(f"Final failure: {e}")
print(f"Stats: {unreliable_operation.get_stats()}")
Result: Success!
Stats: {'total_attempts': 0, 'max_attempts': 3}
# Decorator factory with configuration
def memoize(maxsize=128, typed=False):
"""Configurable memoization decorator."""
def decorator(func):
cache = {}
cache_info = {'hits': 0, 'misses': 0}
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Create cache key
if typed:
key = (args, tuple(sorted(kwargs.items())),
tuple(type(a) for a in args))
else:
key = (args, tuple(sorted(kwargs.items())))
if key in cache:
cache_info['hits'] += 1
return cache[key]
# Compute and cache
result = func(*args, **kwargs)
# Limit cache size
if len(cache) >= maxsize:
# Remove oldest (simplified LRU)
oldest = next(iter(cache))
del cache[oldest]
cache[key] = result
cache_info['misses'] += 1
return result
# Attach cache control methods
wrapper.cache_clear = lambda: cache.clear()
wrapper.cache_info = lambda: cache_info.copy()
return wrapper
return decorator
# Fibonacci with custom cache
@memoize(maxsize=50, typed=True)
def fib(n):
if n < 2:
return n
return fib(n-1) + fib(n-2)
# Compute Fibonacci numbers
results = [fib(i) for i in range(20)]
print(f"Fib(0-19): {results[:10]}...")
info = fib.cache_info()
print(f"\nCache performance:")
print(f" Hits: {info['hits']}")
print(f" Misses: {info['misses']}")
print(f" Hit rate: {info['hits']/(info['hits']+info['misses']):.1%}")
# Contextual decorator
class Profile:
"""Context-aware profiling decorator."""
def __init__(self, enabled=True):
self.enabled = enabled
self.timings = []
def __call__(self, func):
def wrapper(*args, **kwargs):
if self.enabled:
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
self.timings.append(elapsed)
return result
else:
return func(*args, **kwargs)
wrapper.enable = lambda: setattr(self, 'enabled', True)
wrapper.disable = lambda: setattr(self, 'enabled', False)
wrapper.report = self.report
return wrapper
def report(self):
if self.timings:
print(f"Profiling report:")
print(f" Calls: {len(self.timings)}")
print(f" Total: {sum(self.timings):.3f}s")
print(f" Mean: {np.mean(self.timings):.3f}s")
Fib(0-19): [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]...
Cache performance:
Hits: 36
Misses: 20
Hit rate: 64.3%
SciPy provides Python bindings to decades of optimized numerical code.
import scipy.linalg  # import the submodule explicitly so scipy.linalg is available
import numpy as np
import timeit
# Demonstrate overhead vs computation
def small_operation():
"""Python function call dominates."""
A = np.array([[1, 2], [3, 4]])
return scipy.linalg.inv(A)
def large_operation():
"""Fortran computation dominates."""
A = np.random.randn(1000, 1000)
return scipy.linalg.inv(A)
# Measure call overhead
empty_call = timeit.timeit(lambda: None, number=100000)
print(f"Python function call: {empty_call*10:.2f}μs")
# Small vs large operation efficiency
A_small = np.random.randn(2, 2)
A_large = np.random.randn(100, 100)
t_small = timeit.timeit(
lambda: scipy.linalg.inv(A_small),
number=10000
)
t_large = timeit.timeit(
lambda: scipy.linalg.inv(A_large),
number=100
)
print(f"\n2×2 inversion: {t_small/10:.1f}μs per call")
print(f"100×100 inversion: {t_large*10:.1f}ms per call")
# BLAS level comparison
flops_small = 2**3 # O(n³) for inversion
flops_large = 100**3
print(f"\nFLOPS ratio: {flops_large/flops_small:,.0f}×")
print(f"Time ratio: {(t_large/100)/(t_small/10000):.0f}×")
print(f"Efficiency gain: {(flops_large/flops_small)/((t_large/100)/(t_small/10000)):.1f}×")
Python function call: 0.01μs
2×2 inversion: 0.0μs per call
100×100 inversion: 1.0ms per call
FLOPS ratio: 125,000×
Time ratio: 226×
Efficiency gain: 553.1×
Design Implications:
Wrapper overhead is constant - Same Python call cost regardless of problem size
Amortization strategy - Process vectors/matrices, not scalars
Library selection hierarchy:
Memory layout matters - Fortran order (column-major) vs C order (row-major); see the sketch below
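A minimal sketch of the layout point above: a row slice of a C-order array is contiguous in memory while a column slice is strided, which is why LAPACK routines (which expect column-major data) may need to copy C-order input. The array size and repeat counts are arbitrary:

```python
import numpy as np
import timeit

A = np.random.randn(2000, 2000)   # C order (row-major), the NumPy default
A_f = np.asfortranarray(A)        # same values, column-major layout
print(A.flags['C_CONTIGUOUS'], A_f.flags['F_CONTIGUOUS'])

t_row = timeit.timeit(lambda: A[1000, :].copy(), number=20000)   # contiguous
t_col = timeit.timeit(lambda: A[:, 1000].copy(), number=20000)   # strided
print(f"Copy one row:    {t_row:.3f}s")
print(f"Copy one column: {t_col:.3f}s")
```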
Different algorithms excel at different problem structures.
from scipy.optimize import linprog
import numpy as np
# Linear programming problem:
# minimize: -x - 2y
# subject to: x + 2y <= 8
# 2x + y <= 8
# x, y >= 0
c = [-1, -2] # Coefficients (minimize)
A_ub = [[1, 2], [2, 1]] # Inequality constraints
b_ub = [8, 8]
bounds = [(0, None), (0, None)]
# Compare methods ('interior-point' and 'simplex' are legacy solvers removed
# in newer SciPy releases; 'highs' is the current default)
methods = ['interior-point', 'simplex', 'highs']
for method in methods:
result = linprog(c, A_ub=A_ub, b_ub=b_ub,
bounds=bounds, method=method)
print(f"{method:15s}: x={result.x}, "
f"iterations={result.nit}")
# Performance characteristics
n_vars = [10, 100, 1000]
for n in n_vars:
# Random LP problem
c = np.random.randn(n)
A = np.random.randn(n//2, n)
b = np.random.randn(n//2)
import time
# Interior point
start = time.perf_counter()
res_ip = linprog(c, A_ub=A, b_ub=b,
method='interior-point',
options={'disp': False})
time_ip = time.perf_counter() - start
# Simplex
start = time.perf_counter()
res_simplex = linprog(c, A_ub=A, b_ub=b,
method='simplex',
options={'disp': False})
time_simplex = time.perf_counter() - start
print(f"\nn={n:4d} variables:")
print(f" Interior: {time_ip*1000:6.1f}ms")
print(f" Simplex: {time_simplex*1000:6.1f}ms")
interior-point : x=[1.6959609 3.15201955], iterations=4
simplex : x=[2.66666667 2.66666667], iterations=2
highs : x=[0. 4.], iterations=2
n= 10 variables:
Interior: 0.9ms
Simplex: 1.1ms
n= 100 variables:
Interior: 34.8ms
Simplex: 21.0ms
n=1000 variables:
Interior: 118.9ms
Simplex: 1093.8ms
Algorithm Selection Criteria:
Interior Point:
Simplex:
Modern hybrids (HiGHS):
Root finding methods trade convergence speed for robustness.
from scipy import optimize
import numpy as np
def f(x):
"""Test function with known root."""
return x**3 - x - 1
def fprime(x):
"""Derivative of f."""
return 3*x**2 - 1
# Compare root finding methods
x0 = 2.0 # Initial guess
bracket = [1, 2] # Known bracket
# Bisection: guaranteed convergence
root_bisect = optimize.bisect(f, *bracket)
print(f"Bisection: {root_bisect:.10f}")
# Brent: combines bisection, secant, inverse quadratic
root_brent = optimize.brentq(f, *bracket)
print(f"Brent: {root_brent:.10f}")
# Newton: fast but needs derivative
root_newton = optimize.newton(f, x0, fprime=fprime)
print(f"Newton: {root_newton:.10f}")
# Performance comparison
import timeit
n = 1000
times = {}
times['bisect'] = timeit.timeit(
lambda: optimize.bisect(f, 1, 2),
number=n
)
times['brent'] = timeit.timeit(
lambda: optimize.brentq(f, 1, 2),
number=n
)
times['newton'] = timeit.timeit(
lambda: optimize.newton(f, 2.0, fprime=fprime),
number=n
)
print(f"\nTime for {n} root findings:")
for method, elapsed in times.items():  # avoid shadowing the time module
    print(f" {method:8s}: {elapsed*1000:.1f}ms")
Bisection: 1.3247179572
Brent: 1.3247179572
Newton: 1.3247179572
Time for 1000 root findings:
bisect : 20.4ms
brent : 5.5ms
newton : 55.4ms
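Wall time mixes Python overhead with convergence behavior; full_output=True exposes the iteration counts directly. A minimal sketch reusing f and fprime from above:

```python
from scipy import optimize

_, r_bisect = optimize.bisect(f, 1, 2, full_output=True)
_, r_brent = optimize.brentq(f, 1, 2, full_output=True)
_, r_newton = optimize.newton(f, 2.0, fprime=fprime, full_output=True)

print(f"bisect: {r_bisect.iterations} iterations, {r_bisect.function_calls} calls")
print(f"brentq: {r_brent.iterations} iterations, {r_brent.function_calls} calls")
print(f"newton: {r_newton.iterations} iterations, {r_newton.function_calls} calls")
```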
Method Characteristics:
Bisection:
Newton-Raphson:
Brent’s Method (scipy default):
Integration routines balance function evaluations against accuracy.
from scipy import integrate
import numpy as np
def oscillatory(x):
"""Challenging integrand."""
return np.exp(-x**2) * np.cos(10*x)
# Basic integration
result, error = integrate.quad(oscillatory, 0, 3)
print(f"quad result: {result:.8f} ± {error:.2e}")
# Show function evaluation count
full_output = integrate.quad(oscillatory, 0, 3,
full_output=True)
result, error, info = full_output
print(f"Function evaluations: {info['neval']}")
# Compare tolerances
tolerances = [1e-3, 1e-6, 1e-9]
for tol in tolerances:
result, error, info = integrate.quad(
oscillatory, 0, 3,
epsabs=tol, full_output=True
)
print(f"tol={tol:.0e}: {info['neval']:3d} evals, "
f"error={error:.2e}")
# Fixed vs adaptive sampling
def fixed_trapezoid(f, a, b, n):
"""Fixed spacing trapezoid rule."""
x = np.linspace(a, b, n)
y = f(x)
return np.trapz(y, x)
n_fixed = 1000
result_fixed = fixed_trapezoid(oscillatory, 0, 3, n_fixed)
result_adaptive, _, info = integrate.quad(
oscillatory, 0, 3, full_output=True
)
print(f"\nFixed {n_fixed} points: {result_fixed:.6f}")
print(f"Adaptive {info['neval']} points: {result_adaptive:.6f}")
quad result: -0.00000982 ± 6.48e-15
Function evaluations: 147
tol=1e-03: 63 evals, error=4.38e-06
tol=1e-06: 105 evals, error=9.08e-08
tol=1e-09: 147 evals, error=6.48e-15
Fixed 1000 points: -0.000010
Adaptive 147 points: -0.000010
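The same adaptive machinery extends to multiple dimensions through the convenience wrappers listed below. A minimal sketch with dblquad on a 2D integrand of my choosing; the result should be roughly (∫₀¹ e^(−x²) dx)² ≈ 0.5577:

```python
from scipy import integrate
import numpy as np

# dblquad expects f(y, x); the outer limits are for x, the inner for y
val, err = integrate.dblquad(lambda y, x: np.exp(-(x**2 + y**2)), 0, 1, 0, 1)
print(f"dblquad: {val:.5f} (error estimate {err:.1e})")
```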
QUADPACK Algorithms (scipy.integrate):
quad (adaptive quadrature):
Performance Factors:
Specialized Routines:
quad_vec: Vectorized integrands
nquad: Multiple integration
dblquad, tplquad: 2D/3D convenience
odeint: ODE integration
Sparse matrices trade format complexity for memory efficiency.
import scipy.sparse as sp
import numpy as np
# Create a random matrix with 1% nonzeros (generated sparse, converted to dense)
n = 1000
density = 0.01
A_dense = sp.random(n, n, density=density).toarray()
# Convert to different formats
A_coo = sp.coo_matrix(A_dense)
A_csr = sp.csr_matrix(A_dense)
A_csc = sp.csc_matrix(A_dense)
# Memory usage
import sys
dense_size = sys.getsizeof(A_dense) / 1024**2
coo_size = (A_coo.data.nbytes +
A_coo.row.nbytes +
A_coo.col.nbytes) / 1024**2
csr_size = (A_csr.data.nbytes +
A_csr.indices.nbytes +
A_csr.indptr.nbytes) / 1024**2
print(f"Matrix {n}×{n}, density {density:.1%}")
print(f"Dense: {dense_size:.2f} MB")
print(f"COO: {coo_size:.2f} MB ({dense_size/coo_size:.1f}× smaller)")
print(f"CSR: {csr_size:.2f} MB ({dense_size/csr_size:.1f}× smaller)")
# Operation performance
v = np.random.randn(n)
import timeit
t_dense = timeit.timeit(lambda: A_dense @ v, number=100)
t_csr = timeit.timeit(lambda: A_csr @ v, number=100)
print(f"\nMatrix-vector multiply (100 iterations):")
print(f"Dense: {t_dense*1000:.1f}ms")
print(f"CSR: {t_csr*1000:.1f}ms ({t_dense/t_csr:.1f}× faster)")
# Format conversion cost
t_to_csr = timeit.timeit(
lambda: sp.csr_matrix(A_dense),
number=10
)
print(f"\nConversion to CSR: {t_to_csr*100:.1f}ms")
Matrix 1000×1000, density 1.0%
Dense: 7.63 MB
COO: 0.15 MB (50.0× smaller)
CSR: 0.12 MB (64.5× smaller)
Matrix-vector multiply (100 iterations):
Dense: 7.7ms
CSR: 0.8ms (10.0× faster)
Conversion to CSR: 4.4ms
Storage Format Trade-offs:
COO (Coordinate): simple (row, column, value) triplets; fast to build and convert, but no efficient arithmetic or slicing
CSR/CSC (Compressed Sparse Row/Column): compressed row (or column) pointers; fast matrix-vector products and row (or column) slicing; CSC is preferred by sparse direct solvers
Performance Implications: operations scale with the number of nonzeros; build in COO, convert once, then compute in CSR/CSC (the CSR layout is sketched below)
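To make the CSR trade-off concrete, here is a minimal sketch of what the three CSR arrays contain for a small matrix and how a matrix-vector product walks them row by row; the pure-Python loop mirrors what scipy's compiled kernel does.
import numpy as np
import scipy.sparse as sp

A = np.array([[1., 0., 2.],
              [0., 0., 3.],
              [4., 5., 0.]])
A_csr = sp.csr_matrix(A)
print("data:   ", A_csr.data)     # nonzero values, stored row by row
print("indices:", A_csr.indices)  # column index of each stored value
print("indptr: ", A_csr.indptr)   # where each row starts in data/indices

def csr_matvec(data, indices, indptr, v):
    """Reference CSR matrix-vector product (what A_csr @ v does in C)."""
    n_rows = len(indptr) - 1
    out = np.zeros(n_rows)
    for i in range(n_rows):
        for k in range(indptr[i], indptr[i + 1]):
            out[i] += data[k] * v[indices[k]]
    return out

v = np.array([1., 2., 3.])
print(csr_matvec(A_csr.data, A_csr.indices, A_csr.indptr, v))
print(A_csr @ v)   # should match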
Stiff ODEs require implicit methods despite higher computational cost.
from scipy.integrate import solve_ivp
import numpy as np
# Stiff ODE system
def stiff_ode(t, y):
"""Van der Pol oscillator (stiff for μ >> 1)."""
mu = 1000
return [y[1],
mu*(1 - y[0]**2)*y[1] - y[0]]
y0 = [2, 0]
t_span = [0, 3000]
# Compare solvers
import time
# Explicit method (will struggle)
start = time.perf_counter()
sol_rk45 = solve_ivp(stiff_ode, t_span, y0,
method='RK45', rtol=1e-6)
time_rk45 = time.perf_counter() - start
# Implicit method (designed for stiffness)
start = time.perf_counter()
sol_radau = solve_ivp(stiff_ode, t_span, y0,
method='Radau', rtol=1e-6)
time_radau = time.perf_counter() - start
print(f"RK45 (explicit):")
print(f" Steps: {sol_rk45.nfev}")
print(f" Time: {time_rk45:.3f}s")
print(f"\nRadau (implicit):")
print(f" Steps: {sol_radau.nfev}")
print(f" Time: {time_radau:.3f}s")
print(f" Speedup: {time_rk45/time_radau:.1f}×")
# Dense output
t_dense = np.linspace(0, 3000, 1000)
sol_dense = solve_ivp(stiff_ode, t_span, y0,
t_eval=t_dense, method='Radau',
dense_output=True)
print(f"\nDense output: {len(t_dense)} points")
print(f"Internal steps: {sol_dense.nfev}")
RK45 (explicit):
Function evals: 11823044
Time: 39.903s
Radau (implicit):
Function evals: 7702
Time: 0.082s
Speedup: 485.6×
Dense output: 1000 points
Function evals: 2804
Stiffness Indicators: widely separated time scales; the explicit solver's step size is limited by stability rather than accuracy, so its evaluation count explodes (as above)
Method Properties:
Explicit (RK45): cheap per step, no linear algebra, but forced into tiny steps on stiff problems
Implicit (Radau, BDF): each step solves a nonlinear system (Jacobian and linear solves), yet remains stable with large steps; see the Euler sketch below
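The stability argument can be shown without scipy. This toy sketch (illustrative, not the Radau algorithm) applies explicit and implicit Euler to y' = -1000*y; the explicit step is only stable for h < 2/1000, while the implicit step damps the solution for any h.
import numpy as np

lam = -1000.0        # fast decay rate: the stiff component
h = 0.01             # far larger than the explicit stability limit 2/|lam| = 0.002
y_exp, y_imp = 1.0, 1.0

for _ in range(5):
    y_exp = y_exp + h * lam * y_exp    # explicit Euler: y *= (1 + h*lam)
    y_imp = y_imp / (1 - h * lam)      # implicit Euler: solve y_new = y + h*lam*y_new

print(f"explicit Euler after 5 steps: {y_exp:.3e}  (|1 + h*lam| = {abs(1 + h*lam):.1f} > 1, blows up)")
print(f"implicit Euler after 5 steps: {y_imp:.3e}  (decays for any step size)")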
Matplotlib uses a hierarchical object model that separates concerns across multiple abstraction levels.
import matplotlib.pyplot as plt
import matplotlib as mpl
# Examine object hierarchy
fig = plt.figure(figsize=(8, 4))
ax1 = fig.add_subplot(121)
ax2 = fig.add_subplot(122)
# Plot creates multiple artists
line, = ax1.plot([1, 2, 3], [1, 4, 2], 'b-')
scatter = ax2.scatter([1, 2, 3], [2, 3, 1])
# Object relationships
print("Figure attributes:")
print(f" Number of axes: {len(fig.axes)}")
print(f" DPI: {fig.dpi}")
print(f" Size: {fig.get_size_inches()} inches")
print("\nAxes ax1 contents:")
print(f" Lines: {len(ax1.lines)}")
print(f" Patches: {len(ax1.patches)}")
print(f" Texts: {len(ax1.texts)}")
print(f" Artists: {len(ax1.get_children())}")
# Access artist properties
print(f"\nLine2D properties:")
print(f" Data: x={line.get_xdata()}, y={line.get_ydata()}")
print(f" Color: {line.get_color()}")
print(f" Linewidth: {line.get_linewidth()}")
print(f" Marker: '{line.get_marker()}'")
# Transform pipeline
data_point = (2, 4)
display_point = ax1.transData.transform(data_point)
figure_point = fig.transFigure.inverted().transform(display_point)
print(f"\nTransform pipeline for {data_point}:")
print(f" Data space: {data_point}")
print(f" Display space: {display_point}")
print(f" Figure space: {figure_point}")
plt.close() # Clean up
Figure attributes:
Number of axes: 2
DPI: 100.0
Size: [8. 4.] inches
Axes ax1 contents:
Lines: 1
Patches: 0
Texts: 0
Artists: 11
Line2D properties:
Data: x=[1 2 3], y=[1 4 2]
Color: b
Linewidth: 2.0
Marker: 'None'
Transform pipeline for (2, 4):
Data space: (2, 4)
Display space: [ 663.63636364 1276. ]
Figure space: [0.82954545 3.19 ]
Architecture Components:
Figure: Top-level container
Axes: Plotting area with coordinates
Artists: Everything visible
Transforms: Coordinate mappings
Backend: Rendering engine
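The components listed above can be assembled without pyplot at all, which makes the layering explicit: a Figure owns Axes, Axes own Artists, and a backend canvas renders the whole tree. A minimal sketch using the standard Agg backend classes (rendering into an in-memory buffer so no file or GUI is needed):
import io
from matplotlib.figure import Figure
from matplotlib.backends.backend_agg import FigureCanvasAgg

fig = Figure(figsize=(4, 3))            # Figure: top-level container, no pyplot state
canvas = FigureCanvasAgg(fig)           # Backend: the renderer attached to this figure
ax = fig.add_subplot(111)               # Axes: coordinate system that owns child artists
line, = ax.plot([0, 1, 2], [0, 1, 0])   # Line2D artist added to the Axes
ax.set_title("Explicit object model")

buf = io.BytesIO()
fig.savefig(buf, format="png")          # Figure delegates rendering to its canvas
print(type(fig).__name__, type(ax).__name__, type(line).__name__)
print(f"Rendered {len(buf.getvalue())} PNG bytes without pyplot or a GUI")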
Different visualizations optimize for different data characteristics and analysis goals.
import matplotlib.pyplot as plt
import numpy as np
# Performance comparison for different plot types
data_sizes = [100, 1000, 10000, 100000]
plot_types = {
'line': lambda ax, n: ax.plot(np.arange(n), np.random.randn(n)),
'scatter': lambda ax, n: ax.scatter(np.arange(n), np.random.randn(n)),
'hist': lambda ax, n: ax.hist(np.random.randn(n), bins=50),
'imshow': lambda ax, n: ax.imshow(np.random.randn(int(np.sqrt(n)), int(np.sqrt(n)))),
}
import time
print("Rendering time (ms) for different plot types:")
print(f"{'Size':<10} {'Line':<10} {'Scatter':<10} {'Hist':<10} {'Imshow':<10}")
print("-" * 50)
for size in data_sizes:
times = {}
for plot_type, plot_func in plot_types.items():
fig, ax = plt.subplots()
start = time.perf_counter()
plot_func(ax, size)
fig.canvas.draw()
elapsed = (time.perf_counter() - start) * 1000
times[plot_type] = elapsed
plt.close(fig)
print(f"{size:<10} {times['line']:<10.1f} {times['scatter']:<10.1f} "
f"{times['hist']:<10.1f} {times['imshow']:<10.1f}")
# Memory usage analysis
import sys
fig, ax = plt.subplots()
line, = ax.plot(np.arange(10000), np.random.randn(10000))
scatter = ax.scatter(np.arange(1000), np.random.randn(1000))
print(f"\nMemory usage:")
print(f" Line (10k points): ~{sys.getsizeof(line.get_xydata()) / 1024:.1f} KB")
print(f" Scatter (1k points): ~{sys.getsizeof(scatter.get_offsets()) / 1024:.1f} KB")
plt.close()
Rendering time (ms) for different plot types:
Size Line Scatter Hist Imshow
--------------------------------------------------
100 12.1 10.2 21.0 14.1
1000 16.7 11.3 20.5 16.1
10000 65.2 16.3 20.4 14.0
100000 312.8 49.3 22.9 20.6
Memory usage:
Line (10k points): ~156.4 KB
Scatter (1k points): ~0.2 KB
Plot Selection Criteria:
Continuous Data: line plots and filled curves (plot, fill_between)
Discrete Points: scatter, stem, bar
Statistical: histograms, box plots, hexbin / 2D histograms for dense point clouds
Performance Tips:
Prefer plot() with markers over scatter() for large datasets
Use rasterized=True for dense artists, especially when saving to vector formats
Complex figures require careful layout management for effective visualization.
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import numpy as np
# Method 1: Simple grid
fig1, axes = plt.subplots(2, 3, figsize=(9, 6))
print(f"Simple grid: {axes.shape} axes")
# Method 2: GridSpec for complex layouts
fig2 = plt.figure(figsize=(10, 6))
gs = gridspec.GridSpec(3, 3, figure=fig2,
width_ratios=[1, 2, 1],
height_ratios=[1, 2, 1],
hspace=0.3, wspace=0.3)
# Create axes with different spans
ax1 = fig2.add_subplot(gs[0, :]) # Top row, all columns
ax2 = fig2.add_subplot(gs[1:, 0]) # Rows 1-2, first column
ax3 = fig2.add_subplot(gs[1:, 1:]) # Rows 1-2, columns 1-2
print(f"\nGridSpec axes positions:")
print(f" ax1: {ax1.get_position()}")
print(f" ax2: {ax2.get_position()}")
print(f" ax3: {ax3.get_position()}")
# Method 3: Nested layouts
fig3 = plt.figure(figsize=(10, 6))
outer_grid = gridspec.GridSpec(2, 2, figure=fig3)
# Nested grid in first cell
inner_grid = gridspec.GridSpecFromSubplotSpec(
2, 2, subplot_spec=outer_grid[0, 0]
)
# Create nested axes
for i in range(2):
for j in range(2):
ax = fig3.add_subplot(inner_grid[i, j])
ax.text(0.5, 0.5, f'({i},{j})', ha='center')
# Regular axes in other cells
ax_right = fig3.add_subplot(outer_grid[0, 1])
ax_bottom = fig3.add_subplot(outer_grid[1, :])
plt.close('all') # Clean up
# Constrained layout for automatic spacing
fig4, axes = plt.subplots(2, 2, figsize=(8, 6),
constrained_layout=True)
for ax in axes.flat:
ax.plot(np.random.randn(100))
ax.set_title('Auto-spaced')
print("\nConstrained layout automatically adjusts spacing")
plt.close('all')
Simple grid: (2, 3) axes
GridSpec axes positions:
ax1: Bbox(x0=0.125, y0=0.7195833333333334, x1=0.9000000000000001, y1=0.88)
ax2: Bbox(x0=0.125, y0=0.10999999999999999, x1=0.28645833333333337, y1=0.6554166666666668)
ax3: Bbox(x0=0.3510416666666667, y0=0.10999999999999999, x1=0.9000000000000001, y1=0.6554166666666668)
Constrained layout automatically adjusts spacing
# Shared axes for comparison
fig, axes = plt.subplots(3, 1, figsize=(8, 6),
sharex=True)
x = np.linspace(0, 10, 100)
data = [np.sin(x), np.cos(x), np.sin(x) * np.cos(x)]
labels = ['sin(x)', 'cos(x)', 'sin(x)cos(x)']
for ax, y, label in zip(axes, data, labels):
ax.plot(x, y, linewidth=2)
ax.set_ylabel(label)
ax.grid(True, alpha=0.3)
# Only bottom axis shows x-label
if ax == axes[-1]:
ax.set_xlabel('X')
axes[0].set_title('Shared X-axis', fontweight='bold')
# Measure rendering overhead
import time
layout_times = {}
# Simple layout
start = time.perf_counter()
fig, axes = plt.subplots(3, 3)
fig.canvas.draw()
layout_times['simple'] = time.perf_counter() - start
# Complex GridSpec
start = time.perf_counter()
fig = plt.figure()
gs = gridspec.GridSpec(5, 5)
for i in range(5):
for j in range(5):
ax = fig.add_subplot(gs[i, j])
fig.canvas.draw()
layout_times['complex'] = time.perf_counter() - start
print("\nLayout creation overhead:")
for layout, t in layout_times.items():
print(f" {layout}: {t*1000:.1f}ms")
plt.close('all')
Layout creation overhead:
simple: 78.4ms
complex: 143.3ms
Three-dimensional plots require special consideration for perspective and interaction.
from mpl_toolkits.mplot3d import Axes3D
import numpy as np
import time
# Performance analysis for 3D plots
fig = plt.figure(figsize=(10, 5))
# Compare 2D vs 3D rendering
grid_sizes = [10, 20, 30, 40, 50]
times_2d = []
times_3d = []
for size in grid_sizes:
X = np.linspace(-5, 5, size)
Y = np.linspace(-5, 5, size)
X, Y = np.meshgrid(X, Y)
Z = np.sin(np.sqrt(X**2 + Y**2))
# 2D heatmap
fig2d, ax2d = plt.subplots()
start = time.perf_counter()
ax2d.imshow(Z)
fig2d.canvas.draw()
times_2d.append(time.perf_counter() - start)
plt.close(fig2d)
# 3D surface
fig3d = plt.figure()
ax3d = fig3d.add_subplot(111, projection='3d')
start = time.perf_counter()
ax3d.plot_surface(X, Y, Z)
fig3d.canvas.draw()
times_3d.append(time.perf_counter() - start)
plt.close(fig3d)
print("Rendering time comparison (ms):")
print(f"{'Grid':<10} {'2D Heatmap':<15} {'3D Surface':<15} {'Ratio':<10}")
print("-" * 50)
for i, size in enumerate(grid_sizes):
ratio = times_3d[i] / times_2d[i]
print(f"{size}×{size:<6} {times_2d[i]*1000:<15.1f} "
f"{times_3d[i]*1000:<15.1f} {ratio:<10.1f}x")
# View angle impact
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
# Create surface
X = np.linspace(-5, 5, 30)
Y = np.linspace(-5, 5, 30)
X, Y = np.meshgrid(X, Y)
Z = np.sin(np.sqrt(X**2 + Y**2))
ax.plot_surface(X, Y, Z, cmap='viridis')
# Get view angles
elev = ax.elev
azim = ax.azim
print(f"\nDefault view angles:")
print(f" Elevation: {elev}°")
print(f" Azimuth: {azim}°")
# Programmatic rotation
ax.view_init(elev=30, azim=45)
print(f"Modified view: elev=30°, azim=45°")
plt.close()
Rendering time comparison (ms):
Grid 2D Heatmap 3D Surface Ratio
--------------------------------------------------
10×10 14.6 19.5 1.3 x
20×20 18.1 25.8 1.4 x
30×30 14.9 36.6 2.4 x
40×40 16.8 51.5 3.1 x
50×50 158.4 69.3 0.4 x
Default view angles:
Elevation: 30°
Azimuth: -60°
Modified view: elev=30°, azim=45°
<Figure size 1000x500 with 0 Axes>
# 3D plotting optimizations
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import numpy as np
# Optimization strategies
fig = plt.figure(figsize=(8, 4))
# Strategy 1: Reduce resolution
ax1 = fig.add_subplot(121, projection='3d')
X_low = np.linspace(-5, 5, 20) # Low resolution
Y_low = np.linspace(-5, 5, 20)
X_low, Y_low = np.meshgrid(X_low, Y_low)
Z_low = np.sin(np.sqrt(X_low**2 + Y_low**2))
ax1.plot_surface(X_low, Y_low, Z_low,
cmap='viridis', alpha=0.8)
ax1.set_title('Low Resolution (Fast)')
# Strategy 2: Use simpler representations
ax2 = fig.add_subplot(122, projection='3d')
ax2.plot_wireframe(X_low, Y_low, Z_low,
color='blue', linewidth=0.5,
rstride=2, cstride=2) # Skip points
ax2.set_title('Wireframe (Faster)')
# Memory usage
import sys
surf_artist = ax1.collections[0]
wire_artist = ax2.collections[0]
print("\nMemory comparison:")
print(f" Surface plot: ~{len(surf_artist._facecolors)*4/1024:.1f} KB")
print(f" Wireframe: ~{len(wire_artist._segments3d)*24/1024:.1f} KB")
# Z-order sorting overhead
n_triangles = X_low.size * 2 # Approximate
print(f"\nZ-buffer sorting:")
print(f" Triangles to sort: ~{n_triangles}")
print(f" Complexity: O(n log n) = O({n_triangles} log {n_triangles})")
plt.tight_layout()
plt.close()
# Interactive considerations
print("\n3D interaction overhead:")
print(" Mouse rotation: Recomputes projections")
print(" Zoom: Recalculates clipping")
print(" Pan: Updates transform matrices")
Memory comparison:
Surface plot: ~0.0 KB
Wireframe: ~0.5 KB
Z-buffer sorting:
Triangles to sort: ~800
Complexity: O(n log n) = O(800 log 800)
3D interaction overhead:
Mouse rotation: Recomputes projections
Zoom: Recalculates clipping
Pan: Updates transform matrices
Animations visualize temporal evolution but require careful performance optimization.
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import numpy as np
# Basic animation structure
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(8, 6))
# Data setup
x = np.linspace(0, 2*np.pi, 100)
line1, = ax1.plot([], [], 'b-', linewidth=2)
line2, = ax2.plot([], [], 'r-', linewidth=2)
ax1.set_xlim(0, 2*np.pi)
ax1.set_ylim(-1.5, 1.5)
ax1.set_title('Animation with Blitting')
ax1.grid(True, alpha=0.3)
ax2.set_xlim(0, 2*np.pi)
ax2.set_ylim(-1.5, 1.5)
ax2.set_title('Accumulated History')
ax2.grid(True, alpha=0.3)
# Animation state
history = []
def init():
"""Initialize animation."""
line1.set_data([], [])
line2.set_data([], [])
return line1, line2
def animate(frame):
"""Animation function called for each frame."""
# Update data
y1 = np.sin(x + frame * 0.1)
line1.set_data(x, y1)
# Accumulate history
if len(history) > 50:
history.pop(0)
history.append(np.sin(2*np.pi * frame / 50))
# Plot history
if history:
line2.set_data(np.linspace(0, 2*np.pi, len(history)),
history)
return line1, line2
# Performance comparison
import time
# Without blitting
start = time.perf_counter()
for i in range(50):
animate(i)
fig.canvas.draw()
no_blit_time = time.perf_counter() - start
print(f"Animation performance (50 frames):")
print(f" Without blitting: {no_blit_time*1000:.1f}ms")
print(f" FPS: {50/no_blit_time:.1f}")
# Memory management
print(f"\nMemory considerations:")
print(f" Line data: {x.nbytes + x.nbytes} bytes per frame")
print(f" History buffer: {len(history) * 8} bytes")
print(f" Canvas cache: ~{fig.canvas.get_width_height()[0] * fig.canvas.get_width_height()[1] * 4} bytes")
plt.close()
# Optimization techniques
print("\nOptimization strategies:")
print(" 1. Use blitting: ~5-10x speedup")
print(" 2. Update only changed artists")
print(" 3. Reduce figure DPI for preview")
print(" 4. Cache static elements")
print(" 5. Use collections for many objects")
Animation performance (50 frames):
Without blitting: 531.3ms
FPS: 94.1
Memory considerations:
Line data: 1600 bytes per frame
History buffer: 400 bytes
Canvas cache: ~1920000 bytes
Optimization strategies:
1. Use blitting: ~5-10x speedup
2. Update only changed artists
3. Reduce figure DPI for preview
4. Cache static elements
5. Use collections for many objects
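The blitting speedup in item 1 above is normally obtained by letting FuncAnimation manage the background copy rather than hand-rolling it. A minimal, self-contained sketch (names and frame count here are illustrative):
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

x = np.linspace(0, 2 * np.pi, 100)
fig, ax = plt.subplots()
line, = ax.plot(x, np.sin(x))
ax.set_ylim(-1.5, 1.5)

def update(frame):
    line.set_ydata(np.sin(x + 0.1 * frame))
    return (line,)                 # with blit=True only returned artists are redrawn

anim = FuncAnimation(fig, update, frames=200, interval=30, blit=True)
plt.show()   # in a notebook, display with: from IPython.display import HTML; HTML(anim.to_jshtml())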
# Advanced animation patterns
class AnimationController:
"""Efficient animation with caching and blitting."""
def __init__(self, fig, ax):
self.fig = fig
self.ax = ax
self.artists = []
self.static_artists = []
self.backgrounds = {}
def add_artist(self, artist, static=False):
"""Register artist for animation."""
if static:
self.static_artists.append(artist)
else:
self.artists.append(artist)
artist.set_animated(True)
def cache_background(self):
"""Cache static elements for blitting."""
self.fig.canvas.draw()
self.backgrounds['main'] = \
self.fig.canvas.copy_from_bbox(self.ax.bbox)
def update_frame(self, frame_data):
"""Efficient frame update with blitting."""
# Restore background
self.fig.canvas.restore_region(
self.backgrounds['main']
)
# Update and draw animated artists
for artist, data in zip(self.artists, frame_data):
artist.set_data(*data)
self.ax.draw_artist(artist)
# Blit changed region
self.fig.canvas.blit(self.ax.bbox)
# Streaming data animation
class StreamPlot:
"""Real-time data streaming plot."""
def __init__(self, maxlen=100):
self.maxlen = maxlen
self.xdata = []
self.ydata = []
self.fig, self.ax = plt.subplots()
self.line, = self.ax.plot([], [], 'b-')
self.ax.set_ylim(-2, 2)
self.ax.set_xlim(0, maxlen)
def update(self, new_value):
"""Add new data point."""
self.ydata.append(new_value)
if len(self.ydata) > self.maxlen:
self.ydata.pop(0)
self.xdata = list(range(len(self.ydata)))
self.line.set_data(self.xdata, self.ydata)
# Auto-scale if needed
if self.ydata:
ymin, ymax = min(self.ydata), max(self.ydata)
margin = (ymax - ymin) * 0.1
self.ax.set_ylim(ymin - margin, ymax + margin)
# Performance metrics
stream = StreamPlot()
update_times = []
for i in range(100):
start = time.perf_counter()
stream.update(np.sin(i * 0.1) + 0.1 * np.random.randn())
update_times.append(time.perf_counter() - start)
print(f"\nStreaming performance:")
print(f" Mean update: {np.mean(update_times)*1000:.2f}ms")
print(f" Max update: {np.max(update_times)*1000:.2f}ms")
print(f" Achievable FPS: {1/np.mean(update_times):.0f}")
plt.close('all')
Streaming performance:
Mean update: 0.02ms
Max update: 0.06ms
Achievable FPS: 41091
Visualizing large datasets requires strategic optimization to maintain interactivity.
import matplotlib.pyplot as plt
import numpy as np
import time
# Generate large dataset
n_points = 50000
x = np.random.randn(n_points)
y = 2*x + np.random.randn(n_points)
# Benchmark different approaches
results = {}
# 1. Direct scatter (limited to 5k for speed)
fig, ax = plt.subplots()
start = time.perf_counter()
ax.scatter(x[:5000], y[:5000], alpha=0.5, s=1)
fig.canvas.draw()
results['scatter_5k'] = time.perf_counter() - start
plt.close()
# 2. Plot instead of scatter
fig, ax = plt.subplots()
start = time.perf_counter()
ax.plot(x, y, 'o', markersize=1, alpha=0.5)
fig.canvas.draw()
results['plot_50k'] = time.perf_counter() - start
plt.close()
# 3. Hexbin
fig, ax = plt.subplots()
start = time.perf_counter()
ax.hexbin(x, y, gridsize=30)
fig.canvas.draw()
results['hexbin_50k'] = time.perf_counter() - start
plt.close()
# 4. 2D histogram
fig, ax = plt.subplots()
start = time.perf_counter()
ax.hist2d(x, y, bins=50)
fig.canvas.draw()
results['hist2d_50k'] = time.perf_counter() - start
plt.close()
print("Rendering performance:")
for method, time_val in results.items():
print(f" {method:15s}: {time_val*1000:6.1f}ms")
# Memory usage estimation
import sys
# Scatter artist memory
fig, ax = plt.subplots()
scatter = ax.scatter(x[:1000], y[:1000])
scatter_memory = (sys.getsizeof(scatter.get_offsets()) +
sys.getsizeof(scatter.get_facecolors()))
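# Caveat: sys.getsizeof only counts the array object header here (the offsets
# come back as a view of data owned elsewhere), so the scatter numbers below
# are a large undercount; line.get_xydata() owns its buffer and is counted fully.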
# Line artist memory
line, = ax.plot(x[:1000], y[:1000], 'o')
line_memory = sys.getsizeof(line.get_xydata())
print(f"\nMemory usage (1000 points):")
print(f" Scatter: ~{scatter_memory/1024:.1f} KB")
print(f" Line: ~{line_memory/1024:.1f} KB")
print(f" Ratio: {scatter_memory/line_memory:.1f}x")
plt.close('all')
Rendering performance:
scatter_5k : 15.7ms
plot_50k : 16.0ms
hexbin_50k : 20.6ms
hist2d_50k : 17.4ms
Memory usage (1000 points):
Scatter: ~0.3 KB
Line: ~15.8 KB
Ratio: 0.0x
# Optimization strategies
class OptimizedPlotter:
"""Strategies for large dataset visualization."""
@staticmethod
def downsample(x, y, max_points=1000):
"""Randomly sample if too many points."""
n = len(x)
if n <= max_points:
return x, y
idx = np.random.choice(n, max_points, replace=False)
return x[idx], y[idx]
@staticmethod
def use_rasterization(ax, x, y):
"""Rasterize dense plots."""
ax.scatter(x, y, alpha=0.5, s=1, rasterized=True)
# Rasterized elements become bitmaps
# Faster zooming/panning
@staticmethod
def aggregate_data(x, y, bins=50):
"""Convert to density representation."""
H, xedges, yedges = np.histogram2d(x, y, bins=bins)
return H, xedges, yedges
@staticmethod
def progressive_rendering(x, y, ax):
"""Render in stages for responsiveness."""
# Quick preview
x_preview, y_preview = OptimizedPlotter.downsample(
x, y, max_points=100
)
line = ax.plot(x_preview, y_preview, 'o',
markersize=2, alpha=0.3)[0]
ax.figure.canvas.draw()
# Full render
line.set_data(x, y)
ax.figure.canvas.draw()
# Level-of-detail (LOD) system
class LODPlot:
"""Adaptive detail based on zoom level."""
def __init__(self, x, y):
self.x_full = x
self.y_full = y
self.create_levels()
def create_levels(self):
"""Create multiple resolution levels."""
self.levels = {}
n = len(self.x_full)
# Full resolution
self.levels[0] = (self.x_full, self.y_full)
# Reduced resolutions
for level in [1, 2, 3]:
step = 2 ** level
self.levels[level] = (
self.x_full[::step],
self.y_full[::step]
)
def get_data(self, zoom_level):
"""Return appropriate resolution."""
if zoom_level > 10:
return self.levels[0] # Full
elif zoom_level > 5:
return self.levels[1] # Half
elif zoom_level > 2:
return self.levels[2] # Quarter
else:
return self.levels[3] # Eighth
# Test LOD system
lod = LODPlot(x, y)
print("\nLOD system points at each level:")
for level in range(4):
x_level, y_level = lod.levels[level]
print(f" Level {level}: {len(x_level)} points")
# Chunked rendering for streaming
def chunked_plot(x, y, ax, chunk_size=5000):
"""Plot data in chunks to maintain responsiveness."""
n_chunks = len(x) // chunk_size + 1
for i in range(n_chunks):
start = i * chunk_size
end = min((i + 1) * chunk_size, len(x))
ax.plot(x[start:end], y[start:end], 'o',
markersize=1, alpha=0.5)
# Allow GUI to update
if i % 5 == 0:
ax.figure.canvas.draw()
plt.pause(0.001)
print("\nOptimization impact:")
print(" Rasterization: 2-5x faster interaction")
print(" Aggregation: 10-100x data reduction")
print(" LOD: Adaptive performance")
LOD system points at each level:
Level 0: 50000 points
Level 1: 25000 points
Level 2: 12500 points
Level 3: 6250 points
Optimization impact:
Rasterization: 2-5x faster interaction
Aggregation: 10-100x data reduction
LOD: Adaptive performance
Different backends optimize for different use cases: interactivity vs. publication quality.
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import os
# Check available backends
print("Available backends:")
for backend in matplotlib.rcsetup.all_backends:
print(f" {backend}")
print(f"\nCurrent backend: {matplotlib.get_backend()}")
# Create test figure
fig, ax = plt.subplots(figsize=(6, 4))
x = np.linspace(0, 10, 1000)
ax.plot(x, np.sin(x), 'b-', linewidth=2)
ax.fill_between(x, 0, np.sin(x), alpha=0.3)
ax.set_title('Test Figure')
ax.grid(True, alpha=0.3)
# Save in different formats
formats = {
'png': {'dpi': 100, 'bbox_inches': 'tight'},
'pdf': {'bbox_inches': 'tight'},
'svg': {'bbox_inches': 'tight'},
'eps': {'bbox_inches': 'tight'},
}
file_sizes = {}
import time
for fmt, kwargs in formats.items():
filename = f'test_figure.{fmt}'
start = time.perf_counter()
fig.savefig(filename, format=fmt, **kwargs)
save_time = time.perf_counter() - start
# Check file size
if os.path.exists(filename):
size = os.path.getsize(filename) / 1024 # KB
file_sizes[fmt] = (size, save_time)
os.remove(filename) # Clean up
print("\nOutput format comparison:")
print(f"{'Format':<10} {'Size (KB)':<12} {'Time (ms)':<10}")
print("-" * 35)
for fmt, (size, save_time) in file_sizes.items():
print(f"{fmt:<10} {size:<12.1f} {save_time*1000:<10.1f}")
plt.close()
# DPI impact on file size
dpis = [72, 100, 150, 200, 300]
sizes = []
times = []
for dpi in dpis:
start = time.perf_counter()
fig.savefig('temp.png', dpi=dpi)
times.append(time.perf_counter() - start)
sizes.append(os.path.getsize('temp.png') / 1024)
os.remove('temp.png')
print("\nDPI impact (PNG):")
print(f"{'DPI':<10} {'Size (KB)':<12} {'Time (ms)':<10}")
print("-" * 35)
for dpi, size, save_time in zip(dpis, sizes, times):
print(f"{dpi:<10} {size:<12.1f} {save_time*1000:<10.1f}")
Available backends:
gtk3agg
gtk3cairo
gtk4agg
gtk4cairo
macosx
nbagg
notebook
qtagg
qtcairo
qt5agg
qt5cairo
tkagg
tkcairo
webagg
wx
wxagg
wxcairo
agg
cairo
pdf
pgf
ps
svg
template
Current backend: module://matplotlib_inline.backend_inline
Output format comparison:
Format Size (KB) Time (ms)
-----------------------------------
png 23.4 29.8
pdf 23.2 114.1
svg 68.8 20.1
eps 62.9 20.6
DPI impact (PNG):
DPI Size (KB) Time (ms)
-----------------------------------
72 15.7 11.6
100 23.8 16.1
150 36.7 22.1
200 51.4 33.0
300 82.3 61.9
# Backend-specific optimizations
import matplotlib.pyplot as plt
import numpy as np
# Configure for publication
def setup_publication():
"""Configure matplotlib for publication."""
plt.rcParams.update({
'font.size': 10,
'font.family': 'serif',
'text.usetex': False, # Set True for LaTeX
'axes.linewidth': 1.5,
'axes.labelsize': 11,
'axes.titlesize': 12,
'xtick.labelsize': 10,
'ytick.labelsize': 10,
'legend.fontsize': 10,
'figure.dpi': 100,
'savefig.dpi': 300,
'savefig.bbox': 'tight',
'savefig.pad_inches': 0.1,
})
# Configure for presentation
def setup_presentation():
"""Configure matplotlib for slides."""
plt.rcParams.update({
'font.size': 14,
'font.family': 'sans-serif',
'axes.linewidth': 2,
'axes.labelsize': 16,
'axes.titlesize': 18,
'lines.linewidth': 3,
'lines.markersize': 10,
'xtick.labelsize': 14,
'ytick.labelsize': 14,
'legend.fontsize': 14,
'figure.dpi': 100,
'savefig.dpi': 150,
})
# Backend-specific features
def interactive_features():
"""Features only available in interactive backends."""
fig, ax = plt.subplots()
ax.plot([1, 2, 3], [1, 4, 2])
# Only works with interactive backends
backend = matplotlib.get_backend().lower()
if 'agg' in backend or 'inline' in backend:
print("Non-interactive backend - no GUI features")
else:
# These would work in interactive mode
print("Interactive features available:")
print(" - fig.canvas.mpl_connect('button_press_event', callback)")
print(" - plt.ginput(n=3) # Get n mouse clicks")
print(" - plt.waitforbuttonpress()")
plt.close()
interactive_features()
# Format selection guide
print("\nFormat selection guide:")
print(" PNG: Web, general use, fixed resolution")
print(" PDF: LaTeX, publications, vector graphics")
print(" SVG: Web animations, editing, vector")
print(" EPS: Legacy publications, PostScript")
# Memory vs quality tradeoff
print("\nMemory/Quality tradeoffs:")
print(" Rasterized=True: Reduces PDF size for dense plots")
print(" DPI: Higher = better quality, larger files")
print(" Compression: PNG supports lossless compression")
Non-interactive backend - no GUI features
Format selection guide:
PNG: Web, general use, fixed resolution
PDF: LaTeX, publications, vector graphics
SVG: Web animations, editing, vector
EPS: Legacy publications, PostScript
Memory/Quality tradeoffs:
Rasterized=True: Reduces PDF size for dense plots
DPI: Higher = better quality, larger files
Compression: PNG supports lossless compression
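For batch scripts and CI jobs, the backend can be pinned before pyplot is imported; a minimal sketch for a fresh script (the output filename is just an example):
import matplotlib
matplotlib.use("Agg")              # select the non-interactive raster backend first
import matplotlib.pyplot as plt    # pyplot now binds to Agg
import numpy as np

x = np.linspace(0, 10, 200)
fig, ax = plt.subplots()
ax.plot(x, np.sin(x))
fig.savefig("batch_plot.png", dpi=150)   # renders without any display
plt.close(fig)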
Understanding implementations reveals performance bottlenecks and numerical limitations.
import numpy as np
import time
# Motivation: Why implementation matters
def library_solve(A, b):
"""Using NumPy's optimized solver."""
return np.linalg.solve(A, b)
def naive_gaussian_elimination(A, b):
"""Educational implementation."""
n = len(b)
A = A.copy()
b = b.copy()
# Forward elimination
for i in range(n):
# Eliminate entries below A[i, i] (no partial pivoting)
for j in range(i+1, n):
factor = A[j, i] / A[i, i]
A[j, i:] -= factor * A[i, i:]
b[j] -= factor * b[i]
# Back substitution
x = np.zeros(n)
for i in range(n-1, -1, -1):
x[i] = (b[i] - np.sum(A[i, i+1:] * x[i+1:])) / A[i, i]
return x
# Test both methods
n = 100
A = np.random.randn(n, n)
b = np.random.randn(n)
# Make A well-conditioned
A = A @ A.T + np.eye(n) * 10
t_lib = time.perf_counter()
x_lib = library_solve(A, b)
t_lib = time.perf_counter() - t_lib
t_naive = time.perf_counter()
x_naive = naive_gaussian_elimination(A, b)
t_naive = time.perf_counter() - t_naive
print(f"Performance comparison (n={n}):")
print(f" NumPy: {t_lib*1000:.2f}ms")
print(f" Naive: {t_naive*1000:.2f}ms")
print(f" Ratio: {t_naive/t_lib:.1f}x slower")
# Accuracy comparison
error_lib = np.linalg.norm(A @ x_lib - b)
error_naive = np.linalg.norm(A @ x_naive - b)
print(f"\nAccuracy (residual norm):")
print(f" NumPy: {error_lib:.2e}")
print(f" Naive: {error_naive:.2e}")
Performance comparison (n=100):
NumPy: 0.49ms
Naive: 6.32ms
Ratio: 12.9x slower
Accuracy (residual norm):
NumPy: 1.49e-14
Naive: 1.75e-14
Implementation Benefits:
Performance Understanding: you see where the O(n³) arithmetic and the memory traffic actually go
Numerical Awareness: you know which steps lose precision and why pivoting and ordering matter
Algorithm Selection: you can judge when a specialized routine beats the general one (see the sketch below)
Debugging Capability: you can reproduce and isolate a library's surprising answer
Libraries optimize the common case. Understanding implementations helps recognize when your problem is uncommon.
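A small illustration of the algorithm-selection point: solving Ax = b through an explicit inverse does redundant work and typically loses accuracy compared with a factorization-based solve. A quick sketch (sizes are arbitrary):
import numpy as np
import time

n = 500
A = np.random.randn(n, n) + n * np.eye(n)   # well-conditioned test matrix
b = np.random.randn(n)

t0 = time.perf_counter()
x_solve = np.linalg.solve(A, b)             # LU factorization + triangular solves
t_solve = time.perf_counter() - t0

t0 = time.perf_counter()
x_inv = np.linalg.inv(A) @ b                # forms the full inverse first
t_inv = time.perf_counter() - t0

print(f"solve:  {t_solve*1000:.2f}ms, residual {np.linalg.norm(A @ x_solve - b):.2e}")
print(f"inv@b:  {t_inv*1000:.2f}ms, residual {np.linalg.norm(A @ x_inv - b):.2e}")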
Gradient descent seems simple mathematically but has subtle implementation challenges.
import numpy as np
class GradientDescent:
"""Complete gradient descent implementation."""
def __init__(self, learning_rate=0.01, momentum=0.0,
adaptive='none', epsilon=1e-8):
self.lr = learning_rate
self.momentum = momentum
self.adaptive = adaptive
self.epsilon = epsilon
# State for momentum and adaptive methods
self.velocity = None
self.accumulated_grad = None
self.accumulated_sq_grad = None
self.iteration = 0
def step(self, params, gradients):
"""Single optimization step."""
if self.velocity is None:
self.velocity = np.zeros_like(params)
if self.adaptive == 'adagrad':
if self.accumulated_sq_grad is None:
self.accumulated_sq_grad = np.zeros_like(params)
self.accumulated_sq_grad += gradients ** 2
adapted_lr = self.lr / (np.sqrt(self.accumulated_sq_grad) + self.epsilon)
update = adapted_lr * gradients
elif self.adaptive == 'rmsprop':
if self.accumulated_sq_grad is None:
self.accumulated_sq_grad = np.zeros_like(params)
decay = 0.9
self.accumulated_sq_grad = (decay * self.accumulated_sq_grad +
(1 - decay) * gradients ** 2)
adapted_lr = self.lr / (np.sqrt(self.accumulated_sq_grad) + self.epsilon)
update = adapted_lr * gradients
else: # Standard GD
update = self.lr * gradients
# Apply momentum
self.velocity = self.momentum * self.velocity - update
params += self.velocity
self.iteration += 1
return params
# Test on simple quadratic
def f(x):
"""f(x,y) = x² + 4y²"""
return x[0]**2 + 4*x[1]**2
def grad_f(x):
"""Gradient of f"""
return np.array([2*x[0], 8*x[1]])
# Compare optimizers
optimizers = {
'Vanilla GD': GradientDescent(learning_rate=0.1),
'Momentum': GradientDescent(learning_rate=0.1, momentum=0.9),
'AdaGrad': GradientDescent(learning_rate=1.0, adaptive='adagrad'),
'RMSprop': GradientDescent(learning_rate=0.1, adaptive='rmsprop'),
}
x0 = np.array([5.0, 5.0])
n_iterations = 50
print("Convergence comparison:")
print(f"{'Method':<15} {'Final Loss':<12} {'Iterations to 1e-3':<20}")
print("-" * 50)
for name, optimizer in optimizers.items():
x = x0.copy()
losses = []
converged = None
for i in range(n_iterations):
grad = grad_f(x)
x = optimizer.step(x, grad)
loss = f(x)
losses.append(loss)
if loss < 1e-3 and converged is None:
converged = i
final_loss = losses[-1]
converged = converged if converged is not None else '>50'
print(f"{name:<15} {final_loss:<12.2e} {converged:<20}")
Convergence comparison:
Method Final Loss Iterations to 1e-3
--------------------------------------------------
Vanilla GD 5.09e-09 22
Momentum 2.51e-01 >50
AdaGrad 1.01e-03 >50
RMSprop 1.23e+00 >50
# Numerical issues in gradient descent
def problematic_function(x):
"""Rosenbrock function - difficult optimization."""
return (1 - x[0])**2 + 100*(x[1] - x[0]**2)**2
def grad_rosenbrock(x):
"""Gradient of Rosenbrock."""
dx = -2*(1 - x[0]) - 400*x[0]*(x[1] - x[0]**2)
dy = 200*(x[1] - x[0]**2)
return np.array([dx, dy])
# Demonstrate issues
x_start = np.array([-1.0, 1.0])
# Issue 1: Learning rate sensitivity
learning_rates = [0.0001, 0.001, 0.01]
print("\nLearning rate sensitivity on Rosenbrock:")
for lr in learning_rates:
x = x_start.copy()
for i in range(100):
grad = grad_rosenbrock(x)
x -= lr * grad
# Check for divergence
if np.any(np.abs(x) > 1e10):
print(f" lr={lr}: DIVERGED at iteration {i}")
break
else:
loss = problematic_function(x)
print(f" lr={lr}: Final loss = {loss:.6f}")
# Issue 2: Gradient vanishing/exploding
print("\nGradient magnitude issues:")
test_points = [
np.array([0.0, 0.0]),
np.array([1.0, 1.0]),
np.array([10.0, 100.0]),
]
for point in test_points:
grad = grad_rosenbrock(point)
grad_norm = np.linalg.norm(grad)
print(f" Point {point}: ||∇f|| = {grad_norm:.2e}")
# Issue 3: Condition number impact
print("\nCondition number effects:")
# Hessian at different points
def hessian_rosenbrock(x):
"""Approximate Hessian."""
h11 = 2 - 400*x[1] + 1200*x[0]**2
h12 = -400*x[0]
h22 = 200
return np.array([[h11, h12], [h12, h22]])
for point in test_points:
H = hessian_rosenbrock(point)
eigenvalues = np.linalg.eigvals(H)
condition = np.max(np.abs(eigenvalues)) / np.min(np.abs(eigenvalues))
print(f" Point {point}: κ(H) = {condition:.2e}")
Learning rate sensitivity on Rosenbrock:
lr=0.0001: Final loss = 3.961464
lr=0.001: Final loss = 3.664727
lr=0.01: DIVERGED at iteration 5
Gradient magnitude issues:
Point [0. 0.]: ||∇f|| = 2.00e+00
Point [1. 1.]: ||∇f|| = 0.00e+00
Point [ 10. 100.]: ||∇f|| = 1.80e+01
Condition number effects:
Point [0. 0.]: κ(H) = 1.00e+02
Point [1. 1.]: κ(H) = 2.51e+03
Point [ 10. 100.]: κ(H) = 1.61e+07
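A practical habit when implementing optimizers by hand is to verify analytic gradients against central finite differences before trusting any convergence plot. A minimal sketch, redefining the Rosenbrock function and gradient used above so it runs on its own:
import numpy as np

def rosenbrock(x):
    return (1 - x[0])**2 + 100 * (x[1] - x[0]**2)**2

def grad_rosenbrock(x):
    return np.array([-2*(1 - x[0]) - 400*x[0]*(x[1] - x[0]**2),
                     200*(x[1] - x[0]**2)])

def numeric_grad(f, x, h=1e-6):
    """Central-difference gradient, O(h^2) accurate."""
    g = np.zeros_like(x)
    for i in range(len(x)):
        e = np.zeros_like(x)
        e[i] = h
        g[i] = (f(x + e) - f(x - e)) / (2 * h)
    return g

x = np.array([-1.2, 1.0])
g_analytic = grad_rosenbrock(x)
g_numeric = numeric_grad(rosenbrock, x)
rel_err = np.linalg.norm(g_analytic - g_numeric) / np.linalg.norm(g_numeric)
print(f"analytic: {g_analytic}")
print(f"numeric:  {g_numeric}")
print(f"relative difference: {rel_err:.2e}  (should be ~1e-8 or smaller)")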
Matrix multiplication reveals how memory access patterns dominate modern performance.
import numpy as np
import time
def matmul_naive(A, B):
"""Naive O(n³) implementation."""
n, m = A.shape
m2, p = B.shape
assert m == m2, "Dimension mismatch"
C = np.zeros((n, p))
for i in range(n):
for j in range(p):
for k in range(m):
C[i, j] += A[i, k] * B[k, j]
return C
def matmul_cache_friendly(A, B, block_size=32):
"""Blocked multiplication for cache efficiency."""
n, m = A.shape
m2, p = B.shape
C = np.zeros((n, p))
# Process in blocks
for i0 in range(0, n, block_size):
for j0 in range(0, p, block_size):
for k0 in range(0, m, block_size):
# Block boundaries
i1 = min(i0 + block_size, n)
j1 = min(j0 + block_size, p)
k1 = min(k0 + block_size, m)
# Multiply blocks
C[i0:i1, j0:j1] += A[i0:i1, k0:k1] @ B[k0:k1, j0:j1]
return C
def matmul_transpose_trick(A, B):
"""Transpose B for better cache access."""
n, m = A.shape
m2, p = B.shape
# Transpose B for row-wise access
B_T = B.T
C = np.zeros((n, p))
for i in range(n):
for j in range(p):
# Now both accesses are row-wise
C[i, j] = np.dot(A[i, :], B_T[j, :])
return C
# Performance comparison
sizes = [50, 100, 200]
print("Matrix multiplication performance (ms):")
print(f"{'Size':<8} {'Naive':<12} {'Blocked':<12} {'Transpose':<12} {'NumPy':<12}")
print("-" * 60)
for n in sizes:
A = np.random.randn(n, n)
B = np.random.randn(n, n)
# Naive
if n <= 100: # Too slow for large matrices
t0 = time.perf_counter()
C1 = matmul_naive(A, B)
t_naive = (time.perf_counter() - t0) * 1000
else:
t_naive = np.nan
# Blocked
t0 = time.perf_counter()
C2 = matmul_cache_friendly(A, B)
t_blocked = (time.perf_counter() - t0) * 1000
# Transpose trick
t0 = time.perf_counter()
C3 = matmul_transpose_trick(A, B)
t_transpose = (time.perf_counter() - t0) * 1000
# NumPy
t0 = time.perf_counter()
C4 = A @ B
t_numpy = (time.perf_counter() - t0) * 1000
print(f"{n:<8} {t_naive:<12.2f} {t_blocked:<12.2f} "
f"{t_transpose:<12.2f} {t_numpy:<12.2f}")
# Cache effect demonstration
print("\nCache miss impact:")
n = 100
A = np.random.randn(n, n)
B = np.random.randn(n, n)
B_T = B.T
# Row-major access (cache-friendly)
t0 = time.perf_counter()
for _ in range(100):
sum_rows = np.sum(A, axis=1) # Sum rows
t_row = time.perf_counter() - t0
# Column-major access (cache-unfriendly in principle; NumPy's reduction kernels hide most of the penalty here)
t0 = time.perf_counter()
for _ in range(100):
sum_cols = np.sum(A, axis=0) # Sum columns
t_col = time.perf_counter() - t0
print(f"Row-wise access: {t_row*1000:.2f}ms")
print(f"Column-wise access: {t_col*1000:.2f}ms")
print(f"Ratio: {t_col/t_row:.2f}x slower")
Matrix multiplication performance (ms):
Size Naive Blocked Transpose NumPy
------------------------------------------------------------
50 31.68 0.06 1.48 0.01
100 249.94 0.24 6.09 0.93
200 nan 1.43 28.44 1.08
Cache miss impact:
Row-wise access: 0.34ms
Column-wise access: 0.32ms
Ratio (col/row): 0.92x
# Strassen's algorithm implementation
def strassen_multiply(A, B, cutoff=32):
"""Strassen's O(n^2.807) algorithm."""
n = A.shape[0]
# Base case: use regular multiplication
if n <= cutoff:
return A @ B
# Ensure even dimensions
if n % 2 != 0:
A = np.pad(A, ((0, 1), (0, 1)), mode='constant')
B = np.pad(B, ((0, 1), (0, 1)), mode='constant')
n = n + 1
mid = n // 2
# Partition matrices
A11 = A[:mid, :mid]
A12 = A[:mid, mid:]
A21 = A[mid:, :mid]
A22 = A[mid:, mid:]
B11 = B[:mid, :mid]
B12 = B[:mid, mid:]
B21 = B[mid:, :mid]
B22 = B[mid:, mid:]
# Strassen's 7 multiplications
M1 = strassen_multiply(A11 + A22, B11 + B22, cutoff)
M2 = strassen_multiply(A21 + A22, B11, cutoff)
M3 = strassen_multiply(A11, B12 - B22, cutoff)
M4 = strassen_multiply(A22, B21 - B11, cutoff)
M5 = strassen_multiply(A11 + A12, B22, cutoff)
M6 = strassen_multiply(A21 - A11, B11 + B12, cutoff)
M7 = strassen_multiply(A12 - A22, B21 + B22, cutoff)
# Combine results
C11 = M1 + M4 - M5 + M7
C12 = M3 + M5
C21 = M2 + M4
C22 = M1 - M2 + M3 + M6
C = np.block([[C11, C12], [C21, C22]])
return C[:A.shape[0], :B.shape[1]]
# Test Strassen
n = 128
A = np.random.randn(n, n)
B = np.random.randn(n, n)
t0 = time.perf_counter()
C_strassen = strassen_multiply(A, B)
t_strassen = time.perf_counter() - t0
t0 = time.perf_counter()
C_numpy = A @ B
t_numpy = time.perf_counter() - t0
error = np.linalg.norm(C_strassen - C_numpy) / np.linalg.norm(C_numpy)
print(f"\nStrassen algorithm (n={n}):")
print(f" Strassen time: {t_strassen*1000:.2f}ms")
print(f" NumPy time: {t_numpy*1000:.2f}ms")
print(f" Relative error: {error:.2e}")
# Crossover point analysis
print("\nCrossover point analysis:")
sizes = [16, 32, 64, 128]
for n in sizes:
ops_regular = n**3
ops_strassen = n**2.807
print(f" n={n:3d}: Regular={ops_regular:8d}, "
f"Strassen={ops_strassen:8.0f}, "
f"Ratio={ops_regular/ops_strassen:.2f}")
Strassen algorithm (n=128):
Strassen time: 0.45ms
NumPy time: 0.23ms
Relative error: 9.00e-16
Crossover point analysis:
n= 16: Regular= 4096, Strassen= 2399, Ratio=1.71
n= 32: Regular= 32768, Strassen= 16786, Ratio=1.95
n= 64: Regular= 262144, Strassen= 117475, Ratio=2.23
n=128: Regular= 2097152, Strassen= 822126, Ratio=2.55
Convolution is fundamental to signal processing and deep learning, with dramatic optimization opportunities.
import numpy as np
import time
def conv1d_direct(signal, kernel):
"""Direct O(NK) convolution."""
N = len(signal)
K = len(kernel)
output = np.zeros(N + K - 1)
for n in range(len(output)):
for k in range(K):
if 0 <= n - k < N:
output[n] += signal[n - k] * kernel[k]
return output
def conv1d_fft(signal, kernel):
"""FFT-based O(N log N) convolution."""
N = len(signal) + len(kernel) - 1
N_fft = 2**int(np.ceil(np.log2(N))) # Next power of 2
# Zero-pad and FFT
signal_fft = np.fft.fft(signal, N_fft)
kernel_fft = np.fft.fft(kernel, N_fft)
# Multiply in frequency domain
result_fft = signal_fft * kernel_fft
# IFFT and trim
result = np.fft.ifft(result_fft).real
return result[:N]
def conv2d_direct(image, kernel):
"""Direct 2D convolution."""
H, W = image.shape
K_H, K_W = kernel.shape
# Output dimensions
out_H = H - K_H + 1
out_W = W - K_W + 1
output = np.zeros((out_H, out_W))
for i in range(out_H):
for j in range(out_W):
output[i, j] = np.sum(
image[i:i+K_H, j:j+K_W] * kernel
)
return output
def conv2d_separable(image, kernel_h, kernel_v):
"""Separable 2D convolution."""
# Convolve rows
temp = np.zeros_like(image)
for i in range(image.shape[0]):
temp[i, :] = np.convolve(image[i, :], kernel_h, mode='same')
# Convolve columns
output = np.zeros_like(image)
for j in range(image.shape[1]):
output[:, j] = np.convolve(temp[:, j], kernel_v, mode='same')
return output
# Test 1D convolution
signal = np.random.randn(1000)
kernel = np.random.randn(50)
t0 = time.perf_counter()
result_direct = conv1d_direct(signal, kernel)
t_direct = time.perf_counter() - t0
t0 = time.perf_counter()
result_fft = conv1d_fft(signal, kernel)
t_fft = time.perf_counter() - t0
error = np.linalg.norm(result_direct - result_fft) / np.linalg.norm(result_direct)
print("1D Convolution (N=1000, K=50):")
print(f" Direct: {t_direct*1000:.2f}ms")
print(f" FFT: {t_fft*1000:.2f}ms")
print(f" Speedup: {t_direct/t_fft:.1f}x")
print(f" Error: {error:.2e}")
# Test 2D convolution
image = np.random.randn(100, 100)
kernel_2d = np.random.randn(5, 5)
# Check if separable (approximate)
U, s, Vt = np.linalg.svd(kernel_2d)
is_separable = s[1] / s[0] < 0.01
if is_separable:
# Approximate as separable
kernel_h = np.sqrt(s[0]) * U[:, 0]
kernel_v = np.sqrt(s[0]) * Vt[0, :]
t0 = time.perf_counter()
result_sep = conv2d_separable(image, kernel_h, kernel_v)
t_sep = time.perf_counter() - t0
print(f"\n2D Separable convolution: {t_sep*1000:.2f}ms")
t0 = time.perf_counter()
result_2d = conv2d_direct(image, kernel_2d)
t_2d = time.perf_counter() - t0
print(f"2D Direct convolution: {t_2d*1000:.2f}ms")
1D Convolution (N=1000, K=50):
Direct: 10.43ms
FFT: 0.14ms
Speedup: 75.1x
Error: 2.90e-16
2D Direct convolution: 18.75ms
# Advanced convolution techniques
def im2col_convolution(image, kernel, stride=1):
"""im2col transformation for convolution."""
H, W = image.shape
K_H, K_W = kernel.shape
# Output dimensions
out_H = (H - K_H) // stride + 1
out_W = (W - K_W) // stride + 1
# Create column matrix
cols = []
for i in range(0, H - K_H + 1, stride):
for j in range(0, W - K_W + 1, stride):
patch = image[i:i+K_H, j:j+K_W].flatten()
cols.append(patch)
col_matrix = np.array(cols).T
# Convolution as matrix multiplication
kernel_vec = kernel.flatten()
output = kernel_vec @ col_matrix
return output.reshape(out_H, out_W)
# Winograd convolution (simplified 1D F(2,3))
def winograd_F23(signal, kernel):
"""Winograd F(2,3) - 4 mults instead of 6."""
# Transform matrices
G = np.array([[1, 0, 0],
[0.5, 0.5, 0.5],
[0.5, -0.5, 0.5],
[0, 0, 1]])
B = np.array([[1, 0, -1, 0],
[0, 1, 1, 0],
[0, -1, 1, 0],
[0, 1, 0, -1]])
A = np.array([[1, 1, 1, 0],
[0, 1, -1, -1]])
# For demonstration, process one tile
if len(signal) >= 4 and len(kernel) == 3:
d = signal[:4]
g = kernel
# Winograd transforms
U = G @ g
V = B.T @ d
M = U * V # Element-wise (4 multiplications)
Y = A.T @ M
return Y
return None
# Performance analysis
print("\nConvolution method analysis:")
print(f"{'Method':<15} {'Multiplications':<20} {'Memory':<15}")
print("-" * 50)
N, K = 1000, 50
print(f"{'Direct':<15} {N*K:<20} {'O(N)':<15}")
print(f"{'FFT':<15} {int(3*N*np.log2(N)):<20} {'O(N)':<15}")
print(f"{'im2col':<15} {N*K:<20} {'O(N*K)':<15}")
print(f"{'Winograd F(2,3)':<15} {int(N*4/2):<20} {'O(N)':<15}")
# Numerical stability check
print("\nNumerical stability:")
signal = np.random.randn(100)
kernel = np.array([1e-8, 1, 1e-8]) # Poorly conditioned
result1 = np.convolve(signal, kernel, mode='valid')
# For 'valid' mode: output length = N - K + 1
valid_start = len(kernel) - 1
valid_end = valid_start + len(result1)
result2 = conv1d_fft(signal, kernel)[valid_start:valid_end]
error = np.linalg.norm(result1 - result2) / np.linalg.norm(result1)
print(f"FFT vs direct error with small values: {error:.2e}")
# Large kernel test
kernel_large = np.random.randn(200)
t0 = time.perf_counter()
r1 = np.convolve(signal, kernel_large, mode='valid')
t_numpy = time.perf_counter() - t0
t0 = time.perf_counter()
r2 = conv1d_fft(signal, kernel_large)[:len(r1)]
t_fft = time.perf_counter() - t0
print(f"\nLarge kernel (K=200):")
print(f" NumPy: {t_numpy*1000:.2f}ms")
print(f" FFT: {t_fft*1000:.2f}ms")
print(f" FFT wins for K > ~50")
Convolution method analysis:
Method Multiplications Memory
--------------------------------------------------
Direct 50000 O(N)
FFT 29897 O(N)
im2col 50000 O(N*K)
Winograd F(2,3) 2000 O(N)
Numerical stability:
FFT vs direct error with small values: 1.98e-16
Large kernel (K=200):
NumPy: 0.03ms
FFT: 0.12ms
Direct (NumPy) wins at this small N; FFT convolution pays off as N*K grows
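In practice the crossover does not have to be guessed: scipy.signal can estimate which strategy will be faster for the given sizes and pick it automatically. A short sketch:
import numpy as np
from scipy import signal

x = np.random.randn(100_000)
k = np.random.randn(500)

best = signal.choose_conv_method(x, k, mode='full')    # returns 'direct' or 'fft'
y_auto = signal.convolve(x, k, mode='full', method='auto')
y_fft = signal.fftconvolve(x, k, mode='full')

print(f"chosen method: {best}")
print(f"max |auto - fft| = {np.max(np.abs(y_auto - y_fft)):.2e}")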
The Fast Fourier Transform reduces O(N²) to O(N log N) through clever recursion.
import numpy as np
import time
def dft_naive(x):
"""Direct DFT implementation O(N²)."""
N = len(x)
X = np.zeros(N, dtype=complex)
for k in range(N):
for n in range(N):
X[k] += x[n] * np.exp(-2j * np.pi * k * n / N)
return X
def fft_recursive(x):
"""Cooley-Tukey FFT O(N log N)."""
N = len(x)
# Base case
if N <= 1:
return x
# Ensure N is power of 2
if N % 2 != 0:
raise ValueError("N must be power of 2")
# Divide
even = fft_recursive(x[0::2])
odd = fft_recursive(x[1::2])
# Conquer with twiddle factors
T = np.exp(-2j * np.pi * np.arange(N//2) / N)
return np.concatenate([
even + T * odd,
even - T * odd
])
def fft_iterative(x):
"""Iterative FFT with bit reversal."""
N = len(x)
if N & (N-1) != 0:
raise ValueError("N must be power of 2")
# Bit reversal
x = np.array(x, dtype=complex)
j = 0
for i in range(1, N):
bit = N >> 1
while j & bit:
j ^= bit
bit >>= 1
j ^= bit
if i < j:
x[i], x[j] = x[j], x[i]
# Cooley-Tukey iterations
length = 2
while length <= N:
angle = -2 * np.pi / length
w = np.exp(1j * angle * np.arange(length // 2))
for i in range(0, N, length):
for j in range(length // 2):
u = x[i + j]
v = x[i + j + length // 2] * w[j]
x[i + j] = u + v
x[i + j + length // 2] = u - v
length *= 2
return x
# Performance comparison
sizes = [16, 64, 256, 1024]
print("FFT vs DFT Performance:")
print(f"{'N':<8} {'DFT (ms)':<12} {'FFT (ms)':<12} {'Speedup':<10}")
print("-" * 45)
for N in sizes:
x = np.random.randn(N) + 1j * np.random.randn(N)
# DFT timing
if N <= 256: # DFT too slow for large N
t0 = time.perf_counter()
X_dft = dft_naive(x)
t_dft = (time.perf_counter() - t0) * 1000
else:
t_dft = np.nan
X_dft = None
# FFT timing
t0 = time.perf_counter()
X_fft = fft_recursive(x)
t_fft = (time.perf_counter() - t0) * 1000
# Verify correctness
if X_dft is not None:
error = np.linalg.norm(X_dft - X_fft) / np.linalg.norm(X_dft)
assert error < 1e-10, f"Error too large: {error}"
speedup = t_dft / t_fft if not np.isnan(t_dft) else N / np.log2(N)
print(f"{N:<8} {t_dft:<12.2f} {t_fft:<12.2f} {speedup:<10.1f}x")
# Twiddle factor precomputation
def fft_optimized(x):
"""FFT with twiddle factor caching."""
N = len(x)
# Precompute all twiddle factors
twiddles = {}
length = 2
while length <= N:
twiddles[length] = np.exp(-2j * np.pi * np.arange(length//2) / length)
length *= 2
# Use cached twiddles in computation
# ... (implementation similar to iterative)
return fft_iterative(x) # Simplified
print(f"\nTwiddle factor memory: {8 * N} bytes for N={N}")
FFT vs DFT Performance:
N DFT (ms) FFT (ms) Speedup
---------------------------------------------
16 0.16 0.13 1.2 x
64 2.36 0.39 6.1 x
256 36.93 1.72 21.5 x
1024 nan 3.80 102.4 x
Twiddle factor memory: ~16368 bytes for N=1024 (complex128)
# Numerical precision analysis
def fft_error_growth(N):
"""Analyze error accumulation in FFT."""
x = np.random.randn(N) + 1j * np.random.randn(N)
# High precision reference
X_ref = np.fft.fft(x)
# Single precision computation
x_single = x.astype(np.complex64)
X_single = np.fft.fft(x_single).astype(np.complex128)
# Error analysis
abs_error = np.linalg.norm(X_ref - X_single)
rel_error = abs_error / np.linalg.norm(X_ref)
return rel_error
print("\nFFT Numerical Precision:")
print(f"{'N':<10} {'Relative Error':<15}")
print("-" * 25)
for N in [64, 256, 1024, 4096]:
error = fft_error_growth(N)
print(f"{N:<10} {error:<15.2e}")
# Non-power-of-2 FFT
def fft_any_size(x):
"""FFT for any size using chirp-z transform."""
N = len(x)
# Find next power of 2
M = 2**int(np.ceil(np.log2(2*N-1)))
# Chirp sequence
n = np.arange(N)
chirp = np.exp(-1j * np.pi * n**2 / N)
# Convolution via FFT
x_chirped = x * chirp
chirp_inv = np.conj(chirp)
# Zero-pad for convolution
x_padded = np.zeros(M, dtype=complex)
x_padded[:N] = x_chirped
chirp_padded = np.zeros(M, dtype=complex)
chirp_padded[:N] = chirp_inv
chirp_padded[M-N+1:] = chirp_inv[1:N][::-1]
# Convolution via FFT
X = np.fft.fft(x_padded) * np.fft.fft(chirp_padded)
result = np.fft.ifft(X)[:N] * chirp
return result
# Test non-power-of-2
N = 100 # Not a power of 2
x = np.random.randn(N)
t0 = time.perf_counter()
X1 = np.fft.fft(x) # NumPy handles any size
t_numpy = time.perf_counter() - t0
t0 = time.perf_counter()
X2 = fft_any_size(x)
t_chirp = time.perf_counter() - t0
error = np.linalg.norm(X1 - X2) / np.linalg.norm(X1)
print(f"\nNon-power-of-2 FFT (N={N}):")
print(f" NumPy: {t_numpy*1000:.3f}ms")
print(f" Chirp-Z: {t_chirp*1000:.3f}ms")
print(f" Error: {error:.2e}")
# Real FFT optimization
print("\nReal vs Complex FFT:")
x_real = np.random.randn(1024)
x_complex = x_real + 0j
t0 = time.perf_counter()
X_rfft = np.fft.rfft(x_real)
t_rfft = time.perf_counter() - t0
t0 = time.perf_counter()
X_fft = np.fft.fft(x_complex)
t_fft = time.perf_counter() - t0
print(f" Real FFT: {t_rfft*1000:.3f}ms, output size: {len(X_rfft)}")
print(f" Complex FFT: {t_fft*1000:.3f}ms, output size: {len(X_fft)}")
print(f" Speedup: {t_fft/t_rfft:.2f}x")
FFT Numerical Precision:
N Relative Error
-------------------------
64 2.69e-08
256 2.71e-08
1024 2.54e-08
4096 2.58e-08
Non-power-of-2 FFT (N=100):
NumPy: 0.021ms
Chirp-Z: 0.045ms
Error: 1.08e-14
Real vs Complex FFT:
Real FFT: 0.021ms, output size: 513
Complex FFT: 0.021ms, output size: 1024
Speedup: 1.00x
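A cheap correctness check for any FFT implementation (including the recursive one above) is Parseval's theorem: sum_n |x[n]|^2 = (1/N) * sum_k |X[k]|^2. A short sketch against NumPy's FFT:
import numpy as np

x = np.random.randn(1024) + 1j * np.random.randn(1024)
X = np.fft.fft(x)

energy_time = np.sum(np.abs(x)**2)
energy_freq = np.sum(np.abs(X)**2) / len(x)

print(f"time-domain energy: {energy_time:.12f}")
print(f"freq-domain energy: {energy_freq:.12f}")
print(f"relative mismatch:  {abs(energy_time - energy_freq) / energy_time:.2e}")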
Finite precision arithmetic can destroy mathematically correct algorithms.
import numpy as np
# Example 1: Catastrophic cancellation
def quadratic_formula_naive(a, b, c):
"""Naive quadratic formula."""
discriminant = b**2 - 4*a*c
sqrt_disc = np.sqrt(discriminant)
x1 = (-b + sqrt_disc) / (2*a)
x2 = (-b - sqrt_disc) / (2*a)
return x1, x2
def quadratic_formula_stable(a, b, c):
"""Numerically stable quadratic formula."""
discriminant = b**2 - 4*a*c
sqrt_disc = np.sqrt(discriminant)
if b >= 0:
x1 = (-b - sqrt_disc) / (2*a)
x2 = (2*c) / (-b - sqrt_disc)
else:
x1 = (-b + sqrt_disc) / (2*a)
x2 = (2*c) / (-b + sqrt_disc)
return x1, x2
# Test with poorly conditioned case
a, b, c = 1, 1e8, 1
x1_naive, x2_naive = quadratic_formula_naive(a, b, c)
x1_stable, x2_stable = quadratic_formula_stable(a, b, c)
print("Quadratic formula comparison:")
print(f" Naive: x1={x1_naive:.2e}, x2={x2_naive:.2e}")
print(f" Stable: x1={x1_stable:.2e}, x2={x2_stable:.2e}")
# Verify solutions
verify_naive = a*x2_naive**2 + b*x2_naive + c
verify_stable = a*x2_stable**2 + b*x2_stable + c
print(f" Residual (naive): {verify_naive:.2e}")
print(f" Residual (stable): {verify_stable:.2e}")
# Example 2: Ill-conditioned matrix
def condition_number_demo():
"""Demonstrate condition number effects."""
# Hilbert matrix (notoriously ill-conditioned)
n = 10
H = np.zeros((n, n))
for i in range(n):
for j in range(n):
H[i, j] = 1 / (i + j + 1)
cond = np.linalg.cond(H)
print(f"\nHilbert matrix (n={n}):")
print(f" Condition number: {cond:.2e}")
# Solve Hx = b
b = np.ones(n)
x = np.linalg.solve(H, b)
# Perturb b slightly
b_perturbed = b + 1e-10 * np.random.randn(n)
x_perturbed = np.linalg.solve(H, b_perturbed)
# Relative changes
relative_change_b = np.linalg.norm(b_perturbed - b) / np.linalg.norm(b)
relative_change_x = np.linalg.norm(x_perturbed - x) / np.linalg.norm(x)
print(f" Input perturbation: {relative_change_b:.2e}")
print(f" Output perturbation: {relative_change_x:.2e}")
print(f" Amplification: {relative_change_x/relative_change_b:.0f}x")
condition_number_demo()
# Example 3: Summation algorithms
def compare_summation(n=1000000):
"""Compare summation algorithms."""
# Create data that causes problems
data = np.array([1.0] + [1e-16] * n)
# Method 1: Naive sum
naive_sum = np.sum(data)
# Method 2: Kahan summation
def kahan_sum(arr):
s = 0.0
c = 0.0
for x in arr:
y = x - c
t = s + y
c = (t - s) - y
s = t
return s
kahan_result = kahan_sum(data)
# Method 3: Pairwise summation
def pairwise_sum(arr):
if len(arr) <= 128:
return np.sum(arr)
mid = len(arr) // 2
return pairwise_sum(arr[:mid]) + pairwise_sum(arr[mid:])
pairwise_result = pairwise_sum(data)
true_sum = 1.0 + n * 1e-16
print(f"\nSummation of 1.0 + {n}×1e-16:")
print(f" True sum: {true_sum}")
print(f" Naive: {naive_sum} (error: {abs(naive_sum - true_sum):.2e})")
print(f" Kahan: {kahan_result} (error: {abs(kahan_result - true_sum):.2e})")
print(f" Pairwise: {pairwise_result} (error: {abs(pairwise_result - true_sum):.2e})")
compare_summation()
Quadratic formula comparison:
Naive: x1=-7.45e-09, x2=-1.00e+08
Stable: x1=-1.00e+08, x2=-1.00e-08
Residual (naive): 1.00e+00
Residual (stable): 1.11e-16
Hilbert matrix (n=10):
Condition number: 1.60e+13
Input perturbation: 1.06e-10
Output perturbation: 1.59e-05
Amplification: 150240x
Summation of 1.0 + 1000000×1e-16:
True sum: 1.0000000001
Naive: 1.0000000000999891 (error: 1.09e-14)
Kahan: 1.0000000001 (error: 0.00e+00)
Pairwise: 1.0000000000999987 (error: 1.33e-15)
# Numerical stability in iterative algorithms
def gram_schmidt_naive(A):
"""Naive Gram-Schmidt (unstable)."""
m, n = A.shape
Q = np.zeros((m, n))
R = np.zeros((n, n))
for j in range(n):
v = A[:, j]
for i in range(j):
R[i, j] = np.dot(Q[:, i], A[:, j])
v = v - R[i, j] * Q[:, i]
R[j, j] = np.linalg.norm(v)
Q[:, j] = v / R[j, j]
return Q, R
def gram_schmidt_modified(A):
"""Modified Gram-Schmidt (stable)."""
m, n = A.shape
Q = A.copy()
R = np.zeros((n, n))
for i in range(n):
R[i, i] = np.linalg.norm(Q[:, i])
Q[:, i] = Q[:, i] / R[i, i]
for j in range(i+1, n):
R[i, j] = np.dot(Q[:, i], Q[:, j])
Q[:, j] = Q[:, j] - R[i, j] * Q[:, i]
return Q, R
# Test stability
np.random.seed(42)
n = 10
A = np.random.randn(n, n)
# Make A poorly conditioned
A[:, -1] = A[:, 0] + 1e-10 * np.random.randn(n)
Q1, R1 = gram_schmidt_naive(A)
Q2, R2 = gram_schmidt_modified(A)
# Check orthogonality
ortho_error_naive = np.linalg.norm(Q1.T @ Q1 - np.eye(n))
ortho_error_modified = np.linalg.norm(Q2.T @ Q2 - np.eye(n))
print("\nGram-Schmidt orthogonalization:")
print(f" Orthogonality error (naive): {ortho_error_naive:.2e}")
print(f" Orthogonality error (modified): {ortho_error_modified:.2e}")
print(f" Improvement: {ortho_error_naive/ortho_error_modified:.1f}x")
# Floating point arithmetic peculiarities
print("\nFloating point surprises:")
# Not associative
a = 1e20
b = 1.0
c = -1e20
print(f" (a + b) + c = {(a + b) + c}")
print(f" a + (b + c) = {a + (b + c)}")
# Not distributive
x = 1e-15
y = 1.0
z = 1e15
print(f" x*(y + z) = {x*(y + z)}")
print(f" x*y + x*z = {x*y + x*z}")
# Comparison issues
val1 = 0.1 + 0.2
val2 = 0.3
print(f" 0.1 + 0.2 == 0.3? {val1 == val2}")
print(f" Actual: 0.1 + 0.2 = {val1:.17f}")
# Machine epsilon
eps = np.finfo(float).eps
print(f"\nMachine epsilon: {eps}")
print(f" 1.0 + eps/2 == 1.0? {1.0 + eps/2 == 1.0}")
print(f" 1.0 + eps == 1.0? {1.0 + eps == 1.0}")
Gram-Schmidt orthogonalization:
Orthogonality error (naive): 1.10e-05
Orthogonality error (modified): 1.65e-06
Improvement: 6.7x
Floating point surprises:
(a + b) + c = 0.0
a + (b + c) = 0.0
x*(y + z) = 1.000000000000001
x*y + x*z = 1.000000000000001
0.1 + 0.2 == 0.3? False
Actual: 0.1 + 0.2 = 0.30000000000000004
Machine epsilon: 2.220446049250313e-16
1.0 + eps/2 == 1.0? True
1.0 + eps == 1.0? False
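The comparison surprise above is why floating-point equality tests should be replaced by tolerance-based checks; a minimal sketch of the standard tools:
import math
import numpy as np

a = 0.1 + 0.2
b = 0.3

print(a == b)                             # False: exact comparison fails
print(math.isclose(a, b, rel_tol=1e-9))   # True: relative tolerance
print(np.isclose(a, b))                   # True: default rtol=1e-5, atol=1e-8
print(np.allclose([a, 1e-9], [b, 0.0]))   # element-wise, with an absolute floor for values near zero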
Every computation accumulates errors; understanding them prevents catastrophic failures.
import numpy as np
# Precision limits demonstration
def precision_limits():
"""Explore floating point precision limits."""
print("Floating point limits:")
# Machine epsilon
eps = np.finfo(float).eps
print(f" Machine epsilon: {eps}")
# Smallest positive normal
tiny = np.finfo(float).tiny
print(f" Smallest normal: {tiny}")
# Largest finite
huge = np.finfo(float).max
print(f" Largest finite: {huge}")
# Demonstration of precision loss
print("\nPrecision loss examples:")
# Adding small to large
large = 1e16
small = 1.0
result = large + small
print(f" {large} + {small} = {result}")
print(f" Lost: {small} (completely)")
# Significant digits
a = 1234567890123456.0
b = 1234567890123457.0
print(f" Can distinguish: {a != b}")
c = 12345678901234567.0
d = 12345678901234568.0
print(f" Cannot distinguish: {c != d}")
precision_limits()
# Rounding error accumulation
def error_accumulation_demo():
"""Demonstrate error accumulation patterns."""
print("\nError accumulation in summation:")
# Sum 0.1 many times
n = 10000
# Method 1: Sequential addition
sum_sequential = 0.0
for _ in range(n):
sum_sequential += 0.1
# Method 2: Multiply
sum_multiply = 0.1 * n
true_value = n / 10 # Exact
error_seq = abs(sum_sequential - true_value)
error_mul = abs(sum_multiply - true_value)
print(f" True value: {true_value}")
print(f" Sequential sum: {sum_sequential} (error: {error_seq:.2e})")
print(f" Multiplication: {sum_multiply} (error: {error_mul:.2e})")
# Harmonic series (1/1 + 1/2 + 1/3 + ...)
print("\nHarmonic series summation order:")
n = 100000
# Forward summation
sum_forward = 0.0
for i in range(1, n+1):
sum_forward += 1/i
# Backward summation (more accurate)
sum_backward = 0.0
for i in range(n, 0, -1):
sum_backward += 1/i
# Kahan summation
sum_kahan = 0.0
c = 0.0
for i in range(1, n+1):
y = 1/i - c
t = sum_kahan + y
c = (t - sum_kahan) - y
sum_kahan = t
print(f" Forward: {sum_forward:.10f}")
print(f" Backward: {sum_backward:.10f}")
print(f" Kahan: {sum_kahan:.10f}")
print(f" Difference: {abs(sum_forward - sum_backward):.2e}")
error_accumulation_demo()
# ULP (Units in Last Place) analysis
def ulp_analysis():
"""Analyze ULP for different values."""
print("\nULP (Units in Last Place) analysis:")
values = [1.0, 10.0, 0.1, 1e10, 1e-10]
for val in values:
ulp = np.spacing(val)
relative = ulp / abs(val)
# Next representable number
next_val = np.nextafter(val, val + 1)
print(f" Value: {val:g}")
print(f" ULP: {ulp:g}")
print(f" Relative: {relative:g}")
print(f" Next: {next_val:g}")
print(f" Difference: {next_val - val:g}")
ulp_analysis()
Floating point limits:
Machine epsilon: 2.220446049250313e-16
Smallest normal: 2.2250738585072014e-308
Largest finite: 1.7976931348623157e+308
Precision loss examples:
1e+16 + 1.0 = 1e+16
Lost: 1.0 (completely)
Can distinguish: True
Cannot distinguish: False
Error accumulation in summation:
True value: 1000.0
Sequential sum: 1000.0000000001588 (error: 1.59e-10)
Multiplication: 1000.0 (error: 0.00e+00)
Harmonic series summation order:
Forward: 12.0901461299
Backward: 12.0901461299
Kahan: 12.0901461299
Difference: 7.28e-14
ULP (Units in Last Place) analysis:
Value: 1
ULP: 2.22045e-16
Relative: 2.22045e-16
Next: 1
Difference: 2.22045e-16
Value: 10
ULP: 1.77636e-15
Relative: 1.77636e-16
Next: 10
Difference: 1.77636e-15
Value: 0.1
ULP: 1.38778e-17
Relative: 1.38778e-16
Next: 0.1
Difference: 1.38778e-17
Value: 1e+10
ULP: 1.90735e-06
Relative: 1.90735e-16
Next: 1e+10
Difference: 1.90735e-06
Value: 1e-10
ULP: 1.29247e-26
Relative: 1.29247e-16
Next: 1e-10
Difference: 1.29247e-26
# Advanced error analysis
def condition_number_analysis():
"""Analyze condition numbers of operations."""
print("Condition number analysis:")
x = 2.0 # Avoid singularities
dx = 1e-10
functions = [
('x²', lambda x: x**2, lambda x: 2*x),
('√x', lambda x: np.sqrt(x), lambda x: 0.5/np.sqrt(x)),
('exp(x)', lambda x: np.exp(x), lambda x: np.exp(x)),
('log(x)', lambda x: np.log(x), lambda x: 1/x),
('1/(1-x)', lambda x: 1/(1-x), lambda x: 1/(1-x)**2),
]
print(f" At x = {x}:")
for name, f, df in functions:
cond = abs(x * df(x) / f(x)) if f(x) != 0 else np.inf
f_exact = f(x)
f_perturbed = f(x + dx)
rel_input = dx / x
rel_output = abs(f_perturbed - f_exact) / abs(f_exact)
amplification = rel_output / rel_input
print(f" {name:8s}: κ = {cond:6.2f}, actual = {amplification:6.2f}")
condition_number_analysis()
# Compensated algorithms
def compensated_algorithms():
"""Algorithms that track and compensate for errors."""
print("\nCompensated summation (Kahan):")
# Generate problematic data
n = 1000000
data = np.ones(n) * (1/n)
# Add some variation
data[::1000] = 1e-8
# Standard sum
standard_sum = np.sum(data)
# Kahan sum with error tracking
def kahan_sum_with_error(arr):
s = 0.0
c = 0.0 # Compensation
for x in arr:
y = x - c # Compensate
t = s + y # New sum
c = (t - s) - y # New compensation
s = t
return s, c
kahan_result, compensation = kahan_sum_with_error(data)
# Expected sum
true_sum = 1.0 + len(data[::1000]) * 1e-8
print(f" Standard: {standard_sum} (error: {abs(standard_sum - true_sum):.2e})")
print(f" Kahan: {kahan_result} (error: {abs(kahan_result - true_sum):.2e})")
print(f" Compensation term: {compensation:.2e}")
# Error-free transformations
print("\nError-free transformation (2Sum):")
def two_sum(a, b):
"""Compute sum and error exactly."""
s = a + b
a_prime = s - b
b_prime = s - a_prime
delta_a = a - a_prime
delta_b = b - b_prime
error = delta_a + delta_b
return s, error
a, b = 1.0, 1e-16
s, e = two_sum(a, b)
print(f" {a} + {b}")
print(f" Sum: {s}")
print(f" Error: {e}")
print(f" Exact: {s} + {e}")
compensated_algorithms()
Condition number analysis:
At x = 2.0:
x² : κ = 2.00, actual = 2.00
√x : κ = 0.50, actual = 0.50
exp(x) : κ = 2.00, actual = 2.00
log(x) : κ = 1.44, actual = 1.44
1/(1-x) : κ = 2.00, actual = 2.00
Compensated summation (Kahan):
Standard: 0.999010000000001 (error: 1.00e-03)
Kahan: 0.99901 (error: 1.00e-03)
Compensation term: -1.10e-18
Error-free transformation (2Sum):
1.0 + 1e-16
Sum: 1.0
Error: 1e-16
Exact: 1.0 + 1e-16
Stability Strategies:
Algorithmic choices: prefer numerically stable formulations (stable quadratic roots, modified Gram-Schmidt, pivoted factorizations); avoid subtracting nearly equal quantities
Precision management: use compensated or pairwise summation for long reductions, scale data to avoid overflow and underflow, and reserve higher precision for the few critical steps
Monitoring: track residuals, condition numbers, and the appearance of NaN/Inf so precision loss is detected rather than silently accepted (a short sketch follows)
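As a concrete instance of the monitoring point, a solver wrapper can check conditioning before the solve and the residual after it. This is a minimal illustrative helper (the thresholds and the name solve_checked are assumptions, not a library API):
import numpy as np

def solve_checked(A, b, cond_warn=1e12, resid_tol=1e-8):
    """Solve Ax = b with basic numerical health checks (illustrative helper)."""
    cond = np.linalg.cond(A)
    if cond > cond_warn:
        print(f"warning: condition number {cond:.2e} "
              f"-- expect roughly {np.log10(cond):.0f} digits lost")
    x = np.linalg.solve(A, b)
    if not np.all(np.isfinite(x)):
        raise FloatingPointError("solution contains inf/NaN")
    resid = np.linalg.norm(A @ x - b) / np.linalg.norm(b)
    if resid > resid_tol:
        print(f"warning: large relative residual {resid:.2e}")
    return x

# Usage: a well-conditioned system vs a 12x12 Hilbert matrix
n = 12
A_good = np.random.randn(n, n) + n * np.eye(n)
A_hilb = np.array([[1.0 / (i + j + 1) for j in range(n)] for i in range(n)])
b = np.ones(n)
solve_checked(A_good, b)
solve_checked(A_hilb, b)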