diff --git a/cara/monte_carlo/sampleable.py b/cara/monte_carlo/sampleable.py index 4ed49d82..4333c93f 100644 --- a/cara/monte_carlo/sampleable.py +++ b/cara/monte_carlo/sampleable.py @@ -1,6 +1,7 @@ import typing import numpy as np +from sklearn.neighbors import KernelDensity # type: ignore import cara.models @@ -16,6 +17,9 @@ class SampleableDistribution: class Normal(SampleableDistribution): + """ + Defines a normal (i.e. Gaussian) distribution + """ def __init__(self, mean: float, standard_deviation: float): self.mean = mean self.standard_deviation = standard_deviation @@ -24,6 +28,96 @@ class Normal(SampleableDistribution): return np.random.normal(self.mean, self.standard_deviation, size=size) +class LogNormal(SampleableDistribution): + """ + Defines a lognormal distribution (i.e. Gaussian distribution vs. the + natural logarithm of the random variable) + """ + + def __init__(self, mean_gaussian: float, standard_deviation_gaussian: float): + # these are resp. the mean and std. deviation of the underlying + # Gaussian distribution + self.mean_gaussian = mean_gaussian + self.standard_deviation_gaussian = standard_deviation_gaussian + + def generate_samples(self, size: int) -> float_array_size_n: + return np.random.lognormal(self.mean_gaussian, + self.standard_deviation_gaussian, + size=size) + + +class Custom(SampleableDistribution): + """ + Defines a distribution which follows a custom curve vs. the random + variable. Uses a simple algorithm. This is appropriate for a smooth + distribution function (one should know its maximum). + """ + def __init__(self, bounds: typing.Tuple[float, float], + function: typing.Callable, max_function: float): + self.bounds = bounds + self.function = function + self.max_function = max_function + + def generate_samples(self, size: int) -> float_array_size_n: + fvalue = np.random.uniform(0,self.max_function,size) + x = np.random.uniform(*self.bounds,size) + invalid = np.where(fvalue>self.function(x))[0] + while len(invalid)>0: + fvalue[invalid] = np.random.uniform(0,self.max_function,len(invalid)) + x[invalid] = np.random.uniform(*self.bounds,len(invalid)) + invalid = np.where(fvalue>self.function(x))[0] + + return x + + +class CustomKernel(SampleableDistribution): + """ + Defines a distribution which follows a custom curve vs. the + random variable. Uses a Gaussian kernel density fit. This is more + appropriate for a noisy distribution function. + """ + def __init__(self, variable: float_array_size_n, + frequencies: float_array_size_n, + kernel_bandwidth: float): + # these are resp. the random variable, the distribution + # frequencies at these values, and the bandwidth of the Gaussian + # kernel + self.variable = variable + self.frequencies = frequencies + self.kernel_bandwidth = kernel_bandwidth + + def generate_samples(self, size: int) -> float_array_size_n: + kde_model = KernelDensity(kernel='gaussian', + bandwidth=self.kernel_bandwidth) + kde_model.fit(self.variable.reshape(-1, 1), + sample_weight=self.frequencies) + return kde_model.sample(n_samples=size)[:, 0] + + +class LogCustomKernel(SampleableDistribution): + """ + Defines a distribution which follows a custom curve vs. the log + (in base 10) of the random variable. Uses a Gaussian kernel density + fit. This is more appropriate for a noisy distribution function. + """ + def __init__(self, log_variable: float_array_size_n, + frequencies: float_array_size_n, + kernel_bandwidth: float): + # these are resp. the log of the random variable, the distribution + # frequencies at these values, and the bandwidth of the Gaussian + # kernel + self.log_variable = log_variable + self.frequencies = frequencies + self.kernel_bandwidth = kernel_bandwidth + + def generate_samples(self, size: int) -> float_array_size_n: + kde_model = KernelDensity(kernel='gaussian', + bandwidth=self.kernel_bandwidth) + kde_model.fit(self.log_variable.reshape(-1, 1), + sample_weight=self.frequencies) + return 10 ** kde_model.sample(n_samples=size)[:, 0] + + _VectorisedFloatOrSampleable = typing.Union[ SampleableDistribution, cara.models._VectorisedFloat, ] diff --git a/cara/tests/test_sampleable_distribution.py b/cara/tests/test_sampleable_distribution.py new file mode 100644 index 00000000..e3801c7e --- /dev/null +++ b/cara/tests/test_sampleable_distribution.py @@ -0,0 +1,110 @@ +import numpy as np +import numpy.testing as npt +import pytest + +from cara.monte_carlo import sampleable +# TODO: seed better the random number generators +np.random.seed(2000) + +@pytest.mark.parametrize( + "mean, std",[ + [1., 0.5], + ] +) +def test_normal(mean, std): + # test that the sample has approximately the right mean, + # std deviation and distribution function. + sample_size = 2000000 + samples = sampleable.Normal(mean, std).generate_samples(sample_size) + histogram, bins = np.histogram(samples,bins=100, density=True) + selected_bins,selected_histogram = zip(*[(b,h) for b,h in zip( + (bins[1:]+bins[:-1])/2,histogram) if b>=0.25 and b<=1.75]) + exact_dist = 1/(np.sqrt(2*np.pi)*std) * np.exp( + -((np.array(selected_bins)-mean)/std)**2/2) + + assert len(samples) == sample_size + npt.assert_allclose([samples.mean(), samples.std()], [mean, std], rtol=0.01) + npt.assert_allclose(selected_histogram, exact_dist, rtol=0.02) + + +@pytest.mark.parametrize( + "mean_gaussian, std_gaussian",[ + [-0.6872121723362303, 0.10498338229297108], + ] +) +def test_lognormal(mean_gaussian, std_gaussian): + # test that the sample has approximately the right mean, + # std deviation and distribution function. + sample_size = 2000000 + samples = sampleable.LogNormal(mean_gaussian, std_gaussian + ).generate_samples(sample_size) + histogram, bins = np.histogram(samples,bins=50, density=True) + selected_bins,selected_histogram = zip(*[(b,h) for b,h in zip( + (bins[1:]+bins[:-1])/2,histogram) if b>=0.4 and b<=0.6]) + selected_bins = np.array(selected_bins) + exact_dist = ( 1/(selected_bins*np.sqrt(2*np.pi)*std_gaussian) * + np.exp(-((np.log(selected_bins)-mean_gaussian)/std_gaussian)**2/2) ) + exact_mean = np.exp(mean_gaussian + std_gaussian**2/2) + exact_std = np.sqrt( (np.exp(std_gaussian**2)-1) * + np.exp(2*mean_gaussian + std_gaussian**2) ) + + assert len(samples) == sample_size + npt.assert_allclose([samples.mean(), samples.std()], + [exact_mean, exact_std], rtol=0.01) + npt.assert_allclose(selected_histogram, exact_dist, rtol=0.02) + + +@pytest.mark.parametrize( + "use_kernel", + [False, True], +) +def test_custom(use_kernel): + # test that the sample has approximately the right distribution + # function, with both Custom and CustomKernel method. The latter + # is less accurate for smooth functions. + # the distribution function is an inverted parabola, with maximum 0.15, + # which is 0 at the bounds (0,10) (normalized) + norm = 500/3. + function = lambda x: (-(5 - x)**2 + 25)/norm + max_function = 0.15 + sample_size = 2000000 + + if use_kernel: + variable = np.linspace(0.1,9.9,100) + frequencies = function(variable) + samples = sampleable.CustomKernel(variable, frequencies, + kernel_bandwidth=0.1 + ).generate_samples(sample_size) + else: + samples = sampleable.Custom((0, 10), function, max_function + ).generate_samples(sample_size) + + histogram, bins = np.histogram(samples, bins=100, density=True) + selected_bins,selected_histogram = zip(*[(b,h) for b,h in zip( + (bins[1:]+bins[:-1])/2,histogram) if b>=1 and b<=9]) + correct_dist = function(np.array(selected_bins)) + assert len(samples) == sample_size + npt.assert_allclose(selected_histogram, correct_dist, rtol=0.05) + + +def test_logcustomkernel(): + # test that the sample has approximately the right distribution + # function, for the LogCustomKernel. + # the distribution function is an inverted parabola vs. the log of + # the variable (normalized) + norm = 500/3. + function = lambda x: (-(5 - x)**2 + 25)/norm + sample_size = 2000000 + + log_variable = np.linspace(0.1,9.9,100) + frequencies = function(log_variable) + samples = sampleable.LogCustomKernel(log_variable, frequencies, + kernel_bandwidth=0.1 + ).generate_samples(sample_size) + + histogram, bins = np.histogram(np.log10(samples), bins=100, density=True) + selected_bins,selected_histogram = zip(*[(b,h) for b,h in zip( + (bins[1:]+bins[:-1])/2,histogram) if b>=1 and b<=9]) + correct_dist = function(np.array(selected_bins)) + assert len(samples) == sample_size + npt.assert_allclose(selected_histogram, correct_dist, rtol=0.05) diff --git a/setup.py b/setup.py index e99d3185..3d5fc17b 100644 --- a/setup.py +++ b/setup.py @@ -29,6 +29,7 @@ REQUIREMENTS: dict = { 'numpy', 'qrcode[pil]', 'scipy', + 'sklearn', 'tornado', 'voila >=0.2.4', ],