awesome-website-2
Python 网站开发(5) – 搭建Web框架
搭建Web框架
由于aiohttp作为一个Web框架比较底层,我们还需要基于aiohttp编写一个更方便处理URL的Web框架。
在www目录新建coroweb.py:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172import asyncio, os, inspect, logging, functools
from urllib import parse
from aiohttp import web
## apis是处理分页的模块,代码在本章页面末尾,请将apis.py放在www下以防报错
## APIError 是指API调用时发生逻辑错误
from apis import APIError
## 编写装饰函数 @get()
def get(path):
## Define decorator @get('/path')
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kw):
return func(*args, **kw)
wrapper.__method__ = 'GET'
wrapper.__route__ = path
return wrapper
return decorator
## 编写装饰函数 @post()
def post(path):
## Define decorator @post('/path')
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kw):
return func(*args, **kw)
wrapper.__method__ = 'POST'
wrapper.__route__ = path
return wrapper
return decorator
## 以下是RequestHandler需要定义的一些函数
def get_required_kw_args(fn):
args = []
params = inspect.signature(fn).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.KEYWORD_ONLY and param.default == inspect.Parameter.empty:
args.append(name)
return tuple(args)
def get_named_kw_args(fn):
args = []
params = inspect.signature(fn).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.KEYWORD_ONLY:
args.append(name)
return tuple(args)
def has_named_kw_args(fn):
params = inspect.signature(fn).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.KEYWORD_ONLY:
return True
def has_var_kw_arg(fn):
params = inspect.signature(fn).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.VAR_KEYWORD:
return True
def has_request_arg(fn):
sig = inspect.signature(fn)
params = sig.parameters
found = False
for name, param in params.items():
if name == 'request':
found = True
continue
if found and (param.kind != inspect.Parameter.VAR_POSITIONAL and param.kind != inspect.Parameter.KEYWORD_ONLY and param.kind != inspect.Parameter.VAR_KEYWORD):
raise ValueError('request parameter must be the last named parameter in function: %s%s' % (fn.__name__, str(sig)))
return found
## 定义RequestHandler从URL函数中分析其需要接受的参数
class RequestHandler(object):
def __init__(self, app, fn):
self._app = app
self._func = fn
self._has_request_arg = has_request_arg(fn)
self._has_var_kw_arg = has_var_kw_arg(fn)
self._has_named_kw_args = has_named_kw_args(fn)
self._named_kw_args = get_named_kw_args(fn)
self._required_kw_args = get_required_kw_args(fn)
async def __call__(self, request):
kw = None
if self._has_var_kw_arg or self._has_named_kw_args or self._required_kw_args:
if request.method == 'POST':
if not request.content_type:
return web.HTTPBadRequest('Missing Content-Type.')
ct = request.content_type.lower()
if ct.startswith('application/json'):
params = await request.json()
if not isinstance(params, dict):
return web.HTTPBadRequest('JSON body must be object.')
kw = params
elif ct.startswith('application/x-www-form-urlencoded') or ct.startswith('multipart/form-data'):
params = await request.post()
kw = dict(**params)
else:
return web.HTTPBadRequest('Unsupported Content-Type: %s' % request.content_type)
if request.method == 'GET':
qs = request.query_string
if qs:
kw = dict()
for k, v in parse.parse_qs(qs, True).items():
kw[k] = v[0]
if kw is None:
kw = dict(**request.match_info)
else:
if not self._has_var_kw_arg and self._named_kw_args:
# remove all unamed kw:
copy = dict()
for name in self._named_kw_args:
if name in kw:
copy[name] = kw[name]
kw = copy
# check named arg:
for k, v in request.match_info.items():
if k in kw:
logging.warning('Duplicate arg name in named arg and kw args: %s' % k)
kw[k] = v
if self._has_request_arg:
kw['request'] = request
# check required kw:
if self._required_kw_args:
for name in self._required_kw_args:
if not name in kw:
return web.HTTPBadRequest('Missing argument: %s' % name)
logging.info('call with args: %s' % str(kw))
try:
r = await self._func(**kw)
return r
except APIError as e:
return dict(error=e.error, data=e.data, message=e.message)
## 定义add_static函数,来注册static文件夹下的文件
def add_static(app):
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
app.router.add_static('/static/', path)
logging.info('add static %s => %s' % ('/static/', path))
## 定义add_route函数,来注册一个URL处理函数
def add_route(app, fn):
method = getattr(fn, '__method__', None)
path = getattr(fn, '__route__', None)
if path is None or method is None:
raise ValueError('@get or @post not defined in %s.' % str(fn))
if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn):
fn = asyncio.coroutine(fn)
logging.info('add route %s %s => %s(%s)' % (method, path, fn.__name__, ', '.join(inspect.signature(fn).parameters.keys())))
app.router.add_route(method, path, RequestHandler(app, fn))
## 定义add_routes函数,自动把handler模块的所有符合条件的URL函数注册了
def add_routes(app, module_name):
n = module_name.rfind('.')
if n == (-1):
mod = __import__(module_name, globals(), locals())
else:
name = module_name[n+1:]
mod = getattr(__import__(module_name[:n], globals(), locals(), [name]), name)
for attr in dir(mod):
if attr.startswith('_'):
continue
fn = getattr(mod, attr)
if callable(fn):
method = getattr(fn, '__method__', None)
path = getattr(fn, '__route__', None)
if method and path:
add_route(app, fn)
最后,在app.py中加入middleware、jinja2模板和自注册的支持。app.py代码修改如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146import logging; logging.basicConfig(level=logging.INFO)
import asyncio, os, json, time
from datetime import datetime
from aiohttp import web
from jinja2 import Environment, FileSystemLoader
## config 配置代码在后面会创建添加, 可先从'https://github.com/yzyly1992/2019_Python_Web_Dev'下载或下一章中复制`config.py`和`config_default.py`到`www`下,以防报错
from config import configs
import orm
from coroweb import add_routes, add_static
## handlers 是url处理模块, 当handlers.py在API章节里完全编辑完再将下一行代码的双井号去掉
## from handlers import cookie2user, COOKIE_NAME
## 初始化jinja2的函数
def init_jinja2(app, **kw):
logging.info('init jinja2...')
options = dict(
autoescape = kw.get('autoescape', True),
block_start_string = kw.get('block_start_string', '{%'),
block_end_string = kw.get('block_end_string', '%}'),
variable_start_string = kw.get('variable_start_string', '{{'),
variable_end_string = kw.get('variable_end_string', '}}'),
auto_reload = kw.get('auto_reload', True)
)
path = kw.get('path', None)
if path is None:
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
logging.info('set jinja2 template path: %s' % path)
env = Environment(loader=FileSystemLoader(path), **options)
filters = kw.get('filters', None)
if filters is not None:
for name, f in filters.items():
env.filters[name] = f
app['__templating__'] = env
## 以下是middleware,可以把通用的功能从每个URL处理函数中拿出来集中放到一个地方
## URL处理日志工厂
async def logger_factory(app, handler):
async def logger(request):
logging.info('Request: %s %s' % (request.method, request.path))
return (await handler(request))
return logger
## 认证处理工厂--把当前用户绑定到request上,并对URL/manage/进行拦截,检查当前用户是否是管理员身份
## 需要handlers.py的支持, 当handlers.py在API章节里完全编辑完再将下面代码的双井号去掉
##async def auth_factory(app, handler):
## async def auth(request):
## logging.info('check user: %s %s' % (request.method, request.path))
## request.__user__ = None
## cookie_str = request.cookies.get(COOKIE_NAME)
## if cookie_str:
## user = await cookie2user(cookie_str)
## if user:
## logging.info('set current user: %s' % user.email)
## request.__user__ = user
## if request.path.startswith('/manage/') and (request.__user__ is None or not request.__user__.admin):
## return web.HTTPFound('/signin')
## return (await handler(request))
## return auth
## 数据处理工厂
async def data_factory(app, handler):
async def parse_data(request):
if request.method == 'POST':
if request.content_type.startswith('application/json'):
request.__data__ = await request.json()
logging.info('request json: %s' % str(request.__data__))
elif request.content_type.startswith('application/x-www-form-urlencoded'):
request.__data__ = await request.post()
logging.info('request form: %s' % str(request.__data__))
return (await handler(request))
return parse_data
## 响应返回处理工厂
async def response_factory(app, handler):
async def response(request):
logging.info('Response handler...')
r = await handler(request)
if isinstance(r, web.StreamResponse):
return r
if isinstance(r, bytes):
resp = web.Response(body=r)
resp.content_type = 'application/octet-stream'
return resp
if isinstance(r, str):
if r.startswith('redirect:'):
return web.HTTPFound(r[9:])
resp = web.Response(body=r.encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, dict):
template = r.get('__template__')
if template is None:
resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8'))
resp.content_type = 'application/json;charset=utf-8'
return resp
else:
## 在handlers.py完全完成后,去掉下一行的双井号
##r['__user__'] = request.__user__
resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, int) and t >= 100 and t < 600:
return web.Response(t)
if isinstance(r, tuple) and len(r) == 2:
t, m = r
if isinstance(t, int) and t >= 100 and t < 600:
return web.Response(t, str(m))
# default:
resp = web.Response(body=str(r).encode('utf-8'))
resp.content_type = 'text/plain;charset=utf-8'
return resp
return response
## 时间转换
def datetime_filter(t):
delta = int(time.time() - t)
if delta < 60:
return u'1分钟前'
if delta < 3600:
return u'%s分钟前' % (delta // 60)
if delta < 86400:
return u'%s小时前' % (delta // 3600)
if delta < 604800:
return u'%s天前' % (delta // 86400)
dt = datetime.fromtimestamp(t)
return u'%s年%s月%s日' % (dt.year, dt.month, dt.day)
async def init(loop):
await orm.create_pool(loop=loop, **configs.db)
## 在handlers.py完全完成后,在下面middlewares的list中加入auth_factory
app = web.Application(loop=loop, middlewares=[
logger_factory, response_factory
])
init_jinja2(app, filters=dict(datetime=datetime_filter))
add_routes(app, 'handlers')
add_static(app)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 9000)
logging.info('server started at http://127.0.0.1:9000...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
以下是处理分页和API错误的代码apis.py, 请将之放置在www下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48import json, logging, inspect, functools
## 建立Page类来处理分页,可以在page_size更改每页项目的个数
class Page(object):
def __init__(self, item_count, page_index=1, page_size=8):
self.item_count = item_count
self.page_size = page_size
self.page_count = item_count // page_size + (1 if item_count % page_size > 0 else 0)
if (item_count == 0) or (page_index > self.page_count):
self.offset = 0
self.limit = 0
self.page_index = 1
else:
self.page_index = page_index
self.offset = self.page_size * (page_index - 1)
self.limit = self.page_size
self.has_next = self.page_index < self.page_count
self.has_previous = self.page_index > 1
def __str__(self):
return 'item_count: %s, page_count: %s, page_index: %s, page_size: %s, offset: %s, limit: %s' % (self.item_count, self.page_count, self.page_index, self.page_size, self.offset, self.limit)
__repr__ = __str__
## 以下为API的几类错误代码
class APIError(Exception):
def __init__(self, error, data='', message=''):
super(APIError, self).__init__(message)
self.error = error
self.data = data
self.message = message
class APIValueError(APIError):
def __init__(self, field, message=''):
super(APIValueError, self).__init__('value:invalid', field, message)
class APIResourceNotFoundError(APIError):
def __init__(self, field, message=''):
super(APIResourceNotFoundError, self).__init__('value:notfound', field, message)
class APIPermissionError(APIError):
def __init__(self, message=''):
super(APIPermissionError, self).__init__('permission:forbidden', 'permission', message)
if __name__=='__main__':
import doctest
doctest.testmod()
有了Web框架,接下来就可以添加需要的URL到handlers模块来处理了。
awesome-website-1
Python 网站开发(1) – 搭建开发环境
用pip安装网站开发所需要的第三方库:
- 异步框架的 aiohttp
- 前端模板引擎 jinja2
- 数据库 MySQL的Python异步驱动程序 aiomysql (需要先下载安装最新版的 MySQL, 选择免费的 MySQL Community Server 下载安装就好)
- 轻量级标记语言Markdown, 将文本转换为有效的HTML
$ pip3 install aiohttp jinja2 aiomysql markdown
构建项目结构
选择一个工作目录,建立如下的目录结构:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15awesome-website/ <-- 根目录
|
+- backup/ <-- 备份目录
|
+- conf/ <-- 配置文件
|
+- dist/ <-- 打包目录
|
+- www/ <-- Web目录,存放.py文件
| |
| +- static/ <-- 存放静态文件
| |
| +- templates/ <-- 存放模板文件
|
+- LICENSE <-- 代码LICENSE
创建好项目的目录结构后,建议同时建立git仓库并同步到GitHub, 保证代码储存及修改的安全。
Python 网站开发(2) – 编写网站骨架
编写网站骨架
为了搭建一个高效的网站,网站的IO处理要建立在asyncio(异步io)的基础上, 我们可以用 aiohttp 写一个基本的服务器应用 app.py 存放在www目录:
1 | import logging; logging.basicConfig(level=logging.INFO) |
在www目录下运行这个 app.py, 服务器将在9000端口持续监听 http 请求,并异步对首页 / 进行响应:1
2
3$ python3 app.py
======== Running on http://127.0.0.1:9000 ========
(Press CTRL+C to quit)
打开浏览器输入地址 http://127.0.0.1:9000 进行测试,如果返回我们设定好的Awesome Website字符串,就说明我们网站服务器应用的框架已经搭好了。
Python 网站开发(3) – 编写ORM
编写ORM*
对象关系映射 (Object Relational Mapping, 简称ORM) 模式是一种为了解决面向对象与关系数据库存在的互不匹配的现象的技术。换句话说,ORM是通过使用描述对象和数据库之间映射的元数据,将程序中的对象自动持久化到关系数据库中。
在一个网站中,所有的数据(包括用户,日志,评论等)都存储在数据库中。我们的网站awesome-website选择用MySQL作为数据库。访问数据库需要创建数据库连接,游标对象,执行SQL语句,处理异常,清理资源。这些访问数据库的代码如果分散到每个函数中去,十分难以维护,效率低下不利于复用。因此,我们将常用的MySQL数据库操作用函数封装起来,便于网站调用。
由于我们的网站基于异步编程,系统的每一层都必须是异步。aiomysql为MySQL数据库提供了异步IO的驱动。
创建连接池
我们需要创建一个全局的连接池,每个HTTP请求都可以从连接池中直接获取数据库连接。使用连接池的好处是不必频繁地打开和关闭数据库连接,而是能复用就尽量复用。
连接池由全局变量__pool存储,缺省情况下将编码设置为utf8,自动提交事务。在www目录下新建 orm.py加入以下代码:
1 | import asyncio, logging, aiomysql |
Select
要执行SELECT语句,我们用select函数执行,需要传入SQL语句和SQL参数。缀加以下代码至orm.py:
1 | async def select(sql, args, size=None): |
SQL语句的占位符是?,而MySQL的占位符是%s,select()函数在内部自动替换。注意要始终坚持使用带参数的SQL,而不是自己拼接SQL字符串,这样可以防止SQL注入攻击。
Insert, Update, Delete
要执行INSERT、UPDATE、DELETE语句,可以定义一个通用的execute()函数,因为这3种SQL的执行都需要相同的参数,以及返回一个整数表示影响的行数。缀加以下代码至orm.py:
1 | async def execute(sql, args): |
execute()函数和select()函数所不同的是,cursor对象不返回结果集,而是通过rowcount返回结果数。
ORM
首先要定义的是所有ORM映射的基类Model。缀加以下代码至orm.py:
1 | class Model(dict, metaclass=ModelMetaclass): |
以及Field和各种Field子类。缀加以下代码至orm.py:
1 | class Field(object): |
注意到Model只是一个基类,要将具体的子类如User的映射信息读取出来需要通过metaclass:ModelMetaclass。 这样,任何继承自Model的类(比如User),会自动通过ModelMetaclass扫描映射关系,并存储到自身的类属性如table、mappings中。缀加以下代码至Model代码之前:
1 | def create_args_string(num): |
然后,我们往Model类添加class方法,就可以让所有子类调用class方法:
1 | class Model(dict): |
往Model类添加实例方法,就可以让所有子类调用实例方法:
1 | class Model(dict): |
Python 网站开发(4) – 编写Model
编写Model
orm.py编写完成后,就可以把网站应用需要的三个表(user, blog, comment)用Model表示出来。在www目录下,新建models.py:
1 | import time, uuid |
初始化数据库表
由于网站表的数量较少,可以手动创建SQL脚本schema.sql到根目录:
1 | -- schema.sql |
把SQL脚本 schema.sql放到MySQL命令行里执行,就完成了数据库表的初始化:
$ mysql -u root -p < schema.sql
然后我们可以编写数据访问代码test.py测试一下。如新建一个User的对象:1
2
3
4
5
6
7
8
9
10
11
12import orm
import asyncio
from models import User, Blog, Comment
async def test(loop):
await orm.create_pool(loop=loop, user='root', password='root', db='awesome')
u = User(name='Test', email='test@qq.com', passwd='1234567890', image='about:blank')
await u.save()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(test(loop))
loop.close()
运行test.py后,可以在MySQL客户端命令行查询,看看测试的数据是不是正常储存到MySQL里面。
爬取中大招生网2016~2018年专业分数表
根据网站具体的css选择器要做修改,还有爬取标题时候中英括号有的不一样。
1 | # -*- coding: utf-8 -*- |
1 | # -*- coding: utf-8 -*- |
1 | # -*- coding: utf-8 -*- |
爬取豆瓣电源 并把名字和年份记录在csv文件上
1. 爬取豆瓣电源 并把名字和年份记录在csv文件上:
这里用到的是比较新的requests_html的HTMLSession
官方文档: https://cncert.github.io/requests-html-doc-cn/#/?id=%E4%BD%BF%E7%94%A8%E6%96%B9%E6%B3%951
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19# -*- coding: utf-8 -*-
from requests_html import HTMLSession
import csv
session = HTMLSession()
file = open('movies.csv', 'w', newline='')
csvwriter = csv.writer(file)
csvwriter.writerow(['名称', '年份'])
links = ['https://movie.douban.com/subject/1292052/', 'https://movie.douban.com/subject/26752088/', 'https://movie.douban.com/subject/1962665/']
for link in links:
r = session.get(link)
title = r.html.find('#content > h1 > span:nth-child(1)', first=True)
year = r.html.find('#content > h1 > span.year', first=True)
csvwriter.writerow([title.text, year.text])
file.close()
- 爬取北京地区的爬虫工程师薪资数据
1 | # -*- coding: utf-8 -*- |
报错:
RuntimeError: Cannot use HTMLSession within an existing event loop. Use AsyncHTMLSession instead.
详见:
https://github.com/kennethreitz/requests-html/issues 未解决,连创始人都没回,好像跟异步有关,之后有时间啃了那个库的文档再说(别用这个库了(谷歌很难找,还是找对应github开源库的issue)
scrapy爬虫框架入门
Scrapy Engine(引擎): 负责Spider、ItemPipeline、Downloader、Scheduler中间的通讯,信号、数据传递等。
Scheduler(调度器): 它负责接受引擎发送过来的Request请求,并按照一定的方式进行整理排列,入队,当引擎需要时,交还给引擎。
Downloader(下载器):负责下载Scrapy Engine(引擎)发送的所有Requests请求,并将其获取到的Responses交还给Scrapy Engine(引擎),由引擎交给Spider来处理,
Spider(爬虫):它负责处理所有Responses,从中分析提取数据,获取Item字段需要的数据,并将需要跟进的URL提交给引擎,再次进入Scheduler(调度器),
Item Pipeline(管道):它负责处理Spider中获取到的Item,并进行进行后期处理(详细分析、过滤、存储等)的地方.
Downloader Middlewares(下载中间件):你可以当作是一个可以自定义扩展下载功能的组件。
Spider Middlewares(Spider中间件):你可以理解为是一个可以自定扩展和操作引擎和Spider中间通信的功能组件(比如进入Spider的Responses;和从Spider出去的Requests)
- 创建一个Scrapy项目
- 定义提取的Item
- 编写爬取网站的 spider 并提取 Item
- 编写 Item Pipeline 来存储提取到的Item(即数据)