
Problem description

I am porting an existing Cloud Service WorkerRole to Service Fabric as a stateless service. The original Cloud Service uses SignalR, with Service Bus as the SignalR backplane, to send notifications to any listening client. A Startup class does some of the setup:

class Startup
{
    public void Configuration(IAppBuilder app)
    {
        String connectionString = "Endpoint=sb://[name].servicebus.windows.net/;SharedSecretIssuer=owner;SharedSecretValue=[key]";
        GlobalHost.DependencyResolver.UseServiceBus(connectionString, "InSys");
        app.MapSignalR();
        Notifications.Hub = GlobalHost.ConnectionManager.GetHubContext<MyHub>();
    }
}

In the WorkerRole's OnStart() method I start OWIN with:

var endpoint = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["HttpEndpoint"];
var baseUri = $"{endpoint.Protocol}://{endpoint.IPEndpoint}";
var app = WebApp.Start<Startup>(new StartOptions(url: baseUri));

How is this (i.e., connecting to the SignalR Service Bus backplane) done for a stateless service within Service Fabric?

Recommended answer

With the help of https://github.com/marcinbudny/SignalRSelfHostScaleOut (an example of scaleout using Redis), I think I have this figured out.

In ServiceManifest.xml I added the following Endpoint:

<Endpoint Protocol="http" Name="ServiceEndpoint" Type="Input" Port="8322" />

I also added a Startup class:

public static class Startup
{
    public static void ConfigureApp(IAppBuilder app)
    {
        String connectionString = "Endpoint=sb://[name].servicebus.windows.net/;SharedSecretIssuer=owner;SharedSecretValue=[value]";
        GlobalHost.DependencyResolver.UseServiceBus(connectionString, "InSys");
        app.MapSignalR();
        Notifications.Hub = GlobalHost.ConnectionManager.GetHubContext<InSysMainHub>();
    }
}

I also added an OwinCommunicationListener class:

public class OwinCommunicationListener : ICommunicationListener
{
    private readonly ServiceEventSource eventSource;
    private readonly Action<IAppBuilder> startup;
    private readonly ServiceContext serviceContext;
    private readonly string endpointName;
    private readonly string appRoot;

    private IDisposable webApp;
    private string publishAddress;
    private string listeningAddress;

    public OwinCommunicationListener(Action<IAppBuilder> startup, ServiceContext serviceContext, ServiceEventSource eventSource, string endpointName)
        : this(startup, serviceContext, eventSource, endpointName, null)
    {
    }

    public OwinCommunicationListener(Action<IAppBuilder> startup, ServiceContext serviceContext, ServiceEventSource eventSource, string endpointName, string appRoot)
    {
        if (startup == null)
        {
            throw new ArgumentNullException(nameof(startup));
        }

        if (serviceContext == null)
        {
            throw new ArgumentNullException(nameof(serviceContext));
        }

        if (endpointName == null)
        {
            throw new ArgumentNullException(nameof(endpointName));
        }

        if (eventSource == null)
        {
            throw new ArgumentNullException(nameof(eventSource));
        }

        this.startup = startup;
        this.serviceContext = serviceContext;
        this.endpointName = endpointName;
        this.eventSource = eventSource;
        this.appRoot = appRoot;
    }


    public Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        var serviceEndpoint = this.serviceContext.CodePackageActivationContext.GetEndpoint(this.endpointName);
        var protocol = serviceEndpoint.Protocol;
        int port = serviceEndpoint.Port;

        if (this.serviceContext is StatefulServiceContext)
        {
            StatefulServiceContext statefulServiceContext = (StatefulServiceContext) serviceContext;

            listeningAddress = string.Format(
                CultureInfo.InvariantCulture,
                "{0}://+:{1}/{2}{3}/{4}/{5}",
                protocol,
                port,
                string.IsNullOrWhiteSpace(appRoot)
                    ? string.Empty
                    : appRoot.TrimEnd('/') + '/',
                statefulServiceContext.PartitionId,
                statefulServiceContext.ReplicaId,
                Guid.NewGuid());
        }
        else if (serviceContext is StatelessServiceContext)
        {
            listeningAddress = string.Format(
                CultureInfo.InvariantCulture,
                "{0}://+:{1}/{2}",
                protocol,
                port,
                string.IsNullOrWhiteSpace(appRoot)
                    ? string.Empty
                    : appRoot.TrimEnd('/') + '/');
        }
        else
        {
            throw new InvalidOperationException();
        }

        publishAddress = listeningAddress.Replace("+", FabricRuntime.GetNodeContext().IPAddressOrFQDN);

        try
        {
            eventSource.Message("Starting web server on " + listeningAddress);
            webApp = WebApp.Start(listeningAddress, appBuilder => startup.Invoke(appBuilder));
            eventSource.Message("Listening on " + this.publishAddress);
            return Task.FromResult(this.publishAddress);
        }
        catch (Exception ex)
        {
            eventSource.Message("Web server failed to open endpoint {0}. {1}", this.endpointName, ex.ToString());
            StopWebServer();
            throw;
        }
    }

    public Task CloseAsync(CancellationToken cancellationToken)
    {
        this.eventSource.Message("Closing web server on endpoint {0}", this.endpointName);

        this.StopWebServer();

        return Task.FromResult(true);
    }

    public void Abort()
    {
        this.eventSource.Message("Aborting web server on endpoint {0}", this.endpointName);

        this.StopWebServer();
    }

    private void StopWebServer()
    {
        if (this.webApp != null)
        {
            try
            {
                this.webApp.Dispose();
            }
            catch (ObjectDisposedException)
            {
                // no-op
            }
        }
    }
}
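
To make the address handling in OpenAsync easier to follow, here is a minimal standalone sketch of the stateless branch only. It is not part of the original answer: the AddressSketch class, its method names, and the node IP 10.0.0.4 are hypothetical, and the real listener gets the protocol and port from the ServiceManifest endpoint and the node address from FabricRuntime.GetNodeContext().

```csharp
using System;
using System.Globalization;

public static class AddressSketch
{
    // Mirrors the stateless branch of OpenAsync: "{protocol}://+:{port}/{appRoot/}".
    public static string BuildListeningAddress(string protocol, int port, string appRoot)
    {
        return string.Format(
            CultureInfo.InvariantCulture,
            "{0}://+:{1}/{2}",
            protocol,
            port,
            string.IsNullOrWhiteSpace(appRoot) ? string.Empty : appRoot.TrimEnd('/') + "/");
    }

    // Replaces the "+" wildcard host with the node address, as OpenAsync does
    // before returning the address that Service Fabric publishes.
    public static string BuildPublishAddress(string listeningAddress, string nodeAddress)
    {
        return listeningAddress.Replace("+", nodeAddress);
    }

    public static void Main()
    {
        string listening = BuildListeningAddress("http", 8322, null);
        string publish = BuildPublishAddress(listening, "10.0.0.4"); // hypothetical node IP

        Console.WriteLine(listening); // http://+:8322/
        Console.WriteLine(publish);   // http://10.0.0.4:8322/
    }
}
```

The "+" host lets the OWIN self-host bind to every address on the node, while the published address carries a concrete host that clients can resolve.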

Finally, I changed the CreateServiceInstanceListeners method in my stateless service code to:

protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
    return new[]
    {
        new ServiceInstanceListener(serviceContext => new OwinCommunicationListener(Startup.ConfigureApp, serviceContext, ServiceEventSource.Current, "ServiceEndpoint"))
    };
}
