Шаблон параллелизма Java для внешнего общего ресурса (смарт-карт)

У меня есть служба веб-сервера, где клиенты запрашивают вычисление смарт-карты и получают результат. Доступный номер смарт-карты может уменьшаться или увеличиваться во время работы сервера, например, я могу физически добавить или удалить смарт-карту из устройства чтения (или многие другие события ... например, исключение и т. Д.).

введите описание изображения здесь

Вычисление смарт-карты может занять некоторое время, поэтому мне нужно оптимизировать эти задания, чтобы использовать все доступные смарт-карты, если есть одновременные запросы к веб-серверу.

Думал работать с пулом смарт-карт-потоков. Необычным, по крайней мере для меня, является то, что размер пула должен меняться не в зависимости от клиентских запросов, а только от доступности смарт-карты.

введите описание изображения здесь

Я изучил множество примеров:

  • BlockingQueue: неплохо сохранить запрос и остановить поток, ожидающий какого-либо действия.
  • FutureTask: я могу использовать этот класс, чтобы позволить клиенту ждать своего ответа, но какой тип программы должен выполнять эту задачу?
  • ThreadPoolExecutor: кажется, что мне нужно, но с этим я не могу изменить размер пула, более того, каждый поток должен быть связан с одним слотом смарт-карты. Это может быть решением, если я могу изменить размер пула (добавление потока при вставке смарт-карты и удаление потока при удалении смарт-карты) и если я могу назначить конкретную смарт-карту для каждого потока.

Это элемент управления смарт-картой, у меня есть один SmartcardWrapper на каждую смарт-карту, каждая смарт-карта имеет свой номер слота.

public class SmartcardWrapper{

    private int slot;

    public SmartcardWrapper(int slot) {
        this.slot=slot;
    }   

    public byte[] compute(byte[] input) {
        byte[] out=new byte[];
        SmartcardApi.computerInput(slot,input,out); //Native method
        return out;
    }
}

Я попытался создать пул потоков с одним потоком на смарт-карту:

private class SmartcardThread extends Thread{

    protected SmartcardWrapper sw;

    public SmartcardThread(SmartcardWrapper sw){
        this.sw=sw;
    }

    @Override
    public void run() {
        while(true){
            byte[] input=queue.take();
            byte output=sw.compute(input);
            // I have to return back the output to the client
        }           
    }
}

Все ждут чего-то в одной очереди ввода:

BlockingQueue<byte[]> queue=new BlockingQueue<byte[]>();

Но как вернуть вывод смарт-карты-потока клиенту-веб-серверу? Это позволило мне думать, что BlockingQueue - не мое решение.

Как подойти к этой проблеме? Какому шаблону параллелизма мне следует следовать? правильно ли назначать один поток для каждой смарт-карты или я могу просто использовать семафоры?


person Tobia    schedule 07.12.2015    source источник
comment
Смарт-карты находятся на сервере или на клиенте? Вычисления происходят на сервере или на клиенте? Если они находятся в разных местах, не могли бы вы уточнить, как информация передается между ними?   -  person Warren Dew    schedule 08.12.2015
comment
Почему бы не поставить смарт-карты (ресурсы) в очередь?   -  person Maarten Bodewes    schedule 08.12.2015
comment
@WarrenDew - это вычисление на стороне сервера.   -  person Tobia    schedule 08.12.2015
comment
@MaartenBodewes ты имеешь ввиду очередь потоков? И когда поток завершает свою работу, он снова ставится в очередь?   -  person Tobia    schedule 08.12.2015
comment
@Tobia есть ли у вас серверный API, который вызывает какой-либо метод увеличения / уменьшения количества доступных смарт-карт?   -  person Archer    schedule 15.12.2015
comment
Нет, я буду использовать задачу таймера, чтобы проверить наличие новой карты или удалить отсутствующую смарт-карту. (api смарт-карты имеет только bool-метод isCardIn (), поэтому у меня нет реального события добавления / удаления)   -  person Tobia    schedule 15.12.2015


Ответы (5)


Ваше предположение:

ThreadPoolExecutor: Кажется, что мне нужно, но с этим я не могу изменить размер пула, более того, каждый поток должен быть связан с одним слотом смарт-карты.

это не так.

You can set thread pool size dynamically.

Взгляните на следующие API-интерфейсы ThreadPoolExecutor

public void setMaximumPoolSize(int maximumPoolSize)

Устанавливает максимально допустимое количество потоков. Это отменяет любое значение, установленное в конструкторе. Если новое значение меньше текущего значения, избыточные существующие потоки будут завершены, когда они в следующий раз станут простаивающими.

public void setCorePoolSize(int corePoolSize)

Устанавливает основное количество потоков. Это отменяет любое значение, установленное в конструкторе. Если новое значение меньше текущего значения, избыточные существующие потоки будут завершены, когда они в следующий раз станут простаивающими. Если больше, то при необходимости будут запущены новые потоки для выполнения любых задач в очереди.

Core and maximum pool sizes:

ThreadPoolExecutor автоматически настроит размер пула в соответствии с границами, установленными corePoolSize и maximumPoolSize.

Когда новая задача отправляется в методе execute(java.lang.Runnable) и выполняется менее corePoolSize потоков, создается новый поток для обработки запроса, даже если другие рабочие потоки простаивают.

Если запущено больше corePoolSize, но меньше maximumPoolSize потоков, новый поток будет создан только в том случае, если очередь заполнена.

Устанавливая для maximumPoolSize практически неограниченное значение, такое как Integer.MAX_VALUE, вы позволяете пулу размещать произвольное количество одновременных задач. Но я бы не рекомендовал иметь такое большое количество потоков. Будьте осторожны при установке этого значения.

Чаще всего размеры ядра и максимального пула устанавливаются только при создании, но они также могут быть изменены динамически с помощью setCorePoolSize(int) и setMaximumPoolSize(int).

РЕДАКТИРОВАТЬ:

Для лучшего использования пула потоков, если вы знаете, что максимальное количество карт равно 6, вы можете использовать

 ExecutorService executor = Executors.newFixedThreadPool(6);

OR

person Ravindra babu    schedule 15.12.2015
comment
У меня была такая же идея! Я расширил ThreadFactory, чтобы создать обернутый поток со ссылкой на слот смарт-карты, а затем создал новый ThreadPoolExecutor с этим настраиваемым ThreadFactory. Эта фабрика поддерживает maxpool, corepool с количеством доступных смарт-карт. Я остановил эту разработку, потому что я не мог понять, как управлять остановкой потока, если я уменьшу номер пула из-за удаления смарт-карты, как я могу быть уверен, что правильный поток будет остановлен для удаленной смарт-карты, а не другой? ThreadPoolExecutor не имеет метода removeThread (Thread t). - person Tobia; 16.12.2015
comment
Я думаю, вам не стоит беспокоиться о точном потоке, если система поддерживает размер пула. Ваше требование: новая карточка должна иметь новую ветку, и это требование выполнено. Даже если уменьшение размера пула происходит не сразу, а в определенное время позже, состояние системы будет в порядке. - person Ravindra babu; 16.12.2015
comment
Если я связываю поток с одной смарт-картой, мне нужно удалить этот поток или пометить его как недоступный, если его смарт-карта удалена. Это требование (номер смарт-карты может уменьшаться или увеличиваться) - person Tobia; 16.12.2015
comment
Лично я не думаю, что новая ветка для новой карты - хорошая идея. У вас ограниченное количество ядер ЦП на вашем сервере, например 8,16,32 и т. Д. Какую выгоду вы получаете от создания 1000 потоков для 1000 новых карт, когда у вас 32 ядра ЦП? - person Ravindra babu; 16.12.2015
comment
Я думаю, что есть какой-то API для получения номера потока выполняемой задачи. Я выясню это когда-нибудь. - person Ravindra babu; 16.12.2015
comment
Но создание и удаление потоков - дело дорогостоящее. Вы теряете преимущество основной цели ThredPoolExecutor. - person Ravindra babu; 16.12.2015
comment
В моем случае смарт-карт может быть до 6, но я согласен с вами, это может быть пустой тратой потока. Как обойтись без одного потока на смарт-карту? С семафорами для проверки наличия смарт-карты? Мне нужно оптимизировать использование смарт-карты в случае одновременного выполнения большого количества запросов к веб-серверу. - person Tobia; 16.12.2015
comment
Давайте возьмем фиксированный пул потоков размером 6 и будем рассматривать добавление карты как задачу для исполнителя пула потоков. Решает ли это вашу проблему? - person Ravindra babu; 16.12.2015
comment
Но когда у меня есть реальная задача для смарт-карты, я отправляю ее ThreadPoolExecutor ... это выбирает первый доступный, но как я могу быть уверен, что смарт-карта вставлена ​​и готова? Вот почему я решил увеличивать потоки только тогда, когда доступна смарт-карта. - person Tobia; 16.12.2015
comment
Если я правильно понимаю (поправьте меня, если я ошибаюсь), у вас может быть два пула потоков - один раз до готовности карты и другой, когда карта будет готова. - person Ravindra babu; 17.12.2015
comment
Я этого не понимаю, почему два бассейна? - person Tobia; 17.12.2015
comment
Вы сказали, что отправите реальную задачу ThreadPoolExecutor. Но вы не уверены, когда смарт-карта вставлена ​​и готова. Итак, я подумал, что карта будет готова через некоторое время, а задание будет отправлено в пул позже. - person Ravindra babu; 17.12.2015
comment
Я говорил о реальной задаче, потому что вы предложили создать задачу для вставки смарт-карты. На самом деле я могу понять, что смарт-карта готова с запланированным опросом, который проверяет каждую карту. - person Tobia; 17.12.2015
comment
Если у вас есть агент опроса, достаточно одного пула потоков. Вы можете отправить реальную задачу в ThreadPoolExecutor, когда ваша смарт-карта будет готова. - person Ravindra babu; 17.12.2015
comment
Взгляните на scheduledexecutorservice: tutorials.jenkov.com/java-util-concurrent/ - person Ravindra babu; 17.12.2015

Рассматривали ли вы вообще использование Apache Commons Pool?

Вам необходимо поддерживать пул объектов SmartcardWrapper, где каждый SmartcardWrapper будет представлять физическую смарт-карту. Всякий раз, когда вам нужно выполнить новое вычисление, вы заимствуете объект из пула, выполняете расчет и возвращаете объект в пуле, чтобы его можно было повторно использовать в следующем потоке.

Сам пул является потокобезопасным и блокируется, когда нет доступных объектов. Все, что вам нужно сделать, это реализовать api для добавления / удаления объектов SmartcardWrapper в пул.

person Vladimir G.    schedule 18.12.2015

Я мог бы найти разумное простое решение, основанное на следующих предположениях:

  • отдельный процесс управляет уведомлениями (системными событиями) для смарт-карт, которые становятся доступными или удаляются.
  • клиенту все равно, какую смарт-карту он получит, если он может использовать одну без помех.

Эти два предположения на самом деле упрощают создание решения для объединения ресурсов (общих ресурсов), поскольку обычно сам пул отвечает за создание и удаление ресурсов, когда это необходимо. Без этой функциональности решение для пула становится проще. Я действительно предполагаю, что клиент, который получает смарт-карту из пула для использования, может выполнять необходимые функции смарт-карты в своем собственном потоке выполнения (аналогично тому, как соединение с базой данных используется из пула соединений с базой данных для запроса данных из базы данных).

Я провел лишь минимальное тестирование для двух классов, показанных ниже, и, боюсь, основная часть работы заключается в написании (модульных) тестов, которые доказывают, что пул работает правильно с одновременными клиентскими запросами в сочетании с добавлением и удалением ресурсов смарт-карты. Если вы не хотите этого делать, то, вероятно, лучшим решением будет ответ user769771. Но если вы это сделаете, попробуйте, посмотрите, подходит ли он. Идея состоит в том, что только один экземпляр пула ресурсов создается и используется всеми клиентами и обновляется отдельным процессом, который управляет доступностью смарт-карт.

import java.util.*;
import java.util.concurrent.*;

/**
 * A resource pool that expects shared resources 
 * to be added and removed from the pool by an external process
 * (i.e. not done by the pool itself, see {@link #add(Object)} and {@link #remove(Object)}.
 * <br>A {@link ResourcePoolValidator} can optionally be used. 
 * @param <T> resource type handed out by the pool.
 */
public class ResourcePool<T> {

    private final Set<T> registered = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>()); 
    /* Use a linked list as FIFO queue for resources to lease. */
    private final List<T> available = Collections.synchronizedList(new LinkedList<T>()); 
    private final Semaphore availableLock = new Semaphore(0, true); 

    private final ResourcePoolValidator<T> validator;

    public ResourcePool() {
        this(null);
    }

    public ResourcePool(ResourcePoolValidator<T> validator) {
        super();
        this.validator = validator;
    }

    /**
     * Add a resource to the pool.
     * @return true if resource is not already in the pool.
     */
    public synchronized boolean add(T resource) {

        boolean added = false;
        if (!registered.contains(resource)) {
            registered.add(resource);
            available.add(resource);
            availableLock.release();
            added = true;
        }
        return added;
    }

    /**
     * Removes a resource from the pool.
     * The resource might be in use (see {@link #isLeased(Object)})
     * in which case {@link ResourcePoolValidator#abandoned(Object)} will be called 
     * when the resource is no longer used (i.e. released). 
     * @return true if resource was part of the pool and removed from the pool.
     */
    public synchronized boolean remove(T resource) {

        // method is synchronized to prevent multiple threads calling add and remove at the same time 
        // which could in turn bring the pool in an invalid state.
        return registered.remove(resource);
    }

    /**
     * If the given resource is (or was, see also {@link #remove(Object)} part of the pool,
     * a returned value true indicates the resource is in use / checked out.
     * <br>This is a relative expensive method, do not call it frequently.
     */
    public boolean isLeased(T resource) {
        return !available.contains(resource);
    }

    /**
     * Try to get a shared resource for usage. 
     * If a resource is acquired, it must be {@link #release(Object)}d in a finally-block.
     * @return A resource that can be exclusively used by the caller.
     * @throws InterruptedException When acquiring a resource is interrupted.
     * @throws TimeoutException When a resource is not available within the given timeout period.
     */
    public T tryAcquire(long timeout, TimeUnit tunit) throws InterruptedException, TimeoutException {

        T resource = null;
        long timeRemaining = tunit.toMillis(timeout);
        final long tend = System.currentTimeMillis() + timeRemaining;
        do {
            if (availableLock.tryAcquire(timeRemaining, TimeUnit.MILLISECONDS)) {
                resource = available.remove(0);
                if (registered.contains(resource)) {
                    boolean valid = false;
                    try {
                        valid = (validator == null ? true : validator.isValid(resource));
                    } catch (Exception e) {
                        // TODO: log exception
                        e.printStackTrace();
                    }
                    if (valid) {
                        break; // return the "checked out" resource
                    } else {
                        // remove invalid resource from pool
                        registered.remove(resource);
                        if (validator != null) {
                            validator.abandoned(resource);
                        }
                    }
                }
                // resource was removed from pool, try acquire again
                // note that this implicitly lowers the maximum available resources
                // (an acquired permit from availableLock goes unused).
                // TODO: retry puts us at the back of availableLock queue but should put us at the front of the queue
                resource = null;
            }
            timeRemaining = tend - System.currentTimeMillis();
        } while (timeRemaining > 0L);
        if (resource == null) {
            throw new TimeoutException("Unable to acquire a resource within " + tunit.toMillis(timeout) + " ms.");
        }
        return resource;
    }

    /**
     * This method must be called by the caller / client whenever {@link #tryAcquire(long, TimeUnit)}
     * has returned a resource. If the caller has determined the resource is no longer valid,
     * the caller should call {@link #remove(Object)} before calling this method.
     * @param resource no longer used.
     */
    public void release(T resource) {

        if (resource == null) {
            return;
        }
        if (registered.contains(resource)) {
            available.add(resource);
            availableLock.release();
        } else {
            if (validator != null) {
                validator.abandoned(resource);
            }
        }
    }

    /** An array (copy) of all resources registered in the pool. */
    @SuppressWarnings("unchecked")
    public T[] getRegisteredResources() {
        return (T[]) registered.toArray(new Object[registered.size()]);
    }

}

И отдельный класс с функциями, относящимися к отдельному процессу, который управляет доступностью смарт-карт.

import java.util.concurrent.TimeUnit;

/**
 * Used by a {@link ResourcePool} to validate a resource before handing it out for lease
 * (see {@link #isValid(Object)} and signal a resource is no longer used (see {@link #abandoned(Object)}). 
 */
public class ResourcePoolValidator<T> {

    /**
     * Overload this method (this method does nothing by default) 
     * to validate a resource before handing it out for lease.
     * If this method returns false or throws an exception (which it preferably should not do), 
     * the resource is removed from the pool.
     * @return true if the resource is valid for leasing
     */
    public boolean isValid(T resource) {
        return true;
    }

    /**
     * Called by the {@link ResourcePool#release(Object)} method when a resource is released by a caller 
     * but the resource was previously removed from the pool and in use.
     * <br>Called by {@link ResourcePool#tryAcquire(long, TimeUnit)} if a resource if not valid 
     * (see {@link #isValid(Object)}.
     * <br>Overload this method (this method does nothing by default) to create a notification of an unused resource,
     * do NOT do any long period of processing as this method is called from a caller (client) thread.
     */
    public void abandoned(T resource) {
        // NO-OP
    }

}
person vanOekel    schedule 20.12.2015

Если посмотреть на требования, то лучшей архитектурой будет отделение вычислений смарт-карты от ваших веб-сервисов.

Если полагаться на то, что веб-службы будут ожидать выполнения задач, интенсивно использующих процессор, это приведет к тайм-аутам.

Лучшее решение - это предварительное вычисление смарт-карты с использованием периодического задания и сохранение этих пар слотов и вычислений на кэш-сервере, таком как Redis.

введите описание изображения здесь

Задание синхронизатора смарт-карт - это отдельное автономное приложение J2SE, которое периодически проверяет, какая смарт-карта доступна и активна (без ошибок), и обновляет кэш Redis с слотом и вычислениями в виде пары ключ / значение. Если смарт-карта недоступна, она будет удалена из кеша.

Веб-служба просто проверит кеш Redis для определенного ключа слота и, если найдет значение, вернет его или вернет не найденное для этого слота (недоступно или ошибка)

Этот дизайн масштабируется как со стороны смарт-карты, так и со стороны клиентских запросов.

person shazin    schedule 15.12.2015
comment
Я не могу кэшировать задания смарт-карты, потому что это зависит от ввода клиентов. Мой вопрос заключается в том, как управлять множеством SmartcardWrapper (каждая смарт-карта имеет свою собственную оболочку, связанную с ее слотом) для обслуживания одновременных запросов от клиентов. - person Tobia; 15.12.2015

В ответ на ваш вопрос о том, как вернуть результат вызывающему абоненту:

Все ждут чего-то в одной очереди ввода:

BlockingQueue queue = новая BlockingQueue ();

Но как вернуть вывод смарт-карты-потока клиенту-веб-серверу? Это позволило мне думать, что BlockingQueue - не мое решение.

Ваша идея очереди отправки в основном хороша, но вам также нужна очередь для каждого потока, чтобы вернуть результат отправителю задания ...

Измените свою очередь отправки на:

BlockingQueue<JobSubmitRec> queue=new BlockingQueue<JobSubmitRec>();

и JobSubmitRec будет иметь byte [] и одноразовую очередь для возврата результата:

class JobSubmitRec
{
  byte[] data;
  BlockingQueue<JobSubmitResult> result=new LinkedBlockingQueue<JobSubmitResult>();
}

и ваш рабочий поток будет выглядеть примерно так:

public void run() {
 while(true){
  JobSubmitRec submitrec = queue.take();
  byte[] input = submitrec.data;
  byte output = sw.compute(input);
  submitrec.result.put( new JobSubmitResult(output) );
 }           
}

и клиент, отправивший задание, будет выглядеть так:

JobSubmitRec jsr = new JobSubmitRec( data );
queue.put( jsr );
JobSubmitResult result = jsr.result.take();
// use result here
person Trevor Harrison    schedule 18.12.2015