【1】引入第三方Bahir提供的Flink-redis相关依赖包

org.apache.bahir

flink-connector-redis_2.11

1.0

【2】Flink连接Redis并输出Sink处理结果

package com.zzx.flink

import org.apache.flink.streaming.api.scala._

import org.apache.flink.streaming.connectors.redis.RedisSink

import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig

import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisSinkTest {

def main(args: Array[String]): Unit = {

// 创建一个流处理执行环境

val env = StreamExecutionEnvironment.getExecutionEnvironment

//从文件中读取数据并转换为 类

val inputStreamFromFile: DataStream[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")

//转换 SensorReading为用户自定义的类,是从文件转换而来的

val dataStream: DataStream[SensorReading] = inputStreamFromFile

.map( data => {

var dataArray = data.split(",")

SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)

})

//定义一个 redis 的配置类 继承了FlinkJedisConfigBase 正是 SensorReading需要传入的参数,底层将有些数据保存成了状态数据。

val conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.52.131").setPort(6379).setPassword("zzx").build()

//定义 RedisMapper 数据保存的类型

val myMapper = new RedisMapper[SensorReading] {

//定义保存数据到 redis的命令,hset table key value

override def getCommandDescription: RedisCommandDescription = {

// hset tablesname

new RedisCommandDescription(RedisCommand.HSET , "sensor_temp")

}

//设置key

override def getKeyFromData(data: SensorReading): String = data.id

//设置value

override def getValueFromData(data: SensorReading): String = data.temperature.toString

}

dataStream.addSink(new RedisSink[SensorReading](conf,myMapper))

env.execute("Redis Sink test")

}

}

查看源码可知RedisSink是继承自RichSinkFunction

public class RedisSink extends RichSinkFunction {

【3】查看Redis输出信息

好文推荐

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。