1,通过docker部署sanic项目
通过 Docker 和 Docker Compose 部署 Sanic 应用程序非常容易。下面的示例演示了如何部署示例程序 simple_server.py:先是 Dockerfile,然后是 docker-compose.yml。
# Image for running the simple_server.py example.
FROM python:3.5
MAINTAINER Channel Cat <[email protected]>

# Copy the build context (the project directory) into /code in the image.
ADD . /code

# Install Sanic straight from its Git repository.
RUN pip3 install git+https://github.com/channelcat/sanic

# Port declared for the server; docker-compose publishes it as 8000:8000.
EXPOSE 8000

WORKDIR /code

CMD ["python", "simple_server.py"]
# docker-compose.yml: builds the image from the Dockerfile in this directory
# and publishes container port 8000 on host port 8000.
version: '2'
services:
  sanic:
    build: .
    ports:
      - "8000:8000"
2,监控和错误处理
Sanic 通过 sanic.handlers.ErrorHandler 提供了一个可扩展的、最精简的全局异常处理程序实现。下面的示例演示如何扩展它以实现自定义行为。
"""
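Example intercepting uncaught exceptions using Sanic's error handler framework.
This may be useful for developers wishing to use Sentry, Airbrake, etc.,
or a custom system to log and monitor unexpected errors in production.
First we create our own class inheriting from ErrorHandler in sanic.handlers,
and pass in an instance of it when we create our Sanic instance. Inside this
class' default handler, we can do anything, including sending exceptions to
an external service.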
"""
from sanic.handlers import ErrorHandler
from sanic.exceptions import SanicException

# Imports and code relevant to the CustomHandler class
# (ordinarily this would live in a separate file).


class CustomHandler(ErrorHandler):
    """Error handler that inspects uncaught exceptions before responding."""

    def default(self, request, exception):
        """Handle an exception that no more specific handler claimed.

        Prints any exception that is not a ``SanicException`` and then
        delegates to the parent class to build the response.

        :param request: the request being processed when the error occurred
        :param exception: the exception object that was raised
        :return: whatever ``ErrorHandler.default`` returns
        """
        # Here we have access to the exception object and can do anything
        # with it (log it, send it to an external service, etc.).
        if not isinstance(exception, SanicException):
            print(exception)

        # We must finish handling the exception by returning our response
        # to the client; the super class' default handler does that for us.
        return super().default(request, exception)


# This is an ordinary Sanic server, except that the server's error handler
# is set to an instance of CustomHandler.

from sanic import Sanic

app = Sanic(__name__)

handler = CustomHandler()
app.error_handler = handler


@app.route("/")
async def test(request):
    """Raise on every request so the custom handler is exercised."""
    # Here, something occurs which causes an unexpected exception.
    # This exception will flow to our custom handler.
    raise SanicException("You Broke It!")


if __name__ == '__main__':
    # The port must be a port number; the previous `port=True` was a typo.
    app.run(host="0.0.0.0", port=8000)
3,使用外部服务提供监控
import logging
import socket
from os import getenv
from platform import node
from uuid import getnode as get_mac

from logdna import LogDNAHandler

from sanic import Sanic
from sanic.response import json
from sanic.request import Request

log = logging.getLogger("logdna")
log.setLevel(logging.INFO)


def get_my_ip_address(remote_server="google.com"):
    """Return the local IP address of the socket used to reach *remote_server*.

    Connects a UDP socket to port 80 of *remote_server* and returns the
    address part of that socket's own name.

    :param remote_server: host name or address to connect the socket to
    :return: the first element of ``socket.getsockname()``
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect((remote_server, 80))
        return s.getsockname()[0]


def get_mac_address():
    """Return the value of ``uuid.getnode()`` as ``xx:xx:xx:xx:xx:xx``."""
    # Twelve zero-padded hex digits; the generator below consumes the
    # iterator two characters at a time to form each colon-separated pair.
    h = iter(hex(get_mac())[2:].zfill(12))
    return ":".join(i + next(h) for i in h)


# Metadata attached to the LogDNA handler.
logdna_options = {
    "app": __name__,
    "index_meta": True,
    "hostname": node(),
    "ip": get_my_ip_address(),
    "mac": get_mac_address(),
}

logdna_handler = LogDNAHandler(getenv("LOGDNA_API_KEY"), options=logdna_options)

logdna = logging.getLogger(__name__)
logdna.setLevel(logging.INFO)
logdna.addHandler(logdna_handler)

app = Sanic(__name__)


@app.middleware
def log_request(request: Request):
    """Log the URL of every incoming request through the LogDNA logger."""
    logdna.info("I was Here with a new Request to URL: {}".format(request.url))


@app.route("/")
def default(request):
    """Return a fixed JSON payload."""
    return json({
        "response": "I was here"
    })


if __name__ == "__main__":
    app.run(
        host="0.0.0.0",
        # Environment variables are strings; cast so the port is always an int.
        port=int(getenv("PORT", 8080))
    )
4,RayGun
from os import getenv

from raygun4py.raygunprovider import RaygunSender

from sanic import Sanic
from sanic.exceptions import SanicException
from sanic.handlers import ErrorHandler


class RaygunExceptionReporter(ErrorHandler):
    """Error handler that forwards every handled exception to Raygun."""

    def __init__(self, raygun_api_key=None):
        """Create the reporter.

        :param raygun_api_key: API key for ``RaygunSender``; when ``None``,
            the ``RAYGUN_API_KEY`` environment variable is used instead.
        """
        super().__init__()
        if raygun_api_key is None:
            raygun_api_key = getenv("RAYGUN_API_KEY")

        self.sender = RaygunSender(raygun_api_key)

    def default(self, request, exception):
        """Send *exception* to Raygun, then defer to the parent handler."""
        self.sender.send_exception(exception=exception)
        return super().default(request, exception)


raygun_error_reporter = RaygunExceptionReporter()
app = Sanic(__name__, error_handler=raygun_error_reporter)


@app.route("/raise")
async def test(request):
    """Raise on every request so the reporter is exercised."""
    raise SanicException('You Broke It!')


if __name__ == '__main__':
    app.run(
        host="0.0.0.0",
        # Environment variables are strings; cast so the port is always an int.
        port=int(getenv("PORT", 8080))
    )
5,Rollbar
import rollbar

from sanic.handlers import ErrorHandler
from sanic import Sanic
from sanic.exceptions import SanicException
from os import getenv

rollbar.init(getenv("ROLLBAR_API_KEY"))


class RollbarExceptionHandler(ErrorHandler):
    """Error handler that reports each handled exception to Rollbar."""

    def default(self, request, exception):
        """Report the exception text to Rollbar, then defer to the parent."""
        rollbar.report_message(str(exception))
        return super().default(request, exception)


app = Sanic(__name__, error_handler=RollbarExceptionHandler())


@app.route("/raise")
def create_error(request):
    """Raise on every request so the handler is exercised."""
    raise SanicException("I was here and I don't like where I am")


if __name__ == "__main__":
    app.run(
        host="0.0.0.0",
        # Environment variables are strings; cast so the port is always an int.
        port=int(getenv("PORT", 8080))
    )
6,Sentry
from os import getenv

from sentry_sdk import init as sentry_init
from sentry_sdk.integrations.sanic import SanicIntegration

from sanic import Sanic
from sanic.response import json

# Initialise the Sentry SDK with the Sanic integration; the DSN is read
# from the SENTRY_DSN environment variable.
sentry_init(
    dsn=getenv("SENTRY_DSN"),
    integrations=[SanicIntegration()],
)

app = Sanic(__name__)


# noinspection PyUnusedLocal
@app.route("/working")
async def working_path(request):
    """Return a fixed JSON payload (the non-failing route)."""
    return json({
        "response": "Working API Response"
    })


# noinspection PyUnusedLocal
@app.route("/raise-error")
async def raise_error(request):
    """Raise a plain ``Exception`` on every request."""
    raise Exception("Testing Sentry Integration")


if __name__ == '__main__':
    app.run(
        host="0.0.0.0",
        # Environment variables are strings; cast so the port is always an int.
        port=int(getenv("PORT", 8080))
    )
7,安全
下面的示例代码展示了一个简单的、基于装饰器的认证与授权机制,用于保护你的 Sanic API 端点。
# -*- coding: utf-8 -*-

from sanic import Sanic
from functools import wraps
from sanic.response import json

app = Sanic()


def check_request_for_authorization_status(request):
    """Decide whether *request* is authorized.

    Placeholder implementation: always returns ``True``.
    """
    # Note: define your own check here, for instance via cookie or session.
    flag = True
    return flag


def authorized():
    """Build a decorator that guards an async handler with an auth check.

    The wrapped handler is awaited only when
    ``check_request_for_authorization_status`` returns a truthy value;
    otherwise a JSON ``{'status': 'not_authorized'}`` response with status
    403 is returned.
    """
    def decorator(f):
        @wraps(f)
        async def decorated_function(request, *args, **kwargs):
            # Run the method that checks the request
            # for the client's authorization status.
            is_authorized = check_request_for_authorization_status(request)

            if not is_authorized:
                # The user is not authorized.
                return json({'status': 'not_authorized'}, 403)

            # The user is authorized:
            # run the handler method and return its response.
            return await f(request, *args, **kwargs)
        return decorated_function
    return decorator


@app.route("/")
@authorized()
async def test(request):
    """Protected route; reached only when the auth check passes."""
    return json({'status': 'authorized'})


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)
8,sanic的Websocket(提供了一种增加路由的方法)
<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket demo</title>
    </head>
    <body>
        <script>
            // Open a WebSocket to /feed on the host and port this page was loaded from,
            // and create the list that will show sent and received messages.
            var ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/feed'),
                messages = document.createElement('ul');

            // Append every message received from the server to the list.
            ws.onmessage = function (event) {
                var messages = document.getElementsByTagName('ul')[0],
                    message = document.createElement('li'),
                    content = document.createTextNode('Received: ' + event.data);
                message.appendChild(content);
                messages.appendChild(message);
            };
            document.body.appendChild(messages);

            // Once per second, send a message and append it to the list.
            window.setInterval(function() {
                // Declared with `var` so it no longer leaks as an implicit global.
                var data = 'bye!';
                ws.send(data);
                var messages = document.getElementsByTagName('ul')[0],
                    message = document.createElement('li'),
                    content = document.createTextNode('Sent: ' + data);
                message.appendChild(content);
                messages.appendChild(message);
            }, 1000);
        </script>
    </body>
</html>
from sanic import Sanic
from sanic.response import file

app = Sanic(__name__)


@app.route('/')
async def index(request):
    """Serve the demo page from the file ``websocket.html``."""
    return await file('websocket.html')


@app.websocket('/feed')
async def feed(request, ws):
    """Websocket handler: alternately send 'hello!' and wait for a message.

    Loops forever; each iteration sends one message, then blocks on
    ``ws.recv()`` and prints what was received.
    """
    while True:
        data = 'hello!'
        print('Sending: ' + data)
        await ws.send(data)
        data = await ws.recv()
        print('Received: ' + data)


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000, debug=True)
9,虚拟主机(vhosts,按 Host 请求头路由):
from sanic import response
from sanic import Sanic
from sanic.blueprints import Blueprint

# Usage
# curl -H "Host: example.com" localhost:8000
# curl -H "Host: sub.example.com" localhost:8000
# curl -H "Host: bp.example.com" localhost:8000/question
# curl -H "Host: bp.example.com" localhost:8000/answer

app = Sanic()
bp = Blueprint("bp", host="bp.example.com")

# NOTE(review): all four handlers below share the name `hello`, so each
# definition rebinds the module-level name after its decorator has run.
# Some Sanic versions may reject duplicate route names -- confirm against
# the version in use.


@app.route('/', host=["example.com",
                      "somethingelse.com",
                      "therestofyourdomains.com"])
async def hello(request):
    """Handler for '/' on the listed hosts."""
    return response.text("Some defaults")


@app.route('/', host="sub.example.com")
async def hello(request):
    """Handler for '/' on sub.example.com."""
    return response.text("42")


@bp.route("/question")
async def hello(request):
    """Blueprint handler for '/question' (blueprint host bp.example.com)."""
    return response.text("What is the meaning of life?")


@bp.route("/answer")
async def hello(request):
    """Blueprint handler for '/answer' (blueprint host bp.example.com)."""
    return response.text("42")


app.blueprint(bp)

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)
10,支持并行测试运行的单元测试
下面的示例演示如何借助 pytest-xdist 插件提供的并行测试执行能力,对 Sanic 应用程序进行单元测试。
"""
pytest-xdist example for a Sanic server

Install the testing tools:

    $ pip install pytest pytest-xdist

Run with the xdist parameters:

    $ pytest examples/pytest_xdist.py -n 8  # 8 workers
"""
import re
from sanic import Sanic
from sanic.response import text
from sanic.testing import PORT as PORT_BASE, SanicTestClient
import pytest


@pytest.fixture(scope="session")
def test_port(worker_id):
    """Return a port derived from the xdist worker id.

    The first run of digits in *worker_id* is added to ``PORT_BASE``;
    when the id contains no digits the offset is 0.
    """
    m = re.search(r'[0-9]+', worker_id)
    num_id = int(m.group(0)) if m else 0
    return PORT_BASE + num_id


@pytest.fixture(scope="session")
def app():
    """Build a Sanic app with a single '/' route that answers 'OK'."""
    app = Sanic()

    @app.route('/')
    async def index(request):
        return text('OK')

    return app


@pytest.fixture(scope="session")
def client(app, test_port):
    """Return a ``SanicTestClient`` for *app* on *test_port*."""
    return SanicTestClient(app, test_port)


@pytest.mark.parametrize('run_id', range(100))
def test_index(client, run_id):
    """GET '/' and expect a 200 response with body 'OK' (run 100 times)."""
    request, response = client._sanic_endpoint_test('get', '/')
    assert response.status == 200
    assert response.text == 'OK'
11,修改请求对象
Sanic中的请求对象是一种dict对象,这意味着request对象可以作为常规dict对象进行操作。
from sanic import Sanic
from sanic.response import text
from random import randint

app = Sanic()


@app.middleware('request')
def append_request(request):
    """Request middleware: store a random integer under the key 'num'."""
    # Add new key with random value
    request['num'] = randint(0, 100)


@app.get('/pop')
def pop_handler(request):
    """Remove 'num' from the request and return it as the response body."""
    # Pop key from request object
    num = request.pop('num')
    # `num` is an int; convert it explicitly so the body is always a str.
    return text(str(num))


@app.get('/key_exist')
def key_exist_handler(request):
    """Report whether the key 'num' is present on the request."""
    # Check whether the key exists or not
    if 'num' in request:
        return text('num exist in request')

    return text('num does not exist in request')


app.run(host="0.0.0.0", port=8000, debug=True)