赞
踩
from socket import *


class WsClinet:
    """Minimal blocking TCP client that connects as soon as it is created.

    Args:
        ip: Host name or IPv4 address of the server to connect to.
        port: TCP port of the server; any value accepted by ``int()``.

    Raises:
        ValueError: If ``port`` cannot be converted to an integer.
        OSError: If the connection cannot be established.
    """

    def __init__(self, ip, port):
        # Use the address supplied by the caller instead of a hard-coded
        # loopback address, so the ``ip`` argument actually has an effect.
        self.ip = ip
        self.port = int(port)
        # Buffer size in bytes (kept for callers; not read by this class).
        self.buff_len = 1024
        # AF_INET selects IPv4 addressing, SOCK_STREAM selects TCP.
        self.clientSocket = socket(AF_INET, SOCK_STREAM)
        try:
            self.clientSocket.connect((self.ip, self.port))
        except OSError:
            # Release the descriptor right away rather than waiting for
            # garbage collection of the half-built instance.
            self.clientSocket.close()
            raise

    def sendMsg(self, msg):
        """Encode ``msg`` with the default UTF-8 codec and send all of it."""
        # sendall() keeps writing until every byte is handed to the kernel;
        # a plain send() may transmit only a prefix of the payload.
        self.clientSocket.sendall(msg.encode())

    def __del__(self):
        # __del__ also runs when __init__ raised before the socket existed
        # (for example on an invalid port), so look the attribute up safely.
        sock = getattr(self, "clientSocket", None)
        if sock is not None:
            sock.close()
def __init__(self):
    """Create the socket client and keep it on ``self.websocket``."""
    # The body must be indented under the ``def`` line; without the
    # indentation this definition is a syntax error.
    self.websocket = WsClinet("127.0.0.1", 6060)
def test_ws(self):
    """Send a fixed greeting through ``self.websocket`` and return ``"res"``."""
    # The body must be indented under the ``def`` line; without the
    # indentation this definition is a syntax error.
    self.websocket.sendMsg("Hi major")
    return "res"
from socket import *

# Single-connection TCP echo-style server: accepts one client, prints every
# chunk it receives and answers each chunk with a fixed acknowledgement.
ip = "127.0.0.1"
port = 9090
# Maximum number of bytes read by one recv() call.
buff_len = 1024

# AF_INET selects IPv4 addressing, SOCK_STREAM selects TCP.
# The ``with`` blocks close both sockets even when an exception (connection
# reset, undecodable bytes, Ctrl-C) interrupts the loop.
with socket(AF_INET, SOCK_STREAM) as serverSocket:
    # Bind the socket to the IP address and port.
    serverSocket.bind((ip, port))
    # Start listening; 10 is the backlog, i.e. how many not-yet-accepted
    # connections the OS may queue, not a cap on the number of clients.
    serverSocket.listen(10)
    # Block until a client connects.
    dataSocket, addr = serverSocket.accept()
    with dataSocket:
        print("接收到客户端:", addr)
        while True:
            recv = dataSocket.recv(buff_len)
            # An empty bytes object means the client closed the connection.
            if not recv:
                break
            info = recv.decode()
            print(info)
            # sendall() guarantees the whole reply is written; send() may
            # transmit only part of it.
            dataSocket.sendall("接受".encode())
from socket import *

# TCP client that repeatedly sends a fixed payload and prints each reply
# until the server closes the connection.
ip = "127.0.0.1"
port = 9090
# Maximum number of bytes read by one recv() call.
buff_len = 1024

# AF_INET selects IPv4 addressing, SOCK_STREAM selects TCP.
# The ``with`` block closes the socket even when connect/send/recv raises.
with socket(AF_INET, SOCK_STREAM) as clientSocket:
    clientSocket.connect((ip, port))
    while True:
        # sendall() guarantees the whole payload is written; send() may
        # transmit only part of it.
        clientSocket.sendall("tsrtr".encode())
        recv = clientSocket.recv(buff_len)
        # An empty bytes object means the server closed the connection.
        if not recv:
            break
        print(recv.decode())
from socket import socket,AF_INET, SOCK_STREAM
import threading
import time


class WsClinet():
    """TCP client that prints incoming messages from a background thread.

    The socket is created in the constructor but only connected when
    ``runServer`` (or ``startServer``) runs.

    Args:
        IP_ADDR: Host name or IPv4 address of the server, e.g. "127.0.0.1".
        IP_PORT: Server port; converted with ``int()`` when connecting.
    """

    def __init__(self, IP_ADDR, IP_PORT):
        self.IP_ADDR = IP_ADDR
        self.IP_PORT = IP_PORT
        # AF_INET selects IPv4 addressing, SOCK_STREAM selects TCP.
        self.dataSocket = socket(AF_INET, SOCK_STREAM)

    def __del__(self):
        print("注销类")
        # __del__ also runs when __init__ failed before the socket existed,
        # so look the attribute up safely.
        sock = getattr(self, "dataSocket", None)
        if sock is not None:
            sock.close()

    def clientSend(self, msg):
        """Encode ``msg`` with the default UTF-8 codec and send all of it."""
        # sendall() keeps writing until every byte is handed to the kernel;
        # a plain send() may transmit only a prefix of the payload.
        self.dataSocket.sendall(msg.encode())

    def runServer(self):
        """Connect to the server and print messages until the link ends.

        Blocks the calling thread. Returns when the peer closes the
        connection or a socket/decoding error occurs; the socket is closed
        on the way out in every case.

        Raises:
            OSError: If the initial connection attempt fails.
            ValueError: If ``IP_PORT`` cannot be converted to an integer.
        """
        print("准备连接!")
        # Connect to the server socket.
        self.dataSocket.connect((self.IP_ADDR, int(self.IP_PORT)))
        try:
            while True:
                # recv() blocks until data arrives, so no extra sleep is
                # needed to avoid spinning.
                recved = self.dataSocket.recv(1024)
                if not recved:
                    # Empty bytes means the peer closed the connection;
                    # looping again would spin forever on b"".
                    break
                print(recved.decode('utf-8'))
        except (OSError, UnicodeDecodeError) as exp:
            # Report the failure and stop; retrying recv() on a broken or
            # closed socket would only raise again on every iteration.
            print(exp)
        finally:
            self.dataSocket.close()

    def startServer(self):
        """Run ``runServer`` on a new thread so the caller is not blocked."""
        thread = threading.Thread(target=self.runServer)
        thread.start()
        print("ws 启动")


# main function
if __name__ == '__main__':
    print("======client======")
    ws = WsClinet("127.0.0.1",6062)
    ws.startServer()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。