См.также
WebOb — это библиотека, сериализующая HTTP запрос (текст) в объект и позволяющая генерировать HTTP ответы. В частности работает с окружение WSGI.
Изначально библиотеку написал Ян Бикинг, затем разработкой занимался Сергей Щетинин, а теперь она перешла в руки Pylons Foundation.
Вместо странных конструкций вида:
from urlparse import parse_qs
values = parse_qs(environ['QUERY_STRING'])
page = values.get('page', ['1', ]).pop()
Мы можем использовать:
from webob import Request
req = Request(environ)
page = req.params.get('page', '1')
Класс Request
оборачивает окружение, пришедшее от
Веб-сервера, в случае HTTP-запроса.
Мы можем сами создать окружение для класса Request
и
получить объект запроса, как в примере ниже.
1 2 3 4 5 6 7 8 9 10 11 12 13 | environ = {
'HTTP_HOST': 'localhost:80',
'PATH_INFO': '/article',
'QUERY_STRING': 'id=1',
'REQUEST_METHOD': 'GET',
'SCRIPT_NAME': ''
}
from webob import Request
req = Request(environ)
from pprint import pprint
pprint(req.environ) |
1 2 3 4 5 | {'HTTP_HOST': 'localhost:80',
'PATH_INFO': '/article',
'QUERY_STRING': 'id=1',
'REQUEST_METHOD': 'GET',
'SCRIPT_NAME': ''} |
Request
имеет конструктор, который создает минимальное
окружение запроса. При помощи метода blank
можно имитировать HTTP запрос:
1 2 3 4 5 | from webob import Request
req = Request.blank('/blog?page=4')
from pprint import pprint
pprint(req.environ) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | {'HTTP_HOST': 'localhost:80',
'PATH_INFO': '/blog',
'QUERY_STRING': 'page=4',
'REQUEST_METHOD': 'GET',
'SCRIPT_NAME': '',
'SERVER_NAME': 'localhost',
'SERVER_PORT': '80',
'SERVER_PROTOCOL': 'HTTP/1.0',
'wsgi.errors': <open file '<stderr>', mode 'w' at 0x7f4ff5d111e0>,
'wsgi.input': <_io.BytesIO object at 0x7f4ff3b622f0>,
'wsgi.multiprocess': False,
'wsgi.multithread': False,
'wsgi.run_once': False,
'wsgi.url_scheme': 'http',
'wsgi.version': (1, 0)} |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | from webob import Request
req = Request.blank('/blog?page=4')
print(req.method)
print(req.scheme)
print(req.path_info)
print(req.host)
print(req.host_url)
print(req.application_url)
print(req.path_url)
print(req.url)
print(req.path)
print(req.path_qs)
print(req.query_string) |
1 2 3 4 5 6 7 8 9 10 11 | GET
http
/blog
localhost:80
http://localhost
http://localhost
http://localhost/blog
http://localhost/blog?page=4
/blog
/blog?page=4
page=4 |
1 2 3 4 5 6 7 | from webob import Request
req = Request.blank('/test?check=a&check=b&name=Bob')
print(req.GET)
print(req.GET['check'])
print(req.GET.getall('check'))
print(list(req.GET.items())) |
1 2 3 4 | GET([('check', 'a'), ('check', 'b'), ('name', 'Bob')])
b
['a', 'b']
[('check', 'a'), ('check', 'b'), ('name', 'Bob')] |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | from webob import Request
req = Request.blank('/test')
print(req.POST) # empty
print(list(req.POST.items()))
print()
# Set POST
req.method = 'POST'
req.body = b'name=Vasya&email=vasya@example.com'
print(req.POST) # not empty
print(req.POST['name'])
print(req.POST['email']) |
1 2 3 4 5 6 | <NoVars: Not a form request>
[]
MultiDict([('name', 'Vasya'), ('email', 'vasya@example.com')])
Vasya
vasya@example.com |
Если вы не уверенны, каким методом были отправлены данные, можно воспользоваться
атрибутом params
.
1 2 3 4 5 6 7 8 9 10 11 | from webob import Request
req = Request.blank('/test?check=a&check=b&name=Bob')
# Set POST
req.method = 'POST'
req.body = b'name=Vasya&email=vasya@example.com'
print(req.params)
print(req.params.getall('check'))
print(req.params['email'])
print(req.params['name']) |
1 2 3 4 | NestedMultiDict([('check', 'a'), ('check', 'b'), ('name', 'Bob'), ('name', 'Vasya'), ('email', 'vasya@example.com')])
['a', 'b']
vasya@example.com
Bob |
1 2 3 4 5 6 7 8 | from webob import Request
req = Request.blank('/test')
# Set Cookie
req.headers['Cookie'] = 'session_id=9999999;foo=abcdef;bar=2'
print(req.cookies)
print(req.cookies['foo']) |
1 2 | <RequestCookies (dict-like) with values {'bar': '2', 'foo': 'abcdef', 'session_id': '9999999'}>
abcdef |
webob.request.Request
умеет запускать WSGI-приложения. Это может
понадобиться, например, при написании тестов.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | from webob import Request
def wsgi_app(environ, start_response):
request = Request(environ)
if request.path == '/test':
start_response('200 OK', [('Content-type', 'text/plain')])
return ['Hi!']
start_response('404 Not Found', [('Content-type', 'text/plain')])
req = Request.blank('/test')
status, headers, app_iter = req.call_application(wsgi_app)
print(status)
print(headers)
print(app_iter)
print
req = Request.blank('/bar')
status, headers, app_iter = req.call_application(wsgi_app)
print(status)
print(headers)
print(app_iter) |
1 2 3 4 5 6 7 | 200 OK
[('Content-type', 'text/plain')]
['Hi!']
404 Not Found
[('Content-type', 'text/plain')]
None |
Класс, который содержит все необходимое для создания ответа WSGI-приложения.
Конструктор класса Response
имеет минимальный набор
для HTTP ответа:
>>> from webob import Response
>>> res = Response()
>>> res.status
'200 OK'
>>> res.headerlist
[('Content-Type', 'text/html; charset=UTF-8'), ('Content-Length', '0')]
>>> res.body
''
В процессе выполнения программы ответ можно переопределить:
>>> res.status = 404
>>> res.status
'404 Not Found'
>>> res.status_code
404
>>> res.headerlist = [('Content-type', 'text/html')]
>>> res.body = b'test'
>>> print res
404 Not Found
Content-type: text/html
Content-Length: 4
test
>>> res.body = u"test"
Traceback (most recent call last):
...
TypeError: You cannot set Response.body to a unicode object (use Response.text)
>>> res.text = u"test"
Traceback (most recent call last):
...
AttributeError: You cannot access Response.text unless charset is set
>>> res.charset = 'utf8'
>>> res.text = u"test"
>>> res.body
'test'
Также можно задать значения передав их в конструктор, например
Response(charset='utf8')
.
>>> from webob import Response
>>> resp = Response(body=b'Hello World!')
>>> resp.content_type
'text/html'
>>> resp.content_type = 'text/plain'
>>> print resp
200 OK
Content-Length: 12
Content-Type: text/plain; charset=UTF-8
Hello World!
get_response
генерирует HTTP ответ.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | from webob import Request, Response
def wsgi_app(environ, start_response):
response = Response()
response.content_type = 'text/plain'
parts = []
for name, value in sorted(environ.items()):
parts.append('%s: %r' % (name, value))
response.body = str.encode(
'\n'.join(parts)
)
return response(environ, start_response)
req = Request.blank('/test')
print(req.call_application(wsgi_app)) # WSGI-application response
print()
print(req.get_response(wsgi_app)) # HTTP response |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | ('200 OK', [('Content-Type', 'text/plain; charset=UTF-8'), ('Content-Length', '411')], [b"HTTP_HOST: 'localhost:80'\nPATH_INFO: '/test'\nQUERY_STRING: ''\nREQUEST_METHOD: 'GET'\nSCRIPT_NAME: ''\nSERVER_NAME: 'localhost'\nSERVER_PORT: '80'\nSERVER_PROTOCOL: 'HTTP/1.0'\nwsgi.errors: <_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>\nwsgi.input: <_io.BytesIO object at 0x7f692e219048>\nwsgi.multiprocess: False\nwsgi.multithread: False\nwsgi.run_once: False\nwsgi.url_scheme: 'http'\nwsgi.version: (1, 0)"])
200 OK
Content-Type: text/plain; charset=UTF-8
Content-Length: 411
HTTP_HOST: 'localhost:80'
PATH_INFO: '/test'
QUERY_STRING: ''
REQUEST_METHOD: 'GET'
SCRIPT_NAME: ''
SERVER_NAME: 'localhost'
SERVER_PORT: '80'
SERVER_PROTOCOL: 'HTTP/1.0'
wsgi.errors: <_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>
wsgi.input: <_io.BytesIO object at 0x7f692e219048>
wsgi.multiprocess: False
wsgi.multithread: False
wsgi.run_once: False
wsgi.url_scheme: 'http'
wsgi.version: (1, 0) |
>>> from webob.exc import *
>>> exc = HTTPTemporaryRedirect(location='foo')
>>> req = Request.blank('/path/to/something')
>>> print str(req.get_response(exc)).strip()
307 Temporary Redirect
Location: http://localhost/path/to/foo
Content-Length: 126
Content-Type: text/plain; charset=UTF-8
307 Temporary Redirect
The resource has been moved to http://localhost/path/to/foo; you should be redirected automatically.
Добавим декоратор wsgify
, который будет делать для каждого «вида» всю
WSGI-магию и добавлять объект request
.
1 2 3 4 5 6 7 8 | def wsgify(view):
from webob import Request
def wrapped(environ, start_response):
request = Request(environ)
app = view(request).response()
return app(environ, start_response)
return wrapped |
В самих представлениях request
передается как параметр конструктора, а
ответ реализуется в виде метода класса response
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | @wsgify
class BlogIndex(object):
def __init__(self, request):
self.page = request.GET.get('page', '1')
from paginate import Page
self.paged_articles = Page(
ARTICLES,
page=self.page,
items_per_page=8,
)
def response(self):
from webob import Response
return Response(env.get_template('index.html')
.render(articles=self.paged_articles)
.encode('utf-8')) |
1 2 3 | @wsgify
class BlogIndex(object):
... |
Метод response
должен возвращать WSGI-приложение. В нашем случае это объект
класса Response
из библиотеки webob
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | @wsgify
class BlogIndex(object):
def __init__(self, request):
self.page = request.GET.get('page', '1')
from paginate import Page
self.paged_articles = Page(
ARTICLES,
page=self.page,
items_per_page=8,
)
def response(self):
from webob import Response
return Response(env.get_template('index.html')
.render(articles=self.paged_articles)
.encode('utf-8')) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | @wsgify
class BlogCreate(object):
def __init__(self, request):
self.request = request
def response(self):
from webob import Response
if self.request.method == 'POST':
max_id = max([art['id'] for art in ARTICLES])
ARTICLES.append(
{'id': max_id+1,
'title': self.request.POST['title'],
'content': self.request.POST['content']
}
)
return Response(status=302, location='/')
return Response(env.get_template('create.html').render(article=None)) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | @wsgify
class BlogCreate(object):
def __init__(self, request):
self.request = request
def response(self):
from webob import Response
if self.request.method == 'POST':
max_id = max([art['id'] for art in ARTICLES])
ARTICLES.append(
{'id': max_id+1,
'title': self.request.POST['title'],
'content': self.request.POST['content']
}
)
return Response(status=302, location='/')
return Response(env.get_template('create.html').render(article=None)) |
1 2 3 | @wsgify
class BlogCreate(object):
... |
1 2 3 4 5 6 7 8 9 | class BaseArticle(object):
def __init__(self, request):
self.request = request
article_id = self.request.environ['wsgiorg.routing_args'][1]['id']
(self.index,
self.article) = next(((i, art) for i, art in enumerate(ARTICLES)
if art['id'] == int(article_id)),
(None, None)) |
1 2 3 4 5 6 7 8 9 10 11 12 | class BlogRead(BaseArticle):
def __iter__(self):
if not self.article:
self.start('404 Not Found', [('content-type', 'text/plain')])
yield b'not found'
return
self.start('200 OK', [('Content-Type', 'text/html')])
yield str.encode(
env.get_template('read.html').render(article=self.article)
) |
1 2 3 4 5 6 7 8 9 | @wsgify
class BlogRead(BaseArticle):
def response(self):
from webob import Response
if not self.article:
return Response(status=404)
return Response(env.get_template('read.html')
.render(article=self.article)) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | class BlogUpdate(BaseArticle):
def __iter__(self):
if self.environ['REQUEST_METHOD'].upper() == 'POST':
from urllib.parse import parse_qs
values = parse_qs(self.environ['wsgi.input'].read())
self.article['title'] = values[b'title'].pop().decode()
self.article['content'] = values[b'content'].pop().decode()
self.start('302 Found',
[('Content-Type', 'text/html'),
('Location', '/')])
return
self.start('200 OK', [('Content-Type', 'text/html')])
yield str.encode(
env.get_template('create.html').render(article=self.article)
) |
1 2 3 4 5 6 7 8 9 10 11 | @wsgify
class BlogUpdate(BaseArticle):
def response(self):
from webob import Response
if self.request.method == 'POST':
self.article['title'] = self.request.POST['title']
self.article['content'] = self.request.POST['content']
return Response(status=302, location='/')
return Response(env.get_template('create.html')
.render(article=self.article)) |
1 2 3 4 5 6 7 8 | class BlogDelete(BaseArticle):
def __iter__(self):
self.start('302 Found', # '301 Moved Permanently',
[('Content-Type', 'text/html'),
('Location', '/')])
ARTICLES.pop(self.index)
yield b'' |
1 2 3 4 5 6 7 | @wsgify
class BlogDelete(BaseArticle):
def response(self):
from webob import Response
ARTICLES.pop(self.index)
return Response(status=302, location='/') |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 | from models import ARTICLES
from jinja2 import Environment, FileSystemLoader
env = Environment(loader=FileSystemLoader('templates'))
def wsgify(view):
from webob import Request
def wrapped(environ, start_response):
request = Request(environ)
app = view(request).response()
return app(environ, start_response)
return wrapped
class BaseArticle(object):
def __init__(self, request):
self.request = request
article_id = self.request.environ['wsgiorg.routing_args'][1]['id']
(self.index,
self.article) = next(((i, art) for i, art in enumerate(ARTICLES)
if art['id'] == int(article_id)),
(None, None))
@wsgify
class BlogIndex(object):
def __init__(self, request):
self.page = request.GET.get('page', '1')
from paginate import Page
self.paged_articles = Page(
ARTICLES,
page=self.page,
items_per_page=8,
)
def response(self):
from webob import Response
return Response(env.get_template('index.html')
.render(articles=self.paged_articles)
.encode('utf-8'))
@wsgify
class BlogCreate(object):
def __init__(self, request):
self.request = request
def response(self):
from webob import Response
if self.request.method == 'POST':
max_id = max([art['id'] for art in ARTICLES])
ARTICLES.append(
{'id': max_id+1,
'title': self.request.POST['title'],
'content': self.request.POST['content']
}
)
return Response(status=302, location='/')
return Response(env.get_template('create.html').render(article=None))
@wsgify
class BlogRead(BaseArticle):
def response(self):
from webob import Response
if not self.article:
return Response(status=404)
return Response(env.get_template('read.html')
.render(article=self.article))
@wsgify
class BlogUpdate(BaseArticle):
def response(self):
from webob import Response
if self.request.method == 'POST':
self.article['title'] = self.request.POST['title']
self.article['content'] = self.request.POST['content']
return Response(status=302, location='/')
return Response(env.get_template('create.html')
.render(article=self.article))
@wsgify
class BlogDelete(BaseArticle):
def response(self):
from webob import Response
ARTICLES.pop(self.index)
return Response(status=302, location='/') |