Наблюдение за сообщением Terminated предупредит систему о том, что удаленный субъект умер, но возникает вопрос, как именно реагировать на завершение работы удаленного субъекта. Предполагая, что актор получает IActorRef для удаленного актора через свой конструктор, как актор получит новый IActorRef для удаленного актора, когда он снова станет активным. Одним из способов может быть сбой актора и делегирование родительскому актору, который затем получит новый IActorRef для удаленного актора через выбор актера. Проблема с этим, однако, заключается в том, что первоначальный выбор субъекта для удаленного субъекта мог иметь место в коде, не являющемся субъектом, в корне композиции, где обычно происходит внедрение зависимостей. Я полагаю, вы могли бы обойти это, передав делегата фабрики выбора актера, который можно было бы использовать для восстановления удаленного IActorRef. В качестве альтернативы я придумал создать класс-оболочку, реализующий IActorRef, с именем FaultTolerantActorRef.
Этот класс принимает путь удаленного (или локального) актора в конструкторе и периодически выполняет выбор актера, чтобы получить обновление IActorRef для удаленного актера. Таким образом, если по какой-либо причине удаленный субъект умирает, вызовы FaultTolerantActorRef заканчиваются мертвыми буквами, а удаленный субъект мертв. Однако, когда удаленный субъект в конечном итоге снова подключается к сети, вызовы FaultTolerantActorRef в конечном итоге достигнут вновь возрожденного удаленного субъекта без необходимости предпринимать какие-либо явные действия со стороны вызывающего локального субъекта.
Существует метод Invalidate, который заставит FaultTolerantActorRef сделать новый выбор актера при следующем вызове. Предположительно, это может быть вызвано субъектом в ответ на сообщение Terminated от удаленного субъекта. Даже без вызова Invalidate произойдет новый выбор актера на основе интервала обновления, переданного конструктору.
using Akka.Actor;
using System;
using Akka.Util;
using System.Threading;
namespace JA.AkkaCore
{
public class FaultTolerantActorRef : IActorRef
{
public IActorRef ActorRef
{
get
{
if (!_valid || DateTime.Now.Ticks > Interlocked.Read(ref _nextRefreshTime))
RefreshActorRef();
return _actorRef;
}
}
public ActorPath Path
{
get
{
return ActorRef.Path;
}
}
object _lock = new object();
IActorRef _actorRef;
volatile bool _valid;
string _path;
IActorRefFactory _actorSystem;
private TimeSpan _requestTimeout;
private TimeSpan _refreshInterval;
//private DateTime _nextRefreshTime = DateTime.MinValue;
private long _nextRefreshTime = DateTime.MinValue.Ticks;
public FaultTolerantActorRef(IActorRefFactory actorSystem, IActorRef actorRef,
TimeSpan refreshInterval = default(TimeSpan), TimeSpan requestTimeout = default(TimeSpan))
: this(actorSystem, actorRef.Path.ToString(), refreshInterval, requestTimeout)
{
_actorRef = actorRef;
_valid = true;
}
public FaultTolerantActorRef(IActorRefFactory actorSystem, string actorPath,
TimeSpan refreshInterval = default(TimeSpan), TimeSpan requestTimeout = default(TimeSpan))
{
if (refreshInterval == default(TimeSpan))
_refreshInterval = TimeSpan.FromSeconds(60);
else
_refreshInterval = refreshInterval;
if (requestTimeout == default(TimeSpan))
_requestTimeout = TimeSpan.FromSeconds(60);
else
_requestTimeout = requestTimeout;
_actorSystem = actorSystem;
_valid = false;
_path = actorPath;
}
private void RefreshActorRef()
{
lock(_lock)
{
if (!_valid || DateTime.Now.Ticks > _nextRefreshTime)
{
_actorRef = _actorSystem.ActorSelectionOne(_path, _requestTimeout);
Interlocked.Exchange(ref _nextRefreshTime,DateTime.Now.Ticks + _refreshInterval.Ticks);
_valid = true;
}
}
}
public void Invalidate()
{
_valid = false;
}
public void Tell(object message, IActorRef sender)
{
ActorRef.Tell(message, sender);
}
public bool Equals(IActorRef other)
{
return ActorRef.Equals(other);
}
public int CompareTo(IActorRef other)
{
return ActorRef.CompareTo(other);
}
public ISurrogate ToSurrogate(ActorSystem system)
{
return ActorRef.ToSurrogate(system);
}
public int CompareTo(object obj)
{
return ActorRef.CompareTo(obj);
}
}
}
person
cfcal
schedule
29.12.2016