awesome-website-2

Python 网站开发(5) – 搭建Web框架

搭建Web框架

由于aiohttp作为一个Web框架比较底层,我们还需要基于aiohttp编写一个更方便处理URL的Web框架。

在www目录新建coroweb.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import asyncio, os, inspect, logging, functools

from urllib import parse

from aiohttp import web

## apis是处理分页的模块,代码在本章页面末尾,请将apis.py放在www下以防报错
## APIError 是指API调用时发生逻辑错误
from apis import APIError

## 编写装饰函数 @get()
def get(path):
## Define decorator @get('/path')
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kw):
return func(*args, **kw)
wrapper.__method__ = 'GET'
wrapper.__route__ = path
return wrapper
return decorator

## 编写装饰函数 @post()
def post(path):
## Define decorator @post('/path')
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kw):
return func(*args, **kw)
wrapper.__method__ = 'POST'
wrapper.__route__ = path
return wrapper
return decorator

## 以下是RequestHandler需要定义的一些函数
def get_required_kw_args(fn):
args = []
params = inspect.signature(fn).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.KEYWORD_ONLY and param.default == inspect.Parameter.empty:
args.append(name)
return tuple(args)

def get_named_kw_args(fn):
args = []
params = inspect.signature(fn).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.KEYWORD_ONLY:
args.append(name)
return tuple(args)

def has_named_kw_args(fn):
params = inspect.signature(fn).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.KEYWORD_ONLY:
return True

def has_var_kw_arg(fn):
params = inspect.signature(fn).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.VAR_KEYWORD:
return True

def has_request_arg(fn):
sig = inspect.signature(fn)
params = sig.parameters
found = False
for name, param in params.items():
if name == 'request':
found = True
continue
if found and (param.kind != inspect.Parameter.VAR_POSITIONAL and param.kind != inspect.Parameter.KEYWORD_ONLY and param.kind != inspect.Parameter.VAR_KEYWORD):
raise ValueError('request parameter must be the last named parameter in function: %s%s' % (fn.__name__, str(sig)))
return found

## 定义RequestHandler从URL函数中分析其需要接受的参数
class RequestHandler(object):

def __init__(self, app, fn):
self._app = app
self._func = fn
self._has_request_arg = has_request_arg(fn)
self._has_var_kw_arg = has_var_kw_arg(fn)
self._has_named_kw_args = has_named_kw_args(fn)
self._named_kw_args = get_named_kw_args(fn)
self._required_kw_args = get_required_kw_args(fn)

async def __call__(self, request):
kw = None
if self._has_var_kw_arg or self._has_named_kw_args or self._required_kw_args:
if request.method == 'POST':
if not request.content_type:
return web.HTTPBadRequest('Missing Content-Type.')
ct = request.content_type.lower()
if ct.startswith('application/json'):
params = await request.json()
if not isinstance(params, dict):
return web.HTTPBadRequest('JSON body must be object.')
kw = params
elif ct.startswith('application/x-www-form-urlencoded') or ct.startswith('multipart/form-data'):
params = await request.post()
kw = dict(**params)
else:
return web.HTTPBadRequest('Unsupported Content-Type: %s' % request.content_type)
if request.method == 'GET':
qs = request.query_string
if qs:
kw = dict()
for k, v in parse.parse_qs(qs, True).items():
kw[k] = v[0]
if kw is None:
kw = dict(**request.match_info)
else:
if not self._has_var_kw_arg and self._named_kw_args:
# remove all unamed kw:
copy = dict()
for name in self._named_kw_args:
if name in kw:
copy[name] = kw[name]
kw = copy
# check named arg:
for k, v in request.match_info.items():
if k in kw:
logging.warning('Duplicate arg name in named arg and kw args: %s' % k)
kw[k] = v
if self._has_request_arg:
kw['request'] = request
# check required kw:
if self._required_kw_args:
for name in self._required_kw_args:
if not name in kw:
return web.HTTPBadRequest('Missing argument: %s' % name)
logging.info('call with args: %s' % str(kw))
try:
r = await self._func(**kw)
return r
except APIError as e:
return dict(error=e.error, data=e.data, message=e.message)
## 定义add_static函数,来注册static文件夹下的文件
def add_static(app):
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
app.router.add_static('/static/', path)
logging.info('add static %s => %s' % ('/static/', path))

## 定义add_route函数,来注册一个URL处理函数
def add_route(app, fn):
method = getattr(fn, '__method__', None)
path = getattr(fn, '__route__', None)
if path is None or method is None:
raise ValueError('@get or @post not defined in %s.' % str(fn))
if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn):
fn = asyncio.coroutine(fn)
logging.info('add route %s %s => %s(%s)' % (method, path, fn.__name__, ', '.join(inspect.signature(fn).parameters.keys())))
app.router.add_route(method, path, RequestHandler(app, fn))

## 定义add_routes函数,自动把handler模块的所有符合条件的URL函数注册了
def add_routes(app, module_name):
n = module_name.rfind('.')
if n == (-1):
mod = __import__(module_name, globals(), locals())
else:
name = module_name[n+1:]
mod = getattr(__import__(module_name[:n], globals(), locals(), [name]), name)
for attr in dir(mod):
if attr.startswith('_'):
continue
fn = getattr(mod, attr)
if callable(fn):
method = getattr(fn, '__method__', None)
path = getattr(fn, '__route__', None)
if method and path:
add_route(app, fn)

最后,在app.py中加入middleware、jinja2模板和自注册的支持。app.py代码修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import logging; logging.basicConfig(level=logging.INFO)
import asyncio, os, json, time
from datetime import datetime
from aiohttp import web
from jinja2 import Environment, FileSystemLoader

## config 配置代码在后面会创建添加, 可先从'https://github.com/yzyly1992/2019_Python_Web_Dev'下载或下一章中复制`config.py`和`config_default.py`到`www`下,以防报错
from config import configs

import orm
from coroweb import add_routes, add_static

## handlers 是url处理模块, 当handlers.py在API章节里完全编辑完再将下一行代码的双井号去掉
## from handlers import cookie2user, COOKIE_NAME

## 初始化jinja2的函数
def init_jinja2(app, **kw):
logging.info('init jinja2...')
options = dict(
autoescape = kw.get('autoescape', True),
block_start_string = kw.get('block_start_string', '{%'),
block_end_string = kw.get('block_end_string', '%}'),
variable_start_string = kw.get('variable_start_string', '{{'),
variable_end_string = kw.get('variable_end_string', '}}'),
auto_reload = kw.get('auto_reload', True)
)
path = kw.get('path', None)
if path is None:
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
logging.info('set jinja2 template path: %s' % path)
env = Environment(loader=FileSystemLoader(path), **options)
filters = kw.get('filters', None)
if filters is not None:
for name, f in filters.items():
env.filters[name] = f
app['__templating__'] = env

## 以下是middleware,可以把通用的功能从每个URL处理函数中拿出来集中放到一个地方
## URL处理日志工厂
async def logger_factory(app, handler):
async def logger(request):
logging.info('Request: %s %s' % (request.method, request.path))
return (await handler(request))
return logger

## 认证处理工厂--把当前用户绑定到request上,并对URL/manage/进行拦截,检查当前用户是否是管理员身份
## 需要handlers.py的支持, 当handlers.py在API章节里完全编辑完再将下面代码的双井号去掉
##async def auth_factory(app, handler):
## async def auth(request):
## logging.info('check user: %s %s' % (request.method, request.path))
## request.__user__ = None
## cookie_str = request.cookies.get(COOKIE_NAME)
## if cookie_str:
## user = await cookie2user(cookie_str)
## if user:
## logging.info('set current user: %s' % user.email)
## request.__user__ = user
## if request.path.startswith('/manage/') and (request.__user__ is None or not request.__user__.admin):
## return web.HTTPFound('/signin')
## return (await handler(request))
## return auth

## 数据处理工厂
async def data_factory(app, handler):
async def parse_data(request):
if request.method == 'POST':
if request.content_type.startswith('application/json'):
request.__data__ = await request.json()
logging.info('request json: %s' % str(request.__data__))
elif request.content_type.startswith('application/x-www-form-urlencoded'):
request.__data__ = await request.post()
logging.info('request form: %s' % str(request.__data__))
return (await handler(request))
return parse_data

## 响应返回处理工厂
async def response_factory(app, handler):
async def response(request):
logging.info('Response handler...')
r = await handler(request)
if isinstance(r, web.StreamResponse):
return r
if isinstance(r, bytes):
resp = web.Response(body=r)
resp.content_type = 'application/octet-stream'
return resp
if isinstance(r, str):
if r.startswith('redirect:'):
return web.HTTPFound(r[9:])
resp = web.Response(body=r.encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, dict):
template = r.get('__template__')
if template is None:
resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8'))
resp.content_type = 'application/json;charset=utf-8'
return resp
else:
## 在handlers.py完全完成后,去掉下一行的双井号
##r['__user__'] = request.__user__
resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, int) and t >= 100 and t < 600:
return web.Response(t)
if isinstance(r, tuple) and len(r) == 2:
t, m = r
if isinstance(t, int) and t >= 100 and t < 600:
return web.Response(t, str(m))
# default:
resp = web.Response(body=str(r).encode('utf-8'))
resp.content_type = 'text/plain;charset=utf-8'
return resp
return response

## 时间转换
def datetime_filter(t):
delta = int(time.time() - t)
if delta < 60:
return u'1分钟前'
if delta < 3600:
return u'%s分钟前' % (delta // 60)
if delta < 86400:
return u'%s小时前' % (delta // 3600)
if delta < 604800:
return u'%s天前' % (delta // 86400)
dt = datetime.fromtimestamp(t)
return u'%s年%s月%s日' % (dt.year, dt.month, dt.day)

async def init(loop):
await orm.create_pool(loop=loop, **configs.db)
## 在handlers.py完全完成后,在下面middlewares的list中加入auth_factory
app = web.Application(loop=loop, middlewares=[
logger_factory, response_factory
])
init_jinja2(app, filters=dict(datetime=datetime_filter))
add_routes(app, 'handlers')
add_static(app)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 9000)
logging.info('server started at http://127.0.0.1:9000...')
return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()

以下是处理分页和API错误的代码apis.py, 请将之放置在www下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import json, logging, inspect, functools

## 建立Page类来处理分页,可以在page_size更改每页项目的个数
class Page(object):

def __init__(self, item_count, page_index=1, page_size=8):
self.item_count = item_count
self.page_size = page_size
self.page_count = item_count // page_size + (1 if item_count % page_size > 0 else 0)
if (item_count == 0) or (page_index > self.page_count):
self.offset = 0
self.limit = 0
self.page_index = 1
else:
self.page_index = page_index
self.offset = self.page_size * (page_index - 1)
self.limit = self.page_size
self.has_next = self.page_index < self.page_count
self.has_previous = self.page_index > 1

def __str__(self):
return 'item_count: %s, page_count: %s, page_index: %s, page_size: %s, offset: %s, limit: %s' % (self.item_count, self.page_count, self.page_index, self.page_size, self.offset, self.limit)

__repr__ = __str__

## 以下为API的几类错误代码
class APIError(Exception):
def __init__(self, error, data='', message=''):
super(APIError, self).__init__(message)
self.error = error
self.data = data
self.message = message

class APIValueError(APIError):
def __init__(self, field, message=''):
super(APIValueError, self).__init__('value:invalid', field, message)

class APIResourceNotFoundError(APIError):
def __init__(self, field, message=''):
super(APIResourceNotFoundError, self).__init__('value:notfound', field, message)

class APIPermissionError(APIError):
def __init__(self, message=''):
super(APIPermissionError, self).__init__('permission:forbidden', 'permission', message)

if __name__=='__main__':
import doctest
doctest.testmod()

有了Web框架,接下来就可以添加需要的URL到handlers模块来处理了。

Donate? comment?