1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > 使用python3实现ftp服务功能实例(服务端 For Linux)

使用python3实现ftp服务功能实例(服务端 For Linux)

时间:2022-09-29 20:05:18

相关推荐

使用python3实现ftp服务功能实例(服务端 For Linux)

后端开发|Python教程

python ftp Linux

后端开发-Python教程

这篇文章主要介绍了python3实现ftp服务功能,服务端 For Linux,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

手机怎么搭建秒赞网站源码,vscode远程传输文件,ssh连接ubuntu,tomcat 设置jre,手写SQLite,家乡风景简介网页设计,jsp数据库连接代码,购买服务器必须备案吗,jstool 什么插件,最快上手的前端框架是什么,爬虫尿布,php push,东莞seo服务,springboot邮件源码,css del标签,水墨画网站模板,js将网页生成pdf,discuz x3模板制作教程,dedecms修改后台密码,ecshop商品页面,管理系统分类,大转盘网站程序lzw

本文实例为大家分享了python3实现ftp服务功能的具体代码,供大家参考,具体内容如下

广告机发布系统源码,vscode安装vetur,pcilib ubuntu,yum 卸载tomcat,c#+sqlite自动创建,wordpress免费主题插件下载,h5棋牌游戏前端框架,爬虫点击勾选确认阅读,php startup,合肥seo公司价格,农业公司网站源码,c 象棋游戏网页项目源码,android应用模板下载lzw

功能介绍:

易语言源码案例,ubuntu 终端创建用户,服务器上tomcat配置,电子爬虫怎么写,怎么提高php的技术学习,通化seo排名lzw

可执行的命令:

ls

pwd

cd

put

rm

get

mkdir

1、用户加密认证

2、允许多用户同时登陆

3、每个用户有自己的家目录,且只可以访问自己的家目录

4、运行在自己家目录下随意切换目录

5、允许上传下载文件,且文件一致

6、传输过程中显示进度条

server main 代码:

# Author by Andy# _*_ coding:utf-8 _*_import os, sys, json, hashlib, socketserver, timebase_dir = os.path.dirname(os.path.dirname(os.path.abspath(file)))sys.path.append(base_dir)from conf import userdb_setclass Ftp_server(socketserver.BaseRequestHandler): user_home_dir = \ def auth(self, *args): \验证用户名及密码\ cmd_dic = args[0] username = cmd_dic["username"] password = cmd_dic["password"] f = open(userdb_set.userdb_set(), ) user_info = json.load(f) if username in user_info.keys(): if password == user_info[username]: self.request.send(.encode()) os.chdir(/home/%s % username) self.user_home_dir = os.popen(pwd).read().strip() data = "%s login successed" % username self.loging(data) else: self.request.send(1.encode()) data = "%s login failed" % username self.loging(data) f.close else: self.request.send(1.encode()) data = "%s login failed" % username self.loging(data) f.close ########################################## def get(self, *args): \给客户端传输文件\ request_code = { : file is ready to get, 1: file not found! } cmd_dic = args[0] self.loging(json.dumps(cmd_dic)) filename = cmd_dic["filename"] if os.path.isfile(filename): self.request.send(.encode(utf-8)) # 确认文件存在 self.request.recv(1024) self.request.send(str(os.stat(filename).st_size).encode(utf-8)) self.request.recv(1024) m = hashlib.md5() f = open(filename, b) for line in f: m.update(line) self.request.send(line) self.request.send(m.hexdigest().encode(utf-8)) print(From server:Md5 value has been sended!) f.close() else: self.request.send(1.encode(utf-8)) ########################################### def cd(self, *args): \执行cd命令\ user_current_dir = os.popen(pwd).read().strip() cmd_dic = args[0] self.loging(json.dumps(cmd_dic)) path = cmd_dic[path] if path.startswith(/): if self.user_home_dir in path: os.chdir(path) new_dir = os.popen(pwd).read() user_current_dir = new_dir self.request.send(Change dir successfully!.encode("utf-8")) data = Change dir successfully! self.loging(data) elif os.path.exists(path): self.request.send(Permission Denied!.encode("utf-8")) data = Permission Denied! self.loging(data) else: self.request.send(Directory not found!.encode("utf-8")) data = Directory not found! self.loging(data) elif os.path.exists(path): os.chdir(path) new_dir = os.popen(pwd).read().strip() if self.user_home_dir in new_dir: self.request.send(Change dir successfully!.encode("utf-8")) user_current_dir = new_dir data = Change dir successfully! self.loging(data) else: os.chdir(user_current_dir) self.request.send(Permission Denied!.encode("utf-8")) data = Permission Denied! self.loging(data) else: self.request.send(Directory not found!.encode("utf-8")) data = Directory not found! self.loging(data) ########################################### def rm(self, *args): request_code = { : file exist,and Please confirm whether to rm, 1: file not found! } cmd_dic = args[0] self.loging(json.dumps(cmd_dic)) filename = cmd_dic[filename] if os.path.exists(filename): self.request.send(.encode("utf-8")) # 确认文件存在 client_response = self.request.recv(1024).decode() if client_response == : os.popen( m -rf %s % filename) self.request.send((File %s has been deleted! % filename).encode("utf-8")) self.loging(File %s has been deleted! % filename) else: self.request.send((File %s not deleted! % filename).encode("utf-8")) self.loging(File %s not deleted! % filename) else: self.request.send(1.encode("utf-8")) ######################################## def pwd(self, *args): \执行pwd命令\ cmd_dic = args[0] self.loging(json.dumps(cmd_dic)) server_response = os.popen(pwd).read().strip().encode("utf-8") self.request.send(server_response) ############################################# def ls(self, *args): \执行ls命名\ cmd_dic = args[0] self.loging(json.dumps(cmd_dic)) path = cmd_dic[path] cmd = ls -l %s % path server_response = os.popen(cmd).read().encode("utf-8") self.request.send(server_response) ############################################ def put(self, *args): \接收客户端文件\ cmd_dic = args[0] self.loging(json.dumps(cmd_dic)) filename = cmd_dic["filename"] filesize = cmd_dic["size"] if os.path.isfile(filename): f = open(filename + .new, wb) else: f = open(filename, wb) request_code = { 200: Ready to recceive data!, 210: Not ready to received data! } self.request.send(200.encode()) receive_size = 0 while True: if receive_size % localtime + data + \ ) ############################################## def handle(self): # print("您本次访问使用的IP为:%s" %self.client_address[0]) # localtime = time.asctime( time.localtime(time.time())) # print(localtime) while True: try: self.data = self.request.recv(1024).decode() # # print(self.data) cmd_dic = json.loads(self.data) action = cmd_dic["action"] # print("用户请求%s"%action) if hasattr(self, action):func = getattr(self, action)func(cmd_dic) except Exception as e: self.loging(str(e)) breakdef run(): HOST, PORT = .0.0.0, 6969 print("The server is started,and listenning at port 6969") server = socketserver.ThreadingTCPServer((HOST, PORT), Ftp_server) server.serve_forever()if name == main: run()

设置用户口令代码:

#Author by Andy#_*_ coding:utf-8 _*_import os,json,hashlib,sysbase_dir = os.path.dirname(os.path.dirname(os.path.abspath(file)))userdb_file = base_dir+"\data\\userdb"# print(userdb_file)def userdb_set(): if os.path.isfile(userdb_file): # print(userdb_file) return userdb_file else: print(请先为您的服务器创建用户!) user_data = {} dict={} Exit_flags = True while Exit_flags: username = input("Please input username:") if username != exit: password = input("Please input passwod:") if password != exit:user_data.update({username:password})m = hashlib.md5()# m.update(hello)# print(m.hexdigest())for i in user_data: # print(i,user_data[i]) m.update(user_data[i].encode()) dict.update({i:m.hexdigest()}) else:break else: break f = open(userdb_file,w) json.dump(dict,f) f.close() return userdb_file

目录结构:

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。