我已经阅读了有关Kinesis Streaming(实时数据处理)的Amazon Web Services(AWS)的文档。但是我对如何实现Kinesis Streaming感到困惑。

  • 在项目中为iOS安装了SDK。
  • 具有区域和池ID的Cognito凭证的书面代码。
  • 使用AWSServiceConfiguration成功完成。
  • 在viewcontroller中,我编写了代码以将数据保存到
    AWSKinesisRecorder流。

  • 但是我如何知道数据已成功保存到Stream中?如何放置打印日志?

    我想在控制台中显示“成功保存数据”。
    AppDelegate Code:
    
    
     func application(_ application: UIApplication, didFinishLaunchingWithOptions launchOptions: [UIApplicationLaunchOptionsKey: Any]?) -> Bool {
            AWSLogger.default().logLevel = .verbose
    
            let CognitoPoolID = "ap-northeast-1:430840dc-4df5-469f"
            let Region = AWSRegionType.APNortheast1
            let credentialsProvider = AWSCognitoCredentialsProvider(regionType:Region,identityPoolId:CognitoPoolID)
    
            let configuration = AWSServiceConfiguration(region:Region, credentialsProvider:credentialsProvider)
            AWSServiceManager.default().defaultServiceConfiguration = configuration
    
            return true
        }
    

    ViewController代码:导入UIKit导入AWSKinesis

    类ViewController:UIViewController {
    var kinesisRecorder : AWSKinesisRecorder! = nil
    
    override func viewDidLoad() {
        super.viewDidLoad()
    
        //use AWSKinesisRecorder with Amazon Kinesis. The following snippet returns a shared instance of the Amazon Kinesis service client
         kinesisRecorder = AWSKinesisRecorder.default()
    
    
        configureAWSKinesisRecorder()
        saveDataInStream()
    }
    
    override func didReceiveMemoryWarning() {
        super.didReceiveMemoryWarning()
    }
    
    /*
     * Method to configure perties of AWSKinesisRecorder
     */
    func configureAWSKinesisRecorder()  {
        //The diskAgeLimit property sets the expiration for cached requests. When a request exceeds the limit, it's discarded. The default is no age limit.
    
        kinesisRecorder.diskAgeLimit = TimeInterval(30 * 24 * 60 * 60); // 30 days
    
    
        //The diskByteLimit property holds the limit of the disk cache size in bytes. If the storage limit is exceeded, older requests are discarded. The default value is 5 MB. Setting the value to 0 means that there's no practical limit.
    
        kinesisRecorder.diskByteLimit = UInt(10 * 1024 * 1024); // 10MB
    
    
        //The notficationByteThreshold property sets the point beyond which Kinesis issues a notification that the byte threshold has been reached. The default value is 0, meaning that by default Amazon Kinesis doesn't post the notification.
    
        kinesisRecorder.notificationByteThreshold = UInt(5 * 1024 * 1024); // 5MB
    
    }
    
    /*
     * Method to save real time data in AWS stream
     */
    func saveDataInStream()  {
    
    
        //Both saveRecord and submitAllRecords are asynchronous operations, so you should ensure that saveRecord is complete before you invoke submitAllRecords.
    
        // Create an array to store a batch of objects.
        var tasks = Array<AWSTask<AnyObject>>()
        for i in 0...5 {
            //create an NSData object .
            //StreamName should be a string corresponding to the name of your Kinesis stream.
            //save it locally in kinesisRecorder instances
    
            tasks.append(kinesisRecorder!.saveRecord(String(format: "TestString-%02d", i).data(using: .utf8), streamName: "my_stream")!)
        }
    
        ////submitAllRecords sends all locally saved requests to the Amazon Kinesis service.
        AWSTask(forCompletionOfAllTasks: tasks).continueOnSuccessWith(block: { (task:AWSTask<AnyObject>) -> AWSTask<AnyObject>? in
            return self.kinesisRecorder?.submitAllRecords()
        }).continueWith(block: { (task:AWSTask<AnyObject>) -> Any? in
            if let error = task.error as? NSError {
                print("Error: \(error)")
            }
            return nil
        }
    
        )
    
    
    
    } }
    

    最佳答案

    您可以直接使用AWSKinesis而不是AWSKinesisRecorder
    putRecords方法(http://docs.aws.amazon.com/AWSiOSSDK/latest/Classes/AWSKinesis.html#//api/name/putRecords:completionHandler :)将为您显式地将记录写入流中,而不是抽象记录器中的记录保存。

    该方法为您提供了一个completionHandler块,您可以使用它来准确查看Kinesis响应的内容,无论是成功的记录ID还是错误代码。从那里,您可以登录到控制台或对响应执行其他操作。需要注意的是,您需要手动批处理记录才能调用putRecords。您可以使用用户默认值或领域在本地存储事件,然后按计划将它们刷新到Kinesis。这是录音机免费为您提供的。

    或者,如果您只想查看事件正在成功通过Kinesis流,则可以将Kinesis Firehose流连接到Kinesis流,可以将其配置为将事件数据传递到S3,Amazon ElasticSearch或Amazon Redshift: https://aws.amazon.com/kinesis/data-firehose/

    关于ios - 如何在iOS App中实现Amazon Kinesis Streaming以实时处理数据,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/42973495/

    10-11 11:03