Merge branch 'feature/new_sampleable_distributions' into 'feature/MonteCarlo'

Adding new sampleable distributions: lognormal and custom

See merge request cara/cara!192
This commit is contained in:
Nicolas Mounet 2021-06-03 08:36:21 +00:00
commit 14d712d12f
3 changed files with 205 additions and 0 deletions

View file

@ -1,6 +1,7 @@
import typing
import numpy as np
from sklearn.neighbors import KernelDensity # type: ignore
import cara.models
@ -16,6 +17,9 @@ class SampleableDistribution:
class Normal(SampleableDistribution):
"""
Defines a normal (i.e. Gaussian) distribution
"""
def __init__(self, mean: float, standard_deviation: float):
    """Store the mean and standard deviation of the Gaussian."""
    self.standard_deviation = standard_deviation
    self.mean = mean
@ -24,6 +28,96 @@ class Normal(SampleableDistribution):
return np.random.normal(self.mean, self.standard_deviation, size=size)
class LogNormal(SampleableDistribution):
    """
    Lognormal distribution: the natural logarithm of the random
    variable is Gaussian-distributed.
    """
    def __init__(self, mean_gaussian: float, standard_deviation_gaussian: float):
        # mean and standard deviation of the *underlying Gaussian*
        # (i.e. of log(X), not of X itself)
        self.mean_gaussian = mean_gaussian
        self.standard_deviation_gaussian = standard_deviation_gaussian

    def generate_samples(self, size: int) -> float_array_size_n:
        """Draw `size` lognormal samples."""
        mu = self.mean_gaussian
        sigma = self.standard_deviation_gaussian
        return np.random.lognormal(mu, sigma, size=size)
class Custom(SampleableDistribution):
    """
    Distribution following an arbitrary curve vs. the random variable,
    drawn by acceptance-rejection sampling over a bounded interval.
    Appropriate for a smooth density curve whose maximum is known.
    """
    def __init__(self, bounds: typing.Tuple[float, float],
                 function: typing.Callable, max_function: float):
        # interval of the random variable, the density curve, and an
        # upper bound of that curve on the interval
        self.bounds = bounds
        self.function = function
        self.max_function = max_function

    def generate_samples(self, size: int) -> float_array_size_n:
        """Draw `size` samples by acceptance-rejection."""
        # propose points uniformly in the rectangle
        # bounds x [0, max_function]
        heights = np.random.uniform(0, self.max_function, size)
        positions = np.random.uniform(*self.bounds, size)
        # redraw every proposal above the density curve until all
        # proposals are accepted
        rejected = np.where(heights > self.function(positions))[0]
        while len(rejected) > 0:
            n_redraw = len(rejected)
            heights[rejected] = np.random.uniform(
                0, self.max_function, n_redraw)
            positions[rejected] = np.random.uniform(*self.bounds, n_redraw)
            rejected = np.where(heights > self.function(positions))[0]
        return positions
class CustomKernel(SampleableDistribution):
    """
    Distribution following an arbitrary curve vs. the random variable,
    sampled through a Gaussian kernel-density fit — better suited to a
    noisy distribution function than `Custom`.
    """
    def __init__(self, variable: float_array_size_n,
                 frequencies: float_array_size_n,
                 kernel_bandwidth: float):
        # values of the random variable, the distribution frequencies
        # at those values, and the Gaussian kernel bandwidth
        self.variable = variable
        self.frequencies = frequencies
        self.kernel_bandwidth = kernel_bandwidth

    def generate_samples(self, size: int) -> float_array_size_n:
        """Fit a weighted Gaussian KDE and draw `size` samples from it."""
        kde = KernelDensity(kernel='gaussian',
                            bandwidth=self.kernel_bandwidth)
        kde.fit(self.variable.reshape(-1, 1),
                sample_weight=self.frequencies)
        # KernelDensity.sample returns shape (size, 1); flatten to 1-D
        return kde.sample(n_samples=size)[:, 0]
class LogCustomKernel(SampleableDistribution):
    """
    Distribution following an arbitrary curve vs. the base-10 log of
    the random variable, sampled through a Gaussian kernel-density fit
    — better suited to a noisy distribution function.
    """
    def __init__(self, log_variable: float_array_size_n,
                 frequencies: float_array_size_n,
                 kernel_bandwidth: float):
        # base-10 log of the random variable, the distribution
        # frequencies at those values, and the Gaussian kernel
        # bandwidth
        self.log_variable = log_variable
        self.frequencies = frequencies
        self.kernel_bandwidth = kernel_bandwidth

    def generate_samples(self, size: int) -> float_array_size_n:
        """Draw `size` samples of the variable itself (not its log)."""
        kde = KernelDensity(kernel='gaussian',
                            bandwidth=self.kernel_bandwidth)
        kde.fit(self.log_variable.reshape(-1, 1),
                sample_weight=self.frequencies)
        # the KDE samples live in log10-space; exponentiate back
        return 10 ** kde.sample(n_samples=size)[:, 0]
# Union type: either a sampleable distribution or a plain (possibly
# vectorised) float from cara.models.
_VectorisedFloatOrSampleable = typing.Union[
    SampleableDistribution, cara.models._VectorisedFloat,
]

View file

@ -0,0 +1,110 @@
import numpy as np
import numpy.testing as npt
import pytest
from cara.monte_carlo import sampleable
# TODO: seed better the random number generators
# (a fixed module-level seed keeps the statistical assertions in the
# tests below reproducible across runs)
np.random.seed(2000)
@pytest.mark.parametrize(
    "mean, std", [
        [1., 0.5],
    ]
)
def test_normal(mean, std):
    # The sample should reproduce the requested mean, standard
    # deviation and probability density function.
    n_samples = 2000000
    sample = sampleable.Normal(mean, std).generate_samples(n_samples)
    density, edges = np.histogram(sample, bins=100, density=True)
    centers = (edges[1:] + edges[:-1]) / 2
    # restrict the comparison to well-populated bins
    keep = (centers >= 0.25) & (centers <= 1.75)
    selected_bins = centers[keep]
    selected_histogram = density[keep]
    exact_dist = 1 / (np.sqrt(2 * np.pi) * std) * np.exp(
        -((selected_bins - mean) / std)**2 / 2)
    assert len(sample) == n_samples
    npt.assert_allclose([sample.mean(), sample.std()], [mean, std],
                        rtol=0.01)
    npt.assert_allclose(selected_histogram, exact_dist, rtol=0.02)
@pytest.mark.parametrize(
    "mean_gaussian, std_gaussian", [
        [-0.6872121723362303, 0.10498338229297108],
    ]
)
def test_lognormal(mean_gaussian, std_gaussian):
    # The sample should reproduce the exact lognormal mean, standard
    # deviation and probability density function.
    n_samples = 2000000
    sample = sampleable.LogNormal(
        mean_gaussian, std_gaussian).generate_samples(n_samples)
    density, edges = np.histogram(sample, bins=50, density=True)
    centers = (edges[1:] + edges[:-1]) / 2
    # restrict the comparison to well-populated bins
    keep = (centers >= 0.4) & (centers <= 0.6)
    selected_bins = centers[keep]
    selected_histogram = density[keep]
    exact_dist = (1 / (selected_bins * np.sqrt(2 * np.pi) * std_gaussian)
                  * np.exp(-((np.log(selected_bins) - mean_gaussian)
                             / std_gaussian)**2 / 2))
    # closed-form moments of a lognormal distribution
    exact_mean = np.exp(mean_gaussian + std_gaussian**2 / 2)
    exact_std = np.sqrt((np.exp(std_gaussian**2) - 1)
                        * np.exp(2 * mean_gaussian + std_gaussian**2))
    assert len(sample) == n_samples
    npt.assert_allclose([sample.mean(), sample.std()],
                        [exact_mean, exact_std], rtol=0.01)
    npt.assert_allclose(selected_histogram, exact_dist, rtol=0.02)
@pytest.mark.parametrize(
    "use_kernel",
    [False, True],
)
def test_custom(use_kernel):
    # Check the sampled distribution function with both the Custom and
    # the CustomKernel methods (the latter is less accurate for smooth
    # functions). The target density is an inverted parabola with
    # maximum 0.15, vanishing at the bounds (0, 10) (normalized).
    norm = 500 / 3.
    function = lambda x: (-(5 - x)**2 + 25) / norm
    max_function = 0.15
    n_samples = 2000000
    if use_kernel:
        variable = np.linspace(0.1, 9.9, 100)
        frequencies = function(variable)
        sample = sampleable.CustomKernel(
            variable, frequencies, kernel_bandwidth=0.1,
        ).generate_samples(n_samples)
    else:
        sample = sampleable.Custom(
            (0, 10), function, max_function).generate_samples(n_samples)
    density, edges = np.histogram(sample, bins=100, density=True)
    centers = (edges[1:] + edges[:-1]) / 2
    # restrict the comparison to bins away from the boundaries
    keep = (centers >= 1) & (centers <= 9)
    correct_dist = function(centers[keep])
    assert len(sample) == n_samples
    npt.assert_allclose(density[keep], correct_dist, rtol=0.05)
def test_logcustomkernel():
    # Check the sampled distribution function for LogCustomKernel.
    # The target density is an inverted parabola vs. the log (base 10)
    # of the random variable (normalized).
    norm = 500 / 3.
    function = lambda x: (-(5 - x)**2 + 25) / norm
    n_samples = 2000000
    log_variable = np.linspace(0.1, 9.9, 100)
    frequencies = function(log_variable)
    sample = sampleable.LogCustomKernel(
        log_variable, frequencies, kernel_bandwidth=0.1,
    ).generate_samples(n_samples)
    # the density is defined vs. log10 of the variable
    density, edges = np.histogram(np.log10(sample), bins=100, density=True)
    centers = (edges[1:] + edges[:-1]) / 2
    # restrict the comparison to bins away from the boundaries
    keep = (centers >= 1) & (centers <= 9)
    correct_dist = function(centers[keep])
    assert len(sample) == n_samples
    npt.assert_allclose(density[keep], correct_dist, rtol=0.05)

View file

@ -29,6 +29,7 @@ REQUIREMENTS: dict = {
'numpy',
'qrcode[pil]',
'scipy',
'sklearn',
'tornado',
'voila >=0.2.4',
],