改造python3中的http.server为简单的文件上传下载服务

改造

修改python3中的http.server.SimpleHTTPRequestHandler,实现简单的文件上传下载服务

 simple_http_file_server.py:

# !/usr/bin/env python3

import datetime
import email
import html
import http.server
import io
import mimetypes
import os
import posixpath
import re
import shutil
import sys
import urllib.error
import urllib.parse
import urllib.request
import socket
from http import HTTPStatus
import threading
import contextlib
 
__version__ = "0.1"
__all__ = ["MySimpleHTTPRequestHandler"]
 
class MySimpleHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
	server_version = "SimpleHTTP/" + __version__
	extensions_map = _encodings_map_default = {
		'.gz': 'application/gzip',
		'.Z': 'application/octet-stream',
		'.bz2': 'application/x-bzip2',
		'.xz': 'application/x-xz',
	}
 
	def __init__(self, *args, directory=None, **kwargs):
		if directory is None:
			directory = os.getcwd()
		self.directory = os.fspath(directory)
		super().__init__(*args, **kwargs)
 
	def do_GET(self):
		f = self.send_head()
		if f:
			try:
				self.copyfile(f, self.wfile)
			finally:
				f.close()
 
	def do_HEAD(self):
		f = self.send_head()
		if f:
			f.close()
 
	def send_head(self):
		path = self.translate_path(self.path)
		f = None
		if os.path.isdir(path):
			parts = urllib.parse.urlsplit(self.path)
			if not parts.path.endswith('/'):
				# redirect browser - doing basically what apache does
				self.send_response(HTTPStatus.MOVED_PERMANENTLY)
				new_parts = (parts[0], parts[1], parts[2] + '/',
							 parts[3], parts[4])
				new_url = urllib.parse.urlunsplit(new_parts)
				self.send_header("Location", new_url)
				self.end_headers()
				return None
			for index in "index.html", "index.htm":
				index = os.path.join(path, index)
				if os.path.exists(index):
					path = index
					break
			else:
				return self.list_directory(path)
 
		ctype = self.guess_type(path)
		if path.endswith("/"):
			self.send_error(HTTPStatus.NOT_FOUND, "File not found")
			return None
		try:
			f = open(path, 'rb')
		except OSError:
			self.send_error(HTTPStatus.NOT_FOUND, "File not found")
			return None
		try:
			fs = os.fstat(f.fileno())
			# Use browser cache if possible
			if ("If-Modified-Since" in self.headers
					and "If-None-Match" not in self.headers):
				# compare If-Modified-Since and time of last file modification
				try:
					ims = email.utils.parsedate_to_datetime(self.headers["If-Modified-Since"])
				except (TypeError, IndexError, OverflowError, ValueError):
					# ignore ill-formed values
					pass
				else:
					if ims.tzinfo is None:
						# obsolete format with no timezone, cf.
						# https://tools.ietf.org/html/rfc7231#section-7.1.1.1
						ims = ims.replace(tzinfo=datetime.timezone.utc)
					if ims.tzinfo is datetime.timezone.utc:
						# compare to UTC datetime of last modification
						last_modif = datetime.datetime.fromtimestamp(
							fs.st_mtime, datetime.timezone.utc)
						# remove microseconds, like in If-Modified-Since
						last_modif = last_modif.replace(microsecond=0)
 
						if last_modif <= ims:
							self.send_response(HTTPStatus.NOT_MODIFIED)
							self.end_headers()
							f.close()
							return None
 
			self.send_response(HTTPStatus.OK)
			self.send_header("Content-type", ctype)
			self.send_header("Content-Length", str(fs[6]))
			self.send_header("Last-Modified",
							 self.date_time_string(fs.st_mtime))
			self.end_headers()
			return f
		except:
			f.close()
			raise
 
	def list_directory(self, path):
		try:
			list_dir = os.listdir(path)
		except OSError:
			self.send_error(HTTPStatus.NOT_FOUND, "No permission to list_dir directory")
			return None
		list_dir.sort(key=lambda a: a.lower())
		r = []
		try:
			display_path = urllib.parse.unquote(self.path, errors='surrogatepass')
		except UnicodeDecodeError:
			display_path = urllib.parse.unquote(path)
		display_path = html.escape(display_path, quote=False)
		enc = sys.getfilesystemencoding()
 
		form = """
			<h1>文件上传</h1>\n
			<form ENCTYPE="multipart/form-data" method="post">\n
				<input name="file" type="file"/>\n
				<input type="submit" value="upload"/>\n
			</form>\n"""
		title = 'Directory listing for %s' % display_path
		r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
				 '"http://www.w3.org/TR/html4/strict.dtd">')
		r.append('<html>\n<head>')
		r.append('<meta http-equiv="Content-Type" '
				 'content="text/html; charset=%s">' % enc)
		r.append('<title>%s</title>\n</head>' % title)
		r.append('<body>%s\n<h1>%s</h1>' % (form, title))
		r.append('<hr>\n<ul>')
		for name in list_dir:
			fullname = os.path.join(path, name)
			displayname = linkname = name
			# Append / for directories or @ for symbolic links
			if os.path.isdir(fullname):
				displayname = name + "/"
				linkname = name + "/"
			if os.path.islink(fullname):
				displayname = name + "@"
				# Note: a link to a directory displays with @ and links with /
			r.append('<li><a href="%s">%s</a></li>' % (urllib.parse.quote(linkname, errors='surrogatepass'),
													   html.escape(displayname, quote=False)))
		r.append('</ul>\n<hr>\n</body>\n</html>\n')
		encoded = '\n'.join(r).encode(enc, 'surrogate escape')
		f = io.BytesIO()
		f.write(encoded)
		f.seek(0)
		self.send_response(HTTPStatus.OK)
		self.send_header("Content-type", "text/html; charset=%s" % enc)
		self.send_header("Content-Length", str(len(encoded)))
		self.end_headers()
		return f
 
	def translate_path(self, path):
		# abandon query parameters
		path = path.split('?', 1)[0]
		path = path.split('#', 1)[0]
		# Don't forget explicit trailing slash when normalizing. Issue17324
		trailing_slash = path.rstrip().endswith('/')
		try:
			path = urllib.parse.unquote(path, errors='surrogatepass')
		except UnicodeDecodeError:
			path = urllib.parse.unquote(path)
		path = posixpath.normpath(path)
		words = path.split('/')
		words = filter(None, words)
		path = self.directory
		for word in words:
			if os.path.dirname(word) or word in (os.curdir, os.pardir):
				# Ignore components that are not a simple file/directory name
				continue
			path = os.path.join(path, word)
		if trailing_slash:
			path += '/'
		return path
 
	def copyfile(self, source, outputfile):
		shutil.copyfileobj(source, outputfile)
 
	def guess_type(self, path):
		base, ext = posixpath.splitext(path)
		if ext in self.extensions_map:
			return self.extensions_map[ext]
		ext = ext.lower()
		if ext in self.extensions_map:
			return self.extensions_map[ext]
		guess, _ = mimetypes.guess_type(path)
		if guess:
			return guess
		return 'application/octet-stream'
 
	def do_POST(self):
		r, info = self.deal_post_data()
		self.log_message('%s, %s => %s' % (r, info, self.client_address))
		enc = sys.getfilesystemencoding()
		res = [
			'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
			'"http://www.w3.org/TR/html4/strict.dtd">',
			'<html>\n<head>',
			'<meta http-equiv="Content-Type" content="text/html; charset=%s">' % enc,
			'<title>%s</title>\n</head>' % "Upload Result Page",
			'<body><h1>%s</h1>\n' % "Upload Result"
		]
		if r:
			res.append('<p>SUCCESS: %s</p>\n' % info)
		else:
			res.append('<p>FAILURE: %s</p>' % info)
		res.append('<a href=\"%s\">back</a>' % self.headers['referer'])
		res.append('</body></html>')
		encoded = '\n'.join(res).encode(enc, 'surrogate escape')
		f = io.BytesIO()
		f.write(encoded)
		length = f.tell()
		f.seek(0)
		self.send_response(200)
		self.send_header("Content-type", "text/html")
		self.send_header("Content-Length", str(length))
		self.end_headers()
		if f:
			self.copyfile(f, self.wfile)
			f.close()
 
	def deal_post_data(self):
		content_type = self.headers['content-type']
		if not content_type:
			return False, "Content-Type header doesn't contain boundary"
		boundary = content_type.split("=")[1].encode()
		remain_bytes = int(self.headers['content-length'])
		line = self.rfile.readline()
		remain_bytes -= len(line)
		if boundary not in line:
			return False, "Content NOT begin with boundary"
		line = self.rfile.readline()
		remain_bytes -= len(line)
		fn = re.findall(r'Content-Disposition.*name="file"; filename="(.*)"', line.decode())
		if not fn:
			return False, "Can't find out file name..."
		path = self.translate_path(self.path)
		fn = os.path.join(path, fn[0])
		line = self.rfile.readline()
		remain_bytes -= len(line)
		line = self.rfile.readline()
		remain_bytes -= len(line)
		try:
			out = open(fn, 'wb')
		except IOError:
			return False, "Can't create file to write, do you have permission to write?"
 
		preline = self.rfile.readline()
		remain_bytes -= len(preline)
		while remain_bytes > 0:
			line = self.rfile.readline()
			remain_bytes -= len(line)
			if boundary in line:
				preline = preline[0:-1]
				if preline.endswith(b'\r'):
					preline = preline[0:-1]
				out.write(preline)
				out.close()
				return True, "File '%s' upload success!" % fn
			else:
				out.write(preline)
				preline = line
		return False, "Unexpect Ends of data."
 

def _get_best_family(*address):
	infos = socket.getaddrinfo(
		*address,
		type=socket.SOCK_STREAM,
		flags=socket.AI_PASSIVE,
	)
	family, type, proto, canonname, sockaddr = next(iter(infos))
	return family, sockaddr




def serve_forever(port=8000, bind=None, directory="."):
	"""
	This runs an HTTP server on port 8000 (or the port argument).
	"""

	# ensure dual-stack is not disabled; ref #38907
	class DualStackServer(http.server.ThreadingHTTPServer):
		def server_bind(self):
			# suppress exception when protocol is IPv4
			with contextlib.suppress(Exception):
				self.socket.setsockopt(
					socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
			return super().server_bind()

		def finish_request(self, request, client_address):
			self.RequestHandlerClass(request, client_address, self,
										directory=directory)


	HandlerClass=MySimpleHTTPRequestHandler
	ServerClass=DualStackServer
	protocol="HTTP/1.0"
	ServerClass.address_family, addr = _get_best_family(bind, port)
	HandlerClass.protocol_version = protocol
	with ServerClass(addr, HandlerClass) as httpd:
		host, port = httpd.socket.getsockname()[:2]
		url_host = f'[{host}]' if ':' in host else host
		print(
			f"Serving HTTP on {host} port {port} "
			f"(http://{url_host}:{port}/) ..."
		)
		try:
			httpd.serve_forever()
		except KeyboardInterrupt:
			print("\nKeyboard interrupt received, exiting.")
			sys.exit(0)
 
if __name__ == '__main__':
	import argparse
	parser = argparse.ArgumentParser()
	parser.add_argument('--bind', '-b', metavar='ADDRESS',
						help='specify alternate bind address '
							 '(default: all interfaces)')
	parser.add_argument('--directory', '-d', default=os.getcwd(),
						help='specify alternate directory '
							 '(default: current directory)')
	parser.add_argument('port', action='store', default=8000, type=int,
						nargs='?',
						help='specify alternate port (default: 8000)')
	args = parser.parse_args()

	# 在主线程中执行
	# serve_forever(
	# 	port=args.port,
	# 	bind=args.bind,
	#	directory = args.directory
	# )

	# 在子线程中执行,这样主线程可以执行异步工作
	thread1 = threading.Thread(name='t1',target= serve_forever,
			kwargs={
				"port" : args.port,
				"bind" : args.bind,
				"directory" : args.directory
			}
		)
	thread1.start()
	# thread1.join()
	

	

使用帮助:

>python3 simple_http_file_server.py --help              
usage: simple_http_file_server.py [-h] [--cgi] [--bind ADDRESS] [--directory DIRECTORY] [port]

positional arguments:
  port                  specify alternate port (default: 8000)

optional arguments:
  -h, --help            show this help message and exit
  --cgi                 run as CGI server
  --bind ADDRESS, -b ADDRESS
                        specify alternate bind address (default: all interfaces)
  --directory DIRECTORY, -d DIRECTORY
                        specify alternate directory (default: current directory)

使用示例:

>python3 simple_http_file_server.py -d ~ 12345
Serving HTTP on :: port 12345 (http://[::]:12345/) ...
::ffff:127.0.0.1 - - [30/Nov/2023 09:35:06] "GET / HTTP/1.1" 200 -

。。。

访问:

浏览器中访问:http://127.0.0.1:12345/


参考:

        Python3 实现简单HTTP服务器(附带文件上传)_python3 -m http.server-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/225133.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Thymeleaf生成pdf表格合并单元格描边不显示

生成pdf后左侧第一列的右描边不显示&#xff0c;但是html显示正常 显示异常时描边的写法 cellpadding“0” cellspacing“0” &#xff0c;td,th描边 .self-table{border:1px solid #000;border-collapse: collapse;width:100%}.self-table th{font-size:12px;border:1px sol…

markdown学习(初学者)

学习途径&#xff1a; 在线markdown编辑器_微信公众号markdown排版工具 Markdown 基本语法 | Markdown 官方教程 标题 要创建标题&#xff0c;请在单词或短语前面添加井号 (#) 。# 的数量代表了标题的级别。例如&#xff0c;添加三个 # 表示创建一个三级标题 (<h3>) (…

【华为网络-配置-023】- 一般企业网架构方案(单节点方案)

要求&#xff1a; 1、防火墙 FW1 G1/0/0 接口使用 PPPoE 拨号获取 IP 地址。 2、FW1 配置信任&#xff08;内网包含服务器&#xff09;和非信任区域&#xff08;Internet 外网&#xff09;。 3、FW1 配置 NAPT 使内网可以上网。 4、核心交换机 LSW1 划分 VLAN 并配置各接口及三…

Python 日期时间模块详解(datetime)

文章目录 1 概述1.1 datetime 类图1.2 类描述 2 常用方法2.1 获取当前日期时间&#xff1a;now()、today()、time()2.2 日期时间格式化&#xff1a;strftime()2.3 日期时间大小比较&#xff1a;>、、<2.4 日期时间间隔&#xff1a;- 3 扩展3.1 Python 中日期时间格式化符…

Java / Scala - Trie 树简介与应用实现

目录 一.引言 二.Tire 树简介 1.树 Tree 2.二叉搜索树 Binary Search Tree 3.字典树 Trie Tree 3.1 基本概念 3.2 额外信息 3.3 结点实现 3.4 查找与存储 三.Trie 树应用 1.应用场景 2.Java / Scala 实现 2.1 Pom 依赖 2.2 关键词匹配 四.总结 一.引言 Trie 树…

案例054:基于微信的追星小程序

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

安装node.js并创建第一个vue项目

目录 一&#xff0c;下载node.js 二&#xff0c;创建一个vue项目 一&#xff0c;下载node.js 1.进入官网&#xff1a;Node.js (nodejs.org) 2.选择版本 3.选择安装方式 4.运行安装包&#xff0c;下载文件 5.选择要安装的路径后一直next 6.安装完成后打开命令提示符&#xff…

PHP短信接口防刷防轰炸多重解决方案三(可正式使用)

短信接口盗刷轰炸&#xff1a;指的是黑客利用非法手段获取短信接口的访问权限&#xff0c;然后使用该接口发送大量垃圾短信给目标用户 短信验证码轰炸解决方案一(验证码类解决)-CSDN博客 短信验证码轰炸解决方案二(防止海外ip、限制ip、限制手机号次数解决)-CSDN博客 PHP短信…

推荐几款转换视频格式的好用转换工具,小白也能上手

视频格式转换工具是一种专门转换视频的软件&#xff0c;可让你将一种视频格式转换为另一种视频格式&#xff08;例如&#xff0c;MOV 到 MP4&#xff09;&#xff0c;通常可以节省空间。 本文将介绍一些用于转换视频格式的好用转换工具&#xff0c;并且详细描述了它们的主要功…

【FPGA】数字电路设计基础

数字电路基础 1 什么是数字电路 在学习数字电路之前&#xff0c;我们先要了解下什么是数字电路。想要搞明白数字电路&#xff0c;就要搞明白生活中有 两种概念&#xff0c; 数字信号和模拟信号&#xff0c;模拟信号一般包括压力、气温、速度等信号&#xff0c;模拟量的值是可…

智能成绩表 - 华为OD统一考试(C卷)

OD统一考试(C卷) 分值: 100分 题目描述 小明来到某学校当老师,需要将学生按考试总分或单科分数进行排名,你能帮帮他吗? 输入描述 第1行输入两个整数,学生人数n和科目数量m。0<n<100,0<m<10 第2行输入m个科目名称,彼此之间用空格隔开。科目名称只由英文…

这书看着贼得劲儿

作者呕心沥血2年&#xff0c;再出力作~~~ 给大家推荐一本好玩的书 神经网络与TensorFlow 本来以为出版了第一本书&#xff0c;应该对于漫长的审核有免疫力了&#xff0c;结果又被这本书折磨了2年。于是作者痛定思痛&#xff0c;决定第三本书写一本纯科普的书籍。 墙裂推荐 这…

跨境电商危机公关:应对负面舆情的策略优化

随着跨境电商的快速发展&#xff0c;企业在全球市场中面临的竞争与挑战也日益复杂。在这个数字时代&#xff0c;负面舆情一旦爆发&#xff0c;可能对企业形象和经营造成深远影响。 因此&#xff0c;跨境电商企业需要建立有效的危机公关策略&#xff0c;以迅速、果断、有效地应…

Python集合魔法:解锁数据去重技巧

. 在Python编程的魔法世界中&#xff0c;有一种数据类型几乎被忽视&#xff0c;但却拥有强大的超能力&#xff0c;那就是集合&#xff08;Set&#xff09;。 集合是一种无序、唯一的数据类型&#xff0c;它以其独特的特点在编程世界中独占一席之地。 1. 集合的定义和特点 集…

Quantlib环境安装踩坑记录

1.conda 安装python3.7环境&#xff1b; 2.因为torch1.8.2环境已经被弃用&#xff0c;所以通过nvidia-smi命令确认cuda版本是11.6后进入环境&#xff0c; 输入pip install torch1.12.0cu116 torchvision0.13.0cu116 torchaudio0.12.0 torchtext0.13.0 --extra-index-url https:…

第二证券:股债跷跷板是什么?投资者该如何应对?

股市和债市虽然是两个不同的证券交易商场&#xff0c;但它们之间保持着一定的联系&#xff0c;比较典型的&#xff0c;便是股债商场间的联动&#xff0c;也称为股债跷跷板。股债跷跷板是什么&#xff1f;出资者该怎样应对&#xff1f;关于这些&#xff0c;本文将借用相关知识作…

基于SSM框架的购物商城系统论文

摘 要 网络技术和计算机技术发展至今&#xff0c;已经拥有了深厚的理论基础&#xff0c;并在现实中进行了充分运用&#xff0c;尤其是基于计算机运行的软件更是受到各界的关注。加上现在人们已经步入信息时代&#xff0c;所以对于信息的宣传和管理就很关键。因此商城购物信息的…

多屏模式输入法可以正确切换屏幕展示原理剖析

背景 hi&#xff0c;粉丝朋友们&#xff1a; 近期有个学员问到了一个输入法相关问题。刚好梳理了一下输入法相关的在多屏模式的一个展示流程&#xff0c;这里做个记录&#xff0c;也相当于深入理解窗口相关的一篇干货blog。 如上面两幅图展示&#xff0c;输入法可以自由自在显…

从零开始,利用ChatGPT学会写作的完整指南

文章目录 前言了解ChatGPT访问OpenAI平台使用ChatGPT进行简单的对话定义写作主题逐步生成文章段落添加个性化和细节编辑和润色反复修改直至满意 图书推荐内容简介作者简介获取方式 前言 在数字时代&#xff0c;人工智能技术日益成熟&#xff0c;为我们提供了全新的学习和创作机…

使用@Validated注解导致Controller里注入的Service为空

2023年12月7日 今天在写接口参数校验时遇到一个问题&#xff0c;将注解Validated写在Controller上时导致导入的Service为空 Controller代码 Validated RestController RequestMapping("test") public class TestController {Autowiredprivate TestService testServ…