"""
Constrained dynamics and GENERIC-like structure for the quantum inaccessible game.
This module provides the `InaccessibleGameDynamics` class, which implements
constrained steepest-entropy-ascent dynamics in natural-parameter space,
with support for different time parametrisations.
"""
from typing import Tuple, Dict, Any
import numpy as np
from scipy.integrate import solve_ivp
from qig.core import von_neumann_entropy, marginal_entropies
from qig.exponential_family import QuantumExponentialFamily
[docs]
class InaccessibleGameDynamics:
"""
Constrained maximum entropy production dynamics.
Implements: θ̇ = -Π_∥(θ) G(θ) θ
where Π_∥ projects onto constraint manifold ∑_i h_i = C.
Primary API
-----------
solve(theta_0, n_steps, dt, ...) : Recommended method for integrating dynamics.
Uses gradient descent with Newton projection onto constraint manifold.
Supports entropy time via `entropy_time=True`.
Advanced/Internal
-----------------
flow(t, theta) : Compute θ̇ at a single point. For custom integrators.
Example
-------
>>> from qig.exponential_family import QuantumExponentialFamily
>>> from qig.dynamics import InaccessibleGameDynamics
>>>
>>> exp_family = QuantumExponentialFamily(n_pairs=1, d=3, pair_basis=True)
>>> theta_0 = exp_family.get_bell_state_parameters(epsilon=0.01)
>>>
>>> dynamics = InaccessibleGameDynamics(exp_family)
>>> result = dynamics.solve(theta_0, n_steps=100, dt=0.01)
>>>
>>> print(f"Converged: {result['converged']}")
>>> print(f"Constraint preserved: dC = {result['constraint_values'][-1] - result['C_init']:.2e}")
"""
[docs]
def __init__(self, exp_family: QuantumExponentialFamily, method: str = 'duhamel'):
"""
Initialise dynamics for given exponential family.
Parameters
----------
exp_family : QuantumExponentialFamily
The exponential family to integrate dynamics for
method : str, optional
Method for computing ∂ρ/∂θ: 'duhamel' (accurate, slow) or 'sld' (fast, ~5% error)
Default: 'duhamel'
"""
self.exp_family = exp_family
self.constraint_value: float | None = None
self.method = method
# Time parametrisation: 'affine', 'entropy', or 'real'
self.time_mode = "affine"
[docs]
def set_time_mode(self, mode: str) -> None:
"""
Set time parametrisation mode.
Parameters
----------
mode : str
'affine' : standard ODE time τ
'entropy' : entropy time t where dH/dt = 1
'real' : physical time (reserved for unitary part)
"""
assert mode in ["affine", "entropy", "real"]
self.time_mode = mode
print(f"Time mode set to: {mode}")
[docs]
def flow(self, t: float, theta: np.ndarray) -> np.ndarray:
"""
Compute θ̇ = -Π_∥ G θ at given θ (advanced/internal use).
This is a low-level method that computes the flow vector at a single point.
For most use cases, prefer `solve()` which handles integration and
constraint projection automatically.
Parameters
----------
t : float
Time (used for time-dependent problems; ignored for autonomous flow)
theta : ndarray
Natural parameters at which to compute the flow
Returns
-------
theta_dot : ndarray
The constrained flow vector θ̇ = -Π_∥ G θ
Notes
-----
If `time_mode='entropy'`, the flow is scaled so dH/dt = 1.
"""
# Compute Fisher information G(θ)
G = self.exp_family.fisher_information(theta)
# Compute constraint gradient a(θ) = ∇(∑ h_i)
_, a = self.exp_family.marginal_entropy_constraint(theta, method=self.method)
# Projection matrix Π_∥ = I - aa^T / ||a||²
a_norm_sq = float(np.dot(a, a))
if a_norm_sq < 1e-12:
# Near endpoint, no constraint
Pi = np.eye(len(theta))
else:
Pi = np.eye(len(theta)) - np.outer(a, a) / a_norm_sq
# Unconstrained gradient: -G θ
grad_H = -G @ theta
# Project onto constraint manifold
theta_dot = Pi @ grad_H
# Time reparametrisation for entropy time
if self.time_mode == "entropy":
# dH/dτ = θ^T G Π_∥ G θ
entropy_production = float(theta @ G @ Pi @ G @ theta)
if entropy_production > 1e-12:
theta_dot = theta_dot / entropy_production
# Now dH/dt = 1 by construction
return theta_dot
[docs]
def solve(self, theta_0: np.ndarray, n_steps: int = 1000, dt: float = 0.001,
convergence_tol: float = 1e-6, entropy_time: bool = False,
project: bool = True, project_every: int = 10,
verbose: bool = True) -> Dict[str, Any]:
"""
Solve constrained maximum entropy dynamics (recommended method).
Uses gradient descent with Newton projection onto constraint manifold.
This is the primary API for integrating the inaccessible game dynamics.
Parameters
----------
theta_0 : ndarray
Initial natural parameters
n_steps : int, default=1000
Maximum number of steps
dt : float, default=0.001
Step size (or entropy increment if entropy_time=True)
convergence_tol : float, default=1e-6
Stop when ||F|| < convergence_tol
entropy_time : bool, default=False
If True, scale steps so dH/dt = 1 (entropy increases by dt per step)
project : bool, default=True
Project onto constraint manifold (recommended)
project_every : int, default=10
Project every N steps (for efficiency)
verbose : bool, default=True
Print progress messages
Returns
-------
dict with keys:
trajectory : ndarray, shape (n_steps+1, n_params)
Parameter trajectory θ(t)
flow_norms : ndarray
||F(θ)|| at each step (→ 0 at convergence)
constraint_values : ndarray
Σᵢ hᵢ at each step (should stay ≈ constant)
C_init : float
Initial constraint value
converged : bool
Whether convergence was achieved
n_steps : int
Number of steps taken
Example
-------
>>> result = dynamics.solve(theta_0, n_steps=500, dt=0.01, entropy_time=True)
>>> print(f"Converged: {result['converged']}")
"""
return self.solve_constrained_maxent(
theta_0, n_steps=n_steps, dt=dt, convergence_tol=convergence_tol,
project=project, project_every=project_every,
use_entropy_time=entropy_time
)
[docs]
def integrate(
self, theta_0: np.ndarray, t_span: Tuple[float, float], n_points: int = 100
) -> Dict[str, Any]:
"""
[DEPRECATED] Integrate using scipy solve_ivp.
.. deprecated::
This method does not project onto the constraint manifold,
causing constraint drift. Use `solve()` instead.
Warning
-------
This method is numerically unstable for constrained problems.
The constraint C = Σᵢ hᵢ will drift over time, giving incorrect results.
Use `solve()` which includes Newton projection for stability.
"""
import warnings
warnings.warn(
"integrate() is deprecated and numerically unstable for constrained problems. "
"Use solve() instead, which includes Newton projection onto the constraint manifold.",
DeprecationWarning,
stacklevel=2
)
# Store initial constraint value
rho_0 = self.exp_family.rho_from_theta(theta_0)
h_0 = marginal_entropies(rho_0, self.exp_family.dims)
self.constraint_value = float(np.sum(h_0))
print(f"Initial constraint C = {self.constraint_value:.6f}")
# Solve ODE
t_eval = np.linspace(t_span[0], t_span[1], n_points)
sol = solve_ivp(
self.flow,
t_span,
theta_0,
t_eval=t_eval,
method="RK45",
rtol=1e-8,
atol=1e-10,
)
if not sol.success:
print(f"Warning: Integration failed: {sol.message}")
# Extract trajectories
time = sol.t
theta_traj = sol.y.T
# Compute entropies along trajectory
H_traj = np.zeros(len(time))
h_traj = np.zeros((len(time), self.exp_family.n_sites))
constraint_traj = np.zeros(len(time))
for i, theta in enumerate(theta_traj):
rho = self.exp_family.rho_from_theta(theta)
H_traj[i] = von_neumann_entropy(rho)
h_traj[i] = marginal_entropies(rho, self.exp_family.dims)
constraint_traj[i] = np.sum(h_traj[i])
return {
"time": time,
"theta": theta_traj,
"H": H_traj,
"h": h_traj,
"constraint": constraint_traj,
"success": sol.success,
}
[docs]
def solve_constrained_maxent(self, theta_init: np.ndarray, n_steps: int = 1000,
dt: float = 0.001, convergence_tol: float = 1e-6,
project: bool = True, project_every: int = 10,
use_entropy_time: bool = False) -> Dict[str, Any]:
"""
Solve constrained maximum entropy dynamics using gradient descent with projection.
- Uses gradient descent to maximise joint entropy
- Projects onto constraint manifold at each step
- Much more stable than ODE integration for constrained optimisation
Dynamics: dθ/dt = F(θ) = -Π_∥(θ) G(θ) θ
where Π_∥ projects onto Σᵢ hᵢ(θ) = C
Parameters
----------
theta_init : array
Initial parameter vector
n_steps : int
Maximum number of gradient steps
dt : float
Step size for gradient descent
convergence_tol : float
Stop when ||F|| < convergence_tol
project : bool
Whether to project onto constraint manifold
project_every : int
Project onto constraint manifold every N steps (for efficiency)
use_entropy_time : bool
If True, scale step size so that entropy increases by dt per step
Returns
-------
dict with keys:
trajectory : array, shape (n_steps, n_params)
Parameter trajectory θ(t)
flow_norms : array
||F(θ)|| at each step
constraint_values : array
Σᵢ hᵢ at each step (should be ≈ constant)
converged : bool
Whether convergence criterion was met
C_init : float
Initial constraint value
"""
n_params = len(theta_init)
trajectory = [theta_init.copy()]
flow_norms = []
constraint_values = []
theta = theta_init.copy()
# Compute initial constraint value using marginal entropies
rho_init = self.exp_family.rho_from_theta(theta)
h_init = marginal_entropies(rho_init, self.exp_family.dims)
C_init = float(np.sum(h_init))
for step in range(n_steps):
# Compute constrained flow at current point
G = self.exp_family.fisher_information(theta)
_, a = self.exp_family.marginal_entropy_constraint(theta)
# Unconstrained flow: maximize entropy
F_unc = -G @ theta
# Lagrange multiplier for constraint tangency
# a^T F = 0 => a^T(F_unc - ν a) = 0 => ν = (a^T F_unc)/(a^T a)
a_norm_sq = np.dot(a, a)
if a_norm_sq > 1e-12:
nu = np.dot(F_unc, a) / a_norm_sq
F = F_unc - nu * a # Constrained flow
# Debug: check tangency
tangency_check = np.dot(a, F)
if step < 5 or step % 1000 == 0:
print(f"Step {step}: ν = {nu:.6e}, tangency = {tangency_check:.6e}")
else:
F = F_unc
# Gradient descent step
if use_entropy_time:
# Scale step size for entropy time: dH/dt = 1
# Entropy production = θ^T G Π_∥ G θ (same as in flow method)
a_norm_sq = np.dot(a, a)
if a_norm_sq > 1e-12:
Pi = np.eye(len(theta)) - np.outer(a, a) / a_norm_sq
else:
Pi = np.eye(len(theta))
entropy_production = float(theta @ G @ Pi @ G @ theta)
if entropy_production > 1e-12:
effective_dt = dt / entropy_production
else:
effective_dt = dt
else:
effective_dt = dt
theta_new = theta + effective_dt * F
# Project back onto constraint manifold (every project_every steps for efficiency)
if project and (step + 1) % project_every == 0:
# Simple projection: adjust along constraint gradient direction
rho_new = self.exp_family.rho_from_theta(theta_new)
h_new = marginal_entropies(rho_new, self.exp_family.dims)
C_new = np.sum(h_new)
# Iterative Newton projection to constraint manifold
theta_proj = theta_new.copy()
for proj_iter in range(10): # Allow up to 10 iterations
rho_proj = self.exp_family.rho_from_theta(theta_proj)
h_proj = marginal_entropies(rho_proj, self.exp_family.dims)
C_proj = np.sum(h_proj)
error = C_proj - C_init
if abs(error) < 1e-12: # Tight convergence
break
# Newton step: θ ← θ - error / ||a||² * a
_, a_proj = self.exp_family.marginal_entropy_constraint(theta_proj)
a_norm_sq = np.dot(a_proj, a_proj)
if a_norm_sq > 1e-15:
step_size = error / a_norm_sq
theta_proj = theta_proj - step_size * a_proj
else:
break
theta_new = theta_proj
theta = theta_new
# Track metrics
flow_norm = np.linalg.norm(F)
flow_norms.append(flow_norm)
trajectory.append(theta.copy())
# Debug: print progress every 1000 steps
if (step + 1) % 1000 == 0:
H_current = self.exp_family.von_neumann_entropy(theta)
print(f"Step {step+1}: ||F|| = {flow_norm:.6e}, H = {H_current:.6f}, ||θ|| = {np.linalg.norm(theta):.6f}")
# Check constraint preservation
rho = self.exp_family.rho_from_theta(theta)
h = marginal_entropies(rho, self.exp_family.dims)
C_current = np.sum(h)
constraint_values.append(C_current)
# Check convergence - stop early if we achieve target accuracy
if flow_norm < convergence_tol:
traj = np.array(trajectory)
return {
'trajectory': traj,
'theta': traj, # Alias for backward compatibility
'flow_norms': np.array(flow_norms),
'constraint_values': np.array(constraint_values),
'C_init': C_init,
'converged': True,
'n_steps': step + 1
}
# Also check if we've achieved very tight convergence at any point
# This handles cases where the algorithm oscillates after convergence
if len(flow_norms) > 10:
recent_min = np.min(flow_norms[-20:]) # Check last 20 steps
if recent_min < convergence_tol:
traj = np.array(trajectory)
return {
'trajectory': traj,
'theta': traj, # Alias for backward compatibility
'flow_norms': np.array(flow_norms),
'constraint_values': np.array(constraint_values),
'C_init': C_init,
'converged': True,
'n_steps': step + 1
}
# Check for sustained convergence (last 5 steps all below tolerance)
if len(flow_norms) >= 5:
last_5_flow_norms = flow_norms[-5:]
if np.all(np.array(last_5_flow_norms) < convergence_tol):
traj = np.array(trajectory)
return {
'trajectory': traj,
'theta': traj, # Alias for backward compatibility
'flow_norms': np.array(flow_norms),
'constraint_values': np.array(constraint_values),
'C_init': C_init,
'converged': True,
'n_steps': n_steps
}
traj = np.array(trajectory)
return {
'trajectory': traj,
'theta': traj, # Alias for backward compatibility
'flow_norms': np.array(flow_norms),
'constraint_values': np.array(constraint_values),
'C_init': C_init,
'converged': False,
'n_steps': n_steps
}
[docs]
class GenericDynamics(InaccessibleGameDynamics):
"""
GENERIC-aware dynamics for the quantum inaccessible game.
Extends InaccessibleGameDynamics to track the GENERIC decomposition:
- Effective Hamiltonian H_eff(θ) from antisymmetric flow
- Diffusion operator D[ρ](θ) from symmetric flow
- Entropy production rate
- GENERIC structure preservation
"""
[docs]
def __init__(self, exp_family: QuantumExponentialFamily,
structure_constants: np.ndarray = None,
method: str = 'duhamel'):
"""
Initialize GENERIC-aware dynamics.
Parameters
----------
exp_family : QuantumExponentialFamily
The exponential family
structure_constants : np.ndarray, optional
Lie algebra structure constants f_abc
If None, will be computed from operators
method : str, optional
Method for derivatives: 'duhamel' or 'sld'
"""
super().__init__(exp_family, method)
# Store structure constants
if structure_constants is None:
from qig.structure_constants import compute_structure_constants
self.f_abc = compute_structure_constants(exp_family.operators)
else:
self.f_abc = structure_constants
# Storage for GENERIC decomposition along trajectory
self.H_eff_traj = []
self.D_rho_traj = []
self.entropy_production_traj = []
self.S_norm_traj = []
self.A_norm_traj = []
[docs]
def compute_generic_decomposition(self, theta: np.ndarray) -> Dict[str, Any]:
"""
Compute full GENERIC decomposition at given θ.
Returns
-------
dict with keys:
'M' : Jacobian matrix
'S' : Symmetric part
'A' : Antisymmetric part
'H_eff' : Effective Hamiltonian
'eta' : Hamiltonian coefficients
'D_rho' : Diffusion operator (if compute_diffusion=True)
'entropy_production' : dS/dt
"""
from qig.generic import (
effective_hamiltonian_coefficients,
effective_hamiltonian_operator
)
# Get Jacobian and decomposition
M = self.exp_family.jacobian(theta, method=self.method)
S = self.exp_family.symmetric_part(theta, method=self.method)
A = self.exp_family.antisymmetric_part(theta, method=self.method)
# Extract effective Hamiltonian
eta, info = effective_hamiltonian_coefficients(A, theta, self.f_abc)
H_eff = effective_hamiltonian_operator(eta, self.exp_family.operators)
# Compute entropy production rate
# dS/dt = -Tr(ρ̇ log ρ) where ρ̇ comes from dissipative part
# For now, use simplified form: θ^T G Π_∥ G θ
G = self.exp_family.fisher_information(theta)
_, a = self.exp_family.marginal_entropy_constraint(theta, method=self.method)
a_norm_sq = float(np.dot(a, a))
if a_norm_sq > 1e-12:
Pi = np.eye(len(theta)) - np.outer(a, a) / a_norm_sq
else:
Pi = np.eye(len(theta))
entropy_production = float(theta @ G @ Pi @ G @ theta)
return {
'M': M,
'S': S,
'A': A,
'H_eff': H_eff,
'eta': eta,
'entropy_production': entropy_production,
'S_norm': np.linalg.norm(S, 'fro'),
'A_norm': np.linalg.norm(A, 'fro'),
}
[docs]
def integrate_reversible(
self, theta_0: np.ndarray, t_span: Tuple[float, float], n_points: int = 100
) -> Dict[str, Any]:
"""
Integrate reversible (Hamiltonian) part only: -i[H_eff, ρ].
This integrates the antisymmetric flow in parameter space,
which corresponds to unitary evolution in density matrix space.
"""
# Use only the antisymmetric part of the Jacobian
def reversible_flow(t: float, theta: np.ndarray) -> np.ndarray:
A = self.exp_family.antisymmetric_part(theta, method=self.method)
# For reversible flow: θ̇ = A θ (linearized)
# But we need the full nonlinear flow, so use constraint structure
G = self.exp_family.fisher_information(theta)
_, a = self.exp_family.marginal_entropy_constraint(theta, method=self.method)
# Get Lagrange multiplier gradient
grad_nu = self.exp_family.lagrange_multiplier_gradient(
theta, method=self.method
)
# Antisymmetric contribution: a(∇ν)^T - (∇ν)a^T
# Applied to current point
reversible_term = np.outer(a, grad_nu) - np.outer(grad_nu, a)
return reversible_term @ theta
# Integrate
t_eval = np.linspace(t_span[0], t_span[1], n_points)
sol = solve_ivp(
reversible_flow,
t_span,
theta_0,
t_eval=t_eval,
method="RK45",
rtol=1e-8,
atol=1e-10,
)
return {
"time": sol.t,
"theta": sol.y.T,
"success": sol.success,
}
[docs]
def integrate_irreversible(
self, theta_0: np.ndarray, t_span: Tuple[float, float], n_points: int = 100
) -> Dict[str, Any]:
"""
Integrate irreversible (dissipative) part only: D[ρ].
This integrates the symmetric flow in parameter space,
which corresponds to entropy-increasing dissipation.
"""
# Use only the symmetric part
def irreversible_flow(t: float, theta: np.ndarray) -> np.ndarray:
G = self.exp_family.fisher_information(theta)
third_cumulant = self.exp_family.third_cumulant_contraction(theta)
hessian_C = self.exp_family.constraint_hessian(theta)
_, a = self.exp_family.marginal_entropy_constraint(theta, method=self.method)
# Symmetric part: -G - (∇G)[θ] + ν∇²C
# (drops the a(∇ν)^T term which is antisymmetric)
Gtheta = G @ theta
# Lagrange multiplier
a_norm_sq = float(np.dot(a, a))
if a_norm_sq > 1e-12:
nu = -np.dot(Gtheta, a) / a_norm_sq
else:
nu = 0.0
# Symmetric flow
return -G @ theta - third_cumulant @ theta + nu * (hessian_C @ theta)
# Integrate
t_eval = np.linspace(t_span[0], t_span[1], n_points)
sol = solve_ivp(
irreversible_flow,
t_span,
theta_0,
t_eval=t_eval,
method="RK45",
rtol=1e-8,
atol=1e-10,
)
return {
"time": sol.t,
"theta": sol.y.T,
"success": sol.success,
}
[docs]
def integrate_with_monitoring(
self, theta_0: np.ndarray, t_span: Tuple[float, float], n_points: int = 100,
compute_diffusion: bool = False
) -> Dict[str, Any]:
"""
Integrate full dynamics with GENERIC structure monitoring.
Parameters
----------
theta_0 : np.ndarray
Initial parameters
t_span : Tuple[float, float]
Time interval
n_points : int
Number of evaluation points
compute_diffusion : bool
Whether to compute D[ρ] at each step (expensive!)
Returns
-------
dict with trajectory and GENERIC monitoring data
"""
# First integrate the full dynamics using solve() (stable)
# Convert t_span to n_steps and dt
# Note: solve() returns n_steps+1 points (including initial),
# so use n_points-1 to match integrate()'s behavior
t_total = t_span[1] - t_span[0]
dt = t_total / (n_points - 1) if n_points > 1 else t_total
result = self.solve(theta_0, n_steps=n_points-1, dt=dt, verbose=False)
# Add compatibility keys
result['theta'] = result['trajectory'] # Alias for backward compatibility
result['time'] = np.linspace(t_span[0], t_span[1], len(result['trajectory']))
# Note: 'converged' means reached fixed point; 'success' means integration completed
# For compatibility with old integrate() which used scipy's success flag
result['success'] = True # solve() always completes (unlike scipy which can fail)
# Compute entropies along trajectory (for compatibility with integrate())
theta_traj = result['trajectory']
n_steps_actual = len(theta_traj)
H_traj = np.zeros(n_steps_actual)
h_traj = np.zeros((n_steps_actual, self.exp_family.n_sites))
for i, theta in enumerate(theta_traj):
rho = self.exp_family.rho_from_theta(theta)
H_traj[i] = von_neumann_entropy(rho)
h_traj[i] = marginal_entropies(rho, self.exp_family.dims)
result['H'] = H_traj
result['h'] = h_traj
result['constraint'] = np.sum(h_traj, axis=1)
# Compute GENERIC decomposition at each point
theta_traj = result['theta']
n_steps = len(theta_traj)
# Storage
H_eff_traj = []
entropy_production = np.zeros(n_steps)
S_norms = np.zeros(n_steps)
A_norms = np.zeros(n_steps)
D_rho_traj = [] if compute_diffusion else None
print("Computing GENERIC decomposition along trajectory...")
for i, theta in enumerate(theta_traj):
if n_steps >= 10 and i % (n_steps // 10) == 0:
print(f" Progress: {i}/{n_steps}")
elif n_steps < 10 and i == 0:
print(f" Progress: {i}/{n_steps}")
decomp = self.compute_generic_decomposition(theta)
H_eff_traj.append(decomp['H_eff'])
entropy_production[i] = decomp['entropy_production']
S_norms[i] = decomp['S_norm']
A_norms[i] = decomp['A_norm']
if compute_diffusion:
from qig.generic import diffusion_operator
D_rho = diffusion_operator(
decomp['S'], theta, self.exp_family, method=self.method
)
D_rho_traj.append(D_rho)
# Add GENERIC data to results
result['H_eff'] = H_eff_traj
result['entropy_production'] = entropy_production
result['S_norm'] = S_norms
result['A_norm'] = A_norms
result['cumulative_entropy'] = np.cumsum(entropy_production) * np.mean(np.diff(result['time']))
if compute_diffusion:
result['D_rho'] = D_rho_traj
return result
__all__ = ["InaccessibleGameDynamics", "GenericDynamics"]