我有一个使用org.apache.parquet.hadoop.ParquetWriter将CSV数据文件转换为 Parquet 数据文件的工具。

当前,它仅处理int32doublestring
我需要支持 Parquet timestamp逻辑类型(标注为int96),但由于无法在网上找到精确的规范,因此我不知道该怎么做。

似乎这种时间戳记编码(int96)很少,不能很好地支持。我在网上发现很少的规范详细信息。 This github README指出:



具体来说:

  • 我为Type模式中的列使用哪个拼花MessageType?我假设我应该使用原始类型PrimitiveTypeName.INT96,但是我不确定是否可以指定逻辑类型?
  • 如何写入数据?也就是说,我要以哪种格式将时间戳记写入小组?对于INT96时间戳,我假设我必须写一些二进制类型?

  • 这是我的代码的简化版本,演示了我正在尝试做的事情。具体来说,请看一下“TODO”注释,这是代码中与上述问题相关的两点。
    List<Type> fields = new ArrayList<>();
    fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT32, "int32_col", null));
    fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.DOUBLE, "double_col", null));
    fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.STRING, "string_col", null));
    
    // TODO:
    //   Specify the TIMESTAMP type.
    //   How? INT96 primitive type? Is there a logical timestamp type I can use w/ MessageType schema?
    fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT96, "timestamp_col", null));
    
    MessageType schema = new MessageType("input", fields);
    
    // initialize writer
    Configuration configuration = new Configuration();
    configuration.setQuietMode(true);
    GroupWriteSupport.setSchema(schema, configuration);
    ParquetWriter<Group> writer = new ParquetWriter<Group>(
      new Path("output.parquet"),
      new GroupWriteSupport(),
      CompressionCodecName.SNAPPY,
      ParquetWriter.DEFAULT_BLOCK_SIZE,
      ParquetWriter.DEFAULT_PAGE_SIZE,
      1048576,
      true,
      false,
      ParquetProperties.WriterVersion.PARQUET_1_0,
      configuration
    );
    
    // write CSV data
    CSVParser parser = CSVParser.parse(new File(csv), StandardCharsets.UTF_8, CSVFormat.TDF.withQuote(null));
    ArrayList<String> columns = new ArrayList<>(schemaMap.keySet());
    int colIndex;
    int rowNum = 0;
    for (CSVRecord csvRecord : parser) {
      rowNum ++;
      Group group = f.newGroup();
      colIndex = 0;
      for (String record : csvRecord) {
        if (record == null || record.isEmpty() || record.equals( "NULL")) {
          colIndex++;
          continue;
        }
    
    
        record = record.trim();
        String type = schemaMap.get(columns.get(colIndex)).get("type").toString();
        MessageTypeConverter.addTypeValueToGroup(type, record, group, colIndex++);
    
        switch (colIndex) {
          case 0: // int32
            group.add(colIndex, Integer.parseInt(record));
            break;
          case 1: // double
            group.add(colIndex, Double.parseDouble(record));
            break;
          case 2: // string
            group.add(colIndex, record);
            break;
          case 3:
            // TODO: convert CSV string value to TIMESTAMP type (how?)
            throw new NotImplementedException();
        }
      }
      writer.write(group);
    }
    writer.close();
    

    最佳答案

  • INT96时间戳使用的INT96物理类型没有任何逻辑类型,因此请不要使用任何注释。
  • 如果您对INT96时间戳的结构感兴趣,请查看here。如果您想查看示例代码与该格式的相互转换,请查看this file from Hive
  • 10-01 15:08