Как разделить ввод сценария оболочки по времени, а не по размеру?

В сценарии bash я использую шаблон «много производителей — один потребитель». Производители — это фоновые процессы, записывающие строки в fifo (через GNU Parallel). Потребитель читает все строки из fifo, затем сортирует, фильтрует и выводит отформатированный результат на стандартный вывод.

Однако до получения полного результата может пройти много времени. Производители обычно быстры на первых нескольких результатах, но затем замедляются. Здесь мне больше интересно видеть куски данных каждые несколько секунд, каждый из которых отсортирован и отфильтрован по отдельности.

mkfifo fifo
parallel ... >"$fifo" &
while chunk=$(read with timeout 5s and at most 10s <"$fifo"); do
  process "$chunk"
done

Цикл будет выполняться до тех пор, пока не будут выполнены все производители и не будет прочитан весь ввод. Каждый фрагмент считывается до тех пор, пока не будет новых данных в течение 5 с или пока не пройдет 10 с с момента запуска фрагмента. Чанк также может быть пустым, если в течение 10 секунд не было новых данных.

Я пытался заставить его работать так:

output=$(mktemp)
while true; do
  wasTimeout=0 interruptAt=$(( $(date '+%s') + 10 ))
  while true; do
    IFS= read -r -t5 <>"${fifo}"
    rc="$?"
    if [[ "${rc}" -gt 0 ]]; then
      [[ "${rc}" -gt 128 ]] && wasTimeout=1
      break
    fi
    echo "$REPLY" >>"${output}"
    if [[ $(date '+%s') -ge "${interruptAt}" ]]; then
      wasTimeout=1
      break
    fi
  done
  echo '---' >>"${output}"
  [[ "${wasTimeout}" -eq 0 ]] && break
done

Пробовал некоторые варианты этого. В приведенной выше форме он читает первый фрагмент, но затем зацикливается навсегда. Если я использую <"${fifo}" (без чтения/записи, как указано выше), он блокируется после первого фрагмента. Может быть, все это можно упростить с помощью buffer и/или stdbuf? Но оба они определяют блоки по размеру, а не по времени.


person Werner Lehmann    schedule 26.05.2019    source источник
comment
Наступает момент, когда легче писать код на C, чем в оболочке. Моя интуиция подсказывает, что эта функция фрагментации достигла этой точки. Есть одна проблема, с которой вам придется иметь дело — вам нужен процесс, который удерживает FIFO открытым для чтения (без фактического чтения каких-либо данных), чтобы данные не терялись, когда программа C открывает FIFO, читает фрагмент, закрывает FIFO (возможно, неявно, просто выходя) и возвращается. Другие могут просить отличаться.   -  person Jonathan Leffler    schedule 26.05.2019
comment
@JonathanLeffler: до сих пор Bash мог справиться со всем, что я ему бросил, и эта проблема выглядела безобидной, пока я не попытался ее решить. Интересно, будет ли работать подоболочка только с read -t3, чтение из одного fifo, буферизация чтения, а затем запись в другой. Как это отличить: а) в fifo больше нет писателей, больше нет данных для поступления б) fifo пуст, но позже могут поступить дополнительные данные... Я не хочу запускать еще один read, если для fifo больше нет писателей .   -  person Werner Lehmann    schedule 02.06.2019
comment
Да, это каверзная задача, и ее не легко запрограммировать в сценарии оболочки. Я использую C автоматически; эквивалентные программы наверняка могут быть написаны с использованием C++ и Perl, возможно, также Python и Java (и, вполне вероятно, других языков).   -  person Jonathan Leffler    schedule 04.06.2019


Ответы (4)


Это не тривиальная проблема для решения. Как я уже говорил, программа на C (или программа на каком-либо языке программирования, отличном от оболочки), вероятно, является лучшим решением. Некоторые осложняющие факторы:

  • Чтение с таймаутами.
  • Если данные поступают достаточно быстро, время ожидания изменяется.
  • Different systems have different sets of interval timing functions:
    • alarm() is likely available everywhere, but has only 1-second resolution which is liable to accumulated rounding errors. (Compile this version with make UFLAGS=-DUSE_ALARM; on macOS, use make UFLAGS=-DUSE_ALARM LDLIB2=.)
    • setitimer() использует время в микросекундах и тип struct timeval. (Скомпилируйте эту версию с make UFLAGS=-DUSE_SETITIMER; в macOS скомпилируйте с make UFLAGS=-DUSE_SETITIMER LDLIB2=.)
    • timer_create() и timer_settime() и т. д. используют современный тип наносекунд struct timespec. Это доступно в Linux; он недоступен в macOS 10.14.5 Mojave или более ранних версиях. (Скомпилируйте эту версию с make; она не будет работать в macOS.)

Сообщение об использовании программы:

$ chunker79 -h
Usage: chunker79 [-hvV][-c chunk][-d delay][-f file]
  -c chunk  Maximum time to wait for data in a chunk (default 10)
  -d delay  Maximum delay after line read (default: 5)
  -f file   Read from file instead of standard input
  -h        Print this help message and exit
  -v        Verbose mode: print timing information to stderr
  -V        Print version information and exit

$

Этот код доступен в моем репозитории SOQ (вопросы о переполнении стека) на GitHub в виде файла chunker79.c в папке src/so-5631-4784. Вам также понадобится код поддержки из каталога src/libsoq.

/*
@(#)File:           chunker79.c
@(#)Purpose:        Chunk Reader for SO 5631-4784
@(#)Author:         J Leffler
@(#)Copyright:      (C) JLSS 2019
*/

/*TABSTOP=4*/

/*
** Problem specification from the Stack Overflow question
**
** In a bash script I am using a many-producer single-consumer pattern.
** Producers are background processes writing lines into a fifo (via GNU
** Parallel).  The consumer reads all lines from the fifo, then sorts,
** filters, and prints the formatted result to stdout.
**
** However, it could take a long time until the full result is
** available.  Producers are usually fast on the first few results but
** then would slow down.  Here I am more interested to see chunks of
** data every few seconds, each sorted and filtered individually.
**
**    mkfifo fifo
**    parallel ... >"$fifo" &
**    while chunk=$(read with timeout 5s and at most 10s <"$fifo"); do
**      process "$chunk"
**    done
**
** The loop would run until all producers are done and all input is
** read.  Each chunk is read until there has been no new data for 5s, or
** until 10s have passed since the chunk was started.  A chunk may also
** be empty if there was no new data for 10s.
*/

/*
** Analysis
**
** 1.  If no data arrives at all for 10 seconds, then the program should
**     terminate producing no output.  This timeout is controlled by the
**     value of time_chunk in the code.
** 2.  If data arrives more or less consistently, then the collection
**     should continue for 10s and then finish.  This timeout is also
**     controlled by the value of time_chunk in the code.
** 3.  If a line of data arrives before 5 seconds have elapsed, and no
**     more arrives for 5 seconds, then the collection should finish.
**     (If the first line arrives after 5 seconds and no more arrives
**     for more than 5 seconds, then the 10 second timeout cuts in.)
**     This timeout is controlled by the value of time_delay in the code.
** 4.  This means that we want two separate timers at work:
**     - Chunk timer (started when the program starts).
**     - Delay timer (started each time a line is read).
**
** It doesn't matter which timer goes off, but further timer signals
** should be ignored.  External signals will confuse things; tough!
**
** -- Using alarm(2) is tricky because it provides only one time, not two.
** -- Using getitimer(2), setitimer(2) uses obsolescent POSIX functions,
**    but these are available on macOS.
** -- Using timer_create(2), timer_destroy(2), timer_settime(2),
**    timer_gettime(2) uses current POSIX function but is not available
**    on macOS.
*/

#include "posixver.h"

#include "stderr.h"
#include "timespec_io.h"
#include <assert.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>
#include <time.h>
#include <unistd.h>

#ifdef USE_SETITIMER
#include "timeval_math.h"
#include "timeval_io.h"
#include <sys/time.h>
#endif /* USE_SETITIMER */

static const char optstr[] = "hvVc:d:f:";
static const char usestr[] = "[-hvV][-c chunk][-d delay][-f file]";
static const char hlpstr[] =
    "  -c chunk  Maximum time to wait for data in a chunk (default 10)\n"
    "  -d delay  Maximum delay after line read (default: 5)\n"
    "  -f file   Read from file instead of standard input\n"
    "  -h        Print this help message and exit\n"
    "  -v        Verbose mode: print timing information to stderr\n"
    "  -V        Print version information and exit\n"
    ;

static struct timespec time_delay = { .tv_sec =  5, .tv_nsec = 0 };
static struct timespec time_chunk = { .tv_sec = 10, .tv_nsec = 0 };
static struct timespec time_start;

static bool verbose = false;

static void set_chunk_timeout(void);
static void set_delay_timeout(void);
static void cancel_timeout(void);
static void alarm_handler(int signum);

// Using signal() manages to set SA_RESTART on a Mac.
// This is allowed by standard C and POSIX, sadly.
// signal(SIGALRM, alarm_handler);

#if defined(USE_ALARM)

static void set_chunk_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    alarm(time_chunk.tv_sec);
    struct sigaction sa;
    sa.sa_handler = alarm_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGALRM, &sa, NULL);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void set_delay_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    unsigned time_left = alarm(0);
    if (time_left > time_delay.tv_sec)
        alarm(time_delay.tv_sec);
    else
        alarm(time_left);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void cancel_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    alarm(0);
    signal(SIGALRM, SIG_IGN);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

#elif defined(USE_SETITIMER)

static inline struct timeval cvt_timespec_to_timeval(struct timespec ts)
{
    return (struct timeval){ .tv_sec = ts.tv_sec, .tv_usec = ts.tv_nsec / 1000 };
}

static void set_chunk_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerval tv_new = { { 0, 0 }, { 0, 0 } };
    tv_new.it_value = cvt_timespec_to_timeval(time_chunk);
    struct itimerval tv_old;
    if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
        err_syserr("failed to set interval timer: ");
    struct sigaction sa;
    sa.sa_handler = alarm_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGALRM, &sa, NULL);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void set_delay_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerval tv_until;
    if (getitimer(ITIMER_REAL, &tv_until) != 0)
        err_syserr("failed to set interval timer: ");
    struct timeval tv_delay = cvt_timespec_to_timeval(time_delay);

    if (verbose)
    {
        char buff1[32];
        fmt_timeval(&tv_delay, 6, buff1, sizeof(buff1));
        char buff2[32];
        fmt_timeval(&tv_until.it_value, 6, buff2, sizeof(buff2));
        err_remark("---- %s(): delay %s, left %s\n", __func__, buff1, buff2);
    }

    if (cmp_timeval(tv_until.it_value, tv_delay) <= 0)
    {
        if (verbose)
            err_remark("---- %s(): no need for delay timer\n", __func__);
    }
    else
    {
        struct itimerval tv_new = { { 0, 0 }, { 0, 0 } };
        tv_new.it_value = cvt_timespec_to_timeval(time_delay);
        struct itimerval tv_old;
        if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
            err_syserr("failed to set interval timer: ");
        if (verbose)
            err_remark("---- %s(): set delay timer\n", __func__);
    }
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void cancel_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerval tv_new =
    {
        .it_value    = { .tv_sec = 0, .tv_usec = 0 },
        .it_interval = { .tv_sec = 0, .tv_usec = 0 },
    };
    struct itimerval tv_old;
    if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
        err_syserr("failed to set interval timer: ");
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

#else /* USE_TIMER_GETTIME */

#include "timespec_math.h"

static timer_t t0 = { 0 };

static void set_chunk_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);

    struct sigevent ev =
    {
        .sigev_notify = SIGEV_SIGNAL,
        .sigev_signo = SIGALRM,
        .sigev_value.sival_int = 0,
        .sigev_notify_function = 0,
        .sigev_notify_attributes = 0,
    };
    if (timer_create(CLOCK_REALTIME, &ev, &t0) < 0)
        err_syserr("failed to create a timer: ");

    struct itimerspec it =
    {
        .it_interval = { .tv_sec = 0, .tv_nsec = 0 },
        .it_value = time_chunk,
    };
    struct itimerspec ot;
    if (timer_settime(t0, 0, &it, &ot) != 0)
        err_syserr("failed to activate timer: ");

    struct sigaction sa;
    sa.sa_handler = alarm_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGALRM, &sa, NULL);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void set_delay_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerspec time_until;
    if (timer_gettime(t0, &time_until) != 0)
        err_syserr("failed to set per-process timer: ");

    char buff1[32];
    fmt_timespec(&time_delay, 6, buff1, sizeof(buff1));
    char buff2[32];
    fmt_timespec(&time_until.it_value, 6, buff2, sizeof(buff2));
    err_remark("---- %s(): delay %s, left %s\n", __func__, buff1, buff2);

    if (cmp_timespec(time_until.it_value, time_delay) <= 0)
    {
        if (verbose)
            err_remark("---- %s(): no need for delay timer\n", __func__);
    }
    else
    {
        struct itimerspec time_new =
        {
            .it_interval = { .tv_sec = 0, .tv_nsec = 0 },
            .it_value = time_delay,
        };
        struct itimerspec time_old;
        if (timer_settime(t0, 0, &time_new, &time_old) != 0)
            err_syserr("failed to set per-process timer: ");
        if (verbose)
            err_remark("---- %s(): set delay timer\n", __func__);
    }
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void cancel_timeout(void)
{
    if (timer_delete(t0) != 0)
        err_syserr("failed to delete timer: ");
}

#endif /* Timing mode */

/* Writing to stderr via err_remark() is not officially supported */
static void alarm_handler(int signum)
{
    assert(signum == SIGALRM);
    if (verbose)
        err_remark("---- %s(): signal %d\n", __func__, signum);
}

static void read_chunks(FILE *fp)
{
    size_t num_data = 0;
    size_t max_data = 0;
    struct iovec *data = 0;
    size_t buflen = 0;
    char *buffer = 0;
    ssize_t length;
    size_t chunk_len = 0;

    clock_gettime(CLOCK_REALTIME, &time_start);

    set_chunk_timeout();
    while ((length = getline(&buffer, &buflen, fp)) != -1)
    {
        if (num_data >= max_data)
        {
            size_t new_size = (num_data * 2) + 2;
            void *newspace = realloc(data, new_size * sizeof(data[0]));
            if (newspace == 0)
                err_syserr("failed to allocate %zu bytes data: ", new_size * sizeof(data[0]));
            data = newspace;
            max_data = new_size;
        }
        data[num_data].iov_base = buffer;
        data[num_data].iov_len = length;
        num_data++;
        if (verbose)
            err_remark("Received line %zu\n", num_data);
        chunk_len += length;
        buffer = 0;
        buflen = 0;
        set_delay_timeout();
    }
    cancel_timeout();

    if (chunk_len > 0)
    {
        if ((length = writev(STDOUT_FILENO, data, num_data)) < 0)
            err_syserr("failed to write %zu bytes to standard output: ", chunk_len);
        else if ((size_t)length != chunk_len)
            err_error("failed to write %zu bytes to standard output "
                      "(short write of %zu bytes)\n", chunk_len, (size_t)length);
    }

    if (verbose)
        err_remark("---- %s(): data written (%zu bytes)\n", __func__, length);

    for (size_t i = 0; i < num_data; i++)
        free(data[i].iov_base);
    free(data);
    free(buffer);
}

int main(int argc, char **argv)
{
    const char *name = "(standard input)";
    FILE *fp = stdin;
    err_setarg0(argv[0]);
    err_setlogopts(ERR_MICRO);

    int opt;
    while ((opt = getopt(argc, argv, optstr)) != -1)
    {
        switch (opt)
        {
        case 'c':
            if (scn_timespec(optarg, &time_chunk) != 0)
                err_error("Failed to convert '%s' into a time value\n", optarg);
            break;
        case 'd':
            if (scn_timespec(optarg, &time_delay) != 0)
                err_error("Failed to convert '%s' into a time value\n", optarg);
            break;
        case 'f':
            if ((fp = fopen(optarg, "r")) == 0)
                err_syserr("Failed to open file '%s' for reading: ", optarg);
            name = optarg;
            break;
        case 'h':
            err_help(usestr, hlpstr);
            /*NOTREACHED*/
        case 'v':
            verbose = true;
            break;
        case 'V':
            err_version("CHUNKER79", &"@(#)$Revision$ ($Date$)"[4]);
            /*NOTREACHED*/
        default:
            err_usage(usestr);
            /*NOTREACHED*/
        }
    }

    if (optind != argc)
        err_usage(usestr);

    if (verbose)
    {
        err_remark("chunk: %3lld.%09ld\n", (long long)time_chunk.tv_sec, time_chunk.tv_nsec);
        err_remark("delay: %3lld.%09ld\n", (long long)time_delay.tv_sec, time_delay.tv_nsec);
        err_remark("file:  %s\n", name);
    }

    read_chunks(fp);

    return 0;
}

В моем репозитории SOQ также есть скрипт gen-data.sh, который использует некоторые пользовательские программы для создания потока данных, такого как этот (начальное значение записывается в стандартную ошибку, а не в стандартный вывод):

$ gen-data.sh
# Seed: 1313715286
2019-06-03 23:04:16.653: Zunmieoprri Rdviqymcho 5878 2017-03-29 03:59:15 Udransnadioiaeamprirteo
2019-06-03 23:04:18.525: Rndflseoevhgs Etlaevieripeoetrnwkn 9500 2015-12-18 10:49:15 Ebyrcoebeezatiagpleieoefyc
2019-06-03 23:04:20.526: Nrzsuiakrooab Nbvliinfqidbujoops 1974 2020-05-13 08:05:14 Lgithearril
2019-06-03 23:04:21.777: Eeagop Aieneose 6533 2016-11-06 22:51:58 Aoejlwebbssroncmeovtuuueigraa
2019-06-03 23:04:23.876: Izirdoeektau Atesltiybysaclee 4557 2020-09-13 02:24:46 Igrooiaauiwtna
2019-06-03 23:04:26.145: Yhioit Eamrexuabagsaraiw 9703 2014-09-13 07:44:12 Dyiiienglolqopnrbneerltnmsdn
^C
$

При вводе в chunker79 с параметрами по умолчанию я получаю вывод, например:

$ gen-data.sh | chunker79
# Seed: 722907235
2019-06-03 23:06:20.570: Aluaezkgiebeewal Oyvahee 1022 2015-08-12 07:45:54 Weuababeeduklleym
2019-06-03 23:06:24.100: Gmujvoyevihvoilc Negeiiuvleem 8196 2015-08-29 21:15:15 Nztkrvsadeoeagjgoyotvertavedi
$

Если вы проанализируете временные интервалы (посмотрите на первые два поля в строках вывода), этот вывод соответствует спецификации. Еще более подробный анализ показывает:

$ timecmd -mr -- gen-data.sh | timecmd -mr -- chunker79
2019-06-03 23:09:14.246 [PID 57159] gen-data.sh
2019-06-03 23:09:14.246 [PID 57160] chunker79
# Seed: -1077610201
2019-06-03 23:09:14.269: Woreio Rdtpimvoscttbyhxim 7893 2017-03-12 12:46:57 Uywaietirkekes
2019-06-03 23:09:16.939: Uigaba Nzoxdeuisofai 3630 2017-11-16 09:28:59 Jnsncgoesycsevdscugoathusaoq
2019-06-03 23:09:17.845: Sscreua Aloaoonnsuur 5163 2016-08-13 19:47:15 Injhsiifqovbnyeooiimitaaoir
2019-06-03 23:09:19.272 [PID 57160; status 0x0000]  -  5.026s  -  chunker79
2019-06-03 23:09:22.084 [PID 57159; status 0x8D00]  -  7.838s  -  gen-data.sh
$

В этой настройке есть заметная пауза между появлением вывода из chunker79 и завершением gen-data.sh. Это связано с тем, что Bash ожидает завершения всех процессов в конвейере, а gen-data.sh не завершается до тех пор, пока в следующий раз он не запишет в конвейер сообщение, заканчивающееся chunker79. Это артефакт этой тестовой установки; это не будет фактором в сценарии оболочки, изложенном в вопросе.

person Jonathan Leffler    schedule 04.06.2019
comment
Ух ты. Это намного больше, чем я мог даже надеяться. Я проверю это сегодня вечером. - person Werner Lehmann; 06.06.2019
comment
После того, как я обманул себя несколько раз, я смог подтвердить, что это работает так, как рекламируется, так что еще раз спасибо. Я не мог использовать gendata.sh (дрибблер?), но для этого хватило чего-то простого: while sleep 1s; do echo $(( c++ )); done | while chunk=$(./chunker79); do echo -e "${chunk}\n---"; done (и его вариации). Все еще нужно решить, вызывать ли его снова после отсутствия данных (есть ли еще процесс записи для fifo, если последний чанк был пуст?). Но я просто добавлю маркер EOF и проверю это. - person Werner Lehmann; 07.06.2019
comment
Все три программы, используемые gen-data.sh (random, dribbler, tstamp), написаны мной на C (хотя я не удивлюсь, если найду других людей с другими программами под названием random). Код доступен по запросу — см. мой профиль. Программа random имеет довольно хорошее ядро, но еще не имеет хорошего интерфейса. - person Jonathan Leffler; 07.06.2019
comment
Вы можете создать простую программу (конечно, на C, возможно, даже в оболочке — GNU timeout может помочь), которая пытается открыть FIFO, но терпит неудачу, если для успеха требуется более N секунд (где N может быть 1 или, возможно, 5). . Это означает, что нет процесса с открытым для записи FIFO. Вам, вероятно, нужен процесс, скрывающийся в фоновом режиме с открытым для чтения FIFO — вы убьете этого скрытня, когда не обнаружите писателей. И, может быть, вы удалите FIFO. - person Jonathan Leffler; 07.06.2019

Я бы подумал о написании безопасной многопоточной программы с очередями.

Я лучше знаю Java, но могут быть более современные подходящие языки, такие как Go и Kotlin.

person Dudi Boy    schedule 26.05.2019
comment
На самом деле я сначала рассмотрел Java. Здесь я взаимодействую с библиотекой C, и мне нужно было бы перенести ее на Java или начать работу с JNI. Оба варианта показались не слишком приятными. Может быть, когда-нибудь попробую Rust. - person Werner Lehmann; 02.06.2019

Что-то вроде этого:

#!/usr/bin/perl

$timeout = 3;
while(<STDIN>) {
    # Make sure there is some input                                                      
    push @out,$_;
    eval {
        local $SIG{ALRM} = sub { die };
        alarm $timeout;
        while(<STDIN>) {
            alarm $timeout;
            push @out,$_;
        }
        alarm 0;
    };
    system "echo","process",@out;
}
person Ole Tange    schedule 27.05.2019
comment
спасибо за предложение (и за проведение параллелей :). Я не возражаю против использования Perl, пока мне не нужно понимать все это... Я попробовал вышеизложенное, и у него есть некоторые проблемы. Рассмотрим это для теста: c=0; while sleep 1s; do echo $(( c++ )); done | ./x.pl. Он должен печатать все, что он получил, максимум через 3 с (или 10 с, если ввод не получен), но ничего не выводит. Измените сон на 4 с, и теперь тайм-аут работает, но каждый вывод повторяет предыдущий вывод плюс новый ввод из стандартного ввода. - person Werner Lehmann; 02.06.2019

GNU Parallel 20200122 представил --blocktimeout (--bt):

find ~ | parallel -j3 --bt 2s --pipe wc

Это работает как обычный GNU Parallel, за исключением случаев, когда заполнение блока занимает> 2 секунд. В этом случае прочитанный блок просто передается в wc (если он не пустой).

У него немного странное поведение при запуске: вам нужно подождать 3 * 2 секунды (рабочие слоты * тайм-аут), прежде чем вывод стабилизируется, и вы получаете вывод как минимум каждые 2 секунды.

person Ole Tange    schedule 26.05.2020