Python: Communicating over the WebSocket protocol with flask-socketio

彭世瑜, published 2021-04-22 09:44:17

Documentation:

  • PyPI: https://pypi.org/project/Flask-SocketIO/
  • Github: https://github.com/miguelgrinberg/Flask-SocketIO
  • doc: https://flask-socketio.readthedocs.io
  • socket.io: https://socket.io/

Installation

pip install flask-socketio gevent-websocket

Code example

from flask import Flask, render_template, request
from flask_socketio import SocketIO

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'  # placeholder; use a random secret outside of demos
app.jinja_env.auto_reload = True  # reload templates when they change

socketio = SocketIO(app)


@app.route('/')
def index():
    return render_template('index.html')


@app.route('/send_all')
def send_all():
    """
    Broadcast a message to every connected client.
    """
    message = request.args.get('message')

    socketio.send(message)

    return {'status': 'ok'}


@app.route('/send')
def send_message():
    """
    Send a message to a single client, identified by its sid.
    """
    sid = request.args.get('sid')
    message = request.args.get('message')

    socketio.send(message, to=sid)

    return {'status': 'ok'}


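@socketio.on('connect')
def connect():
    print('connect')
    # Tell the new client its own session id so it can be targeted via /send.
    # Without `to`, this message would be broadcast to every connected client.
    socketio.send({'sid': request.sid}, to=request.sid)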


@socketio.on('disconnect')
def disconnect():
    print('disconnect')


@socketio.on('message')
def handle_message(data):
    print(data)
    # No `to` argument, so the reply goes to all connected clients.
    socketio.send({'data': data})


if __name__ == '__main__':
    socketio.run(app, debug=True)
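
The difference between the /send_all and /send routes above comes down to the `to` argument. In Socket.IO, each client is placed in a private room named after its own sid, so addressing that sid reaches exactly one client, while omitting the target reaches everyone. The following is only a conceptual sketch of that routing idea using the standard library; `ToyServer` and its methods are hypothetical names and are not part of the Flask-SocketIO API.

```python
from collections import defaultdict


class ToyServer:
    """Conceptual stand-in for a Socket.IO server (hypothetical, not library code)."""

    def __init__(self):
        # room name -> set of session ids that joined the room
        self.rooms = defaultdict(set)
        # session id -> messages delivered to that client
        self.inbox = {}

    def connect(self, sid):
        # Each client joins a private room named after its own sid.
        self.rooms[sid].add(sid)
        self.inbox[sid] = []

    def disconnect(self, sid):
        for members in self.rooms.values():
            members.discard(sid)
        self.inbox.pop(sid, None)

    def send(self, message, to=None):
        # No target: broadcast to every connected client.
        # With a target: deliver only to the members of that room.
        if to is None:
            recipients = list(self.inbox)
        else:
            recipients = list(self.rooms.get(to, ()))
        for sid in recipients:
            self.inbox[sid].append(message)
        return len(recipients)


server = ToyServer()
server.connect('a')
server.connect('b')
broadcast_count = server.send('hello everyone')
single_count = server.send('just for a', to='a')
print(server.inbox['a'])  # ['hello everyone', 'just for a']
print(server.inbox['b'])  # ['hello everyone']
```

In the real server the same distinction is made by `socketio.send(message)` versus `socketio.send(message, to=sid)`.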

Sending test requests over HTTP

import requests

# Send to a single client.
params = {
    # Sample sid; replace it with the value the server sent on connect
    'sid': 'm4AymrH2TIFHCcQNAAAF',
    'message': 'send'
}

res = requests.get('http://localhost:5000/send', params=params)
print(res.text)


# Broadcast to all connected clients.
params = {
    'message': 'send_all'
}

res = requests.get('http://localhost:5000/send_all', params=params)
print(res.text)
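
To see what these requests look like on the wire, the query string can be built by hand. This is a small sketch using only the standard library; `build_url` is a hypothetical helper introduced here for illustration, and the sid is the same sample value as above. It should produce roughly the same URL that `requests` assembles from `params`.

```python
from urllib.parse import urlencode


def build_url(base, params):
    """Return base with params appended as a percent-encoded query string."""
    return f"{base}?{urlencode(params)}"


# Hypothetical message containing a space, to show the encoding.
send_url = build_url(
    'http://localhost:5000/send',
    {'sid': 'm4AymrH2TIFHCcQNAAAF', 'message': 'hello world'},
)
print(send_url)
# http://localhost:5000/send?sid=m4AymrH2TIFHCcQNAAAF&message=hello+world
```

Pasting the printed URL into a browser while the server is running is another way to trigger the /send route.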