本篇博文将阐述项目工作中使用Quartz的情况,包含项目背景、项目框架、Quartz集群部署等方面,重点讲述如何在实际项目中使用Quartz。

1. 背景

因项目需求,需要定时调用数据下载接口,并将数据存储至诸如mongo、redis、elasticsearch等数据库或缓存中。具体涉及到的需求如下:

a. 调用接口的任务均从mongo数据库读取;

b. 任务的个数随着业务量的增加而增加;

c. 每个调用任务的定时执行时间可能不同,且定时执行时间在mongo中可配置;

d. 任务的执行需要动态更新,如检测到某一任务的定时时间发生变化,则任务的执行也需要实时修改;

e. mongo、redis、elasticsearch等数据库中所存储的字段也由mongo进行配置;

f. 任务执行需要具备较高的实时性、可靠性和可扩展性。

综上需求,调研了一番,发现任务调度框架Quartz可满足项目需求。

2. 框架

基于项目的需求,结合任务调度框架Quartz,大体的流程框架如下图所示:

1) 首先从mongo加载任务,并构建任务对象

2) 将任务的配置信息初始化至Quartz

3) 通过Quartz的Job任务实现定时调用下载接口任务

4) 将下载的数据依据配置,存储至数据库中

5) 定时检测任务通过定时扫描mongo数据库,查看相关任务信息的配置是否发生变化,如果发生变化,则进行动态更新

6) 为了实现高可用性、可扩展性,可以直接使用Quartz原生的集群特性。

3. 核心代码

核心代码将会涵盖上述流程图中的相关环节,为了项目的保密性,相关信息也会隐藏。

3.1 任务主流程  

import com.quartz.conf.Configuration;

import com.quartz.conf.OcpConfHelper;

import com.quartz.module.TaskInfo;

import org.apache.log4j.PropertyConfigurator;

import org.quartz.Scheduler;

import org.quartz.SchedulerException;

import org.quartz.impl.StdSchedulerFactory;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Entry point of the scheduling service.
 *
 * <p>Loads the logging and Quartz configuration, reads the task definitions,
 * registers one Quartz job per runnable task plus the periodic dynamic-update
 * job, and finally starts the scheduler.
 */
public class SchedulerRunner {

    static final Logger logger = LoggerFactory.getLogger(SchedulerRunner.class);

    public static void main(String[] args) {
        // Load the log4j configuration.
        PropertyConfigurator.configure("./conf/log4j.properties");

        // Point Quartz at its properties file.
        System.setProperty("org.quartz.properties", "./conf/quartz.properties");

        // Parse the tasks and hand them to the scheduler.
        run();
    }

    /**
     * Reads the configured tasks, filters out the ones that must not run and
     * schedules the remainder together with the dynamic-update job.
     *
     * <p>NOTE(review): when no runnable task exists at start-up this method
     * returns before the dynamic-update job is registered, so tasks configured
     * later are not picked up until the process is restarted. Confirm this is
     * intended.
     */
    public static void run() {
        // Load the task configuration. GenerateTaskInfo only exposes the
        // Mongo-backed loader, which also matches the log message below.
        List<TaskInfo> taskInfos = GenerateTaskInfo.generateTaskInfoFromMongo();
        if (taskInfos == null || taskInfos.isEmpty()) {
            logger.info("there is no tasks from mongoInfo");
            return;
        }

        // Drop tasks that are offline or not scheduled by this service.
        taskInfos = GenerateTaskInfo.filterTask(taskInfos);
        if (taskInfos == null || taskInfos.isEmpty()) {
            logger.info("all tasks if offline, no need to run");
            return;
        }

        Scheduler scheduler;
        try {
            scheduler = StdSchedulerFactory.getDefaultScheduler();
        } catch (SchedulerException e) {
            // Keep the cause in the log instead of printing it to stderr.
            logger.error("create scheduler failed", e);
            return;
        }
        if (scheduler == null) {
            logger.error("create scheduler failed");
            return;
        }

        if (isSchedulerClear()) {
            clearSchedulerJob(scheduler);
        }

        // Register one job per task.
        for (TaskInfo task : taskInfos) {
            SchedulerFactory.addJob2Scheduler(task, scheduler);
        }

        // Register the job that keeps the scheduler in sync with the configuration.
        SchedulerFactory.addDynamicUpdateJob2Scheduler(scheduler);

        // Start firing triggers.
        try {
            scheduler.start();
        } catch (SchedulerException e) {
            logger.error("start scheduler error!", e);
        }
    }

    /**
     * Removes all scheduling data from the given scheduler. Failures are
     * logged and otherwise ignored.
     *
     * @param scheduler the scheduler to clear
     */
    public static void clearSchedulerJob(Scheduler scheduler) {
        try {
            scheduler.clear();
        } catch (SchedulerException e) {
            logger.error("clear scheduler error!", e);
        }
    }

    /**
     * Reads the {@code cleanSchedulerFlag} setting that decides whether the
     * scheduler is cleared on start-up.
     *
     * @return the configured flag; the second argument {@code "true"} is
     *         presumably the default used when the key is absent (TODO confirm
     *         against {@code Configuration.getBooleanValue})
     */
    private static boolean isSchedulerClear() {
        Configuration conf = OcpConfHelper.getInstance().getOcpConf();
        return conf.getBooleanValue("cleanSchedulerFlag", "true");
    }
}

View Code

3.2 封装任务对象

import java.util.List;

import java.util.Map;

import java.util.Objects;

public class TaskInfo {

protected String categoryId; // 业务Id

protected String categoryName; // 业务名称

protected String sourceId; // 信源Id

protected String sourceName; // 信源名称

protected int sourceStatus; // 信源状态

protected String pipelineConf; // 信源pipeline配置信息

protected List dbStoreTypes; // 业务的存储类型

protected String esConfInfo; // ES存储配置

protected String dbConfInfo; // DB存储配置

protected String cronInfo; // 定时任务信息

protected int sourceType; // 实时更新还是离线更新

protected List indexBuildEles; // 更新索引的信息

protected List idBuildEles; // id的构建因素

protected String indexType; // 全量或增量

protected String categoryLevel1; // 一级分类

protected String zhName; // 中文信息

protected Map outputType; //输出参数名及其类型

protected String providerName;

protected String functionName; //category_function名称

public String getProviderName() {

return providerName;

}

public void setProviderName(String providerName) {

this.providerName = providerName;

}

public String getCategoryId() {

return categoryId;

}

public void setCategoryId(String categoryId) {

this.categoryId = categoryId;

}

public String getCategoryName() {

return categoryName;

}

public void setCategoryName(String categoryName) {

this.categoryName = categoryName;

}

public String getSourceId() {

return sourceId;

}

public void setSourceId(String sourceId) {

this.sourceId = sourceId;

}

public String getSourceName() {

return sourceName;

}

public void setSourceName(String sourceName) {

this.sourceName = sourceName;

}

public int getSourceStatus() {

return sourceStatus;

}

public void setSourceStatus(int sourceStatus) {

this.sourceStatus = sourceStatus;

}

public String getPipelineConf() {

return pipelineConf;

}

public void setPipelineConf(String pipelineConf) {

this.pipelineConf = pipelineConf;

}

public String getEsConfInfo() {

return esConfInfo;

}

public void setEsConfInfo(String esConfInfo) {

this.esConfInfo = esConfInfo;

}

public String getDbConfInfo() {

return dbConfInfo;

}

public void setDbConfInfo(String dbConfInfo) {

this.dbConfInfo = dbConfInfo;

}

public String getCronInfo() {

return cronInfo;

}

public void setCronInfo(String cronInfo) {

this.cronInfo = cronInfo;

}

public int getSourceType() {

return sourceType;

}

public void setSourceType(int sourceType) {

this.sourceType = sourceType;

}

public List getIdBuildEles() {

return idBuildEles;

}

public void setIdBuildEles(List idBuildEles) {

this.idBuildEles = idBuildEles;

}

public List getIndexBuildEles() {

return indexBuildEles;

}

public void setIndexBuildEles(List indexBuildEles) {

this.indexBuildEles = indexBuildEles;

}

public String getIndexType() {

return indexType;

}

public void setIndexType(String indexType) {

this.indexType = indexType;

}

public String getCategoryLevel1() {

return categoryLevel1;

}

public void setCategoryLevel1(String categoryLevel1) {

this.categoryLevel1 = categoryLevel1;

}

public String getZhName() {

return zhName;

}

public void setZhName(String zhName) {

this.zhName = zhName;

}

public TaskInfo(){}

public List getDbStoreTypes() {

return dbStoreTypes;

}

public void setDbStoreTypes(List dbStoreTypes) {

this.dbStoreTypes = dbStoreTypes;

}

public Map getOutputType() {

return outputType;

}

public void setOutputType(Map outputType) {

this.outputType = outputType;

}

public String getFunctionName() {

return functionName;

}

public void setFunctionName(String functionName) {

this.functionName = functionName;

}

/**

* 是否有相同的定时信息

* @param taskInfo

* @return

*/

public boolean hasSameCronInfo(TaskInfo taskInfo){

if(taskInfo == null) return false;

return this.getCronInfo().equalsIgnoreCase(taskInfo.getCronInfo());

}

@Override

public String toString() {

return "TaskInfo{" +

"categoryId='" + categoryId + '\'' +

", categoryName='" + categoryName + '\'' +

", sourceId='" + sourceId + '\'' +

", sourceName='" + sourceName + '\'' +

", sourceStatus=" + sourceStatus +

", pipelineConf='" + pipelineConf + '\'' +

", dbStoreTypes=" + dbStoreTypes +

", esConfInfo='" + esConfInfo + '\'' +

", dbConfInfo='" + dbConfInfo + '\'' +

", cronInfo='" + cronInfo + '\'' +

", sourceType=" + sourceType +

", indexBuildEles=" + indexBuildEles +

", idBuildEles=" + idBuildEles +

", indexType='" + indexType + '\'' +

", categoryLevel1='" + categoryLevel1 + '\'' +

", zhName='" + zhName + '\'' +

", outputType='" + outputType + '\'' +

", providerName='" + providerName + '\'' +

", functionName='" + functionName + '\'' +

'}';

}

@Override

public boolean equals(Object o) {

if (this == o) return true;

if (o == null || getClass() != o.getClass()) return false;

TaskInfo taskInfo = (TaskInfo) o;

if (sourceStatus != taskInfo.sourceStatus) return false;

if (sourceType != taskInfo.sourceType) return false;

if (categoryName != null ? !categoryName.equals(taskInfo.categoryName) : taskInfo.categoryName != null)

return false;

if (sourceName != null ? !sourceName.equals(taskInfo.sourceName) : taskInfo.sourceName != null) return false;

if (providerName != null ? !providerName.equals(taskInfo.providerName) : taskInfo.providerName != null) return false;

if (pipelineConf != null ? !pipelineConf.equals(taskInfo.pipelineConf) : taskInfo.pipelineConf != null)

return false;

if (dbStoreTypes != null ? !dbStoreTypes.equals(taskInfo.dbStoreTypes) : taskInfo.dbStoreTypes != null)

return false;

if (esConfInfo != null ? !esConfInfo.equals(taskInfo.esConfInfo) : taskInfo.esConfInfo != null) return false;

if (dbConfInfo != null ? !dbConfInfo.equals(taskInfo.dbConfInfo) : taskInfo.dbConfInfo != null) return false;

if (cronInfo != null ? !cronInfo.equals(taskInfo.cronInfo) : taskInfo.cronInfo != null) return false;

if (indexBuildEles != null ? !indexBuildEles.equals(taskInfo.indexBuildEles) : taskInfo.indexBuildEles != null)

return false;

if (idBuildEles != null ? !idBuildEles.equals(taskInfo.idBuildEles) : taskInfo.idBuildEles != null)

return false;

if (indexType != null ? !indexType.equals(taskInfo.indexType) : taskInfo.indexType != null) return false;

if (categoryLevel1 != null ? !categoryLevel1.equals(taskInfo.categoryLevel1) : taskInfo.categoryLevel1 != null)

return false;

if (outputType != null ? !outputType.equals(taskInfo.outputType) : taskInfo.outputType != null)

return false;

if (functionName != null ? !functionName.equals(taskInfo.functionName) : taskInfo.functionName != null)

return false;

return zhName != null ? zhName.equals(taskInfo.zhName) : taskInfo.zhName == null;

}

@Override

public int hashCode() {

int result = categoryName != null ? categoryName.hashCode() : 0;

result = 31 * result + (sourceName != null ? sourceName.hashCode() : 0);

result = 31 * result + (providerName != null ? providerName.hashCode() : 0);

result = 31 * result + sourceStatus;

result = 31 * result + (pipelineConf != null ? pipelineConf.hashCode() : 0);

result = 31 * result + (dbStoreTypes != null ? dbStoreTypes.hashCode() : 0);

result = 31 * result + (esConfInfo != null ? esConfInfo.hashCode() : 0);

result = 31 * result + (dbConfInfo != null ? dbConfInfo.hashCode() : 0);

result = 31 * result + (cronInfo != null ? cronInfo.hashCode() : 0);

result = 31 * result + sourceType;

result = 31 * result + (indexBuildEles != null ? indexBuildEles.hashCode() : 0);

result = 31 * result + (idBuildEles != null ? idBuildEles.hashCode() : 0);

result = 31 * result + (indexType != null ? indexType.hashCode() : 0);

result = 31 * result + (categoryLevel1 != null ? categoryLevel1.hashCode() : 0);

result = 31 * result + (zhName != null ? zhName.hashCode() : 0);

result = 31 * result + (outputType != null ? outputType.hashCode() : 0);

result = 31 * result + (functionName != null ? functionName.hashCode() : 0);

return result;

}

}

View Code

3.3 任务的构造及初始化

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONArray;

import com.alibaba.fastjson.JSONException;

import com.alibaba.fastjson.JSONObject;

import com.google.common.collect.Lists;

import com.google.common.collect.Maps;

import com.google.gson.Gson;

import com.quartz.consts.SourceType;

import com.quartz.consts.Sql;

import com.quartz.consts.StatusType;

import com.quartz.module.TaskInfo;

import com.quartz.util.MongoUtil;

import com.quartz.util.MySqlUtil;

import com.quartz.util.TimeUtil;

import com.mongodb.BasicDBObject;

import com.mongodb.DBCollection;

import com.mongodb.DBCursor;

import com.mongodb.DBObject;

import org.bson.types.ObjectId;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.sql.Connection;

import java.util.*;

/**

* 获取调度任务的相关信息

* Created by songwang4 on 2017/6/7.

*/

public class GenerateTaskInfo {

static Logger logger = LoggerFactory.getLogger(GenerateTaskInfo.class);

static DBCollection sourceColl = MongoUtil.createOcpSourceDB();

static DBCollection categoryColl = MongoUtil.createOcpCategoryDB();

/**

* 从数据库中读取任务相关信息

*

* @return

*/

public static List generateTaskInfoFromMongo() {

// 将任务信息进行封装

List tasks = Lists.newArrayList();

TaskInfo task = null;

DBCursor sourceCur = sourceColl.find();

DBObject sourceObj = null;

DBObject categoryObj = null;

while (sourceCur.hasNext()) {

sourceObj = sourceCur.next();

task = new TaskInfo();

String sourceName = sourceObj.get("sourceName").toString();

String categoryName = sourceObj.get("category").toString();

// 基于业务名查找对应的业务表信息

categoryObj = categoryColl.findOne(new BasicDBObject("catName", categoryName));

if (categoryObj == null) {

logger.error("no category found through source: " + sourceName);

continue;

}

task.setCategoryId(categoryObj.get("_id").toString()); // 业务Id

task.setCategoryName(categoryName); // 业务名

List dbStoreTypes = Lists.newArrayList();

if (categoryObj.containsField("storeType")) {

try {

JSONArray storeTypeArr = JSON.parseArray(categoryObj.get("storeType").toString());

for (int i = 0; i < storeTypeArr.size(); i++) {

dbStoreTypes.add(storeTypeArr.getString(i));

}

} catch (Exception e) {

}

}

task.setDbStoreTypes(dbStoreTypes); // 存储类型

task.setCategoryLevel1(categoryObj.get("parent").toString()); // 一级业务分类

task.setZhName(sourceObj.get("zhName").toString());

task.setDbConfInfo(categoryObj.containsField("db") ? categoryObj.get("db").toString() : categoryName); // DB配置

task.setEsConfInfo(categoryObj.containsField("es") ? categoryObj.get("es").toString() : categoryName); // ES配置

task.setIndexBuildEles(extractBuilderEles(categoryObj, "isIndex", "itemName")); // 构建ES索引信息

task.setIdBuildEles(extractBuilderEles(categoryObj, "isGK", "itemName")); // 构建id的信息元素

task.setSourceId(sourceObj.get("_id").toString()); // 信源Id

task.setSourceName(sourceName); // 信源名称

int status = StatusType.OFFLINE;

if (sourceObj.containsField("status")) {

String statusType = sourceObj.get("status").toString();

if (statusType.equals(StatusType.STR_ONLINE)) {

status = StatusType.ONLINE;

}

}

task.setSourceStatus(status); // 信源的上下线状态

int sourceType = SourceType.REAL_TIME_PROCESS;

if (sourceObj.containsField("type")) {

String strStatusType = sourceObj.get("type").toString();

if (strStatusType.equals(SourceType.STR_OFF_LINE_PROCESS)) {

sourceType = SourceType.OFF_LINE_PROCESS;

}

}

task.setSourceType(sourceType); // 离线或实时处理

task.setIndexType(sourceObj.containsField("indexType") ?

sourceObj.get("indexType").toString() : ""); // 增量或全量标识

// 定时时间配置

task.setCronInfo(sourceObj.containsField("timerInfo") ?

sourceObj.get("timerInfo").toString() : "");

if (task.getCronInfo().trim().length() == 0) {

task.setCronInfo(generateCronInfo(sourceObj));

}

task.setPipelineConf(sourceObj.containsField("mappingWorkflow") ?

sourceObj.get("mappingWorkflow").toString() : ""); // pipeline配置信息

tasks.add(task);

}

sourceCur.close();

return tasks;

}

/**

* 构建生成id或es的信息元素

*

* @param categoryObj

* @param queryField

* @param retureField

* @return

*/

public static List extractBuilderEles(DBObject categoryObj, String queryField, String retureField) {

List builerEles = Lists.newArrayList();

JSONArray dataItemArr = null;

try {

dataItemArr = JSON.parseArray(categoryObj.get("dataItems").toString());

} catch (JSONException e) {

}

if (dataItemArr != null && dataItemArr.size() > 0) {

JSONObject dataItemJson = null;

for (int i = 0; i < dataItemArr.size(); i++) {

dataItemJson = dataItemArr.getJSONObject(i);

if (dataItemJson.containsKey(queryField) && dataItemJson.getBoolean(queryField)) {

builerEles.add(dataItemJson.getString(retureField).trim());

}

}

}

return builerEles;

}

/**

* 基于业务表中的信息构造定时任务表达式

*

* @param sourceObj

* @return

*/

public static String generateCronInfo(DBObject sourceObj) {

String updateTimeType = "";

String updateTimeCycle = "";

if (sourceObj.containsField("updateType")) {

updateTimeType = sourceObj.get("updateType").toString();

}

if (sourceObj.containsField("updateCycle")) {

updateTimeCycle = sourceObj.get("updateCycle").toString();

}

if (updateTimeType.trim().length() == 0 || updateTimeCycle.trim().length() == 0) {

return "";

}

StringBuilder sb = new StringBuilder();

Date date = null;

if (updateTimeType.equalsIgnoreCase("YEAR")) {

date = TimeUtil.parseDate(updateTimeCycle, "MM-dd HH:mm");

if (date == null) {

try {

sb.append(TimeUtil.extractFixedTimeByDay(Integer.parseInt(updateTimeCycle), 0, 0));

} catch (NumberFormatException e) {

}

} else {

sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")

.append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ")

.append(TimeUtil.extractFixedTime(date, Calendar.DATE)).append(" ")

.append(TimeUtil.extractFixedTime(date, Calendar.MONTH) + 1).append(" ? *");

}

}

if (updateTimeType.equalsIgnoreCase("MONTH")) {

date = TimeUtil.parseDate(updateTimeCycle, "dd HH:mm");

if (date == null) return "";

sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")

.append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ")

.append(TimeUtil.extractFixedTime(date, Calendar.DATE)).append(" * ?");

}

if (updateTimeType.equalsIgnoreCase("DAY")) {

date = TimeUtil.parseDate(updateTimeCycle, "HH:mm");

if (date == null) return "";

sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")

.append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" * * ?");

}

if (updateTimeType.equalsIgnoreCase("WEEK")) {

String weekDay = "1";

if (sourceObj.containsField("weekDay")) {

weekDay = sourceObj.get("weekDay").toString();

}

date = TimeUtil.parseDate(updateTimeCycle, "HH:mm");

if (date == null) return "";

sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")

.append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ? * ")

.append(TimeUtil.extractFixedTime(weekDay));

}

if (updateTimeType.equalsIgnoreCase("HOUR")) {

try {

int hour = Integer.parseInt(updateTimeCycle);

sb.append(TimeUtil.extractFixedTimeByHour(hour, 0));

} catch (NumberFormatException e) {

}

}

if (updateTimeType.equalsIgnoreCase("MINUTE")) {

try {

int minute = Integer.parseInt(updateTimeCycle);

sb.append(TimeUtil.extractFixedTimeByMinute(minute));

} catch (NumberFormatException e) {

}

}

if (updateTimeType.equalsIgnoreCase("SECOND")) {

sb.append("*/").append(updateTimeCycle).append(" * * * * ?");

}

return sb.toString();

}

/**

* 过滤下线的任务

*

* @param tasks

* @return

*/

public static List filterTask(List tasks) {

List taskInfos = Lists.newArrayList();

for (TaskInfo taskInfo : tasks) {

// 过滤下线的信源状态或实时的信源

if (taskInfo.getSourceStatus() == StatusType.OFFLINE

|| taskInfo.getSourceType() != SourceType.OFF_LINE_PROCESS) {

continue;

}

taskInfos.add(taskInfo);

}

return taskInfos;

}

/**

* 基于业务名称对任务进行分组

*

* @param oriTasks

* @return

*/

public static Map> groupTaskByCategory(List oriTasks) {

Map> categoryTasks = Maps.newHashMap();

for (TaskInfo oriTask : oriTasks) {

if (!categoryTasks.containsKey(oriTask.getCategoryId())) {

List taskInfos = Lists.newArrayList();

taskInfos.add(oriTask);

categoryTasks.put(oriTask.getCategoryId(), taskInfos);

} else {

boolean hasSameSourceId = false;

for (TaskInfo taskInfo : categoryTasks.get(oriTask.getCategoryId())) {

if (taskInfo.getSourceId().equals(oriTask.getSourceId())) {

hasSameSourceId = true;

break;

}

}

if (!hasSameSourceId) {

categoryTasks.get(oriTask.getCategoryId()).add(oriTask);

}

}

}

return categoryTasks;

}

}

View Code

3.4 调用下载接口的任务

import com.alibaba.fastjson.JSONException;

import com.alibaba.fastjson.JSONObject;

import com.google.common.collect.Lists;

import com.google.gson.Gson;

import com.pipes.entity.CrawlerLog;

import com.pipes.out.IDataOut;

import com.pipes.parser.PipeExecuter;

import com.quartz.consts.SourceType;

import com.quartz.consts.StatusType;

import com.quartz.consts.StoreType;

import com.quartz.module.TaskInfo;

import com.quartz.output.*;

import com.quartz.util.CrawlerLogUtil;

import org.apache.log4j.PropertyConfigurator;

import org.elasticsearch.common.Strings;

import org.quartz.*;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Offline storage job: runs the configured pipeline for one task and writes
 * the result to every storage back end the task names.
 *
 * <p>Note: if the previous run has not finished when the next trigger fires,
 * the next run waits for the previous one to complete
 * (see {@link DisallowConcurrentExecution}).
 */
@DisallowConcurrentExecution
public class ScheduleJob implements Job {

    static Logger logger = LoggerFactory.getLogger(ScheduleJob.class);

    public ScheduleJob() {
    }

    /**
     * Restores the {@link TaskInfo} stored in the job data map, checks that the
     * task should run, builds one output per storage type and executes the
     * pipeline.
     *
     * @param jobExecutionContext the Quartz execution context
     */
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        JobDetail jobDetail = jobExecutionContext.getJobDetail();

        // Identity of the running job/trigger, used in every log line below.
        JSONObject json = new JSONObject();
        json.put("jobName", jobDetail.getKey().getName());
        json.put("jobGroup", jobDetail.getKey().getGroup());
        json.put("triggerName", jobExecutionContext.getTrigger().getKey().getName());
        json.put("triggerGroup", jobExecutionContext.getTrigger().getKey().getGroup());
        logger.info("job is running: " + json.toString());

        JobDataMap dataMap = jobDetail.getJobDataMap();
        JSONObject confJson = null;
        try {
            confJson = JSONObject.parseObject(dataMap.getString(SchedulerFactory.CONF_INFO));
        } catch (JSONException e) {
            logger.error("parse conf error: " + json.toString(), e);
        }
        if (confJson == null) {
            logger.error("conf is empty: " + json.toString());
            return;
        }

        TaskInfo taskInfo = new Gson().fromJson(confJson.toString(), TaskInfo.class);
        if (!isNeedtoRun(taskInfo)) {
            logger.info("no need to run: " + json.toString());
            return;
        }

        // One output per configured storage type. Elements are read as Object
        // and converted, so a null entry cannot break the switch.
        List<IDataOut> dataOuts = Lists.newArrayList();
        for (Object storeType : taskInfo.getDbStoreTypes()) {
            if (storeType == null) {
                continue;
            }
            String dbStoreType = storeType.toString();
            switch (dbStoreType) {
                case StoreType.STR_MONGO_STORE:
                    dataOuts.add(new DataOut2Mongo(taskInfo.getFunctionName(), taskInfo.getSourceName(),
                            taskInfo.getIdBuildEles(), taskInfo.getOutputType(), taskInfo.getProviderName()));
                    break;
                case StoreType.STR_ES_STORE:
                    dataOuts.add(new DataOut2ES(taskInfo.getCategoryName(), taskInfo.getFunctionName(),
                            taskInfo.getSourceName(), taskInfo.getIdBuildEles(), taskInfo.getIndexBuildEles(),
                            taskInfo.getOutputType(), taskInfo.getProviderName()));
                    break;
                case StoreType.STR_REDIS_STORE:
                    dataOuts.add(new DataOut2Redis(taskInfo.getSourceName(), taskInfo.getIdBuildEles(),
                            taskInfo.getOutputType(), taskInfo.getProviderName()));
                    break;
                default:
                    logger.warn("unknown store type, ignored: " + dbStoreType);
                    break;
            }
        }

        // Create the crawl log record that is handed to the pipeline executor.
        CrawlerLog crawlerLog = createCrawlerLog(taskInfo);
        if (dataOuts.size() > 0) {
            PipeExecuter.executeSave(taskInfo.getPipelineConf(), dataOuts, crawlerLog);
        }
    }

    /**
     * Decides whether a task should be executed.
     *
     * @param taskInfo the task to check; may be {@code null}
     * @return {@code false} when the task is {@code null}, real-time, offline,
     *         has no pipeline configuration or has no storage type;
     *         {@code true} otherwise
     */
    public static boolean isNeedtoRun(TaskInfo taskInfo) {
        if (taskInfo == null) {
            logger.warn("task info is null, no need to run");
            return false;
        }

        // Real-time or offline processing.
        if (taskInfo.getSourceType() == SourceType.REAL_TIME_PROCESS) {
            logger.warn("the job is real-time process, no need to run");
            return false;
        }

        // Online/offline status of the job.
        if (taskInfo.getSourceStatus() == StatusType.OFFLINE) {
            logger.warn("the job status is offline, no need to run");
            return false;
        }

        // Pipeline configuration.
        if (Strings.isNullOrEmpty(taskInfo.getPipelineConf()) || taskInfo.getPipelineConf().trim().length() == 0) {
            logger.warn("no pipeline configure info, no need to run");
            return false;
        }

        // Storage configuration; the list is null when the stored JSON has no such field.
        if (taskInfo.getDbStoreTypes() == null || taskInfo.getDbStoreTypes().size() == 0) {
            logger.warn("the job store type is 0, no need to store");
            return false;
        }
        return true;
    }

    /**
     * Creates the crawl log record for a task so the management system can
     * display it. The source name and its Chinese name are split on
     * {@code '_'}; missing parts become empty strings.
     *
     * @param taskInfo the task being executed
     * @return the populated crawl log
     */
    public CrawlerLog createCrawlerLog(TaskInfo taskInfo) {
        CrawlerLog crawlerLog = new CrawlerLog();
        crawlerLog.setIndexType(taskInfo.getIndexType()); // incremental or full
        crawlerLog.setCategoryLv1(taskInfo.getCategoryLevel1());

        String[] sourceArr = splitName(taskInfo.getSourceName());
        String[] sourceZhArr = splitName(taskInfo.getZhName());

        crawlerLog.setCategoryLv2(partAt(sourceArr, 0));
        crawlerLog.setFunctionName(partAt(sourceArr, 1));
        crawlerLog.setProviderName(partAt(sourceArr, 2));
        crawlerLog.setFunctionZhName(partAt(sourceZhArr, 1));
        crawlerLog.setProviderZhName(partAt(sourceZhArr, 2));
        crawlerLog.setId();
        return crawlerLog;
    }

    /** Splits a name on '_'; a null name yields no parts. */
    private static String[] splitName(String name) {
        return name == null ? new String[0] : name.split("_");
    }

    /** Returns the part at the given index, or an empty string when there is none. */
    private static String partAt(String[] parts, int index) {
        return parts.length > index ? parts[index] : "";
    }
}

View Code

3.5 任务调度工厂

工厂用于生成任务的触发器Trigger,以及创建任务Job。

import com.google.gson.Gson;

import com.quartz.consts.PrefixType;

import com.quartz.module.TaskInfo;

import org.quartz.*;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**
 * Factory that creates the Quartz {@link JobDetail} and {@link Trigger} for a
 * task and registers them with a scheduler.
 */
public class SchedulerFactory {

    static Logger logger = LoggerFactory.getLogger(SchedulerFactory.class);

    /** Job data map key under which the task configuration JSON is stored. */
    public static final String CONF_INFO = "conf_info";

    public static final String DYNAMIC_UPDATE_JOB_NAME = "dynamicUpdateJob";

    public static final String DYNAMIC_UPDATE_GROUP_NAME = "dynamicUpdateGroup";

    /** Cron expression of the dynamic-update job. */
    public static final String DYNAMIC_UPDATE_CRONTINFO = "*/30 * * * * ?";

    /**
     * Registers the job and trigger built from a task with the scheduler.
     * Nothing is registered when either cannot be built; failures are logged.
     *
     * @param taskInfo  the task to schedule
     * @param scheduler the scheduler to register with
     */
    public static void addJob2Scheduler(TaskInfo taskInfo, Scheduler scheduler) {
        schedule(scheduler, generateJobDetail(taskInfo), generateTrigger(taskInfo));
    }

    /**
     * Registers the dynamic-update job and its trigger with the scheduler.
     *
     * @param scheduler the scheduler to register with
     */
    public static void addDynamicUpdateJob2Scheduler(Scheduler scheduler) {
        schedule(scheduler,
                generateDynamicUpdateJobDetail(DYNAMIC_UPDATE_JOB_NAME, DYNAMIC_UPDATE_GROUP_NAME),
                generateTrigger(DYNAMIC_UPDATE_JOB_NAME, DYNAMIC_UPDATE_GROUP_NAME, DYNAMIC_UPDATE_CRONTINFO));
    }

    /** Schedules the pair when both parts exist; logs and returns otherwise. */
    private static void schedule(Scheduler scheduler, JobDetail jobDetail, Trigger trigger) {
        if (jobDetail == null) {
            logger.error("create job failed!");
            return;
        }
        if (trigger == null) {
            logger.error("create trigger failed!");
            return;
        }
        try {
            // Register the job together with its trigger.
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            logger.error("create scheduler error, error message: " + e.toString(), e);
        }
    }

    /**
     * Builds the job for a task. The job is named after the source, grouped by
     * category, and carries the task serialized as JSON under
     * {@link #CONF_INFO}.
     *
     * @param taskInfo the task
     * @return the job detail, or {@code null} when the source or category name
     *         is missing
     */
    public static JobDetail generateJobDetail(TaskInfo taskInfo) {
        String jobName = taskInfo.getSourceName();
        String jobGroup = taskInfo.getCategoryName();
        if (!hasJobIdentity(jobName, jobGroup)) {
            return null;
        }
        return JobBuilder.newJob(ScheduleJob.class)
                .withIdentity(PrefixType.JOB_PREFIX + jobName, PrefixType.JOB_PREFIX + jobGroup)
                .requestRecovery()
                .usingJobData(CONF_INFO, new Gson().toJson(taskInfo))
                .build();
    }

    /**
     * Builds the dynamic-update job.
     *
     * @param jobName  job name (prefixed with {@code PrefixType.JOB_PREFIX})
     * @param jobGroup job group (prefixed with {@code PrefixType.JOB_PREFIX})
     * @return the job detail, or {@code null} when name or group is missing
     */
    public static JobDetail generateDynamicUpdateJobDetail(String jobName, String jobGroup) {
        if (!hasJobIdentity(jobName, jobGroup)) {
            return null;
        }
        return JobBuilder.newJob(DynamicUpdateJob.class)
                .withIdentity(PrefixType.JOB_PREFIX + jobName, PrefixType.JOB_PREFIX + jobGroup)
                .requestRecovery()
                .build();
    }

    /**
     * Builds the cron trigger for a task, named after the source and grouped
     * by category.
     *
     * @param taskInfo the task
     * @return the trigger, or {@code null} when the name, group or cron
     *         expression is missing or the expression is invalid
     */
    public static Trigger generateTrigger(TaskInfo taskInfo) {
        return generateTrigger(taskInfo.getSourceName(), taskInfo.getCategoryName(), taskInfo.getCronInfo());
    }

    /**
     * Builds a cron trigger.
     *
     * @param sourceTriggerName  trigger name (prefixed with {@code PrefixType.TRIGGER_PREFIX})
     * @param sourceTriggerGroup trigger group (prefixed with {@code PrefixType.TRIGGER_PREFIX})
     * @param cronInfo           Quartz cron expression
     * @return the trigger, or {@code null} when an argument is missing or the
     *         cron expression is invalid
     */
    public static Trigger generateTrigger(String sourceTriggerName, String sourceTriggerGroup, String cronInfo) {
        if (isBlank(sourceTriggerName)) {
            logger.error("trigger name is empty, please check!");
            return null;
        }
        if (isBlank(sourceTriggerGroup)) {
            logger.error("trigger group is empty, please check!");
            return null;
        }
        if (isBlank(cronInfo)) {
            logger.error("cron timer info is empty, please check!");
            return null;
        }
        // cronSchedule() throws an unchecked exception on a malformed
        // expression; reject it here so one bad configuration entry cannot
        // abort the caller's loop.
        if (!CronExpression.isValidExpression(cronInfo)) {
            logger.error("cron timer info is invalid, please check! cron: " + cronInfo);
            return null;
        }
        return TriggerBuilder.newTrigger()
                .withIdentity(PrefixType.TRIGGER_PREFIX + sourceTriggerName,
                        PrefixType.TRIGGER_PREFIX + sourceTriggerGroup)
                .withSchedule(CronScheduleBuilder.cronSchedule(cronInfo))
                .build();
    }

    /** Logs and returns false when the job name or group is null or blank. */
    private static boolean hasJobIdentity(String jobName, String jobGroup) {
        if (isBlank(jobName)) {
            logger.error("job name is empty, please check!");
            return false;
        }
        if (isBlank(jobGroup)) {
            logger.error("job group is empty, please check!");
            return false;
        }
        return true;
    }

    /** True when the value is null or contains only whitespace. */
    private static boolean isBlank(String value) {
        return value == null || value.trim().length() == 0;
    }
}

View Code

3.6 动态检测任务更新的Job

import com.alibaba.fastjson.JSONObject;

import com.google.common.collect.Lists;

import com.google.gson.Gson;

import com.quartz.consts.PrefixType;

import com.quartz.module.TaskInfo;

import org.quartz.*;

import org.quartz.impl.matchers.GroupMatcher;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.List;

@DisallowConcurrentExecution

public class DynamicUpdateJob implements Job{

private static Logger logger = LoggerFactory.getLogger(DynamicUpdateJob.class);

public DynamicUpdateJob(){}

@Override

public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {

JobDetail jobDetail = jobExecutionContext.getJobDetail();

JSONObject json = new JSONObject();

json.put("jobName", jobDetail.getKey().getName());

json.put("jobGroup", jobDetail.getKey().getGroup());

json.put("triggerName", jobExecutionContext.getTrigger().getKey().getName());

json.put("triggerGroup", jobExecutionContext.getTrigger().getKey().getGroup());

logger.info("job is running: "+json.toString());

// 获取当前的调度器

Scheduler scheduler = jobExecutionContext.getScheduler();

// 获取配置信息中的任务(注意需要保持)

List confTaskInfos = GenerateTaskInfo.generateTaskInfoFromMysql();

// 获取所有的job信息

List schedulerJobKeys = acquireJobKeysWithinSceduler(scheduler);

// 1. 配置任务不存在,而sheduler相关任务存在,则进行下线处理

for(JobKey schedulerJobKey : schedulerJobKeys ){

boolean hasSameJobKeyInConfTask = false;

for(TaskInfo confTaskInfo : confTaskInfos){

if(generateJobKey(confTaskInfo).equals(schedulerJobKey)){

hasSameJobKeyInConfTask = true;

break;

}

}

if(!hasSameJobKeyInConfTask){

try {

scheduler.deleteJob(schedulerJobKey);

logger.info("delete offline job: "+schedulerJobKey.toString());

} catch (SchedulerException e) {

logger.error("delete offline job error: "+json.toString());

}

}

}

// 2 配置任务与调度器任务比较

for(TaskInfo confTaskInfo : confTaskInfos){

JobKey confJobKey = generateJobKey(confTaskInfo);

boolean hasSameJob = false;

for(JobKey schedulerJobKey : schedulerJobKeys ){

if(confJobKey.equals(schedulerJobKey)){

hasSameJob = true;

break;

}

}

if(hasSameJob){ //具有相同名称的job

logger.info("has same jobKey: "+confJobKey);

JobDetail schedulerJobDetail = null;

try {

schedulerJobDetail = scheduler.getJobDetail(confJobKey);

} catch (SchedulerException e) {

logger.error("get job detail from scheduler error: "+confJobKey);

}

if(schedulerJobDetail == null) continue;

// 1) 是否需要下线

if(!ScheduleJob.isNeedtoRun(confTaskInfo)){

try {

logger.info("has same jobKey and offline the job "+confJobKey);

scheduler.deleteJob(confJobKey);

} catch (SchedulerException e) {

logger.error("delete offline job error: "+confJobKey);

}

}else{

// 2) 是否需要更新任务

TaskInfo schedulerTaskInfo = parseTaskInfoFromJobDataMap(schedulerJobDetail);

logger.info("confTaskInfo: " + confTaskInfo);

logger.info("schedulerTaskInfo: " + schedulerTaskInfo);

if(!confTaskInfo.equals(schedulerTaskInfo)){

try {

logger.info("has same jobKey and update the job "+confJobKey);

scheduler.deleteJob(confJobKey);

SchedulerFactory.addJob2Scheduler(confTaskInfo, scheduler);

} catch (SchedulerException e) {

logger.error("update scheduler info error: "+confJobKey);

}

}else{

logger.info("the job info is same "+confJobKey);

}

}

}else{ // 创建新的Job

// 1) 是否满足上线的条件

if(!ScheduleJob.isNeedtoRun(confTaskInfo)){

logger.info("the status is offline, no need to create new job: "+confJobKey);

continue;

}

logger.info("no same jobKey and create job "+confJobKey);

// 2) 上线

SchedulerFactory.addJob2Scheduler(confTaskInfo, scheduler);

}

}

}

/**
 * Collects the keys of all jobs currently registered in the scheduler,
 * skipping the group named JOB_PREFIX + DYNAMIC_UPDATE_GROUP_NAME.
 *
 * @param scheduler scheduler to query
 * @return the job keys gathered; if the scheduler throws while being queried,
 *         the error is logged and the keys collected up to that point are returned
 */
protected List<JobKey> acquireJobKeysWithinSceduler(Scheduler scheduler){
    List<JobKey> jobKeys = Lists.newArrayList();
    // Group to leave out of the result; computed once instead of per iteration.
    String dynamicUpdateGroup = PrefixType.JOB_PREFIX+SchedulerFactory.DYNAMIC_UPDATE_GROUP_NAME;
    try {
        for(String groupName : scheduler.getJobGroupNames()){
            if(groupName.equals(dynamicUpdateGroup)){
                continue;
            }
            jobKeys.addAll(scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName)));
        }
    } catch (SchedulerException e) {
        // Previously swallowed: an incomplete list makes the caller believe jobs
        // are missing from the scheduler, so the failure must at least be visible.
        logger.error("acquire job keys from scheduler error", e);
    }
    return jobKeys;
}

/**
 * Rebuilds the TaskInfo stored in a job's JobDataMap under
 * SchedulerFactory.CONF_INFO (a JSON string parsed with Gson).
 *
 * @param jobDetail job whose data map holds the serialized task configuration
 * @return the parsed TaskInfo, or null when reading or parsing fails
 */
protected TaskInfo parseTaskInfoFromJobDataMap(JobDetail jobDetail){
    try {
        String confInfo = jobDetail.getJobDataMap().getString(SchedulerFactory.CONF_INFO);
        return new Gson().fromJson(confInfo, TaskInfo.class);
    } catch (Exception e) {
        // Pass the exception to the logger so the cause (bad JSON, wrong value
        // type, null job detail) is not lost; callers still receive null.
        logger.error("parse task info from JobDataMap error!", e);
        return null;
    }
}

/**
 * Builds the job key for a task from its source name (job name) and
 * category name (job group).
 *
 * @param taskInfo task configuration supplying the two names
 * @return the key produced by {@code generateJobKey(String, String)}
 */
protected JobKey generateJobKey(TaskInfo taskInfo){
    String sourceName = taskInfo.getSourceName();
    String categoryName = taskInfo.getCategoryName();
    return generateJobKey(sourceName, categoryName);
}

/**
 * Builds a job key whose name and group are both prefixed with
 * PrefixType.JOB_PREFIX.
 *
 * @param jobName  unprefixed job name
 * @param jobGroup unprefixed job group
 * @return key of the form (JOB_PREFIX + jobName, JOB_PREFIX + jobGroup)
 */
protected JobKey generateJobKey(String jobName, String jobGroup){
    String prefixedName = PrefixType.JOB_PREFIX+jobName;
    String prefixedGroup = PrefixType.JOB_PREFIX+jobGroup;
    return JobKey.jobKey(prefixedName, prefixedGroup);
}

}

View Code

3.7 Es数据库存储

import com.alibaba.fastjson.JSONObject;

import com.pipes.out.IDataOut;

import com.quartz.conf.Configuration;

import com.quartz.conf.OcpConfHelper;

import com.quartz.util.IdBuilder;

import com.quartz.util.OutputTypeTransform;

import org.elasticsearch.ElasticsearchException;

import org.elasticsearch.action.ActionListener;

import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;

import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;

import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;

import org.elasticsearch.action.bulk.BulkRequestBuilder;

import org.elasticsearch.action.bulk.BulkResponse;

import org.elasticsearch.action.index.IndexResponse;

import org.elasticsearch.action.search.SearchResponse;

import org.elasticsearch.client.transport.TransportClient;

import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.common.transport.InetSocketTransportAddress;

import org.elasticsearch.index.query.QueryBuilder;

import org.elasticsearch.index.query.QueryBuilders;

import org.elasticsearch.index.reindex.DeleteByQueryAction;

import org.elasticsearch.rest.RestStatus;

import org.elasticsearch.transport.client.PreBuiltTransportClient;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.net.InetAddress;

import java.net.UnknownHostException;

import java.util.List;

import java.util.Map;

/**

* Created by songwang4 on 2017/6/7.

*/

public class DataOut2ES implements IDataOut, IDataClose {

static Logger logger = LoggerFactory.getLogger(DataOut2ES.class);

static TransportClient client;

String indexName; // 默认为ocp

String typeName;

String sourceName;

List indexBuildEles;

List idBuilderEles;

Map outputType;

String providerName;

public DataOut2ES(String indexName,String type){

this.indexName = indexName;

this.typeName = type;

init();

}

public DataOut2ES(String indexName,String type, List indexBuildEles){

this(indexName,type);

this.indexBuildEles = indexBuildEles;

}

public DataOut2ES(String indexName,String type, List idBuilderEles, List indexBuildEles){

this(indexName,type, indexBuildEles);

this.idBuilderEles = idBuilderEles;

}

public DataOut2ES(String indexName,String type, String sourceName, List idBuilderEles, List indexBuildEles){

this(indexName,type, idBuilderEles, indexBuildEles);

this.sourceName = sourceName;

}

public DataOut2ES(String indexName,String type, String sourceName, List idBuilderEles, List indexBuildEles, Map outputType,String providerName){

this(indexName,type,sourceName, idBuilderEles, indexBuildEles);

this.outputType = outputType;

this.providerName = providerName;

}

public static void init() {

if(client == null){

Configuration conf = OcpConfHelper.getInstance().getOcpConf();

String esClusterName = conf.getStringValue("ocp_es_cluster_name", "");

String esIp = conf.getStringValue("ocp_es_ip", "");

int esPort = conf.getIntValue("ocp_es_port", "");

Settings settings = Settings.builder()

.put("cluster.name",esClusterName)

.put("client.transport.sniff", true)

.put("client.transport.ping_timeout", "120s")

.put("client.transport.nodes_sampler_interval","30s").build();

try {

client = new PreBuiltTransportClient(settings)

.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esIp),esPort));

} catch (UnknownHostException e) {

e.printStackTrace();

}

}

}

/**

* 批量写入

* @param datas

*/

public void save(List datas) {

// 批量的插入数据

BulkRequestBuilder bulkRequest = client.prepareBulk();

for(JSONObject data : datas){

//按输出字段类型进行转换

// data = OutputTypeTransform.transform(data,outputType);

String id64 = IdBuilder.generateId(this.sourceName, data, this.idBuilderEles);

if(id64.trim().length() == 0) continue;

JSONObject indexJson = new JSONObject();

for(String indexBuildEle : this.indexBuildEles){

if(data.containsKey(indexBuildEle)){

indexJson.put(indexBuildEle, data.get(indexBuildEle));

}

}

if(indexJson.keySet().isEmpty()){

logger.info("no json fields, so no need to save");

return;

}

bulkRequest.add(client.prepareIndex(indexName, typeName, id64).setSource(indexJson.toString()));

}

BulkResponse bulkResponse = bulkRequest.execute().actionGet();

if(bulkResponse.hasFailures()){

logger.error("insert data 2 es error "+indexName);

System.out.println(bulkResponse.buildFailureMessage());

}

}

public void saveWithoutIndexBuilds(List datas) {

// 批量的插入数据

BulkRequestBuilder bulkRequest = client.prepareBulk();

for(JSONObject data : datas){

bulkRequest.add(client.prepareIndex(indexName, typeName).setSource(data.toString()));

}

BulkResponse bulkResponse = bulkRequest.execute().actionGet();

if(bulkResponse.hasFailures()){

logger.error("insert data 2 es error "+indexName);

System.out.println(bulkResponse.buildFailureMessage());

}

}

public void saveWithoutIndexBuilds2(List datas) {

// 批量的插入数据

BulkRequestBuilder bulkRequest = client.prepareBulk();

for(JSONObject data : datas){

String _id = data.getString("_id");

JSONObject source = data.getJSONObject("_source");

bulkRequest.add(client.prepareIndex(indexName, typeName,_id).setSource(source.toString()));

}

BulkResponse bulkResponse = bulkRequest.execute().actionGet();

if(bulkResponse.hasFailures()){

logger.error("insert data 2 es error "+indexName);

System.out.println(bulkResponse.buildFailureMessage());

}

}

/**

* 判断索引是否存在

* @param indexName

* @return

*/

public boolean isExistsIndex(String indexName){

IndicesExistsResponse response = client.admin().indices()

.exists(new IndicesExistsRequest().indices(new String[]{indexName})).actionGet();

return response.isExists();

}

/**

* 创建索引信息

* @param indexName

* @return

*/

public boolean createIndex(String indexName){

try {

CreateIndexResponse indexResponse = this.client

.admin()

.indices()

.prepareCreate(indexName)

.get();

return indexResponse.isAcknowledged();

} catch (ElasticsearchException e) {

e.printStackTrace();

}

return false;

}

@Override

public void save(Object data) {

if(this.indexBuildEles.size() == 0){

logger.error("index fields are empty in es, no index need to save, info: " + data.toString());

return;

}

// 逐条插入数据

JSONObject json = null;

try {

json = (JSONObject)data;

} catch (Exception e) {

logger.error("trans data to json error in es :" + data.toString());

return;

}

if(json == null){

logger.error("trans data to Json error in es, info " + data.toString());

return;

}

// json = OutputTypeTransform.transform(json,outputType);

// 构建索引id

String id64 = IdBuilder.generateId(this.sourceName, json, this.idBuilderEles);

if(id64.trim().length() == 0){

logger.error("generate 64 bit id is null,please check: " + data.toString());

return;

}

JSONObject indexJson = new JSONObject();

for(String indexBuildEle : this.indexBuildEles){

if(json.containsKey(indexBuildEle)){

indexJson.put(indexBuildEle, json.get(indexBuildEle));

}

}

if(indexJson.keySet().isEmpty()){

logger.info("no json fields, so no need to save");

return;

}

logger.info("index info: "+indexJson);

IndexResponse response = client.prepareIndex(this.indexName, this.typeName, id64).setSource(indexJson.toString()).get();

if(response.status() != RestStatus.CREATED && response.status() != RestStatus.OK){

logger.error("index error in es, status is "+response.status().getStatus()+"info: " + data.toString());

return;

}

}

@Override

public void close() {

}

}

View Code

  以上代码展示了与Quartz相关的整体流程。虽然配置类、数据库初始化类或加载类以及部分帮助类等细节代码没有展示,但从中已可略窥Quartz的核心用法。如有问题,可留言回复。

4. 集群模式

注意:上述代码默认使用Quartz集群模式,主流程所加载的quartz.properties中的集群配置如下,可供参考。

#============================================================================
# Configure Main Scheduler Properties
#============================================================================

# instanceName is shared by every node of the cluster.
org.quartz.scheduler.instanceName: OcpScheduler
# instanceId must be unique per node when isClustered is true; AUTO lets Quartz
# generate one, so the same file can be deployed to every node.
org.quartz.scheduler.instanceId: AUTO
org.quartz.scheduler.skipUpdateCheck: true

#============================================================================
# Configure ThreadPool
#============================================================================

org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 50
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true

#============================================================================
# Configure JobStore
#============================================================================

org.quartz.jobStore.misfireThreshold: 120000
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties: false
org.quartz.jobStore.tablePrefix: QRTZ_
org.quartz.jobStore.dataSource: ocpQzDs
org.quartz.jobStore.isClustered: true
org.quartz.jobStore.clusterCheckinInterval: 60000

#============================================================================
# Configure Datasources
#============================================================================

org.quartz.dataSource.ocpQzDs.driver: com.mysql.jdbc.Driver
org.quartz.dataSource.ocpQzDs.URL: jdbc:mysql://192.168.1.1:3306/test?useUnicode=true&characterEncoding=utf-8
org.quartz.dataSource.ocpQzDs.user: test
org.quartz.dataSource.ocpQzDs.password: test
# The property name is maxConnections (plural); the singular form is not
# recognised, which leaves the pool at its default size.
# NOTE(review): with threadCount 50, consider sizing the pool to at least the
# worker thread count plus a few spare connections.
org.quartz.dataSource.ocpQzDs.maxConnections: 30

#============================================================================
# Configure Plugins
#============================================================================

org.quartz.plugin.shutdownHook.class: org.quartz.plugins.management.ShutdownHookPlugin
org.quartz.plugin.shutdownHook.cleanShutdown: true
#org.quartz.plugin.triggHistory.class: org.quartz.plugins.history.LoggingJobHistoryPlugin

View Code

查看原文