Проблемы дуплекса WCF с несколькими клиентами в разных потоках

Я установил дуплексную службу WCF (односторонние сообщения) и несколько клиентов в одном процессе (для тестирования), используя CustomBinding по TCP.

Все работает нормально, пока перезвонят только одному клиенту. Однако это не подходит для нескольких клиентов. В последнем случае один клиент работает, другие могут отправлять свой запрос, но не получают ответа. Сервер может без проблем отправлять все ответы. Трассировка WCF показывает исключение EndpointNotFoundException на стороне клиента:

There was no channel that could accept the message with action
'http://tempuri.org/IMyService/Response'.
at System.ServiceModel.Dispatcher.ErrorBehavior.ThrowAndCatch(Exception e, Message message)
at System.ServiceModel.Channels.DatagramChannelDemuxer`2.ProcessItem(TInnerItem item)
at System.ServiceModel.Channels.DatagramChannelDemuxer`2.HandleReceiveResult(IAsyncResult result)
at System.ServiceModel.Channels.DatagramChannelDemuxer`2.OnReceiveCompleteStatic(IAsyncResult result)
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously, Exception exception)
at System.ServiceModel.Channels.InputChannel.HelpReceiveAsyncResult.OnReceive(IAsyncResult result)
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously)
at System.Runtime.InputQueue`1.AsyncQueueReader.Set(Item item)
at System.Runtime.InputQueue`1.Dispatch()
at System.ServiceModel.Channels.SingletonChannelAcceptor`3.DispatchItems()
at System.ServiceModel.Channels.DuplexSessionOneWayChannelListener.ChannelReceiver.OnReceive(IAsyncResult result)
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously, Exception exception)
at System.ServiceModel.Channels.FramingDuplexSessionChannel.TryReceiveAsyncResult.OnReceive(IAsyncResult result)
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously, Exception exception)
at System.ServiceModel.Channels.SynchronizedMessageSource.ReceiveAsyncResult.OnReceiveComplete(Object state)
at System.ServiceModel.Channels.SessionConnectionReader.OnAsyncReadComplete(Object state)
at System.ServiceModel.Channels.TracingConnection.TracingConnectionState.ExecuteCallback()
at System.ServiceModel.Channels.TracingConnection.WaitCallback(Object state)
at System.ServiceModel.Channels.SocketConnection.FinishRead()
at System.ServiceModel.Channels.SocketConnection.AsyncReadCallback(Boolean haveResult, Int32 error, Int32 bytesRead)
at System.ServiceModel.Channels.OverlappedContext.CompleteCallback(UInt32 error, UInt32 numBytes, NativeOverlapped* nativeOverlapped)
at System.Runtime.Fx.IOCompletionThunk.UnhandledExceptionFrame(UInt32 error, UInt32 bytesRead, NativeOverlapped* nativeOverlapped)
at System.Threading._IOCompletionCallback.PerformIOCompletionCallback(UInt32 errorCode, UInt32 numBytes, NativeOverlapped* pOVERLAP)

На момент исключения я уверен, что все клиентские каналы по-прежнему открыты, так как они закрываются только после получения ответа. Похоже, что клиент получает сообщение, но не может отправить его экземпляру клиента.

Вот мой полный пример (конфигурация WCF по коду):

using System;
using System.Text;
using System.Collections.Generic;
using System.Linq;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Threading;
using System.Threading.Tasks;

namespace WcfDuplex
{
  [TestClass]
  public class WcfDuplexTest
  {
    [TestMethod]
    public void WcfDuplexTest1()
    {
      const int NumParallelRequests = 2;
      const int NumMessagesPerThread = 1;
      using (var host = MyServer.CreateServer(TestContext))
      {
        Action clientAction = () =>
          {
            using (var client = MyClient.CreateProxy(TestContext))
            {
              using (var scope = new OperationContextScope(client))
              {
                var callback = MyClient.GetCallbackHandler(client);
                OperationContext.Current.OutgoingMessageHeaders.ReplyTo = client.LocalAddress;
                for (int i = 1; i <= NumMessagesPerThread; i++)
                {
                  string message = String.Format("Message {0} from tread {1}", i, Thread.CurrentThread.ManagedThreadId);
                  client.Request(message);
                  bool success = callback.MessageArrived.WaitOne(5000);
                  Assert.IsTrue(success, "Timeout while waiting for: " + message);
                  Assert.IsTrue(callback.Message.EndsWith(message));
                }
              }
              client.Close();
            }
          };
        var tasks = new List<Task>();
        for (int i = 0; i < NumParallelRequests; i++)
          tasks.Add(Task.Factory.StartNew(clientAction));
        foreach (var task in tasks)
          task.Wait(10000);
      }
    }

    public TestContext TestContext
    {
      get;
      set;
    }
  }

  [ServiceContract(CallbackContract = typeof(IMyCallback))]
  interface IMyService
  {
    [OperationContract(IsOneWay = true)]
    void Request(string message);
  }

  [ServiceContract()]
  interface IMyCallback
  {
    [OperationContract(IsOneWay = true)]
    void Response(string message);
  }

  interface IMyServiceChannel : IMyService, IClientChannel
  { }

  [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
  class MyService : IMyService
  {
    public void Request(string message)
    {
      var context = OperationContext.Current.Host.Extensions.Find<TestContextExtension>().TestContext;
      var callback = OperationContext.Current.GetCallbackChannel<IMyCallback>();
      OperationContext.Current.OutgoingMessageHeaders.To = OperationContext.Current.IncomingMessageHeaders.ReplyTo.Uri;
      context.WriteLine("Server received message: {0}. Reply to {1}", message, OperationContext.Current.OutgoingMessageHeaders.To);
      string responseMessage = "From server thread " + Thread.CurrentThread.ManagedThreadId + ": " + message;
      callback.Response(responseMessage);
      context.WriteLine("Server sent response: " + responseMessage);
    }
  }

  class MyCallbackHandler : IMyCallback, IExtension<IContextChannel>
  {

    private readonly TestContext Context;
    public string Message { get; set; }
    public AutoResetEvent MessageArrived { get; private set; }

    public MyCallbackHandler(TestContext context)
    {
      Context = context;
      MessageArrived = new AutoResetEvent(false);
    }

    public void Response(string message)
    {
      Message = message;
      Context.WriteLine("Client received message: " + message + " on " + OperationContext.Current.Channel.LocalAddress +
        " in thread " + Thread.CurrentThread.ManagedThreadId);
      MessageArrived.Set();
    }

    public void Attach(IContextChannel owner) { }
    public void Detach(IContextChannel owner) { }
  }

  class MyServer
  {
    public const string Url = "net.tcp://localhost:8731/MyService/";

    public static ServiceHost CreateServer(TestContext context)
    {
      var host = new ServiceHost(typeof(MyService));
      host.Extensions.Add(new TestContextExtension { TestContext = context });
      host.AddServiceEndpoint(typeof(IMyService), MyClient.GetBinding(), new Uri(MyServer.Url));
      host.Open();
      return host;
    }
  }

  class TestContextExtension : IExtension<ServiceHostBase>
  {
    public TestContext TestContext { get; set; }
    public void Attach(ServiceHostBase owner) { }
    public void Detach(ServiceHostBase owner) { }
  }

  class MyClient
  {
    public static MyCallbackHandler GetCallbackHandler(IMyServiceChannel channel)
    {
      var callback = channel.Extensions.Find<MyCallbackHandler>();
      return callback;
    }

    public static IMyServiceChannel CreateProxy(TestContext testContext)
    {
      var callback = new MyCallbackHandler(testContext);
      var instanceContext = new InstanceContext(callback);
      var binding = GetBinding();
      int port = 8732;// +Thread.CurrentThread.ManagedThreadId;
      binding.Elements.Find<CompositeDuplexBindingElement>().ClientBaseAddress = new Uri(String.Format("net.tcp://localhost:{0}/Client/", port));
      binding.OpenTimeout = TimeSpan.FromMinutes(5);
      var clientFactory = new DuplexChannelFactory<IMyServiceChannel>(instanceContext, binding);
      var client = clientFactory.CreateChannel(new EndpointAddress(MyServer.Url));
      client.Extensions.Add(callback);
      return client;
    }

    public static CustomBinding GetBinding()
    {
      var binding = new CustomBinding(
        new CompositeDuplexBindingElement(),
        new OneWayBindingElement(),
        new BinaryMessageEncodingBindingElement(),
        //new ReliableSessionBindingElement(),
        new TcpTransportBindingElement());
      return binding;
    }
  }
}

В качестве справки я использовал следующие статьи: здесь, здесь, здесь


person Piper    schedule 30.05.2013    source источник


Ответы (1)


Проблема заключалась в привязке. Что работает:

var binding = new CustomBinding(
  new BinaryMessageEncodingBindingElement(),
  new ReliableSessionBindingElement(),
  new TcpTransportBindingElement());

CompositeDuplexBindingElement и OneWayBindingElement вызывают проблемы. Они не нужны для каналов типа TCP.

Также работают:

var binding = new NetTcpBinding(SecurityMode.None);

or

var binding = new WSDualHttpBinding(WSDualHttpSecurityMode.None);
binding.ClientBaseAddress = new Uri("http://localhost:8732/Client/");
person Piper    schedule 29.10.2013