使用Flink CDC从mysql同步数据到Doris,遇到报错:Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema

mysql source配置使用scan.startup.mode=EARLIEST_OFFSET

仔细看报错,里面有一句:com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to EARLIEST_OFFSET 翻译一下:当scan.startup.mode=EARLIEST_OFFSET时,如果在最早的offset之后表结构有过变化就无法支持。

原因应该是在binlog 的earliest_offset之后,表结构被修改过。 我的解决方案:source配置中使用scan.startup.mode' = 'timestamp'和'scan.startup.timestamp-millis' = '1667232000000',以时间戳为消费依据。

完整报错如下:

2024-01-25 09:00:07,079 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: MySQL Source -> Sink: Writer -> Sink: Committer (1/1)#619 (eb387b512a8060e9924a8d5f8c9d49c7_cbc357ccb763df2852fee8c4fc7d55f2_0_619) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: One or more fetchers have encountered exception

at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)

at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)

at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)

at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)

at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)

at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)

at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)

at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)

at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)

at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)

at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)

at java.base/java.lang.Thread.run(Unknown Source)

Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records

at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)

at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)

at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)

at java.base/java.util.concurrent.FutureTask.run(Unknown Source)

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

... 1 more

Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.

at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)

at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:75)

at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:429)

at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:93)

at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1095)

at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1246)

at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1072)

at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)

at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)

... 1 more

Caused by: com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to EARLIEST_OFFSET

at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:80)

... 8 more

Caused by: io.debezium.DebeziumException: Error processing binlog event

... 8 more

Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=1705912278, file=mysql-bin.004666, pos=78051656, gtids=08a46a1c-7250-11ed-a032-42010ae6d40e:1-262756959,c5293797-7079-11ed-9d5d-42010ae6d40e:1-43523, row=1, server_id=792401336, event=2}

at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:246)

at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$4(MySqlStreamingChangeEventSource.java:841)

at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:946)

at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:832)

at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$16(MySqlStreamingChangeEventSource.java:1057)

at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:408)

... 7 more

Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema

at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:254)

at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:283)

at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)

at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:78)

at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:54)

at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:209)

... 12 more

精彩内容

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。