目录
背景 第一部分 交互方式 第二部分 Py4j 参考文献及资料
背景
在日常大数据处理中,技术栈选择使用上经常会使用python语言和Hadoop大数据平台。两者最常见的结合使用方式便是:使用Python编写PySpark程序,提交至Hadoop Yarn上运行。
在这个过程中,通常需要和Hadoop的HDFS文件系统进行交互。例如业务上需要将Python编写的机器学习模型训练的模型结果(模型对象和模型参数,通常是序列化对象文件)持久化存储在HDFS文件系统中。模型对外服务的时候,需要从HDFS中读取模型,并对外服务。
本文介绍PySpark程序和HDFS交互的三种方式(第三方包、subprocess、py4j),重点介绍(py4j)的方式。
第一部分 交互方式
1.1 第三方包
市面上存在的该功能的第三方Python包主要有下面几种。
pyhdfs:https://pyhdfs.readthedocs.io/en/latest/pyhdfs.html#pyhdfs.HdfsClienthdfs:https://hdfscli.readthedocs.io/en/latest/libhdfs:https://github.com/vbarter/pyhdfs (好像项目不存在了)snakebite:https://github.com/spotify/snakebite
1.1.1 hdfs
from hdfs.client import Client
client = Client("http://192.168.31.3:50070", root="/", timeout=100)
print(client.list("/"))
# ['benchmarks', 'hbase', 'tmp', 'user', 'var']
# 返回一个list记录主目录
上面是列取hdfs指定目录的文件系统。下面是上传本地文件到hdfs文件系统:
client.upload("/tmp", "/root/jupyter/nohup.out")
# '/tmp/nohup.out'
# 返回路径信息
下载文件:
client.download("/tmp/nohup.out", "/tmp")
# 返回路径'/tmp/nohup.out'
1.1.2 snakebite
snakebite采用 protobuf 消息, 并实现 Hadoop RPC 协议来与 HDFS NameNode节点通信。
from snakebite.client import Client
client = Client(hdfs_hostname, hdfs_port)
client.delete('/some-path', recurse=True)
1.1.3 加密集群
实际生产集群为了安全,通常会启用Kerboros组件,是的集群成为安全集群。这时候访问客户端就需要补充相关认证信息。例如下面方式:
from hdfs.ext.kerberos import KerberosClient
from krbcontext import krbcontext
keytab_file = '/etc/coolpython.keytab'
principal = 'hadoop/admin@coolpython.net'
with krbcontext(using_keytab=True, keytab_file=keytab_file, principal=principal, ccache_file='/tmp/cache_keytab_zds'):
client = KerberosClient(url='http://10.110.50.1:50070')
hdfs_save_path = '/user/hadoop/backup/2020-06-10'
# 新建目录
client.makedirs(hdfs_save_path)
1.1.4 总结
使用第三方Python包和HDFS文件系统交互,对于非安全集群(未部署Kerboros)是较为方便的。对于安全集群,程序运行的操作系统需要有集群用户的keytab文件,以及集群principal名。如果没有keytab文件,需要使用kinit命令,通过用户名和密钥,直接和集群交互,临时获得缓存票据,但是零时票据有时效性,通常24小时。
如果python程序通过Pyspark进行了封装,就可以按照Spark程序进行提交。例如对于非安全集群(未部署Kerboros),PySpark程序提交Yarn的时候可以使用client和Cluster方式都可以(IP端口火墙打通)。对于安全集群,
1.2 subprocess
subprocess包通常用来通过python语言执行Linux命令。所以可以通过这个包来执行HDFS文件命令。例如下面的案例,实现参看HDFS文件系统目录。
import subprocess
cmd = 'hdfs dfs -ls /user/path'.split() # cmd must be an array of arguments
files = subprocess.check_output(cmd).strip().split('\n')
for path in files:
print (path)
使用该包需要程序所在的节点需要有部署Hadoop客户端(这样才有hdfs命令环境)。
对于PySpark程序,对于Client模式,需要在提交任务的节点具备Hadoop客户端。对于Cluster模型,需要集群上所有Yarn节点部署Hadoop客户端。
1.3 py4j
Py4J 是一个用Python 和Java 编写的库。 通过Py4J,Python程序能够动态访问Java虚拟机中的Java对象,Java程序也能够回调Python对象。
项目地址:https://github.com/py4j/py4j
Python是如何和Java的JVM虚拟机进行通信的呢?就是最简单的RPC方式。JVM作为RPC的Server端,Python程序作为RPC的客户端。JVM会开始一个Socket端口提供服务,Python程序只需要调用Py4J提供的Client接口。本质是Py4J为Java进程和Python进程提供了Socket通信。
Py4J在python语法上的进行了精心的包装,使得在python里的这些远程java对象使用起来与本地的python对象十分接近。
1.3.1 案例
# vi AdditionApplication.java
其中下面的
# ll /usr/anaconda3/share/py4j/py4j0.10.8.1.jar
编译后运行:
# 如果没有javac命令需要安装,参考:# apt-get install openjdk-8-jdk
# javac -cp /usr/anaconda3/share/py4j/py4j0.10.8.1.jar AdditionApplication.java
# java -classpath . -cp .:/usr/anaconda3/share/py4j/py4j0.10.8.1.jar AdditionApplication
# jps # 查看java 程序
19928 AdditionApplication
20105 Jps
lsof -p
java 19928 root 17u IPv6 403699 0t0 TCP localhost:25333 (LISTEN)
上面java程序启动了进程,进程会监听socket端口(默认是:25333)
from py4j.java_gateway import JavaGateway
if __name__ == '__main__':
gateway = JavaGateway()
random = gateway.jvm.java.util.Random()
number1 = random.nextInt(10)
number2 = random.nextInt(10)
print(number1, number2)
addition_app = gateway.entry_point
# gateway.help(gateway.jvm.AdditionApplication)
value = addition_app.addition(number1, number2)
print(value)
# 输出
# 7 2
# 9
Gateway Server创建的任意对象都会携带由服务端生成的唯一的对象id,服务端会将生成的所有对象装在一个Map结构里。当Python客户端需要操纵远程对象时,会将对象id和操纵指令以及参数一起传递到服务端,服务端根据对象id找到对应的对象,然后使用反射方法执行指令。
事实上,PySpark就是基于Py4J实现的。
第二部分 py4j
下面是对HDFS操作的工具类,简单封装。
# -*-coding: utf-8 -*-
from pyspark.sql import SparkSession
import math
import logging
logger = logging.getLogger("HdfsClient")
# @Time :
# @Author :
# @Email :
# @File : hdfsUtils.py
# @IDE : PyCharm
# Description: operator hdfs file
class HdfsClient:
def __init__(self, spark: SparkSession):
self.spark = spark
self.fs = self.spark._jvm.org.apache.hadoop.fs.FileSystem.get(self.spark._jsc.hadoopConfiguration())
self.path = self.spark._jvm.org.apache.hadoop.fs.Path
self.FileUtil = spark._jvm.org.apache.hadoop.fs.FileUtil
def hdfs_list(self, path, subtract_one=True):
list_status = self.fs.listStatus(self.path(path))
# file.getPath().getName(), file.getBlockSize(), file.getLen()
files_size = [file.getLen() for file in list_status]
totol_size_in_mb = sum(files_size) / 1024.0 / 1024.0
# dont count _SUCCESS file
total_num_files = len(files_size) - 1 if subtract_one else len(files_size)
return totol_size_in_mb, total_num_files
def hdfs_rename(self, old_path, new_path):
self.fs.rename(self.path(old_path), self.path(new_path))
return True
def hdfs_delete(self, path):
self.fs.delete(self.path(path), True)
return True
def hdfs_exists(self, hdfs_path):
# alternativ check only if path exists (returns true, false)
return self.FileUtil.exists(self.path(hdfs_path))
def hdfs_mkdir(self, create_path):
# create folder on hdfs
return self.FileUtil.mkdirs(self.path(create_path))
def smart_overwrite(self, path, fmt='parquet', compression='gzip', block_size_in_MB=128, min_max_scale=8.0):
# 只支持parquet
totol_size_in_mb, total_num_files = self.hdfs_list(path)
avg_file_size_in_mb = totol_size_in_mb / total_num_files
min_file_size, max_file_size = min_max_scale / min_max_scale, block_size_in_MB * min_max_scale
n_files = int(math.ceil(totol_size_in_mb / (block_size_in_MB * 0.9)))
bak_path = path.rstrip('/') + '.bak'
if min_file_size <= avg_file_size_in_mb <= max_file_size:
logger.info("INFO: file size is normal. Don't overwrite")
return False
elif avg_file_size_in_mb < min_file_size:
if total_num_files <= 10:
logger.info("INFO: file size is too small, but number of files <= 10. Don't overwrite")
return False
else:
logger.warning("WARN: file size is too small, will read data, coalesce and overwrite")
self.spark.read.load(path).coalesce(max(10, n_files)).write.parquet(
bak_path, compression=compression, mode='overwrite'
)
self.hdfs_delete(path)
self.hdfs_rename(bak_path, path)
return True
else:
if total_num_files >= 10000:
logger.info("INFO: file size is too large, but number of files >= 10000. Don't overwrite")
return False
else:
logger.warning("WARN: file size is too large, will read data, repartition and overwrite")
self.spark.read.load(path).repartition(min(10000, n_files)).write.parquet(
bak_path, compression=compression, mode='overwrite'
)
self.hdfs_delete(path)
self.hdfs_rename(bak_path, path)
return True
def put_localfile_to_hdfs(self, local_file, hdfs_file, delSrc=True, overwrite=True):
# put local file to hdfs filesystem
try:
self.fs.copyFromLocalFile(delSrc, overwrite, self.path(local_file), self.path(hdfs_file))
logger.info("copyFromLocal {} to {} success".format(local_file, hdfs_file))
except Exception as e:
logger.error(e)
logger.info("copyFromLocal {} to {} error".format(local_file, hdfs_file))
def get_hdfsfile_to_local(self, hdfs_file, local_file):
try:
self.fs.copyToLocalFile(self.path(hdfs_file), self.path(local_file))
logger.info("copyToLocalFile {} to {} success".format(hdfs_file, local_file))
except Exception as e:
logger.error(e)
logger.info("copyToLocalFile {} to {} error".format(hdfs_file, local_file))
if __name__ == '__main__':
spark = SparkSession.builder.appName("HdfsClient Test").getOrCreate()
import pandas as pd
# data = pd.DataFrame(columns=["A", "B"], index=[1, 2])
# data.to_csv("data.csv")
hdfs_client = HdfsClient(spark)
# hdfs_client.get_hdfsfile_to_local("/user/admin/python/data.csv", "data.csv")
# print(pd.read_csv("data.csv"))
hdfs_client.hdfs_list("/user/admin/python")
print("succ")
spark.stop()
参考文献及资料
1、py4j,https://www.py4j.org/
更多关注公众号:
精彩文章
发表评论