Problem description
I am trying to implement an AppSync subscription similar to this Python example, but in .NET: https://aws.amazon.com/blogs/mobile/appsync-websockets-python/
I started with the NuGet package GraphQL.Client (https://www.nuget.org/packages/GraphQL.Client). Executing queries and mutations works fine, as described in the readme at https://github.com/graphql-dotnet/graphql-client, but subscriptions do not work.
My code using GraphQL.Client:
using var graphQLClient = new GraphQLHttpClient("https://<MY-API-PATH>.appsync-realtime-api.<AWS-region>.amazonaws.com/graphql", new NewtonsoftJsonSerializer());
graphQLClient.HttpClient.DefaultRequestHeaders.Add("host", "<API HOST without https or absolute path and 'realtime-' text in the api address>"); // As given in the python example
graphQLClient.HttpClient.DefaultRequestHeaders.Add("x-api-key", "<API KEY>");
var req = new GraphQLRequest
{
    Query = @"subscription SubscribeToEventComments{ subscribeToEventComments(eventId: 'test'){ content }}",
    Variables = new{}
};
IObservable<GraphQLResponse<Response>> subscriptionStream = graphQLClient.CreateSubscriptionStream<Response>(req, (Exception ex) =>
{
    Console.WriteLine("Error: {0}", ex.ToString());
});
var subscription = subscriptionStream.Subscribe(response =>
{
    Console.WriteLine($"Response'{Newtonsoft.Json.JsonConvert.SerializeObject(response)}' ");
},
ex =>
{
    Console.WriteLine("Error{0}", ex.ToString());
});
It throws the exception "The remote party closed the WebSocket connection without completing the close handshake."
Stack trace:
at System.Net.WebSockets.ManagedWebSocket.d__662.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
at GraphQL.Client.Http.Websocket.GraphQLHttpWebSocket.d__40.MoveNext() in C:\Users\UserName\Source\repos\graphql-client\src\GraphQL.Client\Websocket\GraphQLHttpWebSocket.cs:line 546
Then I tried without this NuGet package, using the standard WebSocket client.
Code without the NuGet package:
static public async Task CallWebsocket()
{
    try
    {
        _client = new ClientWebSocket();
        _client.Options.AddSubProtocol("graphql-ws");
        _client.Options.SetRequestHeader("host", "<HOST URL without wss but now with 'realtime' text in api url because otherwise we are getting SSL error>");
        _client.Options.SetRequestHeader("x-api-key", "<API KEY>");
        await _client.ConnectAsync(new Uri("https://<MY-APPSYNC_API_PATH>.appsync-realtime-api.<AWS-region>.amazonaws.com/graphql"), CancellationToken.None);
        await SendCommand();
        var docList = await Receive();
    }
    catch(Exception ex)
    {
    }
}
static private async Task SendCommand()
{
    ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes("'query' : 'subscription SubscribeToEventComments{ subscribeToEventComments(eventId: 'test'){ content }}'"));
    await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
}
static private async Task<string> Receive()
{
    var receiveBufferSize = 1536;
    byte[] buffer = new byte[receiveBufferSize];
    var result = await _client.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
    var resultJson = (new UTF8Encoding()).GetString(buffer);
    return resultJson;
}
I got the following exception:
Inner exception: "An established connection was aborted by the software in your host machine."
Inner exception message: "Unable to read data from the transport connection: An established connection was aborted by the software in your host machine.."
Message: "The remote party closed the WebSocket connection without completing the close handshake."
Could anyone please help with the correct implementation?
Recommended answer
The NuGet package won't work out of the box with AppSync subscriptions, so you will need to write your own client code, as you attempted in the second (non-NuGet) example.
Now, for the second example, take another look at the Python example referenced in your question. Several steps are missing from your code. I will enumerate the required steps and try to port them from the Python code to C# (note that I don't have a C# environment at hand, so there might be syntax errors, but this code should be pretty close to what you need).
Step 0 - AppSync endpoints
Assume the result of invoking aws appsync get-graphql-api --api-id example123456 for your API is:
{
    "graphqlApi": {
        "name": "myNewRealTimeGraphQL-API",
        "authenticationType": "<API_KEY>",
        "tags": {},
        "apiId": "example123456",
        "uris": {
            "GRAPHQL": "https://abc.appsync-api.us-west-2.amazonaws.com/graphql",
            "REALTIME": "wss://abc.appsync-realtime-api.us-west-2.amazonaws.com/graphql"
        },
        "arn": "arn:aws:appsync:us-west-2:xxxxxxxxxxxx:apis/xxxxxxxxxxxx"
    }
}
Step 1 - Build the connection URL
Step 2 - Connect to the WebSocket endpoint
This includes sending a connection_init message, as per the protocol described in the Python article.
Step 3 - Wait for connection_ack
Again, this is as per the protocol.
Step 4 - Register the subscription
Step 5 - Send a mutation
This step is not covered in this answer, but can be done through the AWS console.
Step 6 - Wait for "data" messages
These are the real-time events sent by AppSync.
Step 7 - Deregister the subscription
Step 8 - Disconnect
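Step 1 on its own can be sketched as a small helper. This is only an illustration under a few assumptions: the class and method names (AppSyncUrl, BuildConnectionUri) are hypothetical, System.Text.Json is used to produce the header JSON instead of hand-written string interpolation, and the base64 value is appended to the query string without further escaping, as in the Python article.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

public static class AppSyncUrl
{
    // Hypothetical helper (not part of any library): builds
    // <realtime uri>?header=<base64 of the auth header JSON>&payload=e30=
    public static Uri BuildConnectionUri(string realtimeUri, string gqlHost, string apiKey)
    {
        // The header is a JSON object holding the GRAPHQL host (not the realtime host) and the API key
        var headerJson = JsonSerializer.Serialize(new Dictionary<string, string>
        {
            ["host"] = gqlHost,
            ["x-api-key"] = apiKey
        });
        var headerB64 = Convert.ToBase64String(Encoding.UTF8.GetBytes(headerJson));

        // "e30=" is the base64 encoding of an empty JSON object, "{}"
        var builder = new UriBuilder(realtimeUri)
        {
            Query = $"header={headerB64}&payload=e30="
        };
        return builder.Uri;
    }
}
```

The returned Uri could then be passed straight to _client.ConnectAsync in step 2.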
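using System;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// These are declared at the same level as your _client
// This comes from graphqlApi.uris.GRAPHQL in step 0 (host name only), set as a field here for clarity
static private string _gqlHost = "abc.appsync-api.us-west-2.amazonaws.com";
// This comes from graphqlApi.uris.REALTIME in step 0, set as a field here for clarity
static private string _realtimeUri = "wss://abc.appsync-realtime-api.us-west-2.amazonaws.com/graphql";
static private string _apiKey = "<API KEY>";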
static public async Task CallWebsocket()
{
    // Step 1
    // This is the JSON needed by the server; it will be converted to base64
    // (note: it might be better to use something like Json.NET for this task)
    var header = $@"{{
        ""host"": ""{_gqlHost}"",
        ""x-api-key"": ""{_apiKey}""
    }}";
    // Now we need to encode the previous JSON to base64
    var headerB64 = System.Convert.ToBase64String(
        System.Text.Encoding.UTF8.GetBytes(header));
    UriBuilder connectionUriBuilder = new UriBuilder(_realtimeUri);
    connectionUriBuilder.Query = $"header={headerB64}&payload=e30=";
    try
    {
        _client = new ClientWebSocket();
        _client.Options.AddSubProtocol("graphql-ws");
        // Step 2
        await _client.ConnectAsync(connectionUriBuilder.Uri, CancellationToken.None);
        // Step 3
        await SendConnectionInit();
        await Receive();
    }
    catch (Exception ex)
    {
        // Log the failure instead of silently swallowing it
        Console.WriteLine("Error: {0}", ex);
    }
}
static private async Task SendConnectionInit()
{
    ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(@"{""type"": ""connection_init""}"));
    await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
}
static private async Task SendSubscription()
{
    // This detail is important: the subscription is a stringified JSON document that is embedded in the "data" field below
    var subscription = $@"{{""query"": ""subscription SubscribeToEventComments{{ subscribeToEventComments{{ content }} }}"", ""variables"": {{}} }}";
    // Serializing a string produces a quoted JSON string literal with the inner quotes escaped.
    // Without this step the embedded quotes would make the outer message invalid JSON.
    // (System.Text.Json ships with .NET Core 3.0+; Json.NET would work equally well.)
    var data = System.Text.Json.JsonSerializer.Serialize(subscription);
    var register = $@"{{
        ""id"": ""<SUB_ID>"",
        ""payload"": {{
            ""data"": {data},
            ""extensions"": {{
                ""authorization"": {{
                    ""host"": ""{_gqlHost}"",
                    ""x-api-key"": ""{_apiKey}""
                }}
            }}
        }},
        ""type"": ""start""
    }}";
    // The output should be equivalent to the message below; note again that the "data" field
    // contains a stringified JSON document that represents the subscription
    // (System.Text.Json writes the inner quotes as \u0022, which means the same as \")
    /*
    {
        "id": "<SUB_ID>",
        "payload": {
            "data": "{\"query\": \"subscription SubscribeToEventComments{ subscribeToEventComments{ content } }\", \"variables\": {} }",
            "extensions": {
                "authorization": {
                    "host": "abc.appsync-api.us-west-2.amazonaws.com",
                    "x-api-key": "<API KEY>"
                }
            }
        },
        "type": "start"
    }
    */
    ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(register));
    await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
}
static private async Task Deregister()
{
    // The id must match the one used in the "start" message
    var deregister = $@"{{
        ""type"": ""stop"",
        ""id"": ""<SUB_ID>""
    }}";
    ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(deregister));
    await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
}
static private async Task Receive()
{
    while (_client.State == WebSocketState.Open)
    {
        ArraySegment<Byte> buffer = new ArraySegment<byte>(new Byte[8192]);
        WebSocketReceiveResult result = null;
        using (var ms = new MemoryStream())
        {
            // This loop is needed because the server might send chunks of data that need to be assembled by the client
            // see: https://stackoverflow.com/questions/23773407/a-websockets-receiveasync-method-does-not-await-the-entire-message
            do
            {
                result = await _client.ReceiveAsync(buffer, CancellationToken.None);
                ms.Write(buffer.Array, buffer.Offset, result.Count);
            }
            while (!result.EndOfMessage);
            ms.Seek(0, SeekOrigin.Begin);
            using (var reader = new StreamReader(ms, Encoding.UTF8))
            {
                // convert stream to string
                var message = reader.ReadToEnd();
                Console.WriteLine(message);
                // quick and dirty way to check the response
                if (message.Contains("connection_ack"))
                {
                    // Step 4
                    await SendSubscription();
                }
                else if (message.Contains("data")) // Step 6
                {
                    // Step 7
                    await Deregister();
                    // Step 8 (the socket leaves the Open state, which ends the loop)
                    await _client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
                }
            }
        }
    }
}
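Two parts of the code above are fragile when written with string interpolation and Contains: the "start" message, whose "data" field is a JSON document stored as a string, and the check of the incoming message type. A possible sketch that lets a serializer handle both is shown here. It assumes System.Text.Json is available, and the names AppSyncMessages, BuildStartMessage and GetMessageType are hypothetical helpers, not part of any library.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class AppSyncMessages
{
    // Builds the "start" message. payload.data is serialized first, so that it ends up
    // as a JSON string (with escaped quotes) inside the outer JSON document.
    public static string BuildStartMessage(string id, string query, string gqlHost, string apiKey)
    {
        var data = JsonSerializer.Serialize(new Dictionary<string, object>
        {
            ["query"] = query,
            ["variables"] = new Dictionary<string, object>()
        });
        var message = new Dictionary<string, object>
        {
            ["id"] = id,
            ["type"] = "start",
            ["payload"] = new Dictionary<string, object>
            {
                ["data"] = data,
                ["extensions"] = new Dictionary<string, object>
                {
                    ["authorization"] = new Dictionary<string, string>
                    {
                        ["host"] = gqlHost,
                        ["x-api-key"] = apiKey
                    }
                }
            }
        };
        return JsonSerializer.Serialize(message);
    }

    // Returns the value of the top-level "type" field, or an empty string if there is none.
    // JsonDocument.Parse throws a JsonException if the message is not valid JSON.
    public static string GetMessageType(string message)
    {
        using var doc = JsonDocument.Parse(message);
        var root = doc.RootElement;
        if (root.ValueKind == JsonValueKind.Object
            && root.TryGetProperty("type", out var type)
            && type.ValueKind == JsonValueKind.String)
        {
            return type.GetString() ?? "";
        }
        return "";
    }
}
```

With helpers like these, Receive could compare GetMessageType(message) against "connection_ack" and "data" instead of searching the raw text, which avoids false matches when a payload happens to contain those words.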