Фильтр Калмана с различными временными шагами

У меня есть некоторые данные, которые представляют местоположение объекта, измеренное двумя разными датчиками. Итак, мне нужно сделать слияние сенсоров. Более сложная проблема заключается в том, что данные от каждого датчика поступают практически в случайное время. Я хотел бы использовать pykalman, чтобы объединить и сгладить данные. Как pykalman может обрабатывать переменные данные временной метки?

Упрощенная выборка данных будет выглядеть так:

import pandas as pd
data={'time':\
['10:00:00.0','10:00:01.0','10:00:05.2','10:00:07.5','10:00:07.5','10:00:12.0','10:00:12.5']\
,'X':[10,10.1,20.2,25.0,25.1,35.1,35.0],'Y':[20,20.2,41,45,47,75.0,77.2],\
'Sensor':[1,2,1,1,2,1,2]}

df=pd.DataFrame(data,columns=['time','X','Y','Sensor'])
df.time=pd.to_datetime(df.time)
df=df.set_index('time')

И это:

df
Out[130]: 
                            X     Y  Sensor
time                                       
2017-12-01 10:00:00.000  10.0  20.0       1
2017-12-01 10:00:01.000  10.1  20.2       2
2017-12-01 10:00:05.200  20.2  41.0       1
2017-12-01 10:00:07.500  25.0  45.0       1
2017-12-01 10:00:07.500  25.1  47.0       2
2017-12-01 10:00:12.000  35.1  75.0       1
2017-12-01 10:00:12.500  35.0  77.2       2

Что касается проблемы слияния датчиков, я думаю, что могу просто изменить данные так, чтобы у меня были позиции X1, Y1, X2, Y2 с кучей пропущенных значений, а не только X, Y. (Это было связано: http://stackoverflow.com.mevn.net/questions/47386426/2-sensor-readings-fusion-yaw-pitch )

Итак, мои данные могут выглядеть так:

df['X1']=df.X[df.Sensor==1]
df['Y1']=df.Y[df.Sensor==1]
df['X2']=df.X[df.Sensor==2]
df['Y2']=df.Y[df.Sensor==2]
df
Out[132]: 
                            X     Y  Sensor    X1    Y1    X2    Y2
time                                                               
2017-12-01 10:00:00.000  10.0  20.0       1  10.0  20.0   NaN   NaN
2017-12-01 10:00:01.000  10.1  20.2       2   NaN   NaN  10.1  20.2
2017-12-01 10:00:05.200  20.2  41.0       1  20.2  41.0   NaN   NaN
2017-12-01 10:00:07.500  25.0  45.0       1  25.0  45.0  25.1  47.0
2017-12-01 10:00:07.500  25.1  47.0       2  25.0  45.0  25.1  47.0
2017-12-01 10:00:12.000  35.1  75.0       1  35.1  75.0   NaN   NaN
2017-12-01 10:00:12.500  35.0  77.2       2   NaN   NaN  35.0  77.2

Документы для pykalman указывают, что он может обрабатывать отсутствующие данные, но правильно ли это?

Но документы для pykalman совсем не ясны в отношении проблемы с переменным временем. Док просто говорит:

«И фильтр Калмана, и сглаживатель Калмана могут использовать параметры, которые меняются со временем. Чтобы использовать это, нужно всего лишь передать массив длиной n_timesteps вдоль его первой оси:»

>>> transition_offsets = [[-1], [0], [1], [2]]
>>> kf = KalmanFilter(transition_offsets=transition_offsets, n_dim_obs=1)

Мне не удалось найти примеров использования pykalman Smoother с переменными временными шагами. Таким образом, любые рекомендации, примеры или даже пример с использованием приведенных выше данных будут очень полезны. Мне не обязательно использовать pykalman, но он кажется полезным инструментом для сглаживания этих данных.

***** Дополнительный код добавлен ниже @Anton Я сделал версию вашего полезного кода, которая использует функцию сглаживания. Странно то, что он, кажется, рассматривает каждое наблюдение с одинаковым весом и траектория проходит через каждое из них. Даже если у меня большая разница между значениями дисперсии датчика. Я ожидаю, что около точки 5.4, 5.0 отфильтрованная траектория должна приблизиться к точке Датчика 1, поскольку она имеет более низкую дисперсию. Вместо этого траектория идет точно к каждой точке и делает большой поворот, чтобы добраться туда.

from pykalman import KalmanFilter
import numpy as np
import matplotlib.pyplot as plt

# reading data (quick and dirty)
Time=[]
RefX=[]
RefY=[]
Sensor=[]
X=[]
Y=[]

for line in open('data/dataset_01.csv'):
    f1, f2, f3, f4, f5, f6 = line.split(';')
    Time.append(float(f1))
    RefX.append(float(f2))
    RefY.append(float(f3))
    Sensor.append(float(f4))
    X.append(float(f5))
    Y.append(float(f6))

# Sensor 1 has a higher precision (max error = 0.1 m)
# Sensor 2 has a lower precision (max error = 0.3 m)

# Variance definition through 3-Sigma rule
Sensor_1_Variance = (0.1/3)**2;
Sensor_2_Variance = (0.3/3)**2;

# Filter Configuration

# time step
dt = Time[2] - Time[1]

# transition_matrix  
F = [[1,  0,  dt,   0], 
     [0,  1,   0,  dt],
     [0,  0,   1,   0],
     [0,  0,   0,   1]]   

# observation_matrix   
H = [[1, 0, 0, 0],
     [0, 1, 0, 0]]

# transition_covariance 
Q = [[1e-4,     0,     0,     0], 
     [   0,  1e-4,     0,     0],
     [   0,     0,  1e-4,     0],
     [   0,     0,     0,  1e-4]] 

# observation_covariance 
R_1 = [[Sensor_1_Variance, 0],
       [0, Sensor_1_Variance]]

R_2 = [[Sensor_2_Variance, 0],
       [0, Sensor_2_Variance]]

# initial_state_mean
X0 = [0,
      0,
      0,
      0]

# initial_state_covariance - assumed a bigger uncertainty in initial velocity
P0 = [[  0,    0,   0,   0], 
      [  0,    0,   0,   0],
      [  0,    0,   1,   0],
      [  0,    0,   0,   1]]

n_timesteps = len(Time)
n_dim_state = 4
filtered_state_means = np.zeros((n_timesteps, n_dim_state))
filtered_state_covariances = np.zeros((n_timesteps, n_dim_state, n_dim_state))

import numpy.ma as ma

obs_cov=np.zeros([n_timesteps,2,2])
obs=np.zeros([n_timesteps,2])

for t in range(n_timesteps):
    if Sensor[t] == 0:
        obs[t]=None
    else:
        obs[t] = [X[t], Y[t]]
        if Sensor[t] == 1:
            obs_cov[t] = np.asarray(R_1)
        else:
            obs_cov[t] = np.asarray(R_2)

ma_obs=ma.masked_invalid(obs)

ma_obs_cov=ma.masked_invalid(obs_cov)

# Kalman-Filter initialization
kf = KalmanFilter(transition_matrices = F, 
                  observation_matrices = H, 
                  transition_covariance = Q, 
                  observation_covariance = ma_obs_cov, # the covariance will be adapted depending on Sensor_ID
                  initial_state_mean = X0, 
                  initial_state_covariance = P0)

filtered_state_means, filtered_state_covariances=kf.smooth(ma_obs)


# extracting the Sensor update points for the plot        
Sensor_1_update_index = [i for i, x in enumerate(Sensor) if x == 1]    
Sensor_2_update_index = [i for i, x in enumerate(Sensor) if x == 2]     

Sensor_1_update_X = [ X[i] for i in Sensor_1_update_index ]        
Sensor_1_update_Y = [ Y[i] for i in Sensor_1_update_index ]   

Sensor_2_update_X = [ X[i] for i in Sensor_2_update_index ]        
Sensor_2_update_Y = [ Y[i] for i in Sensor_2_update_index ] 

# plot of the resulted trajectory
plt.plot(RefX, RefY, "k-", label="Real Trajectory")
plt.plot(Sensor_1_update_X, Sensor_1_update_Y, "ro", label="Sensor 1")
plt.plot(Sensor_2_update_X, Sensor_2_update_Y, "bo", label="Sensor 2")
plt.plot(filtered_state_means[:, 0], filtered_state_means[:, 1], "g.", label="Filtered Trajectory", markersize=1)
plt.grid()
plt.legend(loc="upper left")
plt.show()    

person Adam    schedule 01.12.2017    source источник
comment
Я не вижу никакой проблемы в проблеме случайного времени. У вас есть модель для прогнозирования состояния и измерения для корректировки прогнозируемого состояния. Если измерения иногда пропускаются, вы все равно можете предсказать. Не могли бы вы предоставить больше данных о своей модели, чтобы я мог попытаться решить проблему?   -  person Anton    schedule 27.12.2017
comment
С двумя датчиками, которые не синхронизированы, в большинстве наблюдений будут отсутствовать те или иные измерения. Кроме того, кажется, что pykalman отбрасывает все наблюдения, когда в каком-либо отдельном столбце отсутствует значение. Итак, в моем примере выше это ведет себя так, как будто есть только одно преувеличение. Я могу добавить некоторые детали, если это может помочь.   -  person Adam    schedule 27.12.2017
comment
У вас есть информация о точности датчика? Вам нужно определить дисперсию, не так ли? И обязательно ли использовать pykalman? Создать собственный фильтр на питоне несложно. Так что, если вы предоставите достаточно информации, я могу это сделать. Мне нравится эта тема.   -  person Anton    schedule 27.12.2017
comment
Последний фрагмент кода выше, с transition_offsets, взят из руководства по pykalman, а не из моего кода. Пример вручную ясно дает понять, что он имеет значения по умолчанию.   -  person Adam    schedule 28.12.2017


Ответы (1)


Для фильтра Калмана полезно представлять входные данные с постоянным шагом по времени. Ваши датчики отправляют данные случайным образом, поэтому вы можете определить наименьший значимый временной шаг для вашей системы и с помощью этого шага дискретизировать ось времени.

Например, один из ваших датчиков отправляет данные примерно каждые 0,2 секунды, а второй — каждые 0,5 секунды. Таким образом, наименьший временной шаг может составлять 0,01 секунды (здесь вам нужно найти компромисс между временем вычислений и желаемой точностью).

Ваши данные будут выглядеть так:

Time    Sensor  X       Y
0,52        0   0       0
0,53        1   0,3417  0,2988
0,54        0   0       0
0,56        0   0       0
0,57        0   0       0
0,55        0   0       0
0,58        0   0       0
0,59        2   0,4247  0,3779
0,60        0   0       0
0,61        0   0       0
0,62        0   0       0

Теперь вам нужно вызвать функцию Pykalman filter_update в зависимости от ваших наблюдений. Если наблюдения нет, фильтр предсказывает следующее состояние на основе предыдущего. Если есть наблюдение, оно корректирует состояние системы.

Вероятно, ваши датчики имеют разную точность. Таким образом, вы можете указать ковариацию наблюдения в зависимости от дисперсии датчика.

Для демонстрации идеи я сгенерировал 2D-траекторию и случайным образом поставил измерения 2-х датчиков с разной точностью.

Sensor1: mean update time = 1.0s; max error = 0.1m;
Sensor2: mean update time = 0.7s; max error = 0.3m;

Вот результат:

Фильтр Пикальмана Калмана с отсутствующими наблюдениями

Я намеренно выбрал очень плохие параметры, чтобы можно было увидеть этапы прогнозирования и коррекции. Между обновлениями датчика фильтр предсказывает траекторию на основе постоянной скорости из предыдущего шага. Как только приходит обновление, фильтр корректирует позицию в соответствии с дисперсией датчика. Точность второго датчика очень плохая, поэтому он влияет на систему с меньшим весом.

Вот мой код на питоне:

from pykalman import KalmanFilter
import numpy as np
import matplotlib.pyplot as plt

# reading data (quick and dirty)
Time=[]
RefX=[]
RefY=[]
Sensor=[]
X=[]
Y=[]

for line in open('data/dataset_01.csv'):
    f1, f2, f3, f4, f5, f6 = line.split(';')
    Time.append(float(f1))
    RefX.append(float(f2))
    RefY.append(float(f3))
    Sensor.append(float(f4))
    X.append(float(f5))
    Y.append(float(f6))

# Sensor 1 has a higher precision (max error = 0.1 m)
# Sensor 2 has a lower precision (max error = 0.3 m)

# Variance definition through 3-Sigma rule
Sensor_1_Variance = (0.1/3)**2;
Sensor_2_Variance = (0.3/3)**2;

# Filter Configuration

# time step
dt = Time[2] - Time[1]

# transition_matrix  
F = [[1,  0,  dt,   0], 
     [0,  1,   0,  dt],
     [0,  0,   1,   0],
     [0,  0,   0,   1]]   

# observation_matrix   
H = [[1, 0, 0, 0],
     [0, 1, 0, 0]]

# transition_covariance 
Q = [[1e-4,     0,     0,     0], 
     [   0,  1e-4,     0,     0],
     [   0,     0,  1e-4,     0],
     [   0,     0,     0,  1e-4]] 

# observation_covariance 
R_1 = [[Sensor_1_Variance, 0],
       [0, Sensor_1_Variance]]

R_2 = [[Sensor_2_Variance, 0],
       [0, Sensor_2_Variance]]

# initial_state_mean
X0 = [0,
      0,
      0,
      0]

# initial_state_covariance - assumed a bigger uncertainty in initial velocity
P0 = [[  0,    0,   0,   0], 
      [  0,    0,   0,   0],
      [  0,    0,   1,   0],
      [  0,    0,   0,   1]]

n_timesteps = len(Time)
n_dim_state = 4
filtered_state_means = np.zeros((n_timesteps, n_dim_state))
filtered_state_covariances = np.zeros((n_timesteps, n_dim_state, n_dim_state))

# Kalman-Filter initialization
kf = KalmanFilter(transition_matrices = F, 
                  observation_matrices = H, 
                  transition_covariance = Q, 
                  observation_covariance = R_1, # the covariance will be adapted depending on Sensor_ID
                  initial_state_mean = X0, 
                  initial_state_covariance = P0)


# iterative estimation for each new measurement
for t in range(n_timesteps):
    if t == 0:
        filtered_state_means[t] = X0
        filtered_state_covariances[t] = P0
    else:

        # the observation and its covariance have to be switched depending on Sensor_Id 
        #     Sensor_ID == 0: no observation
        #     Sensor_ID == 1: Sensor 1
        #     Sensor_ID == 2: Sensor 2

        if Sensor[t] == 0:
            obs = None
            obs_cov = None
        else:
            obs = [X[t], Y[t]]

            if Sensor[t] == 1:
                obs_cov = np.asarray(R_1)
            else:
                obs_cov = np.asarray(R_2)

        filtered_state_means[t], filtered_state_covariances[t] = (
        kf.filter_update(
            filtered_state_means[t-1],
            filtered_state_covariances[t-1],
            observation = obs,
            observation_covariance = obs_cov)
        )

# extracting the Sensor update points for the plot        
Sensor_1_update_index = [i for i, x in enumerate(Sensor) if x == 1]    
Sensor_2_update_index = [i for i, x in enumerate(Sensor) if x == 2]     

Sensor_1_update_X = [ X[i] for i in Sensor_1_update_index ]        
Sensor_1_update_Y = [ Y[i] for i in Sensor_1_update_index ]   

Sensor_2_update_X = [ X[i] for i in Sensor_2_update_index ]        
Sensor_2_update_Y = [ Y[i] for i in Sensor_2_update_index ] 

# plot of the resulted trajectory
plt.plot(RefX, RefY, "k-", label="Real Trajectory")
plt.plot(Sensor_1_update_X, Sensor_1_update_Y, "ro", label="Sensor 1")
plt.plot(Sensor_2_update_X, Sensor_2_update_Y, "bo", label="Sensor 2")
plt.plot(filtered_state_means[:, 0], filtered_state_means[:, 1], "g.", label="Filtered Trajectory", markersize=1)
plt.grid()
plt.legend(loc="upper left")
plt.show()    

Я помещаю CSV-файл здесь, чтобы вы могли выполнить код.

Я надеюсь, что смог помочь вам.

ОБНОВЛЕНИЕ

Некоторая информация к вашему предложению о переменной матрице перехода. Я бы сказал, что это зависит от наличия ваших датчиков и требований к результату оценки.

Здесь я построил одну и ту же оценку как с постоянной, так и с переменной матрицей перехода (я изменил ковариационную матрицу перехода, иначе оценка была бы слишком плохой из-за высокой «жесткости» фильтра):

«Фильтр

Как видите, расчетное положение желтых маркеров довольно хорошее. НО! у вас нет информации между показаниями датчика. Используя переменную матрицу перехода, вы избегаете шага прогнозирования между показаниями и понятия не имеете, что происходит с системой. Это может быть достаточно хорошо, если ваши показания идут с высокой скоростью, но в противном случае это может быть недостатком.

Вот обновленный код:

from pykalman import KalmanFilter
import numpy as np
import matplotlib.pyplot as plt

# reading data (quick and dirty)
Time=[]
RefX=[]
RefY=[]
Sensor=[]
X=[]
Y=[]

for line in open('data/dataset_01.csv'):
    f1, f2, f3, f4, f5, f6 = line.split(';')
    Time.append(float(f1))
    RefX.append(float(f2))
    RefY.append(float(f3))
    Sensor.append(float(f4))
    X.append(float(f5))
    Y.append(float(f6))

# Sensor 1 has a higher precision (max error = 0.1 m)
# Sensor 2 has a lower precision (max error = 0.3 m)

# Variance definition through 3-Sigma rule
Sensor_1_Variance = (0.1/3)**2;
Sensor_2_Variance = (0.3/3)**2;

# Filter Configuration

# time step
dt = Time[2] - Time[1]

# transition_matrix  
F = [[1,  0,  dt,   0], 
     [0,  1,   0,  dt],
     [0,  0,   1,   0],
     [0,  0,   0,   1]]   

# observation_matrix   
H = [[1, 0, 0, 0],
     [0, 1, 0, 0]]

# transition_covariance 
Q = [[1e-2,     0,     0,     0], 
     [   0,  1e-2,     0,     0],
     [   0,     0,  1e-2,     0],
     [   0,     0,     0,  1e-2]] 

# observation_covariance 
R_1 = [[Sensor_1_Variance, 0],
       [0, Sensor_1_Variance]]

R_2 = [[Sensor_2_Variance, 0],
       [0, Sensor_2_Variance]]

# initial_state_mean
X0 = [0,
      0,
      0,
      0]

# initial_state_covariance - assumed a bigger uncertainty in initial velocity
P0 = [[  0,    0,   0,   0], 
      [  0,    0,   0,   0],
      [  0,    0,   1,   0],
      [  0,    0,   0,   1]]

n_timesteps = len(Time)
n_dim_state = 4

filtered_state_means = np.zeros((n_timesteps, n_dim_state))
filtered_state_covariances = np.zeros((n_timesteps, n_dim_state, n_dim_state))

filtered_state_means2 = np.zeros((n_timesteps, n_dim_state))
filtered_state_covariances2 = np.zeros((n_timesteps, n_dim_state, n_dim_state))

# Kalman-Filter initialization
kf = KalmanFilter(transition_matrices = F, 
                  observation_matrices = H, 
                  transition_covariance = Q, 
                  observation_covariance = R_1, # the covariance will be adapted depending on Sensor_ID
                  initial_state_mean = X0, 
                  initial_state_covariance = P0)

# Kalman-Filter initialization (Different F Matrices depending on DT)
kf2 = KalmanFilter(transition_matrices = F, 
                  observation_matrices = H, 
                  transition_covariance = Q, 
                  observation_covariance = R_1, # the covariance will be adapted depending on Sensor_ID
                  initial_state_mean = X0, 
                  initial_state_covariance = P0)


# iterative estimation for each new measurement
for t in range(n_timesteps):
    if t == 0:
        filtered_state_means[t] = X0
        filtered_state_covariances[t] = P0

        # For second filter
        filtered_state_means2[t] = X0
        filtered_state_covariances2[t] = P0

        timestamp = Time[t]
        old_t = t
    else:

        # the observation and its covariance have to be switched depending on Sensor_Id 
        #     Sensor_ID == 0: no observation
        #     Sensor_ID == 1: Sensor 1
        #     Sensor_ID == 2: Sensor 2

        if Sensor[t] == 0:
            obs = None
            obs_cov = None
        else:
            obs = [X[t], Y[t]]

            if Sensor[t] == 1:
                obs_cov = np.asarray(R_1)
            else:
                obs_cov = np.asarray(R_2)

        filtered_state_means[t], filtered_state_covariances[t] = (
        kf.filter_update(
            filtered_state_means[t-1],
            filtered_state_covariances[t-1],
            observation = obs,
            observation_covariance = obs_cov)
        )

        #For the second filter
        if Sensor[t] != 0:

            obs2 = [X[t], Y[t]]

            if Sensor[t] == 1:
                obs_cov2 = np.asarray(R_1)
            else:
                obs_cov2 = np.asarray(R_2)  

            dt2 = Time[t] - timestamp

            timestamp = Time[t]        

            # transition_matrix  
            F2 = [[1,  0,  dt2,    0], 
                  [0,  1,    0,  dt2],
                  [0,  0,    1,    0],
                  [0,  0,    0,    1]] 

            filtered_state_means2[t], filtered_state_covariances2[t] = (
            kf2.filter_update(
                filtered_state_means2[old_t],
                filtered_state_covariances2[old_t],
                observation = obs2,
                observation_covariance = obs_cov2,
                transition_matrix = np.asarray(F2))
            )      

            old_t = t

# extracting the Sensor update points for the plot        
Sensor_1_update_index = [i for i, x in enumerate(Sensor) if x == 1]    
Sensor_2_update_index = [i for i, x in enumerate(Sensor) if x == 2]     

Sensor_1_update_X = [ X[i] for i in Sensor_1_update_index ]        
Sensor_1_update_Y = [ Y[i] for i in Sensor_1_update_index ]   

Sensor_2_update_X = [ X[i] for i in Sensor_2_update_index ]        
Sensor_2_update_Y = [ Y[i] for i in Sensor_2_update_index ] 

# plot of the resulted trajectory
plt.plot(RefX, RefY, "k-", label="Real Trajectory")
plt.plot(Sensor_1_update_X, Sensor_1_update_Y, "ro", label="Sensor 1", markersize=9)
plt.plot(Sensor_2_update_X, Sensor_2_update_Y, "bo", label="Sensor 2", markersize=9)
plt.plot(filtered_state_means[:, 0], filtered_state_means[:, 1], "g.", label="Filtered Trajectory", markersize=1)
plt.plot(filtered_state_means2[:, 0], filtered_state_means2[:, 1], "yo", label="Filtered Trajectory 2", markersize=6)
plt.grid()
plt.legend(loc="upper left")
plt.show()    

Еще один важный момент, который я не реализовал в этом коде: при использовании переменной матрицы перехода нужно варьировать и ковариационную матрицу перехода (в зависимости от текущего dt).

Это интересная тема. Дайте мне знать, какая оценка лучше всего подходит для вашей проблемы.

person Anton    schedule 05.01.2018
comment
Это очень полезно, чтобы помочь протестировать некоторые вещи, но все же не позволяет изменять шаг по времени, за исключением перехода к гораздо более высокому разрешению. Если бы время имело некоторую изменчивость, могу ли я просто изменять dt в F для каждого шага вместо того, чтобы оставлять его равным 0,01? - person Adam; 10.01.2018
comment
Например, данные радара могут иметь очень точные метки времени, но время между каждой точкой данных будет разным, потому что транспортное средство находится в другом месте относительно вращающейся антенны при каждом вращении. - person Adam; 10.01.2018
comment
Я добавил новый график для случая переменной матрицы перехода. У него есть свои плюсы и минусы. Что вы думаете? - person Anton; 12.01.2018
comment
Да, это как раз то, что я пытался сделать. Я не понимаю, почему мой более поздний пример в вопросе не дает таких же результатов. Разве сглаживающая функция не должна эффективно делать то же самое, что и ваш второй цикл фильтра? - person Adam; 15.01.2018