Первоначально опубликовано на codewithhugo.com 21 июля 2018 г.
Добавьте Redux в свою логику очереди: экспресс-установка с ES6 и бычьей очередью
В жизни веб-приложения всегда наступает момент, когда операция лучше всего обслуживается в фоновом режиме, именно здесь возникают очереди.
В Node есть несколько решений для организации очередей. Ни один из них не является до смешного доминирующим, например. Kue, RSMQ, Bee Queue, bull.
Проблема с Kue, RSMQ и Bee Queue заключалась в использовании обратного вызова done
в качестве рекомендуемого API.
Bull https://github.com/OptimalBits/bull - это пакет очереди премиум-класса для обработки заданий и сообщений в NodeJS. Он поддерживается Redis и довольно многофункциональный. Прежде всего, он использует API обработки на основе Promise, что означает async/await
.
Мы рассмотрим приложение, которое отправляет веб-перехватчики с заданной полезной нагрузкой по набору URL-адресов.
Вы можете найти полное содержание кода на https://github.com/HugoDF/express-bull-es6.
- Экспресс-приложение с Redis и воркером 🏃♀️
- Настройка быка 🐮
- Сохранение данных веб-перехватчика с помощью Redis
- Прием полезных нагрузок и их отправка на
- Сохранение данных веб-перехватчика в Redis 💾
- Сохранение данных POST с использованием нового модуля db
- Очередь вакансий 🏭
- Обработка вакансий ⚙️
- Еще кое-что нужно сделать перед отправкой 🚢
Приложение Express с Redis и воркером 🏃♀️
Мы начнем с настройки Node / Redis / Express с использованием docker-compose (полное пошаговое руководство можно найти по адресу
https://codewithhugo.com/setting-up-express-and-redis-with-docker -compose / ),
приложение будет написано с использованием модулей ES (с использованием пакета esm ).
Для начала воспользуемся следующим docker-compose.yml
:
version: '2' services: app: build: . container_name: my-app environment: - NODE_ENV=development - PORT=3000 - REDIS_URL=redis://my-cache command: "sh -c 'npm i && npm run dev'" volumes: - .:/var/www/app links: - redis ports: - "3000:3000"
worker: build: . container_name: my-worker environment: - NODE_ENV=development - PORT=3000 - REDIS_URL=redis://my-cache command: "sh -c 'npm i && npm run worker:dev'" volumes: - .:/var/www/app links: - redis
redis: image: redis container_name: my-cache expose: - "6379"
Нам также понадобится package.json
, как показано ниже:
{
"name": "express-bull-es6",
"version": "1.0.0",
"description": "An Express setup with Redis, bull and ES6",
"main": "server.js",
"scripts": {
"start": "node -r esm server.js",
"dev": "nodemon -r esm server.js",
"worker": "node -r esm worker.js",
"worker:dev": "nodemon -r esm worker.js"
},
"author": "Hugo Di Francesco",
"license": "MIT",
"dependencies": {
"esm": "^3.0.67",
"express": "^4.16.3",
"nodemon": "^1.18.1"
}
}
A server.js
:
import express from 'express';
const app = express();
const PORT = process.env.PORT || 3000; app.listen(PORT, () => { console.log(`Server listening on port ${PORT}`) });
И worker.js
:
console.log('Worker doing nothing');
Выполнение следующего в командной строке должно дать нам некоторый вывод (через некоторое время, если зависимости необходимо установить):
$ docker-compose up
В итоге:
my-worker | [nodemon] 1.18.1
my-worker | [nodemon] to restart at any time, enter `rs`
my-worker | [nodemon] watching: *.*
my-worker | [nodemon] starting `node -r esm worker.js`
my-app | [nodemon] 1.18.1
my-app | [nodemon] to restart at any time, enter `rs`
my-app | [nodemon] watching: *.*
my-app | [nodemon] starting `node -r esm server.js`
my-worker | Worker doing nothing
my-app | Server listening on port 3000
Настройка быка 🐮
Затем мы хотим добавить bull
для настройки некоторых очередей.
Мы также настроим bull-arena
в качестве веб-интерфейса для отслеживания этих очередей.
Сначала установите bull
и bull-arena
:
npm i --save bull bull-arena
Давайте создадим несколько очередей в queues.js
файле:
import Queue from 'bull';
export const NOTIFY_URL = 'NOTIFY_URL';
export const queues = { [NOTIFY_URL]: new Queue( NOTIFY_URL, process.env.REDIS_URL ) };
И обновите server.js
, чтобы включить bull-arena
пользовательский интерфейс и import
очередь NOTIFY_URL
.
import url from 'url';
import express from 'express'; import Arena from 'bull-arena';
import { queues, NOTIFY_URL } from './queues';
const app = express();
function getRedisConfig(redisUrl) { const redisConfig = url.parse(redisUrl); return { host: redisConfig.hostname || 'localhost', port: Number(redisConfig.port || 6379), database: (redisConfig.pathname || '/0').substr(1) || '0', password: redisConfig.auth ? redisConfig.auth.split(':')[1] : undefined }; }
app.use('/', Arena( { queues: [ { name: NOTIFY_URL, hostId: 'Worker', redis: getRedisConfig(process.env.REDIS_URL) } ] }, { basePath: '/arena', disableListen: true } ));
const PORT = process.env.PORT || 3000; app.listen(PORT, () => { console.log(`Server listening on port ${PORT}`) });
При сохранении мы сможем открыть http: // localhost: 3000 / arena и увидеть следующее:
Сохранение данных веб-перехватчика с помощью Redis
Принятие полезных данных и их пересылка на
Форма нашего API будет следующей:
Конечная точка POST /webhooks
, которая будет принимать тело JSON POST с массивами payload
и urls
, которое будет отвечать на следующий запрос:
curl -X POST \
http://localhost:3000/webhooks \
-H 'Content-Type: application/json' \
-d '{
"payload": {
"hello": "world"
},
"urls": [
"http://localhost:3000/example",
"http://localhost:3000/example"
]
}'
Конечная точка POST /webhooks/notify
, которая будет принимать тело сообщения JSON POST с полем id
, которое будет отвечать на запрос, подобный следующему:
curl -X POST \
http://localhost:3000/webhooks/notify \
-H 'Content-Type: application/json' \
-d '{
"id": "e5d9f99f-9641-4c0a-b2ca-3b0036c2a9b3"
}'
У нас также будет POST /example
конечная точка, чтобы проверять, действительно ли срабатывают наши веб-перехватчики.
Значит, нам понадобится body-parser
:
npm install --save body-parser
server.js
будет выглядеть следующим образом:
import url from 'url'; import express from 'express'; import bodyParser from 'body-parser'; import Arena from 'bull-arena';
import { queues, NOTIFY_URL } from './queues';
const app = express();
app.use(bodyParser.json());
app.post('/webhooks', (req, res, next) => { const { payload, urls } = req.body; res.json({ payload, urls }); });
app.post('/webhooks/notify', async (req, res, next) => { const { id } = req.body; res.sendStatus(200); });
app.post('/example', (req, res) => { console.log(`Hit example with ${JSON.stringify(req.body)}`); return res.sendStatus(200); });
function getRedisConfig(redisUrl) { const redisConfig = url.parse(redisUrl); return { host: redisConfig.hostname || 'localhost', port: Number(redisConfig.port || 6379), database: (redisConfig.pathname || '/0').substr(1) || '0', password: redisConfig.auth ? redisConfig.auth.split(':')[1] : undefined }; }
app.use('/', Arena( { queues: [ { name: NOTIFY_URL, hostId: 'Worker', redis: getRedisConfig(process.env.REDIS_URL) } ] }, { basePath: '/arena', disableListen: true } ));
const PORT = process.env.PORT || 3000; app.listen(PORT, () => { console.log(`Server listening on port ${PORT}`) });
Сохранение данных веб-перехватчика в Redis 💾
ioredis
(клиент Redis для Node) будет выбран, чтобы использовать тот факт, что bull
использует ioredis
под капотом:
npm install --save ioredis
Для создания уникальных идентификаторов мы также установим пакет uuid
:
npm install --save uuid
Новый модуль db.js
выглядит следующим образом:
import Redis from 'ioredis'; import { v4 as uuidV4 } from 'uuid';
const redis = new Redis(process.env.REDIS_URL);
const WEBHOOK_PREFIX = 'webhook:'; const PAYLOAD_PREFIX = `${WEBHOOK_PREFIX}payload:`; const URLS_PREFIX = `${WEBHOOK_PREFIX}urls:`;
const makePayloadKey = id => `${PAYLOAD_PREFIX}${id}`; const makeUrlsKey = id => `${URLS_PREFIX}${id}`;
async function setWebhook(payload, urls) { const id = uuidV4(); const transaction = redis.multi() .hmset(makePayloadKey(id), payload) .lpush(makeUrlsKey(id), urls) await transaction.exec(); return id; }
async function getWebhook(id) { const transaction = redis.multi() .hgetall(makePayloadKey(id)) .lrange(makeUrlsKey(id), 0, -1); const [[_, payload], [__, urls]] = await transaction.exec(); return { payload, urls }; }
export const db = { setWebhook, getWebhook };
Полезные данные и URL-адреса моделируются как webhook:payload:<some-uuid>
и webhook:urls:<some-uuid>
соответственно.
Полезные данные - это хэши Redis (поскольку полезные данные - это объект JSON), а URL-адреса - это списки Redis (поскольку мы имеем дело со списком строк).
Мы столкнулись с проблемой, при которой мы хотим убедиться, что мы устанавливаем / получаем payload
и urls
одновременно, отсюда и использование multi()
.
multi
позволяет нам создавать транзакции (операции, которые должны выполняться атомарно).
В этом масштабе (без трафика 😄), учитывая, что мы только каждое добавление (никогда не обновляем) и что мы используем UUID, мы могли бы с таким же успехом иметь не использованные транзакции,
но мы будем хорошими инженерами и будем использовать их в любом случае.
Наиболее сложные линии:
const transaction = redis.multi()
.hgetall(makePayloadKey(id))
.lrange(makeUrlsKey(id), 0, -1);
const [[_, payload], [__, urls]] = await transaction.exec();
Гарантия объяснения:
hgetall
получает все пары ключ-значение в хеше,lrange
получает значения списка, при использовании с1
в качестве начала и-1
в качестве конца, он получает весь списокconst output = await multi().op1().op2().exec()
- Устанавливает вывод в массив возвращаемых значений из
op1
,op2
- Другими словами
output = [ [ errorOp1, replyOp1 ], [ errorOp2, replyOp2 ] ]
- Чтобы отразить это, мы игнорируем ошибки (что не является хорошей практикой) и получаем только ответы
- Лучшим решением было бы сделать:
const [[errPayload, payload], [errUrls, urls]] = await transaction.exec(); if (errPayload) { throw errPayload; } if (errUrls) { throw errUrls } ```
### Saving POST data using the new db module
In `server.js` now looks like the following:
```js import url from 'url'; import express from 'express'; import bodyParser from 'body-parser'; import Arena from 'bull-arena';
import { db } from './db'; import { queues, NOTIFY_URL } from './queues';
const app = express();
app.use(bodyParser.json());
app.post('/webhooks', async (req, res, next) => { const { payload, urls } = req.body; try { const id = await db.setWebhook(payload, urls); return res.json({ id }); } catch (error) { next(error); } });
app.post('/webhooks/notify', async (req, res, next) => { const { id } = req.body; try { const { payload, urls } = await db.getWebhook(id); return res.sendStatus(200); } catch (error) { next(error); } });
app.post('/example', (req, res) => { console.log(`Hit example with ${JSON.stringify(req.body)}`); return res.sendStatus(200); });
function getRedisConfig(redisUrl) { const redisConfig = url.parse(redisUrl); return { host: redisConfig.hostname || 'localhost', port: Number(redisConfig.port || 6379), database: (redisConfig.pathname || '/0').substr(1) || '0', password: redisConfig.auth ? redisConfig.auth.split(':')[1] : undefined }; }
app.use('/', Arena( { queues: [ { name: NOTIFY_URL, hostId: 'Worker', redis: getRedisConfig(process.env.REDIS_URL) } ] }, { basePath: '/arena', disableListen: true } ));
const PORT = process.env.PORT || 3000; app.listen(PORT, () => { console.log(`Server listening on port ${PORT}`) });
Основные обновления:
app.post('/webhooks', async (req, res, next) => {
const { payload, urls } = req.body;
try {
const id = await db.setWebhook(payload, urls);
return res.json({
id
});
} catch (error) {
next(error);
}
});
и:
app.post('/webhooks/notify', async (req, res, next) => {
const { id } = req.body;
try {
const { payload, urls } = await db.getWebhook(id);
return res.sendStatus(200);
} catch (error) {
next(error);
}
});
Вы заметите, что обработчик POST /webhooks/notify
по-прежнему ничего и никого не уведомляет 🙈.
Очередь вакансий 🏭
Чтобы поставить задания в очередь, мы используем метод queue.add
и передаем ему то, что мы хотим отобразить в job.data
:
queues[NOTIFY_URL].add({
payload,
url,
id
});
Мы хотим отправить запрос на каждый URL независимо (это своего рода точка настройки всей очереди), что означает, что мы хотим:
app.post('/webhooks/notify', async (req, res, next) => {
const { id } = req.body;
try {
const { payload, urls } = await db.getWebhook(id);
urls.forEach(url => {
queues[NOTIFY_URL].add({
payload,
url,
id
});
});
return res.sendStatus(200);
} catch (error) {
next(error);
}
});
Заметное изменение:
urls.forEach(url => {
queues[NOTIFY_URL].add({
payload,
url,
id
});
});
Теперь, когда мы это сделали, если мы создадим новый веб-перехватчик:
curl -X POST \
http://localhost:3000/webhooks \
-H 'Content-Type: application/json' \
-d '{
"payload": {
"hello": "world"
},
"urls": [
"http://localhost:3000/example",
"http://localhost:3000/example"
]
}'
{"id":"5fc395bf-ca2f-4654-a7ac-52f6890d0deb"}
{"id":"5fc395bf-ca2f-4654-a7ac-52f6890d0deb"}
не забудьте скопировать идентификатор для ввода в следующую команду:
curl -X POST \
http://localhost:3000/webhooks/notify \
-H 'Content-Type: application/json' \
-d '{
"id": "5fc395bf-ca2f-4654-a7ac-52f6890d0deb"
}'
OK
Задания были добавлены в очередь, что мы можем проверить, открыв bull-arena
UI по адресу http: // localhost: 3000 / arena / Worker / NOTIFY_URL / wait:
Щелкнув одно из __default__
заданий, мы увидим, что полезная нагрузка, URL-адреса и идентификатор передаются правильно:
Обработка вакансий ⚙️
Теперь мы хотим фактически обработать задания в очереди, то есть пропинговать некоторые URL-адреса с некоторыми данными.
Для этого давайте добавим axios
в качестве HTTP-клиента:
npm install --save axios
Создайте processors.js
файл:
import { NOTIFY_URL } from './queues'; import axios from 'axios';
export const processorInitialisers = { [NOTIFY_URL]: db => job => { console.log(`Posting to ${job.data.url}`); return axios.post(job.data.url, job.data.payload); } }
В некотором контексте причины, по которым мы использовали сигнатуру db => job => Promise
type, хотя в настоящее время нам не нужна БД, - это
, чтобы проиллюстрировать, как я передал бы базу данных или любые другие зависимости в processorInitialiser.
Другой инициализатор процессора может выглядеть следующим образом:
const myOtherProcessorInitialiser = db => async job => {
const webhook = await db.getWebhook(job.data.id);
return Promise.all(
webhook.urls.map(
url => axios.post(url, webhook.payload)
)
);
};
В завершение нам нужно подключить процессоры к очереди, это сделано с помощью queue.process
, поэтому в worker.js
у нас теперь будет:
import { queues } from './queues'; import { processorInitialisers } from './processors'; import { db } from './db';
Object.entries(queues).forEach(([queueName, queue]) => { console.log(`Worker listening to '${queueName}' queue`); queue.process(processorInitialisers[queueName](db)); });
Мы можем протестировать работу веб-перехватчиков, создав тот, который указывает на http://localhost:3000/example
, запустив его с помощью /webhook/notify
и проверив журналы, например:
my-worker | Posting to http://localhost:3000/example
my-app | Hit example with {"hello":"world"}
my-worker | Posting to http://localhost:3000/example
my-app | Hit example with {"hello":"world"}
Еще кое-что, что нужно сделать перед отправкой 🚢
Нам действительно не следует открывать bull-arena
пользовательский интерфейс для публики, поэтому, если вы планируете использовать эту настройку в размещенной среде, выполните следующие действия:
if (process.env.NODE_ENV !== 'product') {
// Bull arena logic
}
Или добавьте к нему базовую аутентификацию HTTP с помощью какого-либо промежуточного программного обеспечения.
Вы можете прочитать более подробную статью об использовании Docker Compose, Redis и Node / Express: https://codewithhugo.com/setting-up-express-and-redis-with-docker-compose/).
Для получения дополнительной информации об использовании esm см .: https://codewithhugo.com/es6-by-example-a-module/cli-to-wait-for-postgres-in-docker-compose/.
✉️ Подпишитесь на рассылку еженедельно Email Blast от CodeBurst 🐦 Подпишитесь на CodeBurst на Twitter , просмотрите 🗺️ Дорожная карта веб-разработчиков на 2018 год и 🕸️ Изучите веб-разработку с полным стеком .