我在SocketAsyncEventArgs buffer is full of zeroes中描述了类似的问题,其中我对SocketAsyncEventArgs UDP服务器的实现收到具有以下内容的数据包:

  • SocketAsyncEventArgs.BytesTransferred > 0
  • SocketAsyncEventArgs.Buffer中没有数据

  • 这种情况仅偶尔在负载下发生,并且可以在3台单独的机器上复制。我猜想这也是导致我另一个 Unresolved SO问题(FatalExecutionEngineError Detected when unhooking event)出现问题的原因。

    到目前为止,这是UDP服务器的实现:
    /// <summary>
    /// Provides a basic implementation of a UDPSocket based on ISocket
    /// </summary>
    public class UDPSocket : ISocket
    {
        #region "Private Variables"
        private Socket _socket;
        private IBufferManagerProvider _bufferManager;
        #endregion
    
        #region "Public Properties"
        public Int32 Port { get; private set; }
        public Int32 MessagePrefixLength { get; set; }
        public IPAddress ListeningAddress { get; private set; }
        public ILogProvider LogProvider { get; private set; }
        public bool AllowAddressReuse { get; private set; }
        #endregion
    
        #region "Constructors"
        private UDPSocket() { }
        public UDPSocket(String listeningAddress) : this(listeningAddress, 4444, null, null) { }
        public UDPSocket(String listeningAddress, Int32 port) : this(listeningAddress, port, null, null) { }
        public UDPSocket(Int32 port) : this("0.0.0.0", port, null, null) { }
    
        public UDPSocket(String listeningAddress, Int32 port, IBufferManagerProvider manager, ILogProvider logger)
        {
            // Setup the port
            if (port <= 0)
            {
                throw new ArgumentException("Port number cannot be less than 0.");
            }
            else
            {
                this.Port = port;
            }
    
            // check the ip address
            if (String.IsNullOrEmpty(listeningAddress))
            {
                throw new Exception("The listening address supplied is not valid.");
            }
            this.ListeningAddress = IPAddress.Parse(listeningAddress);
    
            // check the interfaces
            this.LogProvider = (logger == null) ? new DefaultLogProvider(LogLevel.None) : logger;
            _bufferManager = (manager == null) ? new DefaultBufferManager(100, 2048, null, null) : manager;
    
            // use a default message prefix
            this.MessagePrefixLength = 4;
        }
        #endregion
    
        #region "Event Handlers"
        #endregion
    
        #region "Internal handler methods"
        private void Receive()
        {
            SocketAsyncEventArgs args = _bufferManager.TakeNextSocketAsyncEventArgs();
            byte[] buff = _bufferManager.TakeNextBuffer();
            args.SetBuffer(buff, 0, buff.Length);
            args.Completed += PacketReceived;
            args.RemoteEndPoint = new IPEndPoint(IPAddress.Any, this.Port);
    
            try
            {
                if (!_socket.ReceiveMessageFromAsync(args))
                {
                    OnPacketReceived(args);
                }
            }
            catch
            {
                // we should only jump here when we disconnect all the clients.
            }
        }
    
        private void PacketReceived(object sender, SocketAsyncEventArgs e)
        {
            OnPacketReceived(e);
        }
    
        private void OnPacketReceived(SocketAsyncEventArgs e)
        {
            // Start a new Receive operation straight away
            Receive();
    
            // Now process the packet that we have already
            if (e.BytesTransferred <= MessagePrefixLength)
            {
                // Error condition, empty packet
                this.LogProvider.Log(String.Format("Empty packet received from {0}. Discarding packet.", e.ReceiveMessageFromPacketInfo.Address.ToString()), "UDPSocket.OnPacketReceived", LogLevel.Minimal);
                ReleaseArgs(e);
                return;
            }
    
            //
            //  The buffer can occassionally be zero'd at this point in time
            //
    
            // Get the message length from the beginning of the packet.
            byte[] arrPrefix = new byte[MessagePrefixLength];
            Buffer.BlockCopy(e.Buffer, 0, arrPrefix, 0, MessagePrefixLength);
            Int32 messageLength = BitConverter.ToInt32(arrPrefix, 0);
    
            // the number of bytes remaining to store
            Int32 bytesToProcess = e.BytesTransferred - MessagePrefixLength;
    
            if (bytesToProcess < messageLength)
            {
                this.LogProvider.Log(String.Format("Missing data from {0}. Discarding packet.", e.ReceiveMessageFromPacketInfo.Address.ToString()), "UDPSocket.OnPacketReceived", LogLevel.Minimal);
                ReleaseArgs(e);
                return;
            }
    
            // Create a data buffer
            byte[] data = new byte[messageLength];
    
            // Copy the remaining data to the data buffer on the user token
            Buffer.BlockCopy(e.Buffer, MessagePrefixLength, data, 0, messageLength);
    
            // Data is safely stored, so unhook the event and return the SocketAsyncEventArgs back to the pool
            ReleaseArgs(e);
    
            // Thread safe way of triggering the event
            var evnt = OnDataReceived;
    
            if (evnt != null)
            {
                evnt(e, new SocketDataEventArgs(data));
            }
        }
    
        private void ReleaseArgs(SocketAsyncEventArgs e)
        {
            e.Completed -= PacketReceived;
            _bufferManager.InsertSocketAsyncEventArgs(e);
            _bufferManager.InsertBuffer(e.Buffer);
        }
        #endregion
    
        #region "ISocket implicit implementation"
        public void Start()
        {
            this.LogProvider.Log("Starting. Creating socket", "UDPSocket.Start", LogLevel.Verbose);
            _socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
            _socket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.PacketInformation, true);
            _socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, this.AllowAddressReuse);
            _socket.Bind(new IPEndPoint(this.ListeningAddress, this.Port));
    
            // use a default message prefix
            this.MessagePrefixLength = 4;
    
            // begin receiving packets
            Receive();
    
            this.LogProvider.Log("Socket created. Listening for packets", "UDPSocket.Start", LogLevel.Verbose);
        }
    
        public void Stop()
        {
            // do a shutdown before you close the socket
            try
            {
                _socket.Shutdown(SocketShutdown.Both);
                this.LogProvider.Log("Clean socket shutdown", "TCPSocket.CloseSocket", LogLevel.Verbose);
            }
            // throws if socket was already closed
            catch (Exception ex)
            {
                this.LogProvider.Log("Error closing socket - " + ex.Message, "TCPSocket.CloseSocket", LogLevel.Minimal);
            }
    
            // Close the socket, which calls Dispose internally
            _socket.Close();
            this.LogProvider.Log("Socket closed", "TCPSocket.CloseSocket", LogLevel.Verbose);
        }
    
        public event EventHandler<SocketDataEventArgs> OnDataReceived;
        #endregion
    }
    

    如果需要,我可以提供完整的服务器/客户端演示,也可以发布其他类(例如缓冲区管理器)。这是我第一次使用SocketAsyncEventArgs,因此我的实现可能不是100%正确

    最佳答案

    我认为我找到了问题。我没有使用Stack<T>来管理缓冲区,而是使用Queue<T>来极大地减少了问题。对于那些感兴趣的人,这是我对缓冲区队列的实现:

    /// <summary>
    /// Creates a managed Queue that makes a program wait when the resources are depleated
    /// </summary>
    /// <typeparam name="T">The type of Queue to create</typeparam>
    public sealed class ManagedQueue<T>
    {
        #region "Constructors"
        public ManagedQueue(Int32 capacity = 400, bool fillQueue = false, Queue<T> queue = null)
        {
            Capacity = capacity;
            _queue = queue ?? new Queue<T>(capacity);
            _restrictor = new SemaphoreSlim((fillQueue) ? Capacity : 0, Capacity);
    
            if (fillQueue)
            {
                // Setup the queue with default values
                for (Int32 i = 0; i < Capacity; i++)
                {
                    Insert(default(T));
                }
            }
        }
        #endregion
    
        /// <summary>
        /// Gets the defined over all queue Capacity
        /// </summary>
        public Int32 Capacity { get; private set; }
    
        // The queue to hold the items
        private readonly Queue<T> _queue;
    
        // The SemaphoreSlim to restrict access to the queue
        private readonly SemaphoreSlim _restrictor;
    
        /// <summary>
        /// Take the next resource available from the queue. This is a blocking operation if capacity is reached.
        /// </summary>
        /// <returns>The next resource available</returns>
        public T TakeNext()
        {
            // Sanity Check
            if (_queue == null)
            {
                throw new InvalidOperationException("The queue cannot be null");
            }
    
            // make us wait if necessary
            _restrictor.Wait();
    
            lock (_queue)
            {
                if (_queue.Count > 0)
                {
                    return _queue.Dequeue();
                }
                throw new Exception("There has been a Semaphore/queue offset");
            }
        }
    
        /// <summary>
        /// Adds an item to the queue. This will release other threads if they are blocked
        /// </summary>
        /// <param name="item"></param>
        public void Insert(T item)
        {
            // Sanity Check
            if (_queue == null)
            {
                throw new InvalidOperationException("The queue cannot be null");
            }
    
            // Sanity Check
            if (item == null)
            {
                throw new ArgumentException("The item cannot be null");
            }
    
            lock (_queue)
            {
                _queue.Enqueue(item);
                _restrictor.Release();
            }
        }
    }
    

    关于c# - SocketAsyncEventArgs UDP服务器缓冲区有时充满零,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/14858952/

    10-11 21:25