一、准备工作

1.安装虚拟机

VMware 安装 CentOS 7, 选择mini版,英文,网络NAT。

http://mirrors.aliyun.com/centos/7.9.2009/isos/x86_64/CentOS-7-x86_64-Minimal-2009.iso

vim /etc/sysconfig/network-scripts/ifcfg-ens33

将最后一行修改为

ONBOOT="yes"

重启网络服务, 确保自己能够ping通baidu,如果依旧不行可以直接reboot重启虚拟机

systemctl restart network

查看ip地址

yum install net-tools

ifconfig 

2. 安装java 环境

yum install java-1.8.*

3.安装scala

yum install https://downloads.lightbend.com/scala/2.12.10/scala-2.12.10.rpm

4.安装screen

yum install screen

#新建 screen -S xxx

#退出 ctrl + A + D

#重连 screen -r

#列表 screen -ls

5. 安装wget,vim

yum install wget

yum install vim

6.关闭防火墙

systemctl stop firewalld

二、安装Kafka

1. 下载

cd

wget https://dlcdn.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz --no-check-certificate

tar -xzf kafka_2.13-3.7.0.tgz

cd kafka_2.13-3.7.0

2. 启动zookeeper

#启动zookeeper

screen -S zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

#ctrl+A+D退出screen

3. 启动kafka

screen -S kafka

bin/kafka-server-start.sh config/server.properties

#ctrl+A+D退出screen

三、安装flink

cd

wget https://archive.apache.org/dist/flink/flink-1.13.0/flink-1.13.0-bin-scala_2.11.tgz --no-check-certificate

tar -xf flink-1.13.0-bin-scala_2.11.tgz && cd flink-1.13.0

#启动

./bin/start-cluster.sh

四、安装spark

#安装spark

#参考教程 https://spark.apache.org/docs/3.2.0/

cd

wget https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz --no-check-certificate

tar -xf spark-3.2.0-bin-hadoop3.2.tgz

cd spark-3.2.0-bin-hadoop3.2

./bin/pyspark

五、配置Python环境

1.安装python

yum install python36 python3-devel

yum install gcc gcc-c++

pip3 install pip --upgrade

2. 安装pyalink

pip3 install pyalink --user -i https://mirrors.aliyun.com/pypi/simple --ignore-installed PyYAML

ln -si /root/.local/bin/* /usr/bin/

3. 安装pyspark

pip3 install pyspark -i https://mirrors.aliyun.com/pypi/simple/

4. 安装kafka-python

pip3 install kafka-python

5. 配置jupyter

jupyter notebook --generate-config

jupyter notebook password

修改配置文件

vim /root/.jupyter/jupyter_notebook_config.py

#修改对应的两行

c.NotebookApp.ip = '*'

c.NotebookApp.open_browser = False

启动

jupyter notebook --allow-root

六、测试

本机打开浏览器访问 服务器ip:8888, 例如192.168.128.140:8888

1. 测试kafka

生产者

from kafka import KafkaProducer

import json

producer = KafkaProducer(

bootstrap_servers=['localhost:9092'],

key_serializer=lambda k: json.dumps(k).encode(),

value_serializer=lambda v: json.dumps(v).encode()

)

msg = "Hello World"

producer.send('result', msg)

消费者

from kafka import KafkaConsumer

consumer = KafkaConsumer('result', bootstrap_servers=['localhost:9092'])

for msg in consumer:

print(msg.value)

2. 测试flink

from pyalink.alink import *

import pandas as pd

useLocalEnv(1)

df = pd.DataFrame(

[

[2009, 0.5],

[2010, 9.36],

[2011, 52.0],

[2012, 191.0],

[2013, 350.0],

[2014, 571.0],

[2015, 912.0],

[2016, 1207.0],

[2017, 1682.0]

]

)

train_set = BatchOperator.fromDataframe(df, schemaStr='sumMoney double, fraud double')

trainer = LinearRegTrainBatchOp()\

.setFeatureCols(["sumMoney"])\

.setLabelCol("fraud")

train_set.link(trainer);

train_set.print()

3.测试spark

from pyspark.sql import SparkSession

spark=SparkSession.builder.appName('ml').getOrCreate()

_schema1 = 'x1 int, x2 int, x3 int, y int '

_schema2 = 'x1 int, x2 int , x3 int '

trainDf = spark.createDataFrame([

[900,50,90,1],

[800,50,90,1],

[600,50,120,1],

[500,40,100,0],

[750,60,150,0]

],schema=_schema1)

testDf = spark.createDataFrame([

[650,60,90],

[600,40,90],

[750,50,60]

],schema=_schema2)

trainDf.show()

testDf.show()

推荐文章

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。