#!/usr/bin/python3 # Robert Denkert, 575117 import time from matplotlib import pyplot as plt import numpy as np from numpy.linalg import inv from numpy.linalg import cholesky def safeCholesky(matrix): """ Calculate the Cholesky decomposition, but don't throw if there's a numerical problem. Instead, try again with some added noise. Parameters ---------- matrix : The matrix for which the Cholesky decomposition should be calculated. Returns ------- L : The cholesky decompostion of matrix """ # Use some noise for numerical reasons, but even then, this might still fail. # In this case, the script will be restarted for different random training values. epsilon = -10 while(1): try: L = cholesky(matrix + 10**epsilon * np.eye(len(matrix))) return L except np.linalg.linalg.LinAlgError: print("Simulation aus numerischen Gruenden fehlgeschlagen. Noise wird hinzugefĆ¼gt...") epsilon+=1 def simulateProcessByCovarianzMatrix(covariance, trainPoints, trainValues , evalPoints, numberOfDraws): """ Emulate the stochastic process given the covariance matrix. Parameters ---------- covariance : A function returning the covariance of two input vectors. trainPoints : Training data coordinates. An array of the same length as `f`. trainValues : Training data values. An array of the same length as `x`. evalPoints : Evaluation coordinates. numberOfDraws : Number of evaluations of the stochastic process. Returns ------- points : N-sized array of time coordinates. values : NxnumberOfDraws-sized array of space coordinates of the different evaluations. lowerConfidenceFunction : Function marking the lower bound of the confidence interval. upperConfidenceFunction : Function marking the upper bound of the confidence interval. """ # calculate all the matrices needed later covarianceEvalTrain = covariance(evalPoints, trainPoints) inverseCovarianceTrainTrain = inv(covariance(trainPoints, trainPoints)) covarianceEvalEval = covariance(evalPoints, evalPoints) # calculate mean, covariance matrix, and its square root m = covarianceEvalTrain.dot((inverseCovarianceTrainTrain).dot(trainValues)) K = covarianceEvalEval - covarianceEvalTrain.dot(inverseCovarianceTrainTrain).dot(covarianceEvalTrain.T) L = safeCholesky(K) # draw random functions U = np.random.normal(size=(len(evalPoints), numberOfDraws)) M = np.reshape(m,(len(m),1)).repeat(numberOfDraws, axis = 1) evalValues = M + L.dot(U) # returns functions to plot lowerConfidenceFunction = m - 2*np.sqrt(np.diag(K)) upperConfidenceFunction = m + 2*np.sqrt(np.diag(K)) return evalPoints, evalValues, lowerConfidenceFunction, upperConfidenceFunction def fractionalBrownianCovarianceWithHurstParam(h, aVec, bVec): """ Given two vectors, this function calculates the covariance matrix assuming them being part of a fractional brownian motion with Hurst parameter `h` using (1) [Dieker, Mandjes]. Parameters ---------- h : Hurst parameter. `h` should be in (0,1). aVec : First non negative input vector. bVec : Second non negative input vector. Returns ------- covariance : Covariance matrix of aVec and bVec assuming them being part of a fractional brownian motion. """ assert all(aVec >= 0) & all(bVec >= 0), "Alle verwendeten Zeitpunkte muessen nichtnegativ sein." covariance = np.fromfunction(lambda i,j: 0.5 * (np.power(bVec[j], 2*h) + np.power(aVec[i], 2*h) - np.power(np.abs(aVec[i] - bVec[j]), 2*h)), (len(aVec),len(bVec)), dtype = int) return covariance def fractionalBrownianCovarianceFunctionForHurstParam(h): """ Get the covariance function for a fractional brownian motion given the Hurst parameter. Parameters ---------- h : Hurst parameter. `h` should be in (0,1). Returns ------- func : Covariance function for a fractional brownian motion given the Hurst parameter """ func = lambda aVec, bVec : fractionalBrownianCovarianceWithHurstParam(h, aVec, bVec) return func def generateFractionalBrownianTrainingData(a, b, h, numberOfTrainValues): """ Get the covariance function for a fractional brownian motion given the Hurst parameter. Parameters ---------- a : Lower bound of the interval the taining points are drawn from. Has to be non negative. b : Upper bound of the interval the taining points are drawn from. Has to be non negative and greater than `a`. h : Hurst parameter of the fractional brownian motion. numberOfTrainValues : Number of training points / values. Returns ------- trainPoints : Time coordinates for the generated training points. trainValues : Space coordinates for the generated training points. """ trainPoints = np.sort(np.random.uniform(low=a, high=b, size=numberOfTrainValues)) trainValues = np.random.multivariate_normal(np.zeros(numberOfTrainValues), fractionalBrownianCovarianceWithHurstParam(h, trainPoints, trainPoints)) return trainPoints, trainValues def simulateFractionalBrownianMotionByFFT(h, N, b, numberOfDraws): """ This simulates `numberOfDraws` sample paths of the fractional brownian motion with the givenHurst Parameter. Parameters ---------- h : Hurst parameter of the fractional brownian motion. N : Number of evalution points. b : Upper bound of the time interval. The fractional brownian motion will be simulated with t in [0,b]. numberOfDraws : Number of sample paths to evaluate. Returns ------- points : N-sized array of time coordinates. values : NxnumberOfDraws-sized array of space coordinates of the different evaluations. """ points = np.arange(N, dtype=float) values = np.empty([N, numberOfDraws], dtype=float) for i in range(numberOfDraws): # Define gamma (but compatible with array operations) gamma = lambda ks : (np.abs(ks-np.ones(len(ks)))**(2*h) - 2*np.abs(ks)**(2*h) + np.abs(ks+np.ones(len(ks)))**(2*h)) # Set (a_k)_0..2N-1 a = np.empty(2*N) a[0:N] = gamma(np.arange(N)) a[N] = 0 a[N+1:2*N] = gamma(np.arange(N-1,0,-1)) # Compute the fourier transformation according to (4) lmbda = np.fft.fft(a) # Construct u^i_k as described in the paper u = np.random.normal(size=[2,2*N]) # This should not happen almost surely. But let's get an useful error message in this case. # The solution should be trying again and hoping for better random vectors though. assert np.linalg.matrix_rank(u) == 2, "The constructed uniform vectors are not independent." # Define omega as in (5) omega = np.empty(2*N, dtype=complex) omega[0] = np.sqrt(lmbda[0]/(2*N)) * u[0,0] omega[1:N] = np.sqrt(lmbda[1:N]/(4*N)) * (u[0,1:N] + 1j * u[1,1:N]) omega[N] = np.sqrt(lmbda[N]/(2*N)) * u[0,N] omega[N+1:2*N] = np.sqrt(lmbda[N:2*N-1]/(4*N)) * (u[0,N-1:0:-1] - 1j * u[1,N-1:0:-1]) # Get the fractional Gaussian noise sample by using the fourier transformation fGn = np.fft.fft(omega)[0:N] # The fractional brownian motion sample is obtained by computing the cumulative sums values[:,i] = np.real(np.cumsum(fGn)) # rescale the fBm scalingFactor = b*1.0/(N-1) points *= scalingFactor values *= scalingFactor**h return points, values if __name__ == "__main__": numberOfPathEvaluations = 4 trainingDataSize = 10 numberOfEvaluationPoints = 2000 upperTimeLimit = 4 totalTimeForCovariance = 0 totalTimeForFFT = 0 fig = plt.figure() # plots fractional brownian motions with different Hurst poarameters # plot 1 axes = fig.add_subplot(2,3,1) lastTimePoint = time.time() hurstParam = 0.2 evalPoints = np.linspace(0, upperTimeLimit, numberOfEvaluationPoints) trainPoints, trainValues = generateFractionalBrownianTrainingData(0, upperTimeLimit, hurstParam, trainingDataSize) pts, vls, lfunc, ufunc = simulateProcessByCovarianzMatrix(fractionalBrownianCovarianceFunctionForHurstParam(hurstParam), trainPoints, trainValues, evalPoints, numberOfPathEvaluations) totalTimeForCovariance += time.time() - lastTimePoint axes.fill_between(pts, lfunc, ufunc, color='#a0a0a0') axes.plot(pts, vls) axes.plot(trainPoints, trainValues, 'bs') axes.set_title("fBm with H = 0.2") # plot 2 axes = fig.add_subplot(2,3,2) lastTimePoint = time.time() hurstParam = 0.5 # this results in a standard brownian motion trainPoints, trainValues = generateFractionalBrownianTrainingData(0, upperTimeLimit, hurstParam, trainingDataSize) pts, vls, lfunc, ufunc = simulateProcessByCovarianzMatrix(fractionalBrownianCovarianceFunctionForHurstParam(hurstParam), trainPoints, trainValues, evalPoints, numberOfPathEvaluations) totalTimeForCovariance += time.time() - lastTimePoint axes.fill_between(pts, lfunc, ufunc, color='#a0a0a0') axes.plot(pts, vls) axes.plot(trainPoints, trainValues, 'bs') axes.set_title("fBm with H = 0.5") # plot 3 axes = fig.add_subplot(2,3,3) lastTimePoint = time.time() hurstParam = 0.8 trainPoints, trainValues = generateFractionalBrownianTrainingData(0, upperTimeLimit, hurstParam, trainingDataSize) pts, vls, lfunc, ufunc = simulateProcessByCovarianzMatrix(fractionalBrownianCovarianceFunctionForHurstParam(hurstParam), trainPoints, trainValues, evalPoints, numberOfPathEvaluations) totalTimeForCovariance += time.time() - lastTimePoint axes.fill_between(pts, lfunc, ufunc, color='#a0a0a0') axes.plot(pts, vls) axes.plot(trainPoints, trainValues, 'bs') axes.set_title("fBm with H = 0.8") # The following fractional brownian motions samples are computed in a different way using the fourier transformation # plot 4 axes = fig.add_subplot(2,3,4) lastTimePoint = time.time() hurstParam = 0.2 pts, vls = simulateFractionalBrownianMotionByFFT(hurstParam, numberOfEvaluationPoints, upperTimeLimit, numberOfPathEvaluations) totalTimeForFFT += time.time() - lastTimePoint axes.plot(pts, vls) axes.set_title("fBm by FFT with H = 0.2") # plot 5 axes = fig.add_subplot(2,3,5) lastTimePoint = time.time() hurstParam = 0.5 # this results in a standard brownian motion pts, vls = simulateFractionalBrownianMotionByFFT(hurstParam, numberOfEvaluationPoints, upperTimeLimit, numberOfPathEvaluations) totalTimeForFFT += time.time() - lastTimePoint axes.plot(pts, vls) axes.set_title("fBm by FFT with H = 0.5") # plot 6 axes = fig.add_subplot(2,3,6) lastTimePoint = time.time() hurstParam = 0.8 pts, vls = simulateFractionalBrownianMotionByFFT(hurstParam, numberOfEvaluationPoints, upperTimeLimit, numberOfPathEvaluations) totalTimeForFFT += time.time() - lastTimePoint axes.plot(pts, vls) axes.set_title("fBm by FFT with H = 0.8") print("Gesamtrechenzeit: ", totalTimeForCovariance + totalTimeForFFT) print("Rechenzeit fuer Teil 1: ", totalTimeForCovariance) print("Rechenzeit fuer Teil 2: ", totalTimeForFFT) plt.tight_layout() plt.show()