加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

一个基于selectors开发ftp并发上传下载文件小程序

发布时间:2020-12-17 17:01:54 所属栏目:Python 来源:网络整理
导读:理论基础: ? ? ? ? ?IO多路复用之selectors模块 服务端代码: import os import socket import selectors import json BASE_DIR = os.path.dirname(os.path.abspath(__file__)) buffer_size = 1024 STATUS_CODE = { ? ? 700: '路径存在', ? ? 701: '路径不存

理论基础:

? ? ? ? ?IO多路复用之selectors模块


服务端代码:


import os

import socket

import selectors

import json

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

buffer_size = 1024

STATUS_CODE = {

? ? 700: '路径存在',

? ? 701: '路径不存在,请重新新输入',

? ? 702: "文件名已存在,请重命名文件",

? ? 800: "文件已存在,但是大小不够,是否继续上传",

? ? 801: "文件已存在",

? ? 802: "准备发送文件",

? ? 803: "准备接受数据",

? ? 804: "文件不存在",

}

class SelectFtpServer:

? ? def __init__(self):

? ? ? ? self.dic = {}

? ? ? ? self.sel = selectors.DefaultSelector()

? ? ? ? self.create_socket()

? ? ? ? self.handle()

? ? def create_socket(self):

? ? ? ? server = socket.socket()

? ? ? ? server.bind(("127.0.0.1",8885))

? ? ? ? server.listen(5)

? ? ? ? server.setblocking(False)

? ? ? ? self.sel.register(server,selectors.EVENT_READ,self.accept)

? ? ? ? print("服务端开启,等待用户连接...")

? ? def handle(self):

? ? ? ? while True:

? ? ? ? ? ? events = self.sel.select()

? ? ? ? ? ? for key,mask in events:

? ? ? ? ? ? ? ? callback = key.data

? ? ? ? ? ? ? ? callback(key.fileobj,mask)

? ? def accept(self,sock,mask):

? ? ? ? conn,addr = sock.accept()

? ? ? ? print("{}连接成功".format(addr))

? ? ? ? conn.setblocking(False)

? ? ? ? self.sel.register(conn,self.read)

? ? ? ? self.dic[conn] = {}

? ? ? ? # print(self.dic)

? ? def read(self,conn,mask):

? ? ? ? try:

? ? ? ? ? ? if not self.dic[conn]:

? ? ? ? ? ? ? ? data = json.loads(conn.recv(buffer_size).decode("utf-8"))

? ? ? ? ? ? ? ? if data:

? ? ? ? ? ? ? ? ? ? cmd = data["action"]

? ? ? ? ? ? ? ? ? ? file_name = data["file_name"]

? ? ? ? ? ? ? ? ? ? file_size = data['file_size']

? ? ? ? ? ? ? ? ? ? self.dic[conn] = {"action": cmd,"file_name": file_name,"file_size": file_size,"has_recv": 0}

? ? ? ? ? ? ? ? ? ? if cmd == 'put':

? ? ? ? ? ? ? ? ? ? ? ? data = {

? ? ? ? ? ? ? ? ? ? ? ? ? ? "status_code": 802

? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? conn.send(json.dumps(data).encode())

? ? ? ? ? ? ? ? ? ? elif cmd == 'get':

? ? ? ? ? ? ? ? ? ? ? ? file_path = os.path.join(BASE_DIR,"upload",file_name)

? ? ? ? ? ? ? ? ? ? ? ? # print(file_path)

? ? ? ? ? ? ? ? ? ? ? ? if os.path.exists(file_path):

? ? ? ? ? ? ? ? ? ? ? ? ? ? file_size = os.path.getsize(file_path)

? ? ? ? ? ? ? ? ? ? ? ? ? ? data = {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "status_code": 803,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "file_size": file_size

? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? ? ? ? ? ? data = {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "status_code": 804,

? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? conn.send(json.dumps(data).encode())

? ? ? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? ? ? ? pass

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? ? self.sel.unregister(conn)

? ? ? ? ? ? ? ? ? ? conn.close()

? ? ? ? ? ? else:

? ? ? ? ? ? ? ? cmd = self.dic[conn].get("action",None)

? ? ? ? ? ? ? ? if cmd:

? ? ? ? ? ? ? ? ? ? if hasattr(self,cmd):

? ? ? ? ? ? ? ? ? ? ? ? func = getattr(self,cmd)

? ? ? ? ? ? ? ? ? ? ? ? func(conn)

? ? ? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? ? ? ? print("错误的命令!")

? ? ? ? ? ? ? ? ? ? ? ? conn.close()

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? ? print("错误的命令!")

? ? ? ? ? ? ? ? ? ? conn.close()

? ? ? ? except ConnectionResetError as e:

? ? ? ? ? ? print('error',e)

? ? ? ? ? ? self.sel.unregister(conn)

? ? ? ? ? ? conn.close()

? ? def put(self,conn):

? ? ? ? file_size = self.dic[conn]['file_size']

? ? ? ? path = os.path.join(BASE_DIR,self.dic[conn]['file_name'])

? ? ? ? recv_data = conn.recv(buffer_size)

? ? ? ? self.dic[conn]['has_recv'] += len(recv_data)

? ? ? ? with open(path,'ab') as f:

? ? ? ? ? ? f.write(recv_data)

? ? ? ? ? ? if self.dic[conn]['has_recv'] >= file_size:

? ? ? ? ? ? ? ? print("%s上传完毕!" % self.dic[conn]['file_name'])

? ? ? ? ? ? ? ? print(self.dic[conn])

? ? ? ? ? ? ? ? self.dic[conn] = {}

? ? def get(self,conn):

? ? ? ? file_name = self.dic[conn]['file_name']

? ? ? ? path = os.path.join(BASE_DIR,file_name)

? ? ? ? if json.loads(conn.recv(buffer_size).decode("utf-8"))["status_code"] == 802:

? ? ? ? ? ? with open(path,'rb') as f:

? ? ? ? ? ? ? ? for line in f:

? ? ? ? ? ? ? ? ? ? conn.send(line)

? ? ? ? ? ? self.dic[conn] = {}

? ? ? ? ? ? print('文件下载完毕!')

? ? ? ? ? ? # print(self.dic[conn])

if __name__ == '__main__':

? ? SelectFtpServer()

客户端代码:


import socket

import os,sys

import json

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

buffer_size = 1024

class SelectFtpClient:

? ? def __init__(self):

? ? ? ? self.args = sys.argv

? ? ? ? if len(self.args) > 1:

? ? ? ? ? ? self.port = (self.args[1],int(self.args[2]))

? ? ? ? else:

? ? ? ? ? ? self.port = ("127.0.0.1",8885)

? ? ? ? self.create_socket()

? ? ? ? self.command()

? ? def create_socket(self):

? ? ? ? try:

? ? ? ? ? ? self.sk = socket.socket()

? ? ? ? ? ? self.sk.connect(self.port)

? ? ? ? ? ? print('连接FTP服务器成功!')

? ? ? ? except Exception as e:

? ? ? ? ? ? print("error: ",e)

? ? def command(self):

? ? ? ? while True:

? ? ? ? ? ? cmd = input('>>>').strip()

? ? ? ? ? ? if cmd == 'exit()':

? ? ? ? ? ? ? ? print("结束运行")

? ? ? ? ? ? ? ? break

? ? ? ? ? ? try:

? ? ? ? ? ? ? ? cmd,file = cmd.split()

? ? ? ? ? ? ? ? if hasattr(self,cmd):

? ? ? ? ? ? ? ? ? ? func = getattr(self,cmd)

? ? ? ? ? ? ? ? ? ? func(cmd,file)

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? ? print('调用错误!')

? ? ? ? ? ? except ValueError:

? ? ? ? ? ? ? ? print("请正确输入命令")

? ? def put(self,cmd,file):

? ? ? ? if os.path.isfile(file):

? ? ? ? ? ? file_name = os.path.basename(file)

? ? ? ? ? ? file_size = os.path.getsize(file)

? ? ? ? ? ? data = {

? ? ? ? ? ? ? ? "action": cmd,

? ? ? ? ? ? ? ? "file_name": file_name,

? ? ? ? ? ? ? ? "file_size": file_size

? ? ? ? ? ? }

? ? ? ? ? ? self.sk.send(json.dumps(data).encode())

? ? ? ? ? ? recv_data = json.loads(self.sk.recv(buffer_size).decode("utf-8"))

? ? ? ? ? ? print('recvStatus',recv_data)

? ? ? ? ? ? has_send = 0

? ? ? ? ? ? if recv_data['status_code'] == 802:

? ? ? ? ? ? ? ? with open(file,'rb') as f:

? ? ? ? ? ? ? ? ? ? while file_size > has_send:

? ? ? ? ? ? ? ? ? ? ? ? contant = f.read(1024)

? ? ? ? ? ? ? ? ? ? ? ? self.sk.send(contant)

? ? ? ? ? ? ? ? ? ? ? ? has_send += len(contant)

? ? ? ? ? ? ? ? ? ? ? ? self.progress_bar(has_send,file_size)

? ? ? ? ? ? ? ? print('n%s文件上传完毕' % (file_name,))

? ? ? ? else:

? ? ? ? ? ? print('文件不存在')

? ? def get(self,file_name):

? ? ? ? data = {

? ? ? ? ? ? "action": cmd,

? ? ? ? ? ? "file_name": file_name,

? ? ? ? ? ? "file_size": 0

? ? ? ? }

? ? ? ? self.sk.send(json.dumps(data).encode())

? ? ? ? rec_data = json.loads(self.sk.recv(buffer_size).decode("utf-8"))

? ? ? ? if rec_data["status_code"] == 803:

? ? ? ? ? ? data = {

? ? ? ? ? ? ? ? "status_code": 802

? ? ? ? ? ? }

? ? ? ? ? ? self.sk.send(json.dumps(data).encode("utf-8"))

? ? ? ? ? ? path = os.path.join(BASE_DIR,file_name)

? ? ? ? ? ? has_recv = 0

? ? ? ? ? ? with open(path,'wb') as f_write:

? ? ? ? ? ? ? ? while has_recv < rec_data['file_size']:

? ? ? ? ? ? ? ? ? ? content = self.sk.recv(buffer_size)

? ? ? ? ? ? ? ? ? ? has_recv += len(content)

? ? ? ? ? ? ? ? ? ? f_write.write(content)

? ? ? ? ? ? ? ? ? ? self.progress_bar(has_recv,rec_data['file_size'])

? ? ? ? ? ? ? ? print('nfile %s 下载完成!' % file_name)

? ? ? ? else:

? ? ? ? ? ? print("文件不存在!")

? ? def progress_bar(self,f_size,total):

? ? ? ? percent = round(f_size / total * 100)

? ? ? ? str = '*' * (percent // 2) + ' ' * ((100 - percent) // 2)

? ? ? ? print('r' + str + ' {}%'.format(percent),end='',flush=True,)

if __name__ == '__main__':

? ? SelectFtpClient()


(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读