一个基于selectors开发ftp并发上传下载文件小程序
理论基础: ? ? ? ? ?IO多路复用之selectors模块 服务端代码: import os import socket import selectors import json BASE_DIR = os.path.dirname(os.path.abspath(__file__)) buffer_size = 1024 STATUS_CODE = { ? ? 700: '路径存在', ? ? 701: '路径不存在,请重新新输入', ? ? 702: "文件名已存在,请重命名文件", ? ? 800: "文件已存在,但是大小不够,是否继续上传", ? ? 801: "文件已存在", ? ? 802: "准备发送文件", ? ? 803: "准备接受数据", ? ? 804: "文件不存在", } class SelectFtpServer: ? ? def __init__(self): ? ? ? ? self.dic = {} ? ? ? ? self.sel = selectors.DefaultSelector() ? ? ? ? self.create_socket() ? ? ? ? self.handle() ? ? def create_socket(self): ? ? ? ? server = socket.socket() ? ? ? ? server.bind(("127.0.0.1",8885)) ? ? ? ? server.listen(5) ? ? ? ? server.setblocking(False) ? ? ? ? self.sel.register(server,selectors.EVENT_READ,self.accept) ? ? ? ? print("服务端开启,等待用户连接...") ? ? def handle(self): ? ? ? ? while True: ? ? ? ? ? ? events = self.sel.select() ? ? ? ? ? ? for key,mask in events: ? ? ? ? ? ? ? ? callback = key.data ? ? ? ? ? ? ? ? callback(key.fileobj,mask) ? ? def accept(self,sock,mask): ? ? ? ? conn,addr = sock.accept() ? ? ? ? print("{}连接成功".format(addr)) ? ? ? ? conn.setblocking(False) ? ? ? ? self.sel.register(conn,self.read) ? ? ? ? self.dic[conn] = {} ? ? ? ? # print(self.dic) ? ? def read(self,conn,mask): ? ? ? ? try: ? ? ? ? ? ? if not self.dic[conn]: ? ? ? ? ? ? ? ? data = json.loads(conn.recv(buffer_size).decode("utf-8")) ? ? ? ? ? ? ? ? if data: ? ? ? ? ? ? ? ? ? ? cmd = data["action"] ? ? ? ? ? ? ? ? ? ? file_name = data["file_name"] ? ? ? ? ? ? ? ? ? ? file_size = data['file_size'] ? ? ? ? ? ? ? ? ? ? self.dic[conn] = {"action": cmd,"file_name": file_name,"file_size": file_size,"has_recv": 0} ? ? ? ? ? ? ? ? ? ? if cmd == 'put': ? ? ? ? ? ? ? ? ? ? ? ? data = { ? ? ? ? ? ? ? ? ? ? ? ? ? ? "status_code": 802 ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? ? ? conn.send(json.dumps(data).encode()) ? ? ? ? ? ? ? ? ? ? elif cmd == 'get': ? ? ? ? ? ? ? ? ? ? ? ? file_path = os.path.join(BASE_DIR,"upload",file_name) ? ? ? ? ? ? ? ? ? ? ? ? # print(file_path) ? ? ? ? ? ? ? ? ? ? ? ? if os.path.exists(file_path): ? ? ? ? ? ? ? ? ? ? ? ? ? ? file_size = os.path.getsize(file_path) ? ? ? ? ? ? ? ? ? ? ? ? ? ? data = { ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "status_code": 803, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "file_size": file_size ? ? ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? ? ? else: ? ? ? ? ? ? ? ? ? ? ? ? ? ? data = { ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "status_code": 804, ? ? ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? ? ? conn.send(json.dumps(data).encode()) ? ? ? ? ? ? ? ? ? ? else: ? ? ? ? ? ? ? ? ? ? ? ? pass ? ? ? ? ? ? ? ? else: ? ? ? ? ? ? ? ? ? ? self.sel.unregister(conn) ? ? ? ? ? ? ? ? ? ? conn.close() ? ? ? ? ? ? else: ? ? ? ? ? ? ? ? cmd = self.dic[conn].get("action",None) ? ? ? ? ? ? ? ? if cmd: ? ? ? ? ? ? ? ? ? ? if hasattr(self,cmd): ? ? ? ? ? ? ? ? ? ? ? ? func = getattr(self,cmd) ? ? ? ? ? ? ? ? ? ? ? ? func(conn) ? ? ? ? ? ? ? ? ? ? else: ? ? ? ? ? ? ? ? ? ? ? ? print("错误的命令!") ? ? ? ? ? ? ? ? ? ? ? ? conn.close() ? ? ? ? ? ? ? ? else: ? ? ? ? ? ? ? ? ? ? print("错误的命令!") ? ? ? ? ? ? ? ? ? ? conn.close() ? ? ? ? except ConnectionResetError as e: ? ? ? ? ? ? print('error',e) ? ? ? ? ? ? self.sel.unregister(conn) ? ? ? ? ? ? conn.close() ? ? def put(self,conn): ? ? ? ? file_size = self.dic[conn]['file_size'] ? ? ? ? path = os.path.join(BASE_DIR,self.dic[conn]['file_name']) ? ? ? ? recv_data = conn.recv(buffer_size) ? ? ? ? self.dic[conn]['has_recv'] += len(recv_data) ? ? ? ? with open(path,'ab') as f: ? ? ? ? ? ? f.write(recv_data) ? ? ? ? ? ? if self.dic[conn]['has_recv'] >= file_size: ? ? ? ? ? ? ? ? print("%s上传完毕!" % self.dic[conn]['file_name']) ? ? ? ? ? ? ? ? print(self.dic[conn]) ? ? ? ? ? ? ? ? self.dic[conn] = {} ? ? def get(self,conn): ? ? ? ? file_name = self.dic[conn]['file_name'] ? ? ? ? path = os.path.join(BASE_DIR,file_name) ? ? ? ? if json.loads(conn.recv(buffer_size).decode("utf-8"))["status_code"] == 802: ? ? ? ? ? ? with open(path,'rb') as f: ? ? ? ? ? ? ? ? for line in f: ? ? ? ? ? ? ? ? ? ? conn.send(line) ? ? ? ? ? ? self.dic[conn] = {} ? ? ? ? ? ? print('文件下载完毕!') ? ? ? ? ? ? # print(self.dic[conn]) if __name__ == '__main__': ? ? SelectFtpServer() 客户端代码: import socket import os,sys import json BASE_DIR = os.path.dirname(os.path.abspath(__file__)) buffer_size = 1024 class SelectFtpClient: ? ? def __init__(self): ? ? ? ? self.args = sys.argv ? ? ? ? if len(self.args) > 1: ? ? ? ? ? ? self.port = (self.args[1],int(self.args[2])) ? ? ? ? else: ? ? ? ? ? ? self.port = ("127.0.0.1",8885) ? ? ? ? self.create_socket() ? ? ? ? self.command() ? ? def create_socket(self): ? ? ? ? try: ? ? ? ? ? ? self.sk = socket.socket() ? ? ? ? ? ? self.sk.connect(self.port) ? ? ? ? ? ? print('连接FTP服务器成功!') ? ? ? ? except Exception as e: ? ? ? ? ? ? print("error: ",e) ? ? def command(self): ? ? ? ? while True: ? ? ? ? ? ? cmd = input('>>>').strip() ? ? ? ? ? ? if cmd == 'exit()': ? ? ? ? ? ? ? ? print("结束运行") ? ? ? ? ? ? ? ? break ? ? ? ? ? ? try: ? ? ? ? ? ? ? ? cmd,file = cmd.split() ? ? ? ? ? ? ? ? if hasattr(self,cmd): ? ? ? ? ? ? ? ? ? ? func = getattr(self,cmd) ? ? ? ? ? ? ? ? ? ? func(cmd,file) ? ? ? ? ? ? ? ? else: ? ? ? ? ? ? ? ? ? ? print('调用错误!') ? ? ? ? ? ? except ValueError: ? ? ? ? ? ? ? ? print("请正确输入命令") ? ? def put(self,cmd,file): ? ? ? ? if os.path.isfile(file): ? ? ? ? ? ? file_name = os.path.basename(file) ? ? ? ? ? ? file_size = os.path.getsize(file) ? ? ? ? ? ? data = { ? ? ? ? ? ? ? ? "action": cmd, ? ? ? ? ? ? ? ? "file_name": file_name, ? ? ? ? ? ? ? ? "file_size": file_size ? ? ? ? ? ? } ? ? ? ? ? ? self.sk.send(json.dumps(data).encode()) ? ? ? ? ? ? recv_data = json.loads(self.sk.recv(buffer_size).decode("utf-8")) ? ? ? ? ? ? print('recvStatus',recv_data) ? ? ? ? ? ? has_send = 0 ? ? ? ? ? ? if recv_data['status_code'] == 802: ? ? ? ? ? ? ? ? with open(file,'rb') as f: ? ? ? ? ? ? ? ? ? ? while file_size > has_send: ? ? ? ? ? ? ? ? ? ? ? ? contant = f.read(1024) ? ? ? ? ? ? ? ? ? ? ? ? self.sk.send(contant) ? ? ? ? ? ? ? ? ? ? ? ? has_send += len(contant) ? ? ? ? ? ? ? ? ? ? ? ? self.progress_bar(has_send,file_size) ? ? ? ? ? ? ? ? print('n%s文件上传完毕' % (file_name,)) ? ? ? ? else: ? ? ? ? ? ? print('文件不存在') ? ? def get(self,file_name): ? ? ? ? data = { ? ? ? ? ? ? "action": cmd, ? ? ? ? ? ? "file_name": file_name, ? ? ? ? ? ? "file_size": 0 ? ? ? ? } ? ? ? ? self.sk.send(json.dumps(data).encode()) ? ? ? ? rec_data = json.loads(self.sk.recv(buffer_size).decode("utf-8")) ? ? ? ? if rec_data["status_code"] == 803: ? ? ? ? ? ? data = { ? ? ? ? ? ? ? ? "status_code": 802 ? ? ? ? ? ? } ? ? ? ? ? ? self.sk.send(json.dumps(data).encode("utf-8")) ? ? ? ? ? ? path = os.path.join(BASE_DIR,file_name) ? ? ? ? ? ? has_recv = 0 ? ? ? ? ? ? with open(path,'wb') as f_write: ? ? ? ? ? ? ? ? while has_recv < rec_data['file_size']: ? ? ? ? ? ? ? ? ? ? content = self.sk.recv(buffer_size) ? ? ? ? ? ? ? ? ? ? has_recv += len(content) ? ? ? ? ? ? ? ? ? ? f_write.write(content) ? ? ? ? ? ? ? ? ? ? self.progress_bar(has_recv,rec_data['file_size']) ? ? ? ? ? ? ? ? print('nfile %s 下载完成!' % file_name) ? ? ? ? else: ? ? ? ? ? ? print("文件不存在!") ? ? def progress_bar(self,f_size,total): ? ? ? ? percent = round(f_size / total * 100) ? ? ? ? str = '*' * (percent // 2) + ' ' * ((100 - percent) // 2) ? ? ? ? print('r' + str + ' {}%'.format(percent),end='',flush=True,) if __name__ == '__main__': ? ? SelectFtpClient() (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |