使用Flink CDC从mysql同步数据到Doris,遇到报错:Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
mysql source配置使用scan.startup.mode=EARLIEST_OFFSET
仔细看报错,里面有一句:com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to EARLIEST_OFFSET 翻译一下:当scan.startup.mode=EARLIEST_OFFSET时,如果在最早的offset之后表结构有过变化就无法支持。
原因应该是在binlog 的earliest_offset之后,表结构被修改过。 我的解决方案:source配置中使用scan.startup.mode' = 'timestamp'和'scan.startup.timestamp-millis' = '1667232000000',以时间戳为消费依据。
完整报错如下:
2024-01-25 09:00:07,079 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: MySQL Source -> Sink: Writer -> Sink: Committer (1/1)#619 (eb387b512a8060e9924a8d5f8c9d49c7_cbc357ccb763df2852fee8c4fc7d55f2_0_619) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:75)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:429)
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:93)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1095)
at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1246)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1072)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
... 1 more
Caused by: com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to EARLIEST_OFFSET
at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:80)
... 8 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
... 8 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=1705912278, file=mysql-bin.004666, pos=78051656, gtids=08a46a1c-7250-11ed-a032-42010ae6d40e:1-262756959,c5293797-7079-11ed-9d5d-42010ae6d40e:1-43523, row=1, server_id=792401336, event=2}
at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:246)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$4(MySqlStreamingChangeEventSource.java:841)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:946)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:832)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$16(MySqlStreamingChangeEventSource.java:1057)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:408)
... 7 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:254)
at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:283)
at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:78)
at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:54)
at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:209)
... 12 more
精彩内容
发表评论