在我的《Python高级编程和异步IO编程》中,我讲解了socket编程。这里贴一段用socket实现聊天室功能的源码。因为最近工作比较忙,目前先把代码贴上,后期我会再把代码细节分析出来,大家可以先自行研究、运行一下。
server端:
"""
server select
"""
import sys
import time
import socket
import select
import logging
from queue import Queue
import queue
# Timeout, in seconds, passed to select.select() in Server.run().
g_select_timeout = 10
class Server(object):
    """Select-based TCP chat server.

    Every message received from one client is timestamped, tagged with the
    sender's address and relayed to all *other* connected clients.

    Attributes:
        server: The listening socket.
        inputs: Sockets watched for readability (listener + clients).
        outputs: Client sockets that have queued messages waiting to be relayed.
        message_queues: Maps each client socket to the Queue of messages it sent.
        client_info: Maps each client socket to its address string.
    """

    def __init__(self, host='0.0.0.0', port=3333, timeout=2, client_nums=10):
        """Create, bind and start listening on the server socket.

        Args:
            host: Interface address to bind to.
            port: TCP port to bind to.
            timeout: Timeout in seconds applied to the listening socket.
            client_nums: Backlog passed to ``listen()``.

        Raises:
            OSError: If binding or listening fails (the socket is closed first).
        """
        self.__host = host
        self.__port = port
        self.__timeout = timeout
        self.__client_nums = client_nums
        self.__buffer_size = 1024
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # settimeout() alone decides the blocking mode; a preceding
        # setblocking(False) would be overridden by it, so it is omitted.
        self.server.settimeout(self.__timeout)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)  # keepalive
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # address reuse
        try:
            self.server.bind((self.__host, self.__port))
            self.server.listen(self.__client_nums)
        except Exception:
            # Do not leak the descriptor when the port is unavailable.
            self.server.close()
            raise
        self.inputs = [self.server]  # sockets select() watches for reading
        self.outputs = []  # sockets select() watches for writing
        self.message_queues = {}  # per-client queue of messages to relay
        self.client_info = {}  # client socket -> address string

    @staticmethod
    def _format_message(sender, payload):
        """Return the chat line for *payload* (raw bytes) sent by *sender*."""
        # Decode before formatting so peers see text rather than "b'...'".
        # errors="replace" tolerates a multi-byte character split across reads.
        text = payload.decode("utf8", errors="replace")
        return "%s %s say: %s" % (time.strftime("%Y-%m-%d %H:%M:%S"), sender, text)

    def _drop_client(self, sock):
        """Forget *sock* everywhere and close it; safe to call more than once."""
        if sock in self.inputs:
            self.inputs.remove(sock)
        if sock in self.outputs:
            self.outputs.remove(sock)
        self.message_queues.pop(sock, None)
        self.client_info.pop(sock, None)
        sock.close()

    def _accept_client(self):
        """Accept one pending connection and register it as a chat client."""
        try:
            connection, client_address = self.server.accept()
        except OSError as e:
            # The peer may have gone away between select() and accept().
            logging.error("Accept Error! ErrMsg:%s", e)
            return
        print("%s connect." % str(client_address))
        connection.setblocking(False)
        self.inputs.append(connection)
        self.client_info[connection] = str(client_address)
        self.message_queues[connection] = Queue()  # one queue per client

    def _read_from_client(self, sock):
        """Read from *sock*: queue the message, or drop the client on close/error."""
        try:
            data = sock.recv(self.__buffer_size)
        except (BlockingIOError, InterruptedError):
            return  # spurious wake-up; try again on the next select()
        except OSError:
            logging.error("Client Error!")
            data = b""  # treat a failed read like a closed connection
        if data:
            message = self._format_message(self.client_info[sock], data)
            self.message_queues[sock].put(message)
            if sock not in self.outputs:
                self.outputs.append(sock)
        else:
            # An empty read means the peer closed the connection.
            print("Client:%s Close." % str(self.client_info[sock]))
            self._drop_client(sock)

    def _broadcast(self, sender, message):
        """Send *message* to every client except *sender*; drop clients that fail."""
        payload = message.encode("utf8")
        # Iterate over a snapshot: failed clients are removed while looping.
        for cli in list(self.client_info):
            if cli is sender:
                continue
            try:
                cli.sendall(payload)
            except OSError as e:
                logging.error("Send Data to %s Error! ErrMsg:%s",
                              str(self.client_info[cli]), str(e))
                print("Client: %s Close Error." % str(self.client_info[cli]))
                self._drop_client(cli)

    def _relay_next_message(self, sock):
        """Relay the next queued message from *sock*, if any."""
        pending = self.message_queues.get(sock)
        if pending is None:
            # The client was dropped earlier in this same select() round.
            if sock in self.outputs:
                self.outputs.remove(sock)
            return
        try:
            next_msg = pending.get_nowait()
        except queue.Empty:
            # Nothing left to relay; stop polling this socket for writability.
            if sock in self.outputs:
                self.outputs.remove(sock)
            return
        self._broadcast(sock, next_msg)

    def run(self):
        """Serve forever: accept clients and relay their messages."""
        while True:
            readable, writable, exceptional = select.select(
                self.inputs, self.outputs, self.inputs, g_select_timeout)
            for s in readable:
                if s is self.server:  # new incoming connection
                    self._accept_client()
                else:  # data (or EOF) from an existing client
                    self._read_from_client(s)
            for s in writable:
                self._relay_next_message(s)
            for s in exceptional:
                # Skip sockets already dropped earlier in this round.
                if s in self.client_info:
                    logging.error("Client:%s Close Error.", str(self.client_info[s]))
                    self._drop_client(s)


if __name__ == "__main__":
    Server().run()
client端:
#!/usr/local/bin/python
# *-* coding:utf-8 -*-
"""
client.py
"""
import sys
import time
import socket
import threading
class Client(object):
    """Interactive chat client.

    One thread forwards lines typed on stdin to the server while a second
    thread prints whatever the server sends back. Typing ``exit`` stops both.
    """

    def __init__(self, host, port=3333, timeout=1, reconnect=2):
        """Store connection settings; no network I/O happens here.

        Args:
            host: Server host name or address.
            port: Server TCP port.
            timeout: Socket timeout in seconds (connect, send and receive).
            reconnect: Accepted for interface compatibility; currently unused.
        """
        self.__host = host
        self.__port = port
        self.__timeout = timeout
        self.__buffer_size = 1024
        self.__flag = 1  # 1 while running, 0 once shutdown was requested
        self.client = None
        self.__lock = threading.Lock()

    @property
    def flag(self):
        """Running flag: truthy while the client should keep going."""
        return self.__flag

    @flag.setter
    def flag(self, new_num):
        self.__flag = new_num

    def __connect(self):
        """Open and return a connected socket.

        Raises:
            OSError: If the connection fails (the socket is closed first).
        """
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # settimeout() alone selects the blocking mode, so no setblocking() call.
        client.settimeout(self.__timeout)
        client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # address reuse
        try:
            client.connect((self.__host, self.__port))
        except Exception:
            client.close()  # do not leak the descriptor on failure
            raise
        return client

    def __stop(self):
        """Ask both worker loops to finish."""
        with self.__lock:
            self.flag = 0

    def __running(self):
        """Return True while no shutdown has been requested."""
        with self.__lock:
            return bool(self.flag)

    def send_msg(self):
        """Read lines from stdin and send them until ``exit``, EOF or an error."""
        if not self.client:
            return
        while True:
            line = sys.stdin.readline()
            if not self.__running():
                break  # the receiver already shut the session down
            data = line.strip()
            # An empty *line* (not merely a blank one) means stdin hit EOF.
            if not line or "exit" == data.lower():
                self.__stop()
                break
            if not data:
                continue  # nothing to send for a blank line
            try:
                self.client.sendall(data.encode("utf8"))
            except OSError as e:
                print("Send failed: %s" % e)
                self.__stop()
                break

    def recv_msg(self):
        """Print incoming messages until shutdown or the server disconnects."""
        if not self.client:
            return
        while True:
            if not self.__running():
                print('ByeBye~~')
                break
            try:
                data = self.client.recv(self.__buffer_size)
            except socket.timeout:
                continue  # no data yet; re-check the running flag
            except OSError as e:
                print("Receive failed: %s" % e)
                self.__stop()
                break
            if not data:
                # An empty read means the server closed the connection.
                self.__stop()
                print("Server closed the connection, press Enter to exit.")
                break
            # Decode so the user sees text instead of the bytes repr "b'...'".
            print("%s\n" % data.decode("utf8", errors="replace"))

    def run(self):
        """Connect, run the sender/receiver threads to completion, then close."""
        self.client = self.__connect()
        try:
            send_proc = threading.Thread(target=self.send_msg)
            recv_proc = threading.Thread(target=self.recv_msg)
            recv_proc.start()
            send_proc.start()
            recv_proc.join()
            send_proc.join()
        finally:
            self.client.close()


if __name__ == "__main__":
    Client('localhost').run()
运行方式:
1. 启动server:
   python server.py
2. 启动client1:
   python client.py
3. 启动client2:
   python client.py
在client1的console中输入任何字符串,client2中立马就可以收到