Overview

Flink streaming jobs frequently need to write their data to a database, typically by extending RichSinkFunction. Without any optimization in front of the sink, every record is written individually. Single-row writes have two major drawbacks: (1) frequent writes put heavy pressure on the database; (2) they are slow and inefficient, which can cause backpressure. The fix is batch writing: this article uses a tumbling window to buffer each period's records into a batch that is handed down to the sink node. The approach has been validated in a high-volume production environment, and beyond plain batching it also adds optimizations for preventing data skew and supporting parallel writes; it is the product of considerable hard-won experience.

Implementing batch writes

Main function

KeyedStream<Row, String> keyedStream = sinkStream.keyBy(new HashModKeySelector(keyIndexList, paralleSize));

DataStream<Row[]> winStream = keyedStream
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(windowSize)))
        .process(new RowProcessWindowFunction(keyIndexList));

DataStreamSink<Row[]> sink = winStream.addSink(new DbSinkFunction<>(conf, writeSql));
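For context, here is a minimal, hypothetical job skeleton around that snippet. The source, the concrete values of windowSize, keyIndexList and paralleSize, and the sample SQL are placeholders of mine, not taken from the original:

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;

public class BatchWriteJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        int paralleSize = 4;                              // number of parallel window/sink tasks
        long windowSize = 5000L;                          // batch interval: one batch every 5 seconds
        List<Integer> keyIndexList = Arrays.asList(0, 1); // key fields inside each Row
        Properties conf = new Properties();               // connection settings for the pool
        String writeSql = "INSERT INTO t_demo (c1, c2, c3) VALUES (?, ?, ?)";

        DataStream<Row> sinkStream = env.addSource(new MyRowSource()); // hypothetical source

        sinkStream
                .keyBy(new HashModKeySelector(keyIndexList, paralleSize))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(windowSize)))
                .process(new RowProcessWindowFunction(keyIndexList))
                .addSink(new DbSinkFunction<>(conf, writeSql))
                .setParallelism(paralleSize);

        env.execute("windowed-batch-db-sink");
    }
}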

1. Grouping the business data: HashModKeySelector

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HashModKeySelector implements KeySelector<Row, String> {

    private static final Logger logger = LoggerFactory.getLogger(HashModKeySelector.class);

    private static final long serialVersionUID = 1L;

    /**
     * Indexes of the key fields within each Row.
     */
    private List<Integer> keyIndexList;

    private Integer paralleSize;

    /** Cache of mod value -> MD5, so the digest is computed at most paralleSize times. */
    private Map<String, String> md5Map = new ConcurrentHashMap<>();

    public HashModKeySelector(List<Integer> keyIndexList, Integer paralleSize) {
        this.keyIndexList = keyIndexList;
        this.paralleSize = paralleSize;
    }

    @Override
    public String getKey(Row value) {
        int size = keyIndexList.size();
        Row keyRow = new Row(size);
        for (int i = 0; i < size; i++) {
            int index = keyIndexList.get(i);
            keyRow.setField(i, value.getField(index));
        }
        // floorMod keeps the result non-negative (hashCode() may be negative),
        // so there are at most paralleSize distinct keys
        int keyHash = Math.floorMod(keyRow.hashCode(), paralleSize);
        String strKey = String.valueOf(keyHash);
        String md5Value = md5Map.get(strKey);
        if (StringUtils.isBlank(md5Value)) {
            // MD5 the small mod values so the key strings spread more evenly
            // across Flink's key groups and hence across parallel subtasks
            md5Value = md5(strKey);
            md5Map.put(strKey, md5Value);
        }
        return md5Value;
    }

    public static String md5(String key) {
        try {
            // Compute the MD5 digest of the key
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(key.getBytes(StandardCharsets.UTF_8));
            // Render the digest as a hex string
            return bytesToHex(digest);
        } catch (Exception e) {
            logger.error("Failed to compute the MD5 of {}:", key, e);
            return key;
        }
    }

    public static String bytesToHex(byte[] bytes) {
        StringBuilder hexString = new StringBuilder();
        for (byte b : bytes) {
            String hex = Integer.toHexString(0xff & b);
            if (hex.length() == 1) {
                hexString.append('0');
            }
            hexString.append(hex);
        }
        return hexString.toString();
    }
}
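To see the anti-skew trick in action, a small, hypothetical driver (class name, field layout and row values are made up; Row.of assumes a reasonably recent Flink version): no matter how many distinct business keys come in, the selector emits at most paralleSize distinct MD5 keys, one per downstream window/sink slot.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.types.Row;

public class HashModKeySelectorDemo {
    public static void main(String[] args) throws Exception {
        // Key fields at indexes 0 and 1, target parallelism of 4
        HashModKeySelector selector = new HashModKeySelector(Arrays.asList(0, 1), 4);

        Set<String> distinctKeys = new HashSet<>();
        for (int i = 0; i < 10_000; i++) {
            Row row = Row.of("user-" + i, i % 97, "payload-" + i);
            distinctKeys.add(selector.getKey(row));
        }
        // Prints at most 4, however many distinct (field0, field1) pairs exist
        System.out.println("distinct keys: " + distinctKeys.size());
    }
}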

2. Buffering with a tumbling window: collect the window's individual rows into an array and send it downstream

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RowProcessWindowFunction extends ProcessWindowFunction<Row, Row[], String, TimeWindow> {

    private static final Logger LOG = LoggerFactory.getLogger(RowProcessWindowFunction.class);

    /**
     * Indexes of the key fields within each Row.
     */
    private List<Integer> keyIndexList;

    public RowProcessWindowFunction(List<Integer> keyIndexList) {
        if (keyIndexList == null || keyIndexList.isEmpty()) {
            LOG.error("keyIndexList is empty");
            throw new RuntimeException("keyIndexList is empty");
        }
        this.keyIndexList = keyIndexList;
    }

    @Override
    public void process(String key, Context context, Iterable<Row> inRow, Collector<Row[]> out) throws Exception {
        // Drain the rows buffered for this window into a list
        List<Row> rowList = new ArrayList<>();
        for (Row row : inRow) {
            rowList.add(row);
        }
        // Emit the whole window as one array, i.e. one batch for the sink
        out.collect(rowList.toArray(new Row[0]));
    }
}
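Because process() only drains the iterable and emits an array, its contract is easy to check directly with a stub Collector, without spinning up a Flink job. A minimal sketch of mine, not from the original (passing null for the unused Context is fine here):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class RowProcessWindowFunctionDemo {
    public static void main(String[] args) throws Exception {
        List<Row> windowContents = Arrays.asList(Row.of("a", 1), Row.of("b", 2), Row.of("c", 3));
        List<Row[]> emitted = new ArrayList<>();

        // Stub Collector that just records what the function emits
        Collector<Row[]> out = new Collector<Row[]>() {
            @Override
            public void collect(Row[] batch) {
                emitted.add(batch);
            }

            @Override
            public void close() {
            }
        };

        new RowProcessWindowFunction(Arrays.asList(0)).process("someKey", null, windowContents, out);

        // One batch per window pane, containing every buffered row
        System.out.println(emitted.get(0).length); // prints 3
    }
}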

3. Batch writing

Note: the original post does not show the DbConnectionPool class or the fillPreparedStatement helper; the types around them below (e.g. Properties for the connection config) are reasonable stand-ins, not the author's exact code.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DbSinkFunction<I> extends RichSinkFunction<I> {

    private static final Logger LOG = LoggerFactory.getLogger(DbSinkFunction.class);

    /** Flush and commit every BATCH_SIZE rows so a single transaction stays bounded. */
    private static final int BATCH_SIZE = 3000;

    /** Connection settings for the pool (the concrete type is not shown in the original). */
    private Properties conf = null;

    private String sql = null;

    /** Custom connection pool; its implementation is not shown in the original. */
    DbConnectionPool pool = null;

    /** Column JDBC types, presumably used by fillPreparedStatement (not shown). */
    private int[] columnTypes;

    public DbSinkFunction(Properties conf, String dmlSql) {
        this.conf = conf;
        this.sql = dmlSql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Create the connection pool once per parallel sink instance
        pool = new DbConnectionPool(conf);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Release the pooled connections
        pool.close();
    }

    /**
     * Write to the database: a Row[] is written as a JDBC batch,
     * a single Row is written individually.
     */
    @Override
    public void invoke(I record, Context context) throws Exception {
        PreparedStatement ps = null;
        boolean isBatch = false;
        Connection connection = null;
        try {
            connection = pool.getConnection();
            ps = connection.prepareStatement(sql);
            if (record instanceof Row[]) {
                // A batch produced by the window
                isBatch = true;
                connection.setAutoCommit(false);
                Row[] rowArray = (Row[]) record;
                LOG.info("Row array: {}", rowArray.length);
                int pending = 0;
                for (Row row : rowArray) {
                    // fillPreparedStatement binds the row's fields to the statement;
                    // the original omits its implementation (a sketch follows the class)
                    fillPreparedStatement(ps, row);
                    ps.addBatch();
                    pending++;
                    // Flush and commit every BATCH_SIZE rows
                    if (pending == BATCH_SIZE) {
                        ps.executeBatch();
                        connection.commit();
                        ps.clearBatch();
                        pending = 0;
                    }
                }
                // Flush the remainder
                if (pending > 0) {
                    ps.executeBatch();
                    connection.commit();
                    ps.clearBatch();
                }
            } else if (record instanceof Row) {
                // A single row
                isBatch = false;
                fillPreparedStatement(ps, (Row) record);
                ps.execute();
            } else {
                throw new RuntimeException("Unsupported record type " + record.getClass());
            }
        } catch (SQLException e) {
            LOG.error("Batch write failed, falling back to row-by-row writes", e);
            if (connection != null) {
                connection.rollback();
            }
            if (isBatch) {
                doOneInsert(sql, connection, (Row[]) record);
            }
        } catch (Exception e) {
            LOG.error("Write failed", e);
        } finally {
            closeDBResources(ps, connection);
        }
    }

    /**
     * Fallback when the batch fails: retry the rows one by one, so a single
     * bad row does not discard the whole batch.
     *
     * @param sql        the DML statement
     * @param connection the connection the batch was using
     * @param rowArray   the failed batch
     */
    protected void doOneInsert(String sql, Connection connection, Row[] rowArray) {
        PreparedStatement ps = null;
        try {
            int errCount = 0;
            connection.setAutoCommit(true);
            ps = connection.prepareStatement(sql);
            for (Row row : rowArray) {
                try {
                    fillPreparedStatement(ps, row);
                    ps.execute();
                } catch (SQLException e) {
                    errCount++;
                } finally {
                    ps.clearParameters();
                }
            }
            LOG.warn("Row-by-row fallback done: {} of {} rows failed", errCount, rowArray.length);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            closeDBResources(ps, null);
        }
    }

    private void closeDBResources(Statement stmt, Connection conn) {
        try {
            if (!(null == stmt || stmt.isClosed())) {
                stmt.close();
            }
            if (!(null == conn || conn.isClosed())) {
                conn.close();
            }
        } catch (SQLException e) {
            // Ignore failures while releasing resources
        }
    }
}
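The class above calls fillPreparedStatement, which the original post never shows. A minimal sketch, assuming the Row's fields line up one-to-one with the SQL placeholders; it belongs inside DbSinkFunction:

// Hypothetical helper, not the author's implementation: bind each Row field
// to the corresponding placeholder and let the JDBC driver infer the SQL type.
private void fillPreparedStatement(PreparedStatement ps, Row row) throws SQLException {
    for (int i = 0; i < row.getArity(); i++) {
        // JDBC parameter indexes are 1-based
        ps.setObject(i + 1, row.getField(i));
    }
}

If setObject's type inference is too loose for the target database, a sturdier variant would use the columnTypes field to call the typed setters (setLong, setString, setNull with an explicit java.sql.Types code, and so on).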
