我有一个应用程序,用户可以在其中投票。
我希望我的应用程序能够扩展,因此我决定使用Cloud Dataflow汇总存储在Firestore中的计数器。
我已经设置了流类型的Dataflow作业,因此无论用户何时投票,它都可以侦听pubsub主题。
有时我一天有成千上万的用户输入,有时我有几百...有什么解决方案可以在一段时间内没有收到pubsub消息时“暂停”该作业?
目前,我的数据流工作一直在运行,恐怕这会花我很多钱。
如果有人可以帮助我了解流媒体工作的计费方式,我们将不胜感激
这是我的Python管道:
def run(argv=None):
# Config
parser = argparse.ArgumentParser()
# Output PubSub Topic
parser.add_argument(
'--output_topic', required=True)
# Input PubSub Topic
parser.add_argument(
'--input_topic', required=True)
known_args, pipeline_args = parser.parse_known_args(argv)
# Pipeline options
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(StandardOptions).streaming = True
# Pipeline process
with beam.Pipeline(options=pipeline_options) as p:
# Counting votes
def count_votes(contestant_votes):
(contestant, votes) = contestant_votes
return (contestant, sum(votes))
# Format data to a fake object (used to be parsed by the CF)
def format_result(contestant_votes):
(contestant, votes) = contestant_votes
return '{ "contestant": %s, "votes": %d }' % (contestant, votes)
transformed = (p
| 'Receive PubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
.with_output_types(bytes)
| 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
| 'Pair with one' >> beam.Map(lambda x: (x, 1))
| 'Apply window of time' >> beam.WindowInto(window.FixedWindows(30, 0))
| 'Group by contestant' >> beam.GroupByKey()
| 'Count votes' >> beam.Map(count_votes)
| 'Format to fake object string' >> beam.Map(format_result)
| 'Transform to PubSub base64 string' >> beam.Map(lambda x: x.encode('utf-8'))
.with_output_types(bytes))
# Trigger a the output PubSub topic with the message payload
transformed | beam.io.WriteToPubSub(known_args.output_topic)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
最佳答案
要回答您的成本问题:与您当前正在使用的工作人员一起,将花费您大约250美元(取决于您当月的PD使用情况)。
当前没有等待迫使数据流“空闲”或扩展为0个工作程序的等待。您可以拥有的最小值是1。
话虽这么说,您可以采取一些方法来尽量降低成本。
如果您的工作人员负担不大,并且您想要最简单的选择,则可以使用功能较弱的工作人员(n1-standard-1 [〜USD $ 77.06]或n1-standard-2 [〜USD $ 137.17])。 https://cloud.google.com/products/calculator/#id=3bbedf2f-8bfb-41db-9923-d3a5ef0c0250(如果您看到的是我使用照片中看到的430GB PD,则添加了所有3种版本)。
如果需要计算能力,可以切换到使用基于cron的数据流作业,如此处所述:https://cloud.google.com/blog/products/gcp/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions。这样,您可能应该阅读订阅而不是主题,以便您可以保留消息,直到开始工作为止。
关于python - 云数据流流传输,空闲时停止以省钱?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/55667401/