I'm using spark-notebook to update an Accumulo table, following the method specified in the accumulo documentation and the accumulo example code. Below is, verbatim, what I put into the notebook, along with the responses:

val clientRqrdTble = new ClientOnRequiredTable
val bwConfig = new BatchWriterConfig
val batchWriter = connector.createBatchWriter("batchtestY", bwConfig);

clientRqrdTble: org.apache.accumulo.core.cli.ClientOnRequiredTable = org.apache.accumulo.core.cli.ClientOnRequiredTable@6c6a18ed
bwConfig: org.apache.accumulo.core.client.BatchWriterConfig = [maxMemory = 52428800, maxLatency = 120000, maxWriteThreads = 3, timeout = 9223372036854775807]
batchWriter: org.apache.accumulo.core.client.BatchWriter = org.apache.accumulo.core.client.impl.BatchWriterImpl@298aa736
val rowIdS = rddX2_first._1.split(" ")(0)

rowIdS: String = row_0736460000
val mutation = new Mutation(new Text(rowIdS))

mutation: org.apache.accumulo.core.data.Mutation = org.apache.accumulo.core.data.Mutation@0
mutation.put(
  new Text("foo"),
  new Text("1"),
  new ColumnVisibility("exampleVis"),
  new Value(new String("CHEWBACCA!").getBytes) )

java.lang.IllegalStateException: Can not add to mutation after serializing it
  at org.apache.accumulo.core.data.Mutation.put(Mutation.java:168)
  at org.apache.accumulo.core.data.Mutation.put(Mutation.java:163)
  at org.apache.accumulo.core.data.Mutation.put(Mutation.java:211)

I dug into the code and found that the culprit is an if check that throws when the UnsynchronizedBuffer.Writer buffer is null. The line numbers won't line up because this is a slightly different version than what's in the 1.6 accumulo-core jar, but I've looked at both and the difference isn't one that matters in this case. As far as I can tell, the buffer object is created prior to the execution of that method and isn't getting dumped.
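To make that reading concrete outside the notebook, here is a minimal Scala sketch of what I believe triggers the failure (the row id and values are placeholders): anything that forces the Mutation to serialize, such as getUpdates(), appears to null out that internal buffer, so any later put hits the same exception.

import org.apache.accumulo.core.data.{Mutation, Value}
import org.apache.accumulo.core.security.ColumnVisibility
import org.apache.hadoop.io.Text

val m = new Mutation(new Text("row_demo"))        // placeholder row id
m.put(new Text("foo"), new Text("1"),
  new ColumnVisibility("exampleVis"),
  new Value("first".getBytes))                    // fine: internal buffer still live
m.getUpdates()                                    // forces serialization, nulling the buffer (per my reading of the 1.6 source)
m.put(new Text("foo"), new Text("2"),
  new ColumnVisibility("exampleVis"),
  new Value("second".getBytes))                   // throws IllegalStateException: Can not add to mutation after serializing it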

So either I'm missing something in my code, or something else is broken. Does anyone know what might be causing this behavior?

Update One

I've executed the following code using the Scala console and via straight Java 1.8. It fails in Scala but not in Java, so at this point I think it's an Accumulo issue. I'm going to open a bug ticket and dig deeper into the source. If I come up with a solution I'll post it here.

Below is the code in Java form. There's some extra stuff in there because I wanted to make sure I could connect to the table I created with the accumulo batch writer example:
import java.util.Map.Entry;

import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.client.*;
import org.apache.accumulo.core.client.mapred.*;
import org.apache.accumulo.core.cli.ClientOnRequiredTable;
import org.apache.accumulo.core.cli.ClientOnRequiredTable.*;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.conf.Configured.*;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;

public class App {

    public static void main( String[] args ) throws
                                            AccumuloException,
                                            AccumuloSecurityException,
                                            TableNotFoundException {
        // connect to accumulo using a scanner
        // print first ten rows of a given table
        String instanceNameS = "accumulo";
        String zooServersS = "localhost:2181";
        Instance instance = new ZooKeeperInstance(instanceNameS, zooServersS);
        Connector connector =
                instance.getConnector( "root", new PasswordToken("password"));

        Authorizations auths = new Authorizations("exampleVis");
        Scanner scanner = connector.createScanner("batchtestY", auths);

        scanner.setRange(new Range("row_0000000001", "row_0000000010"));

        for(Entry<Key, Value> entry : scanner) {
          System.out.println(entry.getKey() + " is " + entry.getValue());
        }


        // stage up connection info objects for serialization
        ClientOnRequiredTable clientRqrdTble = new ClientOnRequiredTable();
        BatchWriterConfig bwConfig = new BatchWriterConfig();
        BatchWriter batchWriter =
                connector.createBatchWriter("batchtestY", bwConfig);

        // create mutation object
        Mutation mutation = new Mutation(new Text("row_0000000001"));

        // populate mutation object
        // -->THIS IS WHAT'S FAILING IN SCALA<--
        mutation.put(
                  new Text("foo"),
                  new Text("1"),
                  new ColumnVisibility("exampleVis"),
                  new Value(new String("CHEWBACCA!").getBytes()) );
    }
}

Update Two

I've created an Accumulo bug ticket for this issue. Their target is to have it fixed in v1.7.0. Until then, the solution I provided below is a functional workaround.

Best Answer

It looks like whatever happens in spark-notebook when it executes the new Mutation cell is serializing the Mutation. You can't call put on a Mutation after it has been serialized. I would try adding the mutation.put calls to the same notebook cell as the new Mutation command. It looks like the clientRqrdTble / bwConfig / batchWriter commands are in a single multiline cell, so hopefully the same will be possible for the Mutation.
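Something like the following in a single cell ought to work (an untested sketch that reuses the rowIdS and batchWriter values from the earlier cells; the flush is only there so the write goes out immediately):

// build, populate, and hand off the Mutation in one cell, before
// any cell boundary gives spark-notebook a chance to serialize it
val mutation = new Mutation(new Text(rowIdS))
mutation.put(
  new Text("foo"),
  new Text("1"),
  new ColumnVisibility("exampleVis"),
  new Value(new String("CHEWBACCA!").getBytes))
batchWriter.addMutation(mutation)
batchWriter.flush()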
