工作中,我参考Python官方手册实现了一个socket长连接客户端,用到了asyncio库中的StreamReader。实现过程着实费了一番功夫,虽然工作交差了,但总觉得理解得不够透彻,于是想看看网上有没有相关的学习视频。在B站找到了这个视频,A. Jesse Jiryu Davis这位Python大神讲得极为通俗易懂。
协程学习
第一阶段:照着视频中的例子敲代码学习
服务端
不清楚视频中的服务端是怎么实现的,于是偷懒用Flask写了一个超级简单的服务端:
from flask import Flask
# Minimal Flask application used as the HTTP target for the socket clients.
app = Flask(__name__)


@app.route("/")
def hello():
    """Return a short greeting for the root path."""
    greeting = "hello haha"
    return greeting


@app.route("/foo")
def foo():
    """Return the token "=foo==" repeated 100 times (600 characters)."""
    return 100 * "=foo=="


@app.route("/bar")
def bar():
    """Return the token "=bar==" repeated 100 times (600 characters)."""
    return 100 * "=bar=="


if __name__ == "__main__":
    # Development server; the run log shows it listening on 127.0.0.1:5000,
    # which is the address every client in this post connects to.
    app.run()
执行方法:python3 flask_server.py
执行效果:
ubuntu@VM-0-13-ubuntu:~$ python3 flask_server.py
* Serving Flask app "flask_server" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: off
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
127.0.0.1 - - [28/Nov/2020 20:08:25] "GET /foo HTTP/1.0" 200 -
socket客户端(v1.0)
知识点:selector,非阻塞socket
from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
import socket
import time
# Module-wide selector used to wait for socket readiness.
selector = DefaultSelector()


def get(path):
    """Fetch *path* from 127.0.0.1:5000 over HTTP/1.0 and print the status line.

    The socket is non-blocking, but every step waits on ``selector.select()``
    for this single socket, so calls to ``get`` still run one after another.
    The socket is always closed before returning.
    """
    with socket.socket() as s:
        s.setblocking(False)
        try:
            # A non-blocking connect returns immediately; completion is
            # reported later by the socket becoming writable.
            s.connect(('127.0.0.1', 5000))
        except BlockingIOError:
            pass
        request = 'GET %s HTTP/1.0\r\n\r\n' % path

        # Wait until writable, i.e. the connect has finished.
        selector.register(s.fileno(), EVENT_WRITE)
        selector.select()
        selector.unregister(s.fileno())
        s.send(request.encode())

        chunks = []
        while True:
            # Wait until readable.
            selector.register(s.fileno(), EVENT_READ)
            selector.select()
            selector.unregister(s.fileno())
            chunk = s.recv(10)
            if chunk:
                print("recive chunk:%s" % chunk)
                chunks.append(chunk)
            else:
                # Empty read: the peer closed the connection, so the whole
                # response has arrived. Print only its first line.
                body = (b''.join(chunks)).decode()
                print(body.split('\n')[0])
                return
# Issue 10 rounds of /foo then /bar sequentially and report the wall time.
start = time.time()
for _ in range(10):
    for path in ('/foo', '/bar'):
        get(path)
elapsed = time.time() - start
print('took %.2f sec' % elapsed)
执行效果
...
recive chunk:b'ar===bar=='
HTTP/1.0 200 OK
took 0.11 sec
socket客户端(v1.1)
知识点:回调函数 callback
from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
import socket
import time
# Shared selector, plus the number of requests still in flight; the event
# loop further down keeps running while this count is non-zero.
selector, n_tasks = DefaultSelector(), 0


def get(path):
    """Start a non-blocking GET for *path* and return immediately.

    The socket is registered for writability with a callback that hands
    control to ``connected`` once the connect has finished.
    """
    global n_tasks
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('127.0.0.1', 5000))
    except BlockingIOError:
        # Expected for a non-blocking connect that is still in progress.
        pass
    request = 'GET %s HTTP/1.0\r\n\r\n' % path

    def on_writable():
        connected(sock, request)

    selector.register(sock.fileno(), EVENT_WRITE, data=on_writable)
    n_tasks += 1
def connected(s, request):
    """Send *request* once *s* is writable, then wait for the response.

    Invoked by the event loop via the callback registered in ``get``.
    """
    selector.unregister(s.fileno())
    # s is writable: the non-blocking connect has finished.
    s.send(request.encode())
    chunks = []
    callback = lambda: readable(s, chunks)
    selector.register(s.fileno(), EVENT_READ, data=callback)
def readable(s, chunks):
    """Read the next chunk of the response, re-arming until the peer closes.

    ``n_tasks`` is decremented exactly once per request, when the final
    empty read shows the response is complete. This lets the ``while n_tasks``
    event loop stop only after every response has been read. (Decrementing
    once per chunk made the counter hit zero early or go negative.)
    """
    global n_tasks
    selector.unregister(s.fileno())
    chunk = s.recv(100)
    if chunk:
        print("recive chunk:%s" % chunk)
        chunks.append(chunk)
        # More data may follow: wait for the next readable event.
        callback = lambda: readable(s, chunks)
        selector.register(s.fileno(), EVENT_READ, data=callback)
    else:
        # Empty read: the peer closed the connection; response complete.
        body = (b''.join(chunks)).decode()
        print(body.split('\n')[0])
        s.close()
        n_tasks -= 1
# Start 20 concurrent requests, then run the event loop: each ready socket's
# registration carries the callback that continues that request.
start = time.time()
for i in range(10):
    get('/foo')
    get('/bar')
while n_tasks:
    events = selector.select()
    for key, mask in events:
        callback = key.data
        callback()
print('took %.2f sec' % (time.time() - start))
socket客户端(v1.2)
知识点:future对象
from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
import socket
import time
# Shared selector and the number of requests still in flight.
selector, n_tasks = DefaultSelector(), 0


class Future:
    """A pending event: a list of callbacks to run when it resolves."""

    def __init__(self):
        # Zero-argument callables, run in the order they were added.
        self.callbacks = []

    def resolve(self):
        """Invoke every registered callback in order."""
        for fn in self.callbacks:
            fn()
def get(path):
    """Begin a non-blocking GET for *path* and count it as in flight.

    The socket is registered for writability with a Future whose single
    callback runs ``connected`` once the connect has finished.
    """
    global n_tasks
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('127.0.0.1', 5000))
    except BlockingIOError:
        pass  # the non-blocking connect is still in progress
    request = 'GET %s HTTP/1.0\r\n\r\n' % path
    on_connect = Future()
    on_connect.callbacks.append(lambda: connected(sock, request))
    selector.register(sock.fileno(), EVENT_WRITE, data=on_connect)
    n_tasks += 1
def connected(s, request):
    """Send *request* on the now-writable socket and wait for the reply."""
    selector.unregister(s.fileno())
    # Writable means the non-blocking connect has finished.
    s.send(request.encode())
    chunks = []
    next_read = Future()
    next_read.callbacks.append(lambda: readable(s, chunks))
    selector.register(s.fileno(), EVENT_READ, data=next_read)
def readable(s, chunks):
    """Read the next chunk of the response, re-arming until the peer closes.

    ``n_tasks`` is decremented exactly once per request, on the final empty
    read. This lets the ``while n_tasks`` loop end only after every response
    is complete. (Decrementing once per chunk made it hit zero early or go
    negative.)
    """
    global n_tasks
    selector.unregister(s.fileno())
    chunk = s.recv(100)
    if chunk:
        print("recive chunk:%s" % chunk)
        chunks.append(chunk)
        # More data may follow: wait for the next readable event.
        fut = Future()
        fut.callbacks.append(lambda: readable(s, chunks))
        selector.register(s.fileno(), EVENT_READ, data=fut)
    else:
        # Empty read: the peer closed the connection; response complete.
        body = (b''.join(chunks)).decode()
        print(body.split('\n')[0])
        s.close()
        n_tasks -= 1
# Start 20 concurrent requests, then run the event loop: each ready socket's
# registration carries a Future whose callbacks continue that request.
start = time.time()
for i in range(10):
    get('/foo')
    get('/bar')
while n_tasks:
    events = selector.select()
    for key, mask in events:
        fut = key.data
        fut.resolve()
print('took %.2f sec' % (time.time() - start))
socket客户端(v1.3)
知识点:Task,协程
from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
import socket
import time
# Shared selector and the number of coroutines that have not finished yet.
selector, n_tasks = DefaultSelector(), 0


class Future:
    """A pending event: a list of callbacks to run when it resolves."""

    def __init__(self):
        # Zero-argument callables, run in the order they were added.
        self.callbacks = []

    def resolve(self):
        """Invoke every registered callback in order."""
        for fn in self.callbacks:
            fn()
class Task:
    """Drive a generator-based coroutine from one yielded Future to the next.

    Construction runs the generator up to its first ``yield``. Each yielded
    object gets this task's ``step`` appended to its ``callbacks``, so
    resolving it resumes the generator. Exhaustion simply stops driving it.
    """

    # Unique marker returned by next() once the generator is exhausted.
    _FINISHED = object()

    def __init__(self, gen):
        self.gen = gen
        self.step()

    def step(self):
        """Advance the generator one yield and subscribe to what it yielded."""
        pending = next(self.gen, Task._FINISHED)
        if pending is Task._FINISHED:
            return
        pending.callbacks.append(self.step)
def get(path):
    """Generator coroutine: fetch *path* and print the response's first line.

    Each ``yield`` hands a Future to the driving Task. The event loop resolves
    that Future when the socket becomes ready, which resumes the generator
    just after the ``yield``. ``n_tasks`` is incremented on start and
    decremented exactly once on completion, after which the generator
    returns. (Without the return, ``while True`` kept re-reading EOF forever.)
    """
    global n_tasks
    n_tasks += 1
    s = socket.socket()
    s.setblocking(False)
    try:
        s.connect(('127.0.0.1', 5000))
    except BlockingIOError:
        pass  # the non-blocking connect is still in progress
    request = 'GET %s HTTP/1.0\r\n\r\n' % path

    # Suspend until the socket is writable (connect finished).
    fut = Future()
    selector.register(s.fileno(), EVENT_WRITE, data=fut)
    yield fut
    selector.unregister(s.fileno())
    s.send(request.encode())

    chunks = []
    while True:
        # Suspend until the socket is readable.
        fut = Future()
        selector.register(s.fileno(), EVENT_READ, data=fut)
        yield fut
        selector.unregister(s.fileno())
        chunk = s.recv(100)
        if chunk:
            print("recive chunk:%s" % chunk)
            chunks.append(chunk)
        else:
            # Empty read: the peer closed the connection; response complete.
            body = (b''.join(chunks)).decode()
            print(body.split('\n')[0])
            s.close()
            n_tasks -= 1
            # Returning raises StopIteration inside Task.step, ending the task.
            return
# Wrap 20 request coroutines in Tasks, then run the event loop: resolving
# each ready socket's Future resumes the coroutine waiting on it.
start = time.time()
for i in range(10):
    Task(get('/foo'))
    Task(get('/bar'))
while n_tasks:
    events = selector.select()
    for key, mask in events:
        fut = key.data
        fut.resolve()
print('took %.2f sec' % (time.time() - start))
ECHO小例子(包含server端和client端)
server端:
import socket
import asyncio
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
class Server(object):
    """Single-threaded, selector-based TCP echo server.

    The listening socket and every client socket share one DefaultSelector.
    Each registration stores the handler to call when that socket becomes
    readable, and ``run_forever`` dispatches to it.
    """

    MAX_BUF = 1024  # max bytes read per recv() call
    BACKLOG = 1024  # pending-connection queue length passed to listen()

    def __init__(self, port, host='127.0.0.1'):
        # Currently connected client sockets.
        self._client = []
        # Previously ignored: the socket was always bound to port 5000.
        self._host = host
        self._port = port
        self._selector = DefaultSelector()
        self._sock = self.socket_init()

    def socket_init(self):
        """Create, bind and listen on the server socket; watch it for accepts."""
        sock = socket.socket()
        sock.bind((self._host, self._port))
        sock.listen(Server.BACKLOG)
        self._selector.register(sock, EVENT_READ, self.accept_client)
        return sock

    def accept_client(self, sock, mask):
        """Accept one pending connection and start watching it for data."""
        conn, addr = sock.accept()
        print(f"recv a conn:{conn} from {addr}")
        self._client.append(conn)
        print(f"now has client:{self._client}")
        conn.setblocking(False)
        self._selector.register(conn, EVENT_READ, self.read_from_client)

    def read_from_client(self, conn, mask):
        """Echo what the client sent; drop the client on EOF or a reset."""
        try:
            data = conn.recv(Server.MAX_BUF)
        except ConnectionError:
            # A reset peer is treated like an orderly close.
            data = b''
        if data:
            print(f"recv data from client:{data}")
            conn.send(data)  # echo first todo
            # NOTE(review): presumably a deliberate stall to trigger the
            # client's wait_for(..., 2) timeout. It blocks the whole
            # single-threaded loop, freezing every other client too.
            import time
            time.sleep(10)
        else:
            self._drop_client(conn)

    def _drop_client(self, conn):
        """Stop watching *conn*, forget it, and close it."""
        self._selector.unregister(conn)
        if conn in self._client:
            self._client.remove(conn)
        conn.close()

    def close(self):
        """Close all clients, the listening socket and the selector."""
        for conn in list(self._client):
            self._drop_client(conn)
        self._selector.close()
        self._sock.close()

    def run_forever(self):
        """Wait for readiness and call the handler stored for each socket."""
        while True:
            events = self._selector.select()
            for key, mask in events:
                handler = key.data
                handler(key.fileobj, mask)
if __name__ == '__main__':
    # Build the echo server for port 5000 and enter its endless event loop.
    Server(5000).run_forever()
client:
import asyncio
from concurrent.futures import TimeoutError
async def tcp_echo_client(message):
    """Send *message* to the echo server at 127.0.0.1:5000 in a loop.

    After an initial send, each iteration re-sends *message* and waits at
    most 2 seconds for one byte of echo. A timeout is printed and the loop
    continues. The loop ends only when the server closes the connection, and
    the connection is always closed on exit.
    """
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 5000)
    # Debug output about the stream objects, printed once.
    print(type(reader))
    print(reader)
    print(type(writer))
    print(writer)
    try:
        print(f'Send: {message!r}')
        writer.write(message.encode())
        while not reader.at_eof():
            try:
                writer.write(message.encode())
                # Flow control: stop writing if the transport buffer is full.
                await writer.drain()
                data = await asyncio.wait_for(reader.read(1), 2)
                print(f'Received: {data.decode()!r}')
            except asyncio.TimeoutError:
                # On Python 3.8-3.10, wait_for raises asyncio.TimeoutError,
                # which is NOT concurrent.futures.TimeoutError, so catching
                # the latter let every timeout crash the client.
                import traceback
                print("%s" % traceback.format_exc())
    finally:
        writer.close()
        await writer.wait_closed()


if __name__ == '__main__':
    # asyncio.run creates, runs and closes a fresh event loop; the
    # get_event_loop().run_until_complete pattern is deprecated.
    asyncio.run(tcp_echo_client('Hello World!'))
参考
- [1] [一个使用 asyncio 协程的网络爬虫](https://linux.cn/article-8265-1.html)
- [2] [Python协程工作原理](https://www.bilibili.com/video/BV1N4411S7BP?t=32&p=2)