工作过程中,参考 Python 官方手册实现了一个 socket 长连接的客户端,使用了 asyncio 库中的 StreamReader。实现过程中着实费了一番功夫,虽然工作交差了,但感觉不好,还是想看看网上有没有学习视频。在 B 站找到了这个视频,认为 A. Jesse Jiryu Davis 这位 Python 大神的讲解极为通俗易懂。

协程学习

copy视频例子学习阶段

服务端

不知道视频中的服务端是怎么实现的,这里偷懒用 Flask 实现一个超级简单的 server 端。

from flask import Flask
app = Flask(__name__)

@app.route("/")
def hello():
    """Serve the root URL with a fixed greeting string."""
    greeting = "hello haha"
    return greeting


@app.route("/foo")
def foo():
    """Serve /foo: the marker "=foo==" repeated 100 times."""
    marker = "=foo=="
    return marker * 100


@app.route("/bar")
def bar():
    """Serve /bar: the marker "=bar==" repeated 100 times."""
    marker = "=bar=="
    return marker * 100

# Start the Flask application only when this file is executed as a script.
if __name__ == "__main__":
    app.run()

执行方法:python3 flask_server.py
执行效果:

ubuntu@VM-0-13-ubuntu:~$ python3 flask_server.py
 * Serving Flask app "flask_server" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
127.0.0.1 - - [28/Nov/2020 20:08:25] "GET /foo HTTP/1.0" 200 -

socket客户端(v1.0)

知识点:selector,非阻塞socket

from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
import socket
import time

selector = DefaultSelector()

def get(path):
    """Fetch *path* from the local test server and print the status line.

    The socket is non-blocking; the module-level ``selector`` is used to wait
    until it is writable (connection established) and then readable (data
    available). Every received chunk is printed, and once the peer closes the
    connection the first line of the decoded response is printed.

    Args:
        path: Request path placed in the ``GET`` line, e.g. ``'/foo'``.

    Returns:
        None.
    """
    s = socket.socket()
    try:
        s.setblocking(False)
        try:
            s.connect(('127.0.0.1', 5000))
        except BlockingIOError:
            # A non-blocking connect reports "in progress" this way; the
            # selector below tells us when the socket becomes usable.
            pass
        request = 'GET %s HTTP/1.0\r\n\r\n' % path

        # Wait until the socket is writable.
        selector.register(s.fileno(), EVENT_WRITE)
        selector.select()
        selector.unregister(s.fileno())

        s.send(request.encode())

        chunks = []
        while True:
            # Wait until the socket is readable.
            selector.register(s.fileno(), EVENT_READ)
            selector.select()
            selector.unregister(s.fileno())
            chunk = s.recv(10)
            if chunk:
                print("recive chunk:%s" % chunk)
                chunks.append(chunk)
            else:
                # Empty read: the server closed the connection.
                body = (b''.join(chunks)).decode()
                print(body.split('\n')[0])
                return
    finally:
        # Release the descriptor on success and on error alike; the original
        # left one open socket behind per request.
        s.close()

# Issue 10 rounds of /foo and /bar requests one after another and report
# the total wall-clock time.
start = time.time()
for i in range(10):
    for path in ('/foo', '/bar'):
        get(path)
elapsed = time.time() - start
print('took %.2f sec' % elapsed)

执行效果

...
recive chunk:b'ar===bar=='
HTTP/1.0 200 OK
took 0.11 sec

socket客户端(v1.1)

知识点:回调函数 callback

from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
import socket
import time

selector = DefaultSelector()
n_tasks = 0

def get(path):
    """Begin a non-blocking GET for *path*, continued later by callbacks.

    Opens a non-blocking socket to 127.0.0.1:5000, registers it for
    write-readiness with a callback that calls ``connected``, and increments
    the global ``n_tasks`` counter.

    Args:
        path: Request path placed in the ``GET`` line.
    """
    global n_tasks
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('127.0.0.1', 5000))
    except BlockingIOError:
        pass
    payload = 'GET %s HTTP/1.0\r\n\r\n' % path

    def on_writable():
        # Invoked by the event loop once the socket reports writable.
        connected(sock, payload)

    selector.register(sock.fileno(), EVENT_WRITE, data=on_writable)
    n_tasks += 1

def connected(s,request):
    """Send *request* on the now-writable socket and start waiting for data.

    Args:
        s: Socket that the selector reported as writable.
        request: Request text; it is encoded before being sent.
    """
    selector.unregister(s.fileno())
    # s is writable
    s.send(request.encode())

    # Chunks accumulate here across successive ``readable`` callbacks.
    chunks = []
    callback = lambda :readable(s,chunks)
    selector.register(s.fileno(), EVENT_READ,data=callback)

def readable(s,chunks):
    """Read one chunk from *s*; finish the task when the peer has closed.

    A non-empty chunk is printed, appended to *chunks*, and the socket is
    re-registered for the next read event. An empty chunk means end of
    stream: the first line of the response is printed, the socket is closed
    and the global ``n_tasks`` counter is decremented.

    Args:
        s: Socket that the selector reported as readable.
        chunks: List collecting the bytes received so far for this request.
    """
    global n_tasks
    selector.unregister(s.fileno())
    chunk = s.recv(100)
    if chunk:
        print("recive chunk:%s" % chunk)
        chunks.append(chunk)
        callback = lambda: readable(s, chunks)
        selector.register(s.fileno(), EVENT_READ,data=callback)
    else:
        body = (b''.join(chunks)).decode()
        print(body.split('\n')[0])
        s.close()
        # Count the task as done only at end of stream; decrementing per
        # chunk makes the ``while n_tasks`` loop stop before all responses
        # have been read.
        n_tasks -= 1

start = time.time()

# Start all requests up front; each call only registers a callback.
for i in range(10):
    get('/foo')
    get('/bar')


# Event loop: run the callback stored with every ready socket until no task
# is outstanding.
while n_tasks:
    events = selector.select()
    for event,mask in events:
        cb = event.data
        cb()
print('took %.2f sec' % (time.time()-start))

socket客户端(v1.2)

知识点:future对象

from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
import socket
import time

selector = DefaultSelector()
n_tasks = 0

class Future:
    """A pending event together with the callbacks to run when it occurs."""

    def __init__(self):
        # Zero-argument callables, run in insertion order by resolve().
        self.callbacks = []

    def resolve(self):
        """Call every registered callback, in the order it was added."""
        for callback in self.callbacks:
            callback()

def get(path):
    """Begin a non-blocking GET for *path*, tracked through a ``Future``.

    Opens a non-blocking socket to 127.0.0.1:5000 and registers it for
    write-readiness. The selector entry carries a ``Future`` whose single
    callback calls ``connected``. Increments the global ``n_tasks`` counter.

    Args:
        path: Request path placed in the ``GET`` line.
    """
    global n_tasks
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('127.0.0.1', 5000))
    except BlockingIOError:
        pass
    payload = 'GET %s HTTP/1.0\r\n\r\n' % path

    def on_writable():
        # Runs when the event loop resolves the future for this socket.
        connected(sock, payload)

    pending = Future()
    pending.callbacks.append(on_writable)
    selector.register(sock.fileno(), EVENT_WRITE, data=pending)
    n_tasks += 1

def connected(s,request):
    """Send *request* on the writable socket, then wait for it to be readable.

    Args:
        s: Socket that the selector reported as writable.
        request: Request text; it is encoded before being sent.
    """
    selector.unregister(s.fileno())
    # s is writable
    s.send(request.encode())

    received = []

    def on_readable():
        readable(s, received)

    pending = Future()
    pending.callbacks.append(on_readable)
    selector.register(s.fileno(), EVENT_READ, data=pending)

def readable(s,chunks):
    """Read one chunk from *s*; finish the task when the peer has closed.

    A non-empty chunk is printed, appended to *chunks*, and the socket is
    re-registered with a new ``Future`` for the next read event. An empty
    chunk means end of stream: the first line of the response is printed,
    the socket is closed and the global ``n_tasks`` counter is decremented.

    Args:
        s: Socket that the selector reported as readable.
        chunks: List collecting the bytes received so far for this request.
    """
    global n_tasks
    selector.unregister(s.fileno())
    chunk = s.recv(100)
    if chunk:
        print("recive chunk:%s" % chunk)
        chunks.append(chunk)
        callback = lambda: readable(s, chunks)
        fut = Future()
        fut.callbacks.append(callback)
        selector.register(s.fileno(), EVENT_READ,data=fut)
    else:
        body = (b''.join(chunks)).decode()
        print(body.split('\n')[0])
        s.close()
        # Count the task as done only at end of stream; decrementing per
        # chunk makes the ``while n_tasks`` loop stop before all responses
        # have been read.
        n_tasks -= 1

start = time.time()
# Start all requests up front; each call only registers a future.
for i in range(10):
    get('/foo')
    get('/bar')


# Event loop: resolve the future stored with every ready socket until no
# task is outstanding.
while n_tasks:
    events = selector.select()
    for event,mask in events:
        fut = event.data
        fut.resolve()
print('took %.2f sec' % (time.time()-start))

socket客户端(v1.3)

知识点:Task,协程

from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
import socket
import time

selector = DefaultSelector()
n_tasks = 0

class Future:
    """Something a coroutine is waiting on, plus the callbacks to wake it."""

    def __init__(self):
        # Zero-argument callables, run in insertion order by resolve().
        self.callbacks = []

    def resolve(self):
        """Run each registered callback once, in registration order."""
        for wake in self.callbacks:
            wake()

class Task:
    """Drive a generator that yields future-like objects.

    Each yielded object must expose a ``callbacks`` list. The task appends
    its own ``step`` method to that list, so the generator is resumed when
    those callbacks are invoked.
    """

    def __init__(self,gen):
        self.gen=gen
        # Run the generator up to its first yield right away.
        self.step()

    def step(self):
        """Advance the generator once and subscribe to what it yields."""
        try:
            pending = next(self.gen)
        except StopIteration:
            # Generator finished: nothing left to schedule.
            pass
        else:
            pending.callbacks.append(self.step)

def get(path):
    """Generator-based coroutine that fetches *path* from 127.0.0.1:5000.

    Yields a ``Future`` each time it has to wait for the socket (first for
    writability, then once per read). Whoever drives the generator must
    resume it after that future is resolved. The global ``n_tasks`` counter
    is incremented when the generator starts running and decremented once
    the response has been read completely.

    Args:
        path: Request path placed in the ``GET`` line.

    Yields:
        Future: the object registered with the selector for the next event.
    """
    global n_tasks
    n_tasks +=1
    s = socket.socket()
    s.setblocking(False)
    try:
        s.connect(('127.0.0.1',5000))
    except BlockingIOError:
        pass
    request = 'GET %s HTTP/1.0\r\n\r\n' % path
    fut = Future()
    selector.register(s.fileno(), EVENT_WRITE,data=fut)
    yield fut
    selector.unregister(s.fileno())
    s.send(request.encode())

    chunks = []
    while True:
        fut = Future()
        selector.register(s.fileno(), EVENT_READ,data=fut)
        yield fut
        selector.unregister(s.fileno())
        chunk = s.recv(100)
        if chunk:
            print("recive chunk:%s" % chunk)
            chunks.append(chunk)
        else:
            body = (b''.join(chunks)).decode()
            print(body.split('\n')[0])
            s.close()
            n_tasks-=1
            # Stop here. Without this the loop re-registers the finished
            # socket and decrements n_tasks again on every later empty read.
            return

start = time.time()

# Creating a Task runs its coroutine up to the first yield, so every
# connection is started before the loop below begins.
for i in range(10):
    Task(get('/foo'))
    Task(get('/bar'))


# Event loop: resolve the future stored with every ready socket until no
# task is outstanding.
while n_tasks:
    events = selector.select()
    for event,mask in events:
        fut = event.data
        fut.resolve()
print('took %.2f sec' % (time.time()-start))

ECHO小例子,server,client

server端:

import socket
import asyncio
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

class Server(object):
    """Single-threaded TCP echo server built on a ``selectors`` loop.

    The listening socket and every accepted connection are registered with
    one selector; the data stored with each registration is the bound method
    to call when that socket becomes readable.
    """

    # Used as the recv() size and also as the listen() backlog.
    MAX_BUF = 1024

    def __init__(self, port):
        """Create the listening socket on 127.0.0.1:*port*.

        Args:
            port: TCP port to listen on.
        """
        self._port = port
        self._client = []
        # NOTE(review): no method of this class uses this loop — confirm
        # whether it is still needed.
        self._loop = asyncio.new_event_loop()
        self._selector = DefaultSelector()
        self._sock = self.socket_init()

    def socket_init(self):
        """Bind, listen and register the listening socket; return it."""
        sock = socket.socket()
        # Bind to the port given to the constructor; the original ignored
        # it and always bound to 5000.
        sock.bind(('127.0.0.1', self._port))
        sock.listen(Server.MAX_BUF)
        self._selector.register(sock,EVENT_READ,self.accept_client)
        return sock

    def accept_client(self,sock,mask):
        """Accept a pending connection and watch it for incoming data.

        Args:
            sock: The listening socket reported readable by the selector.
            mask: Event mask from the selector (unused).
        """
        conn, addr = sock.accept()
        print(f"recv a conn:{conn} from {addr}")
        self._client.append(conn)
        print(f"now has client:{self._client}")
        conn.setblocking(False)
        self._selector.register(conn, EVENT_READ, self.read_from_client)

    def read_from_client(self, conn, mask):
        """Echo received data back, or tear the connection down on EOF.

        Args:
            conn: Client socket reported readable by the selector.
            mask: Event mask from the selector (unused).
        """
        data = conn.recv(Server.MAX_BUF)
        if data:
            print(f"recv data from client:{data}")
            conn.send(data) # echo first todo
            # NOTE(review): this sleep stalls the whole loop for 10 seconds,
            # so no other client is served meanwhile. Presumably deliberate,
            # to exercise the client's read timeout — confirm.
            import time
            time.sleep(10)
        else:
            self._selector.unregister(conn)
            conn.close()
            # Forget the closed socket so _client lists only live
            # connections.
            if conn in self._client:
                self._client.remove(conn)

    def run_forever(self):
        """Dispatch ready sockets to their registered callbacks forever."""
        while True:
            events = self._selector.select()
            for key, mask in events:
                handler = key.data
                handler(key.fileobj, mask)

if __name__ == '__main__':
    # Build the echo server and enter its dispatch loop, which never returns.
    ser = Server(5000)
    ser.run_forever()
             

client:

import asyncio
from concurrent.futures import TimeoutError

async def tcp_echo_client(message):
    """Connect to 127.0.0.1:5000 and repeatedly send *message*, printing replies.

    After an initial send, the loop runs until the reader reports EOF. Each
    pass prints the reader/writer objects, sends *message* again and waits up
    to 2 seconds for one byte of reply. A timeout is reported with a
    traceback and the loop continues.

    Args:
        message: Text to send; it is encoded before being written.
    """
    import traceback

    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 5000)
    try:
        print(f'Send: {message!r}')
        writer.write(message.encode())
        while not reader.at_eof():
            print(type(reader))
            print(reader)
            print(type(writer))
            print(writer)
            try:
                writer.write(message.encode())
                data=await asyncio.wait_for(reader.read(1), 2)
                print(f'Received: {data.decode()!r}')
            except asyncio.TimeoutError:
                # wait_for raises asyncio.TimeoutError. On Python 3.8-3.10
                # that is a different class from
                # concurrent.futures.TimeoutError, so catching the latter
                # let the timeout escape and end the client.
                print("%s"% traceback.format_exc())
    finally:
        # Always release the connection, also when the loop exits through
        # an exception.
        writer.close()
        await writer.wait_closed()

# asyncio.run() creates a new event loop, runs the coroutine to completion and
# closes the loop. Calling asyncio.get_event_loop() with no running loop is
# deprecated in recent Python versions.
asyncio.run(tcp_echo_client('Hello World!'))

参考

- [1] [一个使用 asyncio 协程的网络爬虫](https://linux.cn/article-8265-1.html)
- [2] [Python协程工作原理](https://www.bilibili.com/video/BV1N4411S7BP?t=32&p=2)
03-05 15:58