DataX Secondary Development: Adding Parquet Read/Write Support to the HdfsReader and HdfsWriter Plugins
1. Development Background
The officially open-sourced version of DataX supports reading and writing HDFS files, but so far it does not support Parquet files. Thanks to DataX's excellent synchronization performance, most of our projects last year adopted DataX as the data-synchronization tool, yet two common scenarios were left uncovered: reading Parquet data from a CDH cluster, and writing data from other sources into HDFS in Parquet format. I therefore extended the HdfsReader and HdfsWriter plugins myself to add Parquet read and write support.
2. HdfsReader Plugin
This plugin is fairly simple and consists of five classes in total; the classes and their corresponding changes are:
- DFSUtil: add a method that checks whether a file is a Parquet file, and a method that reads Parquet files and converts their records (a standalone sketch of the type check follows this list).
- HdfsConstant: add a constant for the Parquet file type.
- HdfsReader: add a branch for the case where fileType is configured as Parquet.
- HdfsReaderErrorCode: no changes required.
- Type: no changes required.
Only the first three classes need modification; the code is shown below:
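Before the full listings, here is the file-type check from the first bullet in isolation: the idea is simply to try opening the file with a Parquet reader and see whether a record can be read. This is a minimal sketch — the class name and sample path are placeholders, and in the plugin this check lives inside DFSUtil.checkHdfsFileType:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class ParquetTypeCheck {

    // Returns true if the file can be opened and a first record read as Parquet, false otherwise.
    static boolean isParquetFile(Path file) {
        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).build()) {
            return reader.read() != null;
        } catch (Exception e) {
            // Any failure (wrong magic bytes, corrupt footer, ...) is treated as "not a Parquet file".
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder path: point this at any file on the configured file system.
        System.out.println(isParquetFile(new Path("/tmp/demo/part-00000.parquet")));
    }
}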
DFSUtil
import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.google.common.primitives.Ints;import com.google.common.primitives.Longs;import com.alibaba.datax.common.base.Key;import com.alibaba.datax.common.element.BoolColumn;import com.alibaba.datax.common.element.BytesColumn;import com.alibaba.datax.common.element.Column;import com.alibaba.datax.common.element.ColumnEntry;import com.alibaba.datax.common.element.DateColumn;import com.alibaba.datax.common.element.DoubleColumn;import com.alibaba.datax.common.element.LongColumn;import com.alibaba.datax.common.element.Record;import com.alibaba.datax.common.element.StringColumn;import com.alibaba.datax.common.exception.DataXException;import com.alibaba.datax.common.plugin.RecordSender;import com.alibaba.datax.common.plugin.TaskPluginCollector;import com.alibaba.datax.common.util.Configuration;import com.alibaba.datax.storage.reader.StorageReaderErrorCode;import com.alibaba.datax.storage.reader.StorageReaderUtil;import org.apache.avro.Conversions;import org.apache.avro.Schema;import org.apache.avro.generic.GenericData;import org.apache.commons.lang3.StringUtils;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;import org.apache.hadoop.hive.ql.io.RCFile;import org.apache.hadoop.hive.ql.io.RCFileRecordReader;import org.apache.hadoop.hive.ql.io.orc.OrcFile;import org.apache.hadoop.hive.ql.io.orc.Reader;import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.SequenceFile;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapred.FileSplit;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.security.UserGroupInformation;import org.apache.hadoop.util.ReflectionUtils;import org.apache.orc.TypeDescription;import org.apache.parquet.avro.AvroParquetReader;import org.apache.parquet.example.data.Group;import org.apache.parquet.hadoop.ParquetReader;import org.apache.parquet.hadoop.example.GroupReadSupport;import org.apache.parquet.hadoop.util.HadoopInputFile;import org.apache.parquet.io.api.Binary;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.io.InputStream;import java.math.BigDecimal;import java.math.RoundingMode;import java.nio.ByteBuffer;import java.text.SimpleDateFormat;import java.util.ArrayList;import java.util.Arrays;import java.util.Date;import java.util.HashSet;import java.util.List;import java.util.Set;import java.util.concurrent.TimeUnit;import static com.alibaba.datax.common.base.Key.COLUMN;import static com.alibaba.datax.common.base.Key.NULL_FORMAT;public class DFSUtil{ private static final Logger LOG = LoggerFactory.getLogger(DFSUtil.class); // the offset of julian, 2440588 is 1970/1/1 private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588; private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1); private static final long 
NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1); private static final int DIRECTORY_SIZE_GUESS = 16 * 1024; private final org.apache.hadoop.conf.Configuration hadoopConf; private final boolean haveKerberos; private final HashSetsourceHDFSAllFilesList = new HashSet<>(); private String specifiedFileType = null; private String kerberosKeytabFilePath; private String kerberosPrincipal; public DFSUtil(Configuration taskConfig) { hadoopConf = new org.apache.hadoop.conf.Configuration(); //io.file.buffer.size 性能参数 //http://blog.csdn.net/yangjl38/article/details/7583374 Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG); JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG)); if (null != hadoopSiteParams) { Set paramKeys = hadoopSiteParams.getKeys(); for (String each : paramKeys) { hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each)); } } hadoopConf.set(HdfsConstant.HDFS_DEFAULT_KEY, taskConfig.getString(Key.DEFAULT_FS)); //是否有Kerberos认证 this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false); if (haveKerberos) { this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH); this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL); this.hadoopConf.set(HdfsConstant.HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos"); } this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath); LOG.info("hadoopConfig details:{}", JSON.toJSONString(this.hadoopConf)); } private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath) { if (haveKerberos && StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) { UserGroupInformation.setConfiguration(hadoopConf); try { UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath); } catch (Exception e) { String message = String.format("kerberos认证失败,请确定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填写正确", kerberosKeytabFilePath, kerberosPrincipal); throw DataXException.asDataXException(HdfsReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e); } } } /** * 获取指定路径列表下符合条件的所有文件的绝对路径 * * @param srcPaths 路径列表 * @param specifiedFileType 指定文件类型 * @return set of string */ public Set getAllFiles(List srcPaths, String specifiedFileType) { this.specifiedFileType = specifiedFileType; if (!srcPaths.isEmpty()) { for (String eachPath : srcPaths) { LOG.info("get HDFS all files in path = [{}]", eachPath); getHDFSAllFiles(eachPath); } } return sourceHDFSAllFilesList; } private void addSourceFileIfNotEmpty(FileStatus f) { if (f.isFile()) { String filePath = f.getPath().toString(); if (f.getLen() > 0) { addSourceFileByType(filePath); } else { LOG.warn("文件[{}]长度为0,将会跳过不作处理!", filePath); } } } public void getHDFSAllFiles(String hdfsPath) { try { FileSystem hdfs = FileSystem.get(hadoopConf); //判断hdfsPath是否包含正则符号 if (hdfsPath.contains("*") || hdfsPath.contains("?")) { Path path = new Path(hdfsPath); FileStatus[] stats = hdfs.globStatus(path); for (FileStatus f : stats) { if (f.isFile()) { addSourceFileIfNotEmpty(f); } else if (f.isDirectory()) { getHDFSAllFilesNORegex(f.getPath().toString(), hdfs); } } } else { getHDFSAllFilesNORegex(hdfsPath, hdfs); } } catch (IOException e) { String message = String.format("无法读取路径[%s]下的所有文件,请确认您的配置项fs.defaultFS, path的值是否正确," + "是否有读写权限,网络是否已断开!", hdfsPath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.PATH_CONFIG_ERROR, e); } } private void getHDFSAllFilesNORegex(String path, FileSystem 
hdfs) throws IOException { // 获取要读取的文件的根目录 Path listFiles = new Path(path); // If the network disconnected, this method will retry 45 times // each time the retry interval for 20 seconds // 获取要读取的文件的根目录的所有二级子文件目录 FileStatus[] stats = hdfs.listStatus(listFiles); for (FileStatus f : stats) { // 判断是不是目录,如果是目录,递归调用 if (f.isDirectory()) { LOG.info("[{}] 是目录, 递归获取该目录下的文件", f.getPath()); getHDFSAllFilesNORegex(f.getPath().toString(), hdfs); } else if (f.isFile()) { addSourceFileIfNotEmpty(f); } else { String message = String.format("该路径[%s]文件类型既不是目录也不是文件,插件自动忽略。", f.getPath()); LOG.info(message); } } } // 根据用户指定的文件类型,将指定的文件类型的路径加入sourceHDFSAllFilesList private void addSourceFileByType(String filePath) { // 检查file的类型和用户配置的fileType类型是否一致 boolean isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType); if (isMatchedFileType) { String msg = String.format("[%s]是[%s]类型的文件, 将该文件加入source files列表", filePath, this.specifiedFileType); LOG.info(msg); sourceHDFSAllFilesList.add(filePath); } else { String message = String.format("文件[%s]的类型与用户配置的fileType类型不一致," + "请确认您配置的目录下面所有文件的类型均为[%s]" , filePath, this.specifiedFileType); LOG.error(message); throw DataXException.asDataXException( HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message); } } public InputStream getInputStream(String filepath) { InputStream inputStream; Path path = new Path(filepath); try { FileSystem fs = FileSystem.get(hadoopConf); //If the network disconnected, this method will retry 45 times //each time the retry interval for 20 seconds inputStream = fs.open(path); return inputStream; } catch (IOException e) { String message = String.format("读取文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filepath, filepath); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e); } } public void sequenceFileStartRead(String sourceSequenceFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) { LOG.info("Start Read sequence file [{}].", sourceSequenceFilePath); Path seqFilePath = new Path(sourceSequenceFilePath); try (SequenceFile.Reader reader = new SequenceFile.Reader(this.hadoopConf, SequenceFile.Reader.file(seqFilePath))) { //获取SequenceFile.Reader实例 //获取key 与 value Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), this.hadoopConf); Text value = new Text(); while (reader.next(key, value)) { if (StringUtils.isNotBlank(value.toString())) { StorageReaderUtil.transportOneRecord(recordSender, readerSliceConfig, taskPluginCollector, value.toString()); } } } catch (Exception e) { String message = String.format("SequenceFile.Reader读取文件[%s]时出错", sourceSequenceFilePath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_SEQUENCE_FILE_ERROR, message, e); } } public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) { LOG.info("Start Read rc-file [{}].", sourceRcFilePath); List column = StorageReaderUtil .getListColumnEntry(readerSliceConfig, COLUMN); // warn: no default value "\N" String nullFormat = readerSliceConfig.getString(NULL_FORMAT); Path rcFilePath = new Path(sourceRcFilePath); RCFileRecordReader recordReader = null; try (FileSystem fs = FileSystem.get(rcFilePath.toUri(), hadoopConf)) { long fileLen = fs.getFileStatus(rcFilePath).getLen(); FileSplit split = new FileSplit(rcFilePath, 0, fileLen, (String[]) null); recordReader = new RCFileRecordReader(hadoopConf, split); LongWritable key = new LongWritable(); 
BytesRefArrayWritable value = new BytesRefArrayWritable(); Text txt = new Text(); while (recordReader.next(key, value)) { String[] sourceLine = new String[value.size()]; txt.clear(); for (int i = 0; i < value.size(); i++) { BytesRefWritable v = value.get(i); txt.set(v.getData(), v.getStart(), v.getLength()); sourceLine[i] = txt.toString(); } StorageReaderUtil.transportOneRecord(recordSender, column, sourceLine, nullFormat, taskPluginCollector); } } catch (IOException e) { String message = String.format("读取文件[%s]时出错", sourceRcFilePath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_RCFILE_ERROR, message, e); } finally { try { if (recordReader != null) { recordReader.close(); LOG.info("Finally, Close RCFileRecordReader."); } } catch (IOException e) { LOG.warn(String.format("finally: 关闭RCFileRecordReader失败, %s", e.getMessage())); } } } public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) { LOG.info("Start Read orc-file [{}].", sourceOrcFilePath); List column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN); String nullFormat = readerSliceConfig.getString(NULL_FORMAT); try { Path orcFilePath = new Path(sourceOrcFilePath); Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(hadoopConf)); TypeDescription schema = reader.getSchema(); assert column != null; if (column.isEmpty()) { for (int i = 0; i < schema.getChildren().size(); i++) { ColumnEntry columnEntry = new ColumnEntry(); columnEntry.setIndex(i); columnEntry.setType(schema.getChildren().get(i).getCategory().getName()); column.add(columnEntry); } } VectorizedRowBatch rowBatch = schema.createRowBatch(1024); org.apache.orc.RecordReader rowIterator = reader.rows(reader.options().schema(schema)); while (rowIterator.nextBatch(rowBatch)) { transportOrcRecord(rowBatch, column, recordSender, taskPluginCollector, nullFormat); } } catch (Exception e) { String message = String.format("从orc-file文件路径[%s]中读取数据发生异常,请联系系统管理员。" , sourceOrcFilePath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message); } } private void transportOrcRecord(VectorizedRowBatch rowBatch, List columns, RecordSender recordSender, TaskPluginCollector taskPluginCollector, String nullFormat) { Record record; for (int row = 0; row < rowBatch.size; row++) { record = recordSender.createRecord(); try { for (ColumnEntry column : columns) { Column columnGenerated; if (column.getValue() != null) { if (!"null".equals(column.getValue())) { columnGenerated = new StringColumn(column.getValue()); } else { columnGenerated = new StringColumn(); } record.addColumn(columnGenerated); continue; } int i = column.getIndex(); String columnType = column.getType().toUpperCase(); ColumnVector col = rowBatch.cols[i]; Type type = Type.valueOf(columnType); if (col.isNull[row]) { record.addColumn(new StringColumn(null)); continue; } switch (type) { case INT: case LONG: case BOOLEAN: case BIGINT: columnGenerated = new LongColumn(((LongColumnVector) col).vector[row]); break; case DATE: columnGenerated = new DateColumn(new Date(((LongColumnVector) col).vector[row])); break; case DOUBLE: columnGenerated = new DoubleColumn(((DoubleColumnVector) col).vector[row]); break; case DECIMAL: columnGenerated = new DoubleColumn(((DecimalColumnVector) col).vector[row].doubleValue()); break; case BINARY: BytesColumnVector b = (BytesColumnVector) col; byte[] val = Arrays.copyOfRange(b.vector[row], 
b.start[row], b.start[row] + b.length[row]); columnGenerated = new BytesColumn(val); break; case TIMESTAMP: columnGenerated = new DateColumn(((TimestampColumnVector) col).getTime(row)); break; default: // type is string or other String v = ((BytesColumnVector) col).toString(row); columnGenerated = v.equals(nullFormat) ? new StringColumn() : new StringColumn(v); break; } record.addColumn(columnGenerated); } recordSender.sendToWriter(record); } catch (Exception e) { if (e instanceof DataXException) { throw (DataXException) e; } taskPluginCollector.collectDirtyRecord(record, e.getMessage()); } } } public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) { LOG.info("Start Read parquet-file [{}].", sourceParquetFilePath); List column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN); String nullFormat = readerSliceConfig.getString(NULL_FORMAT); Path parquetFilePath = new Path(sourceParquetFilePath); hadoopConf.set("parquet.avro.readInt96AsFixed", "true"); JobConf conf = new JobConf(hadoopConf); GenericData decimalSupport = new GenericData(); decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion()); try (ParquetReader reader = AvroParquetReader . builder(HadoopInputFile.fromPath(parquetFilePath, hadoopConf)) .withDataModel(decimalSupport) .withConf(conf) .build()) { GenericData.Record gRecord = reader.read(); Schema schema = gRecord.getSchema(); if (null == column || column.isEmpty()) { column = new ArrayList<>(schema.getFields().size()); String sType; // 用户没有填写具体的字段信息,需要从parquet文件构建 for (int i = 0; i < schema.getFields().size(); i++) { ColumnEntry columnEntry = new ColumnEntry(); columnEntry.setIndex(i); Schema type; if (schema.getFields().get(i).schema().getType() == Schema.Type.UNION) { type = schema.getFields().get(i).schema().getTypes().get(1); } else { type = schema.getFields().get(i).schema(); } sType = type.getProp("logicalType") != null ? 
type.getProp("logicalType") : type.getType().getName(); if (sType.startsWith("timestamp")) { columnEntry.setType("timestamp"); } else { columnEntry.setType(sType); } column.add(columnEntry); } } while (gRecord != null) { transportParquetRecord(column, gRecord, recordSender, taskPluginCollector, nullFormat); gRecord = reader.read(); } } catch (IOException e) { String message = String.format("从parquet file文件路径[%s]中读取数据发生异常,请联系系统管理员。" , sourceParquetFilePath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message); } } /* * create a transport record for Parquet file * * */ private void transportParquetRecord(List columnConfigs, GenericData.Record gRecord, RecordSender recordSender, TaskPluginCollector taskPluginCollector, String nullFormat) { Record record = recordSender.createRecord(); Column columnGenerated; int scale = 10; try { for (ColumnEntry columnEntry : columnConfigs) { String columnType = columnEntry.getType(); Integer columnIndex = columnEntry.getIndex(); String columnConst = columnEntry.getValue(); String columnValue = null; if (null != columnIndex) { if (null != gRecord.get(columnIndex)) { columnValue = gRecord.get(columnIndex).toString(); } else { record.addColumn(new StringColumn(null)); continue; } } else { columnValue = columnConst; } if (columnType.startsWith("decimal(")) { String ps = columnType.replace("decimal(", "").replace(")", ""); columnType = "decimal"; if (ps.contains(",")) { scale = Integer.parseInt(ps.split(",")[1].trim()); } else { scale = 0; } } Type type = Type.valueOf(columnType.toUpperCase()); if (StringUtils.equals(columnValue, nullFormat)) { columnValue = null; } try { switch (type) { case STRING: columnGenerated = new StringColumn(columnValue); break; case INT: case LONG: columnGenerated = new LongColumn(columnValue); break; case DOUBLE: columnGenerated = new DoubleColumn(columnValue); break; case DECIMAL: if (null == columnValue) { columnGenerated = new DoubleColumn((Double) null); } else { columnGenerated = new DoubleColumn(new BigDecimal(columnValue).setScale(scale, RoundingMode.HALF_UP)); } break; case BOOLEAN: columnGenerated = new BoolColumn(columnValue); break; case DATE: if (columnValue == null) { columnGenerated = new DateColumn((Date) null); } else { String formatString = columnEntry.getFormat(); if (StringUtils.isNotBlank(formatString)) { // 用户自己配置的格式转换 SimpleDateFormat format = new SimpleDateFormat( formatString); columnGenerated = new DateColumn( format.parse(columnValue)); } else { // 框架尝试转换 columnGenerated = new DateColumn(new StringColumn(columnValue).asDate()); } } break; case TIMESTAMP: if (null == columnValue) { columnGenerated = new DateColumn(); } else if (columnValue.startsWith("[")) { // INT96 https://github.com/apache/parquet-mr/pull/901 GenericData.Fixed fixed = (GenericData.Fixed) gRecord.get(columnIndex); Date date = new Date(getTimestampMills(fixed.bytes())); columnGenerated = new DateColumn(date); } else { columnGenerated = new DateColumn(Long.parseLong(columnValue) * 1000); } break; case BINARY: columnGenerated = new BytesColumn(((ByteBuffer) gRecord.get(columnIndex)).array()); break; default: String errorMessage = String.format("您配置的列类型暂不支持 : [%s]", columnType); LOG.error(errorMessage); throw DataXException.asDataXException(StorageReaderErrorCode.NOT_SUPPORT_TYPE, errorMessage); } } catch (Exception e) { throw new IllegalArgumentException(String.format( "类型转换错误, 无法将[%s] 转换为[%s], %s", columnValue, type, e)); } record.addColumn(columnGenerated); } // end for 
recordSender.sendToWriter(record); } catch (IllegalArgumentException | IndexOutOfBoundsException iae) { taskPluginCollector.collectDirtyRecord(record, iae.getMessage()); } catch (Exception e) { if (e instanceof DataXException) { throw (DataXException) e; } // 每一种转换失败都是脏数据处理,包括数字格式 & 日期格式 taskPluginCollector.collectDirtyRecord(record, e.getMessage()); } } private TypeDescription getOrcSchema(String filePath) { Path path = new Path(filePath); try { Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(hadoopConf));// return reader.getTypes().get(0).getSubtypesCount() return reader.getSchema(); } catch (IOException e) { String message = "读取orc-file column列数失败,请联系系统管理员"; throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message); } } public boolean checkHdfsFileType(String filepath, String specifiedFileType) { Path file = new Path(filepath); try (FileSystem fs = FileSystem.get(hadoopConf); FSDataInputStream in = fs.open(file)) { if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.ORC)) { return isORCFile(file, fs, in); } else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.RC)) { return isRCFile(filepath, in); } else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.SEQ)) { return isSequenceFile(file, in); } else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.PARQUET)) { return isParquetFile(file); } else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.CSV) || StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.TEXT)) { return true; } } catch (Exception e) { String message = String.format("检查文件[%s]类型失败,目前支持 %s 格式的文件," + "请检查您文件类型和文件是否正确。", filepath, HdfsConstant.SUPPORT_FILE_TYPE); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e); } return false; } // 判断file是否是ORC File private boolean isORCFile(Path file, FileSystem fs, FSDataInputStream in) { try { // figure out the size of the file using the option or filesystem long size = fs.getFileStatus(file).getLen(); //read last bytes into buffer to get PostScript int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS); in.seek(size - readSize); ByteBuffer buffer = ByteBuffer.allocate(readSize); in.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); //read the PostScript //get length of PostScript int psLen = buffer.get(readSize - 1) & 0xff; String orcMagic = org.apache.orc.OrcFile.MAGIC; int len = orcMagic.length(); if (psLen < len + 1) { return false; } int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1 - len; byte[] array = buffer.array(); // now look for the magic string at the end of the postscript. if (Text.decode(array, offset, len).equals(orcMagic)) { return true; } else { // If it isn"t there, this may be the 0.11.0 version of ORC. // Read the first 3 bytes of the file to check for the header in.seek(0); byte[] header = new byte[len]; in.readFully(header, 0, len); // if it isn"t there, this isn"t an ORC file if (Text.decode(header, 0, len).equals(orcMagic)) { return true; } } } catch (IOException e) { LOG.info("检查文件类型: [{}] 不是ORC File.", file); } return false; } // 判断file是否是RC file private boolean isRCFile(String filepath, FSDataInputStream in) { // The first version of RCFile used the sequence file header. 
final byte[] originalMagic = {(byte) "S", (byte) "E", (byte) "Q"}; // The "magic" bytes at the beginning of the RCFile final byte[] rcMagic = {(byte) "R", (byte) "C", (byte) "F"}; // the version that was included with the original magic, which is mapped // into ORIGINAL_VERSION final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = 6; // All the versions should be place in this list. final int ORIGINAL_VERSION = 0; // version with SEQ // version with RCF // final int NEW_MAGIC_VERSION = 1 // final int CURRENT_VERSION = NEW_MAGIC_VERSION final int CURRENT_VERSION = 1; byte version; byte[] magic = new byte[rcMagic.length]; try { in.seek(0); in.readFully(magic); if (Arrays.equals(magic, originalMagic)) { if (in.readByte() != ORIGINAL_MAGIC_VERSION_WITH_METADATA) { return false; } version = ORIGINAL_VERSION; } else { if (!Arrays.equals(magic, rcMagic)) { return false; } // Set "version" version = in.readByte(); if (version > CURRENT_VERSION) { return false; } } if (version == ORIGINAL_VERSION) { try { Class> keyCls = hadoopConf.getClassByName(Text.readString(in)); Class> valCls = hadoopConf.getClassByName(Text.readString(in)); if (!keyCls.equals(RCFile.KeyBuffer.class) || !valCls.equals(RCFile.ValueBuffer.class)) { return false; } } catch (ClassNotFoundException e) { return false; } }// boolean decompress = in.readBoolean(); // is compressed? if (version == ORIGINAL_VERSION) { // is block-compressed? it should be always false. boolean blkCompressed = in.readBoolean(); return !blkCompressed; } return true; } catch (IOException e) { LOG.info("检查文件类型: [{}] 不是RC File.", filepath); } return false; } // 判断file是否是Sequence file private boolean isSequenceFile(Path filepath, FSDataInputStream in) { final byte[] seqMagic = {(byte) "S", (byte) "E", (byte) "Q"}; byte[] magic = new byte[seqMagic.length]; try { in.seek(0); in.readFully(magic); return Arrays.equals(magic, seqMagic); } catch (IOException e) { LOG.info("检查文件类型: [{}] 不是Sequence File.", filepath); } return false; } //判断是否为parquet(考虑判断parquet文件的schema是否不为空) private boolean isParquetFile(Path file) { try { GroupReadSupport readSupport = new GroupReadSupport(); ParquetReader.Builder reader = ParquetReader.builder(readSupport, file); ParquetReader build = reader.build(); if (build.read() != null) { return true; } } catch (IOException e) { LOG.info("检查文件类型: [{}] 不是Parquet File.", file); } return false; } /** * Returns GMT"s timestamp from binary encoded parquet timestamp (12 bytes - julian date + time of day nanos). * * @param timestampBinary INT96 parquet timestamp * @return timestamp in millis, GMT timezone */ public static long getTimestampMillis(Binary timestampBinary) { if (timestampBinary.length() != 12) { return 0; } byte[] bytes = timestampBinary.getBytes(); return getTimestampMills(bytes); } public static long getTimestampMills(byte[] bytes) { assert bytes.length == 12; // little endian encoding - need to invert byte order long timeOfDayNanos = Longs.fromBytes(bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]); int julianDay = Ints.fromBytes(bytes[11], bytes[10], bytes[9], bytes[8]); return julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND); } private static long julianDayToMillis(int julianDay) { return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY; }}
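One detail in DFSUtil worth calling out is the INT96 timestamp handling: Parquet's legacy INT96 timestamp stores 8 little-endian bytes of nanoseconds-of-day followed by 4 little-endian bytes of Julian day, and 2440588 is the Julian day of 1970-01-01. The standalone sketch below reproduces that conversion with a ByteBuffer instead of the Guava helpers used above; class and method names are illustrative:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;

public class Int96TimestampDemo {

    private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588; // Julian day of 1970-01-01
    private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
    private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);

    // Converts a 12-byte INT96 Parquet timestamp (nanos-of-day + Julian day, both little-endian) to epoch millis.
    static long toEpochMillis(byte[] int96) {
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        long timeOfDayNanos = buf.getLong(); // bytes 0..7
        int julianDay = buf.getInt();        // bytes 8..11
        return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY
                + timeOfDayNanos / NANOS_PER_MILLISECOND;
    }

    public static void main(String[] args) {
        // A zero time-of-day on Julian day 2440588 should map to epoch millis 0 (1970-01-01T00:00:00Z).
        byte[] epoch = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
                .putLong(0L).putInt(JULIAN_EPOCH_OFFSET_DAYS).array();
        System.out.println(toEpochMillis(epoch)); // prints 0
    }
}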
HdfsConstant
package com.alibaba.datax.plugin.reader.hdfsreader;

import com.alibaba.datax.common.base.Constant;

import java.util.Arrays;
import java.util.List;

public class HdfsConstant
        extends Constant
{
    public static final String SOURCE_FILES = "sourceFiles";
    public static final String TEXT = "TEXT";
    public static final String ORC = "ORC";
    public static final String CSV = "CSV";
    public static final String SEQ = "SEQ";
    public static final String RC = "RC";
    public static final String PARQUET = "PARQUET"; // added: Parquet file type
    public static final String HDFS_DEFAULT_KEY = "fs.defaultFS";
    public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";

    protected static final List<String> SUPPORT_FILE_TYPE =
            Arrays.asList(HdfsConstant.CSV, HdfsConstant.ORC, HdfsConstant.RC,
                    HdfsConstant.SEQ, HdfsConstant.TEXT, HdfsConstant.PARQUET);

    private HdfsConstant() {}
}
HdfsReader
package com.alibaba.datax.plugin.reader.hdfsreader;import com.alibaba.datax.common.base.Key;import com.alibaba.datax.common.exception.DataXException;import com.alibaba.datax.common.plugin.RecordSender;import com.alibaba.datax.common.spi.Reader;import com.alibaba.datax.common.util.Configuration;import com.alibaba.datax.storage.reader.StorageReaderUtil;import com.alibaba.datax.storage.util.FileHelper;import org.apache.commons.io.Charsets;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.InputStream;import java.nio.charset.UnsupportedCharsetException;import java.util.ArrayList;import java.util.Collections;import java.util.HashSet;import java.util.List;import static com.alibaba.datax.common.base.Key.COLUMN;import static com.alibaba.datax.common.base.Key.ENCODING;import static com.alibaba.datax.common.base.Key.INDEX;import static com.alibaba.datax.common.base.Key.TYPE;import static com.alibaba.datax.common.base.Key.VALUE;public class HdfsReader extends Reader{ /** * Job 中的方法仅执行一次,Task 中方法会由框架启动多个 Task 线程并行执行。 ** 整个 Reader 执行流程是: *
* Job类init-->prepare-->split * Task类init-->prepare-->startRead-->post-->destroy * Task类init-->prepare-->startRead-->post-->destroy * Job类post-->destroy **/ public static class Job extends Reader.Job { private static final Logger LOG = LoggerFactory.getLogger(Job.class); private Configuration readerOriginConfig = null; private HashSetsourceFiles; private String specifiedFileType = null; private DFSUtil dfsUtil = null; private List path = null; @Override public void init() { LOG.info("init() begin..."); this.readerOriginConfig = getPluginJobConf(); validate(); dfsUtil = new DFSUtil(readerOriginConfig); LOG.info("init() ok and end..."); } public void validate() { readerOriginConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsReaderErrorCode.DEFAULT_FS_NOT_FIND_ERROR); // path check String pathInString = readerOriginConfig.getNecessaryValue(Key.PATH, HdfsReaderErrorCode.REQUIRED_VALUE); if (!pathInString.startsWith("[") && !pathInString.endsWith("]")) { path = Collections.singletonList(pathInString); } else { path = readerOriginConfig.getList(Key.PATH, String.class); if (null == path || path.isEmpty()) { throw DataXException.asDataXException(HdfsReaderErrorCode.REQUIRED_VALUE, "您需要指定待读取的源目录或文件"); } for (String eachPath : path) { if (!eachPath.startsWith("/")) { String message = String.format("请检查参数path:[%s],需要配置为绝对路径", eachPath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.ILLEGAL_VALUE, message); } } } specifiedFileType = readerOriginConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE).toUpperCase(); if (!HdfsConstant.SUPPORT_FILE_TYPE.contains(specifiedFileType)) { String message = "HdfsReader插件目前支持 " + HdfsConstant.SUPPORT_FILE_TYPE + " 几种格式的文件"; throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_ERROR, message); } String encoding = this.readerOriginConfig.getString(ENCODING, "UTF-8"); try { Charsets.toCharset(encoding); } catch (UnsupportedCharsetException uce) { throw DataXException.asDataXException( HdfsReaderErrorCode.ILLEGAL_VALUE, String.format("不支持的编码格式 : [%s]", encoding), uce); } catch (Exception e) { throw DataXException.asDataXException( HdfsReaderErrorCode.ILLEGAL_VALUE, String.format("运行配置异常 : %s", e.getMessage()), e); } //check Kerberos boolean haveKerberos = readerOriginConfig.getBool(Key.HAVE_KERBEROS, false); if (haveKerberos) { readerOriginConfig.getNecessaryValue(Key.KERBEROS_KEYTAB_FILE_PATH, HdfsReaderErrorCode.REQUIRED_VALUE); readerOriginConfig.getNecessaryValue(Key.KERBEROS_PRINCIPAL, HdfsReaderErrorCode.REQUIRED_VALUE); } // validate the Columns validateColumns(); // validate compress String compress = readerOriginConfig.getString(Key.COMPRESS, "NONE"); if ("gzip".equalsIgnoreCase(compress)) { // correct to gz readerOriginConfig.set(Key.COMPRESS, "gz"); } } private void validateColumns() { // 检测是column 是否为 ["*"] 若是则填为空 List column = this.readerOriginConfig.getListConfiguration(COLUMN); if (null != column && 1 == column.size() && ("\"*\"".equals(column.get(0).toString()) || ""*"".equals(column.get(0).toString()))) { readerOriginConfig.set(COLUMN, new ArrayList ()); } else { // column: 1. 
index type 2.value type 3.when type is Data, may be has dateFormat value List columns = readerOriginConfig.getListConfiguration(COLUMN); if (null == columns || columns.isEmpty()) { throw DataXException.asDataXException(HdfsReaderErrorCode.CONFIG_INVALID_EXCEPTION, "您需要指定 columns"); } for (Configuration eachColumnConf : columns) { eachColumnConf.getNecessaryValue(TYPE, HdfsReaderErrorCode.REQUIRED_VALUE); Integer columnIndex = eachColumnConf.getInt(INDEX); String columnValue = eachColumnConf.getString(VALUE); if (null == columnIndex && null == columnValue) { throw DataXException.asDataXException( HdfsReaderErrorCode.NO_INDEX_VALUE, "由于您配置了type, 则至少需要配置 index 或 value, 当前配置为:" + eachColumnConf); } if (null != columnIndex && null != columnValue) { throw DataXException.asDataXException(HdfsReaderErrorCode.MIXED_INDEX_VALUE, "您混合配置了index, value, 每一列同时仅能选择其中一种"); } } } } @Override public void prepare() { LOG.info("prepare(), start to getAllFiles..."); this.sourceFiles = (HashSet ) dfsUtil.getAllFiles(path, specifiedFileType); LOG.info("您即将读取的文件数为: [{}], 列表为: [{}]", sourceFiles.size(), sourceFiles); } @Override public List split(int adviceNumber) { LOG.info("split() begin..."); List readerSplitConfigs = new ArrayList<>(); // warn:每个slice拖且仅拖一个文件, int splitNumber = sourceFiles.size(); if (0 == splitNumber) { throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION, String.format("未能找到待读取的文件,请确认您的配置项path: %s", readerOriginConfig.getString(Key.PATH))); } List > splitSourceFiles = FileHelper.splitSourceFiles(new ArrayList<>(sourceFiles), splitNumber); for (List
files : splitSourceFiles) { Configuration splitConfig = readerOriginConfig.clone(); splitConfig.set(HdfsConstant.SOURCE_FILES, files); readerSplitConfigs.add(splitConfig); } return readerSplitConfigs; } @Override public void post() { // } @Override public void destroy() { // } } public static class Task extends Reader.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private Configuration taskConfig; private List sourceFiles; private String specifiedFileType; private DFSUtil dfsUtil = null; @Override public void init() { this.taskConfig = getPluginJobConf(); this.sourceFiles = taskConfig.getList(HdfsConstant.SOURCE_FILES, String.class); this.specifiedFileType = taskConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE); this.dfsUtil = new DFSUtil(taskConfig); } @Override public void prepare() { // } @Override public void startRead(RecordSender recordSender) { LOG.info("read start"); for (String sourceFile : this.sourceFiles) { LOG.info("reading file : [{}]", sourceFile); if (specifiedFileType.equalsIgnoreCase(HdfsConstant.TEXT) || specifiedFileType.equalsIgnoreCase(HdfsConstant.CSV)) { InputStream inputStream = dfsUtil.getInputStream(sourceFile); StorageReaderUtil.readFromStream(inputStream, sourceFile, taskConfig, recordSender, getTaskPluginCollector()); } else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.ORC)) { dfsUtil.orcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector()); } else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.SEQ)) { dfsUtil.sequenceFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector()); } else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.RC)) { dfsUtil.rcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector()); } else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.PARQUET)) { dfsUtil.parquetFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector()); } else { String message = "HdfsReader插件目前支持ORC, TEXT, CSV, SEQUENCE, RC、PARQUET等六种格式的文件," + "请将fileType选项的值配置为ORC, TEXT, CSV, SEQUENCE, RC, PARQUET"; throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message); } if (recordSender != null) { recordSender.flush(); } } LOG.info("end read source files..."); } @Override public void post() { // } @Override public void destroy() { // } }}
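Before wiring up a full DataX job, the new parquet branch of Task.startRead can be smoke-tested by reading the same file directly with AvroParquetReader, which is the API parquetFileStartRead is built on. A minimal sketch, assuming a reachable HDFS — the defaultFS value and the file path are placeholders, and GenericData.get() is used here instead of the decimal-aware data model the plugin registers:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ParquetReadSmokeTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://nameservice1");      // placeholder: your cluster's defaultFS
        conf.set("parquet.avro.readInt96AsFixed", "true");    // same switch the plugin sets for INT96 timestamps
        Path file = new Path("/tmp/demo/part-00000.parquet"); // placeholder: any readable Parquet file

        try (ParquetReader<GenericData.Record> reader = AvroParquetReader
                .<GenericData.Record>builder(HadoopInputFile.fromPath(file, conf))
                .withDataModel(GenericData.get())
                .withConf(conf)
                .build()) {
            GenericData.Record record = reader.read();
            if (record == null) {
                System.out.println("empty file");
                return;
            }
            Schema schema = record.getSchema();
            System.out.println("fields    = " + schema.getFields());
            System.out.println("first row = " + record);
        }
    }
}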
3. HdfsWriter Plugin
This plugin is also fairly simple and consists of five classes in total; the classes and their corresponding changes are:
- HdfsHelper: add a Parquet write method together with the schema/record conversion logic it needs (a standalone write sketch follows the HdfsHelper listing below).
- HdfsWriter: add the Parquet file type to the supported formats, plus the related compression and parquetSchema configuration checks.
- SupportHiveDataType: no changes required.
- HdfsWriterErrorCode: no changes required.
- Type: no changes required.
Only the first two classes need modification; the code is shown below:
HdfsHelper
package com.alibaba.datax.plugin.writer.hdfswriter;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.alibaba.datax.common.base.Constant;import com.alibaba.datax.common.base.Key;import com.alibaba.datax.common.element.Column;import com.alibaba.datax.common.element.Record;import com.alibaba.datax.common.exception.DataXException;import com.alibaba.datax.common.plugin.RecordReceiver;import com.alibaba.datax.common.plugin.TaskPluginCollector;import com.alibaba.datax.common.util.Configuration;import com.alibaba.datax.unstructuredstorage.util.ColumnTypeUtil;import com.alibaba.datax.unstructuredstorage.util.HdfsUtil;import org.apache.avro.Conversions;import org.apache.avro.LogicalTypes;import org.apache.avro.Schema;import org.apache.avro.generic.GenericData;import org.apache.avro.generic.GenericRecord;import org.apache.avro.generic.GenericRecordBuilder;import org.apache.commons.lang3.StringUtils;import org.apache.commons.lang3.Validate;import org.apache.commons.lang3.tuple.MutablePair;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.LocatedFileStatus;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.RemoteIterator;import org.apache.hadoop.hive.common.type.HiveDecimal;import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.JobContext;import org.apache.hadoop.mapred.RecordWriter;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.TextOutputFormat;import org.apache.hadoop.security.UserGroupInformation;import org.apache.hadoop.security.authentication.util.KerberosName;import org.apache.orc.CompressionKind;import org.apache.orc.OrcFile;import org.apache.orc.TypeDescription;import org.apache.orc.Writer;import org.apache.parquet.avro.AvroParquetWriter;import org.apache.parquet.column.ParquetProperties;import org.apache.parquet.hadoop.ParquetWriter;import org.apache.parquet.hadoop.metadata.CompressionCodecName;import org.apache.parquet.schema.OriginalType;import org.apache.parquet.schema.PrimitiveType;import org.apache.parquet.schema.Types;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.FileNotFoundException;import java.io.IOException;import java.lang.reflect.Field;import java.math.BigDecimal;import java.nio.charset.StandardCharsets;import java.sql.Timestamp;import java.text.SimpleDateFormat;import java.util.*;public class HdfsHelper{ public static final Logger LOG = LoggerFactory.getLogger(HdfsHelper.class); public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication"; public static final String HDFS_DEFAULT_FS_KEY = "fs.defaultFS"; private FileSystem fileSystem = null; private JobConf conf = null; private org.apache.hadoop.conf.Configuration hadoopConf = null; // Kerberos private boolean haveKerberos = false; private String 
kerberosKeytabFilePath; private String kerberosPrincipal; private String krb5ConfPath; public static MutablePairtransportOneRecord( Record record, char fieldDelimiter, List columnsConfiguration, TaskPluginCollector taskPluginCollector) { MutablePair , Boolean> transportResultList = transportOneRecord(record, columnsConfiguration, taskPluginCollector); //保存<转换后的数据,是否是脏数据> MutablePair
transportResult = new MutablePair<>(); transportResult.setRight(false); Text recordResult = new Text(StringUtils.join(transportResultList.getLeft(), fieldDelimiter)); transportResult.setRight(transportResultList.getRight()); transportResult.setLeft(recordResult); return transportResult; } public static MutablePair , Boolean> transportOneRecord( Record record, List
columnsConfiguration, TaskPluginCollector taskPluginCollector) { MutablePair , Boolean> transportResult = new MutablePair<>(); transportResult.setRight(false); List
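On the write side, HdfsHelper ultimately hands records to AvroParquetWriter (see the parquet imports above). The sketch below shows the core of that idea in isolation: build an Avro schema from the column names and types, then write GenericRecords through the builder. It is an illustration under assumed placeholders (defaultFS, output path, column set), not the plugin's exact implementation:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetWriteSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://nameservice1");              // placeholder: your cluster's defaultFS
        Path out = new Path("/tmp/demo/.tmp_dir/demo_00001.parquet"); // placeholder: temporary output file

        // Column config ("name" + "type") mapped to an Avro record schema, as the writer must do.
        Schema schema = SchemaBuilder.record("m").fields()
                .optionalLong("id")
                .optionalString("name")
                .endRecord();

        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(out)
                .withSchema(schema)
                .withConf(conf)
                .withCompressionCodec(CompressionCodecName.SNAPPY) // "NONE" maps to UNCOMPRESSED in Parquet
                .build()) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("id", 1L);
            record.put("name", "datax");
            writer.write(record);
        }
    }
}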
HdfsWriter
package com.alibaba.datax.plugin.writer.hdfswriter;import com.alibaba.datax.common.base.Constant;import com.alibaba.datax.common.base.Key;import com.alibaba.datax.common.exception.DataXException;import com.alibaba.datax.common.plugin.RecordReceiver;import com.alibaba.datax.common.spi.Writer;import com.alibaba.datax.common.util.Configuration;import com.alibaba.datax.storage.util.FileHelper;import org.apache.commons.io.Charsets;import org.apache.commons.lang3.StringUtils;import com.alibaba.datax.unstructuredstorage.util.ColumnTypeUtil;import com.alibaba.datax.unstructuredstorage.util.HdfsUtil;import org.apache.commons.lang3.Validate;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.orc.CompressionKind;import org.apache.parquet.hadoop.metadata.CompressionCodecName;import org.apache.parquet.schema.MessageTypeParser;import org.apache.parquet.schema.OriginalType;import org.apache.parquet.schema.PrimitiveType;import org.apache.parquet.schema.Types;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.nio.file.Paths;import java.util.*;import java.util.regex.Matcher;import java.util.regex.Pattern;import java.util.stream.Collectors;public class HdfsWriter extends Writer{ public static class Job extends Writer.Job { private static final Logger LOG = LoggerFactory.getLogger(Job.class); // 写入文件的临时目录,完成写入后,该目录需要删除 private String tmpStorePath; private Configuration writerSliceConfig = null; private String defaultFS; private String path; private String fileName; private String writeMode; private HdfsHelper hdfsHelper = null; private FileSystem filsSystem; public static final SetSUPPORT_FORMAT = new HashSet<>(Arrays.asList("ORC", "PARQUET", "TEXT")); @Override public void init() { this.writerSliceConfig = this.getPluginJobConf(); this.validateParameter(); hdfsHelper = new HdfsHelper(); filsSystem = hdfsHelper.getFileSystem(defaultFS, this.writerSliceConfig); } private void validateParameter() { this.defaultFS = this.writerSliceConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsWriterErrorCode.REQUIRED_VALUE); //fileType check String fileType = this.writerSliceConfig.getNecessaryValue(Key.FILE_TYPE, HdfsWriterErrorCode.REQUIRED_VALUE).toUpperCase(); if (!SUPPORT_FORMAT.contains(fileType)) { String message = String.format("[%s] 文件格式不支持, HdfsWriter插件目前仅支持 %s, ", fileType, SUPPORT_FORMAT); throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message); } //path this.path = this.writerSliceConfig.getNecessaryValue(Key.PATH, HdfsWriterErrorCode.REQUIRED_VALUE); if (!path.startsWith("/")) { String message = String.format("请检查参数path:[%s],需要配置为绝对路径", path); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message); } if (path.contains("*") || path.contains("?")) { String message = String.format("请检查参数path:[%s],不能包含*,?等特殊字符", path); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message); } //fileName this.fileName = this.writerSliceConfig.getNecessaryValue(Key.FILE_NAME, HdfsWriterErrorCode.REQUIRED_VALUE); //columns check List columns = this.writerSliceConfig.getListConfiguration(Key.COLUMN); if (null == columns || columns.isEmpty()) { throw DataXException.asDataXException(HdfsWriterErrorCode.REQUIRED_VALUE, "您需要指定 columns"); } else { boolean rewriteFlag = false; for (int i = 0; i < columns.size(); i++) { Configuration eachColumnConf = columns.get(i); eachColumnConf.getNecessaryValue(Key.NAME, 
HdfsWriterErrorCode.COLUMN_REQUIRED_VALUE); eachColumnConf.getNecessaryValue(Key.TYPE, HdfsWriterErrorCode.COLUMN_REQUIRED_VALUE); if (eachColumnConf.getString(Key.TYPE).toUpperCase().startsWith("DECIMAL")) { String type = eachColumnConf.getString(Key.TYPE); eachColumnConf.set(Key.TYPE, "decimal"); eachColumnConf.set(Key.PRECISION, getDecimalPrecision(type)); eachColumnConf.set(Key.SCALE, getDecimalScale(type)); columns.set(i, eachColumnConf); rewriteFlag = true; } } if (rewriteFlag) { this.writerSliceConfig.set(Key.COLUMN, columns); } } //writeMode check this.writeMode = this.writerSliceConfig.getNecessaryValue(Key.WRITE_MODE, HdfsWriterErrorCode.REQUIRED_VALUE); if (!Constant.SUPPORTED_WRITE_MODE.contains(writeMode)) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("仅支持append, nonConflict, overwrite三种模式, 不支持您配置的 writeMode 模式 : [%s]", writeMode)); } if ("TEXT".equals(fileType)) { //fieldDelimiter check String fieldDelimiter = this.writerSliceConfig.getString(Key.FIELD_DELIMITER, null); if (StringUtils.isEmpty(fieldDelimiter)) { throw DataXException.asDataXException(HdfsWriterErrorCode.REQUIRED_VALUE, String.format("写TEXT格式文件,必须提供有效的[%s] 参数.", Key.FIELD_DELIMITER)); } if (1 != fieldDelimiter.length()) { // warn: if it has, length must be one throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("仅仅支持单字符切分, 您配置的切分为 : [%s]", fieldDelimiter)); } } //compress check String compress = this.writerSliceConfig.getString(Key.COMPRESS, "NONE").toUpperCase().trim(); if ("ORC".equals(fileType)) { try { CompressionKind.valueOf(compress); } catch (IllegalArgumentException e) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("目前ORC 格式仅支持 %s 压缩,不支持您配置的 compress 模式 : [%s]", Arrays.toString(CompressionKind.values()), compress)); } } if ("PARQUET".equals(fileType)) { // parquet 默认的非压缩标志是 UNCOMPRESSED ,而不是常见的 NONE,这里统一为 NONE if ("NONE".equals(compress)) { compress = "UNCOMPRESSED"; } try { CompressionCodecName.fromConf(compress); } catch (Exception e) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("目前PARQUET 格式仅支持 %s 压缩, 不支持您配置的 compress 模式 : [%s]", Arrays.toString(CompressionCodecName.values()), compress)); } } boolean haveKerberos = this.writerSliceConfig.getBool(Key.HAVE_KERBEROS, false); if (haveKerberos) { this.writerSliceConfig.getNecessaryValue(Key.KERBEROS_KEYTAB_FILE_PATH, HdfsWriterErrorCode.REQUIRED_VALUE); this.writerSliceConfig.getNecessaryValue(Key.KERBEROS_PRINCIPAL, HdfsWriterErrorCode.REQUIRED_VALUE); } // encoding check String encoding = this.writerSliceConfig.getString(Key.ENCODING, Constant.DEFAULT_ENCODING); try { encoding = encoding.trim(); this.writerSliceConfig.set(Key.ENCODING, encoding); Charsets.toCharset(encoding); } catch (Exception e) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("不支持您配置的编码格式:[%s]", encoding), e); } } public boolean isPathExists(String filePath) { Path path = new Path(filePath); boolean exist; try { exist = hdfsHelper.getFileSystem(this.defaultFS,this.writerSliceConfig).exists(path); } catch (IOException e) { String message = String.format("判断文件路径[%s]是否存在时发生网络IO异常,请检查您的网络是否正常!", "message:filePath =" + filePath); e.printStackTrace(); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } return exist; } @Override public void prepare() { //临时存放路径 this.tmpStorePath = buildTmpFilePath(path); //若路径已经存在,检查path是否是目录 if 
(isPathExists(path)) { if (!hdfsHelper.isPathDir(path)) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("您配置的path: [%s] 不是一个合法的目录, 请您注意文件重名, 不合法目录名等情况.", path)); } //根据writeMode对目录下文件进行处理 // 写入之前,当前目录下已有的文件,根据writeMode判断是否覆盖 Path[] existFilePaths = hdfsHelper.hdfsDirList(path); boolean isExistFile = existFilePaths.length > 0; if ("append".equals(writeMode)) { LOG.info("由于您配置了writeMode = append, 写入前不做清理工作, [{}] 目录下写入相应文件名前缀 [{}] 的文件", path, fileName); } else if ("nonConflict".equals(writeMode) && isExistFile) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("由于您配置了writeMode= nonConflict,但您配置的 path: [%s] 目录不为空, 下面存在其他文件或文件夹: %s", path, String.join(",", Arrays.stream(existFilePaths).map(Path::getName).collect(Collectors.toSet())))); } } else { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("您配置的path: [%s] 不存在, 请先创建目录.", path)); } } @Override public void post() { if ("overwrite".equals(writeMode)) { hdfsHelper.deleteFilesFromDir(new Path(path)); } hdfsHelper.moveFilesToDest(new Path(this.tmpStorePath), new Path(this.path)); // 删除临时目录 hdfsHelper.deleteDir(new Path(tmpStorePath)); } @Override public void destroy() { hdfsHelper.closeFileSystem(); } @Override public List split(int mandatoryNumber) { LOG.info("begin splitting ..."); List writerSplitConfigs = new ArrayList<>(); String filePrefix = fileName; //获取该路径下的所有已有文件列表 Set allFiles = Arrays.stream(hdfsHelper.hdfsDirList(path)).map(Path::toString).collect(Collectors.toSet()); String fileType = this.writerSliceConfig.getString(Key.FILE_TYPE, "txt").toLowerCase(); String tmpFullFileName; String endFullFileName; for (int i = 0; i < mandatoryNumber; i++) { // handle same file name Configuration splitTaskConfig = this.writerSliceConfig.clone(); tmpFullFileName = String.format("%s/%s_%s.%s", tmpStorePath, filePrefix, FileHelper.generateFileMiddleName(), fileType); endFullFileName = String.format("%s/%s_%s.%s", path, filePrefix, FileHelper.generateFileMiddleName(), fileType); // 如果文件已经存在,则重新生成文件名 while (allFiles.contains(endFullFileName)) { tmpFullFileName = String.format("%s/%s_%s.%s", tmpStorePath, filePrefix, FileHelper.generateFileMiddleName(), fileType); endFullFileName = String.format("%s/%s_%s.%s", path, filePrefix, FileHelper.generateFileMiddleName(), fileType); } allFiles.add(endFullFileName); splitTaskConfig.set(Key.FILE_NAME, tmpFullFileName); LOG.info("split wrote file name:[{}]", tmpFullFileName); writerSplitConfigs.add(splitTaskConfig); } LOG.info("end splitting."); return writerSplitConfigs; } /** * 创建临时目录 * 在给定目录的下,创建一个已点开头,uuid为名字的文件夹,用于临时存储写入的文件 * * @param userPath hdfs path * @return temporary path */ private String buildTmpFilePath(String userPath) { String tmpDir; String tmpFilePath; // while (true) { tmpDir = "." 
+ UUID.randomUUID().toString().replace("-", "_"); tmpFilePath = Paths.get(userPath, tmpDir).toString(); if (isPathExists(tmpFilePath)) { return tmpFilePath; } else { return null; } //} } /** * get decimal type precision * if not specified, use DECIMAL_DEFAULT_PRECISION as default * example: * * decimal -> 38 * decimal(10) -> 10 ** * @param type decimal type including precision and scale (if present) * @return decimal precision */ private static int getDecimalPrecision(String type) { if (!type.contains("(")) { return Constant.DEFAULT_DECIMAL_MAX_PRECISION; } else { String regEx = "[^0-9]"; Pattern p = Pattern.compile(regEx); Matcher m = p.matcher(type); return Integer.parseInt(m.replaceAll(" ").trim().split(" ")[0]); } } /** * get decimal type scale * if precision is not present, return DECIMAL_DEFAULT_SCALE * if precision is present and not specify scale, return 0 * example: ** decimal -> 10 * decimal(8) -> 0 * decimal(8,2) -> 2 ** * @param type decimal type string, including precision and scale (if present) * @return decimal scale */ private static int getDecimalScale(String type) { if (!type.contains("(")) { return Constant.DEFAULT_DECIMAL_MAX_SCALE; } if (!type.contains(",")) { return 0; } else { return Integer.parseInt(type.split(",")[1].replace(")", "").trim()); } } public void unitizeParquetConfig(Configuration writerSliceConfig) { String parquetSchema = writerSliceConfig.getString(Key.PARQUET_SCHEMA); if (StringUtils.isNotBlank(parquetSchema)) { LOG.info("parquetSchema has config. use parquetSchema:\n{}", parquetSchema); return; } Listcolumns = writerSliceConfig.getListConfiguration(Key.COLUMN); if (columns == null || columns.isEmpty()) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_BLANK,"parquetSchema or column can"t be blank!"); } parquetSchema = generateParquetSchemaFromColumn(columns); // 为了兼容历史逻辑,对之前的逻辑做保留,但是如果配置的时候报错,则走新逻辑 try { MessageTypeParser.parseMessageType(parquetSchema); } catch (Throwable e) { LOG.warn("The generated parquetSchema {} is illegal, try to generate parquetSchema in another way", parquetSchema); parquetSchema = HdfsHelper.generateParquetSchemaFromColumnAndType(columns); LOG.info("The last generated parquet schema is {}", parquetSchema); } writerSliceConfig.set(Key.PARQUET_SCHEMA, parquetSchema); LOG.info("DataxParquetMode use default fields."); writerSliceConfig.set(Key.PARQUET_MODE, "fields"); } private String generateParquetSchemaFromColumn(List columns) { StringBuffer parquetSchemaStringBuffer = new StringBuffer(); parquetSchemaStringBuffer.append("message m {"); for (Configuration column: columns) { String name = column.getString("name"); Validate.notNull(name, "column.name can"t be null"); String type = column.getString("type"); Validate.notNull(type, "column.type can"t be null"); String parquetColumn = String.format("optional %s %s;", type, name); parquetSchemaStringBuffer.append(parquetColumn); } parquetSchemaStringBuffer.append("}"); String parquetSchema = parquetSchemaStringBuffer.toString(); LOG.info("generate parquetSchema:\n{}", parquetSchema); return parquetSchema; } } public static class Task extends Writer.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private Configuration writerSliceConfig; private String fileType; private String fileName; private HdfsHelper hdfsHelper = null; @Override public void init() { this.writerSliceConfig = this.getPluginJobConf(); String defaultFS = this.writerSliceConfig.getString(Key.DEFAULT_FS); this.fileType = 
this.writerSliceConfig.getString(Key.FILE_TYPE).toUpperCase(); hdfsHelper = new HdfsHelper(); hdfsHelper.getFileSystem(defaultFS, writerSliceConfig); //得当的已经是绝对路径,eg:/user/hive/warehouse/writer.db/text/test.snappy this.fileName = this.writerSliceConfig.getString(Key.FILE_NAME); } @Override public void prepare() { // } @Override public void startWrite(RecordReceiver lineReceiver) { LOG.info("write to file : [{}]", this.fileName); if ("TEXT".equals(fileType)) { //写TEXT FILE hdfsHelper.textFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector()); } else if ("ORC".equals(fileType)) { //写ORC FILE hdfsHelper.orcFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector()); } else if ("PARQUET".equals(fileType)) { //写Parquet FILE hdfsHelper.parquetFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector()); } LOG.info("end do write"); } @Override public void post() { // } @Override public void destroy() { // } }}
With the modifications and additions above, the two plugins support reading and writing Parquet files. The code has been running stably in our production environment for over a year without errors; feel free to contact me if you run into problems.