PIDStepper
- это довольно архаично (последнее существенное изменение касалось четырнадцать лет назад) и, как вы выяснили, не документированы и нигде публично не используется. Поскольку мы его тоже не тестируем, нам, вероятно, следует кукурузное поле.
Тем не менее, покопавшись на своем жестком диске, я нашел несколько случаев, когда я экспериментировал с этим, и, что удивительно, он все еще работает.
На примере тривиальной задачи диффузии:
import fipy as fp
from fipy.tools import numerix
nx = 50
dx = 1.
L = nx * dx
mesh = fp.Grid1D(nx=nx, dx=dx)
x = mesh.cellCenters[0]
phi = fp.CellVariable(mesh=mesh, name=r"$\phi$", hasOld=True)
phi.value = 0.
phi.setValue(1., where=(x > L/2. - L/10.) & (x < L/2. + L/10.))
viewer = fp.Viewer(vars=phi, datamin=-0.1, datamax=1.1)
D = 1.
eq = fp.TransientTerm() == fp.DiffusionTerm(D * (1 - phi))
total_elapsed = 0.
dt = 100. * dx**2 / (2 * D)
totaltime = 1000.
checkpoints = (fp.numerix.arange(int(totaltime / dt)) + 1) * dt
Чтобы решить эту проблему с помощью PIDStepper
, вам нужно определить sweepFn()
, чтобы фактически решить ваше уравнение (я). Аргумент vardata
- это кортеж кортежей, каждый из которых содержит CellVariable
для решения, применяемое уравнение и используемые граничные условия старого стиля (все это в корне предшествует связанным уравнениям или ограничениям). Аргумент dt
- это адаптированный временной шаг для попытки. *args
и **kwargs
- любые дополнительные агрументы, которые вы хотите пройти на каждом этапе. Эта функция должна возвращать ошибку, нормированную к единице; здесь мы возвращаем норму L1 var - var.old
, нормированную на норму L1 var.old
.
def L1error(var):
denom = numerix.L1norm(var.old)
return numerix.L1norm(var - var.old) / (denom + (denom == 0))
def sweepFn(vardata, dt, *args, **kwargs):
error = 0.
for var, eqn, bcs in vardata:
res = eqn.sweep(var=var,
dt=dt,
boundaryConditions=bcs)
error = max(error, L1error(var))
return error
При желании вы можете определить successFn()
, который будет выполняться после успешного шага адаптивного решения. Аргумент vardata
такой же, как указано выше. Аргумент dt
- это запрошенный временной шаг. Аргумент dtPrev
- это фактически сделанный временной шаг. Аргумент elapsed
указывает, сколько времени прошло для этого временного шага. *args
и **kwargs
- любые дополнительные условия, которые вы хотите передать на каждом этапе , и должны быть такими же, как для sweepFn()
. Здесь нам нужен доступ к viewer
, списку шагов по времени, которые были сделаны dts
, списку времени, прошедшего для каждого в elapseds
, и глобальным часам total_elapsed
.
def successFn(vardata, dt, dtPrev, elapsed, viewer, dts, elapseds, total_elapsed, *args, **kwargs):
dts.append(dtPrev)
elapseds.append(total_elapsed + elapsed)
viewer.plot()
Вы также можете при желании определить failFn()
для выполнения всякий раз, когда sweepFn()
возвращает ошибку больше единицы. Это потребует тех же аргументов, что и successFn()
.
Наконец, создайте экземпляр stepper
с соответствующим vardata
и вызовите step()
для каждого желаемого временного шага. Передайте желаемый временной шаг dt
, временной шаг для первой попытки dtTry
, наименьший временной шаг, разрешенный dtMin
, последний временной шаг, который предприняли dtPrev
, sweepFn()
, который мы только что определили, successFn()
, который мы только что определили, и дополнительные аргументы, которые делает наш successFn()
использование.
from fipy.steppers import PIDStepper
stepper = PIDStepper(vardata=((phi, eq, ()),))
pid_dts = []
pid_elapseds = []
dtMin = dt / 1000
dtTry = dtMin
dtPrev = dtTry
for until in checkpoints:
dtPrev, dtTry = stepper.step(dt=dt, dtTry=dtTry,
dtMin=dtMin, dtPrev=dtPrev,
sweepFn=sweepFn,
successFn=successFn,
viewer=viewer,
dts=pid_dts,
elapseds=pid_elapseds,
total_elapsed=total_elapsed)
total_elapsed += dt
Вы также можете сделать то же самое, определив _ 40_:
stepper = PseudoRKQSStepper(vardata=((phi, eq, ()),))
и используйте те же sweepFn()
и successFn()
.
Моя текущая практика
Хотя это работает, я обнаружил, что весь процесс определения и передачи вспомогательных функций непрозрачен и вызывает больше проблем, чем того стоит. До сих пор я не беспокоился о том, чтобы задокументировать это или показать кому-нибудь еще, как я его использовал.
Сейчас я делаю что-то вроде этого:
explicit_dts = []
explicit_elapseds = []
dt = dt / 1000
for until in checkpoints:
while total_elapsed < until:
phi.updateOld()
dt_until = (until - total_elapsed)
dt_save = dt
if dt_until < dt:
dt = dt_until
res = eq.sweep(var=phi, dt=dt)
if L1error(phi) < 1.:
total_elapsed += dt
explicit_dts.append(dt)
explicit_elapseds.append(total_elapsed)
dt = dt_save
dt *= 1.2
viewer.plot()
else:
phi.value = phi.old.value
dt /= 2.
Это включает в себя примерно такое же количество кода в вашем скрипте (на самом деле немного меньше) и ни одной из дополнительных 100+ строк кода в _ 45_ и _ 46_.
Ни один из них не был настроен, и эта проблема диффузии не является большой проверкой, поскольку она безусловно стабильна, но как только они начинают работать, все они показывают примерно одинаковое ускорение.
person
jeguyer
schedule
02.12.2020