分布式键值存储系统

文章目录

分布式键值存储系统一、项目简介二、环境配置三、实验内容3.1 实现功能3.2 原理分析3.3 代码分析client.pyproxy_server.pynode_server.py

3.4 功能测试

四、实验总结4.1 遇到的困难和解决方法4.2 实验心得

一、项目简介

温馨提示:这个实现虽然很简单,但是也不要抄袭哦~

题⽬ 设计并实现⼀个分布式键值(key-value)存储系统,可以是基于磁盘的存储系统,也可以是基于内存的存储系统,可以是主从结构的集中式分布式系统,也可以是 P2P 式的⾮集中式分布式系统。能够完成基本的读、写、删除等功能,⽀持缓存、多⽤户和数据一致性保证。 要求

​必须是分布式的键值存储系统,⾄少在两个节点或者两个进程中测试;可以是集中式的也可以是⾮集中式;能够完成基本的操作如:PUT、GET、DEL 等;⽀持多⽤户同时操作;⾄少实现⼀种⾯向客户的⼀致性如单调写;需要完整的功能测试用例;涉及到节点通信时须采⽤ RPC 机制; 加分项:

具备性能优化措施如 cache 等;具备失效容错⽅法如:Paxos、Raft 等;具备安全防护功能;其他⾼级功能;

二、环境配置

编程语言:Python 3.10

主要使用库:xmlrpc

操作系统:Windows10

三、实验内容

3.1 实现功能

基本要求

必须是分布式的键值存储系统,⾄少在两个节点或者两个进程中测试; 可以是集中式的也可以是非集中式; 能够完成基本的操作如:PUT、GET、DEL 等; ⽀持多⽤户同时操作; ⾄少实现⼀种⾯向客户的⼀致性如单调写; 需要完整的功能测试用例; 涉及到节点通信时须采⽤ RPC 机制; 加分项

具备性能优化措施如 cache 等; 具备失效容错⽅法如:Paxos、Raft 等; 具备安全防护功能; 其他⾼级功能:用户友好界面,数据同步,可以并行读写。

3.2 原理分析

为了简化我的分布式键值存储系统实现,我做了两点设置:

代理服务分别为每个客户端都分配一个节点服务器,即一个节点服务器只一个客户端。数据保存在内存中,在节点服务器中,用全局变量模拟表示数据库。

我的分布式键值存储系统主要由四个部分组成:

客户端 client.py​代理服务器 proxy_server.py​节点服务器 node_server.py​数据库 ​database​

它们的关系和主要实现功能如下图所示:

具体来说,请求流程如下:

客户端登录系统,连接到代理服务器。代理服务器分配客户端ID和对应的节点服务器。客户端发送KV操作请求到代理服务器,代理服务器调用对应节点服务器的KV操作。节点服务器对数据库执行KV操作,将结果返回给代理服务器。代理服务器将结果返回给对应的客户端,客户端显示KV操作结果。

实际上,这里的代理服务器相当于中间件,使得分布式的节点服务器对于客户端而言是透明的,客户端始终只会感受只与一个服务器进行交互。

3.3 代码分析

我的分布式键值存储系统实现的操作有:

PUT key value —— 添加 (key, value)GET key —— 获取指定 key 的值DELETE key —— 删除指定 key 的值LIST —— 显示所有 (key, value)LOG —— 获取服务器端日志EXIT —— 退出客户端HELP—— 获取命令帮助菜单

系统可以运行多个服务器和多个客户端,多个客户端将连接到同一个代理服务器,代理服务器将不同客户端的请求分别发送给不同的节点服务器处理,然后将各自的响应返回。

客户端与服务器之间的通信使用rpc进行,代码中使用python库xmlrpc​实现。XML-RPC是一个RPC的分布式计算协议,通过XML将调用函数封装,并使用HTTP协议作为发送机制。

客户端有登录验证功能。用户名和密码存储在代理服务器端,对客户端进行隐藏。客户端需要输入正确的用户名和密码才能连接到服务器。

代理服务器可以限定客户端最大连接数,若超过这个数量将拒绝连接。

节点服务器使用了锁机制,保证了对数据库操作的串行化和一致性,可以支持并行操作。又因为数据库和日志是全局的,修改后会同步到所有的节点服务器,保证了数据的同步。因此,我的键值存储系统满足顺序一致性。

每个节点服务器实例都会拥有自己独立的缓存空间。Server​类中引入了一个cache​字典,用于存储最近访问的键值对。获取键值对前,节点服务器首先检查缓存中是否存在请求的键,存在则直接返回;只有不存在的情况下,才从数据库中检索该值,并相应地更新缓存。

具体代码实现如下,代码中有详细注释,更详细功能分析包含在代码注释中。

client.py

import xmlrpc.client as xmlrpclib

class Client(object):

def __init__(self):

self.id = None # 客户端ID

self.proxy = None # XML-RPC代理

self.port = None # 连接端口

def connect(self, username, password):

self.port = '21000'

self.proxy = xmlrpclib.ServerProxy('http://localhost:' + self.port)

# 登录

# 在此处进行验证 调用代理服务器的验证功能

if self.proxy.authenticate(username, password):

self.id = self.proxy.get_id() # 从代理服务器获取客户端ID

return self.id

# 验证成功的处理逻辑

else:

print('登录失败,请检查用户名和密码。')

return None

def handle_user_command(self):

try:

while True:

command = input(f"客户端 {self.id} 输入命令 (PUT/GET/DEL/LIST/LOG/EXIT)>> ").upper()

if command == 'HELP':

self.print_help() # 打印命令帮助

else:

self.send_command_to_server(command) # 向服务器发送命令

if command == 'EXIT':

break

except KeyboardInterrupt:

pass

def print_help(self):

# 打印命令帮助信息

print(

'-------------------------------------------\n'

'命令帮助:\n'

'PUT key value —— 添加 (key, value)\n'

'GET key —— 获取指定 key 的值\n'

'DEL key —— 删除指定 key 的值\n'

'LIST —— 显示所有 (key, value)\n'

'LOG —— 获取服务器端日志\n'

'EXIT —— 退出客户端\n'

'-------------------------------------------'

)

def send_command_to_server(self, command):

msg = getattr(self.proxy, 'function')(self.id, command) # 向服务器发送命令并获取返回信息

if msg is not None:

print(msg) # 打印服务器返回信息

if __name__ == '__main__':

print("尝试登录...")

username = input('输入用户名: ')

password = input('输入密码: ')

client = Client()

# 验证用户名和密码是否匹配

client_id = client.connect(username, password) # 连接到服务器并获取客户端ID

if client_id is None:

print('连接失败,没有多余的用户ID可以分配。')

else:

# 登录成功,显示欢迎消息和客户端ID

print('-------------------------------------------')

print("登录成功。")

print('欢迎使用分布式键值系统!')

print(f'您的客户端ID是 {client_id}')

print('输入 获取命令列表。')

print('-------------------------------------------')

client.handle_user_command() # 处理用户命令

proxy_server.py

from xmlrpc.server import SimpleXMLRPCServer

import xmlrpc.client as xmlrpclib

class ProxyServer:

def __init__(self, client_count):

# 用户名和密码

self.users = {

'1': '1',

'2': '2',

'3': '3',

}

# 初始化代理服务器,设置客户端连接状态和服务器列表

self.client_ids = [False] * client_count # 用于标记客户端是否连接的列表

# 连接到不同的服务器节点,服务器的基地址是20000

self.servers = [xmlrpclib.ServerProxy(f'http://localhost:{20000 + i}') for i in range(client_count)]

# 分配客户端ID

def get_id(self):

for i, connected in enumerate(self.client_ids):

if not connected:

self.client_ids[i] = True

print(f'客户端 {i} 登录')

return i

print('没有可用的 ID')

return None

# 处理客户端发来的命令

def function(self, client_id, clause):

clause = clause.strip().split() # 解析命令

lens = len(clause)

if lens < 1:

return '错误的命令。输入 help 查看帮助信息。'

command = clause[0]

# 检查命令类型

if command in ['PUT', 'GET', 'DEL', 'LIST', 'LOG', 'EXIT']:

# 获取对应的方法

server_function = getattr(self, command.lower())

return server_function(client_id, clause)

else:

return '错误的命令。输入 help 查看帮助信息。'

# 处理客户端退出命令

def exit(self, client_id, clause):

self.client_ids[client_id] = False

print(f'客户端 {client_id} 退出')

return f'客户端 {client_id} 退出'

# 实现PUT方法

def put(self, client_id, clause):

if len(clause) != 3:

return '错误的命令格式。使用方法: PUT key value'

key, value = clause[1], clause[2]

if self.servers[client_id].put(key, value):

return f"成功添加/更新键 {key},值 {value}"

return f"无法添加/更新键 {key},值 {value}"

# 实现GET方法

def get(self, client_id, clause):

if len(clause) != 2:

return '错误的命令格式。使用方法: GET key'

key = clause[1]

value = self.servers[client_id].get(key)

return f"键 {key} 的值为:{value if value is not None else '[未找到值]'}"

# 实现LIST方法

def list(self, client_id, clause):

if len(clause) != 1:

return '错误的命令格式。使用方法: LIST'

return self.servers[client_id].list()

# 实现DELETE方法

def delete(self, client_id, clause):

if len(clause) != 2:

return '错误的命令格式。使用方法: DEL key'

key = clause[1]

if self.servers[client_id].delete(key):

return f"删除键 {key} 成功"

return f"无法删除键 {key}"

# 实现LOG方法

def log(self, client_id, clause):

if len(clause) != 1:

return '错误的命令格式。使用方法: LOG'

return self.servers[client_id].get_log()

# 登陆验证

def authenticate(self, username, password):

if username not in self.users:

print('不存在该用户名,请重试!')

return False

elif self.users[username] != password:

print('密码错误,请重试!')

return False

else:

return True

if __name__ == '__main__':

count = int(input('输入客户端数量: '))

proxy = ProxyServer(client_count=count)

server = SimpleXMLRPCServer(('localhost', 21000), allow_none=True)

server.register_instance(proxy)

print(f"代理服务器正在运行...")

server.serve_forever()

node_server.py

import threading

from xmlrpc.server import SimpleXMLRPCServer

from xmlrpc.server import SimpleXMLRPCRequestHandler

# 这里模拟数据库

log = []

database = {}

database_lock = threading.Lock()

class Server:

def __init__(self, server_id):

self.server_id = server_id

self.cache = {} # 每个服务器实例的缓存字典

def put(self, key, value):

# 存储键值对到数据库并更新缓存

with database_lock:

database[key] = value

self.cache[key] = value # 添加/更新缓存

msg = f"添加/更新key:{key},value:{value}"

self.write_log(msg)

return True

def get(self, key):

# 先检查缓存,如果存在于缓存中则直接返回

if key in self.cache:

return self.cache[key]

# 如果不在缓存中,则从数据库中获取,并更新缓存

with database_lock:

value = database.get(key)

if value:

self.cache[key] = value # 如果在数据库中找到,则更新缓存

return value

def delete(self, key):

# 从数据库中删除键值对,并从缓存中删除

with database_lock:

if key in database:

del database[key]

if key in self.cache:

del self.cache[key] # 从缓存中删除

msg = f"删除key:{key}"

self.write_log(msg)

return True

return False

def list(self):

# 返回整个数据库

with database_lock:

return database

def get_log(self):

# 返回服务器日志

with database_lock:

return log

def write_log(self, msg):

# 记录服务器操作相关的日志

log.append(f"服务器 {self.server_id}:{msg}")

return True

def run_server(server_id):

# 启动和运行 XML-RPC 服务器

server = SimpleXMLRPCServer(("localhost", 20000 + server_id), requestHandler=SimpleXMLRPCRequestHandler)

server.register_instance(Server(server_id))

print(f"服务器 {server_id} 正在运行在端口 {20000 + server_id}\n")

server.serve_forever()

if __name__ == "__main__":

# 输入服务器数量并启动相应数量的线程

count = int(input('输入服务器数量:'))

threads = []

for i in range(count):

server_thread = threading.Thread(target=run_server, args=(i,))

threads.append(server_thread)

server_thread.start()

3.4 功能测试

依次运行节点服务器、代理服务器、客户端进行测试。

四、实验总结

4.1 遇到的困难和解决方法

我遇到的困难主要是如何设计这个系统,如何实现这个设计,以及在实现过程中遇到的BUG。

在设计这个系统时,我首先想的是实现一个集中式的键值存储系统,即只有一个客户端和一个服务器。这个实现需要rpc进行远程通信。经过了解,我选择了较为简单的python语言和xmlrpc库来实现远程通信。

但是初次使用就遇到了计算机积极拒绝的问题。

​​

首先,我排除了端口号错误和方法调用错误的原因。

然后,在尝试了多个解决方法,如设置防火墙、修改配置网络等都无效的情况下,我找到了最终的解决方法。因为我们电脑系统开放的端口号是不一样的,因此我们需要在命令行窗口使用**netstat -an**​命令来查看系统开放的端口,处于listening状态的端口才是可以使用的。将端口号改为listening的端口,这个BUG就解决了。

参考:127.0.0.1由于目标积极拒绝,无法连接

​​

设计好单个服务器与客户端的系统后,很简单的就可以拓展为多个客户端和多个服务器,现在的问题是该如何对客户隐藏有多个服务器的事实,使其感觉到只有一个服务器在运行呢?这个问题我通过中间件的设计思想,增加代理服务器解决了。

设计好一个简单的键值存储系统后,我思考如何给它添加一些功能,如一致性、安全防护、数据同步等问题。为了简化设计,我使用全局变量模拟数据库,使得所有服务器都连接到同一个数据库中。除此之外,我还对数据库的读写引入锁机制,对用户的登录引入登录验证功能。

4.2 实验心得

通过这个实验,我对RPC的原理和使用更加熟悉,也更具体地感受到中间件在分布式系统中的重要作用。特别是从一个简单的系统一步步拓展完善,直到实现一个包含简单功能的分布式键值存储系统的这个过程,加深了我对分布式系统的理解。

文章来源

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。