\begingroup

我正在尝试编写方案的 Python 实现,用于数值求解随机微分方程。算法的伪代码位于 Wikipedia 链接中。

你能检查一下我的代码,看看它是否可以改进,是否更接近生产质量吗?我尝试snake_style对所有变量名都使用约定俗成的名称。

from dataclasses import dataclass
from abc import ABC, abstractmethod
from typing import Callable, Optional
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_style("whitegrid")

@dataclass
class SDE:
    """
    An abstraction for a stochastic differential equation
    """

    initial_condition: float
    drift: Callable[[float, float], float]
    vol: Callable[[float, float], float]
    dvol_dx: Optional[Callable[[float, float], float]]

@dataclass
class Solver(ABC):
    """
    An abstract base class for all numerical schemes
    """

    t_start: float = 0.0
    t_end: float = 1.0
    step_size: float = 0.001
    num_paths: int = 10

    def __post_init__(self):
        self.iter = 0
        self.x_curr = np.zeros((self.num_paths,))
        self.num_steps = int((self.t_end - self.t_start) / self.step_size)
        self.times = np.linspace(self.t_start, self.t_end, self.num_steps + 1)
        self.brownian_increments = np.sqrt(self.step_size) * np.random.standard_normal(
            size=(self.num_paths, self.num_steps)
        )
        self.brownian = np.cumsum(self.brownian_increments, axis=1)
        self.brownian = np.concatenate(
            [np.zeros(shape=(self.num_paths, 1)), self.brownian], axis=1
        )
        self.solution = []

    @abstractmethod
    def iterate(self):
        """
        Compute the next iterate X(n+1)
        """

    def solve(self, sde: SDE):
        """
        Solve the SDE
        """
        self.x_curr = np.full(shape=(self.num_paths,), fill_value=sde.initial_condition)
        self.solution = [self.x_curr.copy()]

        while self.iter < self.num_steps:
            self.solution.append(self.iterate(sde))
            self.iter += 1

        return np.array(self.solution).transpose()

    def reset(self):
        self.__post_init__()

@dataclass
class Milstein(Solver):
    """
    Numerical solver for a stochastic differential equation(SDE) using
    the Euler-Maruyama method.

    Consider an SDE of the form :

    dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t

    with initial condition X_0 = x_0

    The solution to the SDE can be computed using the increments

    X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))
    + 0.5 * sigma(n,X_n) * sigma'(n,X_n) * ((B(n+1) - B(n))**2 - (t_{n+1} - t_n))

    """

    def iterate(self, sde: SDE):
        """
        Generate the next iterate X(n+1)
        """

        mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
        sigma_n = np.array([sde.vol(self.times[self.iter], x) for x in self.x_curr])
        dvol_dx_n = np.array(
            [sde.dvol_dx(self.times[self.iter], x) for x in self.x_curr]
        )

        d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]

        self.x_curr += (
            mu_n * self.step_size
            + sigma_n * d_brownian
            + 0.5 * sigma_n * dvol_dx_n * (d_brownian**2 - self.step_size)
        )

        return self.x_curr.copy()

@dataclass
class EulerMaruyama(Solver):
    """
    Numerical solver for a stochastic differential equation(SDE) using
    the Euler-Maruyama method.

    Consider an SDE of the form :

    dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t

    with initial condition X_0 = x_0

    The solution to the SDE can be computed using the increments

    X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))

    """

    def iterate(self, sde: SDE):
        """
        Generate the next iterate X(n+1)
        """

        mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
        sigma_n = np.array([sde.vol(self.times[self.iter], x) for x in self.x_curr])

        d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]

        self.x_curr += mu_n * self.step_size + sigma_n * d_brownian

        return self.x_curr.copy()

if __name__ == "__main__":
    # Let's solve the following SDE

    # dS_t = S_t dB_t + (mu + 1/2) S_t dt

    # where mu = 0. This has the known solution S_t = exp(B_t - t/2)

    gbm_sde = SDE(
        initial_condition=1.0,
        drift = lambda t, s_t : 0.50 * s_t,
        vol = lambda t, s_t : s_t,
        dvol_dx = lambda t, s_t : 1
    )

    t = np.linspace(start=0.0, stop=1.0, num=1001)

    euler = EulerMaruyama()
    solution = euler.solve(gbm_sde)

    plt.xlabel(r'Time $t$')
    plt.ylabel(r'$S(t)$')
    plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0$, Euler method')
    plt.grid(True)
    for i in range(10):
        plt.plot(t, solution[i])

    plt.show()

    milstein = Milstein()
    solution = milstein.solve(gbm_sde)

    plt.xlabel(r'Time $t$')
    plt.ylabel(r'$S(t)$')
    plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0$, Milstein method')
    plt.grid(True)
    for i in range(10):
        plt.plot(t, solution[i])

    plt.show()

\endgroup


最佳答案
3

\begingroup

  1. 尽可能使用 NumPy 矢量化运算代替列表推导。这可以显著提高性能,尤其是在处理大数据时。例如,在iterate的函数
    ,你可以用 NumPy 矢量化运算替换列表推导来计算
    EulerMaruyamaMilsteinmu_nsigma_n

  2. 使用函数参数和返回类型的类型提示,以提高代码的可读性以及与静态类型语言或 IDE 的兼容性。例如,在您当前的实现中,、driftvol函数dvol_dx
    应采用两个参数(时间和状态),但您尚未在类型提示中指定这一点。

  3. 使用日志库,而不是直接将调试信息打印到控制台。这将使管理和过滤日志消息变得更加容易,尤其是在处理大型数据集或复杂模拟时。例如,您可以使用 Python 的内置logging模块来配置不同的日志级别和输出格式。

  4. 对于可能引发异常的函数输入或模拟场景,请使用适当的错误处理机制。这将有助于防止您的代码在遇到无效输入值或其他错误时意外崩溃。例如,您可以使用 try-except 块来处理潜在错误并提供有意义的错误消息。

  5. 考虑实现其他方法或函数来分析模拟结果,例如计算统计数据(例如平均值、标准差)、可视化样本路径或绘制汇总图(例如直方图、密度图)。这有助于更深入地了解模拟的 SDE 及其在不同条件下的行为。

以下是如何修改EulerMaruyama类以使用 NumPy 矢量化操作的示例:

import numpy as np

class EulerMaruyama(Solver):
    def iterate(self, sde, state):
        mu_n = self.sde.drift(np.repeat(self.times[self.iter], len(state)), state)
        sigma_n = self.sde.vol(np.repeat(self.times[self.iter], len(state)), state)

        d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]

        delta_t = np.ones((len(state), 1)) * self.step_size

        self.x_curr += np.hstack((mu_n, sigma_n * d_brownian + delta_t))

        return self.x_curr.copy()

在这个例子中,EulerMaruyama该类现在使用 NumPy 的矢量化运算(repeathstack)来并行计算mu_n所有sigma_n状态,而不是使用列表推导。

\endgroup

\begingroup

我觉得这看起来非常好。

我唯一想评论的是:

mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
sigma_n = np.array([sde.vol(self.times[self.iter], x) for x in self.x_curr])
dvol_dx_n = np.array([sde.dvol_dx(self.times[self.iter], x) for x in self.x_curr])

没有矢量化,因此可能比它们应该的速度慢一点或慢很多

请考虑以下示例:

def slow():
    a = np.ones(1000)
    f = lambda x: 0.5 * x
    b = np.array([f(x) for x in a])
    return b

%timeit slow()
# 185 μs ± 11.8 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

def fast():
    a = np.ones(1000)
    f = lambda x: 0.5 * x
    b = f(a)
    return b

%timeit fast()
# 3.82 μs ± 261 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)

您的类iterate方法强制底层函数不是矢量化的,因此会导致速度缓慢。

尝试期待一个矢量化解决方案并允许初始化程序处理无法矢量化的情况,因为这样您可以在无法矢量化的情况下获得灵活性。

mu_n = sde.drift(self.times[self.iter], self.x_curr)
sigma_n = sde.vol(self.times[self.iter], self.x_curr)
dvol_dx_n = sde.dvol_dx(self.times[self.iter], self.x_curr)

在哪里

# FAST
gbm_sde = SDE(
    initial_condition=1.0,
    drift = lambda t, s_t : 0.50 * s_t,
    vol = lambda t, s_t : s_t,
    dvol_dx = lambda t, s_t : np.ones(len(s_t)),
)

# SLOW
gbm_sde = SDE(
    initial_condition=1.0,
    drift = lambda t, s_t : np.array([0.50 * x for x in s_t]),
    vol = lambda t, s_t : np.array([x for x in s_t]),
    dvol_dx = lambda t, s_t : np.array([1.0 for x in s_t]),
)

\endgroup

\begingroup

self.num_steps = int((self.t_end - self.t_start) / self.step_size)
self.times = np.linspace(self.t_start, self.t_end, self.num_steps + 1)

你可能达不到t_end。如果步长固定,为什么不用num_steps作参数?

self.brownian_increments = np.sqrt(self.step_size) * np.random.standard_normal(
    size=(self.num_paths, self.num_steps)
)
self.brownian = np.cumsum(self.brownian_increments, axis=1)
self.brownian = np.concatenate(
    [np.zeros(shape=(self.num_paths, 1)), self.brownian], axis=1
)

你实际上不会使用其中任何一个,只是通过

d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]

在每个时间步骤。我会在开始时计算一次,然后存储它。事实上,差异是你已经计算出的高斯:

self.d_brownian = np.sqrt(self.step_size) * np.random.standard_normal(
        size=(self.num_paths, self.num_steps)
)

如果步骤数是先验已知的,那么就没有理由self.solution将其作为一个列表,而应该作为一个numpy依次填充的数组。

self.solution = np.zeros((self.num_paths, self.num_steps))

sde.drift如果你的函数被假定为矢量化的,返回数组,那么速度会更快,numpy然后

mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])

会成为

mu_n = sde.drift(self.times[self.iter], x)

这是小事,但是

for i in range(10):
    plt.plot(t, solution[i])

可能

plt.plot(t, solution.T)

我也看到你这样做

self.x_curr += ...

就在

return self.x_curr.copy()

这让我建议您重组您的代码。

我喜欢这个SDE类。我要么initial_condition从中删除,要么也包括t_start, t_end值。所有这三个都是初始值问题的属性,而不是随机 diff.eq. 本身的属性。

我们可能处于主观上看起来更干净的领域,所以我只打算提供重写。

from dataclasses import dataclass
from abc import ABC, abstractmethod
from typing import Callable, Optional
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_style("whitegrid")

# for more descriptive type hints
T = np.array
X = np.ndarray

@dataclass
class SIVP:
    """
    An abstraction for a stochastic initial value problem
    """

    t_start: float
    t_end: float
    initial_condition: float
    
    drift: Callable[[float, float], float]
    vol: Callable[[float, float], float]
    dvol_dx: Optional[Callable[[float, float], float]]

@dataclass
class Solver(ABC):
    """
    An abstract base class for all numerical schemes
    """

    num_steps: int = 1000
    num_paths: int = 10

    @staticmethod
    @abstractmethod
    def time_step(dt: float, t: T, x: X, dw: X) -> X:
        """
        Compute the next iterate X(n+1)
        """

    def solve(self, sivp: SIVP) -> (T, X):
        """
        Solve the SDE
        """

        # stepsize
        dt = (sivp.t_end - sivp.t_start) / self.num_steps

        # Gaussian increments
        dws = np.random.normal(
            scale=np.sqrt(dt),
            size=(self.num_steps, self.num_paths)
        )
        
        # times
        ts = np.linspace(sivp.t_start, sivp.t_end, self.num_steps + 1)

        # trajectories
        xs = np.zeros((self.num_steps + 1, self.num_paths))
        xs[0] = sivp.initial_condition

        #time-stepping
        for step_ix in range(self.num_steps):
            xs[step_ix + 1] = self.time_step(sivp, dt, ts[step_ix], xs[step_ix], dws[step_ix])

        return ts, xs


@dataclass
class Milstein(Solver):
    """
    Numerical solver for a stochastic differential equation(SDE) using
    the Euler-Maruyama method.

    Consider an SDE of the form :

    dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t

    with initial condition X_0 = x_0

    The solution to the SDE can be computed using the increments

    X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))
    + 0.5 * sigma(n,X_n) * sigma'(n,X_n) * ((B(n+1) - B(n))**2 - (t_{n+1} - t_n))

    """

    @staticmethod
    def time_step(sivp: SIVP, dt: float, t: T, x: X, dw: X) -> X:
        """
        Generate the next iterate X(n+1)
        """

        mu_n = sivp.drift(t, x)
        sigma_n = sivp.vol(t, x)
        dvol_dx_n = sivp.dvol_dx(t, x)

        dx = (
            dt * mu_n
            + sigma_n * dw
            + 0.5 * sigma_n * dvol_dx_n * (dw**2 - dt)
        )
        return x + dx

@dataclass
class EulerMaruyama(Solver):
    """
    Numerical solver for a stochastic differential equation(SDE) using
    the Euler-Maruyama method.

    Consider an SDE of the form :

    dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t

    with initial condition X_0 = x_0

    The solution to the SDE can be computed using the increments

    X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))

    """
    
    @staticmethod
    def time_step(sivp: SIVP, dt: float, t: T, x: X, dw: X) -> X:
        """
        Generate the next iterate X(n+1)
        """

        mu_n = sivp.drift(t, x)
        sigma_n = sivp.vol(t, x)
        dvol_dx_n = sivp.dvol_dx(t, x)

        dx = dt * mu_n + sigma_n * dw
        return x + dx

# Let's solve the following SDE
# dS_t = S_t dB_t + (mu + 1/2) S_t dt
# where mu = 0. This has the known solution S_t = exp(B_t - t/2)

gbm_sde = SIVP(
    t_start = 0.0,
    t_end = 1.0,
    initial_condition = 1.0,
    drift = lambda t, s_t : 0.50 * s_t,
    vol = lambda t, s_t : s_t,
    dvol_dx = lambda t, s_t: 1 + 0*s_t
)

ts, xs = EulerMaruyama().solve(gbm_sde)

plt.xlabel(r'Time $t$')
plt.ylabel(r'$S(t)$')
plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0$, Euler method')
plt.grid(True)

plt.plot(ts, xs)

plt.show()

ts, xs = Milstein().solve(gbm_sde)

plt.xlabel(r'Time $t$')
plt.ylabel(r'$S(t)$')
plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0$, Milstein method')
plt.grid(True)

plt.plot(ts, xs)

plt.show()

\endgroup