Примеры использования fipy.steppers.pidStepper

Я использую FiPy для моделирования умеренно большой системы связанных PDE (11 уравнений), которые вычисляют зависящую от времени эволюцию биохимической сети адвекции-диффузии-реакции. В настоящее время я использую фиксированный временной шаг, и он отлично работает. Однако природа модели такова, что на начальных временных шагах переменные сильно меняются, для этого требуется небольшое значение dt. Через некоторое время модель «выгорает», и изменения переменных со временем становятся более постепенными. Затем, чтобы сэкономить на вычислительных ресурсах, dt можно было бы увеличить. Судя по документации, fipy.steppers.pidStepper кажется идеально подходящим для этого. Однако документации по степперу очень мало, и я не смог найти никаких примеров кода, которые реализовали бы это (ни в самих примерах FiPy, ни в общем поиске в Интернете).

Каркас цикла оценки в настоящее время выглядит следующим образом (при необходимости я с радостью предоставлю более подробную информацию); v - это массив из 11 переменных ячеек, а p[j] - PDE для v[j]:

# steps, err and u defined elsewhere 
for step in range(steps):
    r = [1e3] * len(v)
    while max(r) > err:
        for j in range(len(v)):
            r[j] = min(r[j], p[j].sweep(v[j], dt=u))

    for j in range(len(v)):
        v[j].updateOld()

Приветствуются любые указатели на то, как адаптировать это для pidStepper. Одно из требований заключалось в том, что мне нужно было указать сохранение выходных данных модели в заданные моменты времени. Однако они намного дальше, чем используемое начальное значение dt. Но если размер шага является адаптивным, он может не «приземлиться» точно в один из этих моментов времени, так как это реализовать?


person jmrohwer    schedule 02.12.2020    source источник


Ответы (1)


PIDStepper - это довольно архаично (последнее существенное изменение касалось четырнадцать лет назад) и, как вы выяснили, не документированы и нигде публично не используется. Поскольку мы его тоже не тестируем, нам, вероятно, следует кукурузное поле.

Тем не менее, покопавшись на своем жестком диске, я нашел несколько случаев, когда я экспериментировал с этим, и, что удивительно, он все еще работает.

На примере тривиальной задачи диффузии:

import fipy as fp
from fipy.tools import numerix

nx = 50
dx = 1.
L = nx * dx
mesh = fp.Grid1D(nx=nx, dx=dx)
x = mesh.cellCenters[0]

phi = fp.CellVariable(mesh=mesh, name=r"$\phi$", hasOld=True)

phi.value = 0.
phi.setValue(1., where=(x > L/2. - L/10.) & (x < L/2. + L/10.))

viewer = fp.Viewer(vars=phi, datamin=-0.1, datamax=1.1)

D = 1.
eq = fp.TransientTerm() == fp.DiffusionTerm(D * (1 - phi))

total_elapsed = 0.

dt = 100. * dx**2 / (2 * D)
totaltime = 1000.
checkpoints = (fp.numerix.arange(int(totaltime / dt)) + 1) * dt

Чтобы решить эту проблему с помощью PIDStepper, вам нужно определить sweepFn(), чтобы фактически решить ваше уравнение (я). Аргумент vardata - это кортеж кортежей, каждый из которых содержит CellVariable для решения, применяемое уравнение и используемые граничные условия старого стиля (все это в корне предшествует связанным уравнениям или ограничениям). Аргумент dt - это адаптированный временной шаг для попытки. *args и **kwargs - любые дополнительные агрументы, которые вы хотите пройти на каждом этапе. Эта функция должна возвращать ошибку, нормированную к единице; здесь мы возвращаем норму L1 var - var.old, нормированную на норму L1 var.old.

def L1error(var):
    denom = numerix.L1norm(var.old)
    return numerix.L1norm(var - var.old) / (denom + (denom == 0))

def sweepFn(vardata, dt, *args, **kwargs):
    error = 0.
    for var, eqn, bcs in vardata:
        res = eqn.sweep(var=var, 
                        dt=dt,
                        boundaryConditions=bcs)
        error = max(error, L1error(var))

    return error

При желании вы можете определить successFn(), который будет выполняться после успешного шага адаптивного решения. Аргумент vardata такой же, как указано выше. Аргумент dt - это запрошенный временной шаг. Аргумент dtPrev - это фактически сделанный временной шаг. Аргумент elapsed указывает, сколько времени прошло для этого временного шага. *args и **kwargs - любые дополнительные условия, которые вы хотите передать на каждом этапе , и должны быть такими же, как для sweepFn(). Здесь нам нужен доступ к viewer, списку шагов по времени, которые были сделаны dts, списку времени, прошедшего для каждого в elapseds, и глобальным часам total_elapsed.

def successFn(vardata, dt, dtPrev, elapsed, viewer, dts, elapseds, total_elapsed, *args, **kwargs):
    dts.append(dtPrev)
    elapseds.append(total_elapsed + elapsed)
    viewer.plot()

Вы также можете при желании определить failFn() для выполнения всякий раз, когда sweepFn() возвращает ошибку больше единицы. Это потребует тех же аргументов, что и successFn().

Наконец, создайте экземпляр stepper с соответствующим vardata и вызовите step() для каждого желаемого временного шага. Передайте желаемый временной шаг dt, временной шаг для первой попытки dtTry, наименьший временной шаг, разрешенный dtMin, последний временной шаг, который предприняли dtPrev, sweepFn(), который мы только что определили, successFn(), который мы только что определили, и дополнительные аргументы, которые делает наш successFn() использование.

from fipy.steppers import PIDStepper
stepper = PIDStepper(vardata=((phi, eq, ()),))

pid_dts = []
pid_elapseds = []

dtMin = dt / 1000
dtTry = dtMin
dtPrev = dtTry
for until in checkpoints:
    dtPrev, dtTry = stepper.step(dt=dt, dtTry=dtTry, 
                                 dtMin=dtMin, dtPrev=dtPrev,
                                 sweepFn=sweepFn,
                                 successFn=successFn,
                                 viewer=viewer,
                                 dts=pid_dts,
                                 elapseds=pid_elapseds,
                                 total_elapsed=total_elapsed)
    total_elapsed += dt

Вы также можете сделать то же самое, определив _ 40_:

stepper = PseudoRKQSStepper(vardata=((phi, eq, ()),))

и используйте те же sweepFn() и successFn().

Моя текущая практика

Хотя это работает, я обнаружил, что весь процесс определения и передачи вспомогательных функций непрозрачен и вызывает больше проблем, чем того стоит. До сих пор я не беспокоился о том, чтобы задокументировать это или показать кому-нибудь еще, как я его использовал.

Сейчас я делаю что-то вроде этого:

explicit_dts = []
explicit_elapseds = []

dt = dt / 1000
for until in checkpoints:
    while total_elapsed < until:
        phi.updateOld()
        dt_until = (until - total_elapsed)
        dt_save = dt
        if dt_until < dt:
            dt = dt_until
        res = eq.sweep(var=phi, dt=dt)
        
        if L1error(phi) < 1.:
            total_elapsed += dt

            explicit_dts.append(dt)
            explicit_elapseds.append(total_elapsed)

            dt = dt_save
            
            dt *= 1.2
            
            viewer.plot()
        else:
            phi.value = phi.old.value
            dt /= 2.

Это включает в себя примерно такое же количество кода в вашем скрипте (на самом деле немного меньше) и ни одной из дополнительных 100+ строк кода в _ 45_ и _ 46_.

Ни один из них не был настроен, и эта проблема диффузии не является большой проверкой, поскольку она безусловно стабильна, но как только они начинают работать, все они показывают примерно одинаковое ускорение. введите описание изображения здесь

person jeguyer    schedule 02.12.2020
comment
Действительно, очень полезно, спасибо, я попробую что-нибудь в соответствии с вашей текущей практикой. Кажется более простым, но эффективным. - person jmrohwer; 03.12.2020