Overview
A Flink streaming job very often ends in a database write, typically implemented by extending RichSinkFunction. Without any optimization in front of the sink, every record is written individually, which has two major drawbacks: (1) the frequent writes put heavy pressure on the database; (2) writing is slow and inefficient, which in turn causes backpressure. The fix is to write in batches. This article uses a tumbling window to buffer each period's records into a batch and hand that batch down to the sink node. The approach has been validated in a high-volume production environment, and beyond batch writing itself it includes optimizations for preventing data skew and supporting parallelism. It is the product of much painstaking work.
Implementing the batch write
Main function
KeyedStream<Row, String> keyedStream = sinkStream.keyBy(new HashModKeySelector(keyIndexList, paralleSize));
SingleOutputStreamOperator<Row[]> winStream = keyedStream
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(windowSize)))
        .process(new RowProcessWindowFunction(keyIndexList));
DataStreamSink<Row[]> sink = winStream.addSink(new DbSinkFunction(conf, writeSql));
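Pulled together, a minimal job skeleton might look like the sketch below. Everything beyond the three lines above is an assumption added for illustration: the environment setup, the placeholder MyRowSourceFunction, and the example values for keyIndexList, paralleSize, windowSize, and writeSql are not from the original post (note also that the DbSinkFunction shown later declares a (dbDriver, dmlSql) constructor, so the sketch passes a driver string where the snippet above passes conf):

import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;

public class BatchSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical source producing Row records
        DataStream<Row> sinkStream = env.addSource(new MyRowSourceFunction());

        List<Integer> keyIndexList = Arrays.asList(0, 1); // example: key on the first two fields
        int paralleSize = env.getParallelism();           // cap the number of distinct keys
        long windowSize = 5000L;                          // 5 s window => one batch per key every 5 s
        String writeSql = "INSERT INTO t_target (a, b, c) VALUES (?, ?, ?)"; // example DML
        String dbDriver = "com.mysql.cj.jdbc.Driver";     // example JDBC driver

        sinkStream
                .keyBy(new HashModKeySelector(keyIndexList, paralleSize))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(windowSize)))
                .process(new RowProcessWindowFunction(keyIndexList))
                .addSink(new DbSinkFunction<Row[]>(dbDriver, writeSql));

        env.execute("windowed-batch-db-sink");
    }
}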
1. Group the business data with HashModKeySelector
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HashModKeySelector implements KeySelector<Row, String> {

    private static final Logger logger = LoggerFactory.getLogger(HashModKeySelector.class);
    private static final long serialVersionUID = 1L;

    /**
     * Indexes of the key fields within the Row
     */
    private List<Integer> keyIndexList;
    private Integer paralleSize;
    private Map<String, String> md5Map = new HashMap<>();

    public HashModKeySelector(List<Integer> keyIndexList, Integer paralleSize) {
        this.keyIndexList = keyIndexList;
        this.paralleSize = paralleSize;
    }

    @Override
    public String getKey(Row value) {
        int size = keyIndexList.size();
        Row keyRow = new Row(size);
        for (int i = 0; i < size; i++) {
            int index = keyIndexList.get(i);
            keyRow.setField(i, value.getField(index));
        }
        // floorMod keeps the bucket inside [0, paralleSize) even when hashCode() is negative
        int keyHash = Math.floorMod(keyRow.hashCode(), paralleSize);
        String strKey = String.valueOf(keyHash);
        String md5Value = md5Map.get(strKey);
        if (StringUtils.isBlank(md5Value)) {
            md5Value = md5(strKey);
            md5Map.put(strKey, md5Value);
        }
        return md5Value;
    }

    public static String md5(String key) {
        String result = "";
        try {
            // Create the MD5 message digest
            MessageDigest md = MessageDigest.getInstance("MD5");
            // Compute the digest of the key
            byte[] digest = md.digest(key.getBytes());
            // Convert the digest to a hex string
            result = bytesToHex(digest);
        } catch (Exception e) {
            logger.error("Failed to compute the MD5 of {}:", key, e);
            return key;
        }
        return result;
    }

    public static String bytesToHex(byte[] bytes) {
        StringBuilder hexString = new StringBuilder();
        for (byte b : bytes) {
            String hex = Integer.toHexString(0xff & b);
            if (hex.length() == 1) {
                hexString.append('0');
            }
            hexString.append(hex);
        }
        return hexString.toString();
    }
}

2. Use a tumbling window to buffer the data, collect the single records into a batch, and send it downstream

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RowProcessWindowFunction extends ProcessWindowFunction<Row, Row[], String, TimeWindow> {

    private static final Logger LOG = LoggerFactory.getLogger(RowProcessWindowFunction.class);

    /**
     * Indexes of the key fields within the Row
     */
    private List<Integer> keyIndexList;

    public RowProcessWindowFunction(List<Integer> keyIndexList) {
        if (keyIndexList == null || keyIndexList.size() == 0) {
            LOG.error("keyIndexList is empty");
            throw new RuntimeException("keyIndexList is empty");
        }
        this.keyIndexList = keyIndexList;
    }

    @Override
    public void process(String key, Context context, Iterable<Row> inRow, Collector<Row[]> out) {
        List<Row> rowList = new ArrayList<>();
        for (Row row : inRow) {
            rowList.add(row);
        }
        // Emit the whole window's content downstream as one Row[] batch
        Row[] rows = rowList.toArray(new Row[rowList.size()]);
        out.collect(rows);
    }
}

3. Batch write

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DbSinkFunction<I> extends RichSinkFunction<I> {

    private static final Logger LOG = LoggerFactory.getLogger(DbSinkFunction.class);

    private String driver = null;
    private String sql = null;
    DbConnectionPool pool = null;
    private Integer loadRate;
    private int[] columnTypes;
    // Pool settings used in open(); how this field is populated is not shown in
    // the original excerpt, so the Properties type here is an assumption.
    private Properties conf;

    public DbSinkFunction(String dbDriver, String dmlSql) {
        this.driver = dbDriver;
        this.sql = dmlSql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Create the connection pool
        pool = new DbConnectionPool(conf, driver);
    }

    @Override
    public void close() throws Exception {
        // Release resources
        super.close();
        // Close the connection pool
        pool.close();
    }

    /**
     * Write to the database
     */
    @Override
    public void invoke(I record, Context context) throws Exception {
        PreparedStatement ps = null;
        boolean isBatch = false;
        Connection connection = null;
        try {
            connection = pool.getConnection();
            ps = connection.prepareStatement(sql);
            if (record instanceof Row[]) {
                // Batch coming from the window function
                isBatch = true;
                connection.setAutoCommit(false);
                Row[] rowArray = (Row[]) record;
                int length = rowArray.length;
                LOG.info("Row array:{}", length);
                int pending = 0;
                for (int i = 1; i <= length; i++) {
                    Row row = rowArray[i - 1];
                    fillPreparedStatement(ps, row);
                    ps.addBatch();
                    pending++;
                    // Flush every 3000 rows to keep the JDBC batch bounded
                    if (i % 3000 == 0) {
                        ps.executeBatch();
                        connection.commit();
                        ps.clearBatch();
                        pending = 0;
                    }
                }
                if (pending > 0) {
                    ps.executeBatch();
                    connection.commit();
                    ps.clearBatch();
                }
            } else if (record instanceof Row) {
                // Single record
                isBatch = false;
                Row row = (Row) record;
                fillPreparedStatement(ps, row);
                ps.execute();
            } else {
                throw new RuntimeException("Unsupported data type " + record.getClass());
            }
        } catch (SQLException e) {
            if (connection != null) {
                connection.rollback();
                if (isBatch) {
                    LOG.error("Batch write failed, falling back to single-row inserts", e);
                    doOneInsert(sql, connection, (Row[]) record);
                }
            }
        } catch (Exception e) {
            LOG.error("Write failed", e);
        } finally {
            closeDBResources(ps, connection);
        }
    }
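    /**
     * The original post references fillPreparedStatement but never shows it.
     * The version below is a minimal sketch of what such a helper presumably
     * does: bind each field of the Row to the matching "?" placeholder of the
     * INSERT statement. The real implementation likely uses the columnTypes
     * field for type-specific setters.
     */
    private void fillPreparedStatement(PreparedStatement ps, Row row) throws SQLException {
        for (int i = 0; i < row.getArity(); i++) {
            // JDBC parameter indexes are 1-based
            ps.setObject(i + 1, row.getField(i));
        }
    }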
    /**
     * Fallback when the batch fails: write the rows one at a time.
     *
     * @param sql
     * @param connection
     * @param rowArray
     */
    protected void doOneInsert(String sql, Connection connection, Row[] rowArray) {
        PreparedStatement ps = null;
        try {
            int allSize = rowArray.length;
            int errCount = 0;
            connection.setAutoCommit(true);
            ps = connection.prepareStatement(sql);
            for (Row row : rowArray) {
                try {
                    fillPreparedStatement(ps, row);
                    ps.execute();
                } catch (SQLException e) {
                    errCount++;
                } finally {
                    ps.clearParameters();
                }
            }
            // Report how many rows still failed after the row-by-row retry
            if (errCount > 0) {
                LOG.warn("Single-row fallback finished, total:{}, failed:{}", allSize, errCount);
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            closeDBResources(ps, null);
        }
    }

    private void closeDBResources(Statement stmt, Connection conn) {
        try {
            if (!(null == stmt || stmt.isClosed())) {
                stmt.close();
            }
            if (!(null == conn || conn.isClosed())) {
                conn.close();
            }
        } catch (SQLException e) {
            // Failures while closing are intentionally ignored
        }
    }
}
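Why hash modulo paralleSize and then MD5 the result? Bounding the number of distinct keys to roughly the sink parallelism keeps every subtask busy with a comparable number of window batches, and turning the small integer buckets into MD5 strings presumably spreads their hash codes more evenly across Flink's key groups, which is the anti-skew optimization mentioned in the overview. To make the keying behavior concrete, here is a small hypothetical check (not from the original post; the demo class and all values are invented for illustration):

import java.util.Arrays;
import java.util.List;

import org.apache.flink.types.Row;

public class HashModKeySelectorDemo {
    public static void main(String[] args) {
        // Key on field 0 with a parallelism of 4 (example values)
        List<Integer> keyIndexList = Arrays.asList(0);
        HashModKeySelector selector = new HashModKeySelector(keyIndexList, 4);

        Row a = Row.of("user-1", 10L);
        Row b = Row.of("user-1", 99L);
        Row c = Row.of("user-2", 7L);

        // a and b share the key field "user-1", so they map to the same key
        // string and will be buffered into the same window batch
        System.out.println(selector.getKey(a).equals(selector.getKey(b))); // true
        // c is hashed modulo 4, so it may or may not land in the same bucket
        System.out.println(selector.getKey(c));
    }
}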