Как отменить асинхронный сокет отправки в приложении С#

У меня есть приложение, которое просто извлекает элементы из очереди, а затем пытается отправить их асинхронно через сетевой сокет.

У меня возникают некоторые проблемы, когда что-то идет не так или клиент прерывает хост-сокет.

Вот часть моего кода: я думаю, это может объяснить лучше, чем мои слова:

Вот мой класс SocketState.cs, который содержит сокет и связанную с ним информацию:

public class SocketState
{
    public const int BufferSize = 256;
    public Socket WorkSocket { get; set; }
    public byte[] Buffer { get; set; }   

    /// <summary>
    /// Constructor receiving a socket
    /// </summary>
    /// <param name="socket"></param>
    public SocketState(Socket socket)
    {
        WorkSocket = socket;
        Buffer = new byte[BufferSize];
    }
}

Вот мой класс SocketHandler.cs, который управляет большинством операций с сокетами:

public class SocketHandler : IObserver
{
    # region Class Variables
    private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
    private SocketState _state;
    private OutgoingConnectionManager _parentConnectionManager;
    private int _recieverId;
    private readonly ManualResetEvent _sendDone = new ManualResetEvent(false);

    public volatile bool NameIsSet = false;
    private ManualResetEvent _receiveDone = new ManualResetEvent(false);
    public String Name;
    public readonly Guid HandlerId;
    # endregion Class Variables

    /// <summary>
    /// Constructor
    /// </summary>
    public SocketHandler(SocketState state)
    {
        HandlerId = Guid.NewGuid();
        _state = state;       
        _state.WorkSocket.BeginReceive(_state.Buffer, 0, SocketState.BufferSize, 0, new AsyncCallback(ReceiveCallback), this._state);
    }

    /// <summary>
    /// Set the receiver id for this socket.
    /// </summary>
    public void SetReceiverId(int receiverId)
    {
        _recieverId = receiverId;
    }

    /// <summary>
    /// Stops / closes a connection
    /// </summary>
    public void Stop()
    {
        CloseConnection();
    }

    /// <summary>
    /// Used to set this connections parent connection handler
    /// </summary>
    /// <param name="conMan"></param>
    public void SetConnectionManager(OutgoingConnectionManager conMan)
    {
        _parentConnectionManager = conMan;
    }

    /// <summary>
    /// Returns if the socket is connected or not
    /// </summary>
    /// <returns></returns>
    public bool IsConnected()
    {
        return _state.WorkSocket.Connected;
    }

    /// <summary>
    /// Public Class that implements the observer interface , this function provides a portal to receive new messages which it must send
    /// </summary>
    /// <param name="e"> Event to execute</param>        
    public void OnMessageRecieveEvent(ObserverEvent e)
    {
        SendSignalAsync(e.Message.payload);           
    }
    # region main working space

    # region CallBack Functions
    /// <summary>
    /// CallBack Function that is called when a connection Receives Some Data
    /// </summary>
    /// <param name="ar"></param>
    private void ReceiveCallback(IAsyncResult ar)
    {
        try
        {
            String content = String.Empty;
            if (ar != null)
            {
                SocketState state = (SocketState)ar.AsyncState;
                if (state != null)
                {
                    Socket handler = state.WorkSocket;
                    if (handler != null)
                    {
                        int bytesRead = handler.EndReceive(ar);

                        if (bytesRead > 0)
                        {
                            StringBuilder Sb = new StringBuilder();
                            Sb.Append(Encoding.Default.GetString(state.Buffer, 0, bytesRead));

                            if (Sb.Length > 1)
                            {
                                content = Sb.ToString();

                                foreach (var s in content.Split('Ÿ'))
                                {
                                    if (string.Compare(s, 0, "ID", 0, 2) == 0)
                                    {
                                        Name = s.Substring(2);
                                        NameIsSet = true;
                                    }

                                    if (string.Compare(s, 0, "TG", 0, 2) == 0)
                                    {
                                        LinkReplyToTag(s.Substring(2), this.Name);
                                    }
                                }
                                _state.WorkSocket.BeginReceive(_state.Buffer, 0, SocketState.BufferSize, 0,
                                    new AsyncCallback(ReceiveCallback), _state);
                            }
                        }
                    }
                }
            }
        }
        catch
        {
            CloseConnection();
        }
    }

    /// <summary>
    /// Call Back Function called when data is send though this connection
    /// </summary>
    /// <param name="asyncResult"></param>
    private void SendCallback(IAsyncResult asyncResult)
    {
        try
        {
            if (asyncResult != null)
            {
                Socket handler = (Socket)asyncResult.AsyncState;
                if (handler != null)
                {
                    int bytesSent = handler.EndSend(asyncResult);
                    // Signal that all bytes have been sent.
                    _sendDone.Set();
                    if (bytesSent > 0)
                    {
                        return;
                    }
                }
            }
        }
        catch (Exception e)
        {                
            Log.Error("Transmit Failed On Send CallBack");
        }
        //Close socket as something went wrong
        CloseConnection();
    }
    # endregion CallBack Functions

    /// <summary>
    /// Sends a signal out of the current connection
    /// </summary>
    /// <param name="signal"></param>
    private void SendSignalAsync(Byte[] signal)
    {
        try
        {
            if (_state != null)
            {
                if (_state.WorkSocket != null)
                {
                    if (_state.WorkSocket.Connected)
                    {
                        try
                        {
                            _sendDone.Reset();
                            _state.WorkSocket.BeginSend(signal, 0, signal.Length, 0, new AsyncCallback(SendCallback),
                                _state.WorkSocket);
                            _sendDone.WaitOne(200);
                            return;
                        }
                        catch (Exception e)
                        {
                            Log.Error("Transmission Failier for IP: " + ((IPEndPoint)_state.WorkSocket.RemoteEndPoint).Address, e);
                        }
                    }
                }
            }
            //Close Connection as something went wrong
            CloseConnection();
        }
        catch (Exception e)
        {
            Log.Error("An Exception has occurred in the SendSignalAsync function", e);
        }
    }     


    /// <summary>
    /// Call this to Close the connection
    /// </summary>
    private void CloseConnection()
    {
        try
        {
            var ip = "NA";
            try
            {
                if (_state != null)
                {
                    ip = ((IPEndPoint)_state.WorkSocket.RemoteEndPoint).Address.ToString();
                }
            }
            catch
            {
                //Cannot get the ip address
            }
            OutgoingListeningServer.UpdateRecieversHistory(_recieverId, ip, "Disconnected");

            try
            {
                if (_state != null)
                {
                    if (_state.WorkSocket != null)
                    {
                        _state.WorkSocket.Shutdown(SocketShutdown.Both);
                        _state.WorkSocket.Close();
                        //_state.WorkSocket.Dispose();
                        _state.WorkSocket = null;
                        _state = null;
                    }
                }
            }
            catch (Exception e)
            {
                _state = null;
                Log.Error("Error while trying to close socket for IP: " + ip, e);
            }
            if (_parentConnectionManager != null)
            {
                // Remove this connection from the connection list
                _parentConnectionManager.ConnectionRemove(this);
            }
        }
        catch (Exception e)
        {
            Log.Error("A major error occurred in the close connection function, outer try catch was hit", e);
        }
    }  
    # endregion main working space
}

И вот мой поток, который затем вызовет функцию OnMessageRecieveEvent() в классе SocketHandler.cs.

  private void Main()
    {
        Log.Info("Receiver" + ReceiverDb.Id + " Thread Starting");
        // Exponential back off 
        var eb = new ExponentialBackoff();
        try
        {
            while (_run)
            {
                try
                {
                    if (ReceiverOutgoingConnectionManager.HasConnectedClient())
                    {
                        //Fetch Latest Item
                        ILogItem logItem;
                        if (_receiverQueue.TryDequeue(out logItem))
                        {
                            //Handle the logItem 
                           **calls the OnMessageRecieveEvent() for all my connections.                       
                                    ReceiverOutgoingConnectionManager.SendItemToAllConnections(logItem);  
                            //Reset the exponential back off counter
                            eb.reset();
                        }
                        else
                        {
                            //Exponential back off thread sleep
                            eb.sleep();
                        }
                    }
                    else
                    {
                        //Exponential back off thread sleep
                        eb.sleep();
                    }
                }
                catch (Exception e)
                {
                    Log.Error("Error occurred in " + ReceiverDb.Id + "receiver mains thread ", e);
                }
            }
        }
        catch (Exception e)
        {
            Log.Error(" ** An error has occurred in a receiver holder main thread ** =====================================================================>", e);
        }
        Log.Info("Receiver" + ReceiverDb.Id + " Thread Exiting ** ===============================================================================>");
    }

Прошу прощения за столько кода. Но я боюсь, что это может быть неочевидно, поэтому я разместил весь соответствующий код.

Теперь, чтобы объяснить мою проблему. Если на сокете произошла ошибка. Я получаю много Transmit Failed On Send CallBack. что для меня означает, что я неправильно закрываю сокет, и все еще выполняется невыполненный обратный вызов.

Нет ли способа отменить все ожидающие обратные вызовы, когда я закрываю сокет?

Я также уверен, что будет несколько предложений/проблем с моим кодом, который я разместил. Я был бы более чем признателен за обратную связь.


person Zapnologica    schedule 11.02.2015    source источник
comment
Человек, этот обратный вызов просто выдающийся! #just_had_to   -  person SimpleVar    schedule 11.02.2015
comment
Что будет означать отмена невыполненных действий? Вы пытаетесь уведомить вызывающих абонентов, что их действия были отменены из-за закрытия сокета? На мой взгляд, если у вас есть очередь действий, а затем вы перестаете проходить через эту очередь, действия в очереди не выполняются, игнорируются, игнорируются и отменяются.   -  person SimpleVar    schedule 11.02.2015


Ответы (1)


Я предполагаю, что это для учебных целей, поскольку написание собственного сетевого кода для других целей несколько... сложно.

Получение исключения в обратном вызове отправки нормально. Вот для чего нужны исключения. Однако вам нужно определить исключение, а не просто хвататься за одеяло Exception и практически игнорировать всю информацию внутри.

Обрабатывайте исключения, которые вы можете обрабатывать, в тех местах, где они должны обрабатываться.

Я не буду вдаваться в подробности, потому что с вашим кодом много проблем. Но одна из ключевых вещей — игнорирование правильной обработки сокета — когда вы получаете 0 байта, это означает, что вы должны закрыть сокет. Это сигнал с другой стороны, говорящий, что мы закончили. Игнорируя это, вы работаете с (возможно, частично) закрытым соединением.

Пожалуйста, используйте какую-нибудь сетевую библиотеку, которая может дать вам необходимые гарантии (и простоту). WCF, Линдгрен, что угодно. TCP не гарантирует, например, что вы получите сообщение одной частью, поэтому ваш код разбора сообщения ненадежен. Вам нужно использовать некоторые кадры сообщений, вам нужно добавить правильную обработку ошибок... Socket не является высокоуровневой конструкцией, она не работает "автоматически", вам нужно реализовать все это самостоятельно.

Даже игнорируя сам сетевой код, очевидно, что вы просто игнорируете большинство сложностей асинхронного кода. Что случилось с SendDone? Либо вы хотите использовать асинхронный код, а затем избавиться от синхронности, либо вы хотите синхронный код, и тогда зачем вы используете асинхронные сокеты?

person Luaan    schedule 11.02.2015
comment
Спасибо за ваши комментарии и мнения. Я думаю, что вы указали мне на несколько хороших вещей, как на то, чего я на самом деле хочу достичь. В конце концов, я хочу надежно. Что, вероятно, не то, что у меня есть здесь. Я следил за множеством онлайн-руководств по асинхронным сокетам. Просто отметим, что установка, для которой он используется, больше похожа на среду вещания. Мое клиентское приложение специально не отправляет обратно и не подтверждает. только tcp акк. - person Zapnologica; 11.02.2015
comment
@zapnologica Образцы C # в Интернете обычно представляют собой ужасный беспорядок. Есть несколько очень хороших примеров C++, но это, конечно, означает много проблем - и вам все равно придется отфильтровывать неработающие. Писать низкоуровневый сетевой код действительно сложно, вы действительно хотите, чтобы кто-то, кто делает это для жизни, справился с этим, и просто использовал их библиотеку :) Я начал делать несколько примеров и руководств по работе с сетью, но это не будет готов в ближайшее время, не говоря уже о том, что он нуждается в большом количестве экспертных оценок и тестировании в реальных условиях. Работа в сети сложна. В конце концов, это распределенная система. - person Luaan; 11.02.2015
comment
Послушайте, как бы мне ни хотелось погрузиться и изучить все тонкости работы в сети. Это в конечном счете для живого производственного сервера. И по твоему совету и немного логики с моей стороны, кричит Библиотека. Это потребует значительного изменения кода от моего имени, но оно того стоит. Есть ли какая-то конкретная библиотека, которую вы могли бы предложить? В этом случае у меня нет контроля над клиентами. Знаете ли вы об этой библиотеке: codeproject.com/Articles/563627/ - person Zapnologica; 11.02.2015
comment
@zapnologica Не бойтесь использовать хорошо проверенную технологию для того, что вы делаете. Например, похоже, что вы можете легко использовать HTTP для связи — HTTP очень легко использовать в .NET. И это фактически дает вам ответ - TCP Send не сообщит вам, что удаленный сервер получил ваши данные. - person Luaan; 11.02.2015
comment
@zapnologica У вас нет контроля над клиентом? Это сложно. Если в коммуникационном протоколе нет кадрирования сообщений, вы не можете просто добавить его на сервер. Что вы знаете об общении? Есть ли способы найти конец любого данного сообщения? В любом случае, похоже, вы не сможете использовать какую-либо готовую библиотеку для этого сценария. Ой. - person Luaan; 11.02.2015
comment
Да, это моя забота. Я знаю клиента. и если бы нужно было бы его можно было бы восстановить. Однако другой отдел контролирует клиента. Он просто использует простой текстовый протокол. Этот сервер в основном почтовый ящик. Клиент подключается к серверу и предоставляет свое имя. Сервер аутентифицирует его, а затем буквально просто передает сообщения клиенту. Есть надежда. Протокол, которым они пользуются, имеет начальный разделитель и конечный разделитель. - person Zapnologica; 11.02.2015
comment
Давайте продолжим это обсуждение в чате. - person Zapnologica; 11.02.2015