
我正在尝试编写方案的 Python 实现,用于数值求解随机微分方程。算法的伪代码位于 Wikipedia 链接中。


from dataclasses import dataclass
from abc import ABC, abstractmethod
from typing import Callable, Optional
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns


class SDE:
    An abstraction for a stochastic differential equation

    initial_condition: float
    drift: Callable[[float, float], float]
    vol: Callable[[float, float], float]
    dvol_dx: Optional[Callable[[float, float], float]]

class Solver(ABC):
    An abstract base class for all numerical schemes

    t_start: float = 0.0
    t_end: float = 1.0
    step_size: float = 0.001
    num_paths: int = 10

    def __post_init__(self):
        self.iter = 0
        self.x_curr = np.zeros((self.num_paths,))
        self.num_steps = int((self.t_end - self.t_start) / self.step_size)
        self.times = np.linspace(self.t_start, self.t_end, self.num_steps + 1)
        self.brownian_increments = np.sqrt(self.step_size) * np.random.standard_normal(
            size=(self.num_paths, self.num_steps)
        self.brownian = np.cumsum(self.brownian_increments, axis=1)
        self.brownian = np.concatenate(
            [np.zeros(shape=(self.num_paths, 1)), self.brownian], axis=1
        self.solution = []

    def iterate(self):
        Compute the next iterate X(n+1)

    def solve(self, sde: SDE):
        Solve the SDE
        self.x_curr = np.full(shape=(self.num_paths,), fill_value=sde.initial_condition)
        self.solution = [self.x_curr.copy()]

        while self.iter < self.num_steps:
            self.iter += 1

        return np.array(self.solution).transpose()

    def reset(self):

class Milstein(Solver):
    Numerical solver for a stochastic differential equation(SDE) using
    the Euler-Maruyama method.

    Consider an SDE of the form :

    dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t

    with initial condition X_0 = x_0

    The solution to the SDE can be computed using the increments

    X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))
    + 0.5 * sigma(n,X_n) * sigma'(n,X_n) * ((B(n+1) - B(n))**2 - (t_{n+1} - t_n))


    def iterate(self, sde: SDE):
        Generate the next iterate X(n+1)

        mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
        sigma_n = np.array([sde.vol(self.times[self.iter], x) for x in self.x_curr])
        dvol_dx_n = np.array(
            [sde.dvol_dx(self.times[self.iter], x) for x in self.x_curr]

        d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]

        self.x_curr += (
            mu_n * self.step_size
            + sigma_n * d_brownian
            + 0.5 * sigma_n * dvol_dx_n * (d_brownian**2 - self.step_size)

        return self.x_curr.copy()

class EulerMaruyama(Solver):
    Numerical solver for a stochastic differential equation(SDE) using
    the Euler-Maruyama method.

    Consider an SDE of the form :

    dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t

    with initial condition X_0 = x_0

    The solution to the SDE can be computed using the increments

    X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))


    def iterate(self, sde: SDE):
        Generate the next iterate X(n+1)

        mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
        sigma_n = np.array([sde.vol(self.times[self.iter], x) for x in self.x_curr])

        d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]

        self.x_curr += mu_n * self.step_size + sigma_n * d_brownian

        return self.x_curr.copy()

if __name__ == "__main__":
    # Let's solve the following SDE

    # dS_t = S_t dB_t + (mu + 1/2) S_t dt

    # where mu = 0. This has the known solution S_t = exp(B_t - t/2)

    gbm_sde = SDE(
        drift = lambda t, s_t : 0.50 * s_t,
        vol = lambda t, s_t : s_t,
        dvol_dx = lambda t, s_t : 1

    t = np.linspace(start=0.0, stop=1.0, num=1001)

    euler = EulerMaruyama()
    solution = euler.solve(gbm_sde)

    plt.xlabel(r'Time $t$')
    plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0$, Euler method')
    for i in range(10):
        plt.plot(t, solution[i])


    milstein = Milstein()
    solution = milstein.solve(gbm_sde)

    plt.xlabel(r'Time $t$')
    plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0$, Milstein method')
    for i in range(10):
        plt.plot(t, solution[i])





  1. 尽可能使用 NumPy 矢量化运算代替列表推导。这可以显著提高性能,尤其是在处理大数据时。例如,在iterate的函数
    ,你可以用 NumPy 矢量化运算替换列表推导来计算

  2. 使用函数参数和返回类型的类型提示,以提高代码的可读性以及与静态类型语言或 IDE 的兼容性。例如,在您当前的实现中,、driftvol函数dvol_dx

  3. 使用日志库,而不是直接将调试信息打印到控制台。这将使管理和过滤日志消息变得更加容易,尤其是在处理大型数据集或复杂模拟时。例如,您可以使用 Python 的内置logging模块来配置不同的日志级别和输出格式。

  4. 对于可能引发异常的函数输入或模拟场景,请使用适当的错误处理机制。这将有助于防止您的代码在遇到无效输入值或其他错误时意外崩溃。例如,您可以使用 try-except 块来处理潜在错误并提供有意义的错误消息。

  5. 考虑实现其他方法或函数来分析模拟结果,例如计算统计数据(例如平均值、标准差)、可视化样本路径或绘制汇总图(例如直方图、密度图)。这有助于更深入地了解模拟的 SDE 及其在不同条件下的行为。

以下是如何修改EulerMaruyama类以使用 NumPy 矢量化操作的示例:

import numpy as np

class EulerMaruyama(Solver):
    def iterate(self, sde, state):
        mu_n = self.sde.drift(np.repeat(self.times[self.iter], len(state)), state)
        sigma_n = self.sde.vol(np.repeat(self.times[self.iter], len(state)), state)

        d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]

        delta_t = np.ones((len(state), 1)) * self.step_size

        self.x_curr += np.hstack((mu_n, sigma_n * d_brownian + delta_t))

        return self.x_curr.copy()

在这个例子中,EulerMaruyama该类现在使用 NumPy 的矢量化运算(repeathstack)来并行计算mu_n所有sigma_n状态,而不是使用列表推导。





mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
sigma_n = np.array([sde.vol(self.times[self.iter], x) for x in self.x_curr])
dvol_dx_n = np.array([sde.dvol_dx(self.times[self.iter], x) for x in self.x_curr])



def slow():
    a = np.ones(1000)
    f = lambda x: 0.5 * x
    b = np.array([f(x) for x in a])
    return b

%timeit slow()
# 185 μs ± 11.8 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

def fast():
    a = np.ones(1000)
    f = lambda x: 0.5 * x
    b = f(a)
    return b

%timeit fast()
# 3.82 μs ± 261 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)



mu_n = sde.drift(self.times[self.iter], self.x_curr)
sigma_n = sde.vol(self.times[self.iter], self.x_curr)
dvol_dx_n = sde.dvol_dx(self.times[self.iter], self.x_curr)


gbm_sde = SDE(
    drift = lambda t, s_t : 0.50 * s_t,
    vol = lambda t, s_t : s_t,
    dvol_dx = lambda t, s_t : np.ones(len(s_t)),

gbm_sde = SDE(
    drift = lambda t, s_t : np.array([0.50 * x for x in s_t]),
    vol = lambda t, s_t : np.array([x for x in s_t]),
    dvol_dx = lambda t, s_t : np.array([1.0 for x in s_t]),



self.num_steps = int((self.t_end - self.t_start) / self.step_size)
self.times = np.linspace(self.t_start, self.t_end, self.num_steps + 1)


self.brownian_increments = np.sqrt(self.step_size) * np.random.standard_normal(
    size=(self.num_paths, self.num_steps)
self.brownian = np.cumsum(self.brownian_increments, axis=1)
self.brownian = np.concatenate(
    [np.zeros(shape=(self.num_paths, 1)), self.brownian], axis=1


d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]


self.d_brownian = np.sqrt(self.step_size) * np.random.standard_normal(
        size=(self.num_paths, self.num_steps)


self.solution = np.zeros((self.num_paths, self.num_steps))


mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])


mu_n = sde.drift(self.times[self.iter], x)


for i in range(10):
    plt.plot(t, solution[i])


plt.plot(t, solution.T)


self.x_curr += ...


return self.x_curr.copy()


我喜欢这个SDE类。我要么initial_condition从中删除,要么也包括t_start, t_end值。所有这三个都是初始值问题的属性,而不是随机 diff.eq. 本身的属性。


from dataclasses import dataclass
from abc import ABC, abstractmethod
from typing import Callable, Optional
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns


# for more descriptive type hints
T = np.array
X = np.ndarray

class SIVP:
    An abstraction for a stochastic initial value problem

    t_start: float
    t_end: float
    initial_condition: float
    drift: Callable[[float, float], float]
    vol: Callable[[float, float], float]
    dvol_dx: Optional[Callable[[float, float], float]]

class Solver(ABC):
    An abstract base class for all numerical schemes

    num_steps: int = 1000
    num_paths: int = 10

    def time_step(dt: float, t: T, x: X, dw: X) -> X:
        Compute the next iterate X(n+1)

    def solve(self, sivp: SIVP) -> (T, X):
        Solve the SDE

        # stepsize
        dt = (sivp.t_end - sivp.t_start) / self.num_steps

        # Gaussian increments
        dws = np.random.normal(
            size=(self.num_steps, self.num_paths)
        # times
        ts = np.linspace(sivp.t_start, sivp.t_end, self.num_steps + 1)

        # trajectories
        xs = np.zeros((self.num_steps + 1, self.num_paths))
        xs[0] = sivp.initial_condition

        for step_ix in range(self.num_steps):
            xs[step_ix + 1] = self.time_step(sivp, dt, ts[step_ix], xs[step_ix], dws[step_ix])

        return ts, xs

class Milstein(Solver):
    Numerical solver for a stochastic differential equation(SDE) using
    the Euler-Maruyama method.

    Consider an SDE of the form :

    dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t

    with initial condition X_0 = x_0

    The solution to the SDE can be computed using the increments

    X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))
    + 0.5 * sigma(n,X_n) * sigma'(n,X_n) * ((B(n+1) - B(n))**2 - (t_{n+1} - t_n))


    def time_step(sivp: SIVP, dt: float, t: T, x: X, dw: X) -> X:
        Generate the next iterate X(n+1)

        mu_n = sivp.drift(t, x)
        sigma_n = sivp.vol(t, x)
        dvol_dx_n = sivp.dvol_dx(t, x)

        dx = (
            dt * mu_n
            + sigma_n * dw
            + 0.5 * sigma_n * dvol_dx_n * (dw**2 - dt)
        return x + dx

class EulerMaruyama(Solver):
    Numerical solver for a stochastic differential equation(SDE) using
    the Euler-Maruyama method.

    Consider an SDE of the form :

    dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t

    with initial condition X_0 = x_0

    The solution to the SDE can be computed using the increments

    X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))

    def time_step(sivp: SIVP, dt: float, t: T, x: X, dw: X) -> X:
        Generate the next iterate X(n+1)

        mu_n = sivp.drift(t, x)
        sigma_n = sivp.vol(t, x)
        dvol_dx_n = sivp.dvol_dx(t, x)

        dx = dt * mu_n + sigma_n * dw
        return x + dx

# Let's solve the following SDE
# dS_t = S_t dB_t + (mu + 1/2) S_t dt
# where mu = 0. This has the known solution S_t = exp(B_t - t/2)

gbm_sde = SIVP(
    t_start = 0.0,
    t_end = 1.0,
    initial_condition = 1.0,
    drift = lambda t, s_t : 0.50 * s_t,
    vol = lambda t, s_t : s_t,
    dvol_dx = lambda t, s_t: 1 + 0*s_t

ts, xs = EulerMaruyama().solve(gbm_sde)

plt.xlabel(r'Time $t$')
plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0$, Euler method')

plt.plot(ts, xs)


ts, xs = Milstein().solve(gbm_sde)

plt.xlabel(r'Time $t$')
plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0$, Milstein method')

plt.plot(ts, xs)

