Flink CDC 3.0: a failed StarRocks table creation leaves the job stuck!

Symptom

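Table creation in StarRocks fails and the job restarts automatically. After the restart, data is replayed, and once the JobMaster prints the following log line the whole job hangs: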

There are already processing requests. Wait for processing

Root cause analysis

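Background: the analysis below is easier to follow if you first read up on how Flink CDC processes table schema changes.

The classes involved are SchemaRegistry, SchemaOperator and StarRocksMetadataApplier.

When table creation fails in SchemaRegistry#handleEventFromOperator, the job's tasks are restarted, but the JobMaster is not. SchemaRegistry is an operator coordinator that runs inside the JobMaster, so its state survives the restart: the failed entry in SchemaRegistry.requestHandler.pendingSchemaChanges is never removed, and every later schema change request, including the replayed one, queues up behind it. The job hangs!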

```java
public void flushSuccess(TableId tableId, int sinkSubtask) {
    flushedSinkWriters.add(sinkSubtask);
    if (flushedSinkWriters.equals(activeSinkWriters)) {
        LOG.info(
                "All sink subtask have flushed for table {}. Start to apply schema change.",
                tableId.toString());
        PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
        // Apply the schema change (e.g. CREATE TABLE in StarRocks)!
        applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent());
        waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse()));
        if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {
            // If applySchemaChange throws, this call is skipped and the
            // pending schema change is never removed!
            startNextSchemaChangeRequest();
        }
    }
}

// Removes the completed entry from pendingSchemaChanges.
private void startNextSchemaChangeRequest() {
    this.pendingSchemaChanges.remove(0);
    this.flushedSinkWriters.clear();
    // ...
}

public CompletableFuture handleSchemaChangeRequest(
        SchemaChangeRequest request) {
    // The stale pending schema change was never removed, so after the
    // restart this check fails and the job hangs.
    if (pendingSchemaChanges.isEmpty()) {
        LOG.info(
                "Received schema change event request from table {}. Start to buffer requests for others.",
                request.getTableId().toString());
        if (request.getSchemaChangeEvent() instanceof CreateTableEvent
                && schemaManager.schemaExists(request.getTableId())) {
            return CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(false)));
        }
        CompletableFuture response =
                CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(true)));
        schemaManager.applySchemaChange(request.getSchemaChangeEvent());
        pendingSchemaChanges.add(new PendingSchemaChange(request, response));
        pendingSchemaChanges.get(0).startToWaitForReleaseRequest();
        return response;
    } else {
        LOG.info("There are already processing requests. Wait for processing.");
        CompletableFuture response = new CompletableFuture<>();
        pendingSchemaChanges.add(new PendingSchemaChange(request, response));
        return response;
    }
}
```
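To make the failure mode concrete, here is a minimal sketch of the queue bookkeeping under a heavily simplified model: the hypothetical `Handler` class stands in for SchemaRegistryRequestHandler, events are plain strings, and futures are replaced by a boolean "may proceed" result. It is not Flink CDC code; it only replays the sequence described above (create fails, tasks restart, the coordinator keeps its state).

```java
import java.util.ArrayList;
import java.util.List;

public class PendingQueueDemo {

    // Hypothetical stand-in for SchemaRegistryRequestHandler (which lives in the JobMaster).
    static class Handler {
        final List<String> pendingSchemaChanges = new ArrayList<>();

        // Like handleSchemaChangeRequest: only an empty queue lets a request proceed;
        // otherwise the request is parked behind the head entry.
        boolean handleSchemaChangeRequest(String event) {
            boolean proceed = pendingSchemaChanges.isEmpty();
            if (!proceed) {
                System.out.println("There are already processing requests. Wait for processing.");
            }
            pendingSchemaChanges.add(event);
            return proceed;
        }

        // Like flushSuccess: the head entry is removed only if applying the change succeeds.
        void flushSuccess(boolean applyFails) {
            String head = pendingSchemaChanges.get(0);
            if (applyFails) {
                throw new IllegalStateException("CREATE TABLE failed for " + head);
            }
            pendingSchemaChanges.remove(0); // what startNextSchemaChangeRequest() does first
        }
    }

    // Simulates: create fails -> tasks restart (handler survives) -> replayed request arrives.
    static boolean replayAfterFailure(Handler handler) {
        handler.handleSchemaChangeRequest("CreateTableEvent(db.t1)");
        try {
            handler.flushSuccess(true);
        } catch (IllegalStateException e) {
            System.out.println("Tasks restart; coordinator keeps its state: " + e.getMessage());
        }
        return handler.handleSchemaChangeRequest("CreateTableEvent(db.t1)");
    }

    public static void main(String[] args) {
        Handler handler = new Handler();
        boolean proceeds = replayAfterFailure(handler);
        System.out.println("proceeds=" + proceeds + ", pending=" + handler.pendingSchemaChanges.size());
    }
}
```

Running `main` prints the "There are already processing requests" line and then `proceeds=false, pending=2`: the replayed request is parked behind the stale entry, and nothing in this model ever removes that entry.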

Solution

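Workaround: when table creation fails, catch the exception, remove the pending schema change, and only then rethrow so the job restarts. Alternatively, when StarRocks throws, convert the exception into an exception class annotated with @ThrowableAnnotation(ThrowableType.NonRecoverableError). Both variants appear in the patched flushSuccess of SchemaRegistryRequestHandler.

The issue has since been fixed upstream: https://github.com/apache/flink-cdc/pull/3128. When the SchemaOperator initializes its state, it calls sendRequestToCoordinator(new RefreshPendingListsRequest()), which clears pendingSchemaChanges in SchemaRegistryRequestHandler.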
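The idea behind the upstream fix can be sketched with the same kind of toy model. This is not the code from the PR: `Handler` and `Operator` are hypothetical stand-ins for SchemaRegistryRequestHandler and SchemaOperator, and `refreshPendingLists()` represents what the coordinator does on a RefreshPendingListsRequest, namely clearing pendingSchemaChanges (clearing flushedSinkWriters as well is an assumption of this sketch).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RefreshPendingListsDemo {

    // Hypothetical, simplified coordinator-side handler (not the Flink CDC class).
    static class Handler {
        final List<String> pendingSchemaChanges = new ArrayList<>();
        final Set<Integer> flushedSinkWriters = new HashSet<>();

        boolean handleSchemaChangeRequest(String event) {
            boolean proceed = pendingSchemaChanges.isEmpty();
            pendingSchemaChanges.add(event);
            return proceed;
        }

        // Models handling a RefreshPendingListsRequest: drop leftovers from the failed attempt.
        void refreshPendingLists() {
            pendingSchemaChanges.clear();
            flushedSinkWriters.clear(); // assumption of this sketch
        }
    }

    // Hypothetical stand-in for SchemaOperator: on (re)initialization it asks the
    // coordinator to reset its pending lists before any replayed event is processed.
    static class Operator {
        private final Handler coordinator;

        Operator(Handler coordinator) {
            this.coordinator = coordinator;
        }

        void initializeState() {
            // models sendRequestToCoordinator(new RefreshPendingListsRequest())
            coordinator.refreshPendingLists();
        }

        boolean processSchemaChange(String event) {
            return coordinator.handleSchemaChangeRequest(event);
        }
    }

    public static void main(String[] args) {
        Handler coordinator = new Handler();
        coordinator.handleSchemaChangeRequest("CreateTableEvent(db.t1)"); // stale entry from the failed attempt
        coordinator.flushedSinkWriters.add(0);

        Operator restarted = new Operator(coordinator);
        restarted.initializeState();
        boolean proceeds = restarted.processSchemaChange("CreateTableEvent(db.t1)");
        System.out.println("proceeds=" + proceeds + ", pending=" + coordinator.pendingSchemaChanges.size());
    }
}
```

Because the reset happens during state initialization, every restarted operator starts from an empty queue, so in this model the replayed CreateTableEvent is handled as a fresh request (`proceeds=true, pending=1`).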

```java
// Workaround patch in class com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler
// Imports needed for the annotated exception below:
// import org.apache.flink.runtime.throwable.ThrowableAnnotation;
// import org.apache.flink.runtime.throwable.ThrowableType;

public void flushSuccess(TableId tableId, int sinkSubtask) {
    flushedSinkWriters.add(sinkSubtask);
    if (flushedSinkWriters.equals(activeSinkWriters)) {
        LOG.info(
                "All sink subtask have flushed for table {}. Start to apply schema change.",
                tableId.toString());
        PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
        try {
            applySchemaChange(
                    tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent());
        } catch (Exception e) {
            // Option 2: drop the failed entry so the restarted job is not blocked
            // (catching every Exception here may be too broad).
            // pendingSchemaChanges.remove(0);
            // flushedSinkWriters.clear();

            // Option 3: let the implementation decide whether to throw a recoverable or
            // non-recoverable exception, e.g. in StarRocksMetadataApplier#applyCreateTable,
            // replace the RuntimeException with an exception class annotated with @ThrowableAnnotation.

            // Wrap the failure as non-recoverable so Flink fails the job instead of restarting it.
            throw new MyException(e);
        }
        waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse()));
        if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {
            startNextSchemaChangeRequest();
        }
    }
}

@ThrowableAnnotation(ThrowableType.NonRecoverableError)
private static class MyException extends RuntimeException {
    public MyException(Throwable cause) {
        super(cause);
    }
}
```
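Why the annotation matters: Flink classifies a failure by the @ThrowableAnnotation on the exception class (see ThrowableClassifier in flink-runtime), and for NonRecoverableError no restart is attempted, so the job fails fast instead of restarting into the stuck state. The sketch below re-implements that check with local copies of the annotation and enum (trimmed to the two values used here) so it runs without Flink on the classpath; the cause-chain walk in `isUnrecoverable` is an assumption about how a wrapped MyException gets found, not a quote of Flink's code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class ThrowableClassifierDemo {

    // Local stand-ins for Flink's ThrowableType / @ThrowableAnnotation (not the real classes).
    enum ThrowableType { NonRecoverableError, RecoverableError }

    @Retention(RetentionPolicy.RUNTIME) // must be visible via reflection at runtime
    @Target(ElementType.TYPE)
    @interface ThrowableAnnotation {
        ThrowableType value();
    }

    @ThrowableAnnotation(ThrowableType.NonRecoverableError)
    static class MyException extends RuntimeException {
        MyException(Throwable cause) {
            super(cause);
        }
    }

    // Unannotated exceptions default to RecoverableError, i.e. a restart is attempted.
    static ThrowableType getThrowableType(Throwable t) {
        ThrowableAnnotation annotation = t.getClass().getAnnotation(ThrowableAnnotation.class);
        return annotation == null ? ThrowableType.RecoverableError : annotation.value();
    }

    // Walks the cause chain: if any link is non-recoverable, the job should fail without restarting.
    static boolean isUnrecoverable(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (getThrowableType(t) == ThrowableType.NonRecoverableError) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        RuntimeException fromStarRocks = new RuntimeException("Failed to create table");
        System.out.println(isUnrecoverable(fromStarRocks));                  // false -> restart
        System.out.println(isUnrecoverable(new MyException(fromStarRocks))); // true  -> job fails
    }
}
```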
