Skip to content

Commit

Permalink
Merge pull request #51 from alipay/20191129-functionext
Browse files Browse the repository at this point in the history
20191129 functionext
  • Loading branch information
quhongwei authored Dec 13, 2019
2 parents 12afc0f + 998adc2 commit c195e55
Show file tree
Hide file tree
Showing 44 changed files with 1,536 additions and 85 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
<url>https://github.com/alipay/rdf-file</url>

<properties>
<rdf.file.core.version>2.2.5</rdf.file.core.version>
<rdf.file.oss.version>2.2.5</rdf.file.oss.version>
<rdf.file.sftp.version>2.2.5</rdf.file.sftp.version>
<rdf.file.core.version>2.2.6</rdf.file.core.version>
<rdf.file.oss.version>2.2.6</rdf.file.oss.version>
<rdf.file.sftp.version>2.2.6</rdf.file.sftp.version>
</properties>

<modules>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Alipay.com Inc.
* Copyright (c) 2004-2019 All Rights Reserved.
*/
package com.alipay.rdf.file.codec;

import com.alipay.rdf.file.exception.RdfErrorEnum;
import com.alipay.rdf.file.exception.RdfFileException;
import com.alipay.rdf.file.loader.TemplateLoader;
import com.alipay.rdf.file.meta.FileColumnMeta;
import com.alipay.rdf.file.meta.FileMeta;
import com.alipay.rdf.file.model.FileConfig;
import com.alipay.rdf.file.model.FileDataTypeEnum;

import java.util.List;

/**
* Copyright (C) 2013-2018 Ant Financial Services Group
*
* @author quhongwei
* @version : AbstractColumnInfoCodec.java, v 0.1 2019年11月29日 12:55 quhongwei Exp $
*/
public abstract class AbstractColumnInfoCodec {

protected static String getValue(FileColumnMeta colMeta, String method) {
String value;
if ("desc".equalsIgnoreCase(method)) {
value = colMeta.getDesc();
} else if ("name".equalsIgnoreCase(method)) {
value = colMeta.getName();
} else {
throw new RdfFileException(
"rdf-file#AbstractColumnInfoCodec.getValue 无效方法参数method=" + method,
RdfErrorEnum.UNSUPPORTED_OPERATION);
}
return value;
}

protected static List<FileColumnMeta> getColumnMetas(FileConfig config, FileDataTypeEnum dataType) {
FileMeta fileMeta = TemplateLoader.load(config);
switch (dataType) {
case HEAD:
return fileMeta.getHeadColumns();
case BODY:
return fileMeta.getBodyColumns();
case TAIL:
return fileMeta.getTailColumns();
default:
throw new RdfFileException(
"rdf-file#AbstractColumnInfoCodec.getColumnMetas dateType=" + dataType.name(),
RdfErrorEnum.UNSUPPORTED_OPERATION);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* @author hongwei.quhw
* @version $Id: HeadColumnCodec.java, v 0.1 2017-1-3 下午5:50:00 hongwei.quhw Exp $
*/
@Deprecated
public class BodyColumnHorizontalCodec {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* @author hongwei.quhw
* @version $Id: HeadColumnCodec.java, v 0.1 2017-1-3 下午5:50:00 hongwei.quhw Exp $
*/
@Deprecated
public class BodyColumnVerticalCodec {
public static final BodyColumnVerticalCodec instance = new BodyColumnVerticalCodec();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.alipay.rdf.file.codec;

import com.alipay.rdf.file.exception.RdfErrorEnum;
import com.alipay.rdf.file.exception.RdfFileException;
import com.alipay.rdf.file.interfaces.FileReader;
import com.alipay.rdf.file.interfaces.FileWriter;
import com.alipay.rdf.file.loader.ProtocolLoader;
import com.alipay.rdf.file.loader.TemplateLoader;
import com.alipay.rdf.file.meta.FileColumnMeta;
import com.alipay.rdf.file.meta.FileMeta;
import com.alipay.rdf.file.model.FileConfig;
import com.alipay.rdf.file.model.FileDataTypeEnum;
import com.alipay.rdf.file.spi.RdfFileRowSplitSpi.SplitContext;
import com.alipay.rdf.file.util.RdfFileUtil;

import java.util.List;

/**
* Copyright (C) 2013-2018 Ant Financial Services Group
*
* 字段信息水平编码解码
*
* @author hongwei.quhw
* @version $Id: ColumnInfoHorizontalCodec.java, v 0.1 2017-1-3 下午5:50:00 hongwei.quhw Exp $
*/
public class ColumnInfoHorizontalCodec extends AbstractColumnInfoCodec {

public static void serialize(FileDataTypeEnum layoutType, FileDataTypeEnum dataType, FileConfig config, FileWriter writer,
String method) {
FileMeta fileMeta = TemplateLoader.load(config);
List<FileColumnMeta> colMetas = getColumnMetas(config, dataType);
StringBuilder colHead = new StringBuilder();

if (null != RdfFileUtil.getRowSplit(config) && fileMeta.isStartWithSplit(layoutType)) {
colHead.append(RdfFileUtil.getRowSplit(config));
}

for (int i = 0; i < colMetas.size(); i++) {
FileColumnMeta colMeta = colMetas.get(i);
colHead.append(getValue(colMeta, method));
//添加字段分割符
if (null != RdfFileUtil.getRowSplit(config) && (i < colMetas.size() - 1 || fileMeta.isEndWithSplit(layoutType))) {
colHead.append(RdfFileUtil.getRowSplit(config));
}
}
writer.writeLine(colHead.toString());

}

public static <T> T deserialize(FileDataTypeEnum layoutType, FileDataTypeEnum dataType, FileConfig config,
FileReader reader,
String method) {
String line = reader.readLine();
RdfFileUtil.assertNotBlank(line, "文件=" + config.getFilePath() + ", " + layoutType.name() + " 内容缺失");

FileMeta fileMeta = TemplateLoader.load(config);
String[] columns = ProtocolLoader.loadProtocol(fileMeta.getProtocol()).getRowSplit().split(
new SplitContext(line, config, FileDataTypeEnum.BODY));
List<FileColumnMeta> colMetas = getColumnMetas(config, dataType);

int splitLength = fileMeta.isStartWithSplit(layoutType) ? colMetas.size() + 1 : colMetas.size();
splitLength = fileMeta.isEndWithSplit(layoutType) ? splitLength + 1 : splitLength;

if (splitLength != columns.length) {
throw new RdfFileException("文件=" + config.getFilePath() + ", " + layoutType.name() + " line=" + line,
RdfErrorEnum.DESERIALIZE_ERROR);
}

int statIndex = fileMeta.isStartWithSplit(layoutType) ? 1 : 0;
int endIndex = fileMeta.isEndWithSplit(layoutType) ? columns.length - 1 : columns.length;

for (int i = statIndex; i < endIndex; i++) {
FileColumnMeta colMeta = colMetas.get(i - statIndex);
if (!getValue(colMeta, method).equals(columns[i])) {
throw new RdfFileException(
"文件" + layoutType.name() + "字段校验:文件模板定义的第" + i + "个column为[" + colMetas.get(i).getDesc() + "], 实际文件中为["
+ columns[i] + "]", RdfErrorEnum.DESERIALIZE_ERROR);
}
}
return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.alipay.rdf.file.codec;

import com.alipay.rdf.file.exception.RdfErrorEnum;
import com.alipay.rdf.file.exception.RdfFileException;
import com.alipay.rdf.file.interfaces.FileReader;
import com.alipay.rdf.file.interfaces.FileWriter;
import com.alipay.rdf.file.meta.FileColumnMeta;
import com.alipay.rdf.file.model.FileConfig;
import com.alipay.rdf.file.model.FileDataTypeEnum;
import com.alipay.rdf.file.util.RdfFileUtil;

/**
* Copyright (C) 2013-2018 Ant Financial Services Group
*
* 字段信息纵向编码解码
*
* @author hongwei.quhw
* @version $Id: ColumnInfoVerticalCodec.java, v 0.1 2017-1-3 下午5:50:00 hongwei.quhw Exp $
*/
public class ColumnInfoVerticalCodec extends AbstractColumnInfoCodec {
public static final ColumnInfoVerticalCodec instance = new ColumnInfoVerticalCodec();

public static void serialize(FileDataTypeEnum layoutType, FileDataTypeEnum dataType, FileConfig fileConfig,
FileWriter writer, String method) {
//按行写入column字段
for (FileColumnMeta colMeta : getColumnMetas(fileConfig, dataType)) {
writer.writeLine(getValue(colMeta, method));
}
}

public static <T> T deserialize(FileDataTypeEnum layoutType, FileDataTypeEnum dataType, FileConfig fileConfig,
FileReader reader, String method) {
for (FileColumnMeta colMeta : getColumnMetas(fileConfig, dataType)) {
String columName = RdfFileUtil.assertTrimNotBlank(reader.readLine());
RdfFileUtil.assertNotBlank(columName, "文件=" + fileConfig.getFilePath() + ", " + layoutType.name() + " 内容缺失");

String tempalteValue = getValue(colMeta, method);
if (!tempalteValue.equals(columName)) {
throw new RdfFileException(
"rdf-file#" + layoutType.name() + "模板中定义的column为" + tempalteValue + ", 文件中读取的column为" + columName + " 不一致",
RdfErrorEnum.VALIDATE_ERROR);

}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,6 @@ public static FileColumnMeta getColumnMeta(FileColumnMeta colMeta, FileMeta file

return new FileColumnMeta(colMeta.getColIndex(), colMeta.getName(), colMeta.getDesc(),
colMeta.getType(), colMeta.isRequired(), colMeta.getRange(), colMeta.getDefaultValue(),
fileMeta);
fileMeta, colMeta.getDataType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
* Copyright (C) 2013-2018 Ant Financial Services Group
*
* body 字段
*
*
* 使用更通用的ColumnInfoFunction函数功能替换
*
* @author hongwei.quhw
* @version $Id: BodyColumnFunction.java, v 0.1 2017年8月19日 下午1:21:38 hongwei.quhw Exp $
*/
@Deprecated
@SuppressWarnings("rawtypes")
public class BodyColumnFunction extends RdfFunction {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private String serialize(Object field, FileColumnMeta columnMeta, FileConfig fil
}

private ColumnRegEx getColumnRegEx(FileColumnMeta columnMeta) {
String key = columnMeta.getFileMeta().getTemplatePath() + "-" + columnMeta.getName();
String key = columnMeta.getFileMeta().getTemplatePath() + "-" + columnMeta.getDataType().name() + "-" + columnMeta.getName();
ColumnRegEx columnRegEx = columnRegExs.get(key);
if (null == columnRegEx) {
columnRegEx = new ColumnRegEx();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package com.alipay.rdf.file.function;

import com.alipay.rdf.file.codec.ColumnInfoHorizontalCodec;
import com.alipay.rdf.file.codec.ColumnInfoVerticalCodec;
import com.alipay.rdf.file.exception.RdfErrorEnum;
import com.alipay.rdf.file.exception.RdfFileException;
import com.alipay.rdf.file.loader.ExtensionLoader;
import com.alipay.rdf.file.loader.FormatLoader;
import com.alipay.rdf.file.loader.TemplateLoader;
import com.alipay.rdf.file.meta.FileMeta;
import com.alipay.rdf.file.model.FileDataTypeEnum;
import com.alipay.rdf.file.protocol.RowDefinition;
import com.alipay.rdf.file.spi.RdfFileColumnTypeSpi;
import com.alipay.rdf.file.spi.RdfFileFormatSpi;
import com.alipay.rdf.file.util.RdfFileUtil;

/**
* Copyright (C) 2013-2018 Ant Financial Services Group
*
* 字段信息序列化反序列化函数
*
* 参考fund.xml协议
*
* 1. ${columnInfo.count(body)} 计算body的字段数
* 2. ${columnInfo.vertical(body,name)} 每个body字段名作为一行
* 3. ${columnInfo.horizontal(tail,name)} 所有文件尾字段名作为一行数据
*
* 函数第一个参数代表对文件数据模板中哪个部分的字段进行信息处理
* 函数第二个参数代表对文件数据模板中name或者desc字段处理
*
* @author quhongwei
* @version : ColumnInfoFunction.java, v 0.1 2019年11月29日 15:50 quhongwei Exp $
*/
public class ColumnInfoFunction extends RdfFunction {
@Override
public void checkParams() {
if (("horizontal".equals(expression) || "vertical".equals(expression)) && (null == params || params.length != 2)) {
throw new RdfFileException("rdf-file#ColumnInfoFunction.checkParams() 指定的参数应该为两个", RdfErrorEnum.FUNCTION_ERROR);
} else if ("count".equals(expression) && (null == params || params.length != 1)) {
throw new RdfFileException("rdf-file#ColumnInfoFunction.checkParams() 指定的参数应该为一个", RdfErrorEnum.FUNCTION_ERROR);
}
}

@Override
public int rowsAffected(RowDefinition rd, FileMeta fileMeta) {
if ("horizontal".equals(expression)) {
return 1;
} else if ("vertical".equals(expression)) {
return getColumnSize(fileMeta);
} else if ("count".equals(expression)) {
return 1;
} else {
throw new RdfFileException(
"rdf-file#ColumnInfoFunction" + expression + ", 无法计算rowsAffected",
RdfErrorEnum.UNSUPPORTED_OPERATION);
}
}

public void horizontal(FuncContext ctx) {
FileDataTypeEnum dataType = FileDataTypeEnum.valueOf(params[0].toUpperCase());
if (CodecType.SERIALIZE.equals(ctx.codecType)) {
ColumnInfoHorizontalCodec.serialize(rowType, dataType, ctx.fileConfig, ctx.writer, params[1]);

} else if (CodecType.DESERIALIZE.equals(ctx.codecType)) {
ColumnInfoHorizontalCodec.deserialize(rowType, dataType, ctx.fileConfig, ctx.reader, params[1]);
}
}

public void vertical(FuncContext ctx) {
FileDataTypeEnum dataType = FileDataTypeEnum.valueOf(params[0].toUpperCase());

if (CodecType.SERIALIZE.equals(ctx.codecType)) {
ColumnInfoVerticalCodec.serialize(rowType, dataType, ctx.fileConfig, ctx.writer, params[1]);
} else if (CodecType.DESERIALIZE.equals(ctx.codecType)) {
ColumnInfoVerticalCodec.deserialize(rowType, dataType, ctx.fileConfig, ctx.reader, params[1]);
}
}

public void count(FuncContext ctx) {
FileMeta fileMeta = TemplateLoader.load(ctx.fileConfig);
String typeName = ctx.columnMeta.getType().getName();

RdfFileFormatSpi columnFormat = FormatLoader.getColumnFormt(fileMeta.getProtocol(),
typeName);
RdfFileUtil.assertNotNull(columnFormat, "类型type=" + typeName + " 对应的format没有");
RdfFileColumnTypeSpi columnTypeCodec = ExtensionLoader
.getExtensionLoader(RdfFileColumnTypeSpi.class).getExtension(typeName);
RdfFileUtil.assertNotNull(columnTypeCodec, "没有type=" + typeName + " 对应的类型codec");

boolean startWithSplit = null != RdfFileUtil.getRowSplit(ctx.fileConfig) && fileMeta.isStartWithSplit(rowType);
boolean endtWithSplit = null != RdfFileUtil.getRowSplit(ctx.fileConfig) && fileMeta.isEndWithSplit(rowType);

switch (ctx.codecType) {
case SERIALIZE:
StringBuilder line = new StringBuilder();
if (startWithSplit) {
line.append(RdfFileUtil.getRowSplit(ctx.fileConfig));
}
line.append(getColumnSize(fileMeta));
if (endtWithSplit) {
line.append(RdfFileUtil.getRowSplit(ctx.fileConfig));
}
ctx.writer.writeLine(columnFormat.serialize(line.toString(), ctx.columnMeta, ctx.fileConfig));
break;
case DESERIALIZE:
String value = ctx.reader.readLine();
if (startWithSplit) {
value = value.substring(RdfFileUtil.getRowSplit(ctx.fileConfig).length());
}
if (endtWithSplit) {
value = value.substring(0, value.length() - RdfFileUtil.getRowSplit(ctx.fileConfig).length());
}
Object field = columnTypeCodec.deserialize(value, ctx.columnMeta);
RdfFileUtil.assertEquals(field.toString(),
String.valueOf(fileMeta.getBodyColumns().size()));
break;
default:
throw new RdfFileException("不支持序列化反序列化类型" + ctx.codecType.name(),
RdfErrorEnum.UNSUPPORTED_OPERATION);
}
}

private int getColumnSize(FileMeta fileMeta) {
FileDataTypeEnum dataType = FileDataTypeEnum.valueOf(params[0].toUpperCase());
switch (dataType) {
case HEAD:
return fileMeta.getHeadColumns().size();
case BODY:
return fileMeta.getBodyColumns().size();
case TAIL:
return fileMeta.getTailColumns().size();
default:
throw new RdfFileException(
"rdf-file#ColumnInfoFunction.rowsAffected dateType=" + dataType.name(),
RdfErrorEnum.UNSUPPORTED_OPERATION);
}
}

}
Loading

0 comments on commit c195e55

Please sign in to comment.