目录

背景 第一部分 交互方式 第二部分 Py4j 参考文献及资料

背景

在日常大数据处理中,技术栈选择使用上经常会使用python语言和Hadoop大数据平台。两者最常见的结合使用方式便是:使用Python编写PySpark程序,提交至Hadoop Yarn上运行。

在这个过程中,通常需要和Hadoop的HDFS文件系统进行交互。例如业务上需要将Python编写的机器学习模型训练的模型结果(模型对象和模型参数,通常是序列化对象文件)持久化存储在HDFS文件系统中。模型对外服务的时候,需要从HDFS中读取模型,并对外服务。

本文介绍PySpark程序和HDFS交互的三种方式(第三方包、subprocess、py4j),重点介绍(py4j)的方式。

第一部分 交互方式

1.1 第三方包

市面上存在的该功能的第三方Python包主要有下面几种。

pyhdfs:https://pyhdfs.readthedocs.io/en/latest/pyhdfs.html#pyhdfs.HdfsClienthdfs:https://hdfscli.readthedocs.io/en/latest/libhdfs:https://github.com/vbarter/pyhdfs (好像项目不存在了)snakebite:https://github.com/spotify/snakebite

1.1.1 hdfs

from hdfs.client import Client

client = Client("http://192.168.31.3:50070", root="/", timeout=100)

print(client.list("/"))

# ['benchmarks', 'hbase', 'tmp', 'user', 'var']

# 返回一个list记录主目录

上面是列取hdfs指定目录的文件系统。下面是上传本地文件到hdfs文件系统:

client.upload("/tmp", "/root/jupyter/nohup.out")

# '/tmp/nohup.out'

# 返回路径信息

下载文件:

client.download("/tmp/nohup.out", "/tmp")

# 返回路径'/tmp/nohup.out'

1.1.2 snakebite

snakebite采用 protobuf 消息, 并实现 Hadoop RPC 协议来与 HDFS NameNode节点通信。

from snakebite.client import Client

client = Client(hdfs_hostname, hdfs_port)

client.delete('/some-path', recurse=True)

1.1.3 加密集群

实际生产集群为了安全,通常会启用Kerboros组件,是的集群成为安全集群。这时候访问客户端就需要补充相关认证信息。例如下面方式:

from hdfs.ext.kerberos import KerberosClient

from krbcontext import krbcontext

keytab_file = '/etc/coolpython.keytab'

principal = 'hadoop/admin@coolpython.net'

with krbcontext(using_keytab=True, keytab_file=keytab_file, principal=principal, ccache_file='/tmp/cache_keytab_zds'):

client = KerberosClient(url='http://10.110.50.1:50070')

hdfs_save_path = '/user/hadoop/backup/2020-06-10'

# 新建目录

client.makedirs(hdfs_save_path)

1.1.4 总结

使用第三方Python包和HDFS文件系统交互,对于非安全集群(未部署Kerboros)是较为方便的。对于安全集群,程序运行的操作系统需要有集群用户的keytab文件,以及集群principal名。如果没有keytab文件,需要使用kinit命令,通过用户名和密钥,直接和集群交互,临时获得缓存票据,但是零时票据有时效性,通常24小时。

如果python程序通过Pyspark进行了封装,就可以按照Spark程序进行提交。例如对于非安全集群(未部署Kerboros),PySpark程序提交Yarn的时候可以使用client和Cluster方式都可以(IP端口火墙打通)。对于安全集群,

1.2 subprocess

subprocess包通常用来通过python语言执行Linux命令。所以可以通过这个包来执行HDFS文件命令。例如下面的案例,实现参看HDFS文件系统目录。

import subprocess

cmd = 'hdfs dfs -ls /user/path'.split() # cmd must be an array of arguments

files = subprocess.check_output(cmd).strip().split('\n')

for path in files:

print (path)

使用该包需要程序所在的节点需要有部署Hadoop客户端(这样才有hdfs命令环境)。

对于PySpark程序,对于Client模式,需要在提交任务的节点具备Hadoop客户端。对于Cluster模型,需要集群上所有Yarn节点部署Hadoop客户端。

1.3 py4j

Py4J 是一个用Python 和Java 编写的库。 通过Py4J,Python程序能够动态访问Java虚拟机中的Java对象,Java程序也能够回调Python对象。

项目地址:https://github.com/py4j/py4j

Python是如何和Java的JVM虚拟机进行通信的呢?就是最简单的RPC方式。JVM作为RPC的Server端,Python程序作为RPC的客户端。JVM会开始一个Socket端口提供服务,Python程序只需要调用Py4J提供的Client接口。本质是Py4J为Java进程和Python进程提供了Socket通信。

Py4J在python语法上的进行了精心的包装,使得在python里的这些远程java对象使用起来与本地的python对象十分接近。

1.3.1 案例

# vi AdditionApplication.java

其中下面的

# ll /usr/anaconda3/share/py4j/py4j0.10.8.1.jar

编译后运行:

# 如果没有javac命令需要安装,参考:# apt-get install openjdk-8-jdk

# javac -cp /usr/anaconda3/share/py4j/py4j0.10.8.1.jar AdditionApplication.java

# java -classpath . -cp .:/usr/anaconda3/share/py4j/py4j0.10.8.1.jar AdditionApplication

# jps # 查看java 程序

19928 AdditionApplication

20105 Jps

lsof -p # 会看到 socket 25333 (LISTEN)

java 19928 root 17u IPv6 403699 0t0 TCP localhost:25333 (LISTEN)

上面java程序启动了进程,进程会监听socket端口(默认是:25333)

from py4j.java_gateway import JavaGateway

if __name__ == '__main__':

gateway = JavaGateway()

random = gateway.jvm.java.util.Random()

number1 = random.nextInt(10)

number2 = random.nextInt(10)

print(number1, number2)

addition_app = gateway.entry_point

# gateway.help(gateway.jvm.AdditionApplication)

value = addition_app.addition(number1, number2)

print(value)

# 输出

# 7 2

# 9

Gateway Server创建的任意对象都会携带由服务端生成的唯一的对象id,服务端会将生成的所有对象装在一个Map结构里。当Python客户端需要操纵远程对象时,会将对象id和操纵指令以及参数一起传递到服务端,服务端根据对象id找到对应的对象,然后使用反射方法执行指令。

事实上,PySpark就是基于Py4J实现的。

第二部分 py4j

下面是对HDFS操作的工具类,简单封装。

# -*-coding: utf-8 -*-

from pyspark.sql import SparkSession

import math

import logging

logger = logging.getLogger("HdfsClient")

# @Time :

# @Author :

# @Email :

# @File : hdfsUtils.py

# @IDE : PyCharm

# Description: operator hdfs file

class HdfsClient:

def __init__(self, spark: SparkSession):

self.spark = spark

self.fs = self.spark._jvm.org.apache.hadoop.fs.FileSystem.get(self.spark._jsc.hadoopConfiguration())

self.path = self.spark._jvm.org.apache.hadoop.fs.Path

self.FileUtil = spark._jvm.org.apache.hadoop.fs.FileUtil

def hdfs_list(self, path, subtract_one=True):

list_status = self.fs.listStatus(self.path(path))

# file.getPath().getName(), file.getBlockSize(), file.getLen()

files_size = [file.getLen() for file in list_status]

totol_size_in_mb = sum(files_size) / 1024.0 / 1024.0

# dont count _SUCCESS file

total_num_files = len(files_size) - 1 if subtract_one else len(files_size)

return totol_size_in_mb, total_num_files

def hdfs_rename(self, old_path, new_path):

self.fs.rename(self.path(old_path), self.path(new_path))

return True

def hdfs_delete(self, path):

self.fs.delete(self.path(path), True)

return True

def hdfs_exists(self, hdfs_path):

# alternativ check only if path exists (returns true, false)

return self.FileUtil.exists(self.path(hdfs_path))

def hdfs_mkdir(self, create_path):

# create folder on hdfs

return self.FileUtil.mkdirs(self.path(create_path))

def smart_overwrite(self, path, fmt='parquet', compression='gzip', block_size_in_MB=128, min_max_scale=8.0):

# 只支持parquet

totol_size_in_mb, total_num_files = self.hdfs_list(path)

avg_file_size_in_mb = totol_size_in_mb / total_num_files

min_file_size, max_file_size = min_max_scale / min_max_scale, block_size_in_MB * min_max_scale

n_files = int(math.ceil(totol_size_in_mb / (block_size_in_MB * 0.9)))

bak_path = path.rstrip('/') + '.bak'

if min_file_size <= avg_file_size_in_mb <= max_file_size:

logger.info("INFO: file size is normal. Don't overwrite")

return False

elif avg_file_size_in_mb < min_file_size:

if total_num_files <= 10:

logger.info("INFO: file size is too small, but number of files <= 10. Don't overwrite")

return False

else:

logger.warning("WARN: file size is too small, will read data, coalesce and overwrite")

self.spark.read.load(path).coalesce(max(10, n_files)).write.parquet(

bak_path, compression=compression, mode='overwrite'

)

self.hdfs_delete(path)

self.hdfs_rename(bak_path, path)

return True

else:

if total_num_files >= 10000:

logger.info("INFO: file size is too large, but number of files >= 10000. Don't overwrite")

return False

else:

logger.warning("WARN: file size is too large, will read data, repartition and overwrite")

self.spark.read.load(path).repartition(min(10000, n_files)).write.parquet(

bak_path, compression=compression, mode='overwrite'

)

self.hdfs_delete(path)

self.hdfs_rename(bak_path, path)

return True

def put_localfile_to_hdfs(self, local_file, hdfs_file, delSrc=True, overwrite=True):

# put local file to hdfs filesystem

try:

self.fs.copyFromLocalFile(delSrc, overwrite, self.path(local_file), self.path(hdfs_file))

logger.info("copyFromLocal {} to {} success".format(local_file, hdfs_file))

except Exception as e:

logger.error(e)

logger.info("copyFromLocal {} to {} error".format(local_file, hdfs_file))

def get_hdfsfile_to_local(self, hdfs_file, local_file):

try:

self.fs.copyToLocalFile(self.path(hdfs_file), self.path(local_file))

logger.info("copyToLocalFile {} to {} success".format(hdfs_file, local_file))

except Exception as e:

logger.error(e)

logger.info("copyToLocalFile {} to {} error".format(hdfs_file, local_file))

if __name__ == '__main__':

spark = SparkSession.builder.appName("HdfsClient Test").getOrCreate()

import pandas as pd

# data = pd.DataFrame(columns=["A", "B"], index=[1, 2])

# data.to_csv("data.csv")

hdfs_client = HdfsClient(spark)

# hdfs_client.get_hdfsfile_to_local("/user/admin/python/data.csv", "data.csv")

# print(pd.read_csv("data.csv"))

hdfs_client.hdfs_list("/user/admin/python")

print("succ")

spark.stop()

参考文献及资料

1、py4j,https://www.py4j.org/

更多关注公众号:

精彩文章

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。