How to increase Dataflow read parallelism from Cassandra

Problem description

I am trying to export a lot of data (2 TB, 30kkk rows, i.e. roughly 30 billion) from Cassandra to BigQuery. All my infrastructure is on GCP. My Cassandra cluster has 4 nodes (4 vCPUs, 26 GB memory, 2000 GB PD (HDD) each). There is one seed node in the cluster. I need to transform my data before writing to BQ, so I am using Dataflow. The worker type is n1-highmem-2. Workers and Cassandra instances are in the same zone, europe-west1-c. My limits for Cassandra: (screenshot omitted). The part of my pipeline code responsible for the read transform is shown below under "Pipeline read code".

Autoscaling

The problem is that when I don't set --numWorkers, autoscaling picks a number of workers that averages out at about 2 (screenshot omitted).

Load balancing

When I set --numWorkers=15 the read rate does not increase, and only 2 workers communicate with Cassandra (I can tell from iftop, and only these workers have a CPU load of about 60%). At the same time the Cassandra nodes are not under much load (CPU usage 20-30%).
Network and disk usage of the seed node is about 2 times higher than on the other nodes, but not too high, I think (screenshots for the seed node and a non-seed node omitted).

Pipeline launch warnings

I get some warnings when the pipeline is launching:

WARNING: Size estimation of the source failed: org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@7569ea63
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /10.132.9.101:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.101:9042] Cannot connect), /10.132.9.102:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.102:9042] Cannot connect), /10.132.9.103:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.103:9042] Cannot connect), /10.132.9.104:9042 [only showing errors of first 3 hosts, use getErrors() for more details])

My Cassandra cluster is in a GCE local network, and it seems that some queries are made from my local machine and cannot reach the cluster (I am launching the pipeline with the Dataflow Eclipse plugin, as described here). These queries are the table size estimations. Can I specify the size estimation by hand, or launch the pipeline from a GCE instance? Or can I ignore these warnings? Do they affect the read rate?

I've tried launching the pipeline from a GCE VM. The connectivity problem is gone. My tables have no varchar columns, but I still get warnings like this (there is no codec in the DataStax driver for [varchar <-> java.lang.Long]):

WARNING: Can't estimate the size
com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [varchar <-> java.lang.Long]
    at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:741)
    at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:588)
    at com.datastax.driver.core.CodecRegistry.access$500(CodecRegistry.java:137)
    at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:246)
    at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:232)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
    at com.google.common.cache.LocalCache.get(LocalCache.java:4053)
    at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
    at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
    at com.datastax.driver.core.CodecRegistry.lookupCodec(CodecRegistry.java:522)
    at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:485)
    at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:467)
    at com.datastax.driver.core.AbstractGettableByIndexData.codecFor(AbstractGettableByIndexData.java:69)
    at com.datastax.driver.core.AbstractGettableByIndexData.getLong(AbstractGettableByIndexData.java:152)
    at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:26)
    at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:95)
    at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getTokenRanges(CassandraServiceImpl.java:279)
    at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getEstimatedSizeBytes(CassandraServiceImpl.java:135)
    at org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource.getEstimatedSizeBytes(CassandraIO.java:308)
    at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.startDynamicSplitThread(BoundedReadEvaluatorFactory.java:166)
    at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:142)
    at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
    at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Pipeline read code

// Read data from the Cassandra table
PCollection<Model> pcollection = p.apply(CassandraIO.<Model>read()
    .withHosts(Arrays.asList("10.10.10.101", "10.10.10.102", "10.10.10.103", "10.10.10.104"))
    .withPort(9042)
    .withKeyspace(keyspaceName)
    .withTable(tableName)
    .withEntity(Model.class)
    .withCoder(SerializableCoder.of(Model.class))
    .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));

// Transform the PCollection into a KV PCollection keyed by rowName
PCollection<KV<Long, Model>> pcollection_by_rowName = pcollection
    .apply(ParDo.of(new DoFn<Model, KV<Long, Model>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(KV.of(c.element().rowName, c.element()));
      }
    }));
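For context, here is a minimal sketch of the write stage this pipeline feeds. This is my own illustration, not code from the original post: the destination table, the "row_name" field, the CREATE_NEVER disposition, and the usual TableRow/BigQueryIO imports are all assumptions.

// Hypothetical write stage (the post only says the data goes to BigQuery
// after a transform): convert each Model to a TableRow, then write.
PCollection<TableRow> rows = pcollection.apply("ModelToTableRow",
    ParDo.of(new DoFn<Model, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // "row_name" and the rowName accessor are assumed for illustration.
        c.output(new TableRow().set("row_name", c.element().rowName));
      }
    }));

rows.apply("WriteToBigQuery", BigQueryIO.writeTableRows()
    .to("my-project:my_dataset.my_table") // hypothetical destination, assumed to exist
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));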
Number of splits (Stackdriver log)

W Number of splits is less than 0 (0), fallback to 1
I Number of splits is 1
W Number of splits is less than 0 (0), fallback to 1
I Number of splits is 1
W Number of splits is less than 0 (0), fallback to 1
I Number of splits is 1

Since size estimation fails, the computed number of splits falls back to 1, so the whole table is read as a single bundle; that would explain why only a couple of workers ever talk to Cassandra.

What I've tried

No effect:
- setting the read consistency level to ONE;
- nodetool setstreamthroughput 1000 and nodetool setinterdcstreamthroughput 1000;
- increasing Cassandra read concurrency (concurrent_reads: 32 in cassandra.yaml);
- setting different numbers of workers, from 1 to 40.

Some effect:
I set numSplits = 10 as @jkff proposed.
Now I can see in the logs:

I Murmur3Partitioner detected, splitting
W Can't estimate the size
W Can't estimate the size
W Number of splits is less than 0 (0), fallback to 10
I Number of splits is 10
W Number of splits is less than 0 (0), fallback to 10
I Number of splits is 10
I Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@6d83ee93 produced 10 bundles with total serialized response size 20799
I Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@25d02f5c produced 10 bundles with total serialized response size 19359
I Splitting source [0, 1) produced 1 bundles with total serialized response size 1091
I Murmur3Partitioner detected, splitting
W Can't estimate the size
I Splitting source [0, 0) produced 0 bundles with total serialized response size 76
W Number of splits is less than 0 (0), fallback to 10
I Number of splits is 10
I Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@2661dcf3 produced 10 bundles with total serialized response size 18527

But I got another exception:

java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.cassandra.Cassandra...
(5d6339652002918d): java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@5f18c296
    at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:582)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:347)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:183)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
    at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:336)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:294)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:43)
    at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl$CassandraReaderImpl.start(CassandraServiceImpl.java:80)
    at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:579)
    ... 14 more
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:144)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:186)
    at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:50)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:817)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:651)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1077)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1000)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    ... 1 more
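One observation about that SyntaxError. My reading, an assumption based on the error message and on CQL's $$-quoted string syntax, is that the literal "$pk" placeholder (visible in the split() code below) survives into the generated query. A sketch with a hypothetical keyspace, table, and open session:

// If "$pk" is never substituted, the reader would execute CQL like this;
// Cassandra's parser sees '$' as the start of a $$...$$ quoted string and
// fails on the 'p' that follows, matching "mismatched character 'p' expecting '$'".
String brokenCql =
    "SELECT * FROM my_keyspace.my_table WHERE token($pk) >= -9223372036854775808";
session.execute(brokenCql); // throws com.datastax.driver.core.exceptions.SyntaxError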
Maybe there is a mistake: CassandraServiceImpl.java#L220. And this statement looks like a mistype: CassandraServiceImpl.java#L207.

Changes I've made to the CassandraIO code

As @jkff proposed, I've changed CassandraIO in the way I needed:

@VisibleForTesting
protected List<BoundedSource<T>> split(CassandraIO.Read<T> spec,
                                       long desiredBundleSizeBytes,
                                       long estimatedSizeBytes) {
  long numSplits = 1;
  List<BoundedSource<T>> sourceList = new ArrayList<>();
  if (desiredBundleSizeBytes > 0) {
    numSplits = estimatedSizeBytes / desiredBundleSizeBytes;
  }
  if (numSplits <= 0) {
    LOG.warn("Number of splits is less than 0 ({}), fallback to 10", numSplits);
    numSplits = 10;
  }
  LOG.info("Number of splits is {}", numSplits);

  Long startRange = MIN_TOKEN;
  Long endRange = MAX_TOKEN;
  Long startToken, endToken;

  // Map each table to its partition key column; "$pk" is the placeholder default.
  String pk = "$pk";
  switch (spec.table()) {
    case "table1":
      pk = "table1_pk";
      break;
    case "table2":
    case "table3":
      pk = "table23_pk";
      break;
  }

  endToken = startRange;
  Long incrementValue = endRange / numSplits - startRange / numSplits;
  String splitQuery;
  if (numSplits == 1) {
    // We have a unique split.
    splitQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString();
    sourceList.add(new CassandraIO.CassandraSource<T>(spec, splitQuery));
  } else {
    // We have more than one split.
    for (int i = 0; i < numSplits; i++) {
      startToken = endToken;
      endToken = startToken + incrementValue;
      Select.Where builder = QueryBuilder.select().from(spec.keyspace(), spec.table()).where();
      if (i > 0) {
        builder = builder.and(QueryBuilder.gte("token(" + pk + ")", startToken));
      }
      if (i < (numSplits - 1)) {
        builder = builder.and(QueryBuilder.lt("token(" + pk + ")", endToken));
      }
      sourceList.add(new CassandraIO.CassandraSource<T>(spec, builder.toString()));
    }
  }
  return sourceList;
}
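As a sanity check on the token-range arithmetic in that split(), here is a small standalone sketch of my own, assuming MIN_TOKEN and MAX_TOKEN stand for the Murmur3 bounds Long.MIN_VALUE and Long.MAX_VALUE:

public class TokenSplitDemo {
  public static void main(String[] args) {
    final long min = Long.MIN_VALUE; // Murmur3 MIN_TOKEN
    final long max = Long.MAX_VALUE; // Murmur3 MAX_TOKEN
    final long numSplits = 10;
    // Same expression as in split() above: dividing each bound separately
    // avoids computing (max - min), which overflows a signed long.
    final long increment = max / numSplits - min / numSplits;
    long start = min;
    for (int i = 0; i < numSplits; i++) {
      long end = start + increment;
      // The real code drops the lower bound for i == 0 and the upper bound
      // for i == numSplits - 1, so integer-division rounding near the ends
      // of the token ring cannot lose any rows.
      System.out.printf("split %d: token >= %d AND token < %d%n", i, start, end);
      start = end;
    }
  }
}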
Solution

I think this should be classified as a bug in CassandraIO. I filed BEAM-3424. You can try building your own version of Beam with that default of 1 changed to 100 or something like that while this issue is being fixed. I also filed BEAM-3425 for the bug during size estimation.
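A minimal sketch of the kind of stopgap change that suggests, assuming the fallback sits in CassandraIO's split logic the way the modified split() above has it (the exact location in Beam's source is an assumption):

// In a private build of Beam's CassandraIO, raise the fallback used when
// size estimation fails, so the table is still read as many bundles:
if (numSplits <= 0) {
  LOG.warn("Number of splits is less than 0 ({}), fallback to 100", numSplits);
  numSplits = 100; // was 1 upstream; 100 is the answer's suggested order of magnitude
}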