Многопоточная транзакция прерывается с помощью Entity Framework

У меня проблема с многопоточной транзакцией и структурой сущностей. У меня есть поток, который работает в транзакции, и я хотел бы, чтобы еще несколько рабочих потоков работали в одной транзакции. Следующий код иллюстрирует ситуацию (в контексте EF есть одна фиктивная сущность, код в основном порождает 5 потоков, я хотел бы вставить некоторые сущности в каждый поток и в конце в основной поток, я хотел бы продолжить работу с БД, но чтобы весь процесс был изолирован в ОДНОЙ транзакции):

using(var scope = new TransactionScope())
{
    int cnt = 5;
    ManualResetEvent[] evt = new ManualResetEvent[cnt];

    for(int i = 0; i < cnt; i++)
    {
        var sink = new ManualResetEvent(false);
        evt[i] = sink;

        var tr = Transaction.Current.DependentClone(
            DependentCloneOption.BlockCommitUntilComplete);

        Action run = () =>
        {
            using (var scope2 = new TransactionScope(tr))
            {
                using (var mc = new ModelContainer())
                {
                    mc.EntitySet.Add(new Entity()
                    {
                        MyProp = "test"
                    });
                    mc.SaveChanges();
                }
            }

            sink.Set();
        };

        ThreadPool.QueueUserWorkItem(r => run());
    }

    ManualResetEvent.WaitAll(evt);

    using (var mc = new ModelContainer())
    {
        Console.WriteLine(mc.EntitySet.Count());
    }
    Console.ReadKey();
}

Проблема в том, что это исключение вызывается для mc.SaveChanges();. Внутренним исключением является TransactionException: «Операция недействительна для состояния транзакции». Кажется, что в какой-то момент транзакция прерывается. Я думаю, что это после того, как первый поток вызывает SaveChanges(), но я не уверен. Любая идея, почему транзакция прерывается?


person Jan Novák    schedule 16.10.2012    source источник


Ответы (2)


Я обнаружил, в чем здесь проблема. На основе этой статьи я нашел что невозможно работать с двумя соединениями MSSQL сервера в рамках одной транзакции одновременно. Я также обнаружил, что неправильно обрабатывал зависимые транзакции в предыдущем коде. Мой рабочий код иллюстрации выглядит следующим образом:

    class Context
    {
        public ManualResetEvent sink;
        public DependentTransaction transaction;
    }

    static object syncRoot = new object();

    static void Main(string[] args)
    {
        using (var scope = new TransactionScope())
        {
            int cnt = 5;
            ManualResetEvent[] evt = new ManualResetEvent[cnt];

            for (int i = 0; i < cnt; i++)
            {
                var sink = new ManualResetEvent(false);
                evt[i] = sink;

                var context = new Context()
                {
                    // clone transaction
                    transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    sink = sink
                };

                ThreadPool.QueueUserWorkItem(new WaitCallback(Run), context);
            }

            // wait for all threads to finish
            ManualResetEvent.WaitAll(evt);

            using (var mc = new ModelContainer())
            {
                // check database content
                Console.WriteLine(mc.EntitySet.Count());
            }

            // after test is done, the transaction is rolled back and the database state is untouched
            Console.ReadKey();
        }
    }

    static void Run(object state)
    {
        var context = state as Context;

        // set ambient transaction
        Transaction oldTran = Transaction.Current;
        Transaction.Current = context.transaction;

        using (var mc = new ModelContainer())
        {
            mc.EntitySet.Add(new Entity()
            {
                MyProp = "test"
            });

            // synchronize database access
            lock (syncRoot)
            {
                mc.SaveChanges();
            }
        }

        // release dependent transaction
        context.transaction.Complete();            
        context.transaction.Dispose();

        Transaction.Current = oldTran;

        context.sink.Set();            
    }
}

Вероятно, это не очень хороший способ написания многопоточного бизнес-слоя, но такой подход к разделяемым транзакциям ОЧЕНЬ полезен для тестирования в моем случае. Единственная модификация, необходимая для того, чтобы это работало, — это переопределить контекст базы данных и синхронизировать метод сохранения в тестовых прогонах.

person Jan Novák    schedule 21.10.2012

SqlConnection не является потокобезопасным (и EF ObjectContext/DbContect также не является потокобезопасным), поэтому это будет работать только при синхронизации доступа к контексту и соединению. Вы предлагаете модель, в которой вы параллельно обрабатываете ресурсы, интенсивно использующие ЦП, и записываете все изменения в один поток после завершения всех потоков.

person Steven    schedule 16.10.2012
comment
Но я не понимаю, что если я выполняю этот код без транзакции, он работает. Я создаю новый контекст БД для каждого потока, и DependentTransaction, как я полагаю, должен использоваться для совместного использования транзакций между потоками — ссылка - person Jan Novák; 16.10.2012
comment
Изучив ваш код еще раз более внимательно, я вижу, что вы делаете. Это может действительно сработать, но я все еще нахожу это пугающим способом разработки вашей системы. Какую проблему вы пытаетесь решить? - person Steven; 16.10.2012
comment
Это не то, как работает мой производственный код. Моя цель - сделать какой-то модульный интеграционный тест. Я тестирую уровень бизнес-логики на сервере MSSQL. В методе тестовой инициализации я запускаю транзакцию, затем запускаю проверенный код, а затем проверяю содержимое базы данных. После завершения теста я откатываю транзакцию, чтобы сохранить базу данных в исходном состоянии. Этот метод отлично работает для однопоточных операций. Но когда в операции используется более одного потока, мне нужно разделить транзакцию в этих потоках. Лучше всего без необходимости менять производственный код. - person Jan Novák; 16.10.2012
comment
Я до сих пор не понимаю, зачем вам нужно несколько транзакций. Одна бизнес-операция обычно должна выполняться в одном потоке, или, когда задействовано несколько потоков, они будут объединены, чтобы один поток взаимодействовал с этой базой данных. Эта бизнес-операция является единицей работы, которая должна быть транзакционной. Каждый модульный тест должен проверять одну бизнес-операцию, и поэтому этот тест будет границей транзакции. - person Steven; 17.10.2012
comment
Хорошо, может быть, это плохой дизайн теста. Я хотел протестировать всю систему вместе (более мелкие части уже протестированы). Мое приложение обрабатывает большой объем данных, поэтому я разделил обработку на несколько потоков, я не хочу объединять взаимодействие с базой данных в один поток по соображениям производительности. Вы правы в том, что я иду против принципа модульного тестирования тестирования небольших фрагментов кода. Моя цель больше интеграционный тест. Кроме того, у меня есть транзакция ТОЛЬКО для сохранения состояния базы данных нетронутым после теста. Это более технический вопрос, чем вопрос архитектуры, мне все еще интересно, почему транзакция прерывается. - person Jan Novák; 17.10.2012