1. socketserver
服务端:
# socketserver Server
import socketserver
class MyServer(socketserver.BaseRequestHandler):
def handle(self):
while True:
conn = self.request
conn.send("Server ... ".encode())
msg = conn.recv(1024).decode()
print(msg)
conn.close()
server = socketserver.ThreadingTCPServer(("127.0.0.1",9000),MyServer)
server.serve_forever()
客户端:
# socketserver Client
import socket,os,time
sk = socket.socket()
sk.connect(("127.0.0.1",9000))
while True:
msg = sk.recv(1024).decode()
print(msg)
time.sleep(1)
msg2 = "process:%s"%(os.getpid())
sk.send(msg2.encode())
sk.close()
2. 进程池Pool
服务端:
# 进程池Pool Server
from multiprocessing import Pool
import socket,os
def func(conn,addr):
while True:
conn.send("Server ...".encode())
msg = conn.recv(1024).decode()
print("pid %s >>>:%s"%(os.getpid(),msg))
conn.close()
if __name__ == "__main__":
sk = socket.socket()
sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk.bind(("127.0.0.1",9000))
sk.listen()
p = Pool(5)
while True:
conn,addr = sk.accept()
p.apply_async(func,args=(conn,addr))
p.close()
p.join()
sk.close()
客户端:
# 进程池Pool Client
import socket
sk = socket.socket()
sk.connect(("127.0.0.1",9000))
while True:
msg = sk.recv(1024).decode()
print(msg)
msg2 = input("Client >>>:")
sk.send(msg2.encode())
sk.close()
3. 进程池ProcessPoolExecutor
服务端:
# 进程池ProcessPoolExecutor Server
from concurrent.futures import ProcessPoolExecutor
import socket,os
def func(conn):
while True:
conn.send("server ...".encode())
msg = conn.recv(1024).decode()
print("pid %s >>>:%s"%(os.getpid(),msg))
if __name__ == "__main__":
sk = socket.socket()
sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk.bind(("127.0.0.1",9000))
sk.listen()
# 默认进程数 = 逻辑处理器个数
p = ProcessPoolExecutor(6)
while True:
conn,addr = sk.accept()
p.submit(func,conn)
p.shutdown()
sk.close()
客户端:
# 进程池ProcessPoolExecutor Client
import socket
sk = socket.socket()
sk.connect(("127.0.0.1",9000))
while True:
msg = sk.recv(1024).decode()
print(msg)
msg2 = input("input:")
sk.send(msg2.encode())
sk.close()
4. 线程池ThreadPoolExecutor
服务端:
# 线程池ThreadPoolExecutor Server
from concurrent.futures import ThreadPoolExecutor
from threading import currentThread
import socket
def func(conn):
while True:
conn.send("server ... ".encode())
msg = conn.recv(1024).decode()
print("thread %s >>>:%s"%(currentThread().ident,msg))
if __name__ == "__main__":
sk = socket.socket()
sk.bind(("127.0.0.1",9000))
sk.listen()
# 默认最大线程数 = 逻辑处理器个数 * 5
tp = ThreadPoolExecutor(3)
while True:
conn,addr = sk.accept()
tp.submit(func,conn)
tp.shutdown()
sk.close()
客户端:
# 线程池ThreadPoolExecutor Client
import socket
sk = socket.socket()
sk.connect(("127.0.0.1",9000))
while True:
msg = sk.recv(1024).decode()
print(msg)
msg2 = input("input:")
sk.send(msg2.encode())
sk.close()
5. 协程
服务端:
# 协程gevent Server
from gevent import monkey;monkey.patch_all()
import socket,gevent
def func(conn):
while True:
conn.send("server ...".encode())
msg = conn.recv(1024).decode()
print(">>>:%s"%(msg))
if __name__ == "__main__":
sk = socket.socket()
sk.bind(("127.0.0.1",9000))
sk.listen()
while True:
conn,addr = sk.accept()
g = gevent.spawn(func,conn)
sk.close()
客户端:
# 协程gevent Client
from concurrent.futures import ThreadPoolExecutor
from threading import currentThread
import socket
sk = socket.socket()
sk.connect(("127.0.0.1",9000))
def func():
while True:
msg = sk.recv(1024).decode()
print(msg)
msg2 = "thread %s"%(currentThread().ident)
sk.send(msg2.encode())
tp = ThreadPoolExecutor(3)
while True:
tp.submit(func)
tp.shutdown()
sk.close()