Flask阶段

短小精悍并上下文管理,可扩展强的一个web框架。 install Flask module pip3 install Flask,我们做web开发时,是站在两大东西之上做的web框架和wsgi,Flask和Django中都是应用的并不是自己写的。Flask中werkzurg就是。 web服务网关接口,wsgi是一个协议,实现该写一个的模块:wsgiref|werkzeug,实现其协议的模块本质上就是socket服务端用于接收用户请求,并处理。一般web框架基于wsgi实现,这样实现关注点分离。 flask就是基于Werkzurg搭建起来的 from werkzeug.wrappers import Request, Response

from werkzeug.serving import run_simple

@Request.application

def run(environ, start_response):

return Response("Hello World!") # 返回字符串

if __name__=="__main__":

run_simple("localhost", 4000, run) # 监听本机的4000端口,如果访问就执行run函数

一个最简单的Flask from flask import Flask

app = Flask(__name__) # flask名称,一般这样写。

@app.route("/index") # url路径

def index(): #所调用的函数

return "Hello World!"

if __name__ == "__main__":

app.run() # 执行口

Flask实现用户登录程序 from flask import Flask, render_template, request, redirect, session

app = Flask(__name__, template_folder="templates", static_folder="static")

app.secret_key = "asdf" # session加盐

# request.form # 请求体

# request.args # 请求头

# request.method # 请求类型

@app.route("/login", methods=["GET", "POST"])

def login():

if request.method == "GET":

return render_template("login.html")

user = request.from.get("user")

pwd= request.from.get("pwd")

if user == "alex" and pwd == "666":

session["user"] = user

return redirect("/index")

return render_template("login.html", error="用户名或密码错误")

@app.route("/index")

def index():

user = session.get("user")

if not user:

return redirect("/login")

return render_template("index.html")

if __name__ == "__main__":

app.run()

1.配置文件

settings class Config(object):

DEBUG = False

TESTING = False

DATABASE_URL = "sqlite://:memory:"

class ProductionConfig(Config):

DATABASE_URL = "mysql://user@localhost/foo"

class DevelopmentConfig(Config):

DEBUT = True

class TestingConfig(Config):

TESTING = True

flask中的配置文件是一个flask.config.Config对象(继承字典),默认配置为:

{

'DEBUG': get_debug_flag(default=False), 是否开启Debug模式

'TESTING': False, 是否开启测试模式

'PROPAGATE_EXCEPTIONS': None,

'PRESERVE_CONTEXT_ON_EXCEPTION': None,

'SECRET_KEY': None,

'PERMANENT_SESSION_LIFETIME': timedelta(days=31),

'USE_X_SENDFILE': False,

'LOGGER_NAME': None,

'LOGGER_HANDLER_POLICY': 'always',

'SERVER_NAME': None,

'APPLICATION_ROOT': None,

'SESSION_COOKIE_NAME': 'session',

'SESSION_COOKIE_DOMAIN': None,

'SESSION_COOKIE_PATH': None,

'SESSION_COOKIE_HTTPONLY': True,

'SESSION_COOKIE_SECURE': False,

'SESSION_REFRESH_EACH_REQUEST': True,

'MAX_CONTENT_LENGTH': None,

'SEND_FILE_MAX_AGE_DEFAULT': timedelta(hours=12),

'TRAP_BAD_REQUEST_ERRORS': False,

'TRAP_HTTP_EXCEPTIONS': False,

'EXPLAIN_TEMPLATE_LOADING': False,

'PREFERRED_URL_SCHEME': 'http',

'JSON_AS_ASCII': True,

'JSON_SORT_KEYS': True,

'JSONIFY_PRETTYPRINT_REGULAR': True,

'JSONIFY_MIMETYPE': 'application/json',

'TEMPLATES_AUTO_RELOAD': None,

}

xx # 如何通过代码找到指定的类

import importlib

path = "settings.Foo"

p, c = path.rsplit(".", maxsplit=1)

m = importlib.import_module(p)

cls = getattr(m, c)

# 找出类中的所有静态变量

for key in dir(cls):

if key.isupper():

print(key,getattr(cls, key))

Flask中有一个配置文件 from flask import Flask, render_template, request, redirect, session

app = Flask(__name__)

print(app.config) # 这是flask提供的配置文件

# app.config.["DEBUG"] = True # 我们可以通过这样来修改配置,但是flask也提供了另一种方式

app.config.from_object("sttings.DevelopmentConfig") # 这种方式就是自己创建一个文件和类,你在类中写的所有配置都会被应用。

if __name__ == "__main__":

app.run()

2.路由和视图

flask的路由系统 from flask import Flask, render_template, request, redirect, session, url_for

app = Flask(__name__)

# int 是值的类型,nid是名称:/login/1234

"""

# 这是常用的类型,不加类型默认是字符串类型

DEFAULT_CONVERTERS = {

'default': UnicodeConverter,

'string': UnicodeConverter,

'any': AnyConverter,

'path': PathConverter,

'int': IntegerConverter,

'float': FloatConverter,

'uuid': UUIDConverter,

}

"""

@app.route("/login/", endpoint="n1") # endpoint就是设置名字,不写默认是函数名

def login():

url_for("n1", nid=122) # 反向查找url,nid是携带的值

if __name__ == "__main__":

app.run()

路由的两种方式 @app.route("/xxx")

def index():

return "index"

# 其实到内部调用时就是使用了app.add_url_rule来实现的,为了方便所以在外面套了个装饰器。

def index():

return "index"

app.add_url_rule("/xxx", None, index)

# 注意事项:

# - 不用让endpoint重名

# - 如果重名函数也一定要相同。

app.route和app.add_url_rule参数 @app.route和app.add_url_rule参数:

rule, URL规则

view_func, 视图函数名称

defaults=None, 默认值,当URL中无参数,函数需要参数时,使用defaults={'k':'v'}为函数提供参数

endpoint=None, 名称,用于反向生成URL,即: url_for('名称')

methods=None, 允许的请求方式,如:["GET","POST"]

strict_slashes=None, 对URL最后的 / 符号是否严格要求,

如:

@app.route('/index',strict_slashes=False),

访问 http://www.xx.com/index/ 或 http://www.xx.com/index均可

@app.route('/index',strict_slashes=True)

仅访问 http://www.xx.com/index

redirect_to=None, 重定向到指定地址

如:

@app.route('/index/', redirect_to='/home/')

def func(adapter, nid):

return "/home/888"

@app.route('/index/', redirect_to=func)

subdomain=None, 子域名访问

from flask import Flask, views, url_for

app = Flask(import_name=__name__)

app.config['SERVER_NAME'] = 'wupeiqi.com:5000'

@app.route("/", subdomain="admin")

def static_index():

"""Flask supports static subdomains

This is available at static.your-domain.tld"""

return "static.your-domain.tld"

@app.route("/dynamic", subdomain="")

def username_index(username):

"""Dynamic subdomains are also supported

Try going to user1.your-domain.tld/dynamic"""

return username + ".your-domain.tld"

if __name__ == '__main__':

app.run()

CBV import functools

from flask import Flask,views

app = Flask(__name__)

def wrapper(func):

@functools.wraps(func)

def inner(*args,**kwargs):

return func(*args,**kwargs)

return inner

class UserView(views.MethodView):

methods = ['GET']

decorators = [wrapper,]

def get(self,*args,**kwargs):

return 'GET'

def post(self,*args,**kwargs):

return 'POST'

app.add_url_rule('/user',None,UserView.as_view('uuuu'))

if __name__ == '__main__':

app.run()

自定义正则 from flask import Flask,url_for

app = Flask(__name__)

# 步骤一:定制类

from werkzeug.routing import BaseConverter

class RegexConverter(BaseConverter):

"""

自定义URL匹配正则表达式

"""

def __init__(self, map, regex):

super(RegexConverter, self).__init__(map)

self.regex = regex

def to_python(self, value):

"""

路由匹配时,匹配成功后传递给视图函数中参数的值

:param value:

:return:

"""

return int(value)

def to_url(self, value):

"""

使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数

:param value:

:return:

"""

val = super(RegexConverter, self).to_url(value)

return val

# 步骤二:添加到转换器

app.url_map.converters['reg'] = RegexConverter

"""

1. 用户发送请求

2. flask内部进行正则匹配

3. 调用to_python(正则匹配的结果)方法

4. to_python方法的返回值会交给视图函数的参数

"""

# 步骤三:使用自定义正则

@app.route('/index/')

def index(nid):

print(nid,type(nid))

print(url_for('index',nid=987))

return "index"

if __name__ == '__main__':

app.run()

3.请求和响应

请求 # request.method

# request.args

# request.form

# request.values

# request.cookies

# request.headers

# request.path

# request.full_path

# request.script_root

# request.url

# request.base_url

# request.url_root

# request.host_url

# request.host

# request.files

# obj = request.files['the_file_name']

# obj.save('/var/www/uploads/' + secure_filename(f.filename))

响应 from flask import Flask, render_template, request, redirect, session, url_for, jsonify

# 响应相关信息

# return "字符串"

# return render_template('html模板路径',**{})

# return redirect('/index.html')

# return jsonify({"k1":"v1"})

# response = make_response(render_template('index.html'))

# response是flask.wrappers.Response类型

# response.delete_cookie('key')

# response.set_cookie('key', 'value')

# response.headers['X-Something'] = 'A value'

# return response

实现登录后才可查看的效果 # 版本一:在每个函数中都加入判断

@app.route('/index')

def index():

if not session.get('user'):

return redirect(url_for('login'))

return render_template('index.html',stu_dic=STUDENT_DICT)

# 版本二:使用装饰器,但是需要用到functools的wraps,应为你写的装饰器肯定要先执行,不然无法被应用,但是你写的装饰器到最后被调用的函数是inner,但是我们需要自己的属性才可以实现。wraps的功能就是将inner赋予调用函数的所有属性和特征。

import functools

def auth(func):

@functools.wraps(func)

def inner(*args,**kwargs):

if not session.get('user'):

return redirect(url_for('login'))

ret = func(*args,**kwargs)

return ret

return inner

@app.route('/index')

@auth

def index():

return render_template('index.html',stu_dic=STUDENT_DICT)

# 应用场景:比较少的函数中需要额外添加功能。

# 版本三:before_request,在执行函数时会先执行before_request所定义的函数,返回None就会向后执行,如果返回数据就会直接将其返回到前端了。

@app.before_request

def xxxxxx():

if request.path == '/login':

return None

if session.get('user'):

return None

return redirect('/login')

4.模板渲染

基本数据类型:可以执行python语法,如:dict.get() list[‘xx’] 传入函数:django,自动执行\flask,不自动执行 全局定义函数 @app.template_global()

def sb(a1, a2):

# {{sb(1,9)}}

return a1 + a2

@app.template_filter()

def db(a1, a2, a3):

# {{ 1|db(2,3) }}

return a1 + a2 + a3

模板继承 layout.html

Title

模板

{% block content %}{% endblock %}

tpl.html

{% extends "layout.html"%}

{% block content %}

{{users.0}}

{% endblock %}

include {% include "form.html" %}

form.html

asdfasdf

asdfasdf

asdf

asdf

宏 {% macro ccccc(name, type='text', value='') %}

{% endmacro %}

{{ ccccc('n1') }}

{{ ccccc('n2') }}

flask默认时设置了xss的后端将html代码返回到前端时需要使用安全MarkUp("

asdf
"),前端也可以直接将其应用 {{u|safe}}。

5.session

当请求刚到来:flask读取cookie中session对应的值:eyJrMiI6NDU2LCJ1c2VyIjoib2xkYm95,将该值解密并反序列化成字典,放入内存以便视图函数使用。 @app.route('/ses')

def ses():

session['k1'] = 123

session['k2'] = 456

del session['k1']

return "Session"

session['xxx'] = 123

session['xxx']

当请求结束时,flask会读取内存中字典的值,进行序列化+加密,写入到用户cookie中。 threading.local【和flask无任何关系】

为每个线程创建一个独立的空间,使得线程对自己的空间中的数据进行操作(数据隔离)。 import threading

from threading import local

import time

obj = local()

def task(i):

obj.xxxxx = i

time.sleep(2)

print(obj.xxxxx,i)

for i in range(10):

t = threading.Thread(target=task,args=(i,))

t.start()

根据字典自定义一个类似于threading.local功能? import time

import threading

DIC = {}

def task(i):

ident = threading.get_ident()

if ident in DIC:

DIC[ident]['xxxxx'] = i

else:

DIC[ident] = {'xxxxx':i }

time.sleep(2)

print(DIC[ident]['xxxxx'],i)

for i in range(10):

t = threading.Thread(target=task,args=(i,))

t.start()

通过getattr/setattr 构造出来 threading.local的加强版(协程) import time

import threading

try:

import greenlet

get_ident = greenlet.getcurrent # 获取协程唯一标识

except Exception as e:

get_ident = threading.get_ident # 获取线程唯一标识

class Local(object):

DIC = {}

def __getattr__(self, item): # obj.xxx时触发,item等于xxx

ident = get_ident()

if ident in self.DIC:

return self.DIC[ident].get(item)

return None

def __setattr__(self, key, value): # obj.xxx = "123"时触发,key等于xxx,value等于123

ident = get_ident()

if ident in self.DIC:

self.DIC[ident][key] = value

else:

self.DIC[ident] = {key:value}

obj = Local()

def task(i):

obj.xxxxx = i

time.sleep(2)

print(obj.xxxxx,i)

for i in range(10):

t = threading.Thread(target=task,args=(i,))

t.start()

5.1 flask-session

安装:pip3 install flask-session 使用 import redis

from flask import Flask,request,session

from flask.sessions import SecureCookieSessionInterface

from flask_session import Session

app = Flask(__name__)

# app.session_interface = SecureCookieSessionInterface()

# app.session_interface = RedisSessionInterface()

app.config['SESSION_TYPE'] = 'redis'

app.config['SESSION_REDIS'] = redis.Redis(host='140.143.227.206',port=6379,password='1234')

Session(app)

@app.route('/login')

def login():

session['user'] = 'alex'

return 'asdfasfd'

@app.route('/home')

def index():

print(session.get('user'))

return '...'

if __name__ == '__main__':

app.run()

6.闪现和中间件

闪现,在session中存储一个数据,读取时通过pop将数据移除。 from flask import Flask,flash,get_flashed_messages

@app.route('/page1')

def page1():

flash('临时数据存储','error')

flash('sdfsdf234234','error')

flash('adasdfasdf','info')

return "Session"

@app.route('/page2')

def page2():

print(get_flashed_messages(category_filter=['error']))

return "Session"

中间件 call方法什么时候出发?

用户发起请求时,才执行。

任务:在执行call方法之前,做一个操作,call方法执行之后做一个操作。

class Middleware(object):

def __init__(self,old):

self.old = old

def __call__(self, *args, **kwargs):

ret = self.old(*args, **kwargs)

return ret

if __name__ == '__main__':

app.wsgi_app = Middleware(app.wsgi_app)

app.run()

7.蓝图

完整目录图 ProjectName

-ProjectName

--static

--templates

--views

---account.py

from flask import Blueprint,render_template

# 创建蓝图对象

ac = Blueprint('ac',__name__)

# 只有调用该文件中的页面时才会触发

@ac.before_request

def x1():

print('app.before_request')

@ac.route('/login')

def login():

return render_template('login.html')

@ac.route('/logout')

def logout():

return 'Logout'

--__init__.py

from flask import Flask

from .views.account import ac

from .views.user import uc

def create_app():

app = Flask(__name__)

# @app.before_request

# def x1():

# print('app.before_request')

# 注册蓝图

app.register_blueprint(ac)

# 注册蓝图和编写url前缀

app.register_blueprint(uc,url_prefix='/api')

return app

-manage.py

from crm import create_app

app = create_app()

if __name__ == '__main__':

app.run()

8.特殊装饰器

before_request and after_request from flask import Flask

app = Flask(__name__)

@app.before_request

def x1():

print('before:x1')

return '滚'

@app.before_request

def xx1():

print('before:xx1')

@app.after_request

def x2(response):

print('after:x2')

return response

@app.after_request

def xx2(response):

print('after:xx2')

return response

@app.route('/index')

def index():

print('index')

return "Index"

@app.route('/order')

def order():

print('order')

return "order"

if __name__ == '__main__':

app.run()

# 执行结果是,before的返回会将其返回到页面,但是after还是会执行,并不像django返回后后面的都不会执行

before:x1

before:xx1

request

after:xx2

after:x2

before_first_request,只有第一次时才会被触发 from flask import Flask

app = Flask(__name__)

@app.before_first_request

def x1():

print('123123')

@app.route('/index')

def index():

print('index')

return "Index"

@app.route('/order')

def order():

print('order')

return "order"

if __name__ == '__main__':

app.run()

页面渲染的全局装饰器 @app.template_global()

def sb(a1, a2):

# {{sb(1,9)}}

return a1 + a2

@app.template_filter()

def db(a1, a2, a3):

# {{ 1|db(2,3) }}

return a1 + a2 + a3

errorhandler,当报指定的错误时会触发。 @app.errorhandler(404)

def not_found(arg):

print(arg)

return "没找到"

9.上下文管理

偏函数 import functools

def index(a1,a2):

return a1 + a2

# 原来的调用方式

# ret = index(1,23)

# print(ret)

# 偏函数,帮助开发者自动传递参数

new_func = functools.partial(index,666)

ret = new_func(1)

print(ret)

父类方法的执行 """

class Base(object):

def func(self):

print('Base.func')

class Foo(Base):

def func(self):

# 方式一:根据mro的顺序执行方法

# super(Foo,self).func()

# 方式二:主动执行Base类的方法

# Base.func(self)

print('Foo.func')

obj = Foo()

obj.func()

"""

####################################

class Base(object):

def func(self):

super(Base, self).func()

print('Base.func')

class Bar(object):

def func(self):

print('Bar.func')

class Foo(Base,Bar):

pass

# 示例一

# obj = Foo()

# obj.func()

# print(Foo.__mro__)

# 示例二

# obj = Base()

# obj.func()

栈 # by luffycity.com

class Stack(object):

def __init__(self):

self.data = []

def push(self,val):

self.data.append(val)

def pop(self):

return self.data.pop()

def top(self):

return self.data[-1]

_stack = Stack()

_stack.push('佳俊')

_stack.push('咸鱼')

print(_stack.pop())

print(_stack.pop())

Local """

{

1232:{k:v}

}

"""

try:

from greenlet import getcurrent as get_ident

except:

from threading import get_ident

"""

class Local(object):

def __init__(self):

object.__setattr__(self,'storage',{})

def __setattr__(self, key, value):

ident = get_ident()

if ident not in self.storage:

self.storage[ident] = {key:value}

else:

self.storage[ident][key] = value

def __getattr__(self, item):

ident = get_ident()

if ident in self.storage:

return self.storage[ident].get(item)

"""

class Local(object):

__slots__ = ('__storage__', '__ident_func__')

def __init__(self):

# __storage__ = {1231:{'stack':[]}}

object.__setattr__(self, '__storage__', {})

object.__setattr__(self, '__ident_func__', get_ident)

def __getattr__(self, name):

try:

return self.__storage__[self.__ident_func__()][name]

except KeyError:

raise AttributeError(name)

def __setattr__(self, name, value):

ident = self.__ident_func__()

storage = self.__storage__

try:

storage[ident][name] = value

except KeyError:

storage[ident] = {name: value}

def __delattr__(self, name):

try:

del self.__storage__[self.__ident_func__()][name]

except KeyError:

raise AttributeError(name)

obj = Local()

obj.stack = []

obj.stack.append('佳俊')

obj.stack.append('咸鱼')

print(obj.stack)

print(obj.stack.pop())

print(obj.stack)

LocalStack import functools

try:

from greenlet import getcurrent as get_ident

except:

from threading import get_ident

class Local(object):

__slots__ = ('__storage__', '__ident_func__')

def __init__(self):

# __storage__ = {1231:{'stack':[]}}

object.__setattr__(self, '__storage__', {})

object.__setattr__(self, '__ident_func__', get_ident)

def __getattr__(self, name):

try:

return self.__storage__[self.__ident_func__()][name]

except KeyError:

raise AttributeError(name)

def __setattr__(self, name, value):

# name=stack

# value=[]

ident = self.__ident_func__()

storage = self.__storage__

try:

storage[ident][name] = value

except KeyError:

storage[ident] = {name: value}

def __delattr__(self, name):

try:

del self.__storage__[self.__ident_func__()][name]

except KeyError:

raise AttributeError(name)

"""

__storage__ = {

12312: {stack:[ctx(session/request) ,]}

}

"""

# obj = Local()

# obj.stack = []

# obj.stack.append('佳俊')

# obj.stack.append('咸鱼')

# print(obj.stack)

# print(obj.stack.pop())

# print(obj.stack)

class LocalStack(object):

def __init__(self):

self._local = Local()

def push(self,value):

rv = getattr(self._local, 'stack', None) # self._local.stack =>local.getattr

if rv is None:

self._local.stack = rv = [] # self._local.stack =>local.setattr

rv.append(value) # self._local.stack.append(666)

return rv

def pop(self):

"""Removes the topmost item from the stack, will return the

old value or `None` if the stack was already empty.

"""

stack = getattr(self._local, 'stack', None)

if stack is None:

return None

elif len(stack) == 1:

return stack[-1]

else:

return stack.pop()

def top(self):

try:

return self._local.stack[-1]

except (AttributeError, IndexError):

return None

class RequestContext(object):

def __init__(self):

self.request = "xx"

self.session = 'oo'

_request_ctx_stack = LocalStack()

_request_ctx_stack.push(RequestContext())

def _lookup_req_object(arg):

ctx = _request_ctx_stack.top()

return getattr(ctx,arg) # ctx.request / ctx.session

request = functools.partial(_lookup_req_object,'request')

session = functools.partial(_lookup_req_object,'session')

print(request())

print(session())

slots class Foo(object):

__slots__ = ('name',)

def __init__(self):

self.name = 'alex'

# self.age = 18

obj = Foo()

print(obj.name)

# print(obj.age)

请求上下文管理(ctx):request,session - 请求到来之后wsgi会触发__call__方法,由__call__方法再次调用wsgi_app方法

- 在wsgi_app方法中:

- 首先将 请求相关+空session 封装到一个RequestContext对象中,即:ctx。

- 将ctx交给LocalStack对象,再由LocalStack将ctx添加到Local中,Local结构:

__storage__ = {

1231:{stack:[ctx,] }

}

- 根据请求中的cookie中提取名称为sessionid对应的值,对cookie进行加密+反序列化,再次赋值给ctx中的session

-> 视图函数

- 把session中的数据再次写入到cookie中。

- 将ctx删除

- 结果返回给用户浏览器

- 断开socket连接

LocalProxy from flask import Flask,request,session

app = Flask(__name__)

@app.route('/index')

def index():

# 1. request是LocalProxy对象

# 2. 对象中有method、执行__getattr__

print(request.method)

# request['method']

# request + 1

# 1. session是LocalProxy对象

# 2. LocalProxy对象的__setitem__

session['x'] = 123

return "Index"

if __name__ == '__main__':

app.run()

# app.__call__

# app.wsgi_app

"""

第一阶段:请求到来

将request和Session相关数据封装到ctx=RequestContext对象中。

再通过LocalStack将ctx添加到Local中。

__storage__ = {

1231:{'stack':[ctx(request,session)]}

}

第二阶段:视图函数中获取request或session

方式一:直接找LocalStack获取

from flask.globals import _request_ctx_stack

print(_request_ctx_stack.top.request.method)

方式二:通过代理LocalProxy(小东北)获取

from flask import Flask,request

print(request.method)

"""

g只存在在一次请求中当你return时g就会被清除。

10.文件的上传和解压

view.py from flask import Blueprint, render_template, Flask, request, redirect,session

import os

import uuid

from ..utils import helper

ind = Blueprint('ind', __name__)

ind.config["MAX_CONTENT_LENGTH"] = = 1024 * 1024 * 7

@ind.before_request

def process_request():

if not session.get("user_info"):

return redirect("/login")

return None

@ind.route('/home')

def home():

return render_template('home.html')

@ind.route('/user_list')

def user_list():

# import pymysql

# conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8')

# cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

# cursor.execute("SELECT id,user,nickname FROM userinfo")

# data_list = cursor.fetchall()

# cursor.close()

# conn.close()

data_list = helper.fetch_all("SELECT id,user,nickname FROM userinfo",[])

return render_template('user_list.html',data_list=data_list)

@ind.route('/detail/')

def detail(nid):

# import pymysql

# conn = Config.POOL.connection()

# cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

# cursor.execute("SELECT id,line,ctime FROM record where user_id=%s",(nid,))

# record_list = cursor.fetchall()

# cursor.close()

# conn.close()

record_list = helper.fetch_all("SELECT id,line,ctime FROM record where user_id=%s",(nid,))

return render_template('detail.html',record_list=record_list)

@ind.route('/upload',methods=['GET','POST'])

def upload():

if request.method == "GET":

return render_template('upload.html')

from werkzeug.datastructures import FileStorage

file_obj = request.files.get('code')

# 1. 检查上传文件后缀名

name_ext = file_obj.filename.rsplit('.',maxsplit=1)

if len(name_ext) != 2:

return "请上传zip压缩文件"

if name_ext[1] != 'zip':

return "请上传zip压缩文件"

"""

# 2. 接收用户上传文件,并写入到服务器本地.

file_path = os.path.join("files",file_obj.filename)

# 从file_obj.stream中读取内容,写入到文件

file_obj.save(file_path)

# 3. 解压zip文件

import shutil

# 通过open打开压缩文件,读取内容再进行解压。

shutil._unpack_zipfile(file_path,'xsadfasdfasdf')

"""

# 2+3, 接收用户上传文件,并解压到指定目录

import shutil

target_path = os.path.join('files',str(uuid.uuid4()))

shutil._unpack_zipfile(file_obj.stream,target_path)

# 4. 遍历某目录下的所有文件

# for item in os.listdir(target_path):

# print(item)

total_num = 0

for base_path,folder_list,file_list in os.walk(target_path):

for file_name in file_list:

file_path = os.path.join(base_path,file_name)

file_ext = file_path.rsplit('.',maxsplit=1)

if len(file_ext) != 2:

continue

if file_ext[1] != 'py':

continue

file_num = 0

with open(file_path,'rb') as f:

for line in f:

line = line.strip()

if not line:

continue

if line.startswith(b'#'):

continue

file_num += 1

total_num += file_num

# 获取当前时间

import datetime

ctime = datetime.date.today()

print(total_num,ctime,session['user_info']['id'])

# import pymysql

# conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8')

# cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

# cursor.execute("select id from record where ctime=%s and user_id=%s",(ctime,session['user_info']['id']))

# data = cursor.fetchone()

# cursor.close()

# conn.close()

data = helper.fetch_one("select id from record where ctime=%s and user_id=%s",(ctime,session['user_info']['id']))

if data:

return "今天已经上传"

# import pymysql

# conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8')

# cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

# cursor.execute("insert into record(line,ctime,user_id)values(%s,%s,%s)",(total_num,ctime,session['user_info']['id']))

# conn.commit()

# cursor.close()

# conn.close()

helper.insert("insert into record(line,ctime,user_id)values(%s,%s,%s)",(total_num,ctime,session['user_info']['id']))

return "上传成功"

html.py {% extends "layout.html"%}

{% block content %}

代码上传

{% endblock %}

11.Database Join 池

安装pip3 install DBUtils 创建连接池可以共享出去,谁要用就来获取一个连接,这样可以管理连接的数量,提高链接的利用率。 from DBUtils.PooledDB import PooledDB, SharedDBConnection

import pymysql

POOL = PooledDB(

creator=pymysql, # 使用链接数据库的模块

maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数

mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建

maxcached=5, # 链接池中最多闲置的链接,0和None不限制

maxshared=3,

# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。

blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错

maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制

setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]

ping=0,

# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always

host='127.0.0.1',

port=3306,

user='root',

password='123456',

database='s9day118',

charset='utf8'

)

# 获取连接

conn = POOL.connection()

cursor = conn.cursor(pymysql.cursors.DictCursor)

cursor.execute("select * from tb1")

result = cursor.fetchall()

cursor.close()

# 归还连接

conn.close()

12.wtforms

WTForms是一个支持多个web框架的form组件,主要用于对用户请求数据进行验证。安装pip3 install wtforms

12.1 用户登录注册示例

用户登录

当用户登录时候,需要对用户提交的用户名和密码进行多种格式校验。如: 用户不能为空;用户长度必须大于6; 密码不能为空;密码长度必须大于12;密码必须包含 字母、数字、特殊字符等(自定义正则); #!/usr/bin/env python

# -*- coding:utf-8 -*-

from flask import Flask, render_template, request, redirect

from wtforms import Form

from wtforms.fields import core

from wtforms.fields import html5

from wtforms.fields import simple

from wtforms import validators

from wtforms import widgets

app = Flask(__name__, template_folder='templates')

app.debug = True

class LoginForm(Form):

name = simple.StringField(

label='用户名',

validators=[

validators.DataRequired(message='用户名不能为空.'),

validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')

],

widget=widgets.TextInput(),

render_kw={'class': 'form-control'}

)

pwd = simple.PasswordField(

label='密码',

validators=[

validators.DataRequired(message='密码不能为空.'),

validators.Length(min=8, message='用户名长度必须大于%(min)d'),

validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",

message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')

],

widget=widgets.PasswordInput(),

render_kw={'class': 'form-control'}

)

@app.route('/login', methods=['GET', 'POST'])

def login():

if request.method == 'GET':

form = LoginForm()

return render_template('login.html', form=form)

else:

form = LoginForm(formdata=request.form)

if form.validate():

print('用户提交数据通过格式验证,提交的值为:', form.data)

else:

print(form.errors)

return render_template('login.html', form=form)

if __name__ == '__main__':

app.run()

#######################################################

Title

登录

{{form.name.label}} {{form.name}} {{form.name.errors[0] }}

{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}

用户注册

注册页面需要让用户输入:用户名、密码、密码重复、性别、爱好等。 from flask import Flask, render_template, request, redirect

from wtforms import Form

from wtforms.fields import core

from wtforms.fields import html5

from wtforms.fields import simple

from wtforms import validators

from wtforms import widgets

app = Flask(__name__, template_folder='templates')

app.debug = True

class RegisterForm(Form):

name = simple.StringField(

label='用户名',

validators=[

validators.DataRequired()

],

widget=widgets.TextInput(),

render_kw={'class': 'form-control'},

default='alex'

)

pwd = simple.PasswordField(

label='密码',

validators=[

validators.DataRequired(message='密码不能为空.')

],

widget=widgets.PasswordInput(),

render_kw={'class': 'form-control'}

)

pwd_confirm = simple.PasswordField(

label='重复密码',

validators=[

validators.DataRequired(message='重复密码不能为空.'),

validators.EqualTo('pwd', message="两次密码输入不一致")

],

widget=widgets.PasswordInput(),

render_kw={'class': 'form-control'}

)

email = html5.EmailField(

label='邮箱',

validators=[

validators.DataRequired(message='邮箱不能为空.'),

validators.Email(message='邮箱格式错误')

],

widget=widgets.TextInput(input_type='email'),

render_kw={'class': 'form-control'}

)

gender = core.RadioField(

label='性别',

choices=(

(1, '男'),

(2, '女'),

),

coerce=int

)

city = core.SelectField(

label='城市',

choices=(

('bj', '北京'),

('sh', '上海'),

)

)

hobby = core.SelectMultipleField(

label='爱好',

choices=(

(1, '篮球'),

(2, '足球'),

),

coerce=int

)

favor = core.SelectMultipleField(

label='喜好',

choices=(

(1, '篮球'),

(2, '足球'),

),

widget=widgets.ListWidget(prefix_label=False),

option_widget=widgets.CheckboxInput(),

coerce=int,

default=[1, 2]

)

def __init__(self, *args, **kwargs):

super(RegisterForm, self).__init__(*args, **kwargs)

self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))

def validate_pwd_confirm(self, field):

"""

自定义pwd_confirm字段规则,例:与pwd字段是否一致

:param field:

:return:

"""

# 最开始初始化时,self.data中已经有所有的值

if field.data != self.data['pwd']:

# raise validators.ValidationError("密码不一致") # 继续后续验证

raise validators.StopValidation("密码不一致") # 不再继续后续验证

@app.route('/register', methods=['GET', 'POST'])

def register():

if request.method == 'GET':

form = RegisterForm(data={'gender': 1})

return render_template('register.html', form=form)

else:

form = RegisterForm(formdata=request.form)

if form.validate():

print('用户提交数据通过格式验证,提交的值为:', form.data)

else:

print(form.errors)

return render_template('register.html', form=form)

if __name__ == '__main__':

app.run()

############################################################

Title

用户注册

{% for item in form %}

{{item.label}}: {{item}} {{item.errors[0] }}

{% endfor %}

动态获取数据库数据 from flask import Flask,request,render_template,session,current_app,g,redirect

from wtforms import Form

from wtforms.fields import simple

from wtforms.fields import html5

from wtforms.fields import core

from wtforms import widgets

from wtforms import validators

app = Flask(__name__)

class LoginForm(Form):

name = simple.StringField(

validators=[

validators.DataRequired(message='用户名不能为空.'),

# validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')

],

widget=widgets.TextInput(),

render_kw={'placeholder':'请输入用户名'}

)

pwd = simple.PasswordField(

validators=[

validators.DataRequired(message='密码不能为空.'),

# validators.Length(min=8, message='用户名长度必须大于%(min)d'),

# validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",

# message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')

],

render_kw={'placeholder':'请输入密码'}

)

@app.route('/login',methods=['GET','POST'])

def login():

if request.method == "GET":

form = LoginForm()

# print(form.name,type(form.name)) # form.name是StringField()对象, StringField().__str__

# print(form.pwd,type(form.pwd)) # form.pwd是PasswordField()对象,PasswordField().__str__

return render_template('login.html',form=form)

form = LoginForm(formdata=request.form)

if form.validate():

print(form.data)

return redirect('https://www.luffycity.com/home')

else:

# print(form.errors)

return render_template('login.html', form=form)

import helper

class UserForm(Form):

city = core.SelectField(

label='城市',

choices=(),

coerce=int # 从数据库获取的数据返回后是字符串类型的,把他写上后会先将其转为int后再返回到前端

)

name = simple.StringField(label='姓名')

# 当你定义成静态字段是,只有启动时才会加载一次,但是从数据库需要实时拿到数据,下面是解决方法。你应该可以看懂。

def __init__(self,*args,**kwargs):

super(UserForm,self).__init__(*args,**kwargs)

self.city.choices=helper.fetch_all('select id,name from tb1',[],type=None)

@app.route('/user')

def user():

if request.method == "GET":

#form = UserForm(data={'name':'alex','city':3})

form = UserForm()

return render_template('user.html',form=form)

if __name__ == '__main__':

app.run()

meta #!/usr/bin/env python

# -*- coding:utf-8 -*-

from flask import Flask, render_template, request, redirect, session

from wtforms import Form

from wtforms.csrf.core import CSRF

from wtforms.fields import core

from wtforms.fields import html5

from wtforms.fields import simple

from wtforms import validators

from wtforms import widgets

from hashlib import md5

app = Flask(__name__, template_folder='templates')

app.debug = True

class MyCSRF(CSRF):

"""

Generate a CSRF token based on the user's IP. I am probably not very

secure, so don't use me.

"""

def setup_form(self, form):

self.csrf_context = form.meta.csrf_context()

self.csrf_secret = form.meta.csrf_secret

return super(MyCSRF, self).setup_form(form)

def generate_csrf_token(self, csrf_token):

gid = self.csrf_secret + self.csrf_context

token = md5(gid.encode('utf-8')).hexdigest()

return token

def validate_csrf_token(self, form, field):

print(field.data, field.current_token)

if field.data != field.current_token:

raise ValueError('Invalid CSRF')

class TestForm(Form):

name = html5.EmailField(label='用户名')

pwd = simple.StringField(label='密码')

class Meta:

# -- CSRF

# 是否自动生成CSRF标签

csrf = True

# 生成CSRF标签name

csrf_field_name = 'csrf_token'

# 自动生成标签的值,加密用的csrf_secret

csrf_secret = 'xxxxxx'

# 自动生成标签的值,加密用的csrf_context

csrf_context = lambda x: request.url

# 生成和比较csrf标签

csrf_class = MyCSRF

# -- i18n

# 是否支持本地化

# locales = False

locales = ('zh', 'en')

# 是否对本地化进行缓存

cache_translations = True

# 保存本地化缓存信息的字段

translations_cache = {}

@app.route('/index/', methods=['GET', 'POST'])

def index():

if request.method == 'GET':

form = TestForm()

else:

form = TestForm(formdata=request.form)

if form.validate():

print(form)

return render_template('index.html', form=form)

if __name__ == '__main__':

app.run()

12.2 其他

metaclass # new方法的返回值决定对象到底是什么?

class Bar(object):

pass

class Foo(object):

def __new__(cls, *args, **kwargs):

# return super(Foo,cls).__new__(cls,*args, **kwargs)

return Bar()

obj = Foo()

print(obj)

# 当Foo()时先执行__new__方法,返回一个Bar,obj就等于Bar对象

##########################################################################

class MyType(type):

def __init__(self, *args, **kwargs):

print('MyType创建类',self)

super(MyType, self).__init__(*args, **kwargs)

def __call__(self, *args, **kwargs):

obj = super(MyType, self).__call__(*args, **kwargs)

print('类创建对象', self, obj)

return obj

class Foo(object,metaclass=MyType):

user = 'wupeiqi'

age = 18

obj = Foo()

# 在创建Foo对象前先执行mytype的__call__方法,__call__方法后才会执行Foo。不写metaclass默认是由type类来做的。

# 是先执行父类的__call__后再执行自己的__init__方法。

"""

创建类时,先执行type的__init__。

类的实例化时,执行type的__call__,__call__方法的的返回值就是实例化的对象。

__call__内部调用:

类.__new__,创建对象

类.__init__,对象的初始化

"""

实例化流程分析 # 源码流程

1. 执行type的 __call__ 方法,读取字段到静态字段 cls._unbound_fields 中; meta类读取到cls._wtforms_meta中

2. 执行构造方法

a. 循环cls._unbound_fields中的字段,并执行字段的bind方法,然后将返回值添加到 self._fields[name] 中。

即:

_fields = {

name: wtforms.fields.core.StringField(),

}

PS:由于字段中的__new__方法,实例化时:name = simple.StringField(label='用户名'),创建的是UnboundField(cls, *args, **kwargs),当执行完bind之后,才变成执行 wtforms.fields.core.StringField()

b. 循环_fields,为对象设置属性

for name, field in iteritems(self._fields):

# Set all the fields to attributes so that they obscure the class

# attributes with the same names.

setattr(self, name, field)

c. 执行process,为字段设置默认值:self.process(formdata, obj, data=data, **kwargs)

优先级:obj,data,formdata;

再循环执行每个字段的process方法,为每个字段设置值:

for name, field, in iteritems(self._fields):

if obj is not None and hasattr(obj, name):

field.process(formdata, getattr(obj, name))

elif name in kwargs:

field.process(formdata, kwargs[name])

else:

field.process(formdata)

执行每个字段的process方法,为字段的data和字段的raw_data赋值

def process(self, formdata, data=unset_value):

self.process_errors = []

if data is unset_value:

try:

data = self.default()

except TypeError:

data = self.default

self.object_data = data

try:

self.process_data(data)

except ValueError as e:

self.process_errors.append(e.args[0])

if formdata:

try:

if self.name in formdata:

self.raw_data = formdata.getlist(self.name)

else:

self.raw_data = []

self.process_formdata(self.raw_data)

except ValueError as e:

self.process_errors.append(e.args[0])

try:

for filter in self.filters:

self.data = filter(self.data)

except ValueError as e:

self.process_errors.append(e.args[0])

d. 页面上执行print(form.name) 时,打印标签

因为执行了:

字段的 __str__ 方法

字符的 __call__ 方法

self.meta.render_field(self, kwargs)

def render_field(self, field, render_kw):

other_kw = getattr(field, 'render_kw', None)

if other_kw is not None:

render_kw = dict(other_kw, **render_kw)

return field.widget(field, **render_kw)

执行字段的插件对象的 __call__ 方法,返回标签字符串

验证流程分析 a. 执行form的validate方法,获取钩子方法

def validate(self):

extra = {}

for name in self._fields:

inline = getattr(self.__class__, 'validate_%s' % name, None)

if inline is not None:

extra[name] = [inline]

return super(Form, self).validate(extra)

b. 循环每一个字段,执行字段的 validate 方法进行校验(参数传递了钩子函数)

def validate(self, extra_validators=None):

self._errors = None

success = True

for name, field in iteritems(self._fields):

if extra_validators is not None and name in extra_validators:

extra = extra_validators[name]

else:

extra = tuple()

if not field.validate(self, extra):

success = False

return success

c. 每个字段进行验证时候

字段的pre_validate 【预留的扩展】

字段的_run_validation_chain,对正则和字段的钩子函数进行校验

字段的post_validate【预留的扩展】

13.SQLAlchemy

MySQLdb,只支持python2.0的,而pymysql是支持python2.0和3.0的。 import MySQLdb

conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='1234',db='mydb')

cur = conn.cursor()

reCount = cur.execute('insert into UserInfo(Name,Address) values(%s,%s)',('alex','usa'))

# reCount = cur.execute('insert into UserInfo(Name,Address) values(%(id)s, %(name)s)',{'id':12345,'name':'wupeiqi'})

conn.commit()

cur.close()

conn.close()

print reCount

SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。 安装pip3 install sqlalchemy 组成部分:

Engine,框架的引擎Connection Pooling ,数据库连接池Dialect,选择连接数据库的DB API种类Schema/Types,架构和类型SQL Exprression Language,SQL表达式语言 SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如: MySQL-Python

mysql+mysqldb://:@[:]/

pymysql

mysql+pymysql://:@/[?]

MySQL-Connector

mysql+mysqlconnector://:@[:]/

cx_Oracle

oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

执行原生SQL语句 import time

import threading

import sqlalchemy

from sqlalchemy import create_engine

from sqlalchemy.engine.base import Engine

engine = create_engine(

"mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",

max_overflow=0, # 超过连接池大小外最多创建的连接

pool_size=5, # 连接池大小

pool_timeout=30, # 池中没有线程最多等待的时间,否则报错

pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)

)

def task(arg):

conn = engine.raw_connection()

cursor = conn.cursor()

cursor.execute(

"select * from t1"

)

result = cursor.fetchall()

cursor.close()

conn.close()

for i in range(20):

t = threading.Thread(target=task, args=(i,))

t.start()

########################################################################################

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import time

import threading

import sqlalchemy

from sqlalchemy import create_engine

from sqlalchemy.engine.base import Engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)

def task(arg):

conn = engine.contextual_connect()

with conn:

cur = conn.execute(

"select * from t1"

)

result = cur.fetchall()

print(result)

for i in range(20):

t = threading.Thread(target=task, args=(i,))

t.start()

########################################################################################

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import time

import threading

import sqlalchemy

from sqlalchemy import create_engine

from sqlalchemy.engine.base import Engine

from sqlalchemy.engine.result import ResultProxy

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)

def task(arg):

cur = engine.execute("select * from t1")

result = cur.fetchall()

cur.close()

print(result)

for i in range(20):

t = threading.Thread(target=task, args=(i,))

t.start()

# 注意: 查看连接 show status like 'Threads%';

13.1 ORM

创建单表 #!/usr/bin/env python

# -*- coding:utf-8 -*-

import datetime

from sqlalchemy import create_engine

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()

class Users(Base):

__tablename__ = 'users'

id = Column(Integer, primary_key=True)

name = Column(String(32), index=True, nullable=False)

# email = Column(String(32), unique=True)

# ctime = Column(DateTime, default=datetime.datetime.now)

# extra = Column(Text, nullable=True)

__table_args__ = (

# UniqueConstraint('id', 'name', name='uix_id_name'),

# Index('ix_id_name', 'name', 'email'),

)

def init_db():

"""

根据类创建数据库表

:return:

"""

engine = create_engine(

"mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",

max_overflow=0, # 超过连接池大小外最多创建的连接

pool_size=5, # 连接池大小

pool_timeout=30, # 池中没有线程最多等待的时间,否则报错

pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)

)

Base.metadata.create_all(engine)

def drop_db():

"""

根据类删除数据库表

:return:

"""

engine = create_engine(

"mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",

max_overflow=0, # 超过连接池大小外最多创建的连接

pool_size=5, # 连接池大小

pool_timeout=30, # 池中没有线程最多等待的时间,否则报错

pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)

)

Base.metadata.drop_all(engine)

if __name__ == '__main__':

drop_db()

init_db()

创建多个表并包含Fk、M2M关系 #!/usr/bin/env python

# -*- coding:utf-8 -*-

import datetime

from sqlalchemy import create_engine

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

from sqlalchemy.orm import relationship

Base = declarative_base()

# ##################### 单表示例 #########################

class Users(Base):

__tablename__ = 'users'

id = Column(Integer, primary_key=True)

name = Column(String(32), index=True)

age = Column(Integer, default=18)

email = Column(String(32), unique=True)

ctime = Column(DateTime, default=datetime.datetime.now)

extra = Column(Text, nullable=True)

__table_args__ = (

# UniqueConstraint('id', 'name', name='uix_id_name'),

# Index('ix_id_name', 'name', 'extra'),

)

class Hosts(Base):

__tablename__ = 'hosts'

id = Column(Integer, primary_key=True)

name = Column(String(32), index=True)

ctime = Column(DateTime, default=datetime.datetime.now)

# ##################### 一对多示例 #########################

class Hobby(Base):

__tablename__ = 'hobby'

id = Column(Integer, primary_key=True)

caption = Column(String(50), default='篮球')

class Person(Base):

__tablename__ = 'person'

nid = Column(Integer, primary_key=True)

name = Column(String(32), index=True, nullable=True)

hobby_id = Column(Integer, ForeignKey("hobby.id"))

# 与生成表结构无关,仅用于查询方便

hobby = relationship("Hobby", backref='pers')

# ##################### 多对多示例 #########################

class Server2Group(Base):

__tablename__ = 'server2group'

id = Column(Integer, primary_key=True, autoincrement=True)

server_id = Column(Integer, ForeignKey('server.id'))

group_id = Column(Integer, ForeignKey('group.id'))

class Group(Base):

__tablename__ = 'group'

id = Column(Integer, primary_key=True)

name = Column(String(64), unique=True, nullable=False)

# 与生成表结构无关,仅用于查询方便

servers = relationship('Server', secondary='server2group', backref='groups')

class Server(Base):

__tablename__ = 'server'

id = Column(Integer, primary_key=True, autoincrement=True)

hostname = Column(String(64), unique=True, nullable=False)

def init_db():

"""

根据类创建数据库表

:return:

"""

engine = create_engine(

"mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",

max_overflow=0, # 超过连接池大小外最多创建的连接

pool_size=5, # 连接池大小

pool_timeout=30, # 池中没有线程最多等待的时间,否则报错

pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)

)

Base.metadata.create_all(engine)

def drop_db():

"""

根据类删除数据库表

:return:

"""

engine = create_engine(

"mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",

max_overflow=0, # 超过连接池大小外最多创建的连接

pool_size=5, # 连接池大小

pool_timeout=30, # 池中没有线程最多等待的时间,否则报错

pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)

)

Base.metadata.drop_all(engine)

if __name__ == '__main__':

drop_db()

init_db()

操作数据库表 #!/usr/bin/env python

# -*- coding:utf-8 -*-

from sqlalchemy.orm import sessionmaker

from sqlalchemy import create_engine

from models import Users

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)

Session = sessionmaker(bind=engine)

# 每次执行数据库操作时,都需要创建一个session

session = Session()

# ############# 执行ORM操作 #############

obj1 = Users(name="alex1")

session.add(obj1)

# 提交事务

session.commit()

# 关闭session

session.close()

基于scoped_session实现线程安全 #!/usr/bin/env python

# -*- coding:utf-8 -*-

from sqlalchemy.orm import sessionmaker

from sqlalchemy import create_engine

from sqlalchemy.orm import scoped_session

from models import Users

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)

Session = sessionmaker(bind=engine)

"""

# 线程安全,基于本地线程实现每个线程用同一个session

# 特殊的:scoped_session中有原来方法的Session中的一下方法:

public_methods = (

'__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',

'close', 'commit', 'connection', 'delete', 'execute', 'expire',

'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',

'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',

'bulk_update_mappings',

'merge', 'query', 'refresh', 'rollback',

'scalar'

)

"""

session = scoped_session(Session)

# ############# 执行ORM操作 #############

obj1 = Users(name="alex1")

session.add(obj1)

# 提交事务

session.commit()

# 关闭session

session.close()

多线程执行示例 #!/usr/bin/env python

# -*- coding:utf-8 -*-

import time

import threading

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index

from sqlalchemy.orm import sessionmaker, relationship

from sqlalchemy import create_engine

from db import Users

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)

Session = sessionmaker(bind=engine)

def task(arg):

session = Session()

obj1 = Users(name="alex1")

session.add(obj1)

session.commit()

for i in range(10):

t = threading.Thread(target=task, args=(i,))

t.start()

基本增删改查示例 #!/usr/bin/env python

# -*- coding:utf-8 -*-

import time

import threading

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index

from sqlalchemy.orm import sessionmaker, relationship

from sqlalchemy import create_engine

from sqlalchemy.sql import text

from db import Users, Hosts

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)

Session = sessionmaker(bind=engine)

session = Session()

# ################ 添加 ################

"""

obj1 = Users(name="wupeiqi")

session.add(obj1)

session.add_all([

Users(name="wupeiqi"),

Users(name="alex"),

Hosts(name="c1.com"),

])

session.commit()

"""

# ################ 删除 ################

"""

session.query(Users).filter(Users.id > 2).delete()

session.commit()

"""

# ################ 修改 ################

"""

session.query(Users).filter(Users.id > 0).update({"name" : "099"})

session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)

session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")

session.commit()

"""

# ################ 查询 ################

"""

r1 = session.query(Users).all()

r2 = session.query(Users.name.label('xx'), Users.age).all()

r3 = session.query(Users).filter(Users.name == "alex").all()

r4 = session.query(Users).filter_by(name='alex').all()

r5 = session.query(Users).filter_by(name='alex').first()

r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()

r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()

"""

session.close()

常用操作 # 条件

ret = session.query(Users).filter_by(name='alex').all()

ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()

ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()

ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()

ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()

ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()

from sqlalchemy import and_, or_

ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()

ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()

ret = session.query(Users).filter(

or_(

Users.id < 2,

and_(Users.name == 'eric', Users.id > 3),

Users.extra != ""

)).all()

# 通配符

ret = session.query(Users).filter(Users.name.like('e%')).all()

ret = session.query(Users).filter(~Users.name.like('e%')).all()

# 限制

ret = session.query(Users)[1:2]

# 排序

ret = session.query(Users).order_by(Users.name.desc()).all()

ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 分组

from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()

ret = session.query(

func.max(Users.id),

func.sum(Users.id),

func.min(Users.id)).group_by(Users.name).all()

ret = session.query(

func.max(Users.id),

func.sum(Users.id),

func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 连表

ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()

ret = session.query(Person).join(Favor).all()

ret = session.query(Person).join(Favor, isouter=True).all()

# 组合

q1 = session.query(Users.name).filter(Users.id > 2)

q2 = session.query(Favor.caption).filter(Favor.nid < 2)

ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)

q2 = session.query(Favor.caption).filter(Favor.nid < 2)

ret = q1.union_all(q2).all()

原生SQL语句 #!/usr/bin/env python

# -*- coding:utf-8 -*-

import time

import threading

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index

from sqlalchemy.orm import sessionmaker, relationship

from sqlalchemy import create_engine

from sqlalchemy.sql import text

from sqlalchemy.engine.result import ResultProxy

from db import Users, Hosts

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)

Session = sessionmaker(bind=engine)

session = Session()

# 查询

# cursor = session.execute('select * from users')

# result = cursor.fetchall()

# 添加

cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})

session.commit()

print(cursor.lastrowid)

session.close()

基于relationship操作ForeignKey #!/usr/bin/env python

# -*- coding:utf-8 -*-

import time

import threading

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index

from sqlalchemy.orm import sessionmaker, relationship

from sqlalchemy import create_engine

from sqlalchemy.sql import text

from sqlalchemy.engine.result import ResultProxy

from db import Users, Hosts, Hobby, Person

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)

Session = sessionmaker(bind=engine)

session = Session()

# 添加

"""

session.add_all([

Hobby(caption='乒乓球'),

Hobby(caption='羽毛球'),

Person(name='张三', hobby_id=3),

Person(name='李四', hobby_id=4),

])

person = Person(name='张九', hobby=Hobby(caption='姑娘'))

session.add(person)

hb = Hobby(caption='人妖')

hb.pers = [Person(name='文飞'), Person(name='博雅')]

session.add(hb)

session.commit()

"""

# 使用relationship正向查询

"""

v = session.query(Person).first()

print(v.name)

print(v.hobby.caption)

"""

# 使用relationship反向查询

"""

v = session.query(Hobby).first()

print(v.caption)

print(v.pers)

"""

session.close()

基于relationship操作m2m #!/usr/bin/env python

# -*- coding:utf-8 -*-

import time

import threading

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index

from sqlalchemy.orm import sessionmaker, relationship

from sqlalchemy import create_engine

from sqlalchemy.sql import text

from sqlalchemy.engine.result import ResultProxy

from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)

Session = sessionmaker(bind=engine)

session = Session()

# 添加

"""

session.add_all([

Server(hostname='c1.com'),

Server(hostname='c2.com'),

Group(name='A组'),

Group(name='B组'),

])

session.commit()

s2g = Server2Group(server_id=1, group_id=1)

session.add(s2g)

session.commit()

gp = Group(name='C组')

gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')]

session.add(gp)

session.commit()

ser = Server(hostname='c6.com')

ser.groups = [Group(name='F组'),Group(name='G组')]

session.add(ser)

session.commit()

"""

# 使用relationship正向查询

"""

v = session.query(Group).first()

print(v.name)

print(v.servers)

"""

# 使用relationship反向查询

"""

v = session.query(Server).first()

print(v.hostname)

print(v.groups)

"""

session.close()

其他 #!/usr/bin/env python

# -*- coding:utf-8 -*-

import time

import threading

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index

from sqlalchemy.orm import sessionmaker, relationship

from sqlalchemy import create_engine

from sqlalchemy.sql import text, func

from sqlalchemy.engine.result import ResultProxy

from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)

Session = sessionmaker(bind=engine)

session = Session()

# 关联子查询

subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()

result = session.query(Group.name, subqry)

"""

SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid

FROM server

WHERE server.id = `group`.id) AS anon_1

FROM `group`

"""

# 原生SQL

"""

# 查询

cursor = session.execute('select * from users')

result = cursor.fetchall()

# 添加

cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})

session.commit()

print(cursor.lastrowid)

"""

session.close()

14.flask武装

Flask_SQLAlchemy a. 下载安装

pip3 install flask-sqlalchemy

b. chun.__init__.py

导入并实例化SQLAlchemy

from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

注意事项:

- 必须在导入蓝图之前

- 必须导入models.py

c. 初始化

db.init_app(app)

d. 在配置文件中写入配置

# ##### SQLALchemy配置文件 #####

SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:123456@127.0.0.1:3306/s9day122?charset=utf8"

SQLALCHEMY_POOL_SIZE = 10

SQLALCHEMY_MAX_OVERFLOW = 5

e. 创建models.py中的类(对应数据库表)

chun/models.py

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy import Column

from sqlalchemy import Integer,String,Text,Date,DateTime

from sqlalchemy import create_engine

from chun import db

class Users(db.Model):

__tablename__ = 'users'

id = Column(Integer, primary_key=True)

name = Column(String(32), index=True, nullable=False)

depart_id = Column(Integer)

f. 生成表(使用app上下文)

from chun import db,create_app

app = create_app()

app_ctx = app.app_context() # app_ctx = app/g

with app_ctx: # __enter__,通过LocalStack放入Local中

db.create_all() # 调用LocalStack放入Local中获取app,再去app中获取配置

g. 基于ORM对数据库进行操作。

from flask import Blueprint

from chun import db

from chun import models

us = Blueprint('us',__name__)

@us.route('/index')

def index():

# 使用SQLAlchemy在数据库中插入一条数据

# db.session.add(models.Users(name='高件套',depart_id=1))

# db.session.commit()

# db.session.remove()

result = db.session.query(models.Users).all()

print(result)

db.session.remove()

return 'Index'

flask-script install the module "pip3 install flask-script "

a. 增加 runserver

from chun import create_app

from flask_script import Manager

app = create_app()

manager = Manager(app)

if __name__ == '__main__':

# app.run()

manager.run()

b. 位置传参

from chun import create_app

from flask_script import Manager

app = create_app()

manager = Manager(app)

@manager.command

def custom(arg):

"""

自定义命令

python manage.py custom 123

:param arg:

:return:

"""

print(arg)

if __name__ == '__main__':

# app.run()

manager.run()

c. 关键字传参

from chun import create_app

from flask_script import Manager

app = create_app()

manager = Manager(app)

@manager.option('-n', '--name', dest='name')

@manager.option('-u', '--url', dest='url')

def cmd(name, url):

"""

自定义命令

执行: python manage.py cmd -n wupeiqi -u http://www.oldboyedu.com

:param name:

:param url:

:return:

"""

print(name, url)

if __name__ == '__main__':

# app.run()

manager.run()

flask-migrate install the module "pip3 install flask-migrate"

flask-migrate rely on flask-script.

#!/usr/bin/env python

# -*- coding:utf-8 -*-

from sansa import create_app

from sansa import db

from flask_script import Manager

from flask_migrate import Migrate, MigrateCommand

app = create_app()

manager = Manager(app)

Migrate(app, db)

"""

# 数据库迁移命名

python manage.py db init

python manage.py db migrate

python manage.py db upgrade

"""

manager.add_command('db', MigrateCommand)

if __name__ == '__main__':

manager.run()

# app.run()

找到项目使用的所有组件和版本。 install the module "pip install pipreqs"

# 终端输入后就可以找到所有的依赖

pipreqs ./ --encoding=utf-8

python虚拟环境 install module "pip3 install virtualenv"

create "virtualenv env1 --no-site-packages"

找到虚拟环境的script后输入"activate"激活或"deactivate"退出。

精彩链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。