我正在将Google数据流Java 1.9迁移到Beam 2.0,并且试图使用BigtableIO.Write

    ....
.apply("", BigtableIO.write()
                .withBigtableOptions(bigtableOptions)
                .withTableId("twoSecondVitals"));

在BigtableIO之前的ParDo中,我正在努力尝试使Iterable成为可能。
          try{
        Mutation mutation = Mutation.parseFrom(new ObjectMapper().writeValueAsBytes(v));
        Mutation mu[] = {mutation};
        Iterable<Mutation> imu = Arrays.asList(mu);
        log.severe("imu");
        c.output(KV.of(ByteString.copyFromUtf8(rowKey+"_"+v.getEpoch()), imu));
      }catch (Exception e){
        log.severe(rowKey+"_"+v.getEpoch()+" error:"+e.getMessage());
      }

上面的代码抛出以下异常
InvalidProtocolBufferException:协议消息端组标记与预期的标记不匹配

v是对象列表(Vitals.class)。 hbase api使用Put方法创建突变。如何创建可与BigtableIO接收器配合使用的BigTable突变?

最佳答案

通过查看sdk的测试,我找到了答案。

            Iterable<Mutation> mutations =
                ImmutableList.of(Mutation.newBuilder()
                .setSetCell(
                        Mutation.SetCell.newBuilder()
                        .setValue(ByteString.copyFrom(new ObjectMapper().writeValueAsBytes(v)))
                        .setFamilyName("vitals")
                ).build());

10-02 07:06