我是python和aws的新手。我不太知道如何在stackoverflow中提问。

请不要阻止我。

我正在尝试发送 HTTP Post 请求以将记录放入 Amazon Kinesis Stream。

我在 kinesis 中创建了一个流 mystream。我使用方法帖子。

我尝试了以下链接来设置网关 api,它运行良好。

https://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html

我正在尝试使用请求使用 python 代码来完成它。

但我收到以下提到的错误:

以下是我的代码:

import sys, os, base64, datetime, hashlib, hmac
import requests # pip install requests

# ************* REQUEST VALUES *************
method = 'POST'
service = 'kinesis'
host = 'kinesis.eu-west-1.amazonaws.com'
region = 'eu-west-1'
endpoint = 'https://kinesis.eu-west-1.amazonaws.com'

content_type = 'application/x-amz-json-1.1'
amz_target = 'Kinesis_20181114.PutRecord'
request_parameters = '{'
request_parameters += '"StreamName": mystream,'
request_parameters += '"Data":  + base64.b64encode(test) + ,'
request_parameters += '"PartitionKey": 1234 '
request_parameters += '}'


# Key derivation functions. See:
# http://docs.aws.amazon.com/general/latest/gr/signature-v4-
examples.html#signature-v4-examples-python
def sign(key, msg):
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()

def getSignatureKey(key,datestamp,region,service):
kDate = sign(('AWS4' +key ).encode('utf-8'), datestamp)
kRegion = sign(kDate,region)
kService = sign(kRegion,service)
kSigning = sign(kService, 'aws4_request')
return kSigning

# Read AWS access key from env. variables or configuration file. Best
practice is NOT
# to embed credentials in code.
with open ('C:\\Users\\Connectm\\Desktop\\acesskeyid.txt') as f:
       contents = f.read().split('\n')
       access_key = contents[0]
       secret_key = contents[1]


 if access_key is None or secret_key is None:
 print('No access key is available.')
 sys.exit()

# Create a date for headers and the credential string
t = datetime.datetime.utcnow()
amzdate = t.strftime('%Y%m%dT%H%M%SZ')
datestamp = t.strftime('%Y%m%d') # Date w/o time, used in credential scope

canonical_uri = '/'

canonical_querystring = ''

canonical_headers = 'content-type:' + content_type + '\n' + 'host:' + host +
'\n' + 'x-amz-date:' + amzdate + '\n' + 'x-amz-target:' + amz_target + '\n'

signed_headers = 'content-type;host;x-amz-date;x-amz-target'

payload_hash = hashlib.sha256(request_parameters).hexdigest()

canonical_request = method + '\n' + canonical_uri + '\n' +
canonical_querystring + '\n' + canonical_headers + '\n' + signed_headers +
'\n' + payload_hash

algorithm = 'AWS4-HMAC-SHA256'
credential_scope = datestamp + '/' + region + '/' + service + '/' +
'aws4_request'
 string_to_sign = algorithm + '\n' +  amzdate + '\n' +  credential_scope +
'\n' +  hashlib.sha256(canonical_request).hexdigest()

signing_key = getSignatureKey(secret_key, datestamp, region, service)

 signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'),
hashlib.sha256).hexdigest()

 authorization_header = algorithm + ' ' + 'Credential=' + access_key + '/' +
credential_scope + ', ' +  'SignedHeaders=' + signed_headers + ', ' +
'Signature=' + signature
print authorization_header;


headers = {'Content-Type':content_type,
       'X-Amz-Date':amzdate,
       'X-Amz-Target':amz_target,
       'Authorization':authorization_header}
 # ************* SEND THE REQUEST *************
print '\nBEGIN REQUEST++++++++++++++++++++++++++++++++++++'
print 'Request URL = ' + endpoint
r = requests.post(endpoint, data=request_parameters, headers=headers)
print '\nRESPONSE++++++++++++++++++++++++++++++++++++'
print 'Response code: %d\n' % r.status_code
print r.text

我收到以下错误

AWS4-HMAC-SHA256 凭证=AKIAI5C357A6YSKQFXEA/20181114/eu-west-
1/kinesis/aws4_request, SignedHeaders=content-type;host;x-amz-date;x-amz-
目标,
签名=1d7d463e77beaf86930806812188180db9cc7cff082663ad547f647a9c6d545a

开始请求++++++++++++++++++++++++++++++++++++
请求 URL = https://kinesis.eu-west-1.amazonaws.com

响应++++++++++++++++++++++++++++++++++++
响应代码:400

{"__type":"SerializationException"}

请有人解释我如何纠正上述错误?

代码是否连接到流? 是否有问题
数据序列化?

最佳答案

您获得 SerializationException 的事实意味着您的代码正在与 kinesis 对话,但您在 test 中提供的数据可能不是有效的 JSON。

那说:

我强烈建议不要自己做请求逻辑的事情,而是使用 AWS 的软件开发工具包 (SDK),称为 boto3。

import json

import boto3

kinesis = boto3.client("kinesis")

response = kinesis.put_record(
    StreamName="my-fancy-kinesis-stream",
    Data=json.dumps({
        'example': 'payload',
        'yay': 'data',
        'hello': 'world'
    }),
    PartitionKey="AdjustAsNeeded"
)
print response

这将使用您机器上的凭据(通过实例元数据或 ~/.aws/config)或环境变量实例化 kinesis 客户端。

然后它需要一个简单的字典并将其转储到数据的 JSON 字符串中。

关于分区键有很多话要说,你可以找到 here

另外,请查看 boto3 !

关于python - 使用python调用rest api将数据推送到kinesis,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53297024/

10-11 08:56