Как сделать апсерт с SqlAlchemy?

У меня есть запись, которую я хочу сохранить в базе данных, если ее нет, и если она уже есть (первичный ключ существует), я хочу, чтобы поля были обновлены до текущего состояния. Это часто называют upsert.

Следующий неполный фрагмент кода демонстрирует, что будет работать, но он кажется чрезмерно неуклюжим (особенно, если столбцов было намного больше). Какой способ лучше / лучше?

Base = declarative_base()
class Template(Base):
    __tablename__ = 'templates'
    id = Column(Integer, primary_key = True)
    name = Column(String(80), unique = True, index = True)
    template = Column(String(80), unique = True)
    description = Column(String(200))
    def __init__(self, Name, Template, Desc):
        self.name = Name
        self.template = Template
        self.description = Desc

def UpsertDefaultTemplate():
    sess = Session()
    desired_default = Template("default", "AABBCC", "This is the default template")
    try:
        q = sess.query(Template).filter_by(name = desiredDefault.name)
        existing_default = q.one()
    except sqlalchemy.orm.exc.NoResultFound:
        #default does not exist yet, so add it...
        sess.add(desired_default)
    else:
        #default already exists.  Make sure the values are what we want...
        assert isinstance(existing_default, Template)
        existing_default.name = desired_default.name
        existing_default.template = desired_default.template
        existing_default.description = desired_default.description
    sess.flush()

Есть ли лучший или менее подробный способ сделать это? Что-то вроде этого было бы здорово:

sess.upsert_this(desired_default, unique_key = "name")

хотя unique_key kwarg явно не нужен (ORM должен легко это понять), я добавил его только потому, что SQLAlchemy имеет тенденцию работать только с первичным ключом. например: я искал, может ли Session.merge будет применимо, но это работает только с первичным ключом, который в данном случае является автоматически увеличивающимся идентификатором, который не очень полезен для этой цели.

Пример использования для этого - просто запуск серверного приложения, которое могло обновить свои ожидаемые данные по умолчанию. то есть: никаких проблем с параллелизмом для этого апсерта.


person Russ    schedule 23.08.2011    source источник
comment
Почему вы не можете сделать поле name первичным ключом, если оно уникально (и слияние в этом случае будет работать). Зачем нужен отдельный первичный ключ?   -  person abbot    schedule 23.08.2011
comment
@abbot: Я не хочу вдаваться в дебаты о поле id, но ... краткий ответ - внешние ключи. Более того, хотя имя действительно является единственным обязательным уникальным ключом, возникают две проблемы. 1) когда на запись шаблона ссылаются 50 миллионов записей в другой таблице, имеющей этот FK в качестве строкового поля, - это безумие. Индексированное целое число лучше, отсюда, казалось бы, бессмысленный столбец id. и 2) в дополнение к этому, если строка была использовалась как FK, теперь есть два места для обновления имени, если / когда оно изменяется, что раздражает и изобилует проблемами мертвых отношений. Идентификатор никогда не меняется.   -  person Russ    schedule 24.08.2011
comment
вы можете попробовать новую (бета) библиотеку upsert для python ... она совместима с psycopg2, sqlite3, MySQLdb   -  person Seamus Abshere    schedule 27.09.2012
comment
см. также эту ветку: Есть ли у SQLAlchemy эквивалент Django получить или создать?   -  person driftcatcher    schedule 16.07.2014


Ответы (8)


SQLAlchemy действительно имеет поведение «сохранить или обновить», которое в последних версиях было встроено в session.add, но раньше было отдельным вызовом session.saveorupdate. Это не «апсерт», но может быть достаточно для ваших нужд.

Хорошо, что вы спрашиваете о классе с несколькими уникальными ключами; Я считаю, что именно по этой причине не существует единственно правильного способа сделать это. Первичный ключ также является уникальным ключом. Если бы не было уникальных ограничений, только первичный ключ, это была бы достаточно простая проблема: если ничего с данным идентификатором не существует или если идентификатор равен None, создать новую запись; иначе обновите все остальные поля в существующей записи с этим первичным ключом.

Однако при наличии дополнительных уникальных ограничений такой простой подход порождает логические проблемы. Если вы хотите «добавить» объект и первичный ключ вашего объекта соответствует существующей записи, но другой уникальный столбец соответствует другой записи, что вы будете делать? Точно так же, если первичный ключ не соответствует ни одной существующей записи, но другой уникальный столбец соответствует существующей записи, что тогда? Для вашей конкретной ситуации может быть правильный ответ, но в целом я бы сказал, что нет единого правильного ответа.

Это может быть причиной отсутствия встроенной операции «upsert». Приложение должно определять, что это означает в каждом конкретном случае.

person wberry    schedule 23.08.2011

SQLAlchemy поддерживает ON CONFLICT двумя методами on_conflict_do_update() и on_conflict_do_nothing().

Копирование из документации:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='[email protected]', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)
person P.R.    schedule 06.06.2017
comment
MySQL также поддерживается с помощью on_duplicate_key_update - person Michael Berdyshev; 10.04.2019
comment
просто execute не могу получить возвращаемый идентификатор - person jiamo; 05.09.2020
comment
Этот код да, я думаю (ответ 3+ лет), но, возможно, комментарий Майклса работает для MySQL. Вообще говоря, мой (этот) ответ - это своего рода прыжок к выводу, что postgres используется в качестве базы данных. Это не очень хорошо, потому что на самом деле он не отвечает на общий вопрос, который был задан. Но, основываясь на полученных мной положительных голосах, я решил, что это было полезно для некоторых людей, поэтому оставил это. - person P.R.; 14.10.2020

В настоящее время SQLAlchemy предоставляет две полезные функции: on_conflict_do_nothing и on_conflict_do_update . Эти функции полезны, но требуют переключения с интерфейса ORM на интерфейс более низкого уровня - Ядро SQLAlchemy.

Хотя эти две функции делают апсертизацию с использованием синтаксиса SQLAlchemy несложной, эти функции далеки от того, чтобы предоставить полное готовое решение для апсертинга.

Я обычно использую для вставки большого фрагмента строк за один SQL-запрос / выполнение сеанса. Обычно я сталкиваюсь с двумя проблемами при обновлении:

Например, отсутствуют привычные нам функции ORM более высокого уровня. Вы не можете использовать объекты ORM, но вместо этого должны предоставить ForeignKey во время вставки.

Я использую эту следующую функцию, которую я написал для решения обеих этих проблем:

def upsert(session, model, rows):
    table = model.__table__
    stmt = postgresql.insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}

    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
                                      set_=update_dict)

    seen = set()
    foreign_keys = {col.name: list(col.foreign_keys)[0].column for col in table.columns if col.foreign_keys}
    unique_constraints = [c for c in table.constraints if isinstance(c, UniqueConstraint)]
    def handle_foreignkeys_constraints(row):
        for c_name, c_value in foreign_keys.items():
            foreign_obj = row.pop(c_value.table.name, None)
            row[c_name] = getattr(foreign_obj, c_value.name) if foreign_obj else None

        for const in unique_constraints:
            unique = tuple([const,] + [row[col.name] for col in const.columns])
            if unique in seen:
                return None
            seen.add(unique)

        return row

    rows = list(filter(None, (handle_foreignkeys_constraints(row) for row in rows)))
    session.execute(stmt, rows)
person NirIzr    schedule 28.07.2018
comment
on_conflict доступен только для бэкендов, которые поддерживают собственные предложения ON CONFLICT. Следовательно, только postgresql - person cowbert; 28.08.2018
comment
@cowbert Теперь SQLAlchemy также поддерживает ПРИ ДВОЙНОМ ОБНОВЛЕНИИ КЛЮЧА для MySQL. - person Michael Berdyshev; 10.04.2019

Я использую подход «посмотрите, прежде чем прыгать»:

# first get the object from the database if it exists
# we're guaranteed to only get one or zero results
# because we're filtering by primary key
switch_command = session.query(Switch_Command).\
    filter(Switch_Command.switch_id == switch.id).\
    filter(Switch_Command.command_id == command.id).first()

# If we didn't get anything, make one
if not switch_command:
    switch_command = Switch_Command(switch_id=switch.id, command_id=command.id)

# update the stuff we care about
switch_command.output = 'Hooray!'
switch_command.lastseen = datetime.datetime.utcnow()

session.add(switch_command)
# This will generate either an INSERT or UPDATE
# depending on whether we have a new object or not
session.commit()

Преимущество в том, что он нейтрален по базе данных, и я думаю, что это понятно для чтения. Недостатком является то, что в следующем сценарии существует потенциальное состояние гонки:

  • мы запрашиваем в БД switch_command и не находим его
  • мы создаем switch_command
  • другой процесс или поток создает switch_command с тем же первичным ключом, что и наш
  • мы пытаемся зафиксировать наш switch_command
person Ben    schedule 19.10.2017
comment
Этот вопрос обрабатывает состояние гонки с помощью try / catch - person Ben; 19.10.2017
comment
Вся цель upsert - избежать состояния гонки, описанного здесь. - person sampierson; 05.12.2018
comment
@sampierson Я знаю - вот почему очень грустно, что SQLALchemy затрудняет работу с чистотой и переносимостью ... Я выделил состояние гонки в своем ответе - person Ben; 05.12.2018

Приведенное ниже отлично работает для меня с базой данных красного смещения, а также будет работать для комбинированного ограничения первичного ключа.

ИСТОЧНИК: это

Всего несколько модификаций, необходимых для создания движка SQLAlchemy в функции def start_engine ()

from sqlalchemy import Column, Integer, Date ,Metadata
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql

Base = declarative_base()

def start_engine():
    engine = create_engine(os.getenv('SQLALCHEMY_URI', 
    'postgresql://localhost:5432/upsert'))
     connect = engine.connect()
    meta = MetaData(bind=engine)
    meta.reflect(bind=engine)
    return engine


class DigitalSpend(Base):
    __tablename__ = 'digital_spend'
    report_date = Column(Date, nullable=False)
    day = Column(Date, nullable=False, primary_key=True)
    impressions = Column(Integer)
    conversions = Column(Integer)

    def __repr__(self):
        return str([getattr(self, c.name, None) for c in self.__table__.c])


def compile_query(query):
    compiler = query.compile if not hasattr(query, 'statement') else 
  query.statement.compile
    return compiler(dialect=postgresql.dialect())


def upsert(session, model, rows, as_of_date_col='report_date', no_update_cols=[]):
    table = model.__table__

    stmt = insert(table).values(rows)

    update_cols = [c.name for c in table.c
                   if c not in list(table.primary_key.columns)
                   and c.name not in no_update_cols]

    on_conflict_stmt = stmt.on_conflict_do_update(
        index_elements=table.primary_key.columns,
        set_={k: getattr(stmt.excluded, k) for k in update_cols},
        index_where=(getattr(model, as_of_date_col) < getattr(stmt.excluded, as_of_date_col))
        )

    print(compile_query(on_conflict_stmt))
    session.execute(on_conflict_stmt)


session = start_engine()
upsert(session, DigitalSpend, initial_rows, no_update_cols=['conversions'])
person Aditi Srivastava    schedule 26.03.2019

Это позволяет получить доступ к базовым моделям на основе строковых имен.

def get_class_by_tablename(tablename):
  """Return class reference mapped to table.
  https://stackoverflow.com/questions/11668355/sqlalchemy-get-model-from-table-name-this-may-imply-appending-some-function-to
  :param tablename: String with name of table.
  :return: Class reference or None.
  """
  for c in Base._decl_class_registry.values():
    if hasattr(c, '__tablename__') and c.__tablename__ == tablename:
      return c


sqla_tbl = get_class_by_tablename(table_name)

def handle_upsert(record_dict, table):
    """
    handles updates when there are primary key conflicts

    """
    try:
        self.active_session().add(table(**record_dict))
    except:
        # Here we'll assume the error is caused by an integrity error
        # We do this because the error classes are passed from the
        # underlying package (pyodbc / sqllite) SQLAlchemy doesn't mask
        # them with it's own code - this should be updated to have
        # explicit error handling for each new db engine

        # <update>add explicit error handling for each db engine</update> 
        active_session.rollback()
        # Query for conflic class, use update method to change values based on dict
        c_tbl_primary_keys = [i.name for i in table.__table__.primary_key] # List of primary key col names
        c_tbl_cols = dict(sqla_tbl.__table__.columns) # String:Col Object crosswalk

        c_query_dict = {k:record_dict[k] for k in c_tbl_primary_keys if k in record_dict} # sub-dict from data of primary key:values
        c_oo_query_dict = {c_tbl_cols[k]:v for (k,v) in c_query_dict.items()} # col-object:query value for primary key cols

        c_target_record = session.query(sqla_tbl).filter(*[k==v for (k,v) in oo_query_dict.items()]).first()

        # apply new data values to the existing record
        for k, v in record_dict.items()
            setattr(c_target_record, k, v)
person Schalton    schedule 05.04.2019

У меня это работает с sqlite3 и postgres. Хотя это может привести к сбою при комбинированных ограничениях первичного ключа и, скорее всего, не удастся с дополнительными уникальными ограничениями.

    try:
        t = self._meta.tables[data['table']]
    except KeyError:
        self._log.error('table "%s" unknown', data['table'])
        return

    try:
        q = insert(t, values=data['values'])
        self._log.debug(q)
        self._db.execute(q)
    except IntegrityError:
        self._log.warning('integrity error')
        where_clause = [c.__eq__(data['values'][c.name]) for c in t.c if c.primary_key]
        update_dict = {c.name: data['values'][c.name] for c in t.c if not c.primary_key}
        q = update(t, values=update_dict).where(*where_clause)
        self._log.debug(q)
        self._db.execute(q)
    except Exception as e:
        self._log.error('%s: %s', t.name, e)
person ThePsyjo    schedule 08.11.2018

Есть несколько ответов, и вот еще один ответ (YAA). Другие ответы не так удобочитаемы из-за задействованного метапрограммирования. Вот пример того, что

  • Использует SQLAlchemy ORM

  • Показывает, как создать строку, если есть ноль строк, используя on_conflict_do_nothing

  • Показывает, как обновить существующую строку (если есть) без создания новой строки с помощью on_conflict_do_update

  • Использует первичный ключ таблицы как constraint

Более длинный пример в исходном вопросе, с чем связан этот код.


import sqlalchemy as sa
import sqlalchemy.orm as orm
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

class PairState(Base):

    __tablename__ = "pair_state"

    # This table has 1-to-1 relationship with Pair
    pair_id = sa.Column(sa.ForeignKey("pair.id"), nullable=False, primary_key=True, unique=True)
    pair = orm.relationship(Pair,
                        backref=orm.backref("pair_state",
                                        lazy="dynamic",
                                        cascade="all, delete-orphan",
                                        single_parent=True, ), )


    # First raw event in data stream
    first_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    # Last raw event in data stream
    last_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    # The last hypertable entry added
    last_interval_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    @staticmethod
    def create_first_event_if_not_exist(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Sets the first event value if not exist yet."""
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, first_event_at=ts).
            on_conflict_do_nothing()
        )

    @staticmethod
    def update_last_event(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Replaces the the column last_event_at for a named pair."""
        # Based on the original example of https://stackoverflow.com/a/49917004/315168
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, last_event_at=ts).
            on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_event_at": ts})
        )

    @staticmethod
    def update_last_interval(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Replaces the the column last_interval_at for a named pair."""
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, last_interval_at=ts).
            on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_interval_at": ts})
        )
person Mikko Ohtamaa    schedule 14.06.2021