Я пытаюсь создать систему актеров, которая может масштабироваться вверх/вниз в зависимости от рабочей нагрузки, используя следующую конфигурацию.
akka {
actor {
serializers {
wire = "Akka.Serialization.WireSerializer, Akka.Serialization.Wire"
}
serialization-bindings {
"System.Object" = wire
}
deployment {
/analysis {
router = round-robin-pool
routees.paths = ["/user/analysis"]
resizer {
enabled = on
lower-bound = 1
upper-bound = 20
}
}
}
}
}
в моем основном цикле я создаю 3000 сообщений и нажимаю на актеров для обработки
var runAnalysisProps = Props.Create<RunAnalysisActor>().WithRouter(FromConfig.Instance);
//var runAnalysisProps = Props.Create<RunAnalysisActor>().WithRouter(new RoundRobinPool(0, new DefaultResizer(lower:1, upper: 10 ))); **This do not help either **
var analysisRef = RunAnalysisActorSystem.ActorOf(runAnalysisProps, "analysis");
Logger.LogMessage("Started Posting all messages", ConsoleColor.Blue);
for (int i = 0; i < 3000; i++)
{
analysisRef.Tell(new AnalysisMessage(100 + i, $"FIC{100+ i}", $"c:\\temp\\fic{100 + i}.dat"));
}
Logger.LogMessage("Completed posting all messages", ConsoleColor.Blue);
Видели ли мы что-то неправильное в цикле for выше? мне кажется, что он отправляет все сообщения на одну ссылку актера, а не на маршрутизатор.
мой ReceiveActor
очень простое приложение для ведения журнала
public RunAnalysisActor()
{
Receive<AnalysisMessage>((message)=> {
Logger.LogMessage($"Analysis Started => Id: {message.AnalysisId}, Loop: {message.LoopName}, File:{message.FilePath}");
var waitTime = rnd.Next(5) * 100;
//Logger.LogMessage($"Waiting => {waitTime}");
Thread.Sleep(10);
Logger.LogMessage($"Analysis Completed=> AId: {message.AnalysisId}, Loop: {message.LoopName}, File:{message.FilePath}");
});
}
Я также переопределил AroundPreStart
и PostStop
, чтобы увидеть, сколько актеров запущено и остановлено. но я вижу, что когда приложение запускается, оно создает только lower-bound
или минимум 2 актеров, я ожидал, что это создаст больше актеров на основе pressure-threshold
по умолчанию 1, но я вижу, что этого не происходит, и они не уменьшаются, когда нет никаких сообщений для обработки.
Может кто-нибудь, пожалуйста, помогите мне понять, что я делаю неправильно здесь. - Спасибо
Обновление 1:
Кажется, я понял суть проблемы. При публикации сообщения мне нужно изменить свою логику, чтобы добавить путь к моей конфигурации развертывания routees.paths = ["/user/analysis"]
, а также использовать функцию и Ask
, чтобы получить правильную ссылку, используя analysisRef.Ask<Routees>
что-то вроде этого.
RunAnalysisActorSystem.Scheduler.Advanced.ScheduleRepeatedly(
TimeSpan.FromMilliseconds(500),
TimeSpan.FromMilliseconds(500),
() => {
if (analysisRef.Ask<Routees>(new GetRoutees()).Result.Members.Any())
{
var i = rand.Next();
analysisRef.Tell(new AnalysisMessage(100 + i, $"FIC{100 + i}", $"c:\\temp\\fic{100 + i}.dat"));
}
}
);