Отправка сообщений CQRS в Node.js

Я хочу выполнить CQRS для приложения Node.

Я не специалист по Node, я из .NET, у которого есть отличная библиотека под названием MediatR, которая отправляет команды / запросы посреднику, который можно использовать для разделения запросов и обработчиков. Таким образом, он позволяет очень простой и элегантный CQRS.

В мире Node я нашел много библиотек / блогов, но они всегда включают и Event Sourcing. Меня не интересует ES.

Я могу хорошо моделировать команды и запросы, но что тогда? Их нужно куда-то отправлять, чтобы избежать беспорядка.

Из того, что я знаю о платформе Node, возможное решение - использовать шаблон наблюдателя (через библиотеку RxJs), чтобы контроллер мог отправлять сообщения (то есть запросы CQRS) наблюдателю, который затем публикует соответствующие события для подписчиков ( т.е. обработчики запросов). Это разделяет контроллеры и службы в DDD-подобном дизайне. Хотя я не уверен, как передать результаты обратно в контроллер.

Это то, как это делают другие люди? Есть ли способ лучше в Node?


person lonix    schedule 17.12.2018    source источник
comment
Вы хотите сохранить общение внутри процесса или хотите иметь какие-то другие услуги?   -  person Pierre Criulanscy    schedule 17.12.2018
comment
@PierreCriulanscy Я предпочитаю, чтобы он был простым и имел только один процесс (например, узел) и избегал rabbitmq, redis и т. Д.   -  person lonix    schedule 17.12.2018
comment
Таким образом, вы можете использовать redux, очень легкий, вы можете отправлять команды и перехватывать их в redux промежуточном программном обеспечении. Не знаю, знакомы ли вы с redux или нет.   -  person Pierre Criulanscy    schedule 17.12.2018
comment
Вы также можете взглянуть на wolkenkit, но это тоже источник событий.   -  person Pierre Criulanscy    schedule 17.12.2018
comment
@PierreCriulanscy К сожалению, ничего подобного еще нет. Я исследую redux ... надеюсь, это достаточно хорошо. Спасибо.   -  person lonix    schedule 17.12.2018
comment
Для использования архитектуры CQRS не нужна сложная структура, redux очень хорошо подходит :)   -  person Pierre Criulanscy    schedule 18.12.2018


Ответы (1)


TL: DR: для применения архитектуры CQRS вам не нужна какая-то причудливая структура, особенно когда вы осуществляете только внутрипроцессное взаимодействие. Достаточно собственного EventEmitter из модуля events. Если вам нужна межпроцессная связь, servicebus отлично подойдет. Чтобы взглянуть на пример реализации (следующего ответа длинной версии), вы можете погрузиться в код этого репозитория: простые узлы cqrs

Давайте возьмем пример очень простого приложения для чата, в котором вы можете отправлять сообщения, если чат не закрыт, и лайкать / отличать сообщения.

Наш главный агрегат (или агрегированный корень, концептуально) - это Chat (writeModel/domain/chat.js):

const Chat = ({ id, isClosed } = {}) =>
  Object.freeze({
    id,
    isClosed,
  });

Затем у нас есть Message агрегат (writeModel/domain/message.js):

const Message = ({ id, chatId, userId, content, sentAt, messageLikes = [] } = {}) =>
  Object.freeze({
    id,
    chatId,
    userId,
    content,
    sentAt,
    messageLikes,
  });

Поведение для отправки сообщения может быть (writeModel/domain/chat.js):

const invariant = require('invariant');
const { Message } = require('./message');

const Chat = ({ id, isClosed } = {}) =>
  Object.freeze({
    id,
    isClosed,
  });

const sendMessage = ({ chatState, messageId, userId, content, sentAt }) => {
  invariant(!chatState.isClosed, "can't post in a closed chat");
  return Message({ id: messageId, chatId: chatState.id, userId, content, sentAt });
};

Теперь нам нужны команды (writeModel/domain/commands.js):

const commands = {
  types: {
    SEND_MESSAGE: '[chat] send a message',
  },
  sendMessage({ chatId, userId, content, sentAt }) {
    return Object.freeze({
      type: commands.types.SEND_MESSAGE,
      payload: {
        chatId,
        userId,
        content,
        sentAt,
      },
    });
  },
};

module.exports = {
  commands,
};

Поскольку мы используем javascript, у нас нет interface для предоставления абстракции, поэтому мы используем higher order functions (writeModel/domain/getChatOfId.js):

const { Chat } = require('./message');

const getChatOfId = (getChatOfId = async id => Chat({ id })) => async id => {
  try {
    const chatState = await getChatOfId(id);
    if (typeof chatState === 'undefined') {
      throw chatState;
    }
    return chatState;
  } catch (e) {
    throw new Error(`chat with id ${id} was not found`);
  }
};

module.exports = {
  getChatOfId,
};

(writeModel/domain/saveMessage.js) :

const { Message } = require('./message');

const saveMessage = (saveMessage = async (messageState = Message()) => {}) => saveMessage;

module.exports = {
  saveMessage,
};

Теперь нам нужно реализовать наш commandHandlers (уровень службы приложения):

(writeModel/commandHandlers/handleSendMessage.js)

const { sendMessage } = require('../domain/chat');

const handleSendMessage = ({
  getChatOfId,
  getNextMessageId,
  saveMessage,
}) => async sendMessageCommandPayload => {
  const { chatId, userId, content, sentAt } = sendMessageCommandPayload;
  const chat = await getChatOfId(chatId);
  return saveMessage(
    sendMessage({
      chatState: chat,
      messageId: getNextMessageId(),
      userId,
      content,
      sentAt,
    }),
  );
};

module.exports = {
  handleSendMessage,
};

Поскольку у нас нет interface в javascript, мы используем higher order functions для применения принципа инверсии зависимостей посредством внедрения зависимостей во время выполнения.

Затем мы можем реализовать корень композиции модели записи: (`writeModel / index.js):

const { handleSendMessage } = require('./commandHandlers/handleSendMessage');
const { commands } = require('./domain/commands');

const SimpleNodeCQRSwriteModel = ({
  dispatchCommand,
  handleCommand,
  getChatOfId,
  getNextMessageId,
  saveMessage,
}) => {
  handleCommand(
    commands.types.SEND_MESSAGE,
    handleSendMessage({ getChatOfId, getNextMessageId, saveMessage }),
  );
};

module.exports = {
  SimpleNodeCQRSwriteModel,
};

Ваши commands и command handler не связаны друг с другом, затем вы можете предоставить реализацию этих функций во время выполнения с помощью базы данных в памяти и узла EventEmitter, например (writeModel/infrastructure/inMemory/index.js):

const uuid = require('uuid/v1');
const { saveMessage } = require('../../domain/saveMessage');
const { getChatOfId } = require('../../domain/getChatOfId');
const { getNextMessageId } = require('../../domain/getNextMessageId');

const InMemoryRepository = (initialDbState = { chats: {}, messages: {}, users: {} }) => {
  const listeners = [];

  const db = {
    ...initialDbState,
  };

  const addOnDbUpdatedListener = onDbUpdated => listeners.push(onDbUpdated);

  const updateDb = updater => {
    updater();
    listeners.map(listener => listener(db));
  };

  const saveMessageInMemory = saveMessage(async messageState => {
    updateDb(() => (db.messages[messageState.id] = messageState));
  });

  const getChatOfIdFromMemory = getChatOfId(async id => db.chats[id]);

  const getNextMessageUuid = getNextMessageId(uuid);

  return {
    addOnDbUpdatedListener,
    saveMessage: saveMessageInMemory,
    getChatOfId: getChatOfIdFromMemory,
    getNextMessageId: getNextMessageUuid,
  };
};

module.exports = {
  InMemoryRepository,
};

И наш TestWriteModel, связывающий все это воедино:

const EventEmitter = require('events');
const { SimpleNodeCQRSwriteModel } = require('../writeModel');
const { InMemoryRepository } = require('../writeModel/infrastructure/inMemory');

const TestWriteModel = () => {
  const { saveMessage, getChatOfId, getNextMessageId } = InMemoryRepository();
  const commandEmitter = new EventEmitter();
  const dispatchCommand = command => commandEmitter.emit(command.type, command.payload);
  const handleCommand = (commandType, commandHandler) => {
    commandEmitter.on(commandType, commandHandler);
  };
  return SimpleNodeCQRSwriteModel({
    dispatchCommand,
    handleCommand,
    getChatOfId,
    getNextMessageId,
    saveMessage,
  });
};

Вы можете погрузиться в код (с очень простым read model) в этом репозитории: простой узел cqrs

person Pierre Criulanscy    schedule 18.12.2018