From e378dc15b52985724b6ae4782c4ef0afc3393ca9 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Sat, 1 Jun 2024 22:07:46 -0500 Subject: [PATCH] Refactor (mostly rearrange) the statistics module (gh-119930) --- Lib/statistics.py | 1563 ++++++++++++++++++----------------- Lib/test/test_statistics.py | 5 +- 2 files changed, 808 insertions(+), 760 deletions(-) diff --git a/Lib/statistics.py b/Lib/statistics.py index c36145fe7f2..c64c6fae4ab 100644 --- a/Lib/statistics.py +++ b/Lib/statistics.py @@ -147,327 +147,13 @@ from collections import Counter, namedtuple, defaultdict _SQRT2 = sqrt(2.0) _random = random -# === Exceptions === +## Exceptions ############################################################## class StatisticsError(ValueError): pass -# === Private utilities === - -def _sum(data): - """_sum(data) -> (type, sum, count) - - Return a high-precision sum of the given numeric data as a fraction, - together with the type to be converted to and the count of items. - - Examples - -------- - - >>> _sum([3, 2.25, 4.5, -0.5, 0.25]) - (, Fraction(19, 2), 5) - - Some sources of round-off error will be avoided: - - # Built-in sum returns zero. - >>> _sum([1e50, 1, -1e50] * 1000) - (, Fraction(1000, 1), 3000) - - Fractions and Decimals are also supported: - - >>> from fractions import Fraction as F - >>> _sum([F(2, 3), F(7, 5), F(1, 4), F(5, 6)]) - (, Fraction(63, 20), 4) - - >>> from decimal import Decimal as D - >>> data = [D("0.1375"), D("0.2108"), D("0.3061"), D("0.0419")] - >>> _sum(data) - (, Fraction(6963, 10000), 4) - - Mixed types are currently treated as an error, except that int is - allowed. - """ - count = 0 - types = set() - types_add = types.add - partials = {} - partials_get = partials.get - for typ, values in groupby(data, type): - types_add(typ) - for n, d in map(_exact_ratio, values): - count += 1 - partials[d] = partials_get(d, 0) + n - if None in partials: - # The sum will be a NAN or INF. We can ignore all the finite - # partials, and just look at this special one. - total = partials[None] - assert not _isfinite(total) - else: - # Sum all the partial sums using builtin sum. - total = sum(Fraction(n, d) for d, n in partials.items()) - T = reduce(_coerce, types, int) # or raise TypeError - return (T, total, count) - - -def _ss(data, c=None): - """Return the exact mean and sum of square deviations of sequence data. - - Calculations are done in a single pass, allowing the input to be an iterator. - - If given *c* is used the mean; otherwise, it is calculated from the data. - Use the *c* argument with care, as it can lead to garbage results. - - """ - if c is not None: - T, ssd, count = _sum((d := x - c) * d for x in data) - return (T, ssd, c, count) - count = 0 - types = set() - types_add = types.add - sx_partials = defaultdict(int) - sxx_partials = defaultdict(int) - for typ, values in groupby(data, type): - types_add(typ) - for n, d in map(_exact_ratio, values): - count += 1 - sx_partials[d] += n - sxx_partials[d] += n * n - if not count: - ssd = c = Fraction(0) - elif None in sx_partials: - # The sum will be a NAN or INF. We can ignore all the finite - # partials, and just look at this special one. - ssd = c = sx_partials[None] - assert not _isfinite(ssd) - else: - sx = sum(Fraction(n, d) for d, n in sx_partials.items()) - sxx = sum(Fraction(n, d*d) for d, n in sxx_partials.items()) - # This formula has poor numeric properties for floats, - # but with fractions it is exact. 
- ssd = (count * sxx - sx * sx) / count - c = sx / count - T = reduce(_coerce, types, int) # or raise TypeError - return (T, ssd, c, count) - - -def _isfinite(x): - try: - return x.is_finite() # Likely a Decimal. - except AttributeError: - return math.isfinite(x) # Coerces to float first. - - -def _coerce(T, S): - """Coerce types T and S to a common type, or raise TypeError. - - Coercion rules are currently an implementation detail. See the CoerceTest - test class in test_statistics for details. - """ - # See http://bugs.python.org/issue24068. - assert T is not bool, "initial type T is bool" - # If the types are the same, no need to coerce anything. Put this - # first, so that the usual case (no coercion needed) happens as soon - # as possible. - if T is S: return T - # Mixed int & other coerce to the other type. - if S is int or S is bool: return T - if T is int: return S - # If one is a (strict) subclass of the other, coerce to the subclass. - if issubclass(S, T): return S - if issubclass(T, S): return T - # Ints coerce to the other type. - if issubclass(T, int): return S - if issubclass(S, int): return T - # Mixed fraction & float coerces to float (or float subclass). - if issubclass(T, Fraction) and issubclass(S, float): - return S - if issubclass(T, float) and issubclass(S, Fraction): - return T - # Any other combination is disallowed. - msg = "don't know how to coerce %s and %s" - raise TypeError(msg % (T.__name__, S.__name__)) - - -def _exact_ratio(x): - """Return Real number x to exact (numerator, denominator) pair. - - >>> _exact_ratio(0.25) - (1, 4) - - x is expected to be an int, Fraction, Decimal or float. - """ - - # XXX We should revisit whether using fractions to accumulate exact - # ratios is the right way to go. - - # The integer ratios for binary floats can have numerators or - # denominators with over 300 decimal digits. The problem is more - # acute with decimal floats where the default decimal context - # supports a huge range of exponents from Emin=-999999 to - # Emax=999999. When expanded with as_integer_ratio(), numbers like - # Decimal('3.14E+5000') and Decimal('3.14E-5000') have large - # numerators or denominators that will slow computation. - - # When the integer ratios are accumulated as fractions, the size - # grows to cover the full range from the smallest magnitude to the - # largest. For example, Fraction(3.14E+300) + Fraction(3.14E-300), - # has a 616 digit numerator. Likewise, - # Fraction(Decimal('3.14E+5000')) + Fraction(Decimal('3.14E-5000')) - # has 10,003 digit numerator. - - # This doesn't seem to have been problem in practice, but it is a - # potential pitfall. - - try: - return x.as_integer_ratio() - except AttributeError: - pass - except (OverflowError, ValueError): - # float NAN or INF. - assert not _isfinite(x) - return (x, None) - try: - # x may be an Integral ABC. - return (x.numerator, x.denominator) - except AttributeError: - msg = f"can't convert type '{type(x).__name__}' to numerator/denominator" - raise TypeError(msg) - - -def _convert(value, T): - """Convert value to given numeric type T.""" - if type(value) is T: - # This covers the cases where T is Fraction, or where value is - # a NAN or INF (Decimal or float). - return value - if issubclass(T, int) and value.denominator != 1: - T = float - try: - # FIXME: what do we do if this overflows? 
- return T(value) - except TypeError: - if issubclass(T, Decimal): - return T(value.numerator) / T(value.denominator) - else: - raise - - -def _fail_neg(values, errmsg='negative value'): - """Iterate over values, failing if any are less than zero.""" - for x in values: - if x < 0: - raise StatisticsError(errmsg) - yield x - - -def _rank(data, /, *, key=None, reverse=False, ties='average', start=1) -> list[float]: - """Rank order a dataset. The lowest value has rank 1. - - Ties are averaged so that equal values receive the same rank: - - >>> data = [31, 56, 31, 25, 75, 18] - >>> _rank(data) - [3.5, 5.0, 3.5, 2.0, 6.0, 1.0] - - The operation is idempotent: - - >>> _rank([3.5, 5.0, 3.5, 2.0, 6.0, 1.0]) - [3.5, 5.0, 3.5, 2.0, 6.0, 1.0] - - It is possible to rank the data in reverse order so that the - highest value has rank 1. Also, a key-function can extract - the field to be ranked: - - >>> goals = [('eagles', 45), ('bears', 48), ('lions', 44)] - >>> _rank(goals, key=itemgetter(1), reverse=True) - [2.0, 1.0, 3.0] - - Ranks are conventionally numbered starting from one; however, - setting *start* to zero allows the ranks to be used as array indices: - - >>> prize = ['Gold', 'Silver', 'Bronze', 'Certificate'] - >>> scores = [8.1, 7.3, 9.4, 8.3] - >>> [prize[int(i)] for i in _rank(scores, start=0, reverse=True)] - ['Bronze', 'Certificate', 'Gold', 'Silver'] - - """ - # If this function becomes public at some point, more thought - # needs to be given to the signature. A list of ints is - # plausible when ties is "min" or "max". When ties is "average", - # either list[float] or list[Fraction] is plausible. - - # Default handling of ties matches scipy.stats.mstats.spearmanr. - if ties != 'average': - raise ValueError(f'Unknown tie resolution method: {ties!r}') - if key is not None: - data = map(key, data) - val_pos = sorted(zip(data, count()), reverse=reverse) - i = start - 1 - result = [0] * len(val_pos) - for _, g in groupby(val_pos, key=itemgetter(0)): - group = list(g) - size = len(group) - rank = i + (size + 1) / 2 - for value, orig_pos in group: - result[orig_pos] = rank - i += size - return result - - -def _integer_sqrt_of_frac_rto(n: int, m: int) -> int: - """Square root of n/m, rounded to the nearest integer using round-to-odd.""" - # Reference: https://www.lri.fr/~melquion/doc/05-imacs17_1-expose.pdf - a = math.isqrt(n // m) - return a | (a*a*m != n) - - -# For 53 bit precision floats, the bit width used in -# _float_sqrt_of_frac() is 109. -_sqrt_bit_width: int = 2 * sys.float_info.mant_dig + 3 - - -def _float_sqrt_of_frac(n: int, m: int) -> float: - """Square root of n/m as a float, correctly rounded.""" - # See principle and proof sketch at: https://bugs.python.org/msg407078 - q = (n.bit_length() - m.bit_length() - _sqrt_bit_width) // 2 - if q >= 0: - numerator = _integer_sqrt_of_frac_rto(n, m << 2 * q) << q - denominator = 1 - else: - numerator = _integer_sqrt_of_frac_rto(n << -2 * q, m) - denominator = 1 << -q - return numerator / denominator # Convert to float - - -def _decimal_sqrt_of_frac(n: int, m: int) -> Decimal: - """Square root of n/m as a Decimal, correctly rounded.""" - # Premise: For decimal, computing (n/m).sqrt() can be off - # by 1 ulp from the correctly rounded result. - # Method: Check the result, moving up or down a step if needed. 
- if n <= 0: - if not n: - return Decimal('0.0') - n, m = -n, -m - - root = (Decimal(n) / Decimal(m)).sqrt() - nr, dr = root.as_integer_ratio() - - plus = root.next_plus() - np, dp = plus.as_integer_ratio() - # test: n / m > ((root + plus) / 2) ** 2 - if 4 * n * (dr*dp)**2 > m * (dr*np + dp*nr)**2: - return plus - - minus = root.next_minus() - nm, dm = minus.as_integer_ratio() - # test: n / m < ((root + minus) / 2) ** 2 - if 4 * n * (dr*dm)**2 < m * (dr*nm + dm*nr)**2: - return minus - - return root - - -# === Measures of central tendency (averages) === +## Measures of central tendency (averages) ################################# def mean(data): """Return the sample arithmetic mean of data. @@ -484,6 +170,7 @@ def mean(data): Decimal('0.5625') If ``data`` is empty, StatisticsError will be raised. + """ T, total, n = _sum(data) if n < 1: @@ -499,8 +186,10 @@ def fmean(data, weights=None): >>> fmean([3.5, 4.0, 5.25]) 4.25 + """ if weights is None: + try: n = len(data) except TypeError: @@ -510,18 +199,25 @@ def fmean(data, weights=None): n = next(counter) else: total = fsum(data) + if not n: raise StatisticsError('fmean requires at least one data point') + return total / n + if not isinstance(weights, (list, tuple)): weights = list(weights) + try: num = sumprod(data, weights) except ValueError: raise StatisticsError('data and weights must be the same length') + den = fsum(weights) + if not den: raise StatisticsError('sum of weights must be non-zero') + return num / den @@ -538,9 +234,11 @@ def geometric_mean(data): >>> round(geometric_mean([54, 24, 36]), 9) 36.0 + """ n = 0 found_zero = False + def count_positive(iterable): nonlocal n, found_zero for n, x in enumerate(iterable, start=1): @@ -551,12 +249,14 @@ def geometric_mean(data): else: raise StatisticsError('No negative inputs allowed', x) total = fsum(map(log, count_positive(data))) + if not n: raise StatisticsError('Must have a non-empty dataset') if math.isnan(total): return math.nan if found_zero: return math.nan if total == math.inf else 0.0 + return exp(total / n) @@ -582,10 +282,13 @@ def harmonic_mean(data, weights=None): If ``data`` is empty, or any element is less than zero, ``harmonic_mean`` will raise ``StatisticsError``. + """ if iter(data) is data: data = list(data) + errmsg = 'harmonic mean does not support negative values' + n = len(data) if n < 1: raise StatisticsError('harmonic_mean requires at least one data point') @@ -597,6 +300,7 @@ def harmonic_mean(data, weights=None): return x else: raise TypeError('unsupported type') + if weights is None: weights = repeat(1, n) sum_weights = n @@ -606,16 +310,19 @@ def harmonic_mean(data, weights=None): if len(weights) != n: raise StatisticsError('Number of weights does not match data size') _, sum_weights, _ = _sum(w for w in _fail_neg(weights, errmsg)) + try: data = _fail_neg(data, errmsg) T, total, count = _sum(w / x if w else 0 for w, x in zip(weights, data)) except ZeroDivisionError: return 0 + if total <= 0: raise StatisticsError('Weighted sum must be positive') + return _convert(sum_weights / total, T) -# FIXME: investigate ways to calculate medians without sorting? Quickselect? + def median(data): """Return the median (middle value) of numeric data. @@ -652,6 +359,9 @@ def median_low(data): 3 """ + # Potentially the sorting step could be replaced with a quickselect. + # However, it would require an excellent implementation to beat our + # highly optimized builtin sort. 
data = sorted(data) n = len(data) if n == 0: @@ -795,6 +505,7 @@ def multimode(data): ['b', 'd', 'f'] >>> multimode('') [] + """ counts = Counter(iter(data)) if not counts: @@ -803,6 +514,406 @@ def multimode(data): return [value for value, count in counts.items() if count == maxcount] +## Measures of spread ###################################################### + +def variance(data, xbar=None): + """Return the sample variance of data. + + data should be an iterable of Real-valued numbers, with at least two + values. The optional argument xbar, if given, should be the mean of + the data. If it is missing or None, the mean is automatically calculated. + + Use this function when your data is a sample from a population. To + calculate the variance from the entire population, see ``pvariance``. + + Examples: + + >>> data = [2.75, 1.75, 1.25, 0.25, 0.5, 1.25, 3.5] + >>> variance(data) + 1.3720238095238095 + + If you have already calculated the mean of your data, you can pass it as + the optional second argument ``xbar`` to avoid recalculating it: + + >>> m = mean(data) + >>> variance(data, m) + 1.3720238095238095 + + This function does not check that ``xbar`` is actually the mean of + ``data``. Giving arbitrary values for ``xbar`` may lead to invalid or + impossible results. + + Decimals and Fractions are supported: + + >>> from decimal import Decimal as D + >>> variance([D("27.5"), D("30.25"), D("30.25"), D("34.5"), D("41.75")]) + Decimal('31.01875') + + >>> from fractions import Fraction as F + >>> variance([F(1, 6), F(1, 2), F(5, 3)]) + Fraction(67, 108) + + """ + # http://mathworld.wolfram.com/SampleVariance.html + + T, ss, c, n = _ss(data, xbar) + if n < 2: + raise StatisticsError('variance requires at least two data points') + return _convert(ss / (n - 1), T) + + +def pvariance(data, mu=None): + """Return the population variance of ``data``. + + data should be a sequence or iterable of Real-valued numbers, with at least one + value. The optional argument mu, if given, should be the mean of + the data. If it is missing or None, the mean is automatically calculated. + + Use this function to calculate the variance from the entire population. + To estimate the variance from a sample, the ``variance`` function is + usually a better choice. + + Examples: + + >>> data = [0.0, 0.25, 0.25, 1.25, 1.5, 1.75, 2.75, 3.25] + >>> pvariance(data) + 1.25 + + If you have already calculated the mean of the data, you can pass it as + the optional second argument to avoid recalculating it: + + >>> mu = mean(data) + >>> pvariance(data, mu) + 1.25 + + Decimals and Fractions are supported: + + >>> from decimal import Decimal as D + >>> pvariance([D("27.5"), D("30.25"), D("30.25"), D("34.5"), D("41.75")]) + Decimal('24.815') + + >>> from fractions import Fraction as F + >>> pvariance([F(1, 4), F(5, 4), F(1, 2)]) + Fraction(13, 72) + + """ + # http://mathworld.wolfram.com/Variance.html + + T, ss, c, n = _ss(data, mu) + if n < 1: + raise StatisticsError('pvariance requires at least one data point') + return _convert(ss / n, T) + + +def stdev(data, xbar=None): + """Return the square root of the sample variance. + + See ``variance`` for arguments and other details. 
+ + >>> stdev([1.5, 2.5, 2.5, 2.75, 3.25, 4.75]) + 1.0810874155219827 + + """ + T, ss, c, n = _ss(data, xbar) + if n < 2: + raise StatisticsError('stdev requires at least two data points') + mss = ss / (n - 1) + if issubclass(T, Decimal): + return _decimal_sqrt_of_frac(mss.numerator, mss.denominator) + return _float_sqrt_of_frac(mss.numerator, mss.denominator) + + +def pstdev(data, mu=None): + """Return the square root of the population variance. + + See ``pvariance`` for arguments and other details. + + >>> pstdev([1.5, 2.5, 2.5, 2.75, 3.25, 4.75]) + 0.986893273527251 + + """ + T, ss, c, n = _ss(data, mu) + if n < 1: + raise StatisticsError('pstdev requires at least one data point') + mss = ss / n + if issubclass(T, Decimal): + return _decimal_sqrt_of_frac(mss.numerator, mss.denominator) + return _float_sqrt_of_frac(mss.numerator, mss.denominator) + + +## Statistics for relations between two inputs ############################# + +def covariance(x, y, /): + """Covariance + + Return the sample covariance of two inputs *x* and *y*. Covariance + is a measure of the joint variability of two inputs. + + >>> x = [1, 2, 3, 4, 5, 6, 7, 8, 9] + >>> y = [1, 2, 3, 1, 2, 3, 1, 2, 3] + >>> covariance(x, y) + 0.75 + >>> z = [9, 8, 7, 6, 5, 4, 3, 2, 1] + >>> covariance(x, z) + -7.5 + >>> covariance(z, x) + -7.5 + + """ + # https://en.wikipedia.org/wiki/Covariance + n = len(x) + if len(y) != n: + raise StatisticsError('covariance requires that both inputs have same number of data points') + if n < 2: + raise StatisticsError('covariance requires at least two data points') + xbar = fsum(x) / n + ybar = fsum(y) / n + sxy = sumprod((xi - xbar for xi in x), (yi - ybar for yi in y)) + return sxy / (n - 1) + + +def correlation(x, y, /, *, method='linear'): + """Pearson's correlation coefficient + + Return the Pearson's correlation coefficient for two inputs. Pearson's + correlation coefficient *r* takes values between -1 and +1. It measures + the strength and direction of a linear relationship. + + >>> x = [1, 2, 3, 4, 5, 6, 7, 8, 9] + >>> y = [9, 8, 7, 6, 5, 4, 3, 2, 1] + >>> correlation(x, x) + 1.0 + >>> correlation(x, y) + -1.0 + + If *method* is "ranked", computes Spearman's rank correlation coefficient + for two inputs. The data is replaced by ranks. Ties are averaged + so that equal values receive the same rank. The resulting coefficient + measures the strength of a monotonic relationship. + + Spearman's rank correlation coefficient is appropriate for ordinal + data or for continuous data that doesn't meet the linear proportion + requirement for Pearson's correlation coefficient. 
+ + """ + # https://en.wikipedia.org/wiki/Pearson_correlation_coefficient + # https://en.wikipedia.org/wiki/Spearman%27s_rank_correlation_coefficient + n = len(x) + if len(y) != n: + raise StatisticsError('correlation requires that both inputs have same number of data points') + if n < 2: + raise StatisticsError('correlation requires at least two data points') + if method not in {'linear', 'ranked'}: + raise ValueError(f'Unknown method: {method!r}') + + if method == 'ranked': + start = (n - 1) / -2 # Center rankings around zero + x = _rank(x, start=start) + y = _rank(y, start=start) + else: + xbar = fsum(x) / n + ybar = fsum(y) / n + x = [xi - xbar for xi in x] + y = [yi - ybar for yi in y] + + sxy = sumprod(x, y) + sxx = sumprod(x, x) + syy = sumprod(y, y) + + try: + return sxy / _sqrtprod(sxx, syy) + except ZeroDivisionError: + raise StatisticsError('at least one of the inputs is constant') + + +LinearRegression = namedtuple('LinearRegression', ('slope', 'intercept')) + + +def linear_regression(x, y, /, *, proportional=False): + """Slope and intercept for simple linear regression. + + Return the slope and intercept of simple linear regression + parameters estimated using ordinary least squares. Simple linear + regression describes relationship between an independent variable + *x* and a dependent variable *y* in terms of a linear function: + + y = slope * x + intercept + noise + + where *slope* and *intercept* are the regression parameters that are + estimated, and noise represents the variability of the data that was + not explained by the linear regression (it is equal to the + difference between predicted and actual values of the dependent + variable). + + The parameters are returned as a named tuple. + + >>> x = [1, 2, 3, 4, 5] + >>> noise = NormalDist().samples(5, seed=42) + >>> y = [3 * x[i] + 2 + noise[i] for i in range(5)] + >>> linear_regression(x, y) #doctest: +ELLIPSIS + LinearRegression(slope=3.17495..., intercept=1.00925...) + + If *proportional* is true, the independent variable *x* and the + dependent variable *y* are assumed to be directly proportional. + The data is fit to a line passing through the origin. + + Since the *intercept* will always be 0.0, the underlying linear + function simplifies to: + + y = slope * x + noise + + >>> y = [3 * x[i] + noise[i] for i in range(5)] + >>> linear_regression(x, y, proportional=True) #doctest: +ELLIPSIS + LinearRegression(slope=2.90475..., intercept=0.0) + + """ + # https://en.wikipedia.org/wiki/Simple_linear_regression + n = len(x) + if len(y) != n: + raise StatisticsError('linear regression requires that both inputs have same number of data points') + if n < 2: + raise StatisticsError('linear regression requires at least two data points') + + if not proportional: + xbar = fsum(x) / n + ybar = fsum(y) / n + x = [xi - xbar for xi in x] # List because used three times below + y = (yi - ybar for yi in y) # Generator because only used once below + + sxy = sumprod(x, y) + 0.0 # Add zero to coerce result to a float + sxx = sumprod(x, x) + + try: + slope = sxy / sxx # equivalent to: covariance(x, y) / variance(x) + except ZeroDivisionError: + raise StatisticsError('x is constant') + + intercept = 0.0 if proportional else ybar - slope * xbar + return LinearRegression(slope=slope, intercept=intercept) + + +## Kernel Density Estimation ############################################### + +_kernel_specs = {} + +def register(*kernels): + "Load the kernel's pdf, cdf, invcdf, and support into _kernel_specs." 
+ def deco(builder): + spec = dict(zip(('pdf', 'cdf', 'invcdf', 'support'), builder())) + for kernel in kernels: + _kernel_specs[kernel] = spec + return builder + return deco + +@register('normal', 'gauss') +def normal_kernel(): + sqrt2pi = sqrt(2 * pi) + sqrt2 = sqrt(2) + pdf = lambda t: exp(-1/2 * t * t) / sqrt2pi + cdf = lambda t: 1/2 * (1.0 + erf(t / sqrt2)) + invcdf = lambda t: _normal_dist_inv_cdf(t, 0.0, 1.0) + support = None + return pdf, cdf, invcdf, support + +@register('logistic') +def logistic_kernel(): + # 1.0 / (exp(t) + 2.0 + exp(-t)) + pdf = lambda t: 1/2 / (1.0 + cosh(t)) + cdf = lambda t: 1.0 - 1.0 / (exp(t) + 1.0) + invcdf = lambda p: log(p / (1.0 - p)) + support = None + return pdf, cdf, invcdf, support + +@register('sigmoid') +def sigmoid_kernel(): + # (2/pi) / (exp(t) + exp(-t)) + c1 = 1 / pi + c2 = 2 / pi + c3 = pi / 2 + pdf = lambda t: c1 / cosh(t) + cdf = lambda t: c2 * atan(exp(t)) + invcdf = lambda p: log(tan(p * c3)) + support = None + return pdf, cdf, invcdf, support + +@register('rectangular', 'uniform') +def rectangular_kernel(): + pdf = lambda t: 1/2 + cdf = lambda t: 1/2 * t + 1/2 + invcdf = lambda p: 2.0 * p - 1.0 + support = 1.0 + return pdf, cdf, invcdf, support + +@register('triangular') +def triangular_kernel(): + pdf = lambda t: 1.0 - abs(t) + cdf = lambda t: t*t * (1/2 if t < 0.0 else -1/2) + t + 1/2 + invcdf = lambda p: sqrt(2.0*p) - 1.0 if p < 1/2 else 1.0 - sqrt(2.0 - 2.0*p) + support = 1.0 + return pdf, cdf, invcdf, support + +@register('parabolic', 'epanechnikov') +def parabolic_kernel(): + pdf = lambda t: 3/4 * (1.0 - t * t) + cdf = lambda t: sumprod((-1/4, 3/4, 1/2), (t**3, t, 1.0)) + invcdf = lambda p: 2.0 * cos((acos(2.0*p - 1.0) + pi) / 3.0) + support = 1.0 + return pdf, cdf, invcdf, support + +def _newton_raphson(f_inv_estimate, f, f_prime, tolerance=1e-12): + def f_inv(y): + "Return x such that f(x) ≈ y within the specified tolerance." 
+ x = f_inv_estimate(y) + while abs(diff := f(x) - y) > tolerance: + x -= diff / f_prime(x) + return x + return f_inv + +def _quartic_invcdf_estimate(p): + sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p) + x = (2.0 * p) ** 0.4258865685331 - 1.0 + if p >= 0.004 < 0.499: + x += 0.026818732 * sin(7.101753784 * p + 2.73230839482953) + return x * sign + +@register('quartic', 'biweight') +def quartic_kernel(): + pdf = lambda t: 15/16 * (1.0 - t * t) ** 2 + cdf = lambda t: sumprod((3/16, -5/8, 15/16, 1/2), + (t**5, t**3, t, 1.0)) + invcdf = _newton_raphson(_quartic_invcdf_estimate, f=cdf, f_prime=pdf) + support = 1.0 + return pdf, cdf, invcdf, support + +def _triweight_invcdf_estimate(p): + sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p) + x = (2.0 * p) ** 0.3400218741872791 - 1.0 + return x * sign + +@register('triweight') +def triweight_kernel(): + pdf = lambda t: 35/32 * (1.0 - t * t) ** 3 + cdf = lambda t: sumprod((-5/32, 21/32, -35/32, 35/32, 1/2), + (t**7, t**5, t**3, t, 1.0)) + invcdf = _newton_raphson(_triweight_invcdf_estimate, f=cdf, f_prime=pdf) + support = 1.0 + return pdf, cdf, invcdf, support + +@register('cosine') +def cosine_kernel(): + c1 = pi / 4 + c2 = pi / 2 + pdf = lambda t: c1 * cos(c2 * t) + cdf = lambda t: 1/2 * sin(c2 * t) + 1/2 + invcdf = lambda p: 2.0 * asin(2.0 * p - 1.0) / pi + support = 1.0 + return pdf, cdf, invcdf, support + +del register, normal_kernel, logistic_kernel, sigmoid_kernel +del rectangular_kernel, triangular_kernel, parabolic_kernel +del quartic_kernel, triweight_kernel, cosine_kernel + + def kde(data, h, kernel='normal', *, cumulative=False): """Kernel Density Estimation: Create a continuous probability density function or cumulative distribution function from discrete samples. @@ -913,65 +1024,12 @@ def kde(data, h, kernel='normal', *, cumulative=False): if h <= 0.0: raise StatisticsError(f'Bandwidth h must be positive, not {h=!r}') - match kernel: - - case 'normal' | 'gauss': - sqrt2pi = sqrt(2 * pi) - sqrt2 = sqrt(2) - K = lambda t: exp(-1/2 * t * t) / sqrt2pi - W = lambda t: 1/2 * (1.0 + erf(t / sqrt2)) - support = None - - case 'logistic': - # 1.0 / (exp(t) + 2.0 + exp(-t)) - K = lambda t: 1/2 / (1.0 + cosh(t)) - W = lambda t: 1.0 - 1.0 / (exp(t) + 1.0) - support = None - - case 'sigmoid': - # (2/pi) / (exp(t) + exp(-t)) - c1 = 1 / pi - c2 = 2 / pi - K = lambda t: c1 / cosh(t) - W = lambda t: c2 * atan(exp(t)) - support = None - - case 'rectangular' | 'uniform': - K = lambda t: 1/2 - W = lambda t: 1/2 * t + 1/2 - support = 1.0 - - case 'triangular': - K = lambda t: 1.0 - abs(t) - W = lambda t: t*t * (1/2 if t < 0.0 else -1/2) + t + 1/2 - support = 1.0 - - case 'parabolic' | 'epanechnikov': - K = lambda t: 3/4 * (1.0 - t * t) - W = lambda t: -1/4 * t**3 + 3/4 * t + 1/2 - support = 1.0 - - case 'quartic' | 'biweight': - K = lambda t: 15/16 * (1.0 - t * t) ** 2 - W = lambda t: sumprod((3/16, -5/8, 15/16, 1/2), - (t**5, t**3, t, 1.0)) - support = 1.0 - - case 'triweight': - K = lambda t: 35/32 * (1.0 - t * t) ** 3 - W = lambda t: sumprod((-5/32, 21/32, -35/32, 35/32, 1/2), - (t**7, t**5, t**3, t, 1.0)) - support = 1.0 - - case 'cosine': - c1 = pi / 4 - c2 = pi / 2 - K = lambda t: c1 * cos(c2 * t) - W = lambda t: 1/2 * sin(c2 * t) + 1/2 - support = 1.0 - - case _: - raise StatisticsError(f'Unknown kernel name: {kernel!r}') + kernel_spec = _kernel_specs.get(kernel) + if kernel_spec is None: + raise StatisticsError(f'Unknown kernel name: {kernel!r}') + K = kernel_spec['pdf'] + W = kernel_spec['cdf'] + support = kernel_spec['support'] if support 
is None: @@ -1015,9 +1073,53 @@ def kde(data, h, kernel='normal', *, cumulative=False): return pdf -# Notes on methods for computing quantiles -# ---------------------------------------- -# +def kde_random(data, h, kernel='normal', *, seed=None): + """Return a function that makes a random selection from the estimated + probability density function created by kde(data, h, kernel). + + Providing a *seed* allows reproducible selections within a single + thread. The seed may be an integer, float, str, or bytes. + + A StatisticsError will be raised if the *data* sequence is empty. + + Example: + + >>> data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2] + >>> rand = kde_random(data, h=1.5, seed=8675309) + >>> new_selections = [rand() for i in range(10)] + >>> [round(x, 1) for x in new_selections] + [0.7, 6.2, 1.2, 6.9, 7.0, 1.8, 2.5, -0.5, -1.8, 5.6] + + """ + n = len(data) + if not n: + raise StatisticsError('Empty data sequence') + + if not isinstance(data[0], (int, float)): + raise TypeError('Data sequence must contain ints or floats') + + if h <= 0.0: + raise StatisticsError(f'Bandwidth h must be positive, not {h=!r}') + + kernel_spec = _kernel_specs.get(kernel) + if kernel_spec is None: + raise StatisticsError(f'Unknown kernel name: {kernel!r}') + invcdf = kernel_spec['invcdf'] + + prng = _random.Random(seed) + random = prng.random + choice = prng.choice + + def rand(): + return choice(data) + h * invcdf(random()) + + rand.__doc__ = f'Random KDE selection with {h=!r} and {kernel=!r}' + + return rand + + +## Quantiles ############################################################### + # There is no one perfect way to compute quantiles. Here we offer # two methods that serve common needs. Most other packages # surveyed offered at least one or both of these two, making them @@ -1067,10 +1169,13 @@ def quantiles(data, *, n=4, method='exclusive'): If *method* is set to *inclusive*, *data* is treated as population data. The minimum value is treated as the 0th percentile and the maximum value is treated as the 100th percentile. + """ if n < 1: raise StatisticsError('n must be at least 1') + data = sorted(data) + ld = len(data) if ld < 2: if ld == 1: @@ -1100,314 +1205,8 @@ def quantiles(data, *, n=4, method='exclusive'): raise ValueError(f'Unknown method: {method!r}') -# === Measures of spread === - -# See http://mathworld.wolfram.com/Variance.html -# http://mathworld.wolfram.com/SampleVariance.html - - -def variance(data, xbar=None): - """Return the sample variance of data. - - data should be an iterable of Real-valued numbers, with at least two - values. The optional argument xbar, if given, should be the mean of - the data. If it is missing or None, the mean is automatically calculated. - - Use this function when your data is a sample from a population. To - calculate the variance from the entire population, see ``pvariance``. - - Examples: - - >>> data = [2.75, 1.75, 1.25, 0.25, 0.5, 1.25, 3.5] - >>> variance(data) - 1.3720238095238095 - - If you have already calculated the mean of your data, you can pass it as - the optional second argument ``xbar`` to avoid recalculating it: - - >>> m = mean(data) - >>> variance(data, m) - 1.3720238095238095 - - This function does not check that ``xbar`` is actually the mean of - ``data``. Giving arbitrary values for ``xbar`` may lead to invalid or - impossible results. 
- - Decimals and Fractions are supported: - - >>> from decimal import Decimal as D - >>> variance([D("27.5"), D("30.25"), D("30.25"), D("34.5"), D("41.75")]) - Decimal('31.01875') - - >>> from fractions import Fraction as F - >>> variance([F(1, 6), F(1, 2), F(5, 3)]) - Fraction(67, 108) - - """ - T, ss, c, n = _ss(data, xbar) - if n < 2: - raise StatisticsError('variance requires at least two data points') - return _convert(ss / (n - 1), T) - - -def pvariance(data, mu=None): - """Return the population variance of ``data``. - - data should be a sequence or iterable of Real-valued numbers, with at least one - value. The optional argument mu, if given, should be the mean of - the data. If it is missing or None, the mean is automatically calculated. - - Use this function to calculate the variance from the entire population. - To estimate the variance from a sample, the ``variance`` function is - usually a better choice. - - Examples: - - >>> data = [0.0, 0.25, 0.25, 1.25, 1.5, 1.75, 2.75, 3.25] - >>> pvariance(data) - 1.25 - - If you have already calculated the mean of the data, you can pass it as - the optional second argument to avoid recalculating it: - - >>> mu = mean(data) - >>> pvariance(data, mu) - 1.25 - - Decimals and Fractions are supported: - - >>> from decimal import Decimal as D - >>> pvariance([D("27.5"), D("30.25"), D("30.25"), D("34.5"), D("41.75")]) - Decimal('24.815') - - >>> from fractions import Fraction as F - >>> pvariance([F(1, 4), F(5, 4), F(1, 2)]) - Fraction(13, 72) - - """ - T, ss, c, n = _ss(data, mu) - if n < 1: - raise StatisticsError('pvariance requires at least one data point') - return _convert(ss / n, T) - - -def stdev(data, xbar=None): - """Return the square root of the sample variance. - - See ``variance`` for arguments and other details. - - >>> stdev([1.5, 2.5, 2.5, 2.75, 3.25, 4.75]) - 1.0810874155219827 - - """ - T, ss, c, n = _ss(data, xbar) - if n < 2: - raise StatisticsError('stdev requires at least two data points') - mss = ss / (n - 1) - if issubclass(T, Decimal): - return _decimal_sqrt_of_frac(mss.numerator, mss.denominator) - return _float_sqrt_of_frac(mss.numerator, mss.denominator) - - -def pstdev(data, mu=None): - """Return the square root of the population variance. - - See ``pvariance`` for arguments and other details. - - >>> pstdev([1.5, 2.5, 2.5, 2.75, 3.25, 4.75]) - 0.986893273527251 - - """ - T, ss, c, n = _ss(data, mu) - if n < 1: - raise StatisticsError('pstdev requires at least one data point') - mss = ss / n - if issubclass(T, Decimal): - return _decimal_sqrt_of_frac(mss.numerator, mss.denominator) - return _float_sqrt_of_frac(mss.numerator, mss.denominator) - - -def _mean_stdev(data): - """In one pass, compute the mean and sample standard deviation as floats.""" - T, ss, xbar, n = _ss(data) - if n < 2: - raise StatisticsError('stdev requires at least two data points') - mss = ss / (n - 1) - try: - return float(xbar), _float_sqrt_of_frac(mss.numerator, mss.denominator) - except AttributeError: - # Handle Nans and Infs gracefully - return float(xbar), float(xbar) / float(ss) - -def _sqrtprod(x: float, y: float) -> float: - "Return sqrt(x * y) computed with improved accuracy and without overflow/underflow." - h = sqrt(x * y) - if not isfinite(h): - if isinf(h) and not isinf(x) and not isinf(y): - # Finite inputs overflowed, so scale down, and recompute. 
- scale = 2.0 ** -512 # sqrt(1 / sys.float_info.max) - return _sqrtprod(scale * x, scale * y) / scale - return h - if not h: - if x and y: - # Non-zero inputs underflowed, so scale up, and recompute. - # Scale: 1 / sqrt(sys.float_info.min * sys.float_info.epsilon) - scale = 2.0 ** 537 - return _sqrtprod(scale * x, scale * y) / scale - return h - # Improve accuracy with a differential correction. - # https://www.wolframalpha.com/input/?i=Maclaurin+series+sqrt%28h**2+%2B+x%29+at+x%3D0 - d = sumprod((x, h), (y, -h)) - return h + d / (2.0 * h) - - -# === Statistics for relations between two inputs === - -# See https://en.wikipedia.org/wiki/Covariance -# https://en.wikipedia.org/wiki/Pearson_correlation_coefficient -# https://en.wikipedia.org/wiki/Simple_linear_regression - - -def covariance(x, y, /): - """Covariance - - Return the sample covariance of two inputs *x* and *y*. Covariance - is a measure of the joint variability of two inputs. - - >>> x = [1, 2, 3, 4, 5, 6, 7, 8, 9] - >>> y = [1, 2, 3, 1, 2, 3, 1, 2, 3] - >>> covariance(x, y) - 0.75 - >>> z = [9, 8, 7, 6, 5, 4, 3, 2, 1] - >>> covariance(x, z) - -7.5 - >>> covariance(z, x) - -7.5 - - """ - n = len(x) - if len(y) != n: - raise StatisticsError('covariance requires that both inputs have same number of data points') - if n < 2: - raise StatisticsError('covariance requires at least two data points') - xbar = fsum(x) / n - ybar = fsum(y) / n - sxy = sumprod((xi - xbar for xi in x), (yi - ybar for yi in y)) - return sxy / (n - 1) - - -def correlation(x, y, /, *, method='linear'): - """Pearson's correlation coefficient - - Return the Pearson's correlation coefficient for two inputs. Pearson's - correlation coefficient *r* takes values between -1 and +1. It measures - the strength and direction of a linear relationship. - - >>> x = [1, 2, 3, 4, 5, 6, 7, 8, 9] - >>> y = [9, 8, 7, 6, 5, 4, 3, 2, 1] - >>> correlation(x, x) - 1.0 - >>> correlation(x, y) - -1.0 - - If *method* is "ranked", computes Spearman's rank correlation coefficient - for two inputs. The data is replaced by ranks. Ties are averaged - so that equal values receive the same rank. The resulting coefficient - measures the strength of a monotonic relationship. - - Spearman's rank correlation coefficient is appropriate for ordinal - data or for continuous data that doesn't meet the linear proportion - requirement for Pearson's correlation coefficient. - """ - n = len(x) - if len(y) != n: - raise StatisticsError('correlation requires that both inputs have same number of data points') - if n < 2: - raise StatisticsError('correlation requires at least two data points') - if method not in {'linear', 'ranked'}: - raise ValueError(f'Unknown method: {method!r}') - if method == 'ranked': - start = (n - 1) / -2 # Center rankings around zero - x = _rank(x, start=start) - y = _rank(y, start=start) - else: - xbar = fsum(x) / n - ybar = fsum(y) / n - x = [xi - xbar for xi in x] - y = [yi - ybar for yi in y] - sxy = sumprod(x, y) - sxx = sumprod(x, x) - syy = sumprod(y, y) - try: - return sxy / _sqrtprod(sxx, syy) - except ZeroDivisionError: - raise StatisticsError('at least one of the inputs is constant') - - -LinearRegression = namedtuple('LinearRegression', ('slope', 'intercept')) - - -def linear_regression(x, y, /, *, proportional=False): - """Slope and intercept for simple linear regression. - - Return the slope and intercept of simple linear regression - parameters estimated using ordinary least squares. 
Simple linear - regression describes relationship between an independent variable - *x* and a dependent variable *y* in terms of a linear function: - - y = slope * x + intercept + noise - - where *slope* and *intercept* are the regression parameters that are - estimated, and noise represents the variability of the data that was - not explained by the linear regression (it is equal to the - difference between predicted and actual values of the dependent - variable). - - The parameters are returned as a named tuple. - - >>> x = [1, 2, 3, 4, 5] - >>> noise = NormalDist().samples(5, seed=42) - >>> y = [3 * x[i] + 2 + noise[i] for i in range(5)] - >>> linear_regression(x, y) #doctest: +ELLIPSIS - LinearRegression(slope=3.17495..., intercept=1.00925...) - - If *proportional* is true, the independent variable *x* and the - dependent variable *y* are assumed to be directly proportional. - The data is fit to a line passing through the origin. - - Since the *intercept* will always be 0.0, the underlying linear - function simplifies to: - - y = slope * x + noise - - >>> y = [3 * x[i] + noise[i] for i in range(5)] - >>> linear_regression(x, y, proportional=True) #doctest: +ELLIPSIS - LinearRegression(slope=2.90475..., intercept=0.0) - - """ - n = len(x) - if len(y) != n: - raise StatisticsError('linear regression requires that both inputs have same number of data points') - if n < 2: - raise StatisticsError('linear regression requires at least two data points') - if not proportional: - xbar = fsum(x) / n - ybar = fsum(y) / n - x = [xi - xbar for xi in x] # List because used three times below - y = (yi - ybar for yi in y) # Generator because only used once below - sxy = sumprod(x, y) + 0.0 # Add zero to coerce result to a float - sxx = sumprod(x, x) - try: - slope = sxy / sxx # equivalent to: covariance(x, y) / variance(x) - except ZeroDivisionError: - raise StatisticsError('x is constant') - intercept = 0.0 if proportional else ybar - slope * xbar - return LinearRegression(slope=slope, intercept=intercept) - - ## Normal Distribution ##################################################### - def _normal_dist_inv_cdf(p, mu, sigma): # There is no closed-form solution to the inverse CDF for the normal # distribution, so we use a rational approximation instead: @@ -1415,6 +1214,7 @@ def _normal_dist_inv_cdf(p, mu, sigma): # Normal Distribution". Applied Statistics. Blackwell Publishing. 37 # (3): 477–484. doi:10.2307/2347330. JSTOR 2347330. q = p - 0.5 + if fabs(q) <= 0.425: r = 0.180625 - q * q # Hash sum: 55.88319_28806_14901_4439 @@ -1436,6 +1236,7 @@ def _normal_dist_inv_cdf(p, mu, sigma): 1.0) x = num / den return mu + (x * sigma) + r = p if q <= 0.0 else 1.0 - p r = sqrt(-log(r)) if r <= 5.0: @@ -1476,9 +1277,11 @@ def _normal_dist_inv_cdf(p, mu, sigma): 1.36929_88092_27358_05310e-1) * r + 5.99832_20655_58879_37690e-1) * r + 1.0) + x = num / den if q < 0.0: x = -x + return mu + (x * sigma) @@ -1712,95 +1515,339 @@ class NormalDist: self._mu, self._sigma = state -## kde_random() ############################################################## +## Private utilities ####################################################### -def _newton_raphson(f_inv_estimate, f, f_prime, tolerance=1e-12): - def f_inv(y): - "Return x such that f(x) ≈ y within the specified tolerance." 
- x = f_inv_estimate(y)
- while abs(diff := f(x) - y) > tolerance:
- x -= diff / f_prime(x)
- return x
- return f_inv
+def _sum(data):
+ """_sum(data) -> (type, sum, count)
-def _quartic_invcdf_estimate(p):
- sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p)
- x = (2.0 * p) ** 0.4258865685331 - 1.0
- if p >= 0.004 < 0.499:
- x += 0.026818732 * sin(7.101753784 * p + 2.73230839482953)
- return x * sign
+ Return a high-precision sum of the given numeric data as a fraction,
+ together with the type to be converted to and the count of items.
-_quartic_invcdf = _newton_raphson(
- f_inv_estimate = _quartic_invcdf_estimate,
- f = lambda t: sumprod((3/16, -5/8, 15/16, 1/2), (t**5, t**3, t, 1.0)),
- f_prime = lambda t: 15/16 * (1.0 - t * t) ** 2)
+ Examples
+ --------
-def _triweight_invcdf_estimate(p):
- sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p)
- x = (2.0 * p) ** 0.3400218741872791 - 1.0
- return x * sign
+ >>> _sum([3, 2.25, 4.5, -0.5, 0.25])
+ (<class 'float'>, Fraction(19, 2), 5)
-_triweight_invcdf = _newton_raphson(
- f_inv_estimate = _triweight_invcdf_estimate,
- f = lambda t: sumprod((-5/32, 21/32, -35/32, 35/32, 1/2),
- (t**7, t**5, t**3, t, 1.0)),
- f_prime = lambda t: 35/32 * (1.0 - t * t) ** 3)
+ Some sources of round-off error will be avoided:
-_kernel_invcdfs = {
- 'normal': NormalDist().inv_cdf,
- 'logistic': lambda p: log(p / (1 - p)),
- 'sigmoid': lambda p: log(tan(p * pi/2)),
- 'rectangular': lambda p: 2*p - 1,
- 'parabolic': lambda p: 2 * cos((acos(2*p-1) + pi) / 3),
- 'quartic': _quartic_invcdf,
- 'triweight': _triweight_invcdf,
- 'triangular': lambda p: sqrt(2*p) - 1 if p < 1/2 else 1 - sqrt(2 - 2*p),
- 'cosine': lambda p: 2 * asin(2*p - 1) / pi,
-}
-_kernel_invcdfs['gauss'] = _kernel_invcdfs['normal']
-_kernel_invcdfs['uniform'] = _kernel_invcdfs['rectangular']
-_kernel_invcdfs['epanechnikov'] = _kernel_invcdfs['parabolic']
-_kernel_invcdfs['biweight'] = _kernel_invcdfs['quartic']
+ # Built-in sum returns zero.
+ >>> _sum([1e50, 1, -1e50] * 1000)
+ (<class 'float'>, Fraction(1000, 1), 3000)
-def kde_random(data, h, kernel='normal', *, seed=None):
- """Return a function that makes a random selection from the estimated
- probability density function created by kde(data, h, kernel).
+ Fractions and Decimals are also supported:
- Providing a *seed* allows reproducible selections within a single
- thread. The seed may be an integer, float, str, or bytes.
+ >>> from fractions import Fraction as F
+ >>> _sum([F(2, 3), F(7, 5), F(1, 4), F(5, 6)])
+ (<class 'fractions.Fraction'>, Fraction(63, 20), 4)
- A StatisticsError will be raised if the *data* sequence is empty.
+ >>> from decimal import Decimal as D
+ >>> data = [D("0.1375"), D("0.2108"), D("0.3061"), D("0.0419")]
+ >>> _sum(data)
+ (<class 'decimal.Decimal'>, Fraction(6963, 10000), 4)
- Example:
-
- >>> data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
- >>> rand = kde_random(data, h=1.5, seed=8675309)
- >>> new_selections = [rand() for i in range(10)]
- >>> [round(x, 1) for x in new_selections]
- [0.7, 6.2, 1.2, 6.9, 7.0, 1.8, 2.5, -0.5, -1.8, 5.6]
+ Mixed types are currently treated as an error, except that int is
+ allowed.
 """
- n = len(data)
- if not n:
- raise StatisticsError('Empty data sequence')
+ count = 0
+ types = set()
+ types_add = types.add
+ partials = {}
+ partials_get = partials.get
+ for typ, values in groupby(data, type):
+ types_add(typ)
+ for n, d in map(_exact_ratio, values):
+ count += 1
+ partials[d] = partials_get(d, 0) + n
+ if None in partials:
+ # The sum will be a NAN or INF. We can ignore all the finite
+ # partials, and just look at this special one.
+ total = partials[None]
+ assert not _isfinite(total)
+ else:
+ # Sum all the partial sums using builtin sum.
+ total = sum(Fraction(n, d) for d, n in partials.items())
+ T = reduce(_coerce, types, int) # or raise TypeError
+ return (T, total, count)
- if not isinstance(data[0], (int, float)):
- raise TypeError('Data sequence must contain ints or floats')
+def _ss(data, c=None):
+ """Return the exact mean and sum of square deviations of sequence data.
- if h <= 0.0:
- raise StatisticsError(f'Bandwidth h must be positive, not {h=!r}')
+ Calculations are done in a single pass, allowing the input to be an iterator.
- kernel_invcdf = _kernel_invcdfs.get(kernel)
- if kernel_invcdf is None:
- raise StatisticsError(f'Unknown kernel name: {kernel!r}')
+ If given, *c* is used as the mean; otherwise, the mean is calculated from the data.
+ Use the *c* argument with care, as it can lead to garbage results.
- prng = _random.Random(seed)
- random = prng.random
- choice = prng.choice
+ """
+ if c is not None:
+ T, ssd, count = _sum((d := x - c) * d for x in data)
+ return (T, ssd, c, count)
- def rand():
- return choice(data) + h * kernel_invcdf(random())
+ count = 0
+ types = set()
+ types_add = types.add
+ sx_partials = defaultdict(int)
+ sxx_partials = defaultdict(int)
+ for typ, values in groupby(data, type):
+ types_add(typ)
+ for n, d in map(_exact_ratio, values):
+ count += 1
+ sx_partials[d] += n
+ sxx_partials[d] += n * n
- rand.__doc__ = f'Random KDE selection with {h=!r} and {kernel=!r}'
+ if not count:
+ ssd = c = Fraction(0)
+ elif None in sx_partials:
+ # The sum will be a NAN or INF. We can ignore all the finite
+ # partials, and just look at this special one.
+ ssd = c = sx_partials[None]
+ assert not _isfinite(ssd)
+ else:
+ sx = sum(Fraction(n, d) for d, n in sx_partials.items())
+ sxx = sum(Fraction(n, d*d) for d, n in sxx_partials.items())
+ # This formula has poor numeric properties for floats,
+ # but with fractions it is exact.
+ ssd = (count * sxx - sx * sx) / count
+ c = sx / count
+
+ T = reduce(_coerce, types, int) # or raise TypeError
+ return (T, ssd, c, count)
- return rand
+
+
+def _isfinite(x):
+ try:
+ return x.is_finite() # Likely a Decimal.
+ except AttributeError:
+ return math.isfinite(x) # Coerces to float first.
+
+
+def _coerce(T, S):
+ """Coerce types T and S to a common type, or raise TypeError.
+
+ Coercion rules are currently an implementation detail. See the CoerceTest
+ test class in test_statistics for details.
+
+ """
+ # See http://bugs.python.org/issue24068.
+ assert T is not bool, "initial type T is bool"
+ # If the types are the same, no need to coerce anything. Put this
+ # first, so that the usual case (no coercion needed) happens as soon
+ # as possible.
+ if T is S: return T
+ # Mixed int & other coerce to the other type.
+ if S is int or S is bool: return T
+ if T is int: return S
+ # If one is a (strict) subclass of the other, coerce to the subclass.
+ if issubclass(S, T): return S
+ if issubclass(T, S): return T
+ # Ints coerce to the other type.
+ if issubclass(T, int): return S
+ if issubclass(S, int): return T
+ # Mixed fraction & float coerces to float (or float subclass).
+ if issubclass(T, Fraction) and issubclass(S, float):
+ return S
+ if issubclass(T, float) and issubclass(S, Fraction):
+ return T
+ # Any other combination is disallowed.
+ msg = "don't know how to coerce %s and %s"
+ raise TypeError(msg % (T.__name__, S.__name__))
+
+
+def _exact_ratio(x):
+ """Return Real number x to exact (numerator, denominator) pair.
+ + >>> _exact_ratio(0.25) + (1, 4) + + x is expected to be an int, Fraction, Decimal or float. + + """ + try: + return x.as_integer_ratio() + except AttributeError: + pass + except (OverflowError, ValueError): + # float NAN or INF. + assert not _isfinite(x) + return (x, None) + + try: + # x may be an Integral ABC. + return (x.numerator, x.denominator) + except AttributeError: + msg = f"can't convert type '{type(x).__name__}' to numerator/denominator" + raise TypeError(msg) + + +def _convert(value, T): + """Convert value to given numeric type T.""" + if type(value) is T: + # This covers the cases where T is Fraction, or where value is + # a NAN or INF (Decimal or float). + return value + if issubclass(T, int) and value.denominator != 1: + T = float + try: + # FIXME: what do we do if this overflows? + return T(value) + except TypeError: + if issubclass(T, Decimal): + return T(value.numerator) / T(value.denominator) + else: + raise + + +def _fail_neg(values, errmsg='negative value'): + """Iterate over values, failing if any are less than zero.""" + for x in values: + if x < 0: + raise StatisticsError(errmsg) + yield x + + +def _rank(data, /, *, key=None, reverse=False, ties='average', start=1) -> list[float]: + """Rank order a dataset. The lowest value has rank 1. + + Ties are averaged so that equal values receive the same rank: + + >>> data = [31, 56, 31, 25, 75, 18] + >>> _rank(data) + [3.5, 5.0, 3.5, 2.0, 6.0, 1.0] + + The operation is idempotent: + + >>> _rank([3.5, 5.0, 3.5, 2.0, 6.0, 1.0]) + [3.5, 5.0, 3.5, 2.0, 6.0, 1.0] + + It is possible to rank the data in reverse order so that the + highest value has rank 1. Also, a key-function can extract + the field to be ranked: + + >>> goals = [('eagles', 45), ('bears', 48), ('lions', 44)] + >>> _rank(goals, key=itemgetter(1), reverse=True) + [2.0, 1.0, 3.0] + + Ranks are conventionally numbered starting from one; however, + setting *start* to zero allows the ranks to be used as array indices: + + >>> prize = ['Gold', 'Silver', 'Bronze', 'Certificate'] + >>> scores = [8.1, 7.3, 9.4, 8.3] + >>> [prize[int(i)] for i in _rank(scores, start=0, reverse=True)] + ['Bronze', 'Certificate', 'Gold', 'Silver'] + + """ + # If this function becomes public at some point, more thought + # needs to be given to the signature. A list of ints is + # plausible when ties is "min" or "max". When ties is "average", + # either list[float] or list[Fraction] is plausible. + + # Default handling of ties matches scipy.stats.mstats.spearmanr. + if ties != 'average': + raise ValueError(f'Unknown tie resolution method: {ties!r}') + if key is not None: + data = map(key, data) + val_pos = sorted(zip(data, count()), reverse=reverse) + i = start - 1 + result = [0] * len(val_pos) + for _, g in groupby(val_pos, key=itemgetter(0)): + group = list(g) + size = len(group) + rank = i + (size + 1) / 2 + for value, orig_pos in group: + result[orig_pos] = rank + i += size + return result + + +def _integer_sqrt_of_frac_rto(n: int, m: int) -> int: + """Square root of n/m, rounded to the nearest integer using round-to-odd.""" + # Reference: https://www.lri.fr/~melquion/doc/05-imacs17_1-expose.pdf + a = math.isqrt(n // m) + return a | (a*a*m != n) + + +# For 53 bit precision floats, the bit width used in +# _float_sqrt_of_frac() is 109. 
+_sqrt_bit_width: int = 2 * sys.float_info.mant_dig + 3 + + +def _float_sqrt_of_frac(n: int, m: int) -> float: + """Square root of n/m as a float, correctly rounded.""" + # See principle and proof sketch at: https://bugs.python.org/msg407078 + q = (n.bit_length() - m.bit_length() - _sqrt_bit_width) // 2 + if q >= 0: + numerator = _integer_sqrt_of_frac_rto(n, m << 2 * q) << q + denominator = 1 + else: + numerator = _integer_sqrt_of_frac_rto(n << -2 * q, m) + denominator = 1 << -q + return numerator / denominator # Convert to float + + +def _decimal_sqrt_of_frac(n: int, m: int) -> Decimal: + """Square root of n/m as a Decimal, correctly rounded.""" + # Premise: For decimal, computing (n/m).sqrt() can be off + # by 1 ulp from the correctly rounded result. + # Method: Check the result, moving up or down a step if needed. + if n <= 0: + if not n: + return Decimal('0.0') + n, m = -n, -m + + root = (Decimal(n) / Decimal(m)).sqrt() + nr, dr = root.as_integer_ratio() + + plus = root.next_plus() + np, dp = plus.as_integer_ratio() + # test: n / m > ((root + plus) / 2) ** 2 + if 4 * n * (dr*dp)**2 > m * (dr*np + dp*nr)**2: + return plus + + minus = root.next_minus() + nm, dm = minus.as_integer_ratio() + # test: n / m < ((root + minus) / 2) ** 2 + if 4 * n * (dr*dm)**2 < m * (dr*nm + dm*nr)**2: + return minus + + return root + + +def _mean_stdev(data): + """In one pass, compute the mean and sample standard deviation as floats.""" + T, ss, xbar, n = _ss(data) + if n < 2: + raise StatisticsError('stdev requires at least two data points') + mss = ss / (n - 1) + try: + return float(xbar), _float_sqrt_of_frac(mss.numerator, mss.denominator) + except AttributeError: + # Handle Nans and Infs gracefully + return float(xbar), float(xbar) / float(ss) + + +def _sqrtprod(x: float, y: float) -> float: + "Return sqrt(x * y) computed with improved accuracy and without overflow/underflow." + + h = sqrt(x * y) + + if not isfinite(h): + if isinf(h) and not isinf(x) and not isinf(y): + # Finite inputs overflowed, so scale down, and recompute. + scale = 2.0 ** -512 # sqrt(1 / sys.float_info.max) + return _sqrtprod(scale * x, scale * y) / scale + return h + + if not h: + if x and y: + # Non-zero inputs underflowed, so scale up, and recompute. + # Scale: 1 / sqrt(sys.float_info.min * sys.float_info.epsilon) + scale = 2.0 ** 537 + return _sqrtprod(scale * x, scale * y) / scale + return h + + # Improve accuracy with a differential correction. + # https://www.wolframalpha.com/input/?i=Maclaurin+series+sqrt%28h**2+%2B+x%29+at+x%3D0 + d = sumprod((x, h), (y, -h)) + return h + d / (2.0 * h) diff --git a/Lib/test/test_statistics.py b/Lib/test/test_statistics.py index cded8aba6e8..0b28459f03d 100644 --- a/Lib/test/test_statistics.py +++ b/Lib/test/test_statistics.py @@ -2435,12 +2435,13 @@ class TestKDE(unittest.TestCase): self.assertGreater(f_hat(100), 0.0) def test_kde_kernel_invcdfs(self): - kernel_invcdfs = statistics._kernel_invcdfs + kernel_specs = statistics._kernel_specs kde = statistics.kde # Verify that cdf / invcdf will round trip xarr = [i/100 for i in range(-100, 101)] - for kernel, invcdf in kernel_invcdfs.items(): + for kernel, spec in kernel_specs.items(): + invcdf = spec['invcdf'] with self.subTest(kernel=kernel): cdf = kde([0.0], h=1.0, kernel=kernel, cumulative=True) for x in xarr:
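
---

Notes on the refactoring, with illustrative sketches. These examples are not
part of the patch; any helper name that does not appear in the diff above
(exact_sum, cbrt, and the trimmed normal_kernel) is a hypothetical stand-in.

Previously, kde() selected its kernel functions with a match statement while
kde_random() kept a parallel _kernel_invcdfs table, so each kernel was defined
in two places. The patch replaces both with a single _kernel_specs registry
populated by the register() decorator. A minimal, runnable sketch of that
pattern, trimmed to one kernel (the invcdf entry is omitted here):

    from math import sqrt, pi, exp, erf

    _kernel_specs = {}

    def register(*kernels):
        "Load the kernel's pdf, cdf, invcdf, and support into _kernel_specs."
        def deco(builder):
            spec = dict(zip(('pdf', 'cdf', 'invcdf', 'support'), builder()))
            for kernel in kernels:
                _kernel_specs[kernel] = spec  # aliases share one spec dict
            return builder
        return deco

    @register('normal', 'gauss')
    def normal_kernel():
        sqrt2pi = sqrt(2 * pi)
        pdf = lambda t: exp(-1/2 * t * t) / sqrt2pi
        cdf = lambda t: 1/2 * (1.0 + erf(t / sqrt(2)))
        invcdf = None   # omitted in this sketch
        support = None  # None signals infinite support
        return pdf, cdf, invcdf, support

    print(_kernel_specs['gauss'] is _kernel_specs['normal'])  # True
    print(round(_kernel_specs['normal']['pdf'](0.0), 6))      # 0.398942

Because aliases share one spec dict, kde() and kde_random() can both resolve a
kernel name with a single _kernel_specs.get(kernel) lookup and report unknown
names from one place.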
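The quartic and triweight kernels have no closed-form inverse CDF, so the
patch derives one with _newton_raphson: start from a rough estimate, then
iterate x -= (f(x) - y) / f_prime(x) until f(x) is within tolerance of y. The
factory works for any smooth monotone f; the cube-root demonstration below is
editorial, not from the patch:

    def _newton_raphson(f_inv_estimate, f, f_prime, tolerance=1e-12):
        def f_inv(y):
            "Return x such that f(x) ≈ y within the specified tolerance."
            x = f_inv_estimate(y)
            while abs(diff := f(x) - y) > tolerance:
                x -= diff / f_prime(x)
            return x
        return f_inv

    cbrt = _newton_raphson(
        f_inv_estimate=lambda y: y,      # crude guess; Newton does the rest
        f=lambda x: x ** 3,
        f_prime=lambda x: 3.0 * x * x,
    )
    print(cbrt(27.0))  # ≈ 3.0

The _quartic_invcdf_estimate and _triweight_invcdf_estimate seeds only affect
speed, not the answer: a closer starting point means fewer correction steps.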
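The _sum() doctests show [1e50, 1, -1e50] * 1000 summing to exactly 1000 even
though builtin sum() returns 0.0. The trick is that each input is converted to
an exact integer ratio and the numerators are accumulated per denominator, so
float rounding never touches the running total. A simplified sketch of just
that core idea (exact_sum is a hypothetical stand-in; the real _sum also
handles type coercion and a NAN/INF bucket keyed by None):

    from fractions import Fraction

    def exact_sum(data):
        "Exact total of ints and floats, returned as a Fraction."
        partials = {}
        for x in data:
            n, d = x.as_integer_ratio()  # exact (numerator, denominator)
            partials[d] = partials.get(d, 0) + n
        return sum(Fraction(n, d) for d, n in partials.items())

    data = [1e50, 1, -1e50] * 1000
    print(sum(data))        # 0.0 -- float addition loses every 1
    print(exact_sum(data))  # 1000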
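For method='ranked', correlation() centers the ranks around zero by passing
start=(n - 1) / -2 to _rank(), so the rank vectors need no mean subtraction
before the sums of products. A quick check of that property (this touches the
private _rank, so it assumes an interpreter carrying this module):

    import statistics

    x = [31, 56, 31, 25, 75, 18]
    r = statistics._rank(x, start=(len(x) - 1) / -2)
    print(r)       # [0.0, 1.5, 0.0, -1.5, 2.5, -2.5]
    print(sum(r))  # 0.0 -- already centered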
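_sqrtprod() protects correlation() from overflow and underflow in the
intermediate product sxx * syy and then applies a differential correction for
a more accurately rounded result. Its body below is taken from the patch; only
the imports and the demonstration at the end are editorial:

    from math import sqrt, isfinite, isinf, sumprod  # sumprod: Python 3.12+

    def _sqrtprod(x, y):
        "Return sqrt(x * y) computed with improved accuracy and without overflow/underflow."
        h = sqrt(x * y)
        if not isfinite(h):
            if isinf(h) and not isinf(x) and not isinf(y):
                # Finite inputs overflowed, so scale down, and recompute.
                scale = 2.0 ** -512  # sqrt(1 / sys.float_info.max)
                return _sqrtprod(scale * x, scale * y) / scale
            return h
        if not h:
            if x and y:
                # Non-zero inputs underflowed, so scale up, and recompute.
                # Scale: 1 / sqrt(sys.float_info.min * sys.float_info.epsilon)
                scale = 2.0 ** 537
                return _sqrtprod(scale * x, scale * y) / scale
            return h
        # Differential correction: d is x*y - h*h evaluated in one rounding.
        d = sumprod((x, h), (y, -h))
        return h + d / (2.0 * h)

    print(sqrt(1e200 * 1e200))      # inf -- the product overflows
    print(_sqrtprod(1e200, 1e200))  # 1e+200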
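Finally, the test change above reads each kernel's invcdf out of _kernel_specs
instead of the deleted _kernel_invcdfs table. The property it exercises, that
cdf and invcdf invert one another, can be spot-checked by hand; this sketch
assumes an interpreter that already carries this patch (kde() is new in Python
3.13 and _kernel_specs is private and may change):

    import statistics

    for kernel, spec in statistics._kernel_specs.items():
        cdf = statistics.kde([0.0], h=1.0, kernel=kernel, cumulative=True)
        invcdf = spec['invcdf']
        for x in (-0.5, 0.0, 0.5):
            assert abs(invcdf(cdf(x)) - x) < 1e-6, (kernel, x)
    print('round trip OK')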