Spark Streaming (Java): Reading from Kafka (Kerberos) and Writing to HBase (Kerberos)


Overview

Hello! The goal this time is to use Spark Streaming (Java) to read data from Kafka and write it into HBase; for how to produce data into Kafka in the first place, see the separate write-up on that topic.
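As a quick refresher, the sketch below shows roughly what producing to a Kerberos-secured Kafka cluster can look like. It is only an illustration: the class name KerberosProducerDemo is made up, and the broker list, service name, keytab path, principal, and topic name reuse the placeholders from the main listing below and must be adapted to your cluster.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KerberosProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder brokers and Kerberos settings, matching the consumer side below
        props.put("bootstrap.servers", "192.168.165.63:9092,192.168.165.61:9092,192.168.165.62:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "henghe");
        props.put("sasl.jaas.config",
                "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true "
                        + "keyTab=\"/Users/keytabs/user.keytab\" principal=\"user@USER.COM\";");

        // Send one test record and make sure it is flushed before the producer closes
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("[topicName1]", "hello from the producer side"));
            producer.flush();
        }
    }
}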

Getting Started
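Before any of the code below can authenticate, the JVM needs to know where the Kerberos realm configuration (krb5.conf) lives. Here is a minimal sketch of one way to set that up; the class name KerberosEnv is made up, the path is only an example, and the same property can instead be passed as -Djava.security.krb5.conf=... on the spark-submit command line.

public class KerberosEnv {
    // Hypothetical helper: point the JVM at the Kerberos realm configuration
    // before any Kafka or HBase client is created. /etc/krb5.conf is only an
    // example path; use the krb5.conf distributed with your cluster.
    public static void init() {
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
    }
}

Calling KerberosEnv.init() at the top of main() is one place to wire this in. With that in place, the complete consumer-to-HBase job looks like this: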

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.*;

public class Temp {

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // Register ConsumerRecord with Kryo so records can be serialized efficiently
        sparkConf.registerKryoClasses(new Class<?>[]{ConsumerRecord.class});

        SparkSession spark = SparkSession.builder()
                .appName("DevelopSpark")
                .master("local[*]")
                .config(sparkConf)
                .enableHiveSupport()
                .getOrCreate();
        JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkContext, Durations.seconds(5));

        String bootstrapServer = "192.168.165.63:9092,192.168.165.61:9092,192.168.165.62:9092";
        // ZooKeeper hosts used by HBase
        String zkQuorum4 = "zk-56,zk-66,zk-67";
        // Topic names go here
        List<String> topics = Arrays.asList("[topicName1]", "[topicName2]");
        String keytabPath = "/Users/keytabs/user.keytab";

        // Kafka consumer parameters
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", bootstrapServer);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
        kafkaParams.put("sasl.kerberos.service.name", "henghe");
        kafkaParams.put("sasl.mechanism", "GSSAPI");
        kafkaParams.put("sasl.jaas.config",
                "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\""
                        + keytabPath + "\" principal=\"user@USER.COM\";");
        kafkaParams.put("enable.auto.commit", false);

        // Read from Kafka
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                javaStreamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        // HBase connection parameters
        Configuration config = HBaseConfiguration.create();
        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000);
        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000);
        config.set("hbase.zookeeper.quorum", zkQuorum4);
        config.set("zookeeper.znode.parent", "/hbase1");
        config.set("hadoop.security.authentication", "kerberos");
        config.set("hbase.security.authentication", "kerberos");
        config.set("hbase.master.kerberos.principal", "henghe/_HOST@HENGHE.COM");
        config.set("hbase.thrift.kerberos.principal", "henghe/_HOST@HENGHE.COM");
        config.set("hbase.regionserver.kerberos.principal", "henghe/_HOST@HENGHE.COM");
        config.set("hbase.client.keytab.principal", "henghe@HENGHE.COM");
        config.set("hbase.client.userprovider.class", "org.apache.hadoop.hbase.security.UserProvider");
        config.set("hbase.client.keytab.file", keytabPath);
        // Target table name goes here
        config.set(TableOutputFormat.OUTPUT_TABLE, "[tableName]");

        JobConf jobConf = new JobConf(config);
        Job job = Job.getInstance(jobConf);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        // Write each micro-batch to HBase
        stream.foreachRDD(rdd -> {
            if (rdd.count() > 0) {
                JavaRDD<Put> putJavaRDD = rdd.map(cr -> {
                    String mes = cr.value();
                    // Rowkey goes here (this example simply uses the message value)
                    Put put = new Put(Bytes.toBytes(mes));
                    // Column family, column qualifier, and cell value
                    put.addColumn(Bytes.toBytes("[familyName]"), Bytes.toBytes("[colName]"), Bytes.toBytes(mes));
                    return put;
                });
                // TableOutputFormat ignores the key; the Put itself carries the rowkey
                JavaPairRDD<ImmutableBytesWritable, Put> javaPairRDD = putJavaRDD
                        .mapToPair((PairFunction<Put, ImmutableBytesWritable, Put>) p ->
                                new Tuple2<>(new ImmutableBytesWritable(), p));
                javaPairRDD.saveAsNewAPIHadoopDataset(job.getConfiguration());
                System.out.println("ok......");
            }
        });

        // Without start() and awaitTermination() the streaming job never actually runs
        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }
}
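One caveat: depending on the HBase and Hadoop client versions, the hbase.client.keytab.file and hbase.client.keytab.principal settings above may not trigger a Kerberos login on their own. The sketch below shows an explicit keytab login through Hadoop's UserGroupInformation as a fallback; the class name HBaseKerberosLogin is made up, and the principal and keytab path are the same placeholders used in the listing.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.security.UserGroupInformation;

public class HBaseKerberosLogin {
    // Log the current process in from a keytab so that subsequent HBase client
    // calls are made as the given principal.
    public static void login(String principal, String keytabPath) throws IOException {
        Configuration config = HBaseConfiguration.create();
        config.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(config);
        UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
    }
}

Calling HBaseKerberosLogin.login("henghe@HENGHE.COM", keytabPath) before the Job is created is one place to hook this in. On a real cluster the keytab also has to be available to the executors (for example shipped with spark-submit --files), which is beyond the scope of this sketch.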

