DataX secondary development: adding Parquet read/write support to the HdfsReader and HdfsWriter plugins

Source: 博客园 (cnblogs)

I. Background

The officially open-sourced version of DataX supports reading and writing HDFS files, but as of this writing it does not support Parquet files. Thanks to DataX's excellent synchronization performance, most of our company's projects last year adopted it as the data synchronization tool, yet two common scenarios remained uncovered: reading Parquet data from the CDH cluster, and writing data from other sources into HDFS in Parquet format. So we extended the HdfsReader and HdfsWriter plugins ourselves to support reading and writing Parquet files.

II. The HdfsReader plugin

This plugin is fairly simple and consists of five classes. The classes and the changes each one needs are listed below:

  • DFSUtil: add a method that checks whether a file is in Parquet format, and a method that reads Parquet files and converts their records into DataX columns.
  • HdfsConstant: add a constant for the Parquet file type.
  • HdfsReader: add a branch for the case where fileType is configured as Parquet.
  • HdfsReaderErrorCode: no changes needed.
  • Type: no changes needed.

Only four of the five classes need changes. A sample reader configuration that targets the new Parquet type is sketched first, followed by the code for each class.
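
The snippet below shows where the new file type surfaces for users. It is a minimal, illustrative hdfsreader job fragment: it assumes the stock DataX job layout and the standard hdfsreader parameter names (defaultFS, path, fileType, column); the paths, columns, and the throwaway streamwriter sink are made up for illustration only.

{
    "job": {
        "setting": { "speed": { "channel": 1 } },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "defaultFS": "hdfs://nameservice1",
                        "path": "/warehouse/ods/demo_table/*",
                        "fileType": "parquet",
                        "column": [
                            { "index": 0, "type": "long" },
                            { "index": 1, "type": "string" },
                            { "index": 2, "type": "double" }
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": { "print": true }
                }
            }
        ]
    }
}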

DFSUtil

import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.google.common.primitives.Ints;import com.google.common.primitives.Longs;import com.alibaba.datax.common.base.Key;import com.alibaba.datax.common.element.BoolColumn;import com.alibaba.datax.common.element.BytesColumn;import com.alibaba.datax.common.element.Column;import com.alibaba.datax.common.element.ColumnEntry;import com.alibaba.datax.common.element.DateColumn;import com.alibaba.datax.common.element.DoubleColumn;import com.alibaba.datax.common.element.LongColumn;import com.alibaba.datax.common.element.Record;import com.alibaba.datax.common.element.StringColumn;import com.alibaba.datax.common.exception.DataXException;import com.alibaba.datax.common.plugin.RecordSender;import com.alibaba.datax.common.plugin.TaskPluginCollector;import com.alibaba.datax.common.util.Configuration;import com.alibaba.datax.storage.reader.StorageReaderErrorCode;import com.alibaba.datax.storage.reader.StorageReaderUtil;import org.apache.avro.Conversions;import org.apache.avro.Schema;import org.apache.avro.generic.GenericData;import org.apache.commons.lang3.StringUtils;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;import org.apache.hadoop.hive.ql.io.RCFile;import org.apache.hadoop.hive.ql.io.RCFileRecordReader;import org.apache.hadoop.hive.ql.io.orc.OrcFile;import org.apache.hadoop.hive.ql.io.orc.Reader;import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.SequenceFile;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapred.FileSplit;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.security.UserGroupInformation;import org.apache.hadoop.util.ReflectionUtils;import org.apache.orc.TypeDescription;import org.apache.parquet.avro.AvroParquetReader;import org.apache.parquet.example.data.Group;import org.apache.parquet.hadoop.ParquetReader;import org.apache.parquet.hadoop.example.GroupReadSupport;import org.apache.parquet.hadoop.util.HadoopInputFile;import org.apache.parquet.io.api.Binary;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.io.InputStream;import java.math.BigDecimal;import java.math.RoundingMode;import java.nio.ByteBuffer;import java.text.SimpleDateFormat;import java.util.ArrayList;import java.util.Arrays;import java.util.Date;import java.util.HashSet;import java.util.List;import java.util.Set;import java.util.concurrent.TimeUnit;import static com.alibaba.datax.common.base.Key.COLUMN;import static com.alibaba.datax.common.base.Key.NULL_FORMAT;public class DFSUtil{    private static final Logger LOG = LoggerFactory.getLogger(DFSUtil.class);    // the offset of julian, 2440588 is 1970/1/1    private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588;    private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);    private 
static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);    private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;    private final org.apache.hadoop.conf.Configuration hadoopConf;    private final boolean haveKerberos;    private final HashSet sourceHDFSAllFilesList = new HashSet<>();    private String specifiedFileType = null;    private String kerberosKeytabFilePath;    private String kerberosPrincipal;    public DFSUtil(Configuration taskConfig)    {        hadoopConf = new org.apache.hadoop.conf.Configuration();        //io.file.buffer.size 性能参数        //http://blog.csdn.net/yangjl38/article/details/7583374        Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);        JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));        if (null != hadoopSiteParams) {            Set paramKeys = hadoopSiteParams.getKeys();            for (String each : paramKeys) {                hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));            }        }        hadoopConf.set(HdfsConstant.HDFS_DEFAULT_KEY, taskConfig.getString(Key.DEFAULT_FS));        //是否有Kerberos认证        this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);        if (haveKerberos) {            this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);            this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);            this.hadoopConf.set(HdfsConstant.HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");        }        this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath);        LOG.info("hadoopConfig details:{}", JSON.toJSONString(this.hadoopConf));    }    private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath)    {        if (haveKerberos && StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {            UserGroupInformation.setConfiguration(hadoopConf);            try {                UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);            }            catch (Exception e) {                String message = String.format("kerberos认证失败,请确定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填写正确",                        kerberosKeytabFilePath, kerberosPrincipal);                throw DataXException.asDataXException(HdfsReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e);            }        }    }    /**     * 获取指定路径列表下符合条件的所有文件的绝对路径     *     * @param srcPaths 路径列表     * @param specifiedFileType 指定文件类型     * @return set of string     */    public Set getAllFiles(List srcPaths, String specifiedFileType)    {        this.specifiedFileType = specifiedFileType;        if (!srcPaths.isEmpty()) {            for (String eachPath : srcPaths) {                LOG.info("get HDFS all files in path = [{}]", eachPath);                getHDFSAllFiles(eachPath);            }        }        return sourceHDFSAllFilesList;    }    private void addSourceFileIfNotEmpty(FileStatus f)    {        if (f.isFile()) {            String filePath = f.getPath().toString();            if (f.getLen() > 0) {                addSourceFileByType(filePath);            }            else {                LOG.warn("文件[{}]长度为0,将会跳过不作处理!", filePath);            }        }    }    public void getHDFSAllFiles(String hdfsPath)    {        try {            FileSystem hdfs = FileSystem.get(hadoopConf);            //判断hdfsPath是否包含正则符号            if 
(hdfsPath.contains("*") || hdfsPath.contains("?")) {                Path path = new Path(hdfsPath);                FileStatus[] stats = hdfs.globStatus(path);                for (FileStatus f : stats) {                    if (f.isFile()) {                        addSourceFileIfNotEmpty(f);                    }                    else if (f.isDirectory()) {                        getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);                    }                }            }            else {                getHDFSAllFilesNORegex(hdfsPath, hdfs);            }        }        catch (IOException e) {            String message = String.format("无法读取路径[%s]下的所有文件,请确认您的配置项fs.defaultFS, path的值是否正确," +                    "是否有读写权限,网络是否已断开!", hdfsPath);            LOG.error(message);            throw DataXException.asDataXException(HdfsReaderErrorCode.PATH_CONFIG_ERROR, e);        }    }    private void getHDFSAllFilesNORegex(String path, FileSystem hdfs)            throws IOException    {        // 获取要读取的文件的根目录        Path listFiles = new Path(path);        // If the network disconnected, this method will retry 45 times        // each time the retry interval for 20 seconds        // 获取要读取的文件的根目录的所有二级子文件目录        FileStatus[] stats = hdfs.listStatus(listFiles);        for (FileStatus f : stats) {            // 判断是不是目录,如果是目录,递归调用            if (f.isDirectory()) {                LOG.info("[{}] 是目录, 递归获取该目录下的文件", f.getPath());                getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);            }            else if (f.isFile()) {                addSourceFileIfNotEmpty(f);            }            else {                String message = String.format("该路径[%s]文件类型既不是目录也不是文件,插件自动忽略。", f.getPath());                LOG.info(message);            }        }    }    // 根据用户指定的文件类型,将指定的文件类型的路径加入sourceHDFSAllFilesList    private void addSourceFileByType(String filePath)    {        // 检查file的类型和用户配置的fileType类型是否一致        boolean isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType);        if (isMatchedFileType) {            String msg = String.format("[%s]是[%s]类型的文件, 将该文件加入source files列表", filePath, this.specifiedFileType);            LOG.info(msg);            sourceHDFSAllFilesList.add(filePath);        }        else {            String message = String.format("文件[%s]的类型与用户配置的fileType类型不一致," +                            "请确认您配置的目录下面所有文件的类型均为[%s]"                    , filePath, this.specifiedFileType);            LOG.error(message);            throw DataXException.asDataXException(                    HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message);        }    }    public InputStream getInputStream(String filepath)    {        InputStream inputStream;        Path path = new Path(filepath);        try {            FileSystem fs = FileSystem.get(hadoopConf);            //If the network disconnected, this method will retry 45 times            //each time the retry interval for 20 seconds            inputStream = fs.open(path);            return inputStream;        }        catch (IOException e) {            String message = String.format("读取文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filepath, filepath);            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);        }    }    public void sequenceFileStartRead(String sourceSequenceFilePath, Configuration readerSliceConfig,            RecordSender recordSender, TaskPluginCollector taskPluginCollector)    {        LOG.info("Start Read sequence file [{}].", sourceSequenceFilePath);        Path 
seqFilePath = new Path(sourceSequenceFilePath);        try (SequenceFile.Reader reader = new SequenceFile.Reader(this.hadoopConf,                SequenceFile.Reader.file(seqFilePath))) {            //获取SequenceFile.Reader实例            //获取key 与 value            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), this.hadoopConf);            Text value = new Text();            while (reader.next(key, value)) {                if (StringUtils.isNotBlank(value.toString())) {                    StorageReaderUtil.transportOneRecord(recordSender, readerSliceConfig, taskPluginCollector, value.toString());                }            }        }        catch (Exception e) {            String message = String.format("SequenceFile.Reader读取文件[%s]时出错", sourceSequenceFilePath);            LOG.error(message);            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_SEQUENCE_FILE_ERROR, message, e);        }    }    public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceConfig,            RecordSender recordSender, TaskPluginCollector taskPluginCollector)    {        LOG.info("Start Read rc-file [{}].", sourceRcFilePath);        List column = StorageReaderUtil                .getListColumnEntry(readerSliceConfig, COLUMN);        // warn: no default value "\N"        String nullFormat = readerSliceConfig.getString(NULL_FORMAT);        Path rcFilePath = new Path(sourceRcFilePath);        RCFileRecordReader recordReader = null;        try (FileSystem fs = FileSystem.get(rcFilePath.toUri(), hadoopConf)) {            long fileLen = fs.getFileStatus(rcFilePath).getLen();            FileSplit split = new FileSplit(rcFilePath, 0, fileLen, (String[]) null);            recordReader = new RCFileRecordReader(hadoopConf, split);            LongWritable key = new LongWritable();            BytesRefArrayWritable value = new BytesRefArrayWritable();            Text txt = new Text();            while (recordReader.next(key, value)) {                String[] sourceLine = new String[value.size()];                txt.clear();                for (int i = 0; i < value.size(); i++) {                    BytesRefWritable v = value.get(i);                    txt.set(v.getData(), v.getStart(), v.getLength());                    sourceLine[i] = txt.toString();                }                StorageReaderUtil.transportOneRecord(recordSender,                        column, sourceLine, nullFormat, taskPluginCollector);            }        }        catch (IOException e) {            String message = String.format("读取文件[%s]时出错", sourceRcFilePath);            LOG.error(message);            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_RCFILE_ERROR, message, e);        }        finally {            try {                if (recordReader != null) {                    recordReader.close();                    LOG.info("Finally, Close RCFileRecordReader.");                }            }            catch (IOException e) {                LOG.warn(String.format("finally: 关闭RCFileRecordReader失败, %s", e.getMessage()));            }        }    }    public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,            RecordSender recordSender, TaskPluginCollector taskPluginCollector)    {        LOG.info("Start Read orc-file [{}].", sourceOrcFilePath);        List column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);        String nullFormat = readerSliceConfig.getString(NULL_FORMAT);        try {            Path 
orcFilePath = new Path(sourceOrcFilePath);            Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(hadoopConf));            TypeDescription schema = reader.getSchema();            assert column != null;            if (column.isEmpty()) {                for (int i = 0; i < schema.getChildren().size(); i++) {                    ColumnEntry columnEntry = new ColumnEntry();                    columnEntry.setIndex(i);                    columnEntry.setType(schema.getChildren().get(i).getCategory().getName());                    column.add(columnEntry);                }            }            VectorizedRowBatch rowBatch = schema.createRowBatch(1024);            org.apache.orc.RecordReader rowIterator = reader.rows(reader.options().schema(schema));            while (rowIterator.nextBatch(rowBatch)) {                transportOrcRecord(rowBatch, column, recordSender, taskPluginCollector, nullFormat);            }        }        catch (Exception e) {            String message = String.format("从orc-file文件路径[%s]中读取数据发生异常,请联系系统管理员。"                    , sourceOrcFilePath);            LOG.error(message);            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);        }    }    private void transportOrcRecord(VectorizedRowBatch rowBatch, List columns, RecordSender recordSender,            TaskPluginCollector taskPluginCollector, String nullFormat)    {        Record record;        for (int row = 0; row < rowBatch.size; row++) {            record = recordSender.createRecord();            try {                for (ColumnEntry column : columns) {                    Column columnGenerated;                    if (column.getValue() != null) {                        if (!"null".equals(column.getValue())) {                            columnGenerated = new StringColumn(column.getValue());                        }                        else {                            columnGenerated = new StringColumn();                        }                        record.addColumn(columnGenerated);                        continue;                    }                    int i = column.getIndex();                    String columnType = column.getType().toUpperCase();                    ColumnVector col = rowBatch.cols[i];                    Type type = Type.valueOf(columnType);                    if (col.isNull[row]) {                        record.addColumn(new StringColumn(null));                        continue;                    }                    switch (type) {                        case INT:                        case LONG:                        case BOOLEAN:                        case BIGINT:                            columnGenerated = new LongColumn(((LongColumnVector) col).vector[row]);                            break;                        case DATE:                            columnGenerated = new DateColumn(new Date(((LongColumnVector) col).vector[row]));                            break;                        case DOUBLE:                            columnGenerated = new DoubleColumn(((DoubleColumnVector) col).vector[row]);                            break;                        case DECIMAL:                            columnGenerated = new DoubleColumn(((DecimalColumnVector) col).vector[row].doubleValue());                            break;                        case BINARY:                            BytesColumnVector b = (BytesColumnVector) col;                            byte[] val = Arrays.copyOfRange(b.vector[row], 
b.start[row], b.start[row] + b.length[row]);                            columnGenerated = new BytesColumn(val);                            break;                        case TIMESTAMP:                            columnGenerated = new DateColumn(((TimestampColumnVector) col).getTime(row));                            break;                        default:                            // type is string or other                            String v = ((BytesColumnVector) col).toString(row);                            columnGenerated = v.equals(nullFormat) ? new StringColumn() : new StringColumn(v);                            break;                    }                    record.addColumn(columnGenerated);                }                recordSender.sendToWriter(record);            }            catch (Exception e) {                if (e instanceof DataXException) {                    throw (DataXException) e;                }                taskPluginCollector.collectDirtyRecord(record, e.getMessage());            }        }    }    public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig,            RecordSender recordSender, TaskPluginCollector taskPluginCollector)    {        LOG.info("Start Read parquet-file [{}].", sourceParquetFilePath);        List column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);        String nullFormat = readerSliceConfig.getString(NULL_FORMAT);        Path parquetFilePath = new Path(sourceParquetFilePath);        hadoopConf.set("parquet.avro.readInt96AsFixed", "true");        JobConf conf = new JobConf(hadoopConf);        GenericData decimalSupport = new GenericData();        decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());        try (ParquetReader reader = AvroParquetReader                .builder(HadoopInputFile.fromPath(parquetFilePath, hadoopConf))                .withDataModel(decimalSupport)                .withConf(conf)                .build()) {            GenericData.Record gRecord = reader.read();            Schema schema = gRecord.getSchema();            if (null == column || column.isEmpty()) {                column = new ArrayList<>(schema.getFields().size());                String sType;                // 用户没有填写具体的字段信息,需要从parquet文件构建                for (int i = 0; i < schema.getFields().size(); i++) {                    ColumnEntry columnEntry = new ColumnEntry();                    columnEntry.setIndex(i);                    Schema type;                    if (schema.getFields().get(i).schema().getType() == Schema.Type.UNION) {                        type = schema.getFields().get(i).schema().getTypes().get(1);                    }                    else {                        type = schema.getFields().get(i).schema();                    }                    sType = type.getProp("logicalType") != null ? 
type.getProp("logicalType") : type.getType().getName();                    if (sType.startsWith("timestamp")) {                        columnEntry.setType("timestamp");                    }                    else {                        columnEntry.setType(sType);                    }                    column.add(columnEntry);                }            }            while (gRecord != null) {                transportParquetRecord(column, gRecord, recordSender, taskPluginCollector, nullFormat);                gRecord = reader.read();            }        }        catch (IOException e) {            String message = String.format("从parquet file文件路径[%s]中读取数据发生异常,请联系系统管理员。"                    , sourceParquetFilePath);            LOG.error(message);            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);        }    }    /*     * create a transport record for Parquet file     *     *     */    private void transportParquetRecord(List columnConfigs, GenericData.Record gRecord, RecordSender recordSender,            TaskPluginCollector taskPluginCollector, String nullFormat)    {        Record record = recordSender.createRecord();        Column columnGenerated;        int scale = 10;        try {            for (ColumnEntry columnEntry : columnConfigs) {                String columnType = columnEntry.getType();                Integer columnIndex = columnEntry.getIndex();                String columnConst = columnEntry.getValue();                String columnValue = null;                if (null != columnIndex) {                    if (null != gRecord.get(columnIndex)) {                        columnValue = gRecord.get(columnIndex).toString();                    }                    else {                        record.addColumn(new StringColumn(null));                        continue;                    }                }                else {                    columnValue = columnConst;                }                if (columnType.startsWith("decimal(")) {                    String ps = columnType.replace("decimal(", "").replace(")", "");                    columnType = "decimal";                    if (ps.contains(",")) {                        scale = Integer.parseInt(ps.split(",")[1].trim());                    }                    else {                        scale = 0;                    }                }                Type type = Type.valueOf(columnType.toUpperCase());                if (StringUtils.equals(columnValue, nullFormat)) {                    columnValue = null;                }                try {                    switch (type) {                        case STRING:                            columnGenerated = new StringColumn(columnValue);                            break;                        case INT:                        case LONG:                            columnGenerated = new LongColumn(columnValue);                            break;                        case DOUBLE:                            columnGenerated = new DoubleColumn(columnValue);                            break;                        case DECIMAL:                            if (null == columnValue) {                                columnGenerated = new DoubleColumn((Double) null);                            }                            else {                                columnGenerated = new DoubleColumn(new BigDecimal(columnValue).setScale(scale, RoundingMode.HALF_UP));                            }                            break;                    
    case BOOLEAN:                            columnGenerated = new BoolColumn(columnValue);                            break;                        case DATE:                            if (columnValue == null) {                                columnGenerated = new DateColumn((Date) null);                            }                            else {                                String formatString = columnEntry.getFormat();                                if (StringUtils.isNotBlank(formatString)) {                                    // 用户自己配置的格式转换                                    SimpleDateFormat format = new SimpleDateFormat(                                            formatString);                                    columnGenerated = new DateColumn(                                            format.parse(columnValue));                                }                                else {                                    // 框架尝试转换                                    columnGenerated = new DateColumn(new StringColumn(columnValue).asDate());                                }                            }                            break;                        case TIMESTAMP:                            if (null == columnValue) {                                columnGenerated = new DateColumn();                            }                            else if (columnValue.startsWith("[")) {                                // INT96 https://github.com/apache/parquet-mr/pull/901                                GenericData.Fixed fixed = (GenericData.Fixed) gRecord.get(columnIndex);                                Date date = new Date(getTimestampMills(fixed.bytes()));                                columnGenerated = new DateColumn(date);                            }                            else {                                columnGenerated = new DateColumn(Long.parseLong(columnValue) * 1000);                            }                            break;                        case BINARY:                            columnGenerated = new BytesColumn(((ByteBuffer) gRecord.get(columnIndex)).array());                            break;                        default:                            String errorMessage = String.format("您配置的列类型暂不支持 : [%s]", columnType);                            LOG.error(errorMessage);                            throw DataXException.asDataXException(StorageReaderErrorCode.NOT_SUPPORT_TYPE, errorMessage);                    }                }                catch (Exception e) {                    throw new IllegalArgumentException(String.format(                            "类型转换错误, 无法将[%s] 转换为[%s], %s", columnValue, type, e));                }                record.addColumn(columnGenerated);            } // end for            recordSender.sendToWriter(record);        }        catch (IllegalArgumentException | IndexOutOfBoundsException iae) {            taskPluginCollector.collectDirtyRecord(record, iae.getMessage());        }        catch (Exception e) {            if (e instanceof DataXException) {                throw (DataXException) e;            }            // 每一种转换失败都是脏数据处理,包括数字格式 & 日期格式            taskPluginCollector.collectDirtyRecord(record, e.getMessage());        }    }    private TypeDescription getOrcSchema(String filePath)    {        Path path = new Path(filePath);        try {            Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(hadoopConf));//            return reader.getTypes().get(0).getSubtypesCount()            return 
reader.getSchema();        }        catch (IOException e) {            String message = "读取orc-file column列数失败,请联系系统管理员";            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);        }    }    public boolean checkHdfsFileType(String filepath, String specifiedFileType)    {        Path file = new Path(filepath);        try (FileSystem fs = FileSystem.get(hadoopConf); FSDataInputStream in = fs.open(file)) {            if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.ORC)) {                return isORCFile(file, fs, in);            }            else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.RC)) {                return isRCFile(filepath, in);            }            else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.SEQ)) {                return isSequenceFile(file, in);            }            else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.PARQUET)) {                return isParquetFile(file);            }            else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.CSV)                    || StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.TEXT)) {                return true;            }        }        catch (Exception e) {            String message = String.format("检查文件[%s]类型失败,目前支持 %s 格式的文件," +                    "请检查您文件类型和文件是否正确。", filepath, HdfsConstant.SUPPORT_FILE_TYPE);            LOG.error(message);            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);        }        return false;    }    // 判断file是否是ORC File    private boolean isORCFile(Path file, FileSystem fs, FSDataInputStream in)    {        try {            // figure out the size of the file using the option or filesystem            long size = fs.getFileStatus(file).getLen();            //read last bytes into buffer to get PostScript            int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);            in.seek(size - readSize);            ByteBuffer buffer = ByteBuffer.allocate(readSize);            in.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(),                    buffer.remaining());            //read the PostScript            //get length of PostScript            int psLen = buffer.get(readSize - 1) & 0xff;            String orcMagic = org.apache.orc.OrcFile.MAGIC;            int len = orcMagic.length();            if (psLen < len + 1) {                return false;            }            int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1                    - len;            byte[] array = buffer.array();            // now look for the magic string at the end of the postscript.            if (Text.decode(array, offset, len).equals(orcMagic)) {                return true;            }            else {                // If it isn"t there, this may be the 0.11.0 version of ORC.                
// Read the first 3 bytes of the file to check for the header                in.seek(0);                byte[] header = new byte[len];                in.readFully(header, 0, len);                // if it isn"t there, this isn"t an ORC file                if (Text.decode(header, 0, len).equals(orcMagic)) {                    return true;                }            }        }        catch (IOException e) {            LOG.info("检查文件类型: [{}] 不是ORC File.", file);        }        return false;    }    // 判断file是否是RC file    private boolean isRCFile(String filepath, FSDataInputStream in)    {        // The first version of RCFile used the sequence file header.        final byte[] originalMagic = {(byte) "S", (byte) "E", (byte) "Q"};        // The "magic" bytes at the beginning of the RCFile        final byte[] rcMagic = {(byte) "R", (byte) "C", (byte) "F"};        // the version that was included with the original magic, which is mapped        // into ORIGINAL_VERSION        final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = 6;        // All the versions should be place in this list.        final int ORIGINAL_VERSION = 0;  // version with SEQ        // version with RCF        // final int NEW_MAGIC_VERSION = 1        // final int CURRENT_VERSION = NEW_MAGIC_VERSION        final int CURRENT_VERSION = 1;        byte version;        byte[] magic = new byte[rcMagic.length];        try {            in.seek(0);            in.readFully(magic);            if (Arrays.equals(magic, originalMagic)) {                if (in.readByte() != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {                    return false;                }                version = ORIGINAL_VERSION;            }            else {                if (!Arrays.equals(magic, rcMagic)) {                    return false;                }                // Set "version"                version = in.readByte();                if (version > CURRENT_VERSION) {                    return false;                }            }            if (version == ORIGINAL_VERSION) {                try {                    Class keyCls = hadoopConf.getClassByName(Text.readString(in));                    Class valCls = hadoopConf.getClassByName(Text.readString(in));                    if (!keyCls.equals(RCFile.KeyBuffer.class) || !valCls.equals(RCFile.ValueBuffer.class)) {                        return false;                    }                }                catch (ClassNotFoundException e) {                    return false;                }            }//            boolean decompress = in.readBoolean(); // is compressed?            if (version == ORIGINAL_VERSION) {                // is block-compressed? it should be always false.                
boolean blkCompressed = in.readBoolean();                return !blkCompressed;            }            return true;        }        catch (IOException e) {            LOG.info("检查文件类型: [{}] 不是RC File.", filepath);        }        return false;    }    // 判断file是否是Sequence file    private boolean isSequenceFile(Path filepath, FSDataInputStream in)    {        final byte[] seqMagic = {(byte) "S", (byte) "E", (byte) "Q"};        byte[] magic = new byte[seqMagic.length];        try {            in.seek(0);            in.readFully(magic);            return Arrays.equals(magic, seqMagic);        }        catch (IOException e) {            LOG.info("检查文件类型: [{}] 不是Sequence File.", filepath);        }        return false;    }    //判断是否为parquet(考虑判断parquet文件的schema是否不为空)    private boolean isParquetFile(Path file)     {        try {            GroupReadSupport readSupport = new GroupReadSupport();            ParquetReader.Builder reader = ParquetReader.builder(readSupport, file);            ParquetReader build = reader.build();            if (build.read() != null) {                return true;            }        }        catch (IOException e) {            LOG.info("检查文件类型: [{}] 不是Parquet File.", file);        }        return false;    }    /**     * Returns GMT"s timestamp from binary encoded parquet timestamp (12 bytes - julian date + time of day nanos).     *     * @param timestampBinary INT96 parquet timestamp     * @return timestamp in millis, GMT timezone     */    public static long getTimestampMillis(Binary timestampBinary)    {        if (timestampBinary.length() != 12) {            return 0;        }        byte[] bytes = timestampBinary.getBytes();        return getTimestampMills(bytes);    }    public static long getTimestampMills(byte[] bytes)    {        assert bytes.length == 12;        // little endian encoding - need to invert byte order        long timeOfDayNanos = Longs.fromBytes(bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]);        int julianDay = Ints.fromBytes(bytes[11], bytes[10], bytes[9], bytes[8]);        return julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND);    }    private static long julianDayToMillis(int julianDay)    {        return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;    }}
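
The INT96 handling at the end of DFSUtil is the subtle part of the Parquet read path: an INT96 timestamp is 12 bytes, a little-endian time-of-day in nanoseconds followed by a little-endian Julian day, and 2440588 is the Julian day of 1970-01-01. The following is a minimal, self-contained sketch of the same conversion that getTimestampMills performs above, using only the JDK's ByteBuffer instead of Guava (the class name and the sample value are made up for illustration):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;

public class Int96TimestampDemo
{
    // Same constants as DFSUtil: 2440588 is the Julian day of 1970-01-01.
    private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588;
    private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
    private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);

    // Decode a 12-byte Parquet INT96 value: 8 bytes time-of-day nanos + 4 bytes Julian day, both little-endian.
    static long toEpochMillis(byte[] int96)
    {
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        long timeOfDayNanos = buf.getLong();
        int julianDay = buf.getInt();
        return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY + timeOfDayNanos / NANOS_PER_MILLISECOND;
    }

    public static void main(String[] args)
    {
        // Julian day 2440589 = 1970-01-02, time of day = 1 hour, so the expected result is 86400000 + 3600000.
        ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
        buf.putLong(TimeUnit.HOURS.toNanos(1));
        buf.putInt(2440589);
        System.out.println(toEpochMillis(buf.array())); // prints 90000000
    }
}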

HdfsConstant

package com.alibaba.datax.plugin.reader.hdfsreader;

import com.alibaba.datax.common.base.Constant;

import java.util.Arrays;
import java.util.List;

public class HdfsConstant
        extends Constant
{
    public static final String SOURCE_FILES = "sourceFiles";
    public static final String TEXT = "TEXT";
    public static final String ORC = "ORC";
    public static final String CSV = "CSV";
    public static final String SEQ = "SEQ";
    public static final String RC = "RC";
    public static final String PARQUET = "PARQUET"; // new: Parquet file type
    public static final String HDFS_DEFAULT_KEY = "fs.defaultFS";
    public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";

    protected static final List<String> SUPPORT_FILE_TYPE =
            Arrays.asList(HdfsConstant.CSV, HdfsConstant.ORC, HdfsConstant.RC, HdfsConstant.SEQ, HdfsConstant.TEXT, HdfsConstant.PARQUET);

    private HdfsConstant() {}
}

HdfsReader

package com.alibaba.datax.plugin.reader.hdfsreader;

import com.alibaba.datax.common.base.Key;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.storage.reader.StorageReaderUtil;
import com.alibaba.datax.storage.util.FileHelper;
import org.apache.commons.io.Charsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

import static com.alibaba.datax.common.base.Key.COLUMN;
import static com.alibaba.datax.common.base.Key.ENCODING;
import static com.alibaba.datax.common.base.Key.INDEX;
import static com.alibaba.datax.common.base.Key.TYPE;
import static com.alibaba.datax.common.base.Key.VALUE;

public class HdfsReader
        extends Reader
{
    /**
     * The Job methods run only once; the framework then starts multiple Task threads that run the Task methods in parallel.
     * <p>
     * The overall Reader lifecycle is:
     * <pre>
     * Job:  init --> prepare --> split
     * Task: init --> prepare --> startRead --> post --> destroy
     * Task: init --> prepare --> startRead --> post --> destroy
     * Job:  post --> destroy
     * </pre>
     */
    public static class Job
            extends Reader.Job
    {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        private Configuration readerOriginConfig = null;
        private HashSet<String> sourceFiles;
        private String specifiedFileType = null;
        private DFSUtil dfsUtil = null;
        private List<String> path = null;

        @Override
        public void init()
        {
            LOG.info("init() begin...");
            this.readerOriginConfig = getPluginJobConf();
            validate();
            dfsUtil = new DFSUtil(readerOriginConfig);
            LOG.info("init() ok and end...");
        }

        public void validate()
        {
            readerOriginConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsReaderErrorCode.DEFAULT_FS_NOT_FIND_ERROR);

            // path check
            String pathInString = readerOriginConfig.getNecessaryValue(Key.PATH, HdfsReaderErrorCode.REQUIRED_VALUE);
            if (!pathInString.startsWith("[") && !pathInString.endsWith("]")) {
                path = Collections.singletonList(pathInString);
            }
            else {
                path = readerOriginConfig.getList(Key.PATH, String.class);
                if (null == path || path.isEmpty()) {
                    throw DataXException.asDataXException(HdfsReaderErrorCode.REQUIRED_VALUE, "您需要指定待读取的源目录或文件");
                }
                for (String eachPath : path) {
                    if (!eachPath.startsWith("/")) {
                        String message = String.format("请检查参数path:[%s],需要配置为绝对路径", eachPath);
                        LOG.error(message);
                        throw DataXException.asDataXException(HdfsReaderErrorCode.ILLEGAL_VALUE, message);
                    }
                }
            }

            specifiedFileType = readerOriginConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE).toUpperCase();
            if (!HdfsConstant.SUPPORT_FILE_TYPE.contains(specifiedFileType)) {
                String message = "HdfsReader插件目前支持 " + HdfsConstant.SUPPORT_FILE_TYPE + " 几种格式的文件";
                throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_ERROR, message);
            }

            String encoding = this.readerOriginConfig.getString(ENCODING, "UTF-8");
            try {
                Charsets.toCharset(encoding);
            }
            catch (UnsupportedCharsetException uce) {
                throw DataXException.asDataXException(
                        HdfsReaderErrorCode.ILLEGAL_VALUE,
                        String.format("不支持的编码格式 : [%s]", encoding), uce);
            }
            catch (Exception e) {
                throw DataXException.asDataXException(
                        HdfsReaderErrorCode.ILLEGAL_VALUE,
                        String.format("运行配置异常 : %s", e.getMessage()), e);
            }

            // check Kerberos
            boolean haveKerberos = readerOriginConfig.getBool(Key.HAVE_KERBEROS, false);
            if (haveKerberos) {
                readerOriginConfig.getNecessaryValue(Key.KERBEROS_KEYTAB_FILE_PATH, HdfsReaderErrorCode.REQUIRED_VALUE);
                readerOriginConfig.getNecessaryValue(Key.KERBEROS_PRINCIPAL, HdfsReaderErrorCode.REQUIRED_VALUE);
            }

            // validate the columns
            validateColumns();

            // validate compress
            String compress = readerOriginConfig.getString(Key.COMPRESS, "NONE");
            if ("gzip".equalsIgnoreCase(compress)) {
                // correct to gz
                readerOriginConfig.set(Key.COMPRESS, "gz");
            }
        }

        private void validateColumns()
        {
            // if column is configured as ["*"], reset it to an empty list (meaning: read all columns)
            List<Configuration> column = this.readerOriginConfig.getListConfiguration(COLUMN);
            if (null != column && 1 == column.size()
                    && ("\"*\"".equals(column.get(0).toString()) || "'*'".equals(column.get(0).toString()))) {
                readerOriginConfig.set(COLUMN, new ArrayList<String>());
            }
            else {
                // column: 1. index + type  2. value + type  3. when type is Date, a dateFormat value may also be present
                List<Configuration> columns = readerOriginConfig.getListConfiguration(COLUMN);
                if (null == columns || columns.isEmpty()) {
                    throw DataXException.asDataXException(HdfsReaderErrorCode.CONFIG_INVALID_EXCEPTION, "您需要指定 columns");
                }
                for (Configuration eachColumnConf : columns) {
                    eachColumnConf.getNecessaryValue(TYPE, HdfsReaderErrorCode.REQUIRED_VALUE);
                    Integer columnIndex = eachColumnConf.getInt(INDEX);
                    String columnValue = eachColumnConf.getString(VALUE);
                    if (null == columnIndex && null == columnValue) {
                        throw DataXException.asDataXException(
                                HdfsReaderErrorCode.NO_INDEX_VALUE,
                                "由于您配置了type, 则至少需要配置 index 或 value, 当前配置为:" + eachColumnConf);
                    }
                    if (null != columnIndex && null != columnValue) {
                        throw DataXException.asDataXException(HdfsReaderErrorCode.MIXED_INDEX_VALUE,
                                "您混合配置了index, value, 每一列同时仅能选择其中一种");
                    }
                }
            }
        }

        @Override
        public void prepare()
        {
            LOG.info("prepare(), start to getAllFiles...");
            this.sourceFiles = (HashSet<String>) dfsUtil.getAllFiles(path, specifiedFileType);
            LOG.info("您即将读取的文件数为: [{}], 列表为: [{}]", sourceFiles.size(), sourceFiles);
        }

        @Override
        public List<Configuration> split(int adviceNumber)
        {
            LOG.info("split() begin...");
            List<Configuration> readerSplitConfigs = new ArrayList<>();
            // warn: each split reads exactly one file
            int splitNumber = sourceFiles.size();
            if (0 == splitNumber) {
                throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION,
                        String.format("未能找到待读取的文件,请确认您的配置项path: %s", readerOriginConfig.getString(Key.PATH)));
            }
            List<List<String>> splitSourceFiles = FileHelper.splitSourceFiles(new ArrayList<>(sourceFiles), splitNumber);
            for (List<String> files : splitSourceFiles) {
                Configuration splitConfig = readerOriginConfig.clone();
                splitConfig.set(HdfsConstant.SOURCE_FILES, files);
                readerSplitConfigs.add(splitConfig);
            }
            return readerSplitConfigs;
        }

        @Override
        public void post()
        {
            //
        }

        @Override
        public void destroy()
        {
            //
        }
    }

    public static class Task
            extends Reader.Task
    {
        private static final Logger LOG = LoggerFactory.getLogger(Task.class);

        private Configuration taskConfig;
        private List<String> sourceFiles;
        private String specifiedFileType;
        private DFSUtil dfsUtil = null;

        @Override
        public void init()
        {
            this.taskConfig = getPluginJobConf();
            this.sourceFiles = taskConfig.getList(HdfsConstant.SOURCE_FILES, String.class);
            this.specifiedFileType = taskConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE);
            this.dfsUtil = new DFSUtil(taskConfig);
        }

        @Override
        public void prepare()
        {
            //
        }

        @Override
        public void startRead(RecordSender recordSender)
        {
            LOG.info("read start");
            for (String sourceFile : this.sourceFiles) {
                LOG.info("reading file : [{}]", sourceFile);
                if (specifiedFileType.equalsIgnoreCase(HdfsConstant.TEXT) || specifiedFileType.equalsIgnoreCase(HdfsConstant.CSV)) {
                    InputStream inputStream = dfsUtil.getInputStream(sourceFile);
                    StorageReaderUtil.readFromStream(inputStream, sourceFile, taskConfig, recordSender, getTaskPluginCollector());
                }
                else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.ORC)) {
                    dfsUtil.orcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
                }
                else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.SEQ)) {
                    dfsUtil.sequenceFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
                }
                else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.RC)) {
                    dfsUtil.rcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
                }
                else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.PARQUET)) {
                    // new branch: read Parquet files
                    dfsUtil.parquetFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
                }
                else {
                    String message = "HdfsReader插件目前支持ORC, TEXT, CSV, SEQUENCE, RC、PARQUET等六种格式的文件,"
                            + "请将fileType选项的值配置为ORC, TEXT, CSV, SEQUENCE, RC, PARQUET";
                    throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message);
                }
                if (recordSender != null) {
                    recordSender.flush();
                }
            }
            LOG.info("end read source files...");
        }

        @Override
        public void post()
        {
            //
        }

        @Override
        public void destroy()
        {
            //
        }
    }
}

III. The HdfsWriter plugin

This plugin is also fairly simple and consists of five classes. The classes and the changes each one needs are listed below:

  • HdfsHelper: add Parquet record conversion (transportParRecord) and Parquet schema generation for the write path.
  • HdfsWriter: add the Parquet entry to the supported file types.
  • SupportHiveDataType: no changes needed.
  • HdfsWriterErrorCode: no changes needed.
  • Type: no changes needed.

Again, only four of the five classes need changes. A sample writer configuration that targets the new Parquet type is sketched first, followed by the code.
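
As on the reader side, the user-facing change is just the fileType value. The fragment below is an illustrative hdfswriter parameter block: it assumes the standard hdfswriter parameter names (defaultFS, fileType, path, fileName, writeMode, column with name/type), and the path and columns are made up. Decimal column types written as decimal(precision,scale) are also recognized by the schema generator shown below (generateParquetSchemaFromColumnAndType).

"writer": {
    "name": "hdfswriter",
    "parameter": {
        "defaultFS": "hdfs://nameservice1",
        "fileType": "parquet",
        "path": "/warehouse/ods/demo_table",
        "fileName": "demo_table",
        "writeMode": "append",
        "column": [
            { "name": "id", "type": "long" },
            { "name": "name", "type": "string" },
            { "name": "price", "type": "double" }
        ]
    }
}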

HdfsHelper

package com.alibaba.datax.plugin.writer.hdfswriter;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.alibaba.datax.common.base.Constant;import com.alibaba.datax.common.base.Key;import com.alibaba.datax.common.element.Column;import com.alibaba.datax.common.element.Record;import com.alibaba.datax.common.exception.DataXException;import com.alibaba.datax.common.plugin.RecordReceiver;import com.alibaba.datax.common.plugin.TaskPluginCollector;import com.alibaba.datax.common.util.Configuration;import com.alibaba.datax.unstructuredstorage.util.ColumnTypeUtil;import com.alibaba.datax.unstructuredstorage.util.HdfsUtil;import org.apache.avro.Conversions;import org.apache.avro.LogicalTypes;import org.apache.avro.Schema;import org.apache.avro.generic.GenericData;import org.apache.avro.generic.GenericRecord;import org.apache.avro.generic.GenericRecordBuilder;import org.apache.commons.lang3.StringUtils;import org.apache.commons.lang3.Validate;import org.apache.commons.lang3.tuple.MutablePair;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.LocatedFileStatus;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.RemoteIterator;import org.apache.hadoop.hive.common.type.HiveDecimal;import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.JobContext;import org.apache.hadoop.mapred.RecordWriter;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.TextOutputFormat;import org.apache.hadoop.security.UserGroupInformation;import org.apache.hadoop.security.authentication.util.KerberosName;import org.apache.orc.CompressionKind;import org.apache.orc.OrcFile;import org.apache.orc.TypeDescription;import org.apache.orc.Writer;import org.apache.parquet.avro.AvroParquetWriter;import org.apache.parquet.column.ParquetProperties;import org.apache.parquet.hadoop.ParquetWriter;import org.apache.parquet.hadoop.metadata.CompressionCodecName;import org.apache.parquet.schema.OriginalType;import org.apache.parquet.schema.PrimitiveType;import org.apache.parquet.schema.Types;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.FileNotFoundException;import java.io.IOException;import java.lang.reflect.Field;import java.math.BigDecimal;import java.nio.charset.StandardCharsets;import java.sql.Timestamp;import java.text.SimpleDateFormat;import java.util.*;public class HdfsHelper{    public static final Logger LOG = LoggerFactory.getLogger(HdfsHelper.class);    public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";    public static final String HDFS_DEFAULT_FS_KEY = "fs.defaultFS";    private FileSystem fileSystem = null;    private JobConf conf = null;    private org.apache.hadoop.conf.Configuration hadoopConf = null;    // Kerberos    private boolean haveKerberos = false;    private 
String kerberosKeytabFilePath;    private String kerberosPrincipal;    private String krb5ConfPath;    public static MutablePair transportOneRecord(            Record record, char fieldDelimiter, List columnsConfiguration, TaskPluginCollector taskPluginCollector)    {        MutablePair, Boolean> transportResultList = transportOneRecord(record, columnsConfiguration, taskPluginCollector);        //保存<转换后的数据,是否是脏数据>        MutablePair transportResult = new MutablePair<>();        transportResult.setRight(false);        Text recordResult = new Text(StringUtils.join(transportResultList.getLeft(), fieldDelimiter));        transportResult.setRight(transportResultList.getRight());        transportResult.setLeft(recordResult);        return transportResult;    }    public static MutablePair, Boolean> transportOneRecord(            Record record, List columnsConfiguration,            TaskPluginCollector taskPluginCollector)    {        MutablePair, Boolean> transportResult = new MutablePair<>();        transportResult.setRight(false);        List recordList = new ArrayList<>();        int recordLength = record.getColumnNumber();        if (0 != recordLength) {            Column column;            for (int i = 0; i < recordLength; i++) {                column = record.getColumn(i);                if (null != column.getRawData()) {                    String rowData = column.getRawData().toString();                    SupportHiveDataType columnType = SupportHiveDataType.valueOf(                            columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase());                    //根据writer端类型配置做类型转换                    try {                        switch (columnType) {                            case TINYINT:                                recordList.add(Byte.valueOf(rowData));                                break;                            case SMALLINT:                                recordList.add(Short.valueOf(rowData));                                break;                            case INT:                            case INTEGER:                                recordList.add(Integer.valueOf(rowData));                                break;                            case BIGINT:                                recordList.add(column.asLong());                                break;                            case FLOAT:                                recordList.add(Float.valueOf(rowData));                                break;                            case DOUBLE:                                recordList.add(column.asDouble());                                break;                            case STRING:                            case VARCHAR:                            case CHAR:                                recordList.add(column.asString());                                break;                            case DECIMAL:                                recordList.add(HiveDecimal.create(column.asBigDecimal()));                                break;                            case BOOLEAN:                                recordList.add(column.asBoolean());                                break;                            case DATE:                                recordList.add(org.apache.hadoop.hive.common.type.Date.valueOf(column.asString()));                                break;                            case TIMESTAMP:                                recordList.add(Timestamp.valueOf(column.asString()));                                break;                            case BINARY:     
                           recordList.add(column.asBytes());                                break;                            default:                                throw DataXException                                        .asDataXException(                                                HdfsWriterErrorCode.ILLEGAL_VALUE,                                                String.format(                                                        "您的配置文件中的列配置信息有误. 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",                                                        columnsConfiguration.get(i).getString(Key.NAME),                                                        columnsConfiguration.get(i).getString(Key.TYPE)));                        }                    }                    catch (Exception e) {                        // warn: 此处认为脏数据                        e.printStackTrace();                        String message = String.format(                                "字段类型转换错误:你目标字段为[%s]类型,实际字段值为[%s].",                                columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData());                        taskPluginCollector.collectDirtyRecord(record, message);                        transportResult.setRight(true);                        break;                    }                }                else {                    // warn: it"s all ok if nullFormat is null                    recordList.add(null);                }            }        }        transportResult.setLeft(recordList);        return transportResult;    }    public static GenericRecord transportParRecord(            Record record, List columnsConfiguration,            TaskPluginCollector taskPluginCollector, GenericRecordBuilder builder)    {        int recordLength = record.getColumnNumber();        if (0 != recordLength) {            Column column;            for (int i = 0; i < recordLength; i++) {                column = record.getColumn(i);                String colName = columnsConfiguration.get(i).getString(Key.NAME);                String typename = columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase();                if (null == column || column.getRawData() == null) {                    builder.set(colName, null);                }                else {                    String rowData = column.getRawData().toString();                    SupportHiveDataType columnType = SupportHiveDataType.valueOf(typename);                    //根据writer端类型配置做类型转换                    try {                        switch (columnType) {                            case INT:                            case INTEGER:                                builder.set(colName, Integer.valueOf(rowData));                                break;                            case LONG:                                builder.set(colName, column.asLong());                                break;                            case FLOAT:                                builder.set(colName, Float.valueOf(rowData));                                break;                            case DOUBLE:                                builder.set(colName, column.asDouble());                                break;                            case STRING:                                builder.set(colName, column.asString());                                break;                            case DECIMAL:                                builder.set(colName, new BigDecimal(column.asString()).setScale(columnsConfiguration.get(i).getInt(Key.SCALE), 
BigDecimal.ROUND_HALF_UP));                                break;                            case BOOLEAN:                                builder.set(colName, column.asBoolean());                                break;                            case BINARY:                                builder.set(colName, column.asBytes());                                break;                            case TIMESTAMP:                                builder.set(colName, column.asLong() / 1000);                                break;                            default:                                throw DataXException                                        .asDataXException(                                                HdfsWriterErrorCode.ILLEGAL_VALUE,                                                String.format(                                                        "您的配置文件中的列配置信息有误. 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",                                                        columnsConfiguration.get(i).getString(Key.NAME),                                                        columnsConfiguration.get(i).getString(Key.TYPE)));                        }                    }                    catch (Exception e) {                        // warn: 此处认为脏数据                        String message = String.format(                                "字段类型转换错误:目标字段为[%s]类型,实际字段值为[%s].",                                columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData());                        taskPluginCollector.collectDirtyRecord(record, message);                        break;                    }                }            }        }        return builder.build();    }    public static String generateParquetSchemaFromColumnAndType(List columns) {        Map decimalColInfo = new HashMap<>(16);        ColumnTypeUtil.DecimalInfo PARQUET_DEFAULT_DECIMAL_INFO = new ColumnTypeUtil.DecimalInfo(10, 2);        Types.MessageTypeBuilder typeBuilder = Types.buildMessage();        for (Configuration column : columns) {            String name = column.getString("name");            String colType = column.getString("type");            Validate.notNull(name, "column.name can"t be null");            Validate.notNull(colType, "column.type can"t be null");            switch (colType.toLowerCase()) {                case "tinyint":                case "smallint":                case "int":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(name);                    break;                case "bigint":                case "long":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(name);                    break;                case "float":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.FLOAT).named(name);                    break;                case "double":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).named(name);                    break;                case "binary":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).named(name);                    break;                case "char":                case "varchar":                case "string":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(name);                    break;                case "boolean":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BOOLEAN).named(name);                    break;       
         case "timestamp":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT96).named(name);                    break;                case "date":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).as(OriginalType.DATE).named(name);                    break;                default:                    if (ColumnTypeUtil.isDecimalType(colType)) {                        ColumnTypeUtil.DecimalInfo decimalInfo = ColumnTypeUtil.getDecimalInfo(colType, PARQUET_DEFAULT_DECIMAL_INFO);                        typeBuilder.optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)                                .as(OriginalType.DECIMAL)                                .precision(decimalInfo.getPrecision())                                .scale(decimalInfo.getScale())                                .length(HdfsUtil.computeMinBytesForPrecision(decimalInfo.getPrecision()))                                .named(name);                        decimalColInfo.put(name, decimalInfo);                    } else {                        typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).named(name);                    }                    break;            }        }        return typeBuilder.named("m").toString();    }    public FileSystem getFileSystem(String defaultFS, Configuration taskConfig)    {        this.hadoopConf = new org.apache.hadoop.conf.Configuration();        Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);        JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));        if (null != hadoopSiteParams) {            Set paramKeys = hadoopSiteParams.getKeys();            for (String each : paramKeys) {                hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));            }        }        this.hadoopConf.set(HDFS_DEFAULT_FS_KEY, defaultFS);        //是否有Kerberos认证        this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);        if (haveKerberos) {            LOG.info("krb5.conf路径:【{}】 \n keytab路径:【{}】 \n principal:【{}】\n",                    taskConfig.getString(Key. KRB5_CONF_FILE_PATH),                    taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH),                    taskConfig.getString(Key.KERBEROS_PRINCIPAL));            this.krb5ConfPath = taskConfig.getString(Key. 
KRB5_CONF_FILE_PATH);            this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);            this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);            LOG.info("检测到kerberos认证,正在进行认证");        }        System.setProperty("java.security.krb5.conf",krb5ConfPath);        System.setProperty("sun.security.krb5.Config",krb5ConfPath);        refreshConfig();        this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath,hadoopConf,this.krb5ConfPath);        conf = new JobConf(hadoopConf);        try {            fileSystem = FileSystem.get(conf);        }        catch (IOException e) {            String message = String.format("获取FileSystem时发生网络IO异常,请检查您的网络是否正常!HDFS地址:[message:defaultFS = %s]",                    defaultFS);            LOG.error(message);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);        }        catch (Exception e) {            String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: [%s]",                    "message:defaultFS =" + defaultFS);            LOG.error(message);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);        }        if (null == fileSystem) {            String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: [message:defaultFS = %s]",                    defaultFS);            LOG.error(message);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, message);        }        return fileSystem;    }    /** 刷新krb内容信息 */    public static void refreshConfig() {        try {            sun.security.krb5.Config.refresh();            Field defaultRealmField = KerberosName.class.getDeclaredField("defaultRealm");            defaultRealmField.setAccessible(true);            defaultRealmField.set(                    null,                    org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm());            // reload java.security.auth.login.config            javax.security.auth.login.Configuration.setConfiguration(null);        } catch (Exception e) {            LOG.warn(                    "resetting default realm failed, current default realm will still be used.", e);        }    }    public   void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf,String krb5ConfPath) {        hadoopConf.set("hadoop.security.authentication", "kerberos");        hadoopConf.set("hive.security.authentication", "kerberos");        hadoopConf.set("hadoop.security.authorization", "true");        hadoopConf.set("dfs.permissions","false");        hadoopConf.set("hadoop.security.auth_to_local","RULE:[2:$1@$0](.*@CDHDEV.COM)s/.*/hadoop/ \n" +                "        DEFAULT");        if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {            UserGroupInformation.setConfiguration(hadoopConf);            KerberosName.resetDefaultRealm();            try {                LOG.info("开始认证");                UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);            }            catch (Exception e) {                LOG.info("kerberos认证失败");                String message = String.format("kerberos认证失败,请检查 " +                                "kerberosKeytabFilePath[%s] 和 kerberosPrincipal[%s]",                        kerberosKeytabFilePath, kerberosPrincipal);                
e.printStackTrace();                throw DataXException.asDataXException(HdfsWriterErrorCode.KERBEROS_LOGIN_ERROR, message, e);            }        }    }    /**     * 获取指定目录下的文件列表     *     * @param dir 需要搜索的目录     * @return 文件数组,文件是全路径,     * eg:hdfs://10.101.204.12:9000/user/hive/warehouse/writer.db/text/test.textfile     */    public Path[] hdfsDirList(String dir)    {        Path path = new Path(dir);        Path[] files;        try {            FileStatus[] status = fileSystem.listStatus(path);            files = new Path[status.length];            for (int i = 0; i < status.length; i++) {                files[i] = status[i].getPath();            }        }        catch (IOException e) {            String message = String.format("获取目录[%s]文件列表时发生网络IO异常,请检查您的网络是否正常!", dir);            LOG.error(message);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);        }        return files;    }//    public boolean isPathExists(String filePath) {////        Path path = new Path(filePath);//        boolean exist;//        try {//            exist = fileSystem.exists(path);//        }//        catch (IOException e) {//            String message = String.format("判断文件路径[%s]是否存在时发生网络IO异常,请检查您的网络是否正常!",//                    "message:filePath =" + filePath);//            e.printStackTrace();//            LOG.error(message);//            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);//        }//        return exist;//    }    public boolean isPathDir(String filePath)    {        Path path = new Path(filePath);        boolean isDir;        try {            isDir = fileSystem.getFileStatus(path).isDirectory();        }        catch (IOException e) {            String message = String.format("判断路径[%s]是否是目录时发生网络IO异常,请检查您的网络是否正常!", filePath);            LOG.error(message);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);        }        return isDir;    }    public void deleteFilesFromDir(Path dir)    {        try {            final RemoteIterator files = fileSystem.listFiles(dir, false);            while (files.hasNext()) {                final LocatedFileStatus next = files.next();                fileSystem.deleteOnExit(next.getPath());            }        }        catch (FileNotFoundException fileNotFoundException) {            throw new DataXException(HdfsWriterErrorCode.FILE_NOT_FOUND, fileNotFoundException.getMessage());        }        catch (IOException ioException) {            throw new DataXException(HdfsWriterErrorCode.IO_ERROR, ioException.getMessage());        }    }    public void deleteDir(Path path)    {        LOG.info("start delete tmp dir [{}] .", path);        try {            if (fileSystem.exists(path)) {                fileSystem.delete(path, true);            }        }        catch (Exception e) {            LOG.error("删除临时目录[{}]时发生IO异常,请检查您的网络是否正常!", path);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);        }        LOG.info("finish delete tmp dir [{}] .", path);    }    /**     * move all files in sourceDir to targetDir     *     * @param sourceDir the source directory     * @param targetDir the target directory     */    public void moveFilesToDest(Path sourceDir, Path targetDir)    {        try {            final FileStatus[] fileStatuses = fileSystem.listStatus(sourceDir);            for (FileStatus file : fileStatuses) {                if (file.isFile() && file.getLen() > 0) {                  
  LOG.info("start move file [{}] to dir [{}].", file.getPath(), targetDir.getName());                    fileSystem.rename(file.getPath(), new Path(targetDir, file.getPath().getName()));                }            }        }        catch (IOException e) {            throw DataXException.asDataXException(HdfsWriterErrorCode.IO_ERROR, e);        }        LOG.info("finish move file(s).");    }    //关闭FileSystem    public void closeFileSystem()    {        try {            fileSystem.close();        }        catch (IOException e) {            LOG.error("关闭FileSystem时发生IO异常,请检查您的网络是否正常!");            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);        }    }    // 写text file类型文件    public void textFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,                                   TaskPluginCollector taskPluginCollector)    {        char fieldDelimiter = config.getChar(Key.FIELD_DELIMITER);        List columns = config.getListConfiguration(Key.COLUMN);        String compress = config.getString(Key.COMPRESS, "NONE").toUpperCase().trim();        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");        String attempt = "attempt_" + dateFormat.format(new Date()) + "_0001_m_000000_0";        conf.set(JobContext.TASK_ATTEMPT_ID, attempt);        if (!"NONE".equals(compress)) {            // fileName must remove suffix, because the FileOutputFormat will add suffix            fileName = fileName.substring(0, fileName.lastIndexOf("."));            Class codecClass = getCompressCodec(compress);            if (null != codecClass) {                FileOutputFormat.setOutputCompressorClass(conf, codecClass);            }        }        Path outputPath = new Path(fileName);        FileOutputFormat.setOutputPath(conf, outputPath);        FileOutputFormat.setWorkOutputPath(conf, outputPath);        try {            RecordWriter writer = new TextOutputFormat()                    .getRecordWriter(fileSystem, conf, outputPath.toString(), Reporter.NULL);            Record record;            while ((record = lineReceiver.getFromReader()) != null) {                MutablePair transportResult = transportOneRecord(record, fieldDelimiter, columns, taskPluginCollector);                if (Boolean.FALSE.equals(transportResult.getRight())) {                    writer.write(NullWritable.get(), transportResult.getLeft());                }            }            writer.close(Reporter.NULL);        }        catch (Exception e) {            LOG.error("写文件文件[{}]时发生IO异常,请检查您的网络是否正常!", fileName);            Path path = new Path(fileName);            deleteDir(path.getParent());            throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);        }    }    // compress 已经转为大写    public Class getCompressCodec(String compress)    {        compress = compress.toUpperCase();        Class codecClass;        switch (compress) {            case "GZIP":                codecClass = org.apache.hadoop.io.compress.GzipCodec.class;                break;            case "BZIP2":                codecClass = org.apache.hadoop.io.compress.BZip2Codec.class;                break;            case "SNAPPY":                codecClass = org.apache.hadoop.io.compress.SnappyCodec.class;                break;            case "LZ4":                codecClass = org.apache.hadoop.io.compress.Lz4Codec.class;                break;            case "ZSTD":                codecClass = org.apache.hadoop.io.compress.ZStandardCodec.class;        
        break;            case "DEFLATE":            case "ZLIB":                codecClass = org.apache.hadoop.io.compress.DeflateCodec.class;                break;            default:                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,                        String.format("目前不支持您配置的 compress 模式 : [%s]", compress));        }        return codecClass;    }    /*     * 写Parquet file类型文件     * 一个parquet文件的schema类似如下:     * {     *    "type": "record",     *    "name": "testFile",     *    "doc":  "test records",     *    "fields":     *      [{     *        "name": "id",     *        "type": ["null", "int"]     *     *      },     *      {     *        "name": "empName",     *        "type": "string"     *      }     *    ]     *  }     * "null" 表示该字段允许为空     */    public void parquetFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,                                      TaskPluginCollector taskPluginCollector)    {        List columns = config.getListConfiguration(Key.COLUMN);        String compress = config.getString(Key.COMPRESS, "UNCOMPRESSED").toUpperCase().trim();        if ("NONE".equals(compress)) {            compress = "UNCOMPRESSED";        }        // construct parquet schema        Schema schema = generateParquetSchema(columns);        Path path = new Path(fileName);        LOG.info("write parquet file {}", fileName);        CompressionCodecName codecName = CompressionCodecName.fromConf(compress);        GenericData decimalSupport = new GenericData();        decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());        try (ParquetWriter writer = AvroParquetWriter                .builder(path)                .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)                .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)                .withSchema(schema)                .withConf(hadoopConf)                .withCompressionCodec(codecName)                .withValidation(false)                .withDictionaryEncoding(false)                .withDataModel(decimalSupport)                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)                .build()) {            Record record;            while ((record = lineReceiver.getFromReader()) != null) {                GenericRecordBuilder builder = new GenericRecordBuilder(schema);                GenericRecord transportResult = transportParRecord(record, columns, taskPluginCollector, builder);                writer.write(transportResult);            }        }        catch (Exception e) {            LOG.error("写文件文件[{}]时发生IO异常,请检查您的网络是否正常!", fileName);            deleteDir(path.getParent());            throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);        }    }    private Schema generateParquetSchema(List columns)    {        List fields = new ArrayList<>();        String fieldName;        String type;        List unionList = new ArrayList<>(2);        for (Configuration column : columns) {            unionList.clear();            fieldName = column.getString(Key.NAME);            type = column.getString(Key.TYPE).trim().toUpperCase();            unionList.add(Schema.create(Schema.Type.NULL));            switch (type) {                case "DECIMAL":                    Schema dec = LogicalTypes                            .decimal(column.getInt(Key.PRECISION, Constant.DEFAULT_DECIMAL_MAX_PRECISION),                                    column.getInt(Key.SCALE, Constant.DEFAULT_DECIMAL_MAX_SCALE))   
                         .addToSchema(Schema.createFixed(fieldName, null, null, 16));                    unionList.add(dec);                    break;                case "DATE":                    Schema date = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));                    unionList.add(date);                    break;                case "TIMESTAMP":                    Schema ts = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));                    unionList.add(ts);                    break;                case "UUID":                    Schema uuid = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING));                    unionList.add(uuid);                    break;                case "BINARY":                    unionList.add(Schema.create(Schema.Type.BYTES));                    break;                default:                    // other types                    unionList.add(Schema.create(Schema.Type.valueOf(type)));                    break;            }            fields.add(new Schema.Field(fieldName, Schema.createUnion(unionList), null, null));        }        Schema schema = Schema.createRecord("dataxTestParquet", null, "parquet", false);        schema.setFields(fields);        return schema;    }    private void setRow(VectorizedRowBatch batch, int row, Record record, List columns,                        TaskPluginCollector taskPluginCollector)    {        for (int i = 0; i < columns.size(); i++) {            Configuration eachColumnConf = columns.get(i);            String type = eachColumnConf.getString(Key.TYPE).trim().toUpperCase();            SupportHiveDataType columnType;            ColumnVector col = batch.cols[i];            if (type.startsWith("DECIMAL")) {                columnType = SupportHiveDataType.DECIMAL;            }            else {                columnType = SupportHiveDataType.valueOf(type);            }            if (record.getColumn(i) == null || record.getColumn(i).getRawData() == null) {                col.isNull[row] = true;                col.noNulls = false;                continue;            }            try {                switch (columnType) {                    case TINYINT:                    case SMALLINT:                    case INT:                    case BIGINT:                    case BOOLEAN:                    case DATE:                        ((LongColumnVector) col).vector[row] = record.getColumn(i).asLong();                        break;                    case FLOAT:                    case DOUBLE:                        ((DoubleColumnVector) col).vector[row] = record.getColumn(i).asDouble();                        break;                    case DECIMAL:                        HiveDecimalWritable hdw = new HiveDecimalWritable();                        hdw.set(HiveDecimal.create(record.getColumn(i).asBigDecimal())                                .setScale(eachColumnConf.getInt(Key.SCALE), HiveDecimal.ROUND_HALF_UP));                        ((DecimalColumnVector) col).set(row, hdw);                        break;                    case TIMESTAMP:                        ((TimestampColumnVector) col).set(row, java.sql.Timestamp.valueOf(record.getColumn(i).asString()));                        break;                    case STRING:                    case VARCHAR:                    case CHAR:                        byte[] buffer;                        if (record.getColumn(i).getType() == Column.Type.BYTES) {                            //convert bytes to base64 string     
                       buffer = Base64.getEncoder().encode((byte[]) record.getColumn(i).getRawData());                        }                        else {                            buffer = record.getColumn(i).getRawData().toString().getBytes(StandardCharsets.UTF_8);                        }                        ((BytesColumnVector) col).setRef(row, buffer, 0, buffer.length);                        break;                    case BINARY:                        byte[] content = (byte[]) record.getColumn(i).getRawData();                        ((BytesColumnVector) col).setRef(row, content, 0, content.length);                        break;                    default:                        throw DataXException                                .asDataXException(                                        HdfsWriterErrorCode.ILLEGAL_VALUE,                                        String.format("您的配置文件中的列配置信息有误. 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%s]. " +                                                        "请修改表中该字段的类型或者不同步该字段.",                                                eachColumnConf.getString(Key.NAME),                                                eachColumnConf.getString(Key.TYPE)));                }            }            catch (Exception e) {                taskPluginCollector.collectDirtyRecord(record, e.getMessage());                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,                        String.format("设置Orc数据行失败,源列类型: %s, 目的原始类型:%s, 目的列Hive类型: %s, 字段名称: %s, 源值: %s, 错误根源:%n%s",                                record.getColumn(i).getType(), columnType, eachColumnConf.getString(Key.TYPE),                                eachColumnConf.getString(Key.NAME),                                record.getColumn(i).getRawData(), e));            }        }    }    /*     * 写orcfile类型文件     */    public void orcFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,                                  TaskPluginCollector taskPluginCollector)    {        List columns = config.getListConfiguration(Key.COLUMN);        String compress = config.getString(Key.COMPRESS, "NONE").toUpperCase();        StringJoiner joiner = new StringJoiner(",");        for (Configuration column : columns) {            if ("decimal".equals(column.getString(Key.TYPE))) {                joiner.add(String.format("%s:%s(%s,%s)", column.getString(Key.NAME), "decimal",                        column.getInt(Key.PRECISION, Constant.DEFAULT_DECIMAL_MAX_PRECISION),                        column.getInt(Key.SCALE, Constant.DEFAULT_DECIMAL_MAX_SCALE)));            }            else if ("date".equalsIgnoreCase(column.getString(Key.TYPE))) {                joiner.add(String.format("%s:bigint", column.getString(Key.NAME)));            }            else {                joiner.add(String.format("%s:%s", column.getString(Key.NAME), column.getString(Key.TYPE)));            }        }        TypeDescription schema = TypeDescription.fromString("struct<" + joiner + ">");        try (Writer writer = OrcFile.createWriter(new Path(fileName),                OrcFile.writerOptions(conf)                        .setSchema(schema)                        .compress(CompressionKind.valueOf(compress)))) {            Record record;            VectorizedRowBatch batch = schema.createRowBatch(1024);            while ((record = lineReceiver.getFromReader()) != null) {                int row = batch.size++;                setRow(batch, row, record, columns, taskPluginCollector);                if 
(batch.size == batch.getMaxSize()) {
                    writer.addRowBatch(batch);
                    batch.reset();
                }
            }
            if (batch.size != 0) {
                writer.addRowBatch(batch);
                batch.reset();
            }
        }
        catch (IOException e) {
            LOG.error("写文件文件[{}]时发生IO异常,请检查您的网络是否正常!", fileName);
            Path path = new Path(fileName);
            deleteDir(path.getParent());
            throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);
        }
    }
}
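To make the Parquet write path above easier to follow, here is a minimal, self-contained sketch of the same idea outside of DataX: build an Avro schema whose fields are unions of null and a concrete type (as generateParquetSchema does), then write GenericRecords through AvroParquetWriter. It assumes avro and parquet-avro are on the classpath; the field names, values, and the local output path are made up for illustration and are not part of the plugin.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetWriteSketch {
    public static void main(String[] args) throws Exception {
        // Every field is a union of ["null", <type>], matching what generateParquetSchema builds,
        // so a missing value in a record can be written as null instead of failing.
        Schema schema = SchemaBuilder.record("demo").fields()
                .optionalInt("id")
                .optionalString("name")
                .optionalDouble("score")
                .endRecord();

        // Hypothetical local path used only for illustration; the plugin writes to an HDFS path.
        Path path = new Path("file:///tmp/demo.parquet");

        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(path)
                .withSchema(schema)
                .withConf(new org.apache.hadoop.conf.Configuration())
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withDataModel(GenericData.get())
                .build()) {
            // One GenericRecord per row; in the plugin this is what transportParRecord assembles.
            GenericRecord record = new GenericRecordBuilder(schema)
                    .set("id", 1)
                    .set("name", "datax")
                    .set("score", 99.5)
                    .build();
            writer.write(record);
        }
    }
}

CompressionCodecName.SNAPPY is only an example here; in parquetFileStartWrite the job's compress setting is mapped to a codec via CompressionCodecName.fromConf.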

HdfsWriter

package com.alibaba.datax.plugin.writer.hdfswriter;import com.alibaba.datax.common.base.Constant;import com.alibaba.datax.common.base.Key;import com.alibaba.datax.common.exception.DataXException;import com.alibaba.datax.common.plugin.RecordReceiver;import com.alibaba.datax.common.spi.Writer;import com.alibaba.datax.common.util.Configuration;import com.alibaba.datax.storage.util.FileHelper;import org.apache.commons.io.Charsets;import org.apache.commons.lang3.StringUtils;import com.alibaba.datax.unstructuredstorage.util.ColumnTypeUtil;import com.alibaba.datax.unstructuredstorage.util.HdfsUtil;import org.apache.commons.lang3.Validate;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.orc.CompressionKind;import org.apache.parquet.hadoop.metadata.CompressionCodecName;import org.apache.parquet.schema.MessageTypeParser;import org.apache.parquet.schema.OriginalType;import org.apache.parquet.schema.PrimitiveType;import org.apache.parquet.schema.Types;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.nio.file.Paths;import java.util.*;import java.util.regex.Matcher;import java.util.regex.Pattern;import java.util.stream.Collectors;public class HdfsWriter        extends Writer{    public static class Job            extends Writer.Job    {        private static final Logger LOG = LoggerFactory.getLogger(Job.class);        // 写入文件的临时目录,完成写入后,该目录需要删除        private String tmpStorePath;        private Configuration writerSliceConfig = null;        private String defaultFS;        private String path;        private String fileName;        private String writeMode;        private HdfsHelper hdfsHelper = null;        private FileSystem filsSystem;        public static final Set SUPPORT_FORMAT = new HashSet<>(Arrays.asList("ORC", "PARQUET", "TEXT"));        @Override        public void init()        {            this.writerSliceConfig = this.getPluginJobConf();            this.validateParameter();            hdfsHelper = new HdfsHelper();            filsSystem = hdfsHelper.getFileSystem(defaultFS, this.writerSliceConfig);        }        private void validateParameter()        {            this.defaultFS = this.writerSliceConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsWriterErrorCode.REQUIRED_VALUE);            //fileType check            String fileType = this.writerSliceConfig.getNecessaryValue(Key.FILE_TYPE, HdfsWriterErrorCode.REQUIRED_VALUE).toUpperCase();            if (!SUPPORT_FORMAT.contains(fileType)) {                String message = String.format("[%s] 文件格式不支持, HdfsWriter插件目前仅支持 %s, ", fileType, SUPPORT_FORMAT);                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message);            }            //path            this.path = this.writerSliceConfig.getNecessaryValue(Key.PATH, HdfsWriterErrorCode.REQUIRED_VALUE);            if (!path.startsWith("/")) {                String message = String.format("请检查参数path:[%s],需要配置为绝对路径", path);                LOG.error(message);                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message);            }            if (path.contains("*") || path.contains("?")) {                String message = String.format("请检查参数path:[%s],不能包含*,?等特殊字符", path);                LOG.error(message);                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message);            }            //fileName            this.fileName = this.writerSliceConfig.getNecessaryValue(Key.FILE_NAME, 
HdfsWriterErrorCode.REQUIRED_VALUE);            //columns check            List columns = this.writerSliceConfig.getListConfiguration(Key.COLUMN);            if (null == columns || columns.isEmpty()) {                throw DataXException.asDataXException(HdfsWriterErrorCode.REQUIRED_VALUE, "您需要指定 columns");            }            else {                boolean rewriteFlag = false;                for (int i = 0; i < columns.size(); i++) {                    Configuration eachColumnConf = columns.get(i);                    eachColumnConf.getNecessaryValue(Key.NAME, HdfsWriterErrorCode.COLUMN_REQUIRED_VALUE);                    eachColumnConf.getNecessaryValue(Key.TYPE, HdfsWriterErrorCode.COLUMN_REQUIRED_VALUE);                    if (eachColumnConf.getString(Key.TYPE).toUpperCase().startsWith("DECIMAL")) {                        String type = eachColumnConf.getString(Key.TYPE);                        eachColumnConf.set(Key.TYPE, "decimal");                        eachColumnConf.set(Key.PRECISION, getDecimalPrecision(type));                        eachColumnConf.set(Key.SCALE, getDecimalScale(type));                        columns.set(i, eachColumnConf);                        rewriteFlag = true;                    }                }                if (rewriteFlag) {                    this.writerSliceConfig.set(Key.COLUMN, columns);                }            }            //writeMode check            this.writeMode = this.writerSliceConfig.getNecessaryValue(Key.WRITE_MODE, HdfsWriterErrorCode.REQUIRED_VALUE);            if (!Constant.SUPPORTED_WRITE_MODE.contains(writeMode)) {                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,                        String.format("仅支持append, nonConflict, overwrite三种模式, 不支持您配置的 writeMode 模式 : [%s]",                                writeMode));            }            if ("TEXT".equals(fileType)) {                //fieldDelimiter check                String fieldDelimiter = this.writerSliceConfig.getString(Key.FIELD_DELIMITER, null);                if (StringUtils.isEmpty(fieldDelimiter)) {                    throw DataXException.asDataXException(HdfsWriterErrorCode.REQUIRED_VALUE,                            String.format("写TEXT格式文件,必须提供有效的[%s] 参数.", Key.FIELD_DELIMITER));                }                if (1 != fieldDelimiter.length()) {                    // warn: if it has, length must be one                    throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,                            String.format("仅仅支持单字符切分, 您配置的切分为 : [%s]", fieldDelimiter));                }            }            //compress check            String compress = this.writerSliceConfig.getString(Key.COMPRESS, "NONE").toUpperCase().trim();            if ("ORC".equals(fileType)) {                try {                    CompressionKind.valueOf(compress);                }                catch (IllegalArgumentException e) {                    throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,                            String.format("目前ORC 格式仅支持 %s 压缩,不支持您配置的 compress 模式 : [%s]",                                    Arrays.toString(CompressionKind.values()), compress));                }            }            if ("PARQUET".equals(fileType)) {                // parquet 默认的非压缩标志是 UNCOMPRESSED ,而不是常见的 NONE,这里统一为 NONE                if ("NONE".equals(compress)) {                    compress = "UNCOMPRESSED";                }                try {                    CompressionCodecName.fromConf(compress);            
    }                catch (Exception e) {                    throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,                            String.format("目前PARQUET 格式仅支持 %s 压缩, 不支持您配置的 compress 模式 : [%s]",                                    Arrays.toString(CompressionCodecName.values()), compress));                }            }            boolean haveKerberos = this.writerSliceConfig.getBool(Key.HAVE_KERBEROS, false);            if (haveKerberos) {                this.writerSliceConfig.getNecessaryValue(Key.KERBEROS_KEYTAB_FILE_PATH, HdfsWriterErrorCode.REQUIRED_VALUE);                this.writerSliceConfig.getNecessaryValue(Key.KERBEROS_PRINCIPAL, HdfsWriterErrorCode.REQUIRED_VALUE);            }            // encoding check            String encoding = this.writerSliceConfig.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);            try {                encoding = encoding.trim();                this.writerSliceConfig.set(Key.ENCODING, encoding);                Charsets.toCharset(encoding);            }            catch (Exception e) {                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,                        String.format("不支持您配置的编码格式:[%s]", encoding), e);            }        }        public boolean isPathExists(String filePath) {            Path path = new Path(filePath);            boolean exist;            try {                exist = hdfsHelper.getFileSystem(this.defaultFS,this.writerSliceConfig).exists(path);            }            catch (IOException e) {                String message = String.format("判断文件路径[%s]是否存在时发生网络IO异常,请检查您的网络是否正常!",                        "message:filePath =" + filePath);                e.printStackTrace();                LOG.error(message);                throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);            }            return exist;        }        @Override        public void prepare()        {            //临时存放路径            this.tmpStorePath = buildTmpFilePath(path);            //若路径已经存在,检查path是否是目录            if (isPathExists(path)) {                if (!hdfsHelper.isPathDir(path)) {                    throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,                            String.format("您配置的path: [%s] 不是一个合法的目录, 请您注意文件重名, 不合法目录名等情况.", path));                }                //根据writeMode对目录下文件进行处理                // 写入之前,当前目录下已有的文件,根据writeMode判断是否覆盖                Path[] existFilePaths = hdfsHelper.hdfsDirList(path);                boolean isExistFile = existFilePaths.length > 0;                if ("append".equals(writeMode)) {                    LOG.info("由于您配置了writeMode = append, 写入前不做清理工作, [{}] 目录下写入相应文件名前缀 [{}] 的文件", path, fileName);                }                else if ("nonConflict".equals(writeMode) && isExistFile) {                    throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,                            String.format("由于您配置了writeMode= nonConflict,但您配置的 path: [%s] 目录不为空, 下面存在其他文件或文件夹: %s",                                    path, String.join(",", Arrays.stream(existFilePaths).map(Path::getName).collect(Collectors.toSet()))));                }            }            else {                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,                        String.format("您配置的path: [%s] 不存在, 请先创建目录.", path));            }        }        @Override        public void post()        {            if ("overwrite".equals(writeMode)) {                
hdfsHelper.deleteFilesFromDir(new Path(path));            }            hdfsHelper.moveFilesToDest(new Path(this.tmpStorePath), new Path(this.path));            // 删除临时目录            hdfsHelper.deleteDir(new Path(tmpStorePath));        }        @Override        public void destroy()        {            hdfsHelper.closeFileSystem();        }        @Override        public List split(int mandatoryNumber)        {            LOG.info("begin splitting ...");            List writerSplitConfigs = new ArrayList<>();            String filePrefix = fileName;            //获取该路径下的所有已有文件列表            Set allFiles = Arrays.stream(hdfsHelper.hdfsDirList(path)).map(Path::toString).collect(Collectors.toSet());            String fileType = this.writerSliceConfig.getString(Key.FILE_TYPE, "txt").toLowerCase();            String tmpFullFileName;            String endFullFileName;            for (int i = 0; i < mandatoryNumber; i++) {                // handle same file name                Configuration splitTaskConfig = this.writerSliceConfig.clone();                tmpFullFileName = String.format("%s/%s_%s.%s", tmpStorePath, filePrefix, FileHelper.generateFileMiddleName(), fileType);                endFullFileName = String.format("%s/%s_%s.%s", path, filePrefix, FileHelper.generateFileMiddleName(), fileType);                // 如果文件已经存在,则重新生成文件名                while (allFiles.contains(endFullFileName)) {                    tmpFullFileName = String.format("%s/%s_%s.%s", tmpStorePath, filePrefix, FileHelper.generateFileMiddleName(), fileType);                    endFullFileName = String.format("%s/%s_%s.%s", path, filePrefix, FileHelper.generateFileMiddleName(), fileType);                }                allFiles.add(endFullFileName);                splitTaskConfig.set(Key.FILE_NAME, tmpFullFileName);                LOG.info("split wrote file name:[{}]", tmpFullFileName);                writerSplitConfigs.add(splitTaskConfig);            }            LOG.info("end splitting.");            return writerSplitConfigs;        }        /**         * 创建临时目录         * 在给定目录的下,创建一个已点开头,uuid为名字的文件夹,用于临时存储写入的文件         *         * @param userPath hdfs path         * @return temporary path         */        private String buildTmpFilePath(String userPath)        {            String tmpDir;            String tmpFilePath;          //  while (true) {                tmpDir = "." + UUID.randomUUID().toString().replace("-", "_");                tmpFilePath = Paths.get(userPath, tmpDir).toString();                if (isPathExists(tmpFilePath)) {                    return tmpFilePath;                } else {                    return null;                }            //}        }        /**         * get decimal type precision         * if not specified, use DECIMAL_DEFAULT_PRECISION as default         * example:         * 
     *  decimal -> 38
     *  decimal(10) -> 10
     *
     *
     * @param type decimal type including precision and scale (if present)
     * @return decimal precision
     */
    private static int getDecimalPrecision(String type)
    {
        if (!type.contains("(")) {
            return Constant.DEFAULT_DECIMAL_MAX_PRECISION;
        }
        else {
            String regEx = "[^0-9]";
            Pattern p = Pattern.compile(regEx);
            Matcher m = p.matcher(type);
            return Integer.parseInt(m.replaceAll(" ").trim().split(" ")[0]);
        }
    }

    /**
     * get decimal type scale
     * if precision is not present, return DECIMAL_DEFAULT_SCALE
     * if precision is present and not specify scale, return 0
     * example:
     *
     *  decimal -> 10
     *  decimal(8) -> 0
     *  decimal(8,2) -> 2
     *
* * @param type decimal type string, including precision and scale (if present) * @return decimal scale */ private static int getDecimalScale(String type) { if (!type.contains("(")) { return Constant.DEFAULT_DECIMAL_MAX_SCALE; } if (!type.contains(",")) { return 0; } else { return Integer.parseInt(type.split(",")[1].replace(")", "").trim()); } } public void unitizeParquetConfig(Configuration writerSliceConfig) { String parquetSchema = writerSliceConfig.getString(Key.PARQUET_SCHEMA); if (StringUtils.isNotBlank(parquetSchema)) { LOG.info("parquetSchema has config. use parquetSchema:\n{}", parquetSchema); return; } List columns = writerSliceConfig.getListConfiguration(Key.COLUMN); if (columns == null || columns.isEmpty()) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_BLANK,"parquetSchema or column can"t be blank!"); } parquetSchema = generateParquetSchemaFromColumn(columns); // 为了兼容历史逻辑,对之前的逻辑做保留,但是如果配置的时候报错,则走新逻辑 try { MessageTypeParser.parseMessageType(parquetSchema); } catch (Throwable e) { LOG.warn("The generated parquetSchema {} is illegal, try to generate parquetSchema in another way", parquetSchema); parquetSchema = HdfsHelper.generateParquetSchemaFromColumnAndType(columns); LOG.info("The last generated parquet schema is {}", parquetSchema); } writerSliceConfig.set(Key.PARQUET_SCHEMA, parquetSchema); LOG.info("DataxParquetMode use default fields."); writerSliceConfig.set(Key.PARQUET_MODE, "fields"); } private String generateParquetSchemaFromColumn(List columns) { StringBuffer parquetSchemaStringBuffer = new StringBuffer(); parquetSchemaStringBuffer.append("message m {"); for (Configuration column: columns) { String name = column.getString("name"); Validate.notNull(name, "column.name can"t be null"); String type = column.getString("type"); Validate.notNull(type, "column.type can"t be null"); String parquetColumn = String.format("optional %s %s;", type, name); parquetSchemaStringBuffer.append(parquetColumn); } parquetSchemaStringBuffer.append("}"); String parquetSchema = parquetSchemaStringBuffer.toString(); LOG.info("generate parquetSchema:\n{}", parquetSchema); return parquetSchema; } } public static class Task extends Writer.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private Configuration writerSliceConfig; private String fileType; private String fileName; private HdfsHelper hdfsHelper = null; @Override public void init() { this.writerSliceConfig = this.getPluginJobConf(); String defaultFS = this.writerSliceConfig.getString(Key.DEFAULT_FS); this.fileType = this.writerSliceConfig.getString(Key.FILE_TYPE).toUpperCase(); hdfsHelper = new HdfsHelper(); hdfsHelper.getFileSystem(defaultFS, writerSliceConfig); //得当的已经是绝对路径,eg:/user/hive/warehouse/writer.db/text/test.snappy this.fileName = this.writerSliceConfig.getString(Key.FILE_NAME); } @Override public void prepare() { // } @Override public void startWrite(RecordReceiver lineReceiver) { LOG.info("write to file : [{}]", this.fileName); if ("TEXT".equals(fileType)) { //写TEXT FILE hdfsHelper.textFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector()); } else if ("ORC".equals(fileType)) { //写ORC FILE hdfsHelper.orcFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector()); } else if ("PARQUET".equals(fileType)) { //写Parquet FILE hdfsHelper.parquetFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector()); } LOG.info("end do write"); } @Override public void post() { // } @Override public void destroy() { // } }}
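As a quick illustration of the schema string handled in unitizeParquetConfig: generateParquetSchemaFromColumn concatenates "optional <type> <name>;" entries into a message block, and MessageTypeParser.parseMessageType serves as the legality check before falling back to generateParquetSchemaFromColumnAndType. The sketch below shows that check in isolation; the column names are made up, the types are already valid Parquet primitives, and only the parquet-column dependency (which provides MessageTypeParser) is assumed.

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ParquetSchemaCheckSketch {
    public static void main(String[] args) {
        // Roughly what the generated schema string looks like for three hypothetical columns.
        // (UTF8) is the annotation generateParquetSchemaFromColumnAndType attaches to string columns.
        String parquetSchema = "message m {"
                + " optional int32 id;"
                + " optional binary name (UTF8);"
                + " optional double amount;"
                + " }";

        // parseMessageType throws if the string is not a legal Parquet schema; unitizeParquetConfig
        // relies on exactly this to decide whether to keep the generated schema or regenerate it.
        MessageType messageType = MessageTypeParser.parseMessageType(parquetSchema);
        System.out.println(messageType);
    }
}

If the configured column type is a Hive-style name such as string or bigint rather than a Parquet primitive, the parse fails and the plugin regenerates the schema from the column list, which is the fallback path in unitizeParquetConfig.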

The classes above are the ones that need modified or additional methods to support reading and writing Parquet files. This code has been running stably in production for more than a year without errors; feel free to contact me if you run into any problems.
