Flink CDAS sync from MySQL to StarRocks: oversized VARCHAR value causes data sync failure

When using Flink CDAS to synchronize MySQL data to StarRocks, the job failed with org.apache.flink.table.api.TableException: Failed to deserialize the input record. The root cause shown in the stack trace is a StarRocks load failure: "too many filtered rows". Some rows in the synchronized data were flagged as invalid and filtered out, so the load could not complete.

org.apache.flink.table.api.TableException: Failed to deserialize the input record: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1678505077, file=, pos=0}} ConnectRecord{topic='mysql_binlog_source.dd.tableee', kafkaPartition=null, key=Struct{id=6031202774149}, keySchema=Schema{mysql_binlog_source.db.tableee.Key:STRUCT}, value=Struct{after=Struct{id=60312027,name=Ad from a Page post #6,031,202,774,149,run_status=,object_id=0,object_story_id=,effective_object_story_id=1693370527557087_1764951593732313,object_story_spec={"page_id":"1693370527557087","link_data":{"name":"Segredos para usu\u00e1rios Samsung!","image_hash":"efb4d7d867876aa44d33a4ba3b8e12adgb","call_to_action":{"type":"INSTALL_MOBILE_APP"}}},body=,image_hash=,image_file=,image_url=,image_crops=,video_id=,actor_image_hash=,link_url=,object_url=,url_tags=,preview_url=,thumbnail_url=,follow_redirect=,object_store_url=,link_deep_link_url=,call_to_action_type=,object_type=,product_set_id=,adlabels=,applink_treatment=,dynamic_ad_voice=,place_page_set_id=0,instagram_actor_id=0,instagram_permalink_url=,template_url=,object_instagram_id=0,link_og_id=,instagram_story_id=,zipbytes=,copy_from=,asset_feed_spec=,created_time=1655725861000,updated_time=1667633606000,page_id=1693370527557087},source=Struct{version=1.6.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,db=db,table=creative,server_id=0,file=,pos=0,row=0},op=r,ts_ms=1678505077334}, valueSchema=Schema{mysql_binlog_source.db.tableee.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}.

at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:150)

at com.ververica.cdc.connectors.mysql.source.MySqlEvolvingSourceDeserializeSchema.deserialize(MySqlEvolvingSourceDeserializeSchema.java:143)

at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:131)

at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:111)

at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:83)

at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:56)

at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:159)

at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)

at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:70)

at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)

at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:641)

at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)

at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1079)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1028)

at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:954)

at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:933)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)

at java.lang.Thread.run(Thread.java:834)

Suppressed: java.lang.RuntimeException: Writing records to StarRocks failed.

at com.starrocks.connector.flink.manager.StarRocksSinkManager.checkFlushException(StarRocksSinkManager.java:396)

at com.starrocks.connector.flink.manager.StarRocksSinkManager.close(StarRocksSinkManager.java:285)

at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction.close(StarRocksDynamicSinkFunction.java:242)

at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)

at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)

at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)

at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)

at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1272)

at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254)

at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)

at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)

at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:1191)

at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:936)

at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:954)

at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:936)

... 3 more

Caused by: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:

{"Status":"Fail","BeginTxnTimeMs":0,"Message":"too many filtered rows","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"3ecfb766-d4c0-4a4a-9ace-139e74e39f4a","LoadBytes":51839093,"StreamLoadPlanTimeMs":0,"NumberTotalRows":48570,"WriteDataTimeMs":471,"TxnId":2805956,"LoadTimeMs":473,"ErrorURL":"http://192.16.61.113:8040/api/_load_error_log?file=error_log_3542ac0c208e8ddc_b712c1cb407f85b9","ReadDataTimeMs":52,"NumberLoadedRows":48566,"NumberFilteredRows":4}

{"streamLoadErrorLog":"Error: NULL value in non-nullable column 'name'. Row: [6004092296930, '', 170703647, 6004092298930, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\nError: NULL value in non-nullable column 'name'. Row: [6004092297130, '', 170703647, 6004092299930, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\nError: NULL value in non-nullable column 'name'. Row: [6004092297730, '', 170703647, 6004092301330, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\nError: NULL value in non-nullable column 'name'. Row: [6004092304530, '', 170703647, 6004092304930, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\n"}

at com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:105)

at com.starrocks.connector.flink.manager.StarRocksSinkManager.asyncFlush(StarRocksSinkManager.java:320)

at com.starrocks.connector.flink.manager.StarRocksSinkManager.lambda$startAsyncFlushing$0(StarRocksSinkManager.java:162)

... 1 more

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101)

at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)

at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)

at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)

at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)

at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)

at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:168)

at com.ververica.cdc.connectors.mysql.source.MySqlEvolvingSourceDeserializeSchema$SimpleCollector.collect(MySqlEvolvingSourceDeserializeSchema.java:236)

at com.ververica.cdc.connectors.mysql.source.MySqlEvolvingSourceDeserializeSchema$SimpleCollector.collect(MySqlEvolvingSourceDeserializeSchema.java:227)

at com.ververica.cdc.debezium.table.AppendMetadataCollector.collect(AppendMetadataCollector.java:51)

at com.ververica.cdc.debezium.table.AppendSystemColumnAndMetadataCollector.collect(AppendSystemColumnAndMetadataCollector.java:43)

at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.emit(RowDataDebeziumDeserializeSchema.java:169)

at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:130)

... 18 more

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101)

at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)

at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)

at org.apache.flink.streaming.runtime.tasks.CopyingBroadcastingOutputCollector.collect(CopyingBroadcastingOutputCollector.java:40)

at org.apache.flink.streaming.runtime.tasks.CopyingBroadcastingOutputCollector.collect(CopyingBroadcastingOutputCollector.java:28)

at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)

at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)

at org.apache.flink.table.runtime.operators.evolution.SchemaEvolutionOperator.processElement(SchemaEvolutionOperator.java:171)

at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)

... 30 more

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101)

at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)

at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)

at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)

at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)

at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)

at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)

... 38 more

Caused by: java.lang.RuntimeException: Writing records to StarRocks failed.

at com.starrocks.connector.flink.manager.StarRocksSinkManager.checkFlushException(StarRocksSinkManager.java:396)

at com.starrocks.connector.flink.manager.StarRocksSinkManager.writeRecords(StarRocksSinkManager.java:215)

at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction.invoke(StarRocksDynamicSinkFunction.java:198)

at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:68)

at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)

... 44 more

[CIRCULAR REFERENCE:com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:

{"Status":"Fail","BeginTxnTimeMs":0,"Message":"too many filtered rows","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"3ecfb766-d4c0-4a4a-9ace-139e74e39f4a","LoadBytes":51839093,"StreamLoadPlanTimeMs":0,"NumberTotalRows":48570,"WriteDataTimeMs":471,"TxnId":2805956,"LoadTimeMs":473,"ErrorURL":"http://192.16.61.123:8040/api/_load_error_log?file=error_log_3542ac0c208e8ddc_b712c1cb407f85b9","ReadDataTimeMs":52,"NumberLoadedRows":48566,"NumberFilteredRows":4}

{"streamLoadErrorLog":"Error: NULL value in non-nullable column 'name'. Row: [6004092296930, '', 170703647, 6004092298930, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\nError: NULL value in non-nullable column 'name'. Row: [6004092297130, '', 170703647, 6004092299930, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\nError: NULL value in non-nullable column 'name'. Row: [6004092297730, '', 170703647, 6004092301330, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\nError: NULL value in non-nullable column 'name'. Row: [6004092304530, '', 170703647, 6004092304930, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\n"}

]

Following the ErrorURL returned by StarRocks to inspect the load error log shows that the non-nullable column name contains NULL values. Querying the MySQL source table by primary key id, however, shows that name does have a value for those rows. The first suspicion was an incorrect column mapping for name, but comparing the DDLs showed they were consistent. Debugging with Flink SQL finally revealed that the name value exceeded the column's declared length, so it was set to NULL when sinking to StarRocks, which in turn caused the load failure.
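To confirm the length overflow, one way is to compare the character length and the byte length of name directly in the MySQL source table. This is a diagnostic sketch; the table name creative is taken from the source struct in the stack trace above:

```sql
-- MySQL: CHAR_LENGTH counts characters, LENGTH counts bytes.
-- A multi-byte UTF-8 value can fit varchar(20) in MySQL (measured in
-- characters) while exceeding 20 bytes on the StarRocks side.
SELECT id,
       CHAR_LENGTH(name) AS name_chars,
       LENGTH(name)      AS name_bytes
FROM creative
ORDER BY LENGTH(name) DESC
LIMIT 10;
```

The largest name_bytes value tells you the minimum length the StarRocks column must accommodate.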

Error: NULL value in non-nullable column 'name'. Row: [6004092296930, '', 170703647, 6004092298930, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]

In both the source table and the StarRocks table, the column is defined as name varchar(20) not null, but some of the written name values exceed 20. The fix is to enlarge the name definition in StarRocks so that its length is greater than the longest name value in MySQL. Note that MySQL measures VARCHAR(n) in characters while StarRocks measures it in bytes, so multi-byte UTF-8 characters can overflow the StarRocks column even when the two DDLs declare the same length.
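The fix can be applied as a schema change on the StarRocks table. This is a sketch, assuming the target table is named creative and that 255 bytes is enough for the longest value; verify the required length against the source data first:

```sql
-- StarRocks: enlarge the column (the length here is in bytes).
-- Increasing a VARCHAR length is a supported schema change.
ALTER TABLE creative MODIFY COLUMN name VARCHAR(255) NOT NULL;
```

As an alternative workaround, the StarRocks Flink connector forwards sink.properties.max_filter_ratio to Stream Load, which would let the load succeed while a bounded fraction of rows is filtered; however, that silently drops data and is better avoided when the root cause is a fixable length mismatch.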
