Лучший способ управлять несколькими очередями в порядке FIFO

У меня есть пользовательский интерфейс, который может ставить в очередь задания совершенно разных типов. В настоящее время он делает это, сохраняя задание в соответствующей таблице в базе данных (Table-per-JobType).

Затем появляется мой внутренний процесс, берет задания и, при условии, что есть доступный рабочий процесс (поток), использует его для выполнения соответствующего метода для задания.

в псевдокоде:

While(Runnning) {
    While(Queue1.HasJobs && Workers.IdleCount > 0) {
        FirstIdleWorker.Execute(Queue1Method(Type1Job));
    }

    ...

    While(QueueN.HasJobs && Workers.IdleCount > 0) {
        FirstIdleWorker.Execute(QueueNMethod(TypeNJob));
    }
    //Wait for a job to complete or a polling timeout if queues are empty
}

(На самом деле это не так уж и наивно, но это иллюстрирует порядок, в котором обрабатывается работа)

Как видите, это работает, но не учитывает, в каком порядке были добавлены задания. Это не является нарушением условий сделки, поскольку задания являются атомарными, но это раздражает с точки зрения пользовательского интерфейса — например, Пользователь 1 ставит в очередь 20 заданий из Тип 2, затем пользователь 2 Помещает в очередь 1000 заданий типа 1. Пользователь 1 теперь должен дождаться завершения всех заданий пользователя 2, прежде чем их (относительно) быстрые задания будут обработаны.

У заданий есть свойство CreatedOn, поэтому определить порядок несложно, но как лучше всего реализовать комбинированную очередь строго типизированным способом, который не является спагетти-кодом?

Я пытаюсь избежать объекта "GenericJob" с .CreatedOn, .Queue1Id, .Queue2Id, .Queue3Id, так как это кажется неаккуратным.

Хотя в целом FIFO — это то, что мне нужно, это не строгое требование — я просто не хочу, чтобы элементы постоянно менялись.

Есть ли шаблон для такого рода вещей? Если нет, может ли кто-нибудь указать мне хороший учебник, пожалуйста?

(Кстати, это потенциально длительные задания. На самом деле я использую TPL за кулисами для управления рабочими после того, как задания взяты из очереди, но мне все еще нужно самому управлять очередью, так как заданий гораздо больше, чем я может загрузиться в память за один раз)


person Basic    schedule 26.06.2012    source источник
comment
В чем причина серверных очередей? Кажется, вы все равно хотите объединить их в одну очередь.   -  person Oliver Hanappi    schedule 26.06.2012
comment
Это приложение для внутреннего использования, которое выполняет ряд совершенно разных сканирований (разные входные и выходные данные. Одни берут доменные имена, другие — хосты, а третьи — нечеткие условия поиска). Работа выполняется совершенно разными библиотеками, но все они работают на одном сервере, и вместо того, чтобы иметь N сервисов, обрабатывающих задания, я решил объединить их в один, тем более что это позволяет мне лучше контролировать использование объединенных ресурсов без IPC.   -  person Basic    schedule 26.06.2012
comment
Mark 1 Mod 0 на самом деле работал, имея задание с входом List<String> и JobType, которое работало, но приводило к путанице, если оно ожидало (скажем) доменные имена и получало имена хостов/IP-адреса. Он также выводит только в CSV с некоторыми метаданными и оставляет на усмотрение пользователя сопоставлять/организовывать результаты. На этот раз я хочу, чтобы выходные данные сохранялись структурированным образом в базе данных, чтобы я мог создавать отчеты, повторно использовать старые результаты и т. д.   -  person Basic    schedule 26.06.2012
comment
Общий элемент задания не должен быть таким небрежным, возможно, с использованием абстракции для определения структуры общих атрибутов задания в целях управления заданием. Обычная функция Process() или аналогичная может взять на себя конкретные требования к обработке каждого задания.   -  person invert    schedule 26.06.2012
comment
... в качестве альтернативы, использование нескольких рабочих с умом, чтобы начать обработку типов заданий, для которых еще нет рабочих, назначенных этим типам ... но мне это кажется ненадежным.   -  person invert    schedule 26.06.2012
comment
@wez Как упоминалось в вопросе, у заданий совершенно разные входы / выходы. Выходы не должны быть такой проблемой, но входы (нет очевидного способа абстрагироваться). Соглашусь насчет хлипкости варианта 2 (думал о чем-то подобном). Возможно, у каждого работника мог бы быть предпочтительный тип работы, который учитывался бы при назначении работы?   -  person Basic    schedule 26.06.2012
comment
@wex на самом деле, я беру свои слова назад, возможно, вы что-то поняли - может быть достаточно объекта с Enum для указания типа задания и полей Date и Id ...   -  person Basic    schedule 26.06.2012


Ответы (2)


Ткните меня, если я ошибаюсь, я надеюсь, что этот псевдокод хорошо объясняет абстракцию интерфейса.

Интерфейс может выглядеть так:

enum JobTypes
{
    JobType1 = 0x01,
    JobType2 = 0x02,
    JobType3 = 0x03
}
interface IJob
{
    int ID { get; set; }
    JobTypes JobType { get; set; }
    DateTime Date { get; set; }
    bool Complete { get; set; }
    void Process(List<object> parameters);
}

Каждый процессор типа задания реализует этот интерфейс, изменяйте и добавляйте свойства в соответствии с вашими потребностями:

class JobType1 : IJob
{
    public int ID { get; set; }
    public JobTypes JobType { get; set; }
    public DateTime Date { get; set; }
    public bool Complete { get; set; }
    public void Process(List<object> parameters)
    {
        throw new NotImplementedException();
    }
}

Затем вы можете смешать типы заданий в один список:

List<IJob> joblist = new List<IJob>();

и, возможно, отсортировать их по дате с помощью лямбда:

joblist.Sort((a, b) => DateTime.Compare(a.Date, b.Date));

получить список необработанных заданий (не проверяет те, которые заняты обработкой*)

List<IJob> undone = joblist.Where(job => job.Complete == false) as List<IJob>;
person invert    schedule 27.06.2012
comment
Привет, Вез, рад, что ты написал в качестве ответа, я как раз собирался тебя попросить. Это близко к тому, что я сделал, за исключением того, что я решил использовать класс JobQueue, который является не чем иным, как заполнителем в очереди, и сохранил CreatedOn, JobType (перечисление) и Id (идентификатор, специфичный для этого типа задания). Затем у меня есть фабрика, которая берет этот заполнитель и создает правильный класс для работы с ним. Я сделал это таким образом, поскольку это позволяет мне повторно запрашивать очередь без особых накладных расходов (очередь может быть изменена из другого места, поэтому мне нужно продолжать проверять БД). Спасибо - person Basic; 27.06.2012
comment
Отличное мышление с использованием фабрики. - person invert; 27.06.2012

Полный ответ, вероятно, будет чем-то вроде планирования рабочих мест, поскольку у вас есть задания, требующие разных ресурсов или уровней ресурсов, и вы хотите избежать голодания коротких заданий. Это довольно сложно и требует некоторого чтения, но в конечном итоге это меньше работы, чем попытка выполнить сложное планирование ad hoc из одной очереди.

Не похоже, что у вас есть зависимости между заданиями, поэтому менее затратным способом было бы создание очередей для каждого пользователя для каждого типа задания, а затем выполнение взвешенного циклического перебора очередей. (Т.е. взять 2 задания из Очереди X на каждое задание из Очереди Y.)

person Chris D    schedule 18.07.2012