您好,欢迎访问三七文档
当前位置:首页 > 商业/管理/HR > 项目/工程管理 > python简单实现websocket
python简单实现websocket

协议选择的是新的Hybi-10,参考文章如下:
http://blog.mycolorway.com/2011/11/22/a-minimal-python-websocket-server/

代码如下:

#-*- coding:utf8 -*-

import threading
import hashlib
import socket
import base64

class websocket_thread(threading.Thread):
    def __init__(self, connection):
        super(websocket_thread, self).__init__()
        self.connection = connection

    def run(self):
        print 'new websocket client joined!'
        reply = 'i got u, from websocket server.'
        length = len(reply)
        while True:
            data = self.connection.recv(1024)
            print parse_data(data)
            self.connection.send('%c%c%s' % (0x81, length, reply))

def parse_data(msg):
    v = ord(msg[1]) & 0x7f
    if v == 0x7e:
        p = 4
    elif v == 0x7f:
        p = 10
    else:
        p = 2
    mask = msg[p:p+4]
    data = msg[p+4:]
    return ''.join([chr(ord(v) ^ ord(mask[k%4])) for k, v in enumerate(data)])

def parse_headers(msg):
    headers = {}
    header, data = msg.split('\r\n\r\n', 1)
    for line in header.split('\r\n')[1:]:
        key, value = line.split(': ', 1)
        headers[key] = value
    headers['data'] = data
    return headers

def generate_token(msg):
    key = msg + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
    ser_key = hashlib.sha1(key).digest()
    return base64.b64encode(ser_key)

if __name__ == '__main__':
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('127.0.0.1', 9000))
    sock.listen(5)
    while True:
        connection, address = sock.accept()
        try:
            data = connection.recv(1024)
            headers = parse_headers(data)
            token = generate_token(headers['Sec-WebSocket-Key'])
            connection.send('\
HTTP/1.1 101 WebSocket Protocol Hybi-10\r\n\
Upgrade: WebSocket\r\n\
Connection: Upgrade\r\n\
Sec-WebSocket-Accept: %s\r\n\r\n' % token)
            thread = websocket_thread(connection)
            thread.start()
        except socket.timeout:
            print 'websocket connection timeout'

测试页面:

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
</head>
<body>
    <h3>WebSocketTest</h3>
    <div id="login">
        <div>
            <input id="serverIP" type="text" placeholder="服务器IP" value="127.0.0.1" autofocus="autofocus" />
            <input id="serverPort" type="text" placeholder="服务器端口" value="9000" />
            <input id="btnConnect" type="button" value="连接" onclick="connect()" />
        </div>
        <div>
            <input id="sendText" type="text" placeholder="发送文本" value="I'm WebSocket Client!" />
            <input id="btnSend" type="button" value="发送" onclick="send()" />
        </div>
        <div>
            <div>来自服务端的消息</div>
            <textarea id="txtContent" cols="50" rows="10" readonly="readonly"></textarea>
        </div>
    </div>
</body>
<script>
    var socket;

    function connect() {
        var host = "ws://" + $("serverIP").value + ":" + $("serverPort").value + "/"
        socket = new WebSocket(host);
        try {
            socket.onopen = function (msg) {
                $("btnConnect").disabled = true;
                alert("连接成功!");
            };
            socket.onmessage = function (msg) {
                if (typeof msg.data == "string") {
                    displayContent(msg.data);
                }
                else {
                    alert("非文本消息");
                }
            };
            socket.onclose = function (msg) { alert("socket closed!") };
        }
        catch (ex) {
            log(ex);
        }
    }

    function send() {
        var msg = $("sendText").value
        socket.send(msg);
    }

    window.onbeforeunload = function () {
        try {
            socket.close();
            socket = null;
        }
        catch (ex) {
        }
    };

    function $(id) { return document.getElementById(id); }

    Date.prototype.Format = function (fmt) { //author: meizz
        var o = {
            "M+": this.getMonth() + 1, //月份
            "d+": this.getDate(), //日
            "h+": this.getHours(), //小时
            "m+": this.getMinutes(), //分
            "s+": this.getSeconds(), //秒
            "q+": Math.floor((this.getMonth() + 3) / 3), //季度
            "S": this.getMilliseconds() //毫秒
        };
        if (/(y+)/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() + "").substr(4 - RegExp.$1.length));
        for (var k in o)
            if (new RegExp("(" + k + ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" + o[k]).substr(("" + o[k]).length)));
        return fmt;
    }

    function displayContent(msg) {
        $("txtContent").value += "\r\n" + new Date().Format("yyyy/MM/dd hh:mm:ss") + ": " + msg;
    }

    function onkey(event) { if (event.keyCode == 13) { send(); } }
</script>
</html>
本文标题:python简单实现websocket
链接地址:https://www.777doc.com/doc-2853944.html