TL: DR: для применения архитектуры CQRS вам не нужна какая-то причудливая структура, особенно когда вы осуществляете только внутрипроцессное взаимодействие. Достаточно собственного EventEmitter
из модуля events
. Если вам нужна межпроцессная связь, servicebus
отлично подойдет. Чтобы взглянуть на пример реализации (следующего ответа длинной версии), вы можете погрузиться в код этого репозитория: простые узлы cqrs
Давайте возьмем пример очень простого приложения для чата, в котором вы можете отправлять сообщения, если чат не закрыт, и лайкать / отличать сообщения.
Наш главный агрегат (или агрегированный корень, концептуально) - это Chat
(writeModel/domain/chat.js
):
const Chat = ({ id, isClosed } = {}) =>
Object.freeze({
id,
isClosed,
});
Затем у нас есть Message
агрегат (writeModel/domain/message.js
):
const Message = ({ id, chatId, userId, content, sentAt, messageLikes = [] } = {}) =>
Object.freeze({
id,
chatId,
userId,
content,
sentAt,
messageLikes,
});
Поведение для отправки сообщения может быть (writeModel/domain/chat.js
):
const invariant = require('invariant');
const { Message } = require('./message');
const Chat = ({ id, isClosed } = {}) =>
Object.freeze({
id,
isClosed,
});
const sendMessage = ({ chatState, messageId, userId, content, sentAt }) => {
invariant(!chatState.isClosed, "can't post in a closed chat");
return Message({ id: messageId, chatId: chatState.id, userId, content, sentAt });
};
Теперь нам нужны команды (writeModel/domain/commands.js
):
const commands = {
types: {
SEND_MESSAGE: '[chat] send a message',
},
sendMessage({ chatId, userId, content, sentAt }) {
return Object.freeze({
type: commands.types.SEND_MESSAGE,
payload: {
chatId,
userId,
content,
sentAt,
},
});
},
};
module.exports = {
commands,
};
Поскольку мы используем javascript, у нас нет interface
для предоставления абстракции, поэтому мы используем higher order functions
(writeModel/domain/getChatOfId.js
):
const { Chat } = require('./message');
const getChatOfId = (getChatOfId = async id => Chat({ id })) => async id => {
try {
const chatState = await getChatOfId(id);
if (typeof chatState === 'undefined') {
throw chatState;
}
return chatState;
} catch (e) {
throw new Error(`chat with id ${id} was not found`);
}
};
module.exports = {
getChatOfId,
};
(writeModel/domain/saveMessage.js
) :
const { Message } = require('./message');
const saveMessage = (saveMessage = async (messageState = Message()) => {}) => saveMessage;
module.exports = {
saveMessage,
};
Теперь нам нужно реализовать наш commandHandlers
(уровень службы приложения):
(writeModel/commandHandlers/handleSendMessage.js
)
const { sendMessage } = require('../domain/chat');
const handleSendMessage = ({
getChatOfId,
getNextMessageId,
saveMessage,
}) => async sendMessageCommandPayload => {
const { chatId, userId, content, sentAt } = sendMessageCommandPayload;
const chat = await getChatOfId(chatId);
return saveMessage(
sendMessage({
chatState: chat,
messageId: getNextMessageId(),
userId,
content,
sentAt,
}),
);
};
module.exports = {
handleSendMessage,
};
Поскольку у нас нет interface
в javascript, мы используем higher order functions
для применения принципа инверсии зависимостей посредством внедрения зависимостей во время выполнения.
Затем мы можем реализовать корень композиции модели записи: (`writeModel / index.js):
const { handleSendMessage } = require('./commandHandlers/handleSendMessage');
const { commands } = require('./domain/commands');
const SimpleNodeCQRSwriteModel = ({
dispatchCommand,
handleCommand,
getChatOfId,
getNextMessageId,
saveMessage,
}) => {
handleCommand(
commands.types.SEND_MESSAGE,
handleSendMessage({ getChatOfId, getNextMessageId, saveMessage }),
);
};
module.exports = {
SimpleNodeCQRSwriteModel,
};
Ваши commands
и command handler
не связаны друг с другом, затем вы можете предоставить реализацию этих функций во время выполнения с помощью базы данных в памяти и узла EventEmitter
, например (writeModel/infrastructure/inMemory/index.js
):
const uuid = require('uuid/v1');
const { saveMessage } = require('../../domain/saveMessage');
const { getChatOfId } = require('../../domain/getChatOfId');
const { getNextMessageId } = require('../../domain/getNextMessageId');
const InMemoryRepository = (initialDbState = { chats: {}, messages: {}, users: {} }) => {
const listeners = [];
const db = {
...initialDbState,
};
const addOnDbUpdatedListener = onDbUpdated => listeners.push(onDbUpdated);
const updateDb = updater => {
updater();
listeners.map(listener => listener(db));
};
const saveMessageInMemory = saveMessage(async messageState => {
updateDb(() => (db.messages[messageState.id] = messageState));
});
const getChatOfIdFromMemory = getChatOfId(async id => db.chats[id]);
const getNextMessageUuid = getNextMessageId(uuid);
return {
addOnDbUpdatedListener,
saveMessage: saveMessageInMemory,
getChatOfId: getChatOfIdFromMemory,
getNextMessageId: getNextMessageUuid,
};
};
module.exports = {
InMemoryRepository,
};
И наш TestWriteModel
, связывающий все это воедино:
const EventEmitter = require('events');
const { SimpleNodeCQRSwriteModel } = require('../writeModel');
const { InMemoryRepository } = require('../writeModel/infrastructure/inMemory');
const TestWriteModel = () => {
const { saveMessage, getChatOfId, getNextMessageId } = InMemoryRepository();
const commandEmitter = new EventEmitter();
const dispatchCommand = command => commandEmitter.emit(command.type, command.payload);
const handleCommand = (commandType, commandHandler) => {
commandEmitter.on(commandType, commandHandler);
};
return SimpleNodeCQRSwriteModel({
dispatchCommand,
handleCommand,
getChatOfId,
getNextMessageId,
saveMessage,
});
};
Вы можете погрузиться в код (с очень простым read model
) в этом репозитории: простой узел cqrs а>
person
Pierre Criulanscy
schedule
18.12.2018
redux
, очень легкий, вы можете отправлять команды и перехватывать их вredux
промежуточном программном обеспечении. Не знаю, знакомы ли вы сredux
или нет. - person Pierre Criulanscy   schedule 17.12.2018redux
очень хорошо подходит :) - person Pierre Criulanscy   schedule 18.12.2018