diff --git a/demos/demo_matrix_decomposition.py b/demos/demo_matrix_decomposition.py new file mode 100644 index 0000000..ddce32c --- /dev/null +++ b/demos/demo_matrix_decomposition.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python + +import numpy as np +from collections import Counter + +# Import from your package +from pspace.core import CoordinateFactory, CoordinateSystem, BasisFunctionType, PolyFunction + +def main(): + + #-------------------------------------------------------------# + # 1. Build coordinate system with 2 axes (uniform for demo) + #-------------------------------------------------------------# + + cf = CoordinateFactory() + + # y0 ~ Uniform[-1, 1], max deg = 2 + coord0 = cf.createUniformCoordinate( + coord_id=cf.newCoordinateID(), + coord_name="y0", + dist_coords={'a': -1.0, 'b': 1.0}, + max_monomial_dof=3 + ) + + # y1 ~ Uniform[-1, 1], max deg = 2 + coord1 = cf.createUniformCoordinate( + coord_id=cf.newCoordinateID(), + coord_name="y1", + dist_coords={'a': -1.0, 'b': 1.0}, + max_monomial_dof=2 + ) + + # Tensor-degree basis + cs = CoordinateSystem(BasisFunctionType.TENSOR_DEGREE) + cs.addCoordinateAxis(coord0) + cs.addCoordinateAxis(coord1) + cs.initialize() + + print(f"Num basis functions = {cs.getNumBasisFunctions()}") + + #-------------------------------------------------------------# + # 2. Define polynomial f(y) = 3 + 3*y0 + 3*y0^2*y1 + #-------------------------------------------------------------# + + polyf = PolyFunction([ + (3.0, Counter({})), # constant + (3.0, Counter({0: 1})), # linear term y0 + (3.0, Counter({0: 2, 1: 1})) # mixed quadratic + ]) + + #-------------------------------------------------------------# + # 3. Run sparse vs full assembly check + #-------------------------------------------------------------# + + print(polyf.degrees) + + ok, diffs = cs.check_decomposition_matrix_sparse_full(polyf, tol=1e-10, verbose=True) + print("[Result] Matrix assembly check:", "PASSED" if ok else "FAILED") + +if __name__ == "__main__": + main() diff --git a/demos/demo_state_equation.py b/demos/demo_state_equation.py new file mode 100644 index 0000000..0d38a5b --- /dev/null +++ b/demos/demo_state_equation.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +import numpy as np +from collections import Counter +from pspace.core import ( + CoordinateFactory, CoordinateSystem, + BasisFunctionType, PolyFunction, StateEquation +) + +#---------------------------------------------------------------------# +# 1. Coordinate system setup +#---------------------------------------------------------------------# + +cf = CoordinateFactory() +cs = CoordinateSystem(BasisFunctionType.TENSOR_DEGREE, verbose=False) + +coord_x = cf.createUniformCoordinate(cf.newCoordinateID(), "x", dict(a=-1, b=1), max_monomial_dof=2) +coord_y = cf.createUniformCoordinate(cf.newCoordinateID(), "y", dict(a=-1, b=1), max_monomial_dof=2) + +cs.addCoordinateAxis(coord_x) +cs.addCoordinateAxis(coord_y) + +cs.initialize() + +print(f"Num basis functions = {cs.getNumBasisFunctions()}") + +#---------------------------------------------------------------------# +# 2. Define operator and RHS +#---------------------------------------------------------------------# + +# Operator: identity weight (Gram matrix) +gram_op = PolyFunction([(1.0, Counter())]) + +# RHS: g(x,y) = 1 + 2x + 3y +rhs_fn = PolyFunction([ + (1.0, Counter()), + (2.0, Counter({coord_x.id: 1})), + (3.0, Counter({coord_y.id: 1})) +]) + +#---------------------------------------------------------------------# +# 3. Build and assemble StateEquation +#---------------------------------------------------------------------# + +eq = StateEquation("reconstruction", gram_op, rhs_fn, cs) +eq.assemble(analytic=False, sparse=True) + +print("\n[Operator matrix G_phi]") +print(np.round(eq.operator_matrix, 3)) + +print("\n[RHS vector]") +print(np.round(eq.rhs_vector, 3)) + +#---------------------------------------------------------------------# +# 4. Precondition and solve +#---------------------------------------------------------------------# + +eq.precondition(method="cholesky") +a_phi = eq.solve() + +print("\n[Whitened operator condition number]") +print(np.linalg.cond(eq.operator_whitened)) + +print("\n[Solved coefficients a_phi]") +print(np.round(a_phi, 6)) + +# Verify reconstruction consistency +residual = eq.operator_matrix @ a_phi - eq.rhs_vector +print("‖Residual‖₂ =", np.linalg.norm(residual)) diff --git a/pspace/core.py b/pspace/core.py index 8045464..8294cfa 100644 --- a/pspace/core.py +++ b/pspace/core.py @@ -1,980 +1,1244 @@ #!/usr/bin/env python -from __future__ import print_function + +#=====================================================================# +# சாத்தியவியல், இடவியல், காலவியல் பகுதி வேறுபாட்டுச் +# சமன்பாடுகளுக்கான கணிதப் பகுப்பாய்வுத் தொகுதி + +# ஆசிரியர் : கோமகன் பூபதி (komahan@gatech.edu) +#————————————————————————————————————————————————————————————————————-# +# MATHEMATICAL ANALYSIS MODULE FOR PROBABILISTIC-SPATIO-TEMPORAL +# PARTIAL DIFFERENTIAL EQUATIONS +# +# Author : Komahan Boopathy (komahan@gatech.edu) +#=====================================================================# # External modules -import numpy as np -np.set_printoptions(precision=3,suppress=True) import math +import sympy as sp +import numpy as np +np.set_printoptions(precision=3, suppress=True) + from collections import Counter -from enum import Enum +from enum import Enum +from itertools import product # Local modules -from stochastic_utils import tensor_indices, nqpts, sparse -from orthogonal_polynomials import unit_hermite as Hhat -from orthogonal_polynomials import unit_legendre as Phat -from orthogonal_polynomials import unit_laguerre as Lhat -from plotter import plot_jacobian, plot_vector - -def index(ii): - return ii - if ii == 0: - return 0 - if ii == 1: - return 1 - if ii == 2: - return 3 - if ii == 3: - return 2 - -## TODO -# Parameters are Monomials -# Deterministic parameters are constant monomials (degree 0) -# Probabilistic parameters are highest degree monomials - -class ParameterType(Enum): +from .stochastic_utils import (minnum_quadrature_points, + generate_basis_tensor_degree, + generate_basis_total_degree, + sum_degrees, + safe_zero_degrees, + sum_degrees_union_matrix, + sum_degrees_union_vector) + +from .orthogonal_polynomials import unit_hermite +from .orthogonal_polynomials import unit_legendre +from .orthogonal_polynomials import unit_laguerre + +#=====================================================================# +# Enums +#=====================================================================# + +class CoordinateType(Enum): """ - Enumeration for types of parameters. Assumption is that any - parameter that you create will either be 'deterministic' or - 'probabilistic' in nature. + DOMAIN TYPES """ - DETERMINISTIC = 1 - PROBABILISTIC = 2 - + PROBABILISTIC = 1 + SPATIAL = 2 + TEMPORAL = 3 + + class DistributionType(Enum): """ - Enumeration of probability distribution types. + GEOMETRY: DENSITY DISTRIBUTION """ - NONE = 0 - NORMAL = 1 - UNIFORM = 2 - EXPONENTIAL = 3 - POISSON = 4 - BINORMAL = 5 - -class Parameter(object): + NORMAL = 0 + UNIFORM = 1 + EXPONENTIAL = 2 + POISSON = 3 + BINORMAL = 4 + + +class BasisFunctionType(Enum): + """ + VECTOR-SPACE CONSTRUCTION METHODS """ - A hashable parameter object wrapping information about the - parameter used in computations. Hashable implies being able to - serve as keys dictionaries. + TENSOR_DEGREE = 0 + TOTAL_DEGREE = 1 + ADAPTIVE_DEGREE = 2 - Author: Komahan Boopathy +class PolyFunction: + def __init__(self, terms): + """ + terms : list of (coeff, Counter) pairs + Example: + [ + (3, Counter()), # constant + (3, Counter({0:1})), # 3*y0 + (3, Counter({0:2, 1:1})) # 3*y0^2 * y1 + ] + """ + self._terms = [] + self._degrees = [] # list of Counters + self._max_degrees = Counter() + + for t in terms: + if isinstance(t, tuple) and isinstance(t[1], Counter): + coeff, degs = t + self._terms.append((coeff, degs)) + self._degrees.append(degs) + for k, v in degs.items(): + self._max_degrees[k] = max(self._max_degrees.get(k, 0), v) + else: + raise TypeError(f"Invalid term format: {t!r}") + + @property + def terms(self): + return self._terms + + @property + def degrees(self): + """List[Counter]: degree structure per monomial""" + return self._degrees + + @property + def max_degrees(self): + """Counter: max degree per axis (union of monomials)""" + return self._max_degrees + + def __call__(self, Y): + total = 0.0 + for coeff, degs in self._terms: + mon = coeff + for cid, d in degs.items(): + mon *= Y[cid] ** d + total += mon + return total + + def __repr__(self): + return f"PolyFunction({self._terms})" + +class OrthoPolyFunction: """ - def __init__(self, pdata): - self.param_id = pdata['param_id'] - self.param_name = pdata['param_name'] - self.param_type = pdata['param_type'] - self.dist_type = pdata['dist_type'] - self.monomial_degree = pdata['monomial_degree'] - self.basis_map = {} - self.quadrature_map = {} - return - + Polynomial function expressed in orthonormal basis (Legendre, Hermite, etc.) + Works with CoordinateSystem and its Coordinate axes. + """ + + def __init__(self, terms, coordinates): + """ + Parameters + ---------- + terms : list[(coeff: float, degs: Counter)] + List of terms (basis coefficient, degree counter per coordinate). + coordinates : dict[int, Coordinate] + Dictionary of coordinate objects {cid: Coordinate}. + """ + self._terms = terms + self._coords = coordinates + + def __call__(self, Y): + """ + Evaluate function at point Y in computational coordinates. + Y : dict {cid: float} + """ + total = 0.0 + for coeff, degs in self._terms: + term_val = coeff + for cid, d in degs.items(): + # Evaluate orthogonal basis polynomial at Y[cid] + term_val *= self._coords[cid].psi_y(Y[cid], d) + total += term_val + return total + + def toPolyFunction(self): + """ + Expand OrthoPolyFunction into monomial PolyFunction + using change-of-basis matrices (currently supports Legendre). + """ + from numpy.polynomial import legendre as npleg + from numpy.polynomial import polynomial as nppoly + from .core import PolyFunction # adjust import if needed + + monomial_terms = Counter() + + for coeff, degs in self._terms: + # Build tensor product expansion for each coordinate + expansions = [] + for cid, d in degs.items(): + coord = self._coords[cid] + if coord.dist_type.name == "UNIFORM": # Legendre basis + Pn = npleg.Legendre.basis(d) + poly = Pn.convert(kind=nppoly.Polynomial) + coeffs_power = np.array(poly.coef, dtype=float) + s = np.sqrt((2*d+1)/2.0) # orthonormal scale + coeffs_power *= s + expansions.append((cid, coeffs_power)) + else: + raise NotImplementedError("toPolyFunction only supports Legendre for now") + + # Recursive tensor product accumulation + def recurse(idx, running_coeff, running_degs): + if idx == len(expansions): + monomial_terms[running_degs] += coeff * running_coeff + return + cid, coeffs_power = expansions[idx] + for p, val in enumerate(coeffs_power): + if abs(val) < 1e-15: + continue + recurse(idx+1, running_coeff*val, + running_degs + Counter({cid: p})) + + recurse(0, 1.0, Counter()) + + # Build PolyFunction terms + terms = [(float(val), degs) for degs, val in monomial_terms.items() if abs(val) > 1e-15] + return PolyFunction(terms) + + def coeffs(self): + """Return coefficients directly.""" + return {tuple(sorted(d.items())): c for c, d in self._terms} + + def __repr__(self): + return f"OrthoPolyFunction({len(self._terms)} terms, basis=orthonormal)" + +#=====================================================================# +# Coordinate Base Class +#=====================================================================# + +class Coordinate(object): + def __init__(self, coord_data): + self.id = coord_data['coord_id'] + self.name = coord_data['coord_name'] + self.type = coord_data['coord_type'] + self.distribution = coord_data['dist_type'] + self.degree = coord_data['monomial_degree'] + self.symbol = sp.Symbol(self.name) # y + self.rho = None # rho(y) + def __str__(self): - return str(self.__class__.__name__) + " " + str(self.__dict__) + return str(self.__class__.__name__) + " " + str(self.__dict__) + "\n" - def __hash__(self): - return hash((self.param_id)) + def weight(self): + """Return symbolic weight function ρ(y) attached to this coordinate.""" + return self.rho - def __eq__(self, other): - return (self.param_id) == (other.param_id) + #-----------------------------------------------------------------# + # VARIABLE TRANSFORMATIONS + #-----------------------------------------------------------------# - def __ne__(self, other): - return not(self == other) + def physical_to_standard(self, yscalar): + """Map physical y -> standard z""" + raise NotImplementedError - def getParameterValue(self): - return self.param_value - - def getParameterType(self): - return self.param_type + def quadrature_to_physical(self, xscalar): + """Map quadrature x -> physical y""" + raise NotImplementedError - def getDistributionType(self): - return self.dist_type + def standard_to_physical(self, zscalar): + """Map standard z -> physical y""" + raise NotImplementedError - def getParameterID(self): - return self.param_id + def quadrature_to_standard(self, xscalar): + """Map quadrature x -> standard z""" + return self.physical_to_standard(self.quadrature_to_physical(xscalar)) - def setParameterID(self, pid): - self.param_id = pid - return + #-----------------------------------------------------------------# + # BASIS EVALUATIONS + #-----------------------------------------------------------------# + + def psi_z(self, zscalar, degree): + raise NotImplementedError + + def psi_y(self, yscalar, degree): + z = self.physical_to_standard(yscalar) + return self.psi_z(z, degree) + + def psi_x(self, xscalar, degree): + z = self.quadrature_to_standard(xscalar) + return self.psi_z(z, degree) + + #-----------------------------------------------------------------# + # subclass provides a 1D Gauss rule for the needed degree + #-----------------------------------------------------------------# + + def gaussian_quadrature(self, degree): + """ + Return native Gauss rule (x_nodes, w_nodes) sized for `degree`. + Subclass chooses the correct family and normalization. + """ + raise NotImplementedError - def getQuadraturePointsWeights(self, npoints): - pass + def getQuadraturePointsWeights(self, degree): + x, w = self.gaussian_quadrature(degree) + z = self.quadrature_to_standard(x) + y = np.array([self.standard_to_physical(zz) for zz in z]) + return {'yq': y, 'zq': z, 'wq': w} - def evalOrthoNormalBasis(self, z, d): - pass - -class DeterministicParameter(Parameter): +#=====================================================================# +# Coordinate Implementations +#=====================================================================# + +class NormalCoordinate(Coordinate): def __init__(self, pdata): - super(DeterministicParameter, self).__init__(pdata) - self.param_value = pdata['param_value'] - return - - def getQuadraturePointsWeights(self, npoints): - raise - try: - return self.quadrature_map[npoints] - except: - # Store in map - cmap = {'yq' : self.param_value, 'zq' : self.param_value, 'wq' : 1.0} - self.quadrature_map[npoints] = cmap - return cmap - - def evalOrthoNormalBasis(self, z, d): - """ - Evaluate the orthonormal basis at supplied coordinate. If it - exists in the map already, return the value from map. If not - evaluate the orthonormal polynomial and return. - - Note: For the deterministic case, the value is always one. - """ - zkey = hash(z) - try: - return self.basis_map[(d,zkey)] - except: - self.basis_map[(d,zkey)] = 1.0 - return val - -class ProbabilisticParameter(Parameter): + super().__init__(pdata) + mu = sp.sympify(pdata['dist_coords']['mu']) + sigma = sp.sympify(pdata['dist_coords']['sigma']) + self.dist_coords = {'mu': mu, 'sigma': sigma} + self.rho = sp.exp(-(self.symbol - mu)**2 / (2*sigma**2)) / (sp.sqrt(2*sp.pi) * sigma) + + def domain(self): + return -sp.oo, sp.oo + + def physical_to_standard(self, yscalar): + """Map physical y -> standard z""" + mu, sigma = self.dist_coords['mu'], self.dist_coords['sigma'] + return (yscalar - mu) / sigma + + def quadrature_to_physical(self, xscalar): + """Map quadrature x -> physical y""" + mu, sigma = self.dist_coords['mu'], self.dist_coords['sigma'] + return mu + sigma * np.sqrt(2) * xscalar + + def standard_to_physical(self, zscalar): + """Map standard z -> physical y""" + mu, sigma = self.dist_coords['mu'], self.dist_coords['sigma'] + return mu + sigma * zscalar + + def psi_z(self, z, degree): + return unit_hermite(z, degree) + + def gaussian_quadrature(self, degree): + npts = minnum_quadrature_points(degree) + x, w = np.polynomial.hermite.hermgauss(npts) + w = w / np.sqrt(np.pi) + return x, w + +class UniformCoordinate(Coordinate): def __init__(self, pdata): - super(ProbabilisticParameter, self).__init__(pdata) - self.dist_params = pdata['dist_params'] - return - - def getDistributionParameters(self, key): - return self.dist_params[key] - -class ExponentialParameter(Parameter): + super().__init__(pdata) + a = sp.sympify(pdata['dist_coords']['a']) + b = sp.sympify(pdata['dist_coords']['b']) + self.dist_coords = {'a': a, 'b': b} + self.rho = sp.Rational(1, b - a) + + def domain(self): + return self.dist_coords['a'], self.dist_coords['b'] + + def physical_to_standard(self, yscalar): + a, b = self.dist_coords['a'], self.dist_coords['b'] + return (yscalar - a) / (b - a) + + def quadrature_to_physical(self, xscalar): + a, b = self.dist_coords['a'], self.dist_coords['b'] + return (b - a) * xscalar + a + + def standard_to_physical(self, zscalar): + a, b = self.dist_coords['a'], self.dist_coords['b'] + return (b - a) * zscalar + a + + def psi_z(self, z, degree): + return unit_legendre(z, degree) + + def gaussian_quadrature(self, degree): + npts = minnum_quadrature_points(degree) + xi, w = np.polynomial.legendre.leggauss(npts) # on [-1,1] + x_shifted = 0.5 * (xi + 1.0) # map to [0,1] + w_shifted = 0.5 * w + return x_shifted, w_shifted + +class ExponentialCoordinate(Coordinate): def __init__(self, pdata): - super(ExponentialParameter, self).__init__(pdata) - self.dist_params = pdata['dist_params'] - return + super().__init__(pdata) + mu = sp.sympify(pdata['dist_coords']['mu']) + beta = sp.sympify(pdata['dist_coords']['beta']) + self.dist_coords = {'mu': mu, 'beta': beta} + self.rho = sp.exp(-(self.symbol - mu)/beta) / beta - def getQuadraturePointsWeights(self, npoints): - """ - numpy.polynomial.laguerre.laggauss(deg)[source] - """ - try: - return self.quadrature_map[npoints] - except: - # This is based on interval [0, \inf] with the weight - # function f(xi) = \exp(-xi) which is also the standard - # PDF f(z) = \exp(-z) - xi, w = np.polynomial.laguerre.laggauss(npoints) - mu = self.dist_params['mu'] - beta = self.dist_params['beta'] - - # scale weights to unity (Area under exp(-xi) in [0,inf] is 1.0 - w = w/1.0 - - # transformation of variables - y = mu + beta*xi - - # assert if weights don't add up to unity - eps = np.finfo(np.float64).eps - assert((1.0 - 2.0*eps <= np.sum(w) <= 1.0 + 2.0*eps) == True) - - # Return quadrature point in standard space as well - z = xi # (y-mu)/beta - - # Store in map - cmap = {'yq' : y, 'zq' : z, 'wq' : w} - self.quadrature_map[npoints] = cmap - return cmap - - - def evalOrthoNormalBasis(self, z, d): - """ - Evaluate the orthonormal basis at supplied coordinate. If it - exists in the map already, return the value from map. If not - evaluate the orthonormal polynomial and return. - """ - zkey = hash((z,d)) - try: - return self.basis_map[zkey] - except: - val = Lhat(z,d) - self.basis_map[zkey] = val - return val - -class NormalParameter(Parameter): - def __init__(self, pdata): - super(NormalParameter, self).__init__(pdata) - self.dist_params = pdata['dist_params'] - return + def domain(self): + return self.dist_coords['mu'], sp.oo - def getQuadraturePointsWeights(self, npoints): - try: - return self.quadrature_map[npoints] - except: - # This is based on physicist unnormlized weight exp(-x*x). - x, w = np.polynomial.hermite.hermgauss(npoints) - mu = self.dist_params['mu'] - sigma = self.dist_params['sigma'] - - # scale weights to unity (Area under exp(-x) in [0,inf] is 1.0 - w = w/np.sqrt(np.pi) - - # transformation of variables - y = mu + sigma*np.sqrt(2)*x - - # assert if weights don't add up to unity - eps = np.finfo(np.float64).eps - assert((1.0 - 2.0*eps <= np.sum(w) <= 1.0 + 2.0*eps) == True) - - # Return quadrature point in standard space as well - z = (y-mu)/sigma - - # Store in map - cmap = {'yq' : y, 'zq' : z, 'wq' : w} - self.quadrature_map[npoints] = cmap - return cmap - - def evalOrthoNormalBasis(self, z, d): - """ - Evaluate the orthonormal basis at supplied coordinate. If it - exists in the map already, return the value from map. If not - evaluate the orthonormal polynomial and return. - """ - zkey = hash((z,d)) - try: - return self.basis_map[zkey] - except: - val = Hhat(z,d) - self.basis_map[zkey] = val - return val - -class UniformParameter(Parameter): - def __init__(self, pdata): - super(UniformParameter, self).__init__(pdata) - self.dist_params = pdata['dist_params'] - return + def physical_to_standard(self, yscalar): + mu, beta = self.dist_coords['mu'], self.dist_coords['beta'] + return (yscalar - mu) / beta - def getQuadraturePointsWeights(self, npoints): - try: - return self.quadrature_map[npoints] - except: - # This is based on weight 1.0 on interval [-1,1] - x, w = np.polynomial.legendre.leggauss(npoints) - a = self.dist_params['a'] - b = self.dist_params['b'] - - # scale weights to unity - w = w/2.0 - - # transformation of variables - y = (b-a)*x/2 + (b+a)/2 - - # assert if weights don't add up to unity - eps = np.finfo(np.float64).eps - assert((1.0 - 2.0*eps <= np.sum(w) <= 1.0 + 2.0*eps) == True) - - # Return quadrature point in standard space as well - z = (y-a)/(b-a) - - # Store in map - cmap = {'yq' : y, 'zq' : z, 'wq' : w} - self.quadrature_map[npoints] = cmap - return cmap - - def evalOrthoNormalBasis(self, z, d): - """ - Evaluate the orthonormal basis at supplied coordinate. If it - exists in the map already, return the value from map. If not - evaluate the orthonormal polynomial and return. - """ - zkey = hash((z,d)) - try: - return self.basis_map[zkey] - except: - val = Phat(z,d) - self.basis_map[zkey] = val - return val - -class HashableDict(dict): - def __hash__(self): - return hash(tuple(sorted(self.iteritems()))) - -class ParameterFactory: - """ - This class takes in primitives and makes data strucuture required - for other classes in this module (this is like creating elements). + def quadrature_to_physical(self, xscalar): + mu, beta = self.dist_coords['mu'], self.dist_coords['beta'] + return mu + beta * xscalar - Deterministic parameter is a parameter whose polynomial degree is - zero and stochastic parameter is a parameter whose polynomial - degree is non zero. - """ + def standard_to_physical(self, zscalar): + mu, beta = self.dist_coords['mu'], self.dist_coords['beta'] + return mu + beta * zscalar + + def psi_z(self, z, degree): + return unit_laguerre(z, degree) + + def gaussian_quadrature(self, degree): + npts = minnum_quadrature_points(degree) + x, w = np.polynomial.laguerre.laggauss(npts) + return x, w + +#=====================================================================# +# Coordinate Factory +#=====================================================================# + +class CoordinateFactory: def __init__(self): - self.next_param_id = 0 + self.next_coord_id = 0 return - - def getParameterID(self): - pid = self.next_param_id - self.next_param_id = self.next_param_id + 1 + + def newCoordinateID(self): + pid = self.next_coord_id + self.next_coord_id = self.next_coord_id + 1 return pid - def createDeterministicParameter(self, pname, pvalue): - # Prepare map for calling constructor of deterministic - # parameter - pdata = {} - pdata['param_name'] = pname - pdata['param_type'] = ParameterType.DETERMINISTIC - pdata['dist_type'] = DistributionType.NONE - pdata['param_value'] = pvalue - pdata['monomial_degree'] = 0 - pdata['param_id'] = self.getParameterID() - return DeterministicParameter(pdata) - - def createNormalParameter(self, pname, dist_params, monomial_degree): - # Prepare map for calling constructor of Normal/Gaussian - # parameter + def createNormalCoordinate(self, coord_id, coord_name, dist_coords, max_monomial_dof): pdata = {} - pdata['param_name'] = pname - pdata['param_type'] = ParameterType.PROBABILISTIC + pdata['coord_id'] = coord_id + pdata['coord_name'] = coord_name + pdata['coord_type'] = CoordinateType.PROBABILISTIC pdata['dist_type'] = DistributionType.NORMAL - pdata['dist_params'] = dist_params - pdata['monomial_degree'] = monomial_degree - pdata['param_id'] = self.getParameterID() - return NormalParameter(pdata) - - def createUniformParameter(self, pname, dist_params, monomial_degree): - # Prepare map for calling constructor of Uniform - # parameter + pdata['dist_coords'] = dist_coords + pdata['monomial_degree'] = max_monomial_dof + return NormalCoordinate(pdata) + + def createUniformCoordinate(self, coord_id, coord_name, dist_coords, max_monomial_dof): pdata = {} - pdata['param_name'] = pname - pdata['param_type'] = ParameterType.PROBABILISTIC + pdata['coord_id'] = coord_id + pdata['coord_name'] = coord_name + pdata['coord_type'] = CoordinateType.PROBABILISTIC pdata['dist_type'] = DistributionType.UNIFORM - pdata['dist_params'] = dist_params - pdata['monomial_degree'] = monomial_degree - pdata['param_id'] = self.getParameterID() - return UniformParameter(pdata) - - def createExponentialParameter(self, pname, dist_params, monomial_degree): - # Prepare map for calling constructor of Uniform - # parameter + pdata['dist_coords'] = dist_coords + pdata['monomial_degree'] = max_monomial_dof + return UniformCoordinate(pdata) + + def createExponentialCoordinate(self, coord_id, coord_name, dist_coords, max_monomial_dof): pdata = {} - pdata['param_name'] = pname - pdata['param_type'] = ParameterType.PROBABILISTIC + pdata['coord_id'] = coord_id + pdata['coord_name'] = coord_name + pdata['coord_type'] = CoordinateType.PROBABILISTIC pdata['dist_type'] = DistributionType.EXPONENTIAL - pdata['dist_params'] = dist_params - pdata['monomial_degree'] = monomial_degree - pdata['param_id'] = self.getParameterID() - return ExponentialParameter(pdata) + pdata['dist_coords'] = dist_coords + pdata['monomial_degree'] = max_monomial_dof + return ExponentialCoordinate(pdata) -class ParameterContainer: - """ - Class that contains all stochastic parameters and handles - quadrature and evaluation of basis functions. +#=====================================================================# +# Coordinate System +#=====================================================================# - This object is simply a container for objects of type Parameter. - - Author: Komahan Boopathy +class CoordinateSystem: """ - def __init__(self): - self.num_terms = 1 + 1) holds coordinates (axes), + 2) manages basis (multi-indices), + 3) integrates inner products via tensor-product quadrature. + """ + def __init__(self, basis_type, verbose=False): + self.coordinates = {} # {cid : Coordinate} + self.basis_construction = basis_type + self.basis = None # {basis_id: Counter({cid:deg,...})} + self.verbose = bool(verbose) - # container for storing all parameters - self.parameter_map = {} + def __str__(self): + return str(self.__class__.__name__) + " " + str(self.__dict__) + "\n" - # replace with DegreeSet class - self.basistermwise_parameter_degrees = {} # For each parameter and basis entry what - # is the degree according to tensor - # product + #-----------------------------------------------------------------# + # Basis and initialization + #-----------------------------------------------------------------# + def evaluate_basis(self, yscalar, degree: int): + """ψ(y) = ψ(z(y)), evaluated in Y-frame.""" + zscalar = self.physical_to_standard(yscalar) + return self.psi_z(zscalar, degree) - # Replace with basis class - self.psi_map = {} - - return + def getNumBasisFunctions(self): + return len(self.basis) if self.basis is not None else 0 - def __str__(self): - return str(self.__class__.__name__) + " " + str(self.__dict__) + def getNumCoordinateAxes(self): + return len(self.coordinates) - def getNumParameters(self): - return len(self.parameter_map.keys()) + def getMonomialDegreeCoordinates(self): + return {cid: coord.degree for cid, coord in self.coordinates.items()} + + def addCoordinateAxis(self, coordinate): + self.coordinates[coordinate.id] = coordinate def initialize(self): + max_deg_map = self.getMonomialDegreeCoordinates() + if self.basis_construction == BasisFunctionType.TENSOR_DEGREE: + self.basis = generate_basis_tensor_degree(max_deg_map) + elif self.basis_construction == BasisFunctionType.TOTAL_DEGREE: + self.basis = generate_basis_total_degree(max_deg_map) + else: + raise NotImplementedError("ADAPTIVE_DEGREE path not implemented") + + #-----------------------------------------------------------------# + # Quadrature + #-----------------------------------------------------------------# + + def print_quadrature(self, qmap): + if not self.verbose: + return + print("Quadrature rule:") + print("-" * 80) + for q, data in qmap.items(): + y_str = ", ".join(f"y{cid}={val:.6g}" for cid, val in data['Y'].items()) + z_str = ", ".join(f"z{cid}={val:.6g}" for cid, val in data['Z'].items()) + print(f"q={q:3d} : {y_str:<36} | {z_str:<36} | W={data['W']:.6g}") + print("-" * 80) + print("sum W =", sum(d['W'] for d in qmap.values())) + + def build_quadrature(self, degrees: Counter): """ - Initialize the container after all 'paramters' are added. + degrees : Counter({cid: p_i}) — per-axis polynomial degree + Returns : {q_index: {'Y':{cid:y}, 'Z':{cid:z}, 'W':w}} """ - # Create degree map - sp_hd_map = self.getParameterHighestDegreeMap(exclude=ParameterType.DETERMINISTIC) - self.basistermwise_parameter_degrees = tensor_indices(sp_hd_map) - return + cids = list(self.coordinates.keys()) + one_d = {cid: self.coordinates[cid].getQuadraturePointsWeights( + int(degrees.get(cid, 0))) for cid in cids} + sizes = {cid: len(one_d[cid]['wq']) for cid in cids} + + qmap, ctr = {}, 0 + for i_tuple in product(*[range(sizes[cid]) for cid in cids]): + y, z, w = {}, {}, 1.0 + for cid, i in zip(cids, i_tuple): + y[cid] = one_d[cid]['yq'][i] + z[cid] = one_d[cid]['zq'][i] + w *= one_d[cid]['wq'][i] + qmap[ctr] = {'Y': y, 'Z': z, 'W': w} + ctr += 1 + + self.print_quadrature(qmap) + + return qmap + + #-----------------------------------------------------------------# + # Basis evaluation + #-----------------------------------------------------------------# + + def evaluateBasisDegreesY(self, y_by_cid, degrees_counter): + val = 1.0 + for cid, deg in degrees_counter.items(): + val *= self.coordinates[cid].psi_y(y_by_cid[cid], deg) + return val + + def evaluateBasisIndexY(self, y_by_cid, basis_id): + degrees = self.basis[basis_id] + return self.evaluateBasisDegreesY(y_by_cid, degrees) + + #-----------------------------------------------------------------# + # Sparsity Detection Utilities + #-----------------------------------------------------------------# + + def sparse_vector(self, dmapi, dmapf): + """ + Detect sparsity for . - def getNumQuadraturePoints(self): - """ - """ - param_nqpts_map = Counter() - pkeys = self.parameter_map.keys() - for pid in pkeys: - param_nqpts_map[pid] = self.getParameter(pid).monomial_degree - return param_nqpts_map - - def getNumQuadraturePointsFromDegree(self,dmap): - """ - Supply a map whose keys are parameterids and values are - monomial degrees and this function will return a map with - parameterids as keys and number of corresponding quadrature - points along the monomial dimension. - """ - ## pids = dmap.keys() - ## param_nqpts_map = Counter() - ## for pid in pids: - ## param_nqpts_map[pid] = nqpts(dmap[pid]) - ## return param_nqpts_map - - pids = dmap.keys() - param_nqpts_map = Counter() - for pid in self.parameter_map.keys(): #pids: - param_nqpts_map[pid] = self.parameter_map[pid].monomial_degree #nqpts(dmap[pid]) - return param_nqpts_map - - def getParameterDegreeForBasisTerm(self, paramid, kthterm): - """ - What is the polynomial degree of the corresponding k-th or - Hermite/Legendre basis function? For univariate stochastic - case d == k, but will change for multivariate case based on - tensor product or other rules used to construct the - multivariate basis set. - """ - return self.basistermwise_parameter_degrees[kthterm][paramid] - - def getParameters(self): - return self.parameter_map - - def getParameter(self, paramid): - return self.parameter_map[paramid] - - def getParameterHighestDegreeMap(self, exclude=ParameterType.DETERMINISTIC): - degree_map = {} - for paramkey in self.parameter_map.keys(): - param = self.parameter_map[paramkey] - if exclude != param.getParameterType(): - degree_map[paramkey] = param.monomial_degree - return degree_map - - def getNumStochasticBasisTerms(self): - return self.num_terms - - def addParameter(self, new_parameter): - - # do you want to hold separate maps for deterministic and - # stochastic parameters? - - # Add parameter object to the map of parameters - self.parameter_map[new_parameter.getParameterID()] = new_parameter - - # Increase the number of stochastic terms (tensor pdt rn) - self.num_terms = self.num_terms*new_parameter.monomial_degree - - return + dmapi : Counter({axis: degree}) for basis ψ_i + dmapf : Counter({axis: degree}) for function f + """ + for axis, deg in dmapi.items(): + if deg > dmapf.get(axis, 0): + return False + return True + + def monomial_vector_sparsity_mask(self, f_deg: Counter): + mask = set() + for i, psi_i in self.basis.items(): + if self.sparse_vector(psi_i, f_deg): + mask.add(i) + return mask + + def polynomial_vector_sparsity_mask(self, f_degrees: list[Counter]): + mask = set() + for f_deg in f_degrees: + mask |= self.monomial_vector_sparsity_mask(f_deg) + return mask + + #-----------------------------------------------------------------# + # Inner products + #-----------------------------------------------------------------# + + def inner_product(self, f_eval, g_eval, + f_deg: Counter|None=None, + g_deg: Counter|None=None): + """ + in Y-frame = ∑ f(Y_q) g(Y_q) W_q + """ + coord_ids = list(self.coordinates.keys()) + f_deg = f_deg or safe_zero_degrees(coord_ids) + g_deg = g_deg or safe_zero_degrees(coord_ids) + need = sum_degrees(f_deg, g_deg) + + qmap = self.build_quadrature(need) + s = 0.0 + for q in qmap.values(): + y = q['Y'] + s += f_eval(y) * g_eval(y) * q['W'] + return s + + def inner_product_basis(self, + i_id: int, j_id: int, + f_eval=None, f_deg: Counter|None=None): + """ + <ψ_i, f, ψ_j> in Y-frame. + """ + psi_i = self.basis[i_id] + psi_j = self.basis[j_id] + + coord_ids = list(self.coordinates.keys()) + f_deg = f_deg or safe_zero_degrees(coord_ids) + need = sum_degrees(psi_i, psi_j, f_deg) + + qmap = self.build_quadrature(need) + s = 0.0 + for q in qmap.values(): + y = q['Y'] + val = (self.evaluateBasisDegreesY(y, psi_i) * + self.evaluateBasisDegreesY(y, psi_j)) + if f_eval is not None: + val *= f_eval(y) + s += val * q['W'] + return s + + #-----------------------------------------------------------------# + # Decomposition + #-----------------------------------------------------------------# + + def decompose(self, + function : PolyFunction, + sparse : bool = True, + analytic : bool = False): + """ + Coefficients c_k = in Y-frame. + + Parameters + ---------- + function : PolyFunction + Polynomial function to decompose. + sparse : bool + If True, restrict to admissible basis indices. + analytic : bool + If True, compute coefficients with Sympy integrals instead of quadrature. + + Returns + ------- + coeffs : dict {basis_id: coefficient} + """ + coords = self.coordinates + symbols = {cid: coord.symbol for cid, coord in coords.items()} + + #---------------------------------------------------------------# + # Build admissible mask + #---------------------------------------------------------------# + if sparse: + mask = self.polynomial_vector_sparsity_mask(function.degrees) + else: + mask = self.basis.keys() + + coeffs = {} + for k in mask: + psi_k = self.basis[k] + + if analytic: + #-------------------------------------------------------# + # Analytic Sympy integration + #-------------------------------------------------------# + psi_expr = 1 + for cid, deg in psi_k.items(): + z = coords[cid].physical_to_standard(coords[cid].symbol) + psi_expr *= coords[cid].psi_z(z, deg) + + integrand = function(symbols) * psi_expr * sp.Mul(*[c.weight() for c in coords.values()]) + val = integrand + for cid, coord in coords.items(): + y = coord.symbol + a, b = coord.domain() + val = sp.integrate(val, (y, a, b)) + + coeffs[k] = sp.simplify(val) + + else: + #-------------------------------------------------------# + # Numerical quadrature + #-------------------------------------------------------# + need = sum_degrees_union_vector(function.max_degrees, psi_k) + qmap = self.build_quadrature(need) + + s = 0.0 + for q in qmap.values(): + y = q['Y'] + s += function(y) * self.evaluateBasisDegreesY(y, psi_k) * q['W'] + coeffs[k] = s + + # Optionally fill zeroes + if sparse: + for k in self.basis: + if k not in coeffs: + coeffs[k] = 0 + + return coeffs + + def admissible_pair(self, deg_i: Counter, deg_j: Counter, f_deg: Counter) -> bool: + """ + Axis-wise admissibility rule for a single monomial. + + admissible_pair = atomic check (axis-wise rule) + monomial_sparsity_mask = per-monomial mask + polynomial_sparsity_mask = per-polynomial union of masks + + Parameters + ---------- + deg_i, deg_j : Counter + Degree structure of basis functions psi_i, psi_j. + f_deg : Counter + Degree structure of one monomial in f. + + Returns + ------- + bool + True if can be nonzero. + + Rule + ---- + For every axis d: + |deg_i(d) - deg_j(d)| <= f_deg(d) <= deg_i(d) + deg_j(d) + """ - def initializeQuadrature(self, param_nqpts_map): - self.quadrature_map = self.getQuadraturePointsWeights(param_nqpts_map) - return - - def W(self, q): - wmap = self.quadrature_map[q]['W'] - return wmap - - def psi(self, k, zmap): - paramids = zmap.keys() - ans = 1.0 - for paramid in paramids: - # Deterministic ones return one! maybe we can avoid! - d = self.getParameterDegreeForBasisTerm(paramid, k) - val = self.getParameter(paramid).evalOrthoNormalBasis(zmap[paramid],d) - ans = ans*val - return ans - - def Z(self, q, key='pid'): - if key == 'pid': - # use pid as key - return self.quadrature_map[q]['Z'] + """ + Axis-wise admissibility rule for a single monomial. + + If f_deg is empty (constant monomial), then all (i,j) pairs are admissible. + """ + + # Constant monomial -> don't filter anything + if not f_deg: + return deg_i == deg_j + + axes = set(deg_i) | set(deg_j) | set(f_deg) + for d in axes: + di, dj, df = deg_i.get(d, 0), deg_j.get(d, 0), f_deg.get(d, 0) + if not (abs(di - dj) <= df): + return False + return True + + def monomial_sparsity_mask(self, f_deg: Counter, symmetric: bool = False): + """ + Sparsity mask for a single monomial term in f. + Constant monomial admits all (i,j). + """ + mask = set() + basis_keys = sorted(self.basis.keys()) + + if not f_deg: + # Constant term: admit everything + for ii, i in enumerate(basis_keys): + jstart = ii if symmetric else 0 + for j in basis_keys[jstart:]: + mask.add((i, j)) + return mask + + for ii, i in enumerate(basis_keys): + jstart = ii if symmetric else 0 + for j in basis_keys[jstart:]: + if self.admissible_pair(self.basis[i], self.basis[j], f_deg): + mask.add((i, j)) + return mask + + def polynomial_sparsity_mask(self, f_degrees: list[Counter], symmetric: bool = False): + """ + Sparsity mask for a full polynomial f, as union of monomial masks. + + Parameters + ---------- + f_degrees : list of Counters + Each Counter gives the degree structure of one monomial in f. + symmetric : bool + If True, only return pairs (i,j) with i <= j. + + Returns + ------- + mask : set of (i,j) tuples + """ + mask = set() + for f_deg in f_degrees: + mask |= self.monomial_sparsity_mask(f_deg, symmetric=symmetric) + return mask + + def decompose_matrix(self, function, sparse=False, symmetric=True): + """ + Assemble A_ij = ∫ psi_i(y) psi_j(y) f(y) w(y) dy (dense). + + Parameters + ---------- + function : PolyFunction + Callable with .degrees property for sparsity. + sparse : bool + If True, restrict to admissible pairs. + symmetric : bool + If True, compute only i ≤ j and mirror. + + Returns + ------- + A : np.ndarray + Dense (nbasis x nbasis) coefficient matrix. + """ + nbasis = self.getNumBasisFunctions() + A = np.zeros((nbasis, nbasis)) + + # Build admissible mask + if sparse: + mask = self.polynomial_sparsity_mask(function.degrees, symmetric=symmetric) else: - # use name as key - qmap = self.quadrature_map[q]['Z'] - nmap = {} - for pid in qmap.keys(): - nmap[self.getParameter(pid).param_name] = qmap[pid] - return nmap - - def Y(self, q, key='pid'): - if key == 'pid': - # use pid as key - return self.quadrature_map[q]['Y'] + if symmetric: + mask = {(i,j) for i in self.basis for j in self.basis if i <= j} + else: + mask = {(i,j) for i in self.basis for j in self.basis} + + qcache = {} + for i, j in mask: + psi_i, psi_j = self.basis[i], self.basis[j] + need = sum_degrees_union_matrix(function.max_degrees, psi_i, psi_j) + + key = tuple(sorted(need.items())) + qmap = qcache.get(key) + if qmap is None: + qmap = self.build_quadrature(need) + qcache[key] = qmap + + s = 0.0 + for q in qmap.values(): + y = q['Y'] + s += (function(y) + * self.evaluateBasisDegreesY(y, psi_i) + * self.evaluateBasisDegreesY(y, psi_j)) * q['W'] + + A[i,j] = s + if symmetric and i != j: + A[j,i] = s + + return A + + def decompose_matrix_analytic(self, function, sparse=False, symmetric=True): + """ + Assemble A_ij = ∫ psi_i(y) psi_j(y) f(y) w(y) dy (dense), + using analytic (Sympy) integration. + + Parameters + ---------- + function : PolyFunction + Polynomial function to decompose (with .degrees and .max_degrees). + sparse : bool + If True, restrict to admissible pairs. + symmetric : bool + If True, compute only i ≤ j and mirror. + + Returns + ------- + A : np.ndarray + Dense (nbasis x nbasis) coefficient matrix. + """ + import sympy as sp + + nbasis = self.getNumBasisFunctions() + A = np.zeros((nbasis, nbasis)) + + coords = self.coordinates + symbols = {cid: coord.symbol for cid, coord in coords.items()} + + # Build admissible mask + if sparse: + mask = self.polynomial_sparsity_mask(function.degrees, symmetric=symmetric) else: - # use name as key - qmap = self.quadrature_map[q]['Y'] - nmap = {} - for pid in qmap.keys(): - nmap[self.getParameter(pid).param_name] = qmap[pid] - return nmap - - ## def evalOrthoNormalBasis(self, k, q): - ## print self.Z(q) - ## return self.psi(k, self.Z(q)) - - def evalOrthoNormalBasis(self, k, q): - zq = self.Z(q) - zkey = HashableDict(zq) - try: - return self.psi_map[(k, zkey)] - except: - val = self.psi(k, zq) - self.psi_map[(k,zkey)] = val - return val - - def getQuadraturePointsWeights(self, param_nqpts_map): - """ - Return a map of k : qmap, where k is the global basis index - """ - - ## TODO generalize and store things if necessary - - params = param_nqpts_map.keys() - nqpts = param_nqpts_map.values() - - # exclude deterministic terms? - total_quadrature_points = np.prod(nqpts) - num_vars = len(params) - - # Initialize map with empty values corresponding to each key - qmap = {} - for key in range(total_quadrature_points): - qmap[key] = [] - - if num_vars == 1: - - # Get 1d-quadrature maps - map0 = self.getParameter(0).getQuadraturePointsWeights(nqpts[0]) - pid0 = self.getParameter(0).getParameterID() - - # Tensor product of 1D-quadrature to get N-D quadrature - ctr = 0 - - for i0 in range(nqpts[0]): - - yvec = { pid0 : map0['yq'][i0] } - - zvec = { pid0 : map0['zq'][i0] } - - w = map0['wq'][i0] - - data = {'Y' : yvec, 'Z' : zvec, 'W' : w} - - qmap[ctr] = data - - ctr += 1 - - - return qmap - - - elif num_vars == 2: - - # Get 1d-quadrature maps - map0 = self.getParameter(0).getQuadraturePointsWeights(nqpts[0]) - map1 = self.getParameter(1).getQuadraturePointsWeights(nqpts[1]) - - pid0 = self.getParameter(0).getParameterID() - pid1 = self.getParameter(1).getParameterID() - - # Tensor product of 1D-quadrature to get N-D quadrature - ctr = 0 - - for i0 in range(nqpts[0]): - for i1 in range(nqpts[1]): - - yvec = { pid0 : map0['yq'][i0], - pid1 : map1['yq'][i1] } - - zvec = { pid0 : map0['zq'][i0], - pid1 : map1['zq'][i1] } - - ## wvec = { pid0 : map0['wq'][i0], - ## pid1 : map1['wq'][i1], - ## pid2 : map2['wq'][i2] } - - w = map0['wq'][i0]*map1['wq'][i1] - - data = {'Y' : yvec, 'Z' : zvec, 'W' : w} - - qmap[ctr] = data - - ctr += 1 - - # Check if the sum of weights is one - - return qmap - - elif num_vars == 3: - - # Get 1d-quadrature maps - map0 = self.getParameter(0).getQuadraturePointsWeights(nqpts[0]) - map1 = self.getParameter(1).getQuadraturePointsWeights(nqpts[1]) - map2 = self.getParameter(2).getQuadraturePointsWeights(nqpts[2]) - - pid0 = self.getParameter(0).getParameterID() - pid1 = self.getParameter(1).getParameterID() - pid2 = self.getParameter(2).getParameterID() - - # Tensor product of 1D-quadrature to get N-D quadrature - ctr = 0 - - for i0 in range(nqpts[0]): - for i1 in range(nqpts[1]): - for i2 in range(nqpts[2]): - - yvec = { pid0 : map0['yq'][i0], - pid1 : map1['yq'][i1], - pid2 : map2['yq'][i2] } - - zvec = { pid0 : map0['zq'][i0], - pid1 : map1['zq'][i1], - pid2 : map2['zq'][i2] } - - ## wvec = { pid0 : map0['wq'][i0], - ## pid1 : map1['wq'][i1], - ## pid2 : map2['wq'][i2] } - - w = map0['wq'][i0]*map1['wq'][i1]*map2['wq'][i2] - - data = {'Y' : yvec, 'Z' : zvec, 'W' : w} - - qmap[ctr] = data - - ctr += 1 - - # Check if the sum of weights is one - - return qmap - - elif num_vars == 4: - - # Get 1d-quadrature maps - map0 = self.getParameter(0).getQuadraturePointsWeights(nqpts[0]) - map1 = self.getParameter(1).getQuadraturePointsWeights(nqpts[1]) - map2 = self.getParameter(2).getQuadraturePointsWeights(nqpts[2]) - map3 = self.getParameter(3).getQuadraturePointsWeights(nqpts[3]) - - pid0 = self.getParameter(0).getParameterID() - pid1 = self.getParameter(1).getParameterID() - pid2 = self.getParameter(2).getParameterID() - pid3 = self.getParameter(3).getParameterID() - - # Tensor product of 1D-quadrature to get N-D quadrature - ctr = 0 - - for i0 in range(nqpts[0]): - for i1 in range(nqpts[1]): - for i2 in range(nqpts[2]): - for i3 in range(nqpts[3]): - - yvec = { pid0 : map0['yq'][i0], - pid1 : map1['yq'][i1], - pid2 : map2['yq'][i2], - pid3 : map3['yq'][i3]} - - - zvec = { pid0 : map0['zq'][i0], - pid1 : map1['zq'][i1], - pid2 : map2['zq'][i2], - pid3 : map3['zq'][i3]} - - w = map0['wq'][i0]*map1['wq'][i1]*map2['wq'][i2]*map3['wq'][i3] - - data = {'Y' : yvec, 'Z' : zvec, 'W' : w} - - qmap[ctr] = data - - ctr += 1 - - # Check if the sum of weights is one - - return qmap - - elif num_vars == 5: - - # Get 1d-quadrature maps - map0 = self.getParameter(0).getQuadraturePointsWeights(nqpts[0]) - map1 = self.getParameter(1).getQuadraturePointsWeights(nqpts[1]) - map2 = self.getParameter(2).getQuadraturePointsWeights(nqpts[2]) - map3 = self.getParameter(3).getQuadraturePointsWeights(nqpts[3]) - map4 = self.getParameter(4).getQuadraturePointsWeights(nqpts[4]) - - pid0 = self.getParameter(0).getParameterID() - pid1 = self.getParameter(1).getParameterID() - pid2 = self.getParameter(2).getParameterID() - pid3 = self.getParameter(3).getParameterID() - pid4 = self.getParameter(4).getParameterID() - - # Tensor product of 1D-quadrature to get N-D quadrature - ctr = 0 - - for i0 in range(nqpts[0]): - for i1 in range(nqpts[1]): - for i2 in range(nqpts[2]): - for i3 in range(nqpts[3]): - for i4 in range(nqpts[4]): - - yvec = { pid0 : map0['yq'][i0], - pid1 : map1['yq'][i1], - pid2 : map2['yq'][i2], - pid3 : map3['yq'][i3], - pid4 : map4['yq'][i4]} - - - zvec = { pid0 : map0['zq'][i0], - pid1 : map1['zq'][i1], - pid2 : map2['zq'][i2], - pid3 : map3['zq'][i3], - pid4 : map4['zq'][i4]} - - w = map0['wq'][i0]*map1['wq'][i1]*map2['wq'][i2]*map3['wq'][i3]*map4['wq'][i4] - - data = {'Y' : yvec, 'Z' : zvec, 'W' : w} - - qmap[ctr] = data - - ctr += 1 - return qmap - - def projectResidual(self, elem, time, res, X, v, dv, ddv): - """ - Project the elements deterministic residual onto stochastic - basis and place in global stochastic residual array - """ - - # size of deterministic element state vector - ndisps = elem.numDisplacements() - nnodes = elem.numNodes() - nddof = ndisps*nnodes - nsdof = ndisps*self.getNumStochasticBasisTerms() - - for i in range(self.getNumStochasticBasisTerms()): - - # Initialize quadrature with number of gauss points - # necessary for i-th basis entry - self.initializeQuadrature( - self.getNumQuadraturePointsFromDegree( - self.basistermwise_parameter_degrees[i] - ) - ) - - # Quadrature Loop - rtmp = np.zeros((nddof)) - - for q in self.quadrature_map.keys(): - - # Set the parameter values into the element - elem.setParameters(self.Y(q,'name')) - - # Create space for fetching deterministic residual - # vector - resq = np.zeros((nddof)) - uq = np.zeros((nddof)) - udq = np.zeros((nddof)) - uddq = np.zeros((nddof)) - - # Obtain states at quadrature nodes - for k in range(self.num_terms): - psiky = self.evalOrthoNormalBasis(k,q) - uq[:] += v[k*nddof:(k+1)*nddof]*psiky - udq[:] += dv[k*nddof:(k+1)*nddof]*psiky - uddq[:] += ddv[k*nddof:(k+1)*nddof]*psiky - - # Fetch the deterministic element residual - elem.addResidual(time, resq, X, uq, udq, uddq) - - # Project the determinic element residual onto the - # stochastic basis and place in global residual array - psiq = self.evalOrthoNormalBasis(i,q) - alphaq = self.W(q) - rtmp[:] += resq*psiq*alphaq - - # Distribute the residual - for ii in range(nnodes): - # Local indices - listart = index(ii)*ndisps - liend = (index(ii)+1)*ndisps - gistart = index(ii)*nsdof + i*ndisps - giend = index(ii)*nsdof + (i+1)*ndisps - - # Place in global residul array node by node - #print(gistart, giend, listart, liend) - res[gistart:giend] += rtmp[listart:liend] - - # print("res=", res) - # plot_vector(res, 'stochatic-element-residual.pdf', normalize=True, precision=1.0e-6) + if symmetric: + mask = {(i, j) for i in self.basis for j in self.basis if i <= j} + else: + mask = {(i, j) for i in self.basis for j in self.basis} + + for i, j in mask: + psi_i, psi_j = self.basis[i], self.basis[j] + + # Build integrand symbolically + psi_expr_i, psi_expr_j = 1, 1 + for cid, deg in psi_i.items(): + z = coords[cid].physical_to_standard(coords[cid].symbol) + psi_expr_i *= coords[cid].psi_z(z, deg) + for cid, deg in psi_j.items(): + z = coords[cid].physical_to_standard(coords[cid].symbol) + psi_expr_j *= coords[cid].psi_z(z, deg) + + # Function f(y) expanded + f_expr = 0 + for coeff, degs in function.terms: + mon = coeff + for cid, d in degs.items(): + mon *= symbols[cid] ** d + f_expr += mon + + w_expr = sp.Mul(*[c.weight() for c in coords.values()]) + + # Full integrand: f * ψ_i * ψ_j * weight + integrand = f_expr * psi_expr_i * psi_expr_j * w_expr + + val = integrand + for cid, coord in coords.items(): + y = coord.symbol + a, b = coord.domain() + val = sp.integrate(val, (y, a, b)) + + # Simplify and cast to float + A[i, j] = float(sp.simplify(val)) + if symmetric and i != j: + A[j, i] = A[i, j] + + return A + + #-----------------------------------------------------------------# + # Consistency checks + # TestCoordinateSystem and supply the instance of cs + #-----------------------------------------------------------------# + + def check_orthonormality(self): + """ + """ + nbasis = self.getNumBasisFunctions() + A = np.zeros((nbasis, nbasis)) + for ii in range(nbasis): + for jj in range(nbasis): + A[ii,jj] = self.inner_product_basis(ii, jj) + return np.linalg.norm(A - np.eye(nbasis), ord=np.inf) + + def check_decomposition_numerical_symbolic(self, + function: PolyFunction, + sparse: bool = True, + tol=1e-10, + verbose=True): + """ + Cross-check numerical vs analytic decomposition. + """ - return - - def projectJacobian(self, - elem, - time, J, alpha, beta, gamma, - X, v, dv, ddv): - """ - Project the elements deterministic jacobian matrix onto - stochastic basis and place in global stochastic jacobian matrix - """ - # All stochastic parameters are assumed to be of degree 1 - # (constant terms) - dmapf = Counter() - for pid in self.parameter_map.keys(): - dmapf[pid] = 1 - - # size of deterministic element state vector - ndisps = elem.numDisplacements() - nnodes = elem.numNodes() - nddof = ndisps*nnodes - nsdof = ndisps*self.getNumStochasticBasisTerms() - - for i in range(self.getNumStochasticBasisTerms()): - imap = self.basistermwise_parameter_degrees[i] - - for j in range(self.getNumStochasticBasisTerms()): - jmap = self.basistermwise_parameter_degrees[j] - - smap = sparse(imap, jmap, dmapf) - - if False not in smap.values(): - - dmap = Counter() - dmap.update(imap) - dmap.update(jmap) - dmap.update(dmapf) - nqpts_map = self.getNumQuadraturePointsFromDegree(dmap) - - # Initialize quadrature with number of gauss points - # necessary for i,j-th jacobian entry - self.initializeQuadrature(nqpts_map) - - jtmp = np.zeros((nddof,nddof)) - - # Quadrature Loop - for q in self.quadrature_map.keys(): - - try: - elem.setParameters(self.Y(q,'name')) - except: - print('exception') - raise - - # Create space for fetching deterministic - # jacobian, and state vectors that go as input - Aq = np.zeros((nddof,nddof)) - uq = np.zeros((nddof)) - udq = np.zeros((nddof)) - uddq = np.zeros((nddof)) - for k in range(self.num_terms): - psiky = self.evalOrthoNormalBasis(k,q) - uq[:] += v[k*nddof:(k+1)*nddof]*psiky - udq[:] += dv[k*nddof:(k+1)*nddof]*psiky - uddq[:] += ddv[k*nddof:(k+1)*nddof]*psiky - - # Fetch the deterministic element jacobian matrix - elem.addJacobian(time, Aq, alpha, beta, gamma, X, uq, udq, uddq) - - # Project the determinic element jacobian onto the - # stochastic basis and place in the global matrix - psiziw = self.W(q)*self.evalOrthoNormalBasis(i,q) - psizjw = self.evalOrthoNormalBasis(j,q) - jtmp[:,:] += Aq*psiziw*psizjw - - # Distribute blocks (16 times) - for ii in range(0,nnodes): - for jj in range(0,nnodes): - - # Local indices - listart = index(ii)*ndisps - liend = (index(ii)+1)*ndisps - ljstart = index(jj)*ndisps - ljend = (index(jj)+1)*ndisps - - gistart = index(ii)*nsdof + i*ndisps - giend = index(ii)*nsdof + (i+1)*ndisps - gjstart = index(jj)*nsdof + j*ndisps - gjend = index(jj)*nsdof + (j+1)*ndisps - - if i == j: - J[gistart:giend, gjstart:gjend] += jtmp[listart:liend, ljstart:ljend] - else: - J[gistart:giend, gjstart:gjend] += jtmp[listart:liend, ljstart:ljend] - #J[gjstart:gjend, gistart:giend] += jtmp[listart:liend, ljstart:ljend] - - #print("J=", J) - #plot_jacobian(J, 'stochatic-element-block.pdf', normalize=True, precision=1.0e-6) - - return + # ensure basis is orthonormal first + ortho_tol = self.check_orthonormality() + + from timeit import default_timer as timer + + #-------------------------------------------------------------# + # Numerical decomposition (quadrature) + #-------------------------------------------------------------# + + start_num = timer() + coeffs_num = self.decompose(function, sparse=sparse, analytic=False) + elapsed_num = timer() - start_num + + #-------------------------------------------------------------# + # Analytic decomposition (Sympy) + #-------------------------------------------------------------# + + start_sym = timer() + coeffs_sym = self.decompose(function, sparse=sparse, analytic=True) + elapsed_sym = timer() - start_sym + + #-------------------------------------------------------------# + # Compare coefficients + #-------------------------------------------------------------# + + diffs, ok = {}, True + for k in coeffs_num.keys(): + num_val = float(coeffs_num[k]) + try: + ana_val = float(coeffs_sym[k].evalf()) + except Exception: + ana_val = float(sp.N(coeffs_sym[k], 15)) + err = abs(num_val - ana_val) + if err > tol: + ok = False + diffs[k] = (num_val, ana_val, err) + + #-------------------------------------------------------------# + # Reporting + #-------------------------------------------------------------# + + if verbose or not ok: + status = "PASSED" if ok else "FAILED" + print(f"[Consistency Check] {status} with tol = {tol}, ortho tol = {ortho_tol:.2e}") + print(f"[Elapsed Time] numerical {elapsed_num:.3e} analytic = {elapsed_sym:.3e}") + header = f"{'Basis':<7} {'numerical':>12} {'analytic':>12} {'error':>12}" + print(header) + print("-" * len(header)) + for k, (n, a, e) in diffs.items(): + print(f"{k:<7d} {n:12.6f} {a:12.6f} {e:12.2e}") + print("-" * len(header)) + + return ok, diffs + + #-----------------------------------------------------------------# + # Sparse vs full Assembly (selectively employ dot products) + #-----------------------------------------------------------------# + + def check_decomposition_numerical_sparse_full(self, function: PolyFunction, + tol=1e-12, verbose=True): + """ + Cross-check sparse vs full assembly of rank 1 decomposition + coefficients + """ + from timeit import default_timer as timer - def projectInitCond(self, elem, v, vd, vdd, xpts): - """ - Project the elements deterministic initial condition onto - stochastic basis and place in global stochastic init condition - array - """ - - # size of deterministic element state vector - ndisps = elem.numDisplacements() - nnodes = elem.numNodes() - nddof = ndisps*nnodes - nsdof = ndisps*self.getNumStochasticBasisTerms() - - for k in range(self.getNumStochasticBasisTerms()): - - # Initialize quadrature with number of gauss points - # necessary for k-th basis entry - self.initializeQuadrature( - self.getNumQuadraturePointsFromDegree( - self.basistermwise_parameter_degrees[k] - ) - ) - - # Quadrature Loop - utmp = np.zeros((nddof)) - udtmp = np.zeros((nddof)) - uddtmp = np.zeros((nddof)) - for q in self.quadrature_map.keys(): - - # Set the paramter values into the element - elem.setParameters(self.Y(q,'name')) - - # Create space for fetching deterministic initial - # conditions - uq = np.zeros((nddof)) - udq = np.zeros((nddof)) - uddq = np.zeros((nddof)) - - # Fetch the deterministic initial conditions - elem.getInitConditions(uq, udq, uddq, xpts) - - # Project the determinic initial conditions onto the - # stochastic basis - psizkw = self.W(q)*self.evalOrthoNormalBasis(k,q) - utmp += uq*psizkw - udtmp += udq*psizkw - uddtmp += uddq*psizkw - - # Distribute values - for ii in range(nnodes): - # Local indices - listart = index(ii)*ndisps - liend = (index(ii)+1)*ndisps - gistart = index(ii)*nsdof + k*ndisps - giend = index(ii)*nsdof + (k+1)*ndisps - - # Place in initial condition array node after node - v[gistart:giend] += utmp[listart:liend] - vd[gistart:giend] += udtmp[listart:liend] - vdd[gistart:giend] += uddtmp[listart:liend] - - ## # Replace numbers less than machine precision with zero to - ## # avoid numerical issues - ## if clean is True: - ## eps = np.finfo(np.float).eps - ## v[np.abs(v) < eps] = 0 - ## vd[np.abs(vd) < eps] = 0 - ## vdd[np.abs(vdd) < eps] = 0 - - return + start_sparse = timer() + coeffs_sparse = self.decompose(function, sparse = True) + elapsed_sparse = timer() - start_sparse + + start_full = timer() + coeffs_full = self.decompose(function, sparse = False) + elapsed_full = timer() - start_full + + diffs, ok = {}, True + for k in coeffs_sparse.keys(): + coeff_sparse = coeffs_sparse[k] + coeff_full = coeffs_full[k] + + err = abs(coeff_sparse - coeff_full) + if err > tol: + ok = False + + diffs[k] = (coeff_sparse, coeff_full, err) + + if verbose or not ok: + print(f"[Assembly Check] {'PASSED' if ok else 'FAILED'} with tol = {tol}") + print(f"[Elapsed Time] Sparse {elapsed_sparse} Full = {elapsed_full} Ratio = {elapsed_full/elapsed_sparse}") + header = f"{'Basis':<7} {'Sparse':>12} {'Full':>12} {'Error':>12}" + print(header) + print("-" * len(header)) + for k, (n, a, e) in diffs.items(): + print(f"{k:<7d} {float(n):12.6f} {float(a):12.6f} {float(e):12.2e}") + print("-" * len(header)) + + return ok, diffs + + def check_decomposition_matrix_sparse_full(self, function, tol=1e-12, verbose=True): + """ + Cross-check sparse vs full assembly of rank-2 (matrix) decomposition + coefficients. + """ + from timeit import default_timer as timer + + #-------------------------------------------------------------# + # Assemble sparse + full + #-------------------------------------------------------------# + + start_sparse = timer() + A_sparse = self.decompose_matrix(function, sparse=True, symmetric=True) + elapsed_sparse = timer() - start_sparse + + start_full = timer() + A_full = self.decompose_matrix(function, sparse=False, symmetric=True) + elapsed_full = timer() - start_full + + #-------------------------------------------------------------# + # Compute differences + #-------------------------------------------------------------# + + diffs, ok = {}, True + nbasis = self.getNumBasisFunctions() + for i in range(nbasis): + for j in range(nbasis): + vsparse = A_sparse[i, j] + vfull = A_full[i, j] + err = abs(vsparse - vfull) + if err > tol: + ok = False + diffs[(i, j)] = (vsparse, vfull, err) + + #---------------------------------------------------------------# + # Report + #---------------------------------------------------------------# + + if verbose or not ok: + print(f"[Matrix Assembly Check] {'PASSED' if ok else 'FAILED'} " + f"with tol = {tol}") + print(f"[Elapsed Time] Sparse {elapsed_sparse:.4e} " + f"Full {elapsed_full:.4e} " + f"Ratio {elapsed_full/elapsed_sparse:.2f}") + header = f"{'i':<3} {'j':<3} {'Sparse':>12} {'Full':>12} {'Error':>12}" + print(header) + print("-" * len(header)) + for (i, j), (vs, vf, e) in diffs.items(): + if abs(e) > tol: # only print significant diffs + print(f"{i:<3d} {j:<3d} {float(vs):12.6f} " + f"{float(vf):12.6f} {float(e):12.2e}") + print("-" * len(header)) + + return ok, diffs + + def check_decomposition_matrix_numerical_symbolic(self, function, tol=1e-12, verbose=True): + """ + Cross-check numerical vs analytic assembly of rank-2 (matrix) + decomposition coefficients. + + Parameters + ---------- + function : PolyFunction + Polynomial function to decompose. + tol : float + Absolute tolerance for consistency check. + verbose : bool + Print detailed report if True. + + Returns + ------- + ok : bool + True if all entries match within tolerance. + diffs : dict + Mapping (i,j) -> (numerical, analytic, error). + """ + from timeit import default_timer as timer + + #-------------------------------------------------------------# + # Assemble numerical + analytic + #-------------------------------------------------------------# + + start_num = timer() + A_num = self.decompose_matrix(function, sparse=True, symmetric=True) + elapsed_num = timer() - start_num + + start_an = timer() + A_an = self.decompose_matrix_analytic(function, sparse=True, symmetric=True) + elapsed_an = timer() - start_an + + #-------------------------------------------------------------# + # Compute differences + #-------------------------------------------------------------# + + diffs, ok = {}, True + nbasis = self.getNumBasisFunctions() + for i in range(nbasis): + for j in range(nbasis): + vnum = A_num[i, j] + van = A_an[i, j] + err = abs(vnum - van) + if err > tol: + ok = False + diffs[(i, j)] = (vnum, van, err) + + #-------------------------------------------------------------# + # Report + #-------------------------------------------------------------# + + if verbose or not ok: + print(f"[Matrix Numerical vs Analytic Check] {'PASSED' if ok else 'FAILED'} " + f"with tol = {tol}") + print(f"[Elapsed Time] numerical {elapsed_num:.4e} " + f"analytic {elapsed_an:.4e} " + f"Ratio {elapsed_an/elapsed_num:.2f}") + header = f"{'i':<3} {'j':<3} {'Numerical':>12} {'Analytic':>12} {'Error':>12}" + print(header) + print("-" * len(header)) + for (i, j), (vn, va, e) in diffs.items(): + if abs(e) > tol: # only print significant diffs + print(f"{i:<3d} {j:<3d} {vn:12.6f} {va:12.6f} {e:12.2e}") + print("-" * len(header)) + + return ok, diffs + +#=====================================================================# +# State Equation Interface +#=====================================================================# + +class StateEquation: + """ + Generic state equation in coefficient form: + Operator * a_state = RHS + where Operator is assembled in the CoordinateSystem basis. + """ + + def __init__(self, name, operator_fn, rhs_fn, coord_system): + """ + Parameters + ---------- + name : str + Identifier (e.g., "reconstruction", "diffusion", "helmholtz") + operator_fn : callable | PolyFunction + Defines the operator kernel f(y) for assembling A_ij = <ψ_i, f, ψ_j> + rhs_fn : callable | PolyFunction | np.ndarray + Defines RHS b_i = <ψ_i, f> or direct coefficients + coord_system : CoordinateSystem + The coordinate system in which this state equation lives. + """ + self.name = name + self.operator_fn = operator_fn + self.rhs_fn = rhs_fn + self.cs = coord_system + self.operator_matrix = None + self.rhs_vector = None + self.solution = None + + #-------------------------------------------------------------# + # Assembly + #-------------------------------------------------------------# + def assemble(self, analytic=False, sparse=True, symmetric=True): + """Assemble operator and RHS in the coordinate basis.""" + cs = self.cs + if isinstance(self.operator_fn, PolyFunction): + if analytic: + A = cs.decompose_matrix_analytic(self.operator_fn, + sparse=sparse, symmetric=symmetric) + else: + A = cs.decompose_matrix(self.operator_fn, + sparse=sparse, symmetric=symmetric) + elif callable(self.operator_fn): + raise NotImplementedError("Callable operator assembly not yet implemented") + else: + A = np.asarray(self.operator_fn) + + # RHS + if isinstance(self.rhs_fn, PolyFunction): + b = cs.decompose(self.rhs_fn, sparse=sparse) + b = np.array([b[k] for k in sorted(b.keys())]) + elif callable(self.rhs_fn): + raise NotImplementedError("Callable RHS not yet implemented") + else: + b = np.asarray(self.rhs_fn) + + self.operator_matrix = A + self.rhs_vector = b + + #-------------------------------------------------------------# + # Preconditioning / Whitening + #-------------------------------------------------------------# + def precondition(self, method="cholesky"): + """Compute and apply preconditioner P such that P⁻¹ A P⁻ᵀ ≈ I.""" + A = self.operator_matrix + if method == "cholesky": + L = np.linalg.cholesky(A) + P = L + elif method == "spectral": + eigval, eigvec = np.linalg.eigh(A) + P = eigvec @ np.diag(np.sqrt(eigval)) + else: + raise ValueError(f"Unknown preconditioner {method}") + + self.P = P + self.P_inv = np.linalg.inv(P) + self.operator_whitened = self.P_inv @ A @ self.P_inv.T + self.rhs_whitened = self.P_inv @ self.rhs_vector + + #-------------------------------------------------------------# + # Solve + #-------------------------------------------------------------# + def solve(self): + """Solve the whitened or raw system.""" + A = getattr(self, "operator_whitened", self.operator_matrix) + b = getattr(self, "rhs_whitened", self.rhs_vector) + x = np.linalg.solve(A, b) + # Back transform if whitened + if hasattr(self, "P"): + x = self.P_inv.T @ x + self.solution = x + return x + + #-------------------------------------------------------------# + # Diagnostic + #-------------------------------------------------------------# + def condition_number(self): + A = self.operator_matrix + return np.linalg.cond(A) + + def __repr__(self): + return f"StateEquation({self.name}, nbasis={self.cs.getNumBasisFunctions()})" diff --git a/pspace/orthogonal_polynomials.py b/pspace/orthogonal_polynomials.py index 62e693d..3610e19 100644 --- a/pspace/orthogonal_polynomials.py +++ b/pspace/orthogonal_polynomials.py @@ -1,6 +1,6 @@ -import numpy as np import math -from scipy import misc as sp +import numpy as np +import scipy.special as sp def tensor_indices(nterms): """ @@ -8,9 +8,9 @@ def tensor_indices(nterms): """ tot_terms = np.prod(nterms) num_vars = len(nterms) - + #print tot_terms, num_vars - + idx = {} for key in range(tot_terms): idx[key] = [] @@ -18,75 +18,104 @@ def tensor_indices(nterms): ## for term in nterms: ## for k in range(term): ## print k - + ctr = 0 for i in range(nterms[0]): for j in range(nterms[1]): for k in range(nterms[2]): ctr += 1 idx[i+j+k].append((i, j, k)) - + ## for key in idx: ## print idx[key], len(idx[key]) - + flat_list = [item for sublist in idx.values() for item in sublist] - + return flat_list -def laguerre(z,d): +def _laguerre(z,d): """ Polynomials such that _{exp(-z)}^{0,inf} = 0 """ if d == 0: - return 1.0 - 0*z + return 1.0 elif d == 1: return 1.0 - z else: den = d return ((2*d-1-z)*laguerre(z,d-1) - (d-1)*laguerre(z,d-2))/den -def unit_laguerre(z,d): +def _unit_laguerre(z,d): """ Returns unit laguerre polynomial of degree d evaluated at z """ - return laguerre(z,d) + return _laguerre(z,d) -def hermite(z, d): +def laguerre(z, d): + """ + Standard (non-associated) Laguerre polynomial L_d(z). + Orthogonal under weight exp(-z) on [0, ∞). + """ + coeffs = [0]*d + [1] # degree-d monomial + L = np.polynomial.laguerre.Laguerre(coeffs) + return L(z) + +def unit_laguerre(z, d): + """ + Orthonormal Laguerre polynomial ψ_d(z). + """ + return laguerre(z, d) / np.sqrt(1.0) # weight already normalized + +def _hermite(z, d): """ Use recursion to generate probabilist hermite polynomials Hermite polynomials are produced using exp(-z^2)/sqrt(2*pi) as the weight on trivial monomials on interval [-inf,inf]. - + """ if d == 0: - return 1.0 - 0*z + return 1.0 elif d == 1: return z else: return z*hermite(z,d-1) - (d-1)*hermite(z,d-2) -def unit_hermite(z,d): +def _unit_hermite(z,d): """ Returns units hermite polynomial of degree n evaluated at z """ - return hermite(z,d)/np.sqrt(math.factorial(d)) - -## def rlegendre(z,d): -## y = 2*z-1 #(z+1)/2.0 -## if d == 0: -## return 1.0 -## elif d == 1: -## return y -## else: -## return ((2*(d-1)+1)*y*rlegendre(y,d-1)-(d-1)*rlegendre(y,d-2))/(1.0*d) + return _hermite(z,d)/np.sqrt(math.factorial(d)) -def legendre(z, d): +def hermite(z, d): + """ + Probabilists' Hermite polynomial He_d(z). + Orthogonal under weight exp(-z^2/2) on (-∞, ∞). + """ + coeffs = [0]*d + [1] + H = np.polynomial.hermite_e.HermiteE(coeffs) + return H(z) + +def unit_hermite(z, d): + """ + Orthonormal Hermite polynomial ψ_d(z). + """ + return hermite(z, d) / np.sqrt(np.math.factorial(d)) + +def rlegendre(z, d): + if d == 0: + return 1.0 + if d == 1: + return 2.0*z - 1.0 + return ((2*d - 1) * (2*z - 1) * rlegendre(z, d-1) + - (d - 1) * rlegendre(z, d-2)) / d + +def _legendre(z, d): """ Use recursion to generate Legendre polynomials - Hermite polynomials are produced using rho(z) = 1.0 as the weight - on trivial monomials in interval [0,1]. + Legendre polynomials are produced using rho(z) = 1.0 as the weight + on trivial monomials in interval [0,1]. """ p = 0.0 for k in range(d+1): @@ -94,15 +123,53 @@ def legendre(z, d): p = p + dp return ((-1)**d)*p -def unit_legendre(z,d): - return legendre(z,d)*np.sqrt(2*d+1) +def _unit_legendre(z,d): + return _legendre(z,d)*np.sqrt(2*d+1) + +import numpy as np + +#=====================================================================# +# Shifted Legendre Basis on [0,1] +#=====================================================================# + +def legendre(z, d): + """ + Shifted Legendre polynomial of degree d on [0,1]. + Defined as P_d^*(z) = P_d(2z - 1), where P_d is the standard Legendre. + """ + coeffs = [0] * d + [1] # coefficient vector for degree d + P = np.polynomial.legendre.Legendre(coeffs) # P_d(x) on [-1,1] + return P(2*z - 1) # shift domain to [0,1] + + +def unit_legendre(z, d): + """ + L^2-orthonormal shifted Legendre polynomial on [0,1]: + ψ_d(z) = sqrt(2d+1) * P_d^*(z). + """ + return np.sqrt(2*d + 1) * legendre(z, d) + + +#=====================================================================# +# Shifted Gauss-Legendre Quadrature on [0,1] +#=====================================================================# + +def shifted_legendre_quadrature(npts): + """ + Gauss-Legendre quadrature nodes/weights shifted from [-1,1] to [0,1]. + Integrates ∫_0^1 f(z) dz exactly for deg ≤ 2npts-1. + """ + x, w = np.polynomial.legendre.leggauss(npts) # nodes/weights on [-1,1] + z = 0.5 * (x + 1) # map to [0,1] + w = 0.5 * w # rescale weights + return z, w if __name__ == "__main__": - + """ Test hermite polynomials """ - + print(" Test Hermite polynomials ") print (unit_hermite(1.2,0), hermite(1.2,0)/np.sqrt(math.factorial(0))) print (unit_hermite(1.2,1), hermite(1.2,1)/np.sqrt(math.factorial(1))) print (unit_hermite(1.2,2), hermite(1.2,2)/np.sqrt(math.factorial(2))) @@ -112,10 +179,21 @@ def unit_legendre(z,d): """ Test Legendre polynomials """ - - print ("legendre") + + print(" Test Legendre polynomials ") print (unit_legendre(1.2,0), legendre(1.2,0), rlegendre(1.2,0)) print (unit_legendre(1.2,1), legendre(1.2,1), rlegendre(1.2,1)) print (unit_legendre(1.2,2), legendre(1.2,2), rlegendre(1.2,2)) print (unit_legendre(1.2,3), legendre(1.2,3), rlegendre(1.2,3)) print (unit_legendre(1.2,4), legendre(1.2,4), rlegendre(1.2,4)) + + + """ + Test laguerre polynomials + """ + print(" Test Laguerre polynomials ") + print (unit_laguerre(1.2,0), laguerre(1.2,0)) + print (unit_laguerre(1.2,1), laguerre(1.2,1)) + print (unit_laguerre(1.2,2), laguerre(1.2,2)) + print (unit_laguerre(1.2,3), laguerre(1.2,3)) + print (unit_laguerre(1.2,4), laguerre(1.2,4)) diff --git a/pspace/stochastic_element.py b/pspace/stochastic_element.py new file mode 100644 index 0000000..8115f4d --- /dev/null +++ b/pspace/stochastic_element.py @@ -0,0 +1,582 @@ +class BasisFunction(object): + def __init__(self, basis_data): + self.id = basis_data['basis_id'] + self.name = basis_data['basis_name'] + self.degrees = basis_data['monomial_degrees'] # Counter({0: 1, 1: 0}), + + # add sanity check for whether basis vector is unitary + self.num_coords = len(self.degrees) + + def __str__(self): + return str(self.__class__.__name__) + " " + str(self.__dict__) + "\n" + + def getBasisFunctionID(self): + return self.id + + def getBasisFunctionName(self): + return self.name + +class BasisFunctionFactory: + def __init__(self): + self.next_id = 0 + + def newBasisFunctionID(self): + bid = self.next_id + self.next_id += 1 + return bid + + def __str__(self): + return str(self.__class__.__name__) + " " + str(self.__dict__) + "\n" + + def createBasisFunction(self, basis_id, basis_name, monomial_degrees): + data = {} + data['basis_id'] = basis_id + data['basis_name'] = basis_name + data['monomial_degrees'] = monomial_degrees + return BasisFunction(data) + + def checkConsistency(self, max_degree=5, npoints=20, tol=1e-12, verbose=True): + """ + Check orthonormality of unit polynomials under quadrature. + + Coordinates + ---------- + max_degree : int + Highest polynomial degree to check. + npoints : int + Number of quadrature points to use. + tol : float + Numerical tolerance for delta_{mn}. + verbose : bool + Print results if True. + + Returns + ------- + ok : bool + True if all checks pass within tolerance. + errors : list + List of (m,n,value) where error > tol. + """ + # 1D quadrature from this coordinate + qmap = self.getQuadraturePointsWeights(npoints) + z = qmap['zq'] + w = qmap['wq'] + + errors = [] + ok = True + + # check inner products + for m in range(max_degree+1): + pm = self.evalOrthoNormalBasis(z, m) + for n in range(max_degree+1): + pn = self.evalOrthoNormalBasis(z, n) + ip = np.sum(pm*pn*w) # quadrature inner product + target = 1.0 if m == n else 0.0 + if abs(ip - target) > tol: + ok = False + errors.append((m, n, ip)) + if verbose: + print(f"Fail: = {ip:.6e} (expected {target})") + if verbose and ok: + print(f"[{self.__class__.__name__}] consistency check passed " + f"for degrees ≤ {max_degree} with {npoints} points.") + return ok, errors + + def getNumQuadraturePointsFromDegree(self, dmap): + """ + Supply a map whose keys are coordinateids and values are + monomial degrees and this function will return a map with + coordinateids as keys and number of corresponding quadrature + points along the monomial dimension. + """ + ## pids = dmap.keys() + ## coord_nqpts_map = Counter() + ## for pid in pids: + ## coord_nqpts_map[pid] = nqpts(dmap[pid]) + ## return coord_nqpts_map + + pids = dmap.keys() + coord_nqpts_map = Counter() + for pid in self.coordinates.keys(): #pids: + coord_nqpts_map[pid] = self.coordinates[pid].monomial_degree #nqpts(dmap[pid]) + return coord_nqpts_map + + def getCoordinateDegreeForBasisTerm(self, coordid, kthterm): + """ + What is the polynomial degree of the corresponding k-th or + Hermite/Legendre basis function? For univariate stochastic + case d == k, but will change for multivariate case based on + tensor product or other rules used to construct the + multivariate basis set. + """ + return self.basistermwise_coordinate_degrees[kthterm][coordid] + + def getMonimialDegreeCoordinates(self): + degree_map = {} + for coordid in self.coordinates.keys(): + degree_map[coordid] = self.coordinates[coordid].monomial_degree + return degree_map # wrap with Counter() + + def getNumBasisElements(self): + return self.basis_factory.count + 1 + + def addCoordinate(self, coordinate): + # Add coordinate object to the map of coordinates + self.coordinates[coordinate.getCoordinateID()] = coordinate + + # Increase the number of stochastic terms (tensor pdt rn) + # self.num_terms = self.num_terms*new_coordinate.monomial_degree + + def initializeQuadrature(self, coord_nqpts_map): + self.quadrature_map = self.getQuadraturePointsWeights(coord_nqpts_map) + return + + def W(self, q): + wmap = self.quadrature_map[q]['W'] + return wmap + + def psi(self, k, zmap): + coordids = zmap.keys() + ans = 1.0 + for coordid in coordids: + # Deterministic ones return one! maybe we can avoid! + d = self.getCoordinateDegreeForBasisTerm(coordid, k) + val = self.getCoordinate(coordid).evalOrthoNormalBasis(zmap[coordid],d) + ans = ans*val + return ans + + def Z(self, q, key='pid'): + if key == 'pid': + # use pid as key + return self.quadrature_map[q]['Z'] + else: + # use name as key + qmap = self.quadrature_map[q]['Z'] + nmap = {} + for pid in qmap.keys(): + nmap[self.getCoordinate(pid).coord_name] = qmap[pid] + return nmap + + def Y(self, q, key='pid'): + if key == 'pid': + # use pid as key + return self.quadrature_map[q]['Y'] + else: + # use name as key + qmap = self.quadrature_map[q]['Y'] + nmap = {} + for pid in qmap.keys(): + nmap[self.getCoordinate(pid).coord_name] = qmap[pid] + return nmap + + def evalOrthoNormalBasis(self, k, q): + return self.psi(k, self.Z(q)) + + def getQuadraturePointsWeights(self, coord_nqpts_map): + """ + Return a map of quadrature point index : quadrature data (Y,Z,W). + Works for arbitrary number of random variables. + """ + pids = list(coord_nqpts_map.keys()) + nqpts = list(coord_nqpts_map.values()) + + # fetch 1D quadrature maps for each coordinate + maps = [self.getCoordinate(pid).getQuadraturePointsWeights(n) + for pid, n in zip(pids, nqpts)] + + # Cartesian product of index ranges + qmap = {} + ctr = 0 + for idx_tuple in product(*[range(n) for n in nqpts]): + yvec, zvec, w = {}, {}, 1.0 + for pid, i, m in zip(pids, idx_tuple, maps): + yvec[pid] = m['yq'][i] + zvec[pid] = m['zq'][i] + w *= m['wq'][i] + qmap[ctr] = {'Y': yvec, 'Z': zvec, 'W': w} + ctr += 1 + + return qmap + + + def checkConsistency(self, max_degree=None, tol=1e-12, verbose=True): + """ + Check orthonormality of the multivariate basis functions + under the container's quadrature. Also prints a table of + inner products for debugging. + + Coordinates + ---------- + max_degree : int or None + Maximum number of basis terms to check. If None, uses + all available basis terms. + tol : float + Numerical tolerance for delta_{ij}. + verbose : bool + Print results if True. + + Returns + ------- + ok : bool + True if all checks pass within tolerance. + errors : list + List of (i,j,value) where error > tol. + gram : np.ndarray + Inner product matrix (approximate identity). + """ + # Ensure quadrature is initialized + if not hasattr(self, "quadrature_map"): + nqpts_map = self.getNumQuadraturePoints() + self.initializeQuadrature(nqpts_map) + + nbasis = self.getNumBasisElements() + if max_degree is not None: + nbasis = min(nbasis, max_degree+1) + + gram = np.zeros((nbasis, nbasis)) + errors = [] + ok = True + + # Build Gram matrix + for i in range(nbasis): + for j in range(nbasis): + s = 0.0 + for q in self.quadrature_map.keys(): + psi_i = self.evalOrthoNormalBasis(i,q) + psi_j = self.evalOrthoNormalBasis(j,q) + wq = self.W(q) + s += psi_i * psi_j * wq + gram[i,j] = s + target = 1.0 if i == j else 0.0 + if abs(s - target) > tol: + ok = False + errors.append((i, j, s)) + + if verbose: + print(f"[CoordinateSystem] Gram matrix for {nbasis} basis terms:") + with np.printoptions(precision=3, suppress=True): + print(gram) + + if ok: + print(f"[CoordinateSystem] consistency check passed " + f"for {nbasis} basis terms.") + else: + print(f"[CoordinateSystem] FAILED: {len(errors)} inconsistencies found.") + + return ok, errors, gram + + def getSymmetricNonZeroIndices(self, dmapf): + nz = {} + N = self.getNumBasisElements() + for i in range(N): + dmapi = self.basistermwise_coordinate_degrees[i] + for j in range(i,N): + dmapj = self.basistermwise_coordinate_degrees[j] + smap = self.sparse(dmapi, dmapj, dmapf) + if False not in smap.values(): + dmap = Counter() + dmap.update(dmapi) + dmap.update(dmapj) + dmap.update(dmapf) + nqpts_map = self.getNumQuadraturePointsFromDegree(dmap) + nz[(i,j)] = nqpts_map + return nz + + def getSparseJacobian(self, f, dmapf): + # rename member functions for local readability + w = lambda q : self.W(q) + psiz = lambda i, q : self.evalOrthoNormalBasis(i,q) + + nzs = self.getSymmetricNonZeroIndices(dmapf) + N = self.getNumBasisElements() + A = np.zeros((N, N)) + for index, nqpts in nzs.items(): + self.initializeQuadrature(nqpts) + pids = self.getCoordinates().keys() + i = index[0] + j = index[1] + for q in self.quadrature_map.keys(): + val = w(q)*psiz(i,q)*psiz(j,q)*f(q) + A[i, j] += val + A[j, i] += val + return A + + def getJacobian(self, f, dmapf): + # rename member functions for local readability + w = lambda q : self.W(q) + psiz = lambda i, q : self.evalOrthoNormalBasis(i,q) + + N = self.getNumBasisElements() + A = np.zeros((N, N)) + for i in range(N): + dmapi = self.basistermwise_coordinate_degrees[i] + for j in range(N): + dmapj = self.basistermwise_coordinate_degrees[j] + + dmap = Counter() + dmap.update(dmapi) + dmap.update(dmapj) + dmap.update(dmapf) + + # add up the degree of both participating functions psizi + # and psizj to determine the total degree of integrand + nqpts_map = self.getNumQuadraturePointsFromDegree(dmap) + self.initializeQuadrature(nqpts_map) + + # Loop quadrature points + pids = self.getCoordinates().keys() + for q in self.quadrature_map.keys(): + A[i,j] += w(q)*psiz(i,q)*psiz(j,q)*f(q) + return A + + + +def index(ii): + return ii + if ii == 0: + return 0 + if ii == 1: + return 1 + if ii == 2: + return 3 + if ii == 3: + return 2 + +def projectResidual(self, elem, time, res, X, v, dv, ddv): + """ + Project the elements deterministic residual onto stochastic + basis and place in global stochastic residual array + """ + + # size of deterministic element state vector + ndisps = elem.numDisplacements() + nnodes = elem.numNodes() + nddof = ndisps*nnodes + nsdof = ndisps*self.getNumStochasticBasisTerms() + + for i in range(self.getNumStochasticBasisTerms()): + + # Initialize quadrature with number of gauss points + # necessary for i-th basis entry + self.initializeQuadrature( + self.getNumQuadraturePointsFromDegree( + self.basistermwise_parameter_degrees[i] + ) + ) + + # Quadrature Loop + rtmp = np.zeros((nddof)) + + for q in self.quadrature_map.keys(): + + # Set the parameter values into the element + elem.setParameters(self.Y(q,'name')) + + # Create space for fetching deterministic residual + # vector + resq = np.zeros((nddof)) + uq = np.zeros((nddof)) + udq = np.zeros((nddof)) + uddq = np.zeros((nddof)) + + # Obtain states at quadrature nodes + for k in range(self.num_terms): + psiky = self.evalOrthoNormalBasis(k,q) + uq[:] += v[k*nddof:(k+1)*nddof]*psiky + udq[:] += dv[k*nddof:(k+1)*nddof]*psiky + uddq[:] += ddv[k*nddof:(k+1)*nddof]*psiky + + # Fetch the deterministic element residual + elem.addResidual(time, resq, X, uq, udq, uddq) + + # Project the determinic element residual onto the + # stochastic basis and place in global residual array + psiq = self.evalOrthoNormalBasis(i,q) + alphaq = self.W(q) + rtmp[:] += resq*psiq*alphaq + + # Distribute the residual + for ii in range(nnodes): + # Local indices + listart = index(ii)*ndisps + liend = (index(ii)+1)*ndisps + gistart = index(ii)*nsdof + i*ndisps + giend = index(ii)*nsdof + (i+1)*ndisps + + # Place in global residul array node by node + #print(gistart, giend, listart, liend) + res[gistart:giend] += rtmp[listart:liend] + + # print("res=", res) + # plot_vector(res, 'stochatic-element-residual.pdf', normalize=True, precision=1.0e-6) + + return + +def projectJacobian(self, + elem, + time, J, alpha, beta, gamma, + X, v, dv, ddv): + """ + Project the elements deterministic jacobian matrix onto + stochastic basis and place in global stochastic jacobian matrix + """ + # All stochastic parameters are assumed to be of degree 1 + # (constant terms) + dmapf = Counter() + for pid in self.parameter_map.keys(): + dmapf[pid] = 1 + + # size of deterministic element state vector + ndisps = elem.numDisplacements() + nnodes = elem.numNodes() + nddof = ndisps*nnodes + nsdof = ndisps*self.getNumStochasticBasisTerms() + + for i in range(self.getNumStochasticBasisTerms()): + imap = self.basistermwise_parameter_degrees[i] + + for j in range(self.getNumStochasticBasisTerms()): + jmap = self.basistermwise_parameter_degrees[j] + + smap = sparse(imap, jmap, dmapf) + + if False not in smap.values(): + + dmap = Counter() + dmap.update(imap) + dmap.update(jmap) + dmap.update(dmapf) + nqpts_map = self.getNumQuadraturePointsFromDegree(dmap) + + # Initialize quadrature with number of gauss points + # necessary for i,j-th jacobian entry + self.initializeQuadrature(nqpts_map) + + jtmp = np.zeros((nddof,nddof)) + + # Quadrature Loop + for q in self.quadrature_map.keys(): + + try: + elem.setParameters(self.Y(q,'name')) + except: + print('exception') + raise + + # Create space for fetching deterministic + # jacobian, and state vectors that go as input + Aq = np.zeros((nddof,nddof)) + uq = np.zeros((nddof)) + udq = np.zeros((nddof)) + uddq = np.zeros((nddof)) + for k in range(self.num_terms): + psiky = self.evalOrthoNormalBasis(k,q) + uq[:] += v[k*nddof:(k+1)*nddof]*psiky + udq[:] += dv[k*nddof:(k+1)*nddof]*psiky + uddq[:] += ddv[k*nddof:(k+1)*nddof]*psiky + + # Fetch the deterministic element jacobian matrix + elem.addJacobian(time, Aq, alpha, beta, gamma, X, uq, udq, uddq) + + # Project the determinic element jacobian onto the + # stochastic basis and place in the global matrix + psiziw = self.W(q)*self.evalOrthoNormalBasis(i,q) + psizjw = self.evalOrthoNormalBasis(j,q) + jtmp[:,:] += Aq*psiziw*psizjw + + # Distribute blocks (16 times) + for ii in range(0,nnodes): + for jj in range(0,nnodes): + + # Local indices + listart = index(ii)*ndisps + liend = (index(ii)+1)*ndisps + ljstart = index(jj)*ndisps + ljend = (index(jj)+1)*ndisps + + gistart = index(ii)*nsdof + i*ndisps + giend = index(ii)*nsdof + (i+1)*ndisps + gjstart = index(jj)*nsdof + j*ndisps + gjend = index(jj)*nsdof + (j+1)*ndisps + + if i == j: + J[gistart:giend, gjstart:gjend] += jtmp[listart:liend, ljstart:ljend] + else: + J[gistart:giend, gjstart:gjend] += jtmp[listart:liend, ljstart:ljend] + #J[gjstart:gjend, gistart:giend] += jtmp[listart:liend, ljstart:ljend] + + #print("J=", J) + #plot_jacobian(J, 'stochatic-element-block.pdf', normalize=True, precision=1.0e-6) + + return + +def projectInitCond(self, elem, v, vd, vdd, xpts): + """ + Project the elements deterministic initial condition onto + stochastic basis and place in global stochastic init condition + array + """ + + # size of deterministic element state vector + ndisps = elem.numDisplacements() + nnodes = elem.numNodes() + nddof = ndisps*nnodes + nsdof = ndisps*self.getNumStochasticBasisTerms() + + for k in range(self.getNumStochasticBasisTerms()): + + # Initialize quadrature with number of gauss points + # necessary for k-th basis entry + self.initializeQuadrature( + self.getNumQuadraturePointsFromDegree( + self.basistermwise_parameter_degrees[k] + ) + ) + + # Quadrature Loop + utmp = np.zeros((nddof)) + udtmp = np.zeros((nddof)) + uddtmp = np.zeros((nddof)) + for q in self.quadrature_map.keys(): + + # Set the paramter values into the element + elem.setParameters(self.Y(q,'name')) + + # Create space for fetching deterministic initial + # conditions + uq = np.zeros((nddof)) + udq = np.zeros((nddof)) + uddq = np.zeros((nddof)) + + # Fetch the deterministic initial conditions + elem.getInitConditions(uq, udq, uddq, xpts) + + # Project the determinic initial conditions onto the + # stochastic basis + psizkw = self.W(q)*self.evalOrthoNormalBasis(k,q) + utmp += uq*psizkw + udtmp += udq*psizkw + uddtmp += uddq*psizkw + + # Distribute values + for ii in range(nnodes): + # Local indices + listart = index(ii)*ndisps + liend = (index(ii)+1)*ndisps + gistart = index(ii)*nsdof + k*ndisps + giend = index(ii)*nsdof + (k+1)*ndisps + + # Place in initial condition array node after node + v[gistart:giend] += utmp[listart:liend] + vd[gistart:giend] += udtmp[listart:liend] + vdd[gistart:giend] += uddtmp[listart:liend] + + ## # Replace numbers less than machine precision with zero to + ## # avoid numerical issues + ## if clean is True: + ## eps = np.finfo(np.float).eps + ## v[np.abs(v) < eps] = 0 + ## vd[np.abs(vd) < eps] = 0 + ## vdd[np.abs(vdd) < eps] = 0 + + return diff --git a/pspace/stochastic_utils.py b/pspace/stochastic_utils.py index f245ffc..216667e 100644 --- a/pspace/stochastic_utils.py +++ b/pspace/stochastic_utils.py @@ -1,113 +1,183 @@ -from __future__ import print_function - import numpy as np + from collections import Counter +from itertools import product +import math + +def sum_degrees(*counters): + """Per-axis sum of Counter degrees.""" + out = Counter() + for c in counters: + if c is None: + continue + out.update(c) # Counter adds per-key + return out + +def safe_zero_degrees(coord_ids): + """Counter with zeros for all coord ids.""" + return Counter({cid: 0 for cid in coord_ids}) + +def generate_basis_adaptive(f_indices, m): + """Adaptive basis: closure of f's monomials under m-fold products.""" + if m == 0: + return {(0,) * len(f_indices[0])} + combos = product(f_indices, repeat=m) + return {tuple(sum(idx[i] for idx in combo) for i in range(len(combo[0]))) + for combo in combos} -## def get_tensor_quadrature_index(k, param_nqpts_map): -## pidx = {} -## pids = param_nqpts_map.keys() -## nqpts = param_nqpts_map.values() - - -## if k < nqpt[0]: -## return [k,0,0] -## elif nqpt[0] - k: -## return [ - -## for (pid,nqpt) in zip(pids,nqpts): -## print pid, nqpt -## if k < nqpt:mod(nqpt-k,k) -## pidx[pid] = mod( - -## return - -def sparse(dmapi, dmapj, dmapf): - smap = {} - for key in dmapi.keys(): - if abs(dmapi[key] - dmapj[key]) <= dmapf[key]: - smap[key] = True - else: - smap[key] = False - return smap - -def nqpts(pdeg): +def minnum_quadrature_points(degree): """ Return the number of quadrature points necessary to integrate the monomial of degree deg. """ - return max(pdeg/2+1,1) #1 + pdeg/2 #max(deg/2+1,1) + return math.ceil((degree+1)/2) + +def generate_basis_tensor_degree(max_degree_params): + """ + Construct a tensor-product index map for basis functions. + + Parameters + ---------- + max_degree_params : dict + Map {param_id : max_degree}, where max_degree is the highest + polynomial degree to include for that parameter. + + Returns + ------- + term_polynomial_degree : dict + Map {basis_index : Counter({pid: degree, ...})}. + Each entry gives the parameterwise polynomial degrees + for that basis term. + """ + pids = list(max_degree_params.keys()) # parameter IDs + pdegs = list(max_degree_params.values()) # max degrees (exclusive upper bound) + + total_tensor_basis_terms = int(np.prod(pdegs)) + + basis = {} + k = 0 + for degrees in product(*[range(d+1) for d in pdegs]): + basis[k] = Counter({pid: deg for pid, deg in zip(pids, degrees)}) + k += 1 + + return basis + +def generate_basis_total_degree(max_degree_params): + """ + Construct a total-degree index map for basis functions. + + Parameters + ---------- + max_degree_params : dict + Map {param_id : max_degree}, where max_degree is the highest + polynomial degree allowed *per dimension*. + + Returns + ------- + basis : dict + Map {basis_index : Counter({pid: degree, ...})}. + Each entry gives the parameterwise polynomial degrees + for that basis term, with sum(degrees) <= max(total_degrees). + """ + pids = list(max_degree_params.keys()) + pdegs = list(max_degree_params.values()) + + max_total_degree = max(pdegs) + + basis = {} + k = 0 + for degrees in product(*[range(d+1) for d in pdegs]): + if sum(degrees) <= max_total_degree: + basis[k] = Counter({pid: deg for pid, deg in zip(pids, degrees)}) + k += 1 + + return basis + +def sum_degrees_union_vector(f_degrees, psi_k: Counter) -> Counter: + """ + Compute required quadrature degree for product f(y)*ψ_k(y). + + Parameters + ---------- + f_degrees : Counter or list[Counter] + Degree structures of f. Each Counter is {axis: degree}. + Can be a single Counter (max degrees) or list of monomial degrees. + psi_k : Counter + Degree structure of basis function ψ_k. -def tensor_indices(phdmap): + Returns + ------- + Counter + Max degree needed per axis to exactly integrate f * ψ_k. + + Notes + ----- + - For each axis d, the required degree is: + deg(d) = max over monomials [ f_deg(d) + psi_k(d) ] + - Ensures enough quadrature order for vector decomposition. + + Example + ------- + >>> f_degrees = [Counter({0:2, 1:1})] # y0^2 * y1 + >>> psi_k = Counter({1:2}) # y1^2 + >>> sum_degrees_union_vector(f_degrees, psi_k) + Counter({0: 2, 1: 3}) # product ~ y0^2 * y1^3 """ - Get basis functions indices based on tensor product + degs = Counter() + + # Normalize input + if isinstance(f_degrees, Counter): + f_degrees = [f_degrees] + + for f_deg in f_degrees: + axes = set(f_deg) | set(psi_k) + for a in axes: + total = f_deg.get(a, 0) + psi_k.get(a, 0) + degs[a] = max(degs.get(a, 0), total) + + return degs + +def sum_degrees_union_matrix(f_degrees, psi_i: Counter, psi_j: Counter) -> Counter: """ - pids = phdmap.keys() # parameter IDs - pdegs = phdmap.values() # parameter degrees - - # Exclude deterministic terms - total_tensor_basis_terms = np.prod(pdegs) - num_vars = len(pdegs) - - # Initialize map with empty values corresponding to each key - idx = {} - for key in range(total_tensor_basis_terms): - idx[key] = [] - - # Add actual values into the map - if num_vars == 1: - ctr = 0 - for k0 in range(pdegs[0]): - idx[k0].append(Counter({pids[0]:k0})) # add one element tuple to map - ctr += 1 - elif num_vars == 2: - ctr = 0 - for k0 in range(pdegs[0]): - for k1 in range(pdegs[1]): - idx[k0+k1].append(Counter({pids[0]:k0, - pids[1]:k1})) # add two element tuple to map - ctr += 1 - elif num_vars == 3: - ctr = 0 - for k0 in range(pdegs[0]): - for k1 in range(pdegs[1]): - for k2 in range(pdegs[2]): - idx[k0+k1+k2].append(Counter({pids[0]:k0, - pids[1]:k1, - pids[2]:k2})) # add three element tuple to map - ctr += 1 - elif num_vars == 4: - ctr = 0 - for k0 in range(pdegs[0]): - for k1 in range(pdegs[1]): - for k2 in range(pdegs[2]): - for k3 in range(pdegs[3]): - idx[k0+k1+k2+k3].append(Counter({pids[0]:k0, - pids[1]:k1, - pids[2]:k2, - pids[3]:k3})) # add four element tuple to map - ctr += 1 - elif num_vars == 5: - ctr = 0 - for k0 in range(pdegs[0]): - for k1 in range(pdegs[1]): - for k2 in range(pdegs[2]): - for k3 in range(pdegs[3]): - for k4 in range(pdegs[4]): - idx[k0+k1+k2+k3+k4].append(Counter({pids[0]:k0, - pids[1]:k1, - pids[2]:k2, - pids[3]:k3, - pids[4]:k4})) # add five element tuple to map - ctr += 1 - else: - print('fix implementation for more elements in tuple') - raise - - # Make a flat list - flat_list = [item for sublist in idx.values() for item in sublist] - - # Convert to map - term_polynomial_degree = {} - for k in range(total_tensor_basis_terms): - term_polynomial_degree[k] = flat_list[k] - return term_polynomial_degree + Compute required quadrature degree for product f(y)*ψ_i(y)*ψ_j(y). + + Parameters + ---------- + f_degrees : Counter or list[Counter] + Degree structures of f. Each Counter is {axis: degree}. + Can be a single Counter (max degrees) or list of monomial degrees. + psi_i, psi_j : Counter + Degree structures of basis functions ψ_i and ψ_j. + + Returns + ------- + Counter + Max degree needed per axis to exactly integrate f * ψ_i * ψ_j. + + Notes + ----- + - For each axis d, the required degree is: + deg(d) = max over monomials [ f_deg(d) + psi_i(d) + psi_j(d) ] + - Ensures enough quadrature order for multivariate products. + + Example + ------- + >>> f_degrees = [Counter({0:2, 1:1})] # y0^2 * y1 + >>> psi_i = Counter({0:1}) # y0 + >>> psi_j = Counter({1:2}) # y1^2 + >>> sum_degrees_union(f_degrees, psi_i, psi_j) + Counter({0: 3, 1: 3}) # product ~ y0^3 * y1^3 + """ + degs = Counter() + + # Normalize input + if isinstance(f_degrees, Counter): + f_degrees = [f_degrees] + + for f_deg in f_degrees: + axes = set(f_deg) | set(psi_i) | set(psi_j) + for a in axes: + total = f_deg.get(a, 0) + psi_i.get(a, 0) + psi_j.get(a, 0) + degs[a] = max(degs.get(a, 0), total) + + return degs diff --git a/tests/python_check.py b/tests/python_check.py index 39e23f1..59ba2be 100644 --- a/tests/python_check.py +++ b/tests/python_check.py @@ -1,7 +1,9 @@ import sys from os import path sys.path.append( path.dirname( path.dirname( path.abspath(__file__) ) ) ) + import numpy as np + from pspace.core import ParameterFactory, ParameterContainer # Create "Parameter" using "Parameter Factory" object @@ -11,7 +13,7 @@ m = pfactory.createExponentialParameter('m', dict(mu=6.0, beta=1.0), 5) d = pfactory.createUniformParameter('d', dict(a=-5.0, b=4.0), 5) e = pfactory.createExponentialParameter('e', dict(mu=6.0, beta=1.0), 5) - + # Add "Parameter" into "ParameterContainer" pc = ParameterContainer() pc.addParameter(c) @@ -24,7 +26,8 @@ pc.initializeQuadrature({0:5,1:5,2:5,3:5,4:5}) N = pc.getNumStochasticBasisTerms() -print N +print("Number of basis terms: ", N) + for k in range(N): pids = pc.getParameters().keys() for q in pc.quadrature_map.keys(): diff --git a/tests/stress_test_matrix_decomposition.py b/tests/stress_test_matrix_decomposition.py new file mode 100644 index 0000000..2a52581 --- /dev/null +++ b/tests/stress_test_matrix_decomposition.py @@ -0,0 +1,123 @@ +#=====================================================================# +# Stress tests for randomized matrix decomposition in randomly chosen +# N-dimensional coordinate systems +#---------------------------------------------------------------------# +# Focus: +# - Sparse vs Full assembly of matrix decomposition +# - Timing summaries across multiple trials +#---------------------------------------------------------------------# +# Author : Komahan Boopathy (komahan@gatech.edu) +#=====================================================================# + +import pytest +import random +import numpy as np + +# core imports +from pspace.core import (CoordinateFactory, + CoordinateSystem, + BasisFunctionType, + PolyFunction) + +# local imports +from .test_utils import (random_coordinate, + random_polynomial, + get_coordinate_system_type) + + + +#=====================================================================# +# Parameters for stress regime +#=====================================================================# + +MAX_DEG = 5 # push polynomial/basis degree +MAX_COORD = 5 # up to 5D coordinates +TRIALS = 3 # fewer trials (stress is heavier) +TOL = 1e-6 + +# Timing collectors +tensor_timings = [] +total_timings = [] + +#=====================================================================# +# Stress Test A: Tensor Degree Basis +#=====================================================================# + +@pytest.mark.parametrize("trial", range(TRIALS)) +def test_stress_tensor_matrix_sparse_full(trial): + random.seed(3000 + trial) + + print(f"\n=== Stress Trial {trial} : Matrix TENSOR Basis (sparse vs full) ===") + + cs = get_coordinate_system_type(BasisFunctionType.TENSOR_DEGREE, + max_deg=MAX_DEG, + max_coords=MAX_COORD) + + polynomial_function = random_polynomial(cs, max_deg=MAX_DEG, max_cross_terms=MAX_DEG) + + ok, diffs = cs.check_decomposition_matrix_sparse_full(polynomial_function, + tol=1e-6, + verbose=True) + assert ok + + # record timing ratio if available + from timeit import default_timer as timer + start_sparse = timer(); cs.decompose_matrix(polynomial_function, sparse=True); elapsed_sparse = timer() - start_sparse + start_full = timer(); cs.decompose_matrix(polynomial_function, sparse=False); elapsed_full = timer() - start_full + tensor_timings.append(elapsed_full / elapsed_sparse) + + +@pytest.mark.parametrize("finalize", [True]) +def test_tensor_timing_summary(finalize): + if not tensor_timings: + pytest.skip("No tensor timings recorded") + + ratios = np.array(tensor_timings) + print("\n=== Timing Summary: Tensor Basis Matrix Decomposition ===") + print(f"Trials : {len(ratios)}") + print(f"Mean ratio : {ratios.mean():.3f}") + print(f"Std. dev : {ratios.std():.3f}") + print(f"Min ratio : {ratios.min():.3f}") + print(f"Max ratio : {ratios.max():.3f}") + + +#=====================================================================# +# Stress Test B: Total Degree Basis +#=====================================================================# + +@pytest.mark.parametrize("trial", range(TRIALS)) +def test_stress_total_matrix_sparse_full(trial): + random.seed(3000 + trial) + + print(f"\n=== Stress Trial {trial} : Matrix TOTAL Basis (sparse vs full) ===") + + cs = get_coordinate_system_type(BasisFunctionType.TOTAL_DEGREE, + max_deg=MAX_DEG, + max_coords=MAX_COORD) + + polynomial_function = random_polynomial(cs, max_deg=MAX_DEG, max_cross_terms=MAX_DEG) + + ok, diffs = cs.check_decomposition_matrix_sparse_full(polynomial_function, + tol=TOL, + verbose=True) + assert ok + + # record timing ratio if available + from timeit import default_timer as timer + start_sparse = timer(); cs.decompose_matrix(polynomial_function, sparse=True); elapsed_sparse = timer() - start_sparse + start_full = timer(); cs.decompose_matrix(polynomial_function, sparse=False); elapsed_full = timer() - start_full + total_timings.append(elapsed_full / elapsed_sparse) + + +@pytest.mark.parametrize("finalize", [True]) +def test_total_timing_summary(finalize): + if not total_timings: + pytest.skip("No total timings recorded") + + ratios = np.array(total_timings) + print("\n=== Timing Summary: Total Basis Matrix Decomposition ===") + print(f"Trials : {len(ratios)}") + print(f"Mean ratio : {ratios.mean():.3f}") + print(f"Std. dev : {ratios.std():.3f}") + print(f"Min ratio : {ratios.min():.3f}") + print(f"Max ratio : {ratios.max():.3f}") diff --git a/tests/stress_test_vector_decomposition.py b/tests/stress_test_vector_decomposition.py new file mode 100644 index 0000000..e5e3258 --- /dev/null +++ b/tests/stress_test_vector_decomposition.py @@ -0,0 +1,119 @@ +#=====================================================================# +# Stress tests: vector decomposition in high-dim / high-degree setups +#---------------------------------------------------------------------# +# Goals: +# - probe quadrature sufficiency (.max_degrees logic) +# - probe sparsity detection (.degrees masks) +# - catch conditioning errors in Normal/Exponential distributions +# - collect timing ratios for sparse vs full assembly +#---------------------------------------------------------------------# +# Author : Komahan Boopathy (komahan@gatech.edu) +#=====================================================================# + +import pytest +import random +import numpy as np +from timeit import default_timer as timer + +from pspace.core import (CoordinateFactory, + CoordinateSystem, + BasisFunctionType, + PolyFunction) + +from .test_utils import (random_coordinate, + random_polynomial, + get_coordinate_system_type) + +from .test_utils import ENABLE_ANALYTIC_TESTS + +#=====================================================================# +# Parameters for stress regime +#=====================================================================# + +MAX_DEG = 5 # push polynomial/basis degree +MAX_COORD = 5 # up to 5D coordinates +TRIALS = 3 # fewer trials (stress is heavier) +TOL = 1e-6 + +# Timing collectors +tensor_timings = [] +total_timings = [] + +#=====================================================================# +# Sparse vs full stress tests +#=====================================================================# + +@pytest.mark.parametrize("trial", range(TRIALS)) +def test_stress_tensor_sparse_full(trial): + random.seed(3000 + trial) + print(f"\n=== STRESS Trial {trial} : Tensor Basis (sparse vs full) ===") + + cs = get_coordinate_system_type(BasisFunctionType.TENSOR_DEGREE, + max_deg=MAX_DEG, + max_coords=MAX_COORD) + + polynomial_function = random_polynomial(cs, max_deg=MAX_DEG, max_cross_terms=MAX_DEG) + + ok, diffs = cs.check_decomposition_numerical_sparse_full(polynomial_function, + tol=TOL, + verbose=False) + assert ok + + # Timing comparison + start_sparse = timer(); cs.decompose(polynomial_function, sparse=True); elapsed_sparse = timer() - start_sparse + start_full = timer(); cs.decompose(polynomial_function, sparse=False); elapsed_full = timer() - start_full + tensor_timings.append(elapsed_full / elapsed_sparse) + + +@pytest.mark.parametrize("trial", range(TRIALS)) +def test_stress_total_sparse_full(trial): + random.seed(4000 + trial) + print(f"\n=== STRESS Trial {trial} : Total Basis (sparse vs full) ===") + + cs = get_coordinate_system_type(BasisFunctionType.TOTAL_DEGREE, + max_deg=MAX_DEG, + max_coords=MAX_COORD) + + polynomial_function = random_polynomial(cs, max_deg=MAX_DEG, max_cross_terms=MAX_DEG) + + ok, diffs = cs.check_decomposition_numerical_sparse_full(polynomial_function, + tol=TOL, + verbose=False) + assert ok + + # Timing comparison + start_sparse = timer(); cs.decompose(polynomial_function, sparse=True); elapsed_sparse = timer() - start_sparse + start_full = timer(); cs.decompose(polynomial_function, sparse=False); elapsed_full = timer() - start_full + total_timings.append(elapsed_full / elapsed_sparse) + + +#=====================================================================# +# Timing summaries +#=====================================================================# + +@pytest.mark.parametrize("finalize", [True]) +def test_tensor_timing_summary(finalize): + if not tensor_timings: + pytest.skip("No tensor timings recorded") + + ratios = np.array(tensor_timings) + print("\n=== Timing Summary: Tensor Basis Vector Decomposition ===") + print(f"Trials : {len(ratios)}") + print(f"Mean ratio : {ratios.mean():.3f}") + print(f"Std. dev : {ratios.std():.3f}") + print(f"Min ratio : {ratios.min():.3f}") + print(f"Max ratio : {ratios.max():.3f}") + + +@pytest.mark.parametrize("finalize", [True]) +def test_total_timing_summary(finalize): + if not total_timings: + pytest.skip("No total timings recorded") + + ratios = np.array(total_timings) + print("\n=== Timing Summary: Total Basis Vector Decomposition ===") + print(f"Trials : {len(ratios)}") + print(f"Mean ratio : {ratios.mean():.3f}") + print(f"Std. dev : {ratios.std():.3f}") + print(f"Min ratio : {ratios.min():.3f}") + print(f"Max ratio : {ratios.max():.3f}") diff --git a/tests/test.py b/tests/test.py index 6d55e48..4bea73e 100644 --- a/tests/test.py +++ b/tests/test.py @@ -4,24 +4,24 @@ import numpy as np from pspace.core import ParameterFactory, ParameterContainer -def univariate(dmax): +def univariate(dmax): # Create "Parameter" using "Parameter Factory" object pfactory = ParameterFactory() c = pfactory.createNormalParameter('c', dict(mu=4.0, sigma=0.50), dmax[0]) - + # Add "Parameter" into "ParameterContainer" pc = ParameterContainer() pc.addParameter(c) pc.initialize() test(pc) - -def bivariate(dmax): + +def bivariate(dmax): # Create "Parameter" using "Parameter Factory" object pfactory = ParameterFactory() c = pfactory.createNormalParameter('c', dict(mu=4.0, sigma=0.50), dmax[0]) k = pfactory.createExponentialParameter('k', dict(mu=4.0, beta=0.50), dmax[1]) - + # Add "Parameter" into "ParameterContainer" pc = ParameterContainer() pc.addParameter(c) @@ -29,14 +29,14 @@ def bivariate(dmax): pc.initialize() test(pc) - -def trivariate(dmax): + +def trivariate(dmax): # Create "Parameter" using "Parameter Factory" object pfactory = ParameterFactory() c = pfactory.createNormalParameter('c', dict(mu=4.0, sigma=0.50), dmax[0]) k = pfactory.createExponentialParameter('k', dict(mu=4.0, beta=0.50), dmax[1]) m = pfactory.createUniformParameter('m', dict(a=-5.0, b=4.0), dmax[2]) - + # Add "Parameter" into "ParameterContainer" pc = ParameterContainer() pc.addParameter(c) @@ -45,15 +45,15 @@ def trivariate(dmax): pc.initialize() test(pc) - -def quadvariate(dmax): + +def quadvariate(dmax): # Create "Parameter" using "Parameter Factory" object pfactory = ParameterFactory() c = pfactory.createNormalParameter('c', dict(mu=4.0, sigma=0.50), dmax[0]) k = pfactory.createExponentialParameter('k', dict(mu=4.0, beta=0.50), dmax[1]) m = pfactory.createUniformParameter('m', dict(a=-5.0, b=4.0), dmax[2]) d = pfactory.createUniformParameter('m', dict(a=-5.0, b=4.0), dmax[3]) - + # Add "Parameter" into "ParameterContainer" pc = ParameterContainer() pc.addParameter(c) @@ -65,7 +65,7 @@ def quadvariate(dmax): test(pc) -def pentavariate(dmax): +def pentavariate(dmax): # Create "Parameter" using "Parameter Factory" object pfactory = ParameterFactory() c = pfactory.createNormalParameter('c', dict(mu=4.0, sigma=0.50), dmax[0]) @@ -73,7 +73,7 @@ def pentavariate(dmax): m = pfactory.createUniformParameter('m', dict(a=-5.0, b=4.0), dmax[2]) d = pfactory.createUniformParameter('m', dict(a=-5.0, b=4.0), dmax[3]) e = pfactory.createExponentialParameter('k', dict(mu=4.0, beta=0.50), dmax[4]) - + # Add "Parameter" into "ParameterContainer" pc = ParameterContainer() pc.addParameter(c) @@ -89,16 +89,16 @@ def pentavariate(dmax): def test(pc): # Test getting ND quadrature points N = pc.getNumStochasticBasisTerms() - - A = np.zeros((N, N)) + + A = np.zeros((N, N)) for i in range(N): - + dmapi = pc.basistermwise_parameter_degrees[i] param_nqpts_mapi = pc.getNumQuadraturePointsFromDegree(dmapi) - + for j in range(N): - + dmapj = pc.basistermwise_parameter_degrees[j] param_nqpts_mapj = pc.getNumQuadraturePointsFromDegree(dmapj) @@ -127,22 +127,97 @@ def gy(q,pid): pids = pc.getParameters().keys() for q in pc.quadrature_map.keys(): A[i,j] += w(q)*psiz(i,q)*psiz(j,q) - - assert(np.allclose(A, np.eye(A.shape[0])) == True) + + assert(np.allclose(A, np.eye(A.shape[0])) == True) def testall(nmax): for i in range(nmax): print (i, univariate([i+1])) for j in range(nmax): print (i, j, bivariate([i+1,j+1])) - for k in range(nmax): + for k in range(nmax): print (i, j, k, trivariate([i+1,j+1,k+1])) - for l in range(nmax): + for l in range(nmax): print (i, j, k, l, quadvariate([i+1,j+1,k+1, l+1])) - for m in range(nmax): + for m in range(nmax): print (i, j, k, l, m, pentavariate([i+1,j+1,k+1, l+1, m+1])) -if __name__ == '__main__': - - for n in range(5): - testall(n+1) +def test_standard_uniform(): + factory = ParameterFactory() + dist_params = {'a': 0.0, 'b': 1.0} # Uniform(0,1) + param = factory.createUniformParameter("u_std", dist_params, monomial_degree=5) + ok, errors = param.checkConsistency(max_degree=4, npoints=10) + assert ok, f"Standard Uniform consistency failed: {errors}" + print("Standard UniformParameter consistency check passed.") + +def test_nonstandard_uniform(): + factory = ParameterFactory() + dist_params = {'a': 2.0, 'b': 5.0} # Uniform(2,5) — non-standard + param = factory.createUniformParameter("u_nonstd", dist_params, monomial_degree=5) + ok, errors = param.checkConsistency(max_degree=4, npoints=10) + assert ok, f"Non-standard Uniform consistency failed: {errors}" + print("Non-standard UniformParameter consistency check passed.") + +def test_standard_normal(): + factory = ParameterFactory() + dist_params = {'mu': 0.0, 'sigma': 1.0} # Standard Normal N(0,1) + param = factory.createNormalParameter("n_std", dist_params, monomial_degree=5) + ok, errors = param.checkConsistency(max_degree=4, npoints=10) + assert ok, f"Standard Normal consistency failed: {errors}" + print("Standard NormalParameter consistency check passed.") + +def test_nonstandard_normal(): + factory = ParameterFactory() + dist_params = {'mu': 3.0, 'sigma': 2.0} # Non-standard Normal N(3,4) + param = factory.createNormalParameter("n_nonstd", dist_params, monomial_degree=5) + ok, errors = param.checkConsistency(max_degree=4, npoints=10) + assert ok, f"Non-standard Normal consistency failed: {errors}" + print("Non-standard NormalParameter consistency check passed.") + +def test_standard_exponential(): + factory = ParameterFactory() + dist_params = {'mu': 0.0, 'beta': 1.0} # Standard Exponential Exp(1) + param = factory.createExponentialParameter("e_std", dist_params, monomial_degree=5) + ok, errors = param.checkConsistency(max_degree=4, npoints=10) + assert ok, f"Standard Exponential consistency failed: {errors}" + print("Standard ExponentialParameter consistency check passed.") + +def test_nonstandard_exponential(): + factory = ParameterFactory() + dist_params = {'mu': 1.0, 'beta': 2.0} # Shifted/Scaled Exponential + param = factory.createExponentialParameter("e_nonstd", dist_params, monomial_degree=5) + ok, errors = param.checkConsistency(max_degree=4, npoints=10) + assert ok, f"Non-standard Exponential consistency failed: {errors}" + print("Non-standard ExponentialParameter consistency check passed.") + +def test_container_consistency(): + factory = ParameterFactory() + pc = ParameterContainer() + + # Mix of standard and non-standard + pc.addParameter(factory.createUniformParameter("u", {"a":0.0,"b":1.0}, monomial_degree=3)) + pc.addParameter(factory.createNormalParameter("n", {"mu":2.0,"sigma":1.5}, monomial_degree=3)) + pc.addParameter(factory.createExponentialParameter("e", {"mu":1.0,"beta":0.5}, monomial_degree=2)) + + pc.initialize() + + ok, errors, gram = pc.checkConsistency(max_degree=5, tol=1e-10, verbose=True) + ok, errors, gram = pc.checkConsistency(tol=1e-10, verbose=True) + + assert ok, f"Container consistency failed with errors: {errors}" + print("ParameterContainer consistency check passed.") + +if __name__ == "__main__": + test_standard_uniform() + test_nonstandard_uniform() + + test_standard_normal() + test_nonstandard_normal() + + test_standard_exponential() + test_nonstandard_exponential() + + test_container_consistency() + + #for n in range(5): + # testall(n+1) diff --git a/tests/test_matrix_decomposition.py b/tests/test_matrix_decomposition.py new file mode 100644 index 0000000..afc245f --- /dev/null +++ b/tests/test_matrix_decomposition.py @@ -0,0 +1,101 @@ +#=====================================================================# +# Randomized matrix decomposition tests in randomly chosen +# N-dimensional coordinate system +#---------------------------------------------------------------------# +# Combinations: +#---------------------------------------------------------------------# +# - numerical vs symbolic +# - sparse vs full decomposition +#---------------------------------------------------------------------# +# Author : Komahan Boopathy (komahan@gatech.edu) +#=====================================================================# + +# python module imports +import pytest +import random + +# core module imports +from pspace.core import (CoordinateFactory, + CoordinateSystem, + BasisFunctionType, + PolyFunction) + +# local module imports +from .test_utils import (random_coordinate, + random_polynomial, + get_coordinate_system_type) + +#=====================================================================# +# Sparsity-aware and sparsity-unware matrix decomposition tests +#=====================================================================# + +@pytest.mark.parametrize("trial", range(5)) +def test_randomized_tensor_basis_sparse_full(trial): + random.seed(trial) + + print(f"\n=== Trial {trial} : Matrix TENSOR Basis (sparse vs full assembly) ===") + + cs = get_coordinate_system_type(BasisFunctionType.TENSOR_DEGREE, + max_deg = 3, max_coords = 3) + + polynomial_function = random_polynomial(cs) + + ok, diffs = cs.check_decomposition_matrix_sparse_full(polynomial_function, + tol=1e-6, + verbose=True) + assert ok + +@pytest.mark.parametrize("trial", range(5)) +def test_randomized_total_basis_sparse_full(trial): + random.seed(trial) + + print(f"\n=== Trial {trial} : Matrix TOTAL Basis (sparse vs full assembly) ===") + + cs = get_coordinate_system_type(BasisFunctionType.TOTAL_DEGREE, + max_deg = 3, max_coords = 3) + + polynomial_function = random_polynomial(cs) + + ok, diffs = cs.check_decomposition_matrix_sparse_full(polynomial_function, + tol=1e-6, + verbose=True) + assert ok + +#=====================================================================# +# Sparse numerical versus sparse analytical matrix decomposition tests +#=====================================================================# + +@pytest.mark.parametrize("trial", range(5)) +def test_randomized_tensor_matrix_numerical_symbolic(trial): + random.seed(trial) + + print(f"\n=== Trial {trial} : Matrix TENSOR Basis (numerical vs symbolic) ===") + + cs = get_coordinate_system_type(BasisFunctionType.TENSOR_DEGREE, + max_deg=3, max_coords=3) + + polynomial_function = random_polynomial(cs) + + ok, diffs = cs.check_decomposition_matrix_numerical_symbolic( + polynomial_function, + tol=1e-6, + verbose=True) + assert ok + + +@pytest.mark.parametrize("trial", range(5)) +def test_randomized_total_matrix_numerical_symbolic(trial): + random.seed(trial) + + print(f"\n=== Trial {trial} : Matrix TOTAL Basis (numerical vs symbolic) ===") + + cs = get_coordinate_system_type(BasisFunctionType.TOTAL_DEGREE, + max_deg=3, max_coords=3) + + polynomial_function = random_polynomial(cs) + + ok, diffs = cs.check_decomposition_matrix_numerical_symbolic( + polynomial_function, + tol=1e-6, + verbose=True) + assert ok diff --git a/tests/test_sparsity.py b/tests/test_sparsity.py index 4a99d81..085a32f 100644 --- a/tests/test_sparsity.py +++ b/tests/test_sparsity.py @@ -1,250 +1,236 @@ +#=====================================================================# +# File to test jacobian matrix sparsity patterns +#=====================================================================# +# Author : Komahan Boopathy (komahan.boopathy@gmail.com) +#=====================================================================# + +# system/os modules import sys + from os import path sys.path.append(path.dirname(path.dirname(path.abspath(__file__)))) +import os +outdir = "tests/baseline" +os.makedirs(outdir, exist_ok = True) + +# third party modules import numpy as np from collections import Counter -from timeit import default_timer as timer -from pspace.core import ParameterFactory, ParameterContainer +# local modules +from pspace.core import CoordinateFactory +from pspace.core import CoordinateSystem, BasisFunctionType + from pspace.plotter import plot_jacobian -## def sparsity(a, b): -## """ -## Nonzero entries occur when the parameterwise(variablewise) degree -## of function 'b' matches or exceeds the degree of the basis vector -## 'a'. -## """ -## akeys = a.keys() -## smap = {} -## for akey in akeys: -## if b[akey] - a[akey] <= 0: -## smap[akey] = True -## else: -## smap[akey] = False -## return smap - -def sparse(dmapi, dmapj, dmapf): - ## print '' - ## print '' - ## print 'map i ', dmapi - ## print 'map j ', dmapj - ## print 'map f ', dmapf - smap = {} - #print "keys", dmapi.keys() - for key in dmapi.keys(): - #print 'key', key, "diff", abs(dmapi[key] - dmapj[key]), dmapf[key] - if abs(dmapi[key] - dmapj[key]) <= dmapf[key]: - smap[key] = True - else: - smap[key] = False - return smap - -def getSymmetricNonZeroIndices(pc, dmapf): - nz = {} - N = pc.getNumStochasticBasisTerms() - for i in range(N): - dmapi = pc.basistermwise_parameter_degrees[i] - for j in range(i,N): - dmapj = pc.basistermwise_parameter_degrees[j] - smap = sparse(dmapi, dmapj, dmapf) - if False not in smap.values(): - dmap = Counter() - dmap.update(dmapi) - dmap.update(dmapj) - dmap.update(dmapf) - nqpts_map = pc.getNumQuadraturePointsFromDegree(dmap) - nz[(i,j)] = nqpts_map - return nz - -def getSparseJacobian(pc, f, dmapf): - nzs = getSymmetricNonZeroIndices(pc, dmapf) - N = pc.getNumStochasticBasisTerms() - A = np.zeros((N, N)) - for index, nqpts in nzs.items(): - pc.initializeQuadrature(nqpts) - pids = pc.getParameters().keys() - i = index[0] - j = index[1] - for q in pc.quadrature_map.keys(): - val = w(q)*psiz(i,q)*psiz(j,q)*f(q) - A[i, j] += val - A[j, i] += val - return A - -def getJacobian(f, dmapf): - """ - """ - # Test getting ND quadrature points - N = pc.getNumStochasticBasisTerms() - A = np.zeros((N, N)) - - for i in range(N): - dmapi = pc.basistermwise_parameter_degrees[i] - - for j in range(N): - dmapj = pc.basistermwise_parameter_degrees[j] - - dmap = Counter() - dmap.update(dmapi) - dmap.update(dmapj) - dmap.update(dmapf) - - # add up the degree of both participating functions psizi - # and psizj to determine the total degree of integrand - nqpts_map = pc.getNumQuadraturePointsFromDegree(dmap) - pc.initializeQuadrature(nqpts_map) - # print dmap, nqpts_map - - # Loop quadrature points - pids = pc.getParameters().keys() - for q in pc.quadrature_map.keys(): - A[i,j] += w(q)*psiz(i,q)*psiz(j,q)*f(q) - return A - -if __name__ == '__main__': - """ - """ - start = timer() - - # Create "Parameter" using "Parameter Factory" object - pfactory = ParameterFactory() - c = pfactory.createNormalParameter('c', dict(mu=-4.0, sigma=0.50), 4) - k = pfactory.createExponentialParameter('k', dict(mu=6.0, beta=1.0), 5) - m = pfactory.createUniformParameter('m', dict(a=-5.0, b=4.0), 6) - d = pfactory.createUniformParameter('d', dict(a=-5.0, b=4.0), 3) - - # Add "Parameter" into "ParameterContainer" - pc = ParameterContainer() - pc.addParameter(c) - pc.addParameter(k) - pc.addParameter(m) - pc.addParameter(d) - - pc.initialize() - - suffix = 'eee' - - # rename for readability - w = lambda q : pc.W(q) - psiz = lambda i, q : pc.evalOrthoNormalBasis(i,q) - - def fy(q): - ymap = pc.Y(q) - paramids = ymap.keys() - ans = 1.0 - for paramid in paramids: - ans = ans*ymap[paramid] - return ans - - def gy(q,pid): +def check_hermite_functions(): + cs = CoordinateSystem(BasisFunctionType.TENSOR_DEGREE, False) + cf = CoordinateFactory() + + y0 = cf.createNormalCoordinate(cf.newCoordinateID(), 'y0', dict(mu = -4.0, sigma = 0.5), 3) + cs.addCoordinateAxis(y0) + + cs.initialize() + + # decompose a vector to obtain coefficients + func = lambda q : (y(q,0)**4 * y(q,1)) + (y(q,0) * y(q,2)) + (y(q,2)**3) + + dfunc = lambda z: 1.0 + z[0]**2; fdegs = Counter({0:3}) + coeffs = cs.decompose(dfunc, fdegs) + print(coeffs) + +if __name__ == "__main__": + # Build a coordinate system + cf = CoordinateFactory() + cs = CoordinateSystem(BasisFunctionType.TENSOR_DEGREE) + + y0 = cf.createNormalCoordinate(cf.newCoordinateID(), 'y0', dict(mu=0.83, sigma=1.513), 4) + y1 = cf.createUniformCoordinate(cf.newCoordinateID(), 'y1', dict(a=-2.197, b=0.538), 1) + + cs.addCoordinateAxis(y0) + cs.addCoordinateAxis(y1) + + cs.initialize() + + # Function: f(y) = 1 + y0^2 + y1 + dfunc = lambda y: y[0]**2*y[1]**2 + y[0]*y[1]**2 + 2*y[1] + 1.0 + fdegs = Counter({0:2, 1:2}) + + ok, diffs = cs.check_decomposition_consistency(dfunc, fdegs, tol=1e-8, verbose=True) + + stop + + + # Build a simple 1D Gaussian coordinate system + cs = CoordinateSystem(BasisFunctionType.TENSOR_DEGREE, verbose=False) + cf = CoordinateFactory() + + #y0 = cf.createNormalCoordinate(cf.newCoordinateID(), 'y0', dict(mu=0.0, sigma=1.0), 4) + #y0 = cf.createExponentialCoordinate(cf.newCoordinateID(), 'y0', dict(mu = +6.0, beta = 1.0), 3) + y0 = cf.createUniformCoordinate(cf.newCoordinateID(), 'y0', dict(a = -5.0, b = 4.0), 3) + + cs.addCoordinateAxis(y0) + cs.initialize() + + # f(y) = 1 + y^2 (degrees: up to 2 on axis 0) + dfunc = lambda Y: 1.0 - Y[0]**0 + fdegs = Counter({0:0}) + coeffs = cs.decompose(dfunc, fdegs) + print(coeffs) # coefficients in the Y-frame basis + + import sympy as sp + coeffs = cs.decompose_analytic(dfunc, fdegs) + #print(coeffs) # coefficients in the Y-frame basis + #print(coeffs) + print({k: sp.simplify(v) for k, v in coeffs.items()}) + + nbasis = cs.getNumBasisFunctions() + A = np.zeros((nbasis, nbasis)) + for ii in range(nbasis): + for jj in range(nbasis): + A[ii,jj] = cs.inner_product_basis(ii, jj) #, f_eval, f_deg) + print(A) + + stop + + check_hermite_functions() + stop + + cs = CoordinateSystem(BasisFunctionType.TENSOR_DEGREE, False) + cf = CoordinateFactory() + + #y0 = cf.createNormalCoordinate(0, 'y0', dict(mu=0.0, sigma=1.0), max_monomial_dof=1) + #y1 = cf.createUniformCoordinate(1, 'y1', dict(a=-1, b=1), max_monomial_dof=1) + + y0 = cf.createNormalCoordinate(cf.newCoordinateID(), 'y0', dict(mu = -4.0, sigma = 0.5), 0) + #y1 = cf.createExponentialCoordinate(cf.newCoordinateID(), 'y1', dict(mu = +6.0, beta = 1.0), 1) + #y2 = cf.createUniformCoordinate(cf.newCoordinateID(), 'y2', dict(a = -5.0, b = 4.0), 1) + + cs.addCoordinateAxis(y0) + #cs.addCoordinateAxis(y1) + #cs.addCoordinateAxis(y2) + + cs.initialize() + + # decompose a vector to obtain coefficients + f = lambda z: z[0] + fdeg = Counter({0:0}) + coeffs = cs.decompose(f, f) + print(coeffs) + + stop + + # form the mass matrix + + + print(A, np.linalg.eigvals(A)) + + # inner product of two basis entries with f + #val = cs.inner_product_basis(i_id=1, j_id=0, f_eval=f_eval, f_deg=f_deg) + #print(val) + + stop + + # Domain Definition(ADAPTIVE, FIXED={TENSOR, COMPLETE}) + cfactory = CoordinateFactory() + + # With adaptive enrichment we can keep the complexity (basis set + # size) tied to the intrinsic structure of the function to be + # decomposed in the probabilistic domain, not the worst-case + # degree cutoffs like 4, q5, 6 + + # Random coordinates + y0 = cfactory.createNormalCoordinate(cfactory.newCoordinateID(), 'y0', dict(mu = -4.0, sigma = 0.5), 3) + #y1 = cfactory.createExponentialCoordinate(cfactory.newCoordinateID(), 'y1', dict(mu = +6.0, beta = 1.0), 3) + #y2 = cfactory.createUniformCoordinate(cfactory.newCoordinateID(), 'y2', dict(a = -5.0, b = 4.0), 3) + + print(y0) + #print(y1) + #print(y2) + + # Deterministic coordinates (uniform distribution & monomial degree = 0) + # x1 = cfactory.createUniformCoordinate('x1', dict(a=-2.0, b=2.0), 0) # space + # x2 = cfactory.createUniformCoordinate('x2', dict(a=-2.0, b=2.0), 0) + # x3 = cfactory.createUniformCoordinate('x3', dict(a=-2.0, b=2.0), 0) + # t = cfactory.createUniformCoordinate('t', dict(a=0.0, b=1.0), 0) # time + + # Add "Coordinate" into "CoordinateSystem: create Axes in the Domain + csystem = CoordinateSystem(BasisFunctionType.TENSOR_DEGREE) + + csystem.addCoordinateAxis(y0) + #csystem.addCoordinateAxis(y1) + #csystem.addCoordinateAxis(y2) + + csystem.initialize() + + print(csystem) + + for i in range(csystem.getNumBasisFunctions()): + print(csystem.evaluateBasis([0.0, 0.0, 0.0], csystem.basis[i])) + + # check inner_product(psi_i, psi_j) + # check decompose(function) + + stop + + class Function: + def __init__(self, func, dmap, name): + self.func = func + self.name = name + self.dmap = dmap + return + + # find a better way to do this + def y(q, degree): ymap = pc.Y(q) - return ymap[pid] + return ymap[degree] + + def generate_baseline(F : Function): + A = pc.getJacobian(F.func, F.dmap) + np.save(os.path.join(outdir, f"matrix-full-assembly-{F.name}.npy"), A) + np.savetxt(os.path.join(outdir, f"matrix-full-assembly-{F.name}.dat"), A, fmt='%f', delimiter=' ') - # Default map for number of quadrature points + A = pc.getSparseJacobian(F.func, F.dmap) + np.save(os.path.join(outdir, f"matrix-sparse-assembly-{F.name}.npy"), A) + np.savetxt(os.path.join(outdir, f"matrix-sparse-assembly-{F.name}.dat"), A, fmt='%f', delimiter=' ') + + + # identity: y0^0 * y1^0 * y2^0 + func = lambda q : y(q,0) * y(q,1) * y(q,2) dmap = Counter() + dmap[0] = 0; dmap[1] = 0; dmap[2] = 0; + constant_function = Function(func, dmap, "identity") + generate_baseline(constant_function) - # y_1^4 - func = lambda q : gy(q,1)*gy(q,1)*gy(q,1)*gy(q,1) - dmap[0] = 0; dmap[1] = 4; dmap[2] = 0 - A = getSparseJacobian(pc, func, dmap) - #A = getJacobian(func, dmap) - plot_jacobian(A, 'sparsity-y3^4-' + suffix + '.pdf') - - # Nonzero constant - func = lambda q : 1.0 - dmap[0] = 0 ; dmap[1] = 0; dmap[2] = 0 - #A = getJacobian(func, dmap) - A = getSparseJacobian(pc, func, dmap) - plot_jacobian(A, 'sparsity-identity-' + suffix + '.pdf') - - # Linear in y_1 - func = lambda q : gy(q,0) - dmap[0] = 1; dmap[1] = 0; dmap[2] = 0 - A = getSparseJacobian(pc, func, dmap) - #A = getJacobian(func, dmap) - plot_jacobian(A, 'sparsity-y1-' + suffix + '.pdf') - - # Linear in y_2 - func = lambda q : gy(q,1) - dmap[0] = 0; dmap[1] = 1; dmap[2] = 0 - A = getSparseJacobian(pc, func, dmap) - #A = getJacobian(func, dmap) - plot_jacobian(A, 'sparsity-y2-' + suffix + '.pdf') - - # Linear in y_3 - func = lambda q : gy(q,2) - dmap[0] = 0; dmap[1] = 0; dmap[2] = 1 - A = getSparseJacobian(pc, func, dmap) - #A = getJacobian(func, dmap) - plot_jacobian(A, 'sparsity-y3-' + suffix + '.pdf') - - # y_1 + y_2 + y_3 - func = lambda q : gy(q,0) + gy(q,1) + gy(q,2) + + # identity: 2.0 * y0^0 * y1^0 * y2^0 + func = lambda q : 2.0 * y(q,0) * y(q,1) * y(q,2) + dmap = Counter() + dmap[0] = 0; dmap[1] = 0; dmap[2] = 0; + constant_function = Function(func, dmap, "constant") + generate_baseline(constant_function) + + # linear : define: y0^1 + y1^1 + y2^1 + dmap = Counter() dmap[0] = 1; dmap[1] = 1; dmap[2] = 1 - A = getSparseJacobian(pc, func, dmap) - #A = getJacobian(func, dmap) - plot_jacobian(A, 'sparsity-y1+y2+y3-' + suffix + '.pdf') - - # y_1 * y_2 - func = lambda q : gy(q,0) * gy(q,1) - dmap[0] = 1; dmap[1] = 1; dmap[2] = 0 - A = getSparseJacobian(pc, func, dmap) - #A = getJacobian(func, dmap) - plot_jacobian(A, 'sparsity-y1y2-' + suffix + '.pdf') - - # y_2 * y_3 - func = lambda q : gy(q,1) * gy(q,2) - dmap[0] = 0; dmap[1] = 1; dmap[2] = 1 - #A = getJacobian(func, dmap) - A = getSparseJacobian(pc, func, dmap) - plot_jacobian(A, 'sparsity-y2y3-' + suffix + '.pdf') - - # y_1 * y_3 - func = lambda q : gy(q,0) * gy(q,2) - dmap[0] = 1; dmap[1] = 0; dmap[2] = 1 - #A = getJacobian(func, dmap) - A = getSparseJacobian(pc, func, dmap) - plot_jacobian(A, 'sparsity-y1y3-' + suffix + '.pdf') - - # y_1^2 - func = lambda q : gy(q,0)*gy(q,0) - dmap[0] = 2; dmap[1] = 0; dmap[2] = 0 - #A = getJacobian(func, dmap) - A = getSparseJacobian(pc, func, dmap) - plot_jacobian(A, 'sparsity-y1^2-' + suffix + '.pdf') - - # y_2^2 - func = lambda q : gy(q,1)*gy(q,1) - dmap[0] = 0; dmap[1] = 2; dmap[2] = 0 - #A = getJacobian(func, dmap) - A = getSparseJacobian(pc, func, dmap) - plot_jacobian(A, 'sparsity-y2^2-' + suffix + '.pdf') - - # y_3^2 - func = lambda q : gy(q,2)*gy(q,2) - dmap[0] = 0; dmap[1] = 0; dmap[2] = 2 - A = getSparseJacobian(pc, func, dmap) - #A = getJacobian(func, dmap) - plot_jacobian(A, 'sparsity-y3^2-' + suffix + '.pdf') - - # y_3^4 - func = lambda q : gy(q,2)*gy(q,2)*gy(q,2) + gy(q,2)*gy(q,2)*gy(q,2)*gy(q,2) - dmap[0] = 0; dmap[1] = 0; dmap[2] = 4 - A = getSparseJacobian(pc, func, dmap) - #A = getJacobian(func, dmap) - plot_jacobian(A, 'sparsity-y3^4-' + suffix + '.pdf') - - # y_1 * y_2 * y_3*y_4 - func = lambda q : gy(q,0) * gy(q,1) * gy(q,2) * gy(q,3) - dmap[0] = 1; dmap[1] = 1; dmap[2] = 1; dmap[3] = 1 - print 'first' - A = getSparseJacobian(pc, func, dmap) - print 'second' - A = getSparseJacobian(pc, func, dmap) - print 'third' - A = getSparseJacobian(pc, func, dmap) - #A = getJacobian(func, dmap) - plot_jacobian(A, 'sparsity-y1y2y3y4-' + suffix + '.pdf') - end = timer() - print "elapsed time:", end - start + func = lambda q : y(q,0) + y(q,1) + y(q,2) + linear_function = Function(func, dmap, "linear") + generate_baseline(linear_function) + + # quadratic : define: y0^2 + y1^2 + y2^2 + dmap = Counter() + dmap[0] = 2; dmap[1] = 2; dmap[2] = 2 + func = lambda q : y(q,0)**2 + y(q,1)**2 + y(q,2)**2 + quadratic_function = Function(func, dmap, "quadratic") + generate_baseline(quadratic_function) + + # polynomial: y0^4 * y1 + y0 * y2 + y2^3 + dmap = Counter() + dmap[0] = 4 # degree in y0 + dmap[1] = 1 # degree in y1 + dmap[2] = 3 # degree in y2 + + func = lambda q : (y(q,0)**4 * y(q,1)) + (y(q,0) * y(q,2)) + (y(q,2)**3) + poly_function = Function(func, dmap, "quintic") + generate_baseline(poly_function) diff --git a/tests/test_sparsity_logic.py b/tests/test_sparsity_logic.py new file mode 100644 index 0000000..2b7d0fd --- /dev/null +++ b/tests/test_sparsity_logic.py @@ -0,0 +1,37 @@ +from collections import Counter + +from pspace.core import BasisFunctionType, CoordinateFactory, CoordinateSystem + + +def test_total_degree_sparse_vector_axis_cutoff(): + factory = CoordinateFactory() + cs = CoordinateSystem(BasisFunctionType.TOTAL_DEGREE) + + y0 = factory.createNormalCoordinate(factory.newCoordinateID(), 'y0', dict(mu=0.0, sigma=1.0), 4) + y1 = factory.createUniformCoordinate(factory.newCoordinateID(), 'y1', dict(a=-1.0, b=1.0), 4) + + cs.addCoordinateAxis(y0) + cs.addCoordinateAxis(y1) + cs.initialize() + + exceeds_axis = Counter({0: 4, 1: 0}) + function_deg = Counter({0: 2, 1: 2}) + + assert cs.sparse_vector(exceeds_axis, function_deg) is False + + +def test_total_degree_sparse_vector_within_axis(): + factory = CoordinateFactory() + cs = CoordinateSystem(BasisFunctionType.TOTAL_DEGREE) + + y0 = factory.createNormalCoordinate(factory.newCoordinateID(), 'y0', dict(mu=0.0, sigma=1.0), 3) + y1 = factory.createUniformCoordinate(factory.newCoordinateID(), 'y1', dict(a=-1.0, b=1.0), 3) + + cs.addCoordinateAxis(y0) + cs.addCoordinateAxis(y1) + cs.initialize() + + within_axis = Counter({0: 2, 1: 1}) + function_deg = Counter({0: 2, 1: 2}) + + assert cs.sparse_vector(within_axis, function_deg) is True diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..fddd738 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,130 @@ +#=====================================================================# +# Common stuff for testing +# +# Author : Komahan Boopathy (komahan@gatech.edu) +#=====================================================================# + +# python module imports +import random +import sympy as sp +import pytest +from collections import Counter + +# core module imports +from pspace.core import (CoordinateFactory, + CoordinateSystem, + BasisFunctionType, + PolyFunction) + +# local module imports + +#=====================================================================# +# Global test flags +#=====================================================================# + +ENABLE_ANALYTIC_TESTS = False + +#=====================================================================# +# Helper : randomized coordinate factory with logging +#=====================================================================# + +def random_coordinate(cf, cid, max_deg = 4): + coord_type = random.choice(["normal", "uniform", "exponential"]) + name = f"y{cid}" + + if coord_type == "normal": + mu = random.uniform(-2.0, 2.0) + sigma = random.uniform( 0.5, 2.0) + deg = random.randint(1, max_deg) + coord = cf.createNormalCoordinate(cf.newCoordinateID(), name, + dict(mu=mu, sigma=sigma), deg) + print(f"[Coord {cid}] NORMAL(mu={mu:.3f}, sigma={sigma:.3f}), " + f"deg={deg}") + return coord + + elif coord_type == "uniform": + a = random.uniform(-3.0, 0.0) + b = a + random.uniform(1.0, 5.0) + deg = random.randint(1, max_deg) + coord = cf.createUniformCoordinate(cf.newCoordinateID(), name, + dict(a=a, b=b), deg) + print(f"[Coord {cid}] UNIFORM(a={a:.3f}, b={b:.3f}), " + f"deg={deg}") + return coord + + elif coord_type == "exponential": + mu = random.uniform(0.0, 2.0) + beta = random.uniform(0.5, 2.0) + deg = random.randint(1, max_deg) + coord = cf.createExponentialCoordinate(cf.newCoordinateID(), name, + dict(mu=mu, beta=beta), deg) + print(f"[Coord {cid}] EXPONENTIAL(mu={mu:.3f}, beta={beta:.3f}), " + f"deg={deg}") + return coord + +#=====================================================================# +# Helper : build random polynomial (with cross terms) +#=====================================================================# + +def random_polynomial(cs, max_deg=2, max_cross_terms=3): + coords = list(cs.coordinates.keys()) + symbols = {cid : cs.coordinates[cid].symbol for cid in coords} + fdeg = Counter() + terms = [] + + #---------------------------------------------------------------# + # Individual terms + #---------------------------------------------------------------# + + for cid in coords: + deg = random.randint(0, max_deg) + coeff = random.randint(1, 3) + terms.append((coeff, Counter({cid: deg}))) + fdeg[cid] = max(fdeg.get(cid, 0), deg) + + #---------------------------------------------------------------# + # Cross terms + #---------------------------------------------------------------# + + if len(coords) >= 2: + for _ in range(random.randint(0, max_cross_terms)): + cids = random.sample(coords, k=random.randint(2, len(coords))) + coeff = random.randint(1, 3) + degs = Counter() + for cid in cids: + d = random.randint(1, max_deg) + degs[cid] = d + fdeg[cid] = max(fdeg.get(cid, 0), d) + terms.append((coeff, degs)) + + #---------------------------------------------------------------# + # interface for polynomial functions + #---------------------------------------------------------------# + + polyf = PolyFunction(terms) + + # build sympy expression (for debug/logging) + fexpr = polyf(symbols) + + print(f"[Polynomial] f(y) = {fexpr}, degrees={dict(fdeg)}") + + return polyf + +#=====================================================================# +# Common coordinate system setup for given basis type, degrees of +# freedom and number of coordinates +#=====================================================================# + +def get_coordinate_system_type(basis_type, max_deg = 4, max_coords = 3): + cf = CoordinateFactory() + cs = CoordinateSystem(basis_type) + + ncoords = random.randint(1, max_coords) + print(f"[Setup] Using {ncoords} coordinates") + for cid in range(ncoords): + coord = random_coordinate(cf, cid, max_deg) + cs.addCoordinateAxis(coord) + + cs.initialize() + + return cs diff --git a/tests/test_vector_decomposition.py b/tests/test_vector_decomposition.py new file mode 100644 index 0000000..8189f40 --- /dev/null +++ b/tests/test_vector_decomposition.py @@ -0,0 +1,102 @@ +#=====================================================================# +# Randomized vector decomposition tests in randomly chosen +# N-dimensional coordinate system +#---------------------------------------------------------------------# +# Combinations: +#---------------------------------------------------------------------# +# - numerical vs symbolic +# - sparse vs full decomposition +#---------------------------------------------------------------------# +# Author : Komahan Boopathy (komahan@gatech.edu) +#=====================================================================# + +# python module imports +import pytest +import random + +# core module imports +from pspace.core import (CoordinateFactory, + CoordinateSystem, + BasisFunctionType, + PolyFunction) + +# local module imports +from .test_utils import (random_coordinate, + random_polynomial, + get_coordinate_system_type) + +#=====================================================================# +# Symbolic and numerical vector decomposition tests +#=====================================================================# + +@pytest.mark.parametrize("trial", range(5)) +def test_randomized_tensor_numerical_symbolic(trial): + random.seed(trial) + + print(f"\n=== Trial {trial} : Tensor Degree Basis (numerical vs symbolic) ===") + + cs = get_coordinate_system_type(BasisFunctionType.TENSOR_DEGREE) + + polynomial_function = random_polynomial(cs) + + ok, diffs = cs.check_decomposition_numerical_symbolic(polynomial_function, + sparse = True, + tol=1e-6, + verbose=True) + assert ok + +@pytest.mark.parametrize("trial", range(5)) +def test_randomized_total_numerical_symbolic(trial): + random.seed(trial) + + print(f"\n=== Trial {trial} : Total Degree Basis (numerical vs symbolic) ===") + + cs = get_coordinate_system_type(BasisFunctionType.TOTAL_DEGREE) + + polynomial_function = random_polynomial(cs) + + ok, diffs = cs.check_decomposition_numerical_symbolic(polynomial_function, + sparse = True, + tol=1e-6, + verbose=True) + assert ok + +#=====================================================================# +# Sparsity-aware and sparsity-unware vector decomposition tests +#=====================================================================# + +@pytest.mark.parametrize("trial", range(5)) +def test_randomized_tensor_basis_sparse_full(trial): + random.seed(trial) + + print(f"\n=== Trial {trial} : Tensor Degree Basis (sparse vs full assembly) ===") + + cs = get_coordinate_system_type(BasisFunctionType.TENSOR_DEGREE, + max_deg = 3, max_coords = 3) + + polynomial_function = random_polynomial(cs) + + ok, diffs = cs.check_decomposition_numerical_sparse_full(polynomial_function, + tol=1e-6, + verbose=True) + assert ok + +#=====================================================================# +# Tests 2 B: Total Degree Basis (sparse vs full assembly) +#=====================================================================# + +@pytest.mark.parametrize("trial", range(5)) +def test_randomized_total_basis_sparse_full(trial): + random.seed(trial) + + print(f"\n=== Trial {trial} : Total Degree Basis (sparse vs full assembly) ===") + + cs = get_coordinate_system_type(BasisFunctionType.TOTAL_DEGREE, + max_deg = 3, max_coords = 3) + + polynomial_function = random_polynomial(cs) + + ok, diffs = cs.check_decomposition_numerical_sparse_full(polynomial_function, + tol=1e-6, + verbose=True) + assert ok