awesome-website-1

Python 网站开发(1) – 搭建开发环境

用pip安装网站开发所需要的第三方库:

  • 异步框架的 aiohttp
  • 前端模板引擎 jinja2
  • 数据库 MySQL的Python异步驱动程序 aiomysql (需要先下载安装最新版的 MySQL, 选择免费的 MySQL Community Server 下载安装就好)
  • 轻量级标记语言Markdown, 将文本转换为有效的HTML
    $ pip3 install aiohttp jinja2 aiomysql markdown

构建项目结构

选择一个工作目录,建立如下的目录结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
awesome-website/         <-- 根目录
|
+- backup/ <-- 备份目录
|
+- conf/ <-- 配置文件
|
+- dist/ <-- 打包目录
|
+- www/ <-- Web目录,存放.py文件
| |
| +- static/ <-- 存放静态文件
| |
| +- templates/ <-- 存放模板文件
|
+- LICENSE <-- 代码LICENSE

创建好项目的目录结构后,建议同时建立git仓库并同步到GitHub, 保证代码储存及修改的安全。

Python 网站开发(2) – 编写网站骨架

编写网站骨架

为了搭建一个高效的网站,网站的IO处理要建立在asyncio(异步io)的基础上, 我们可以用 aiohttp 写一个基本的服务器应用 app.py 存放在www目录:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import logging; logging.basicConfig(level=logging.INFO)
import asyncio
from aiohttp import web

## 定义服务器响应请求的的返回为 "Awesome Website"
async def index(request):
return web.Response(body=b'<h1>Awesome Website</h1>', content_type='text/html')

## 建立服务器应用,持续监听本地9000端口的http请求,对首页"/"进行响应
def init():
app = web.Application()
app.router.add_get('/', index)
web.run_app(app, host='127.0.0.1', port=9000)

if __name__ == "__main__":
init()

在www目录下运行这个 app.py, 服务器将在9000端口持续监听 http 请求,并异步对首页 / 进行响应:

1
2
3
$ python3 app.py
======== Running on http://127.0.0.1:9000 ========
(Press CTRL+C to quit)

打开浏览器输入地址 http://127.0.0.1:9000 进行测试,如果返回我们设定好的Awesome Website字符串,就说明我们网站服务器应用的框架已经搭好了。

Python 网站开发(3) – 编写ORM

编写ORM*

对象关系映射 (Object Relational Mapping, 简称ORM) 模式是一种为了解决面向对象与关系数据库存在的互不匹配的现象的技术。换句话说,ORM是通过使用描述对象和数据库之间映射的元数据,将程序中的对象自动持久化到关系数据库中。

在一个网站中,所有的数据(包括用户,日志,评论等)都存储在数据库中。我们的网站awesome-website选择用MySQL作为数据库。访问数据库需要创建数据库连接,游标对象,执行SQL语句,处理异常,清理资源。这些访问数据库的代码如果分散到每个函数中去,十分难以维护,效率低下不利于复用。因此,我们将常用的MySQL数据库操作用函数封装起来,便于网站调用。

由于我们的网站基于异步编程,系统的每一层都必须是异步。aiomysql为MySQL数据库提供了异步IO的驱动。

创建连接池

我们需要创建一个全局的连接池,每个HTTP请求都可以从连接池中直接获取数据库连接。使用连接池的好处是不必频繁地打开和关闭数据库连接,而是能复用就尽量复用。

连接池由全局变量__pool存储,缺省情况下将编码设置为utf8,自动提交事务。在www目录下新建 orm.py加入以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio, logging, aiomysql

def log(sql, args=()):
logging.info('SQL: %s' % sql)

async def create_pool(loop, **kw):
logging.info('create database connection pool...')
global __pool
__pool = await aiomysql.create_pool(
host=kw.get('host', 'localhost'),
port=kw.get('port', 3306),
user=kw['user'],
password=kw['password'],
db=kw['db'],
charset=kw.get('charset', 'utf8'),
autocommit=kw.get('autocommit', True),
maxsize=kw.get('maxsize', 10),
minsize=kw.get('minsize', 1),
loop=loop
)

Select

要执行SELECT语句,我们用select函数执行,需要传入SQL语句和SQL参数。缀加以下代码至orm.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
async def select(sql, args, size=None):
log(sql, args)
global __pool
with (await __pool) as conn:
cur = await conn.cursor(aiomysql.DictCursor)
await cur.execute(sql.replace('?', '%s'), args or ())
if size:
rs = await cur.fetchmany(size)
else:
rs = await cur.fetchall()
await cur.close()
logging.info('rows returned: %s' % len(rs))
return rs

SQL语句的占位符是?,而MySQL的占位符是%s,select()函数在内部自动替换。注意要始终坚持使用带参数的SQL,而不是自己拼接SQL字符串,这样可以防止SQL注入攻击。

Insert, Update, Delete

要执行INSERT、UPDATE、DELETE语句,可以定义一个通用的execute()函数,因为这3种SQL的执行都需要相同的参数,以及返回一个整数表示影响的行数。缀加以下代码至orm.py:

1
2
3
4
5
6
7
8
9
10
11
async def execute(sql, args):
log(sql)
with (await __pool) as conn:
try:
cur = await conn.cursor()
await cur.execute(sql.replace('?', '%s'), args)
affected = cur.rowcount
await cur.close()
except BaseException as e:
raise
return affected

execute()函数和select()函数所不同的是,cursor对象不返回结果集,而是通过rowcount返回结果数。

ORM

首先要定义的是所有ORM映射的基类Model。缀加以下代码至orm.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Model(dict, metaclass=ModelMetaclass):

def __init__(self, **kw):
super(Model, self).__init__(**kw)

def __getattr__(self, key):
try:
return self[key]
except KeyError:
raise AttributeError(r"'Model' object has no attribute '%s'" % key)

def __setattr__(self, key, value):
self[key] = value

def getValue(self, key):
return getattr(self, key, None)

def getValueOrDefault(self, key):
value = getattr(self, key, None)
if value is None:
field = self.__mappings__[key]
if field.default is not None:
value = field.default() if callable(field.default) else field.default
logging.debug('using default value for %s: %s' % (key, str(value)))
setattr(self, key, value)
return value

以及Field和各种Field子类。缀加以下代码至orm.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Field(object):

def __init__(self, name, column_type, primary_key, default):
self.name = name
self.column_type = column_type
self.primary_key = primary_key
self.default = default

def __str__(self):
return '<%s, %s:%s>' % (self.__class__.__name__, self.column_type, self.name)

class StringField(Field):

def __init__(self, name=None, primary_key=False, default=None, ddl='varchar(100)'):
super().__init__(name, ddl, primary_key, default)

class BooleanField(Field):

def __init__(self, name=None, default=False):
super().__init__(name, 'boolean', False, default)

class IntegerField(Field):

def __init__(self, name=None, primary_key=False, default=0):
super().__init__(name, 'bigint', primary_key, default)

class FloatField(Field):

def __init__(self, name=None, primary_key=False, default=0.0):
super().__init__(name, 'real', primary_key, default)

class TextField(Field):

def __init__(self, name=None, default=None):
super().__init__(name, 'text', False, default)

注意到Model只是一个基类,要将具体的子类如User的映射信息读取出来需要通过metaclass:ModelMetaclass。 这样,任何继承自Model的类(比如User),会自动通过ModelMetaclass扫描映射关系,并存储到自身的类属性如tablemappings中。缀加以下代码至Model代码之前:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def create_args_string(num):
L = []
for n in range(num):
L.append('?')
return ', '.join(L)

class ModelMetaclass(type):

def __new__(cls, name, bases, attrs):
# 排除Model类本身:
if name=='Model':
return type.__new__(cls, name, bases, attrs)
# 获取table名称:
tableName = attrs.get('__table__', None) or name
logging.info('found model: %s (table: %s)' % (name, tableName))
# 获取所有的Field和主键名:
mappings = dict()
fields = []
primaryKey = None
for k, v in attrs.items():
if isinstance(v, Field):
logging.info(' found mapping: %s ==> %s' % (k, v))
mappings[k] = v
if v.primary_key:
# 找到主键:
if primaryKey:
raise RuntimeError('Duplicate primary key for field: %s' % k)
primaryKey = k
else:
fields.append(k)
if not primaryKey:
raise RuntimeError('Primary key not found.')
for k in mappings.keys():
attrs.pop(k)
escaped_fields = list(map(lambda f: '`%s`' % f, fields))
attrs['__mappings__'] = mappings # 保存属性和列的映射关系
attrs['__table__'] = tableName
attrs['__primary_key__'] = primaryKey # 主键属性名
attrs['__fields__'] = fields # 除主键外的属性名
# 构造默认的SELECT, INSERT, UPDATE和DELETE语句:
attrs['__select__'] = 'select `%s`, %s from `%s`' % (primaryKey, ', '.join(escaped_fields), tableName)
attrs['__insert__'] = 'insert into `%s` (%s, `%s`) values (%s)' % (tableName, ', '.join(escaped_fields), primaryKey, create_args_string(len(escaped_fields) + 1))
attrs['__update__'] = 'update `%s` set %s where `%s`=?' % (tableName, ', '.join(map(lambda f: '`%s`=?' % (mappings.get(f).name or f), fields)), primaryKey)
attrs['__delete__'] = 'delete from `%s` where `%s`=?' % (tableName, primaryKey)
return type.__new__(cls, name, bases, attrs)

然后,我们往Model类添加class方法,就可以让所有子类调用class方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class Model(dict):

...

@classmethod
async def findAll(cls, where=None, args=None, **kw):
## find objects by where clause
sql = [cls.__select__]
if where:
sql.append('where')
sql.append(where)
if args is None:
args = []
orderBy = kw.get('orderBy', None)
if orderBy:
sql.append('order by')
sql.append(orderBy)
limit = kw.get('limit', None)
if limit is not None:
sql.append('limit')
if isinstance(limit, int):
sql.append('?')
args.append(limit)
elif isinstance(limit, tuple) and len(limit) == 2:
sql.append('?, ?')
args.extend(limit)
else:
raise ValueError('Invalid limit value: %s' % str(limit))
rs = await select(' '.join(sql), args)
return [cls(**r) for r in rs]

@classmethod
async def findNumber(cls, selectField, where=None, args=None):
## find number by select and where
sql = ['select %s _num_ from `%s`' % (selectField, cls.__table__)]
if where:
sql.append('where')
sql.append(where)
rs = await select(' '.join(sql), args, 1)
if len(rs) == 0:
return None
return rs[0]['_num_']

@classmethod
async def find(cls, pk):
## find object by primary key
rs = await select('%s where `%s`=?' % (cls.__select__, cls.__primary_key__), [pk], 1)
if len(rs) == 0:
return None
return cls(**rs[0])

往Model类添加实例方法,就可以让所有子类调用实例方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Model(dict):

...

async def save(self):
args = list(map(self.getValueOrDefault, self.__fields__))
args.append(self.getValueOrDefault(self.__primary_key__))
rows = await execute(self.__insert__, args)
if rows != 1:
logging.warn('failed to insert record: affected rows: %s' % rows)

async def update(self):
args = list(map(self.getValue, self.__fields__))
args.append(self.getValue(self.__primary_key__))
rows = await execute(self.__update__, args)
if rows != 1:
logging.warn('failed to update by primary key: affected rows: %s' % rows)

async def remove(self):
args = [self.getValue(self.__primary_key__)]
rows = await execute(self.__delete__, args)
if rows != 1:
logging.warn('failed to remove by primary key: affected rows: %s' % rows)

Python 网站开发(4) – 编写Model

编写Model

orm.py编写完成后,就可以把网站应用需要的三个表(user, blog, comment)用Model表示出来。在www目录下,新建models.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import time, uuid

from orm import Model, StringField, BooleanField, FloatField, TextField

def next_id():
return '%015d%s000' % (int(time.time() * 1000), uuid.uuid4().hex)

class User(Model):
__table__ = 'users'

id = StringField(primary_key=True, default=next_id, ddl='varchar(50)')
email = StringField(ddl='varchar(50)')
passwd = StringField(ddl='varchar(50)')
admin = BooleanField()
name = StringField(ddl='varchar(50)')
image = StringField(ddl='varchar(500)')
created_at = FloatField(default=time.time)

class Blog(Model):
__table__ = 'blogs'

id = StringField(primary_key=True, default=next_id, ddl='varchar(50)')
user_id = StringField(ddl='varchar(50)')
user_name = StringField(ddl='varchar(50)')
user_image = StringField(ddl='varchar(500)')
name = StringField(ddl='varchar(50)')
summary = StringField(ddl='varchar(200)')
content = TextField()
created_at = FloatField(default=time.time)

class Comment(Model):
__table__ = 'comments'

id = StringField(primary_key=True, default=next_id, ddl='varchar(50)')
blog_id = StringField(ddl='varchar(50)')
user_id = StringField(ddl='varchar(50)')
user_name = StringField(ddl='varchar(50)')
user_image = StringField(ddl='varchar(500)')
content = TextField()
created_at = FloatField(default=time.time)

初始化数据库表

由于网站表的数量较少,可以手动创建SQL脚本schema.sql到根目录:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
-- schema.sql

drop database if exists awesome;

create database awesome;

use awesome;

create user 'www-data'@'localhost' identified by 'www-data';
alter user 'www-data'@'localhost' identified with mysql_native_password by 'www-data';
grant select, insert, update, delete on awesome.* to 'www-data'@'localhost';

create table users (
`id` varchar(50) not null,
`email` varchar(50) not null,
`passwd` varchar(50) not null,
`admin` bool not null,
`name` varchar(50) not null,
`image` varchar(500) not null,
`created_at` real not null,
unique key `idx_email` (`email`),
key `idx_created_at` (`created_at`),
primary key (`id`)
) engine=innodb default charset=utf8;

create table blogs (
`id` varchar(50) not null,
`user_id` varchar(50) not null,
`user_name` varchar(50) not null,
`user_image` varchar(500) not null,
`name` varchar(50) not null,
`summary` varchar(200) not null,
`content` mediumtext not null,
`created_at` real not null,
key `idx_created_at` (`created_at`),
primary key (`id`)
) engine=innodb default charset=utf8;

create table comments (
`id` varchar(50) not null,
`blog_id` varchar(50) not null,
`user_id` varchar(50) not null,
`user_name` varchar(50) not null,
`user_image` varchar(500) not null,
`content` mediumtext not null,
`created_at` real not null,
key `idx_created_at` (`created_at`),
primary key (`id`)
) engine=innodb default charset=utf8;

把SQL脚本 schema.sql放到MySQL命令行里执行,就完成了数据库表的初始化:

$ mysql -u root -p < schema.sql
然后我们可以编写数据访问代码test.py测试一下。如新建一个User的对象:

1
2
3
4
5
6
7
8
9
10
11
12
import orm
import asyncio
from models import User, Blog, Comment

async def test(loop):
await orm.create_pool(loop=loop, user='root', password='root', db='awesome')
u = User(name='Test', email='test@qq.com', passwd='1234567890', image='about:blank')
await u.save()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(test(loop))
loop.close()

运行test.py后,可以在MySQL客户端命令行查询,看看测试的数据是不是正常储存到MySQL里面。

Donate? comment?