用Python编写简单的聊天程序教程

实现思路

x01 服务端的建立

首先,在服务端,使用socket进行消息的接受,每接受一个socket的请求,就开启一个新的线程来管理消息的分发与接受,同时,又存在一个handler来管理所有的线程,从而实现对聊天室的各种功能的处理

x02 客户端的建立

客户端的建立就要比服务端简单多了,客户端的作用只是对消息的发送以及接受,以及按照特定的规则去输入特定的字符从而实现不同的功能的使用,因此,在客户端这里,只需要去使用两个线程,一个是专门用于接受消息,一个是专门用于发送消息的

至于为什么不用一个呢,那是因为,只用一个的话,当接受了消息,在发送之前接受消息的处于阻塞状态,同理,发送消息也是,那么要是将这两个功能放在一个地方实现,就会导致没有办法连续发送或者接受消息了

实现方式

服务端实现

如何用python实现简单的聊天小程序

如何用python实现简单的聊天小程序

import jsonimport threadingfrom socket import *from time import ctimeclass PyChattingServer:    __socket = socket(AF_INET, SOCK_STREAM, 0)    __address = ('', 12231)    __buf = 1024    def __init__(self):        self.__socket.bind(self.__address)        self.__socket.listen(20)        self.__msg_handler = ChattingHandler()    def start_session(self):        print('等待客户连接...')        try:            while True:                cs, caddr = self.__socket.accept()                # 利用handler来管理线程,实现线程之间的socket的相互通信                self.__msg_handler.start_thread(cs, caddr)        except socket.error:            passclass ChattingThread(threading.Thread):    __buf = 1024    def __init__(self, cs, caddr, msg_handler):        super(ChattingThread, self).__init__()        self.__cs = cs        self.__caddr = caddr        self.__msg_handler = msg_handler    # 使用多线程管理会话    def run(self):        try:            print('...连接来自于:', self.__caddr)            data = '欢迎你到来PY_CHATTING!请输入你的很cooooool的昵称(不能带有空格哟`)'            self.__cs.sendall(bytes(data, 'utf-8'))            while True:                data = self.__cs.recv(self.__buf).decode('utf-8')                if not data:                    break                self.__msg_handler.handle_msg(data, self.__cs)                print(data)        except socket.error as e:            print(e.args)            pass        finally:            self.__msg_handler.close_conn(self.__cs)            self.__cs.close()class ChattingHandler:    __help_str = "[ SYSTEM ]"                  "输入/ls,即可获得所有登陆用户信息"                  "输入/h,即可获得帮助"                  "输入@用户名 (注意用户名后面的空格)+消息,即可发动单聊"                  "输入/i,即可屏蔽群聊信息"                  "再次输入/i,即可取消屏蔽"                  "所有首字符为/的信息都不会发送出去"    __buf = 1024    __socket_list = []    __user_name_to_socket = {}    __socket_to_user_name = {}    __user_name_to_broadcast_state = {}    def start_thread(self, cs, caddr):        self.__socket_list.append(cs)        chat_thread = ChattingThread(cs, caddr, self)        chat_thread.start()    def close_conn(self, cs):        if cs not in self.__socket_list:            return        # 去除socket的记录        nickname = "SOMEONE"        if cs in self.__socket_list:            self.__socket_list.remove(cs)        # 去除socket与username之间的映射关系        if cs in self.__socket_to_user_name:            nickname = self.__socket_to_user_name[cs]            self.__user_name_to_socket.pop(self.__socket_to_user_name[cs])            self.__socket_to_user_name.pop(cs)            self.__user_name_to_broadcast_state.pop(nickname)        nickname += " "        # 广播某玩家退出聊天室        self.broadcast_system_msg(nickname + "离开了PY_CHATTING")    # 管理用户输入的信息    def handle_msg(self, msg, cs):        js = json.loads(msg)        if js['type'] == "login":            if js['msg'] not in self.__user_name_to_socket:                if ' ' in js['msg']:                    self.send_to(json.dumps({                        'type': 'login',                        'success': False,                        'msg': '账号不能够带有空格'                    }), cs)                else:                    self.__user_name_to_socket[js['msg']] = cs                    self.__socket_to_user_name[cs] = js['msg']                    self.__user_name_to_broadcast_state[js['msg']] = True                    self.send_to(json.dumps({                        'type': 'login',                        'success': True,                        'msg': '昵称建立成功,输入/ls可查看所有在线的人,输入/help可以查看帮助(所有首字符为/的消息都不会发送)'                    }), cs)                    # 广播其他人,他已经进入聊天室                    self.broadcast_system_msg(js['msg'] + "已经进入了聊天室")            else:                self.send_to(json.dumps({                    'type': 'login',                    'success': False,                    'msg': '账号已存在'                }), cs)        # 若玩家处于屏蔽模式,则无法发送群聊消息        elif js['type'] == "broadcast":            if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:                self.broadcast(js['msg'], cs)            else:                self.send_to(json.dumps({                    'type': 'broadcast',                    'msg': '屏蔽模式下无法发送群聊信息'                }), cs)        elif js['type'] == "ls":            self.send_to(json.dumps({                'type': 'ls',                'msg': self.get_all_login_user_info()            }), cs)        elif js['type'] == "help":            self.send_to(json.dumps({                'type': 'help',                'msg': self.__help_str            }), cs)        elif js['type'] == "sendto":            self.single_chatting(cs, js['nickname'], js['msg'])        elif js['type'] == "ignore":            self.exchange_ignore_state(cs)    def exchange_ignore_state(self, cs):        if cs in self.__socket_to_user_name:            state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]            if state:                state = False            else:                state = True            self.__user_name_to_broadcast_state.pop(self.__socket_to_user_name[cs])            self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state            if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:                msg = "通常模式"            else:                msg = "屏蔽模式"            self.send_to(json.dumps({                'type': 'ignore',                'success': True,                'msg': '[TIME : %s][ SYSTEM ] : %s' % (ctime(), "模式切换成功,现在是" + msg)            }), cs)        else:            self.send_to({                'type': 'ignore',                'success': False,                'msg': '切换失败'            }, cs)    def single_chatting(self, cs, nickname, msg):        if nickname in self.__user_name_to_socket:            msg = '[TIME : %s][ %s CHATTING TO %s ] : %s' % (                ctime(), self.__socket_to_user_name[cs], nickname, msg)            self.send_to_list(json.dumps({                'type': 'single',                'msg': msg            }), self.__user_name_to_socket[nickname], cs)        else:            self.send_to(json.dumps({                'type': 'single',                'msg': '该用户不存在'            }), cs)        print(nickname)    def send_to_list(self, msg, *cs):        for i in range(len(cs)):            self.send_to(msg, cs[i])    def get_all_login_user_info(self):        login_list = "[ SYSTEM ] ALIVE USER : "        for key in self.__socket_to_user_name:            login_list += self.__socket_to_user_name[key] + ","        return login_list    def send_to(self, msg, cs):        if cs not in self.__socket_list:            self.__socket_list.append(cs)        cs.sendall(bytes(msg, 'utf-8'))    def broadcast_system_msg(self, msg):        data = '[TIME : %s][ SYSTEM ] : %s' % (ctime(), msg)        js = json.dumps({            'type': 'system_msg',            'msg': data        })        # 屏蔽了群聊的玩家也可以获得系统的群发信息        for i in range(len(self.__socket_list)):            if self.__socket_list[i] in self.__socket_to_user_name:                self.__socket_list[i].sendall(bytes(js, 'utf-8'))    def broadcast(self, msg, cs):        data = '[TIME : %s][%s] : %s' % (ctime(), self.__socket_to_user_name[cs], msg)        js = json.dumps({            'type': 'broadcast',            'msg': data        })        # 没有的登陆的玩家无法得知消息,屏蔽了群聊的玩家也没办法获取信息        for i in range(len(self.__socket_list)):            if self.__socket_list[i] in self.__socket_to_user_name                     and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]:                self.__socket_list[i].sendall(bytes(js, 'utf-8'))def main():    server = PyChattingServer()    server.start_session()main()

登录后复制

客户端的实现

如何用python实现简单的聊天小程序

如何用python实现简单的聊天小程序

import jsonimport threadingfrom socket import *is_login = Falseis_broadcast = Trueclass ClientReceiveThread(threading.Thread):    __buf = 1024    def __init__(self, cs):        super(ClientReceiveThread, self).__init__()        self.__cs = cs    def run(self):        self.receive_msg()    def receive_msg(self):        while True:            msg = self.__cs.recv(self.__buf).decode('utf-8')            if not msg:                break            js = json.loads(msg)            if js['type'] == "login":                if js['success']:                    global is_login                    is_login = True                print(js['msg'])            elif js['type'] == "ignore":                if js['success']:                    global is_broadcast                    if is_broadcast:                        is_broadcast = False                    else:                        is_broadcast = True                print(js['msg'])            else:                if not is_broadcast:                    print("[现在处于屏蔽模式]")                print(js['msg'])class ClientSendMsgThread(threading.Thread):    def __init__(self, cs):        super(ClientSendMsgThread, self).__init__()        self.__cs = cs    def run(self):        self.send_msg()    # 根据不同的输入格式来进行不同的聊天方式    def send_msg(self):        while True:            js = None            msg = input()            if not is_login:                js = json.dumps({                    'type': 'login',                    'msg': msg                })            elif msg[0] == "@":                data = msg.split(' ')                if not data:                    print("请重新输入")                    break                nickname = data[0]                nickname = nickname.strip("@")                if len(data) == 1:                    data.append(" ")                js = json.dumps({                    'type': 'sendto',                    'nickname': nickname,                    'msg': data[1]                })            elif msg == "/help":                js = json.dumps({                    'type': 'help',                    'msg': None                })            elif msg == "/ls":                js = json.dumps({                    'type': 'ls',                    'msg': None                })            elif msg == "/i":                js = json.dumps({                    'type': 'ignore',                    'msg': None                })            else:                if msg[0] != '/':                    js = json.dumps({                        'type': 'broadcast',                        'msg': msg                    })            if js is not None:                self.__cs.sendall(bytes(js, 'utf-8'))def main():    buf = 1024    # 改变这个的地址,变成服务器的地址,那么只要部署到服务器上就可以全网使用了    address = ("127.0.0.1", 12231)    cs = socket(AF_INET, SOCK_STREAM, 0)    cs.connect(address)    data = cs.recv(buf).decode("utf-8")    if data:        print(data)    receive_thread = ClientReceiveThread(cs)    receive_thread.start()    send_thread = ClientSendMsgThread(cs)    send_thread.start()    while True:        passmain()

登录后复制

以上就是用Python编写简单的聊天程序教程的详细内容,更多请关注【创想鸟】其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。

发布者:PHP中文网,转转请注明出处:https://www.chuangxiangniao.com/p/2237454.html

(0)
上一篇 2025年2月26日 18:58:32
下一篇 2025年2月26日 18:58:49

AD推荐 黄金广告位招租... 更多推荐

相关推荐

  • 小程序中canvas如何实现图案在线定制的功能

    本篇文章给大家带来的内容是关于小程序中canvas如何实现图案在线定制的功能,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。 前言 最近收到一个这样的需求,要求做一个基于 vue 和 element-ui 的通用后台框架页,…

    编程技术 2025年3月29日
    100
  • css-theme如何通过源码生成一份包含多套皮肤配置的样式文件

    本篇文章给大家带来的内容是关于css-theme如何通过源码生成一份包含多套皮肤配置的样式文件,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。 css-theme 通过单一css文件生成多套主题,并合并入一个css文件中 特…

    编程技术 2025年3月29日
    100
  • DeepSeek如何本地部署-探索DeepSeek本地部署全面指南

    deepseek本地部署指南:高效运行深度学习模型 DeepSeek是一款强大的本地部署深度学习工具,专为AI开发者打造。本文将指导您完成DeepSeek的本地部署,并提供一些实用技巧,助您快速上手。 DeepSeek本地部署步骤 首先,请…

    2025年3月29日
    100
  • deepseek怎么用python调用

    在开始之前,请确保你的计算机上已安装python和pip。打开命令提示符(windows)或终端(mac/linux),输入以下命令来验证python和pip的安装情况: “` python –version pip…

    2025年3月29日
    100
  • 普通人怎样高效利用DeepSeek-DeepSeek使用技巧助普通人一臂之力

    在当今信息爆炸的时代,高效地获取和处理数据成为了我们日常生活和工作中不可或缺的技能。deepseek作为一款强大的数据搜索与分析工具,为普通人提供了一个便捷的途径来挖掘隐藏在海量数据中的宝贵信息。接下来,我们将详细介绍普通人如何用好deep…

    2025年3月29日
    100
  • 一键搭建DeepSeek-详尽指南教你如何本地部署DeepSeek

    本文将指导您如何在本地环境快速部署deepseek,无需繁琐配置。只需几步,即可轻松运行deepseek。 准备工作: 确保您的系统符合DeepSeek的最低运行要求,并已安装所有必要依赖项。 下载脚本: 从官方GitHub仓库获取最新的一…

    2025年3月29日
    100
  • deepseek怎么编程

    DeepSeek并非编程语言,而是深度搜索概念。实现DeepSeek需基于现有语言选择。针对不同应用场景,需要选择合适的语言和算法,并结合机器学习技术。代码质量、可维护性、测试至关重要。根据需求选择合适的编程语言、算法和工具,并编写高质量代…

    2025年3月29日
    100
  • 怎么下载deepseek 小米

    如何下载 DeepSeek 小米?在小米应用商店搜索“DeepSeek”,如未找到,则继续步骤 2。确定您的需求(搜索文件、数据分析),并找到包含 DeepSeek 功能的相应工具(如文件管理器、数据分析软件)。 怎么下载DeepSeek小…

    2025年3月29日
    100
  • deepseek该怎么搜索

    直接使用DeepSeek自带的搜索功能即可,它强大的语义分析算法能准确理解搜索意图,提供相关信息。但对于冷门领域、最新信息或需要思考问题的搜索,需要调整关键词或使用更具体的描述、结合其他实时信息来源,并明白DeepSeek只是一个工具,需要…

    2025年3月29日
    100
  • deepseek怎么问他

    有效使用DeepSeek的关键在于清晰提问:直接、具体地表达问题。提供具体细节和背景信息。对于复杂的询问,包含多个角度和反驳观点。关注特定方面,例如代码的性能瓶颈。对得到的答案保持批判性思维,结合专业知识进行判断。 DeepSeek怎么问它…

    2025年3月29日
    100

发表回复

登录后才能评论