Первоначально опубликовано на codewithhugo.com 21 июля 2018 г.

Добавьте Redux в свою логику очереди: экспресс-установка с ES6 и бычьей очередью

В жизни веб-приложения всегда наступает момент, когда операция лучше всего обслуживается в фоновом режиме, именно здесь возникают очереди.

В Node есть несколько решений для организации очередей. Ни один из них не является до смешного доминирующим, например. Kue, RSMQ, Bee Queue, bull.
Проблема с Kue, RSMQ и Bee Queue заключалась в использовании обратного вызова done в качестве рекомендуемого API.

Bull https://github.com/OptimalBits/bull - это пакет очереди премиум-класса для обработки заданий и сообщений в NodeJS. Он поддерживается Redis и довольно многофункциональный. Прежде всего, он использует API обработки на основе Promise, что означает async/await.

Мы рассмотрим приложение, которое отправляет веб-перехватчики с заданной полезной нагрузкой по набору URL-адресов.

Вы можете найти полное содержание кода на https://github.com/HugoDF/express-bull-es6.

Приложение Express с Redis и воркером 🏃‍♀️

Мы начнем с настройки Node / Redis / Express с использованием docker-compose (полное пошаговое руководство можно найти по адресу
https://codewithhugo.com/setting-up-express-and-redis-with-docker -compose / ),
приложение будет написано с использованием модулей ES (с использованием пакета esm ).

Для начала воспользуемся следующим docker-compose.yml:

version: '2'
services:
    app:
        build: .
        container_name: my-app
        environment:
            - NODE_ENV=development
            - PORT=3000
            - REDIS_URL=redis://my-cache
        command: "sh -c 'npm i && npm run dev'"
        volumes:
            - .:/var/www/app
        links:
            - redis
        ports:
            - "3000:3000"
    worker:
        build: .
        container_name: my-worker
        environment:
            - NODE_ENV=development
            - PORT=3000
            - REDIS_URL=redis://my-cache
        command: "sh -c 'npm i && npm run worker:dev'"
        volumes:
            - .:/var/www/app
        links:
            - redis
    redis:
        image: redis
        container_name: my-cache
        expose:
            - "6379"

Нам также понадобится package.json, как показано ниже:

{
  "name": "express-bull-es6",
  "version": "1.0.0",
  "description": "An Express setup with Redis, bull and ES6",
  "main": "server.js",
  "scripts": {
    "start": "node -r esm server.js",
    "dev": "nodemon -r esm server.js",
    "worker": "node -r esm worker.js",
    "worker:dev": "nodemon -r esm worker.js"
  },
  "author": "Hugo Di Francesco",
  "license": "MIT",
  "dependencies": {
    "esm": "^3.0.67",
    "express": "^4.16.3",
    "nodemon": "^1.18.1"
  }
}

A server.js:

import express from 'express';
const app = express();
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server listening on port ${PORT}`)
});

И worker.js:

console.log('Worker doing nothing');

Выполнение следующего в командной строке должно дать нам некоторый вывод (через некоторое время, если зависимости необходимо установить):

$ docker-compose up

В итоге:

my-worker | [nodemon] 1.18.1
my-worker | [nodemon] to restart at any time, enter `rs`
my-worker | [nodemon] watching: *.*
my-worker | [nodemon] starting `node -r esm worker.js`
my-app    | [nodemon] 1.18.1
my-app    | [nodemon] to restart at any time, enter `rs`
my-app    | [nodemon] watching: *.*
my-app    | [nodemon] starting `node -r esm server.js`
my-worker | Worker doing nothing
my-app    | Server listening on port 3000

Настройка быка 🐮

Затем мы хотим добавить bull для настройки некоторых очередей.
Мы также настроим bull-arena в качестве веб-интерфейса для отслеживания этих очередей.

Сначала установите bull и bull-arena:

npm i --save bull bull-arena

Давайте создадим несколько очередей в queues.js файле:

import Queue from 'bull';
export const NOTIFY_URL = 'NOTIFY_URL';
export const queues = {
  [NOTIFY_URL]: new Queue(
    NOTIFY_URL,
    process.env.REDIS_URL
  )
};

И обновите server.js, чтобы включить bull-arena пользовательский интерфейс и import очередь NOTIFY_URL.

import url from 'url';
import express from 'express';
import Arena from 'bull-arena';
import { queues, NOTIFY_URL } from './queues';
const app = express();

function getRedisConfig(redisUrl) {
  const redisConfig = url.parse(redisUrl);
  return {
    host: redisConfig.hostname || 'localhost',
    port: Number(redisConfig.port || 6379),
    database: (redisConfig.pathname || '/0').substr(1) || '0',
    password: redisConfig.auth ? redisConfig.auth.split(':')[1] : undefined
  };
}
app.use('/', Arena(
  {
    queues: [
      {
        name: NOTIFY_URL,
        hostId: 'Worker',
        redis: getRedisConfig(process.env.REDIS_URL)
      }
    ]
  },
  {
    basePath: '/arena',
    disableListen: true
  }
));
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server listening on port ${PORT}`)
});

При сохранении мы сможем открыть http: // localhost: 3000 / arena и увидеть следующее:

Сохранение данных веб-перехватчика с помощью Redis

Принятие полезных данных и их пересылка на

Форма нашего API будет следующей:
Конечная точка POST /webhooks, которая будет принимать тело JSON POST с массивами payload и urls, которое будет отвечать на следующий запрос:

curl -X POST \
  http://localhost:3000/webhooks \
  -H 'Content-Type: application/json' \
  -d '{
    "payload": {
        "hello": "world"
    },
    "urls": [
        "http://localhost:3000/example",
        "http://localhost:3000/example"
    ]
}'

Конечная точка POST /webhooks/notify, которая будет принимать тело сообщения JSON POST с полем id, которое будет отвечать на запрос, подобный следующему:

curl -X POST \
  http://localhost:3000/webhooks/notify \
  -H 'Content-Type: application/json' \
  -d '{
    "id": "e5d9f99f-9641-4c0a-b2ca-3b0036c2a9b3"
}'

У нас также будет POST /example конечная точка, чтобы проверять, действительно ли срабатывают наши веб-перехватчики.

Значит, нам понадобится body-parser:

npm install --save body-parser

server.js будет выглядеть следующим образом:

import url from 'url';
import express from 'express';
import bodyParser from 'body-parser';
import Arena from 'bull-arena';
import { queues, NOTIFY_URL } from './queues';
const app = express();
app.use(bodyParser.json());
app.post('/webhooks', (req, res, next) => {
  const { payload, urls } = req.body;
  res.json({
    payload,
    urls
  });
});
app.post('/webhooks/notify', async (req, res, next) => {
  const { id } = req.body;
  res.sendStatus(200);
});
app.post('/example', (req, res) => {
  console.log(`Hit example with ${JSON.stringify(req.body)}`);
  return res.sendStatus(200);
});
function getRedisConfig(redisUrl) {
  const redisConfig = url.parse(redisUrl);
  return {
    host: redisConfig.hostname || 'localhost',
    port: Number(redisConfig.port || 6379),
    database: (redisConfig.pathname || '/0').substr(1) || '0',
    password: redisConfig.auth ? redisConfig.auth.split(':')[1] : undefined
  };
}
app.use('/', Arena(
  {
    queues: [
      {
        name: NOTIFY_URL,
        hostId: 'Worker',
        redis: getRedisConfig(process.env.REDIS_URL)
      }
    ]
  },
  {
    basePath: '/arena',
    disableListen: true
  }
));
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server listening on port ${PORT}`)
});

Сохранение данных веб-перехватчика в Redis 💾

ioredis (клиент Redis для Node) будет выбран, чтобы использовать тот факт, что bull использует ioredis под капотом:

npm install --save ioredis

Для создания уникальных идентификаторов мы также установим пакет uuid:

npm install --save uuid

Новый модуль db.js выглядит следующим образом:

import Redis from 'ioredis';
import { v4 as uuidV4 } from 'uuid';
const redis = new Redis(process.env.REDIS_URL);
const WEBHOOK_PREFIX = 'webhook:';
const PAYLOAD_PREFIX = `${WEBHOOK_PREFIX}payload:`;
const URLS_PREFIX = `${WEBHOOK_PREFIX}urls:`;
const makePayloadKey = id => `${PAYLOAD_PREFIX}${id}`;
const makeUrlsKey = id => `${URLS_PREFIX}${id}`;
async function setWebhook(payload, urls) {
  const id = uuidV4();
  const transaction = redis.multi()
    .hmset(makePayloadKey(id), payload)
    .lpush(makeUrlsKey(id), urls)
  await transaction.exec();
  return id;
}
async function getWebhook(id) {
  const transaction = redis.multi()
    .hgetall(makePayloadKey(id))
    .lrange(makeUrlsKey(id), 0, -1);
  const [[_, payload], [__, urls]] = await transaction.exec();
  return {
    payload,
    urls
  };
}
export const db = {
  setWebhook,
  getWebhook
};

Полезные данные и URL-адреса моделируются как webhook:payload:<some-uuid> и webhook:urls:<some-uuid> соответственно.

Полезные данные - это хэши Redis (поскольку полезные данные - это объект JSON), а URL-адреса - это списки Redis (поскольку мы имеем дело со списком строк).

Мы столкнулись с проблемой, при которой мы хотим убедиться, что мы устанавливаем / получаем payload и urls одновременно, отсюда и использование multi().

multi позволяет нам создавать транзакции (операции, которые должны выполняться атомарно).
В этом масштабе (без трафика 😄), учитывая, что мы только каждое добавление (никогда не обновляем) и что мы используем UUID, мы могли бы с таким же успехом иметь не использованные транзакции,
но мы будем хорошими инженерами и будем использовать их в любом случае.

Наиболее сложные линии:

const transaction = redis.multi()
  .hgetall(makePayloadKey(id))
  .lrange(makeUrlsKey(id), 0, -1);
const [[_, payload], [__, urls]] = await transaction.exec();

Гарантия объяснения:

  1. hgetall получает все пары ключ-значение в хеше,
  2. lrange получает значения списка, при использовании с 1 в качестве начала и -1 в качестве конца, он получает весь список
  3. const output = await multi().op1().op2().exec()
  • Устанавливает вывод в массив возвращаемых значений из op1, op2
  • Другими словами output = [ [ errorOp1, replyOp1 ], [ errorOp2, replyOp2 ] ]
  • Чтобы отразить это, мы игнорируем ошибки (что не является хорошей практикой) и получаем только ответы
  • Лучшим решением было бы сделать:
const [[errPayload, payload], [errUrls, urls]] = await transaction.exec();
    if (errPayload) {
      throw errPayload;
    }
    if (errUrls) {
      throw errUrls
    }
    ```
### Saving POST data using the new db module 
In `server.js` now looks like the following:

```js
import url from 'url';
import express from 'express';
import bodyParser from 'body-parser';
import Arena from 'bull-arena';
import { db } from './db';
import { queues, NOTIFY_URL } from './queues';
const app = express();
app.use(bodyParser.json());
app.post('/webhooks', async (req, res, next) => {
  const { payload, urls } = req.body;
  try {
    const id = await db.setWebhook(payload, urls);
    return res.json({
      id
    });
  } catch (error) {
    next(error);
  }
});
app.post('/webhooks/notify', async (req, res, next) => {
  const { id } = req.body;
  try {
    const { payload, urls } = await db.getWebhook(id);
    return res.sendStatus(200);
  } catch (error) {
    next(error);
  }
});
app.post('/example', (req, res) => {
  console.log(`Hit example with ${JSON.stringify(req.body)}`);
  return res.sendStatus(200);
});
function getRedisConfig(redisUrl) {
  const redisConfig = url.parse(redisUrl);
  return {
    host: redisConfig.hostname || 'localhost',
    port: Number(redisConfig.port || 6379),
    database: (redisConfig.pathname || '/0').substr(1) || '0',
    password: redisConfig.auth ? redisConfig.auth.split(':')[1] : undefined
  };
}
app.use('/', Arena(
  {
    queues: [
      {
        name: NOTIFY_URL,
        hostId: 'Worker',
        redis: getRedisConfig(process.env.REDIS_URL)
      }
    ]
  },
  {
    basePath: '/arena',
    disableListen: true
  }
));
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server listening on port ${PORT}`)
});

Основные обновления:

app.post('/webhooks', async (req, res, next) => {
  const { payload, urls } = req.body;
  try {
    const id = await db.setWebhook(payload, urls);
    return res.json({
      id
    });
  } catch (error) {
    next(error);
  }
});

и:

app.post('/webhooks/notify', async (req, res, next) => {
  const { id } = req.body;
  try {
    const { payload, urls } = await db.getWebhook(id);
    return res.sendStatus(200);
  } catch (error) {
    next(error);
  }
});

Вы заметите, что обработчик POST /webhooks/notify по-прежнему ничего и никого не уведомляет 🙈.

Очередь вакансий 🏭

Чтобы поставить задания в очередь, мы используем метод queue.add и передаем ему то, что мы хотим отобразить в job.data:

queues[NOTIFY_URL].add({
  payload,
  url,
  id
});

Мы хотим отправить запрос на каждый URL независимо (это своего рода точка настройки всей очереди), что означает, что мы хотим:

app.post('/webhooks/notify', async (req, res, next) => {
  const { id } = req.body;
  try {
    const { payload, urls } = await db.getWebhook(id);
    urls.forEach(url => {
      queues[NOTIFY_URL].add({
        payload,
        url,
        id
      });
    });
    return res.sendStatus(200);
  } catch (error) {
    next(error);
  }
});

Заметное изменение:

urls.forEach(url => {
  queues[NOTIFY_URL].add({
    payload,
    url,
    id
  });
});

Теперь, когда мы это сделали, если мы создадим новый веб-перехватчик:

curl -X POST \
  http://localhost:3000/webhooks \
  -H 'Content-Type: application/json' \
  -d '{
        "payload": {
                "hello": "world"
        },
        "urls": [
                "http://localhost:3000/example",
                "http://localhost:3000/example"
        ]
}'
{"id":"5fc395bf-ca2f-4654-a7ac-52f6890d0deb"}

{"id":"5fc395bf-ca2f-4654-a7ac-52f6890d0deb"} не забудьте скопировать идентификатор для ввода в следующую команду:

curl -X POST \
  http://localhost:3000/webhooks/notify \
  -H 'Content-Type: application/json' \
  -d '{
    "id": "5fc395bf-ca2f-4654-a7ac-52f6890d0deb"
}'
OK

Задания были добавлены в очередь, что мы можем проверить, открыв bull-arena UI по адресу http: // localhost: 3000 / arena / Worker / NOTIFY_URL / wait:

Щелкнув одно из __default__ заданий, мы увидим, что полезная нагрузка, URL-адреса и идентификатор передаются правильно:

Обработка вакансий ⚙️

Теперь мы хотим фактически обработать задания в очереди, то есть пропинговать некоторые URL-адреса с некоторыми данными.

Для этого давайте добавим axios в качестве HTTP-клиента:

npm install --save axios

Создайте processors.js файл:

import { NOTIFY_URL } from './queues';
import axios from 'axios';
export const processorInitialisers = {
  [NOTIFY_URL]: db => job => {
    console.log(`Posting to ${job.data.url}`);
    return axios.post(job.data.url, job.data.payload);
  }
}

В некотором контексте причины, по которым мы использовали сигнатуру db => job => Promisetype, хотя в настоящее время нам не нужна БД, - это
, чтобы проиллюстрировать, как я передал бы базу данных или любые другие зависимости в processorInitialiser.

Другой инициализатор процессора может выглядеть следующим образом:

const myOtherProcessorInitialiser = db => async job => {
  const webhook = await db.getWebhook(job.data.id);
  return Promise.all(
    webhook.urls.map(
      url => axios.post(url, webhook.payload)
    )
  );
};

В завершение нам нужно подключить процессоры к очереди, это сделано с помощью queue.process, поэтому в worker.js у нас теперь будет:

import { queues } from './queues';
import { processorInitialisers } from './processors';
import { db } from './db';
Object.entries(queues).forEach(([queueName, queue]) => {
  console.log(`Worker listening to '${queueName}' queue`);
  queue.process(processorInitialisers[queueName](db));
});

Мы можем протестировать работу веб-перехватчиков, создав тот, который указывает на http://localhost:3000/example, запустив его с помощью /webhook/notify и проверив журналы, например:

my-worker | Posting to http://localhost:3000/example
my-app    | Hit example with {"hello":"world"}
my-worker | Posting to http://localhost:3000/example
my-app    | Hit example with {"hello":"world"}

Еще кое-что, что нужно сделать перед отправкой 🚢

Нам действительно не следует открывать bull-arena пользовательский интерфейс для публики, поэтому, если вы планируете использовать эту настройку в размещенной среде, выполните следующие действия:

if (process.env.NODE_ENV !== 'product') {
  // Bull arena logic
}

Или добавьте к нему базовую аутентификацию HTTP с помощью какого-либо промежуточного программного обеспечения.

Вы можете прочитать более подробную статью об использовании Docker Compose, Redis и Node / Express: https://codewithhugo.com/setting-up-express-and-redis-with-docker-compose/).

Для получения дополнительной информации об использовании esm см .: https://codewithhugo.com/es6-by-example-a-module/cli-to-wait-for-postgres-in-docker-compose/.

✉️ Подпишитесь на рассылку еженедельно Email Blast от CodeBurst 🐦 Подпишитесь на CodeBurst на Twitter , просмотрите 🗺️ Дорожная карта веб-разработчиков на 2018 год и 🕸️ Изучите веб-разработку с полным стеком .