WCF via UDP

Publicado por Fabio A. Falavinha
05/3/2012
Categoria:
Tags: , , , ,

Fala pessoALL!

A grande parte dos artigos sobre WCF foram baseados em TCP (Transmission Control Protocol) e HTTP. Qualquer interesse no protocolo UDP (User Datagram Protocol) é descrito através de sockets. Not so bad, but very poor!

Utilizando a plataforma WCF (Windows Communication Foundation) podemos desenvolver uma solução que utiliza a infraestrutura de serviços em cima de sockets.

Neste artigo será utilizado WCF 4.0 através de serviços self-hosted.

TCP vs UDP

O protocolo TCP é baseado em conexões, gerenciamento de sessão e garantia na entrega dos pacotes. Já o protocolo UDP não suporta nenhum dos recursos citados acima. Portanto, na camada de aplicação o desenvolvedor deverá implementar boa parte ou se não todos os recursos. Mas… por quê utilizar UDP ao invés de TCP? Aham! Simples, ao utilizar TCP temos no mínimo três pacotes a serem enviados ao servidor para iniciar a troca de dados. Com UDP apenas 1 pacote é necessário para realizar um request/response entre cliente e servidor. O overhead “causado” pelo protocolo TCP garante a entrega e o gerenciamento dos pacotes (ordem = reliability), porém torna-se mais lento em aplicaçãoes que não necessitam desse tipo de gerenciamento (veja a Figura 1).

TCP vs UDP

Outra funcionalidade do protocolo UDP é o envio de uma mensagem a múltiplos destinatários - Multicast e Broadcast.

  • Multicast: é um processo de subscription (publisher e subscriber) que através de um endereço IP Multicast máquinas registram-se em um grupo para receber dados enviados para este IP.
  • Broadcast: pacote distribuído em uma sub-net.

Criando a Solução

A Microsoft disponibiliza uma implementação padrão da integração entre WCF e UDP. Porém, esta implementação é genérica e requer atenção para não incluir elementos e configurações indesejadas à aplicação.

Eu vou disponibilizar aqui uma implementação mais simples e focada em: UDP ponto-a-ponto, Multicast e Reliability.

O primeiro passo é entender a infraestrutura da arquitetura WCF necessária para integrar o protocolo UDP através dos elementos: TransportBindingElement, UdpChannelFactory, UdpChannelListener, InputChannel e OutputChannel.

TransportBindingElement

Elemento principal para a criação do canal WCF através do protocolo.

    public class UdpTransportBindingElement : TransportBindingElement
    {
        public IPAddress LocalNetworkInterfaceAddress { get; set; }
        public bool Multicast { get; set; }

        public override string Scheme
        {
            get
            {
                return "soap.udp";
            }
        }

        public UdpTransportBindingElement()
        {
        }

        protected UdpTransportBindingElement(UdpTransportBindingElement other)
            : base(other)
        {
            Multicast = other.Multicast;
            LocalNetworkInterfaceAddress = other.LocalNetworkInterfaceAddress;
        }

        public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
        {
            if (context == null)
                throw new ArgumentNullException("context");
            return (IChannelFactory<TChannel>)(object)new UdpChannelFactory(this, context);
        }

        public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
        {
            if (context == null)
                throw new ArgumentNullException("context");
            if (!this.CanBuildChannelListener<TChannel>(context))
                throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, "Unsupported channel type: {0}.", typeof(TChannel).Name));
            return (IChannelListener<TChannel>)new UdpChannelListener(this, context);
        }

        public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
        {
            return typeof(TChannel) == typeof(IOutputChannel);
        }

        public override bool CanBuildChannelListener<TChannel>(BindingContext context)
        {
            return typeof(TChannel) == typeof(IInputChannel);
        }

        public override BindingElement Clone()
        {
            return new UdpTransportBindingElement(this);
        }

        public override T GetProperty<T>(BindingContext context)
        {
            if (context == null)
                throw new ArgumentNullException("context");
            return context.GetInnerProperty<T>();
        }
    }

UdpChannelFactory

Factory  para criação do canal de recebimento e envio dos pacotes através de protocolo.

    internal class UdpChannelFactory : ChannelFactoryBase<IOutputChannel>
    {
        public BufferManager BufferManager { get; private set; }
        public MessageEncoderFactory MessageEncoderFactory { get; private set; }
        public bool Multicast { get; private set; }
        public IPAddress LocalNetworkInterfaceAddress { get; private set; }

        public UdpChannelFactory(UdpTransportBindingElement bindingElement, BindingContext context)
            : base(context.Binding)
        {
            Multicast = bindingElement.Multicast;
            LocalNetworkInterfaceAddress = bindingElement.LocalNetworkInterfaceAddress;
            BufferManager = BufferManager.CreateBufferManager(bindingElement.MaxBufferPoolSize, int.MaxValue);
            var messageEncoderBindingElements = context.BindingParameters.FindAll<MessageEncodingBindingElement>();
            if (messageEncoderBindingElements.Count > 1)
                throw new InvalidOperationException("More than one MessageEncodingBindingElement was found in the BindingParameters of the BindingContext");
            else if (messageEncoderBindingElements.Count == 1)
                MessageEncoderFactory = messageEncoderBindingElements[0].CreateMessageEncoderFactory();
            else
                MessageEncoderFactory = new TextMessageEncodingBindingElement().CreateMessageEncoderFactory();
        }

        public override T GetProperty<T>()
        {
            T messageEncoderProperty = this.MessageEncoderFactory.Encoder.GetProperty<T>();
            if (messageEncoderProperty != null)
                return messageEncoderProperty;
            if (typeof(T) == typeof(MessageVersion))
                return (T)(object)this.MessageEncoderFactory.Encoder.MessageVersion;
            return base.GetProperty<T>();
        }

        protected override void OnOpen(TimeSpan timeout)
        {
        }

        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new CompletedAsyncResult(callback, state);
        }

        protected override void OnEndOpen(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }

        protected override IOutputChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via)
        {
            return new UdpOutputChannel(this, remoteAddress, via, MessageEncoderFactory.Encoder, LocalNetworkInterfaceAddress);
        }

        protected override void OnClosed()
        {
            base.OnClosed();
            BufferManager.Clear();
        }
    }

UdpChannelListener

Recurso utilizado para interceptar o recebimento e envio de dados através do canal de comunicação (pode atuar de forma assíncrona ou síncrona).

    internal class UdpChannelListener : ChannelListenerBase<IInputChannel>
    {
        BufferManager bufferManager;

        //The UDP network sockets.
        List<Socket> listenSockets;

        int maxMessageSize;
        MessageEncoderFactory messageEncoderFactory;
        bool multicast;

        AsyncCallback onReceive;
        Uri uri;

        InputQueue<IInputChannel> channelQueue;

        //The channel associated with this listener.
        UdpInputChannel currentChannel;

        object currentChannelLock;

        public UdpChannelListener(UdpTransportBindingElement bindingElement, BindingContext context)
            : base(context.Binding)
        {
            this.maxMessageSize = (int)bindingElement.MaxReceivedMessageSize;
            this.multicast = bindingElement.Multicast;
            this.bufferManager = BufferManager.CreateBufferManager(bindingElement.MaxBufferPoolSize, this.maxMessageSize);
            MessageEncodingBindingElement messageEncoderBindingElement = context.BindingParameters.Remove<MessageEncodingBindingElement>();
            if (messageEncoderBindingElement != null)
            {
                this.messageEncoderFactory = messageEncoderBindingElement.CreateMessageEncoderFactory();
            }
            else
            {
                this.messageEncoderFactory = new TextMessageEncodingBindingElement().CreateMessageEncoderFactory();
            }
            this.channelQueue = new InputQueue<IInputChannel>();
            this.currentChannelLock = new object();
            this.listenSockets = new List<Socket>(2);

            Uri baseAddress = context.ListenUriBaseAddress;
            if (baseAddress == null)
            {
                if (context.ListenUriMode == ListenUriMode.Unique)
                {
                    UriBuilder uriBuilder = new UriBuilder(this.Scheme, Dns.GetHostEntry(String.Empty).HostName);
                    uriBuilder.Path = Guid.NewGuid().ToString();
                    baseAddress = uriBuilder.Uri;
                }
                else
                {
                    throw new InvalidOperationException("Null is only a supported value for ListenUriBaseAddress when using ListenUriMode.Unique.");
                }
            }

            this.InitializeUri(baseAddress, context.ListenUriRelativeAddress, context.ListenUriMode);
        }

        internal TimeSpan InternalReceiveTimeout
        {
            get { return this.DefaultReceiveTimeout; }
        }

        public MessageEncoderFactory MessageEncoderFactory
        {
            get
            {
                return messageEncoderFactory;
            }
        }

        string Scheme
        {
            get
            {
                return "soap.udp";
            }
        }

        public override Uri Uri
        {
            get
            {
                return this.uri;
            }
        }

        public override T GetProperty<T>()
        {
            T messageEncoderProperty = this.MessageEncoderFactory.Encoder.GetProperty<T>();
            if (messageEncoderProperty != null)
            {
                return messageEncoderProperty;
            }

            if (typeof(T) == typeof(MessageVersion))
            {
                return (T)(object)this.MessageEncoderFactory.Encoder.MessageVersion;
            }

            return base.GetProperty<T>();
        }

        #region Lifecycle State Machine
        /// <summary>
        /// Shutdown ungracefully
        /// </summary>
        protected override void OnAbort()
        {
            // Abort can be called at anytime, so we can't assume that
            // we've been Opened successfully (and thus may not have any listen sockets)
            lock (this.ThisLock)
            {
                CloseListenSockets(TimeSpan.Zero);
                this.channelQueue.Close();
            }
        }

        /// <summary>
        /// Shutdown gracefully
        /// </summary>
        protected override void OnClose(TimeSpan timeout)
        {
            lock (this.ThisLock)
            {
                CloseListenSockets(TimeSpan.Zero);
                this.channelQueue.Close();
            }
        }

        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            this.OnClose(timeout);
            return new CompletedAsyncResult(callback, state);
        }

        protected override void OnEndClose(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }

        void CloseListenSockets(TimeSpan timeout)
        {
            for (int i = 0; i < listenSockets.Count; i++)
            {
                this.listenSockets[i].Close((int)timeout.TotalMilliseconds);
            }
            listenSockets.Clear();
        }

        protected override void OnClosed()
        {
            if (this.bufferManager != null)
            {
                this.bufferManager.Clear();
            }

            base.OnClosed();
        }

        /// <summary>
        /// Initialize any objects we're going to need for the opened factory
        /// </summary>
        protected override void OnOpening()
        {
            base.OnOpening();
            this.onReceive = new AsyncCallback(this.OnReceive);
        }

        /// <summary>
        /// Open the listener factory for use. Ensures our UDP socket is bound
        /// </summary>
        protected override void OnOpen(TimeSpan timeout)
        {
            if (uri == null)
            {
                throw new InvalidOperationException("Uri must be set before ChannelListener is opened.");
            }

            if (this.listenSockets.Count == 0)
            {
                if (uri.HostNameType == UriHostNameType.IPv6 ||
                    uri.HostNameType == UriHostNameType.IPv4)
                {
                    listenSockets.Add(CreateListenSocket(IPAddress.Parse(uri.Host), uri.Port));
                }
                else
                {
                    listenSockets.Add(CreateListenSocket(IPAddress.Any, uri.Port));
                    if (Socket.OSSupportsIPv6)
                    {
                        listenSockets.Add(CreateListenSocket(IPAddress.IPv6Any, uri.Port));
                    }
                }
            }
        }

        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            this.OnOpen(timeout);
            return new CompletedAsyncResult(callback, state);
        }

        protected override void OnEndOpen(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }

        /// <summary>
        /// Open has completed, start an asynchronous receive on our socket.
        /// </summary>
        protected override void OnOpened()
        {
            base.OnOpened();
            Socket[] socketsSnapshot = listenSockets.ToArray();
            WaitCallback startReceivingCallback = new WaitCallback(StartReceiving);
            for (int i = 0; i < socketsSnapshot.Length; i++)
            {
                ThreadPool.QueueUserWorkItem(startReceivingCallback, socketsSnapshot[i]);
            }
        }
        #endregion

        Socket CreateListenSocket(IPAddress ipAddress, int port)
        {
            bool isIPv6 = (ipAddress.AddressFamily == AddressFamily.InterNetworkV6);
            Socket socket = null;

            if (multicast)
            {
                IPAddress anyIPAddr = IPAddress.Any;
                if (isIPv6)
                    anyIPAddr = IPAddress.IPv6Any;

                IPEndPoint endPoint = new IPEndPoint(anyIPAddr, port);
                socket = new Socket(endPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
                socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1);
                socket.Bind(endPoint);

                if (isIPv6)
                {
                    socket.SetSocketOption(SocketOptionLevel.IPv6, SocketOptionName.AddMembership,
                        new IPv6MulticastOption(ipAddress));
                }
                else
                {
                    socket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership, new MulticastOption(ipAddress));
                }
            }
            else
            {
                IPEndPoint endPoint = new IPEndPoint(ipAddress, port);
                socket = new Socket(endPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
                socket.Bind(endPoint);
            }

            return socket;
        }

        EndPoint CreateDummyEndPoint(Socket socket)
        {
            if (socket.AddressFamily == AddressFamily.InterNetwork)
            {
                return new IPEndPoint(IPAddress.Any, 0);
            }
            else
            {
                return new IPEndPoint(IPAddress.IPv6Any, 0);
            }
        }

        void StartReceiving(object state)
        {
            Socket listenSocket = (Socket)state;
            IAsyncResult result = null;

            try
            {
                lock (ThisLock)
                {
                    if (base.State == CommunicationState.Opened)
                    {
                        EndPoint dummy = CreateDummyEndPoint(listenSocket);
                        byte[] buffer = this.bufferManager.TakeBuffer(maxMessageSize);
                        result = listenSocket.BeginReceiveFrom(buffer, 0, buffer.Length,
                            SocketFlags.None, ref dummy, this.onReceive, new SocketReceiveState(listenSocket, buffer));
                    }
                }

                if (result != null && result.CompletedSynchronously)
                {
                    ContinueReceiving(result, listenSocket);
                }
            }
            catch (Exception e)
            {
                Debug.WriteLine("Error in receiving from the socket.");
                Debug.WriteLine(e.ToString());
            }
        }

        void ContinueReceiving(IAsyncResult receiveResult, Socket listenSocket)
        {
            bool continueReceiving = true;

            while (continueReceiving)
            {
                Message receivedMessage = null;

                if (receiveResult != null)
                {
                    receivedMessage = EndReceive(listenSocket, receiveResult);
                    receiveResult = null;
                }

                lock (ThisLock)
                {
                    if (base.State == CommunicationState.Opened)
                    {
                        EndPoint dummy = CreateDummyEndPoint(listenSocket);
                        byte[] buffer = this.bufferManager.TakeBuffer(maxMessageSize);
                        receiveResult = listenSocket.BeginReceiveFrom(buffer, 0, buffer.Length,
                            SocketFlags.None, ref dummy, this.onReceive, new SocketReceiveState(listenSocket, buffer));
                    }
                }

                if (receiveResult == null || !receiveResult.CompletedSynchronously)
                {
                    continueReceiving = false;
                    Dispatch(receivedMessage);
                }
                else if (receivedMessage != null)
                {
                    ThreadPool.QueueUserWorkItem(new WaitCallback(DispatchCallback), receivedMessage);
                }
            }
        }

        Message EndReceive(Socket listenSocket, IAsyncResult result)
        {
            // if we've started the shutdown process, then we've disposed
            // the socket and calls to socket.EndReceive will throw
            if (base.State != CommunicationState.Opened)
                return null;

            byte[] buffer = ((SocketReceiveState)result.AsyncState).Buffer;
            Debug.Assert(buffer != null);
            Message message = null;

            try
            {
                int count = 0;

                lock (ThisLock)
                {
                    // if we've started the shutdown process, socket is disposed
                    // and calls to socket.EndReceive will throw
                    if (base.State == CommunicationState.Opened)
                    {
                        EndPoint dummy = CreateDummyEndPoint(listenSocket);
                        count = listenSocket.EndReceiveFrom(result, ref dummy);
                    }
                }

                if (count > 0)
                {
                    try
                    {
                        message = MessageEncoderFactory.Encoder.ReadMessage(new ArraySegment<byte>(buffer, 0, count), bufferManager);
                    }
                    catch (XmlException xmlException)
                    {
                        throw new ProtocolException(
                            "There is a problem with the XML that was received from the network. See inner exception for more details.",
                            xmlException);
                    }
                }
            }
            catch (Exception e)
            {
                Debug.WriteLine("Error in completing the async receive via EndReceiveFrom method.");
                Debug.WriteLine(e.ToString());
            }
            finally
            {
                if (message == null)
                {
                    this.bufferManager.ReturnBuffer(buffer);
                    buffer = null;
                }
            }

            return message;
        }

        //Called when an ansynchronous receieve operation completes
        //on the listening socket.
        void OnReceive(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
                return;

            ContinueReceiving(result, ((SocketReceiveState)result.AsyncState).Socket);
        }

        void DispatchCallback(object state)
        {
            Dispatch((Message)state);
        }

        /// <summary>
        /// Matches an incoming message to its waiting listener,
        /// using the FilterTable to dispatch the message to the correc
        /// listener. If no listener is waiting for the message, it is silently
        /// discarded.
        /// </summary>
        void Dispatch(Message message)
        {
            if (message == null)
                return;

            try
            {
                UdpInputChannel newChannel;
                bool channelCreated = CreateOrRetrieveChannel(out newChannel);

                newChannel.Dispatch(message);

                if (channelCreated)
                {
                    //Hand the channel off to whomever is waiting for AcceptChannel()
                    //to complete
                    this.channelQueue.EnqueueAndDispatch(newChannel);
                }
            }
            catch (Exception e)
            {
                Debug.WriteLine("Error dispatching Message.");
                Debug.WriteLine(e.ToString());
            }
        }

        /// <summary>
        /// Used to get a unique uri (by CompositeDuplexChannelFactory for example).
        /// We get a unique TCP port by binding to "port 0"
        /// </summary>
        public void InitializeUniqueUri(string host)
        {
            if (host == null)
            {
                throw new ArgumentNullException("host");
            }

            int port;

            lock (this.ThisLock)
            {
                CloseListenSockets(TimeSpan.Zero);
                IPAddress ipAddress = null;
                if (IPAddress.TryParse(host, out ipAddress))
                {
                    Socket socket = CreateListenSocket(ipAddress, 0);
                    port = ((IPEndPoint)socket.LocalEndPoint).Port;
                    listenSockets.Add(socket);
                }
                else
                {
                    Socket socket = CreateListenSocket(IPAddress.Any, 0);
                    port = ((IPEndPoint)socket.LocalEndPoint).Port;
                    listenSockets.Add(socket);
                    if (Socket.OSSupportsIPv6)
                    {
                        listenSockets.Add(CreateListenSocket(IPAddress.IPv6Any, port));
                    }
                }
            }

            UriBuilder uriBuilder = new UriBuilder(Scheme, host, port);
            InitializeUri(uriBuilder.Uri, String.Empty);
        }

        void InitializeUri(Uri baseAddress, string relativeAddress, ListenUriMode mode)
        {
            switch (mode)
            {
                case ListenUriMode.Explicit:
                    this.InitializeUri(baseAddress, relativeAddress);
                    break;
                case ListenUriMode.Unique:
                    {
                        //This listener sets unique uris using the host name only.
                        this.InitializeUniqueUri(baseAddress.Host);
                        break;
                    }
            }
        }

        public void InitializeUri(Uri baseAddress, string relativeAddress)
        {
            if (baseAddress == null)
                throw new ArgumentNullException("baseAddress");

            if (relativeAddress == null)
                throw new ArgumentNullException("relativeAddress");

            if (!baseAddress.IsAbsoluteUri)
                throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
                    "Base address must be an absolute URI."), "baseAddress");

            if (baseAddress.Scheme != this.Scheme)
            {
                // URI schemes are case-insensitive, so try a case insensitive compare now
                if (string.Compare(baseAddress.Scheme, this.Scheme, true, System.Globalization.CultureInfo.InvariantCulture) != 0)
                {
                    throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
                        "Invalid URI scheme: {0}.", baseAddress.Scheme), "baseAddress");
                }
            }

            Uri fullUri = baseAddress;

            // Ensure that baseAddress Path does end with a slash if we have a relative address
            if (relativeAddress != string.Empty)
            {
                if (!baseAddress.AbsolutePath.EndsWith("/"))
                {
                    UriBuilder uriBuilder = new UriBuilder(baseAddress);
                    uriBuilder.Path = uriBuilder.Path + "/";
                    baseAddress = uriBuilder.Uri;
                }

                fullUri = new Uri(baseAddress, relativeAddress);
            }

            lock (base.ThisLock)
            {
                ThrowIfDisposedOrImmutable();
                this.uri = fullUri;
                CloseListenSockets(TimeSpan.Zero);
            }
        }

        //Synchronously returns a channel that is attached to this listener.
        protected override IInputChannel OnAcceptChannel(TimeSpan timeout)
        {
            if (timeout < TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException("timeout", timeout, "Timeout must be greater than or equal to TimeSpan.Zero. To disable timeout, specify TimeSpan.MaxValue.");
            }
            if (!this.IsDisposed)
            {
                this.EnsureChannelAvailable();
            }

            IInputChannel channel;
            if (this.channelQueue.Dequeue(timeout, out channel))
            {
                return channel;
            }
            else
            {
                throw CreateAcceptTimeoutException(timeout);
            }
        }

        TimeoutException CreateAcceptTimeoutException(TimeSpan timeout)
        {
            return new TimeoutException(
                string.Format("Accept on listener at address {0} timed out after {1}.",
                this.Uri.AbsoluteUri, timeout));
        }

        protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
        {
            if (timeout < TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException("timeout", timeout, "Timeout must be greater than or equal to TimeSpan.Zero. To disable timeout, specify TimeSpan.MaxValue.");
            }
            if (!this.IsDisposed)
            {
                this.EnsureChannelAvailable();
            }

            return this.channelQueue.BeginDequeue(timeout, callback, state);
        }

        protected override IInputChannel OnEndAcceptChannel(IAsyncResult result)
        {
            IInputChannel channel;
            if (this.channelQueue.EndDequeue(result, out channel))
            {
                return channel;
            }
            else
            {
                throw new TimeoutException();
            }
        }

        protected override bool OnWaitForChannel(TimeSpan timeout)
        {
            if (timeout < TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException("timeout", timeout, "Timeout must be greater than or equal to TimeSpan.Zero. To disable timeout, specify TimeSpan.MaxValue.");
            }
            return this.channelQueue.WaitForItem(timeout);
        }

        protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
        {
            if (timeout < TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException("timeout", timeout, "Timeout must be greater than or equal to TimeSpan.Zero. To disable timeout, specify TimeSpan.MaxValue.");
            }
            return this.channelQueue.BeginWaitForItem(timeout, callback, state);
        }

        protected override bool OnEndWaitForChannel(IAsyncResult result)
        {
            return this.channelQueue.EndWaitForItem(result);
        }

        //Guarantees that channel is attached to this listener.
        void EnsureChannelAvailable()
        {
            UdpInputChannel newChannel;
            if (CreateOrRetrieveChannel(out newChannel))
            {
                this.channelQueue.EnqueueAndDispatch(newChannel);
            }
        }

        bool CreateOrRetrieveChannel(out UdpInputChannel newChannel)
        {
            bool channelCreated = false;

            if ((newChannel = currentChannel) == null)
            {
                lock (currentChannelLock)
                {
                    if ((newChannel = currentChannel) == null)
                    {
                        newChannel = new UdpInputChannel(this);
                        newChannel.Closed += new EventHandler(this.OnChannelClosed);
                        currentChannel = newChannel;
                        channelCreated = true;
                    }
                }
            }

            return channelCreated;
        }

        void OnChannelClosed(object sender, EventArgs args)
        {
            UdpInputChannel channel = (UdpInputChannel)sender;

            lock (this.currentChannelLock)
            {
                if (channel == this.currentChannel)
                {
                    this.currentChannel = null;
                }
            }
        }

        class SocketReceiveState
        {
            Socket socket;
            byte[] buffer;
            public SocketReceiveState(Socket socket, byte[] buffer)
            {
                this.socket = socket;
                this.buffer = buffer;
            }

            public Socket Socket
            {
                get { return this.socket; }
            }

            public byte[] Buffer
            {
                get { return this.buffer; }
            }
        }
    }

A assinatura utilizada (scheme) na solução será soap.udp. Pode-se trocar caso deseje outra assinatura para o envio de dados via UDP, utilizando esta solução.

Não há como fugir da arquitura de sockets, pois são a base da infraestrutura de rede para as aplicaçãoes enviarem dados de um lado para o outro.
Notem, as classes UdpInputChannel e UdpOutputChannel utilizam sockets para receber e enviar dados. Como citato, no tópico anterior, é necessário implementar três recursos vistos no protocolo TCP para garantir reliability:

  • Non-duplication: cada pacote será recebido no máximo uma vez
  • Transmission: cada pacote será recebido pelo menos uma vez. Se o pacote não for recebido um mecanismo de erro deve reportar ao sender que o pacote não foi recebido.
  • Order: garantia de ordem na leitura dos pacotes

A solução implementada acima não suporta nenhum dos três recursos. Para garantir reliability na solução WCF é necessário habilitar o uso do protocolo WS-ReliableMessaging. Este protocolo é suportado por qualquer WCF channel implementado. Ou seja, na configuração do serviço WCF adicionaremos o uso do recurso: ReliableSessionBindingElement.

Testando a Solução

Para testarmos a solução de UDP vou criar uma aplicação que habilitará um self-host WCF chamado PingService.

    class Program
    {
        static void Main(string[] args)
        {
            //var uri = new Uri("soap.udp://localhost:4444/PingService");
            var uri = new Uri("soap.udp://239.255.255.20:4444/PingService");
            using (var serviceHost = new ServiceHost(typeof(PingService), uri))
            {
                BindingElementCollection bindingElements = new BindingElementCollection();
                ReliableSessionBindingElement session = new ReliableSessionBindingElement();
                session.Ordered = true;
                bindingElements.Add(new BinaryMessageEncodingBindingElement());
                bindingElements.Add(session);
                bindingElements.Add(new CompositeDuplexBindingElement());
                bindingElements.Add(new UdpTransportBindingElement());
                Binding customBinding = new CustomBinding(bindingElements);

                //Binding customBinding = new CustomBinding(new BinaryMessageEncodingBindingElement(), new UdpTransportBindingElement() { Multicast = true, LocalNetworkInterfaceAddress = IPAddress.Parse("192.168.10.146") });

                serviceHost.AddServiceEndpoint(typeof(IPingService), customBinding, uri);
                serviceHost.Open();
                Console.WriteLine("Type <ENTER> to exit application");
                Console.Read();
                serviceHost.Close();
            }
        }
    }

    [ServiceContract]
    public interface IPingService
    {
        [OperationContract(IsOneWay = true)]
        void Ping(string text);
    }

    public class PingService : IPingService
    {
        public void Ping(string text)
        {
            Console.WriteLine("Ping received => {0}", text);
        }
    }

Observem que após a criação do serviço WCF a adição dos endpoints é constituída por um CustomBinding com dois elementos:

  • UdpTransportBindingElement: classe criada para habilitar o uso do protocolo UDP como um canal WCF
  • BinaryMessageEncodingBindingElement: classe que mantém as configurações das mensagens XML a serem trocadas pelo canal WCF

Lembre-se que WCF é uma arquitetura que permite a troca de mensagens XML via SOAP em cima de protocolo TCP, HTTP, e agora, UDP. Portanto, a configuração das mensagens deve ser feita utilizando a classe BinaryMessageEncondingBindingElement juntamente com a propriedade ReaderQuotas, no qual armazena a configuração das mensagens XML.

Ao criar um serviço WCF todas as operações devem ser marcadas como IsOneWay. Esta configuração garante que as chamadas a esta operação serão processadas de forma assíncrona, e por se tratarem do protocolo UDP, os serviços não realizam handshake.

Como dito antes, a solução UDP não suporte reliability. Para tanto, basta trocar o CustomBinding por:

BindingElementCollection bindingElements = new BindingElementCollection();
ReliableSessionBindingElement session = new ReliableSessionBindingElement();
session.Ordered = true; // support packet order
bindingElements.Add(new BinaryMessageEncodingBindingElement());
bindingElements.Add(session);
bindingElements.Add(new CompositeDuplexBindingElement());
bindingElements.Add(new UdpTransportBindingElement()); // must be last item in the collection
Binding customBinding = new CustomBinding(bindingElements); //new CustomBinding(new BinaryMessageEncodingBindingElement(), new UdpTransportBindingElement())

Para realizar a chamadas ao serviço criei uma aplicação cliente que acessa de forma UDP-ponto-a-ponto:

    class Program
    {
        static void Main(string[] args)
        {
            using (var channelFactory = new ChannelFactory<IPingService>(new CustomBinding(new BinaryMessageEncodingBindingElement(), new UdpTransportBindingElement()), new EndpointAddress("soap.udp://239.255.255.20:4444/PingService")))
            {
                var client = channelFactory.CreateChannel();
                client.Ping("ACK!");
            }
        }
    }

Outra forma de disponibilizar um serviço é via Multicast. Sendo assim, basta habilitar a configuração na classe UdpTransportBindingElement:

  • Multicast: informar a propriedade como True. Tanto na aplicação cliente, como no servidor, a propriedade Multicast deve ser informada.
  • LocalNetworkInterfaceAddress: informar o endereço local da placa de rede. Note, que uma máquina pode ter mais de um endereço de rede dependo da configuração. Portanto é necessário informar o IP que será utilizado pelo socket para enviar o pacote a rede multicast UDP. Caso tenha apenas um endereço IP esta propriedade não precisa ser configurada.

O endereço de publicação do serviço WCF deve ser alterado para um endereço multicast válido de 224.0.0.0 a 239.255.255.255.


    class Program
    {
        static void Main(string[] args)
        {
            //var uri = new Uri("soap.udp://localhost:4444/PingService");
            var uri = new Uri("soap.udp://239.255.255.20:4444/PingService");
            using (var serviceHost = new ServiceHost(typeof(PingService), uri))
            {
                BindingElementCollection bindingElements = new BindingElementCollection();
                ReliableSessionBindingElement session = new ReliableSessionBindingElement();
                session.Ordered = true;
                bindingElements.Add(new BinaryMessageEncodingBindingElement());
                bindingElements.Add(session);
                bindingElements.Add(new CompositeDuplexBindingElement());
                bindingElements.Add(new UdpTransportBindingElement());
                //Binding customBinding = new CustomBinding(bindingElements);

                Binding customBinding = new CustomBinding(new BinaryMessageEncodingBindingElement(), new UdpTransportBindingElement() { Multicast = true, LocalNetworkInterfaceAddress = IPAddress.Parse("192.168.10.146") });

                serviceHost.AddServiceEndpoint(typeof(IPingService), customBinding, uri);
                serviceHost.Open();
                Console.WriteLine("Type <ENTER> to exit application");
                Console.Read();
                serviceHost.Close();
            }
        }
    }

    [ServiceContract]
    public interface IPingService
    {
        [OperationContract(IsOneWay = true)]
        void Ping(string text);
    }

    public class PingService : IPingService
    {
        public void Ping(string text)
        {
            Console.WriteLine("Ping received => {0}", text);
        }
    }
    class Program
    {
        static void Main(string[] args)
        {
            using (var channelFactory = new ChannelFactory<IPingService>(new CustomBinding(new BinaryMessageEncodingBindingElement(), new UdpTransportBindingElement() { Multicast = true }), new EndpointAddress("soap.udp://239.255.255.20:4444/PingService")))
            {
                var client = channelFactory.CreateChannel();
                client.Ping("ACK!");
            }
        }
    }

Conclusão

Neste artigo foi discutido os prós e contras entres os protocolos TCP e UDP. A implementação do recurso reliability na camada de aplicação é tarefa árdua para o desenvolver, porém ao utilizar a arquitura WCF a especificação do que deve ser implementado dimiuni e facilita o trabalho de expor os sockets de forma organizada.
Através do protocolo WS-ReliableMessaging a arquitetura WCF permite de forma genérica o uso de reliability ao utilizar UDP como  protocolo de comunicação dos serviços WCF.

Referências





Desenvolvido por hacklab/ com WordPress