在使用parquet-hadoop.jar包解析parquet文件时,遇到decimal类型的数据为乱码,具体解决方法如下:

使用parquet-hadoop.jar解析httpfs服务提供的parquet文件,代码如下:

/**
 * Reads a Parquet file through the webhdfs/HttpFS file system and prints it to
 * standard output as a tab-separated table: one header line with
 * {@code name(PRIMITIVE_TYPE)} per column, a separator line, then one line per record
 * with every field rendered by {@link #converterType2Java(Group, Type)}.
 *
 * @throws Exception if the file cannot be located, opened or read
 */
@Test
    public void httpfsReadHiveParquetFile() throws Exception {
        Path path = new Path("webhdfs://s128:14000/wbd_test/parq1.0.parq");
        Configuration conf = new Configuration();
        conf.set("fs.webhdfs.impl", WebHdfsFileSystem.class.getName());

        // Extra URL parameters are handed to the file system as a JSON string.
        // NOTE(review): the token below is a hard-coded sample value; do not commit real credentials.
        Map<String, String> urlParams = new HashMap<>();
        urlParams.put("user.token", "7hmsNJIget0eGO5maKQ=sfds");
        conf.set(WebHdfsFileSystem.HTTPFS_URL_PARAM, JSON.toJSONString(urlParams));

        FileSystem fs = path.getFileSystem(conf);
        FileStatus fileStatus = fs.getFileStatus(path);
        InputFile inputFile = HadoopInputFile.fromStatus(fileStatus, conf);

        // NOTE(review): upstream parquet-mr appears to require a ReadSupport when building from
        // ParquetReader.read(InputFile); confirm this works with the parquet version in use.
        ParquetReader.Builder<Group> reader = ParquetReader.read(inputFile);
        reader.withConf(conf);

        // try-with-resources guarantees the reader (and its underlying stream) is closed,
        // even when reading or converting a record throws.
        try (ParquetReader<Group> build = reader.build()) {
            Group line = build.read();
            if (line != null) {
                // The schema is taken from the first record and reused for every following one.
                List<Type> typeList = line.getType().getFields();
                for (Type type : typeList) {
                    System.out.print(type.getName() + "(" + type.asPrimitiveType().getPrimitiveTypeName().name() + ")\t\t");
                }
                System.out.println();
                System.out.println("-----------------------------------------------------------------------------------------------------------");
                do {
                    for (Type type : typeList) {
                        System.out.print(converterType2Java(line, type) + "\t\t");
                    }
                    System.out.println();
                } while ((line = build.read()) != null);
            }
        }

        System.out.println("It is over !");

    }

/**
 * Renders the first value of one primitive field of a record as a string.
 *
 * <p>The conversion is chosen by the field's Parquet primitive type name:
 * BOOLEAN, INT32, INT64, FLOAT and DOUBLE go through {@code String.valueOf};
 * INT96 is converted with {@link #getTimestampMillis(Binary)};
 * FIXED_LEN_BYTE_ARRAY is decoded only when annotated as DECIMAL, and is then printed
 * in plain notation with exactly {@code scale} fraction digits;
 * BINARY and any other type are read with {@code getString}.
 *
 * @param line the record to read from
 * @param type the schema type of the field; must be a primitive type
 * @return the rendered value, or {@code null} when the field has no value in this record
 *         or is a FIXED_LEN_BYTE_ARRAY that is not annotated as DECIMAL
 */
public static String converterType2Java(Group line, Type type) {
        String fieldType = type.asPrimitiveType().getPrimitiveTypeName().name();
        String fieldName = type.getName();
        // A repetition count of zero means the field is absent in this record.
        if (line.getFieldRepetitionCount(fieldName) == 0) {
            return null;
        }

        String value = null;
        switch (fieldType) {
            case "BOOLEAN":
                value = String.valueOf(line.getBoolean(fieldName, 0));
                break;
            case "INT32":
                value = String.valueOf(line.getInteger(fieldName, 0));
                break;
            case "INT64":
                value = String.valueOf(line.getLong(fieldName, 0));
                break;
            case "INT96":
                value = String.valueOf(getTimestampMillis(line.getInt96(fieldName, 0)));
                break;
            case "FLOAT":
                value = String.valueOf(line.getFloat(fieldName, 0));
                break;
            case "DOUBLE":
                value = String.valueOf(line.getDouble(fieldName, 0));
                break;
            case "FIXED_LEN_BYTE_ARRAY":
                if (type.getOriginalType() != null && "DECIMAL".equals(type.getOriginalType().name())) {
                    int precision = type.asPrimitiveType().getDecimalMetadata().getPrecision();
                    int scale = type.asPrimitiveType().getDecimalMetadata().getScale();
                    BigDecimal decimalValue = binaryToDecimal(precision, scale, line.getBinary(fieldName, 0).getBytes());
                    // Plain notation with a fixed number of fraction digits. Unlike a DecimalFormat
                    // pattern this does not depend on the default locale and does not leave a
                    // dangling decimal separator when scale is 0. HALF_EVEN matches the default
                    // rounding DecimalFormat would have applied.
                    value = decimalValue.setScale(scale, java.math.RoundingMode.HALF_EVEN).toPlainString();
                }
                break;
            case "BINARY":
            default:
                value = line.getString(fieldName, 0);
                break;
        }
        return value;
    }

/**
 * Converts a 12-byte INT96 timestamp value to milliseconds.
 *
 * <p>Bytes 0-7 hold the nanoseconds within the day and bytes 8-11 hold the Julian day
 * number, both stored little endian. The result is
 * {@code julianDayToMillis(julianDay) + nanosOfDay / NANOS_PER_MILLISECOND}.
 *
 * @param timestampBinary the raw INT96 value
 * @return the timestamp in milliseconds, or {@code 0} when the value is not exactly 12 bytes long
 */
public static long getTimestampMillis(Binary timestampBinary)
    {
        if (timestampBinary.length() == 12) {
            byte[] raw = timestampBinary.getBytes();

            // The fromBytes helpers expect the most significant byte first, so the
            // little-endian input is passed in reverse order.
            int dayNumber = Ints.fromBytes(raw[11], raw[10], raw[9], raw[8]);
            long nanosOfDay = Longs.fromBytes(raw[7], raw[6], raw[5], raw[4], raw[3], raw[2], raw[1], raw[0]);

            long dayMillis = julianDayToMillis(dayNumber);
            return dayMillis + (nanosOfDay / NANOS_PER_MILLISECOND);
        }
        return 0;
    }

    /**
     * Converts a Julian day number to milliseconds by subtracting
     * {@code JULIAN_EPOCH_OFFSET_DAYS} and multiplying by {@code MILLIS_IN_DAY}.
     *
     * @param julianDay the Julian day number
     * @return the offset-adjusted day count expressed in milliseconds
     */
    private static long julianDayToMillis(int julianDay)
    {
        // Widen before subtracting and multiplying so the arithmetic is done in long
        // regardless of how the two constants are declared; int arithmetic would overflow
        // after only a few weeks' worth of milliseconds.
        return ((long) julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
    }


/**
 * Decodes a Parquet DECIMAL stored as a byte array into a {@link BigDecimal}.
 *
 * <p>The bytes are interpreted as a big-endian two's-complement unscaled integer, which is
 * then combined with {@code scale}. Decoding is exact for every array length; no
 * floating-point arithmetic is involved, so large values keep all their digits and the
 * result always carries the requested scale.
 *
 * @param precision the declared decimal precision; kept for interface compatibility, the
 *                  decoding does not depend on it
 * @param scale     the number of digits to the right of the decimal point
 * @param bytes     the big-endian two's-complement unscaled value
 * @return the decoded value with the given scale; zero when {@code bytes} is empty
 * @throws NullPointerException if {@code bytes} is {@code null}
 */
static BigDecimal binaryToDecimal(int precision, int scale, byte[] bytes) {
        // BigInteger rejects a zero-length array; treat "no bytes" as an unscaled value of 0.
        if (bytes.length == 0) {
            return BigDecimal.valueOf(0L, scale);
        }
        // BigInteger(byte[]) performs the sign extension itself, so no manual shifting is
        // needed and arrays wider than 8 bytes are handled correctly.
        return new BigDecimal(new BigInteger(bytes), scale);
    }

parquet文件中的timestamp类型实际存储为INT96类型,decimal类型实际存储为FIXED_LEN_BYTE_ARRAY二进制类型,要想得到原始数据,都需要进行转换。网上很少能找到相关问题的资料,希望对其他人有所帮助。

12-31 10:59