實時監聽數據示例

一、基於長輪詢、queue 實現

app01.py

# Real-time online voting implemented with queue (long polling)

import uuid
import queue

from flask import Flask,render_template,request,session,redirect,jsonify

# Create the Flask application object for this module.
app=Flask(__name__)
# Key Flask uses to sign the session cookie.
# NOTE(review): hard-coded short literal — load it from configuration before
# any real deployment.
app.secret_key="drgs"
# Turns on Flask debug mode.
# NOTE(review): development only — confirm it is never enabled on a host
# reachable by others.
app.debug=True

@app.before_request
def check_login():
    """Redirect anonymous visitors to the login page.

    Runs before every request. The login page itself is always let
    through; any other path requires ``user_info`` in the session.

    :return: a redirect to ``/login`` for anonymous visitors, otherwise
        ``None`` so that normal request handling continues.
    """
    if request.path != "/login" and not session.get("user_info"):
        return redirect("/login")
    return None

# Maps each logged-in user's uid (a uuid4 string) to that user's private
# queue.Queue of pending vote updates: {uid: queue.Queue()}.
USER_QUEUE = {}

@app.route("/login",methods=["GET","POST"])
def login():
    """Show the login form (GET) or check the submitted credentials (POST).

    A successful login is given a fresh uuid4 identifier and a private
    queue registered under it in ``USER_QUEUE``; the identifier is stored
    in the session before redirecting to ``/index``.

    :return: the rendered login page, or a redirect to ``/index``.
    """
    if request.method == "GET":
        return render_template("login.html")

    name = request.form.get("user")
    password = request.form.get("pwd")
    if not (name == "cao" and password == "123"):
        return render_template("login.html", msg="用戶名或密碼錯誤")

    # One queue per login, so every session receives its own copy of updates.
    user_id = str(uuid.uuid4())
    USER_QUEUE[user_id] = queue.Queue()
    session["user_info"] = {"uid": user_id, "name": name}
    return redirect("/index")

# Candidates keyed by id (string keys, as received in the "gid" form field),
# each with a display name and a running vote count held in process memory.
VOTE_USER_LIST = {
    "1": {"name": "曹超", "count": 0},
    "2": {"name": "大仙", "count": 0},
}

@app.route("/index")
def index():
    """Render the voting page with the current candidate table."""
    candidates = VOTE_USER_LIST
    return render_template("index.html", vote_user_list=candidates)



@app.route("/get_vote_count")
def get_vote_count():
    """Long-poll endpoint: wait for the next vote update for this user.

    Blocks for up to 10 seconds on the caller's private queue.

    :return: JSON ``{"status": True, "data": {...}}`` when an update
        arrived, or ``{"status": False, "data": None}`` on timeout.
    """
    ret = {"status": True, "data": None}
    # Unique identifier assigned to this user at login.
    uid = session["user_info"]["uid"]

    # The signed session cookie outlives the process: after a restart the
    # cookie is still valid but USER_QUEUE is empty, so create the queue on
    # demand instead of failing with KeyError (HTTP 500).
    q = USER_QUEUE.setdefault(uid, queue.Queue())
    try:
        ret["data"] = q.get(timeout=10)
    except queue.Empty:
        # Nobody voted within the timeout window.
        ret["status"] = False
    return jsonify(ret)


@app.route("/vote",methods=["POST"])
def vote():
    """Record one vote and broadcast the new total to every user queue.

    Reads the candidate id from the ``gid`` form field.

    :return: ``"ok"`` on success, or an error message with HTTP 400 when
        ``gid`` is missing or does not name a known candidate.
    """
    gid = request.form.get('gid')
    candidate = VOTE_USER_LIST.get(gid)
    if candidate is None:
        # A missing or unknown gid used to raise KeyError (HTTP 500).
        return "invalid gid", 400

    new = candidate["count"] + 1
    candidate["count"] = new

    data = {"gid": gid, "count": new}
    # Iterate over a snapshot: the server runs threaded, so a concurrent
    # login may insert into USER_QUEUE while this broadcast is in progress.
    for q in list(USER_QUEUE.values()):
        q.put(data)
    return "ok"



# Script entry point: start Flask's built-in server on all interfaces.
# threaded=True lets it serve requests concurrently, which matters here
# because /get_vote_count can block for up to 10 seconds per request.
if __name__=="__main__":
    app.run(host="0.0.0.0",threaded=True)
View Code

templates

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>登陸頁面</title>

</head>
<body>

{# Login form; posts back to the URL it was served from. #}
<form method="post">
    <input type="text" name="user">
    <input type="password" name="pwd">
    {# The view passes its error text as "msg"; the former "seg" placeholder never displayed it. #}
    <input type="submit" value="登陸">{{msg}}
</form>



</body>
</html>
login.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>投票頁面</title>

</head>
<body>

<h3>選出最具潛力股的男人</h3>

<ul>
    {# One row per candidate; double-click a row to vote. The id is quoted so the handler receives it as a string even when it is not numeric. #}
    {% for k,v in vote_user_list.items()%}
        <li style="cursor: pointer"    id="{{k}}" ondblclick="vote('{{k}}')">ID:{{k}} &nbsp;姓名:{{v.name}} &nbsp;票數:   <span>{{v.count}}</span></li>
    {% endfor %}
</ul>

<script src="/static/jquery-3.3.1.min.js"></script>

<script>
    // Start long-polling as soon as the DOM is ready.
    $(function () {
        get_vote_count();
    });

    // Long-poll the server for the next vote update, then poll again.
    function get_vote_count() {
        $.ajax({
            url:"/get_vote_count",
            type:"GET",
            dataType:'JSON',
            success:function (arg) {
                if (arg.status){
                    // Update the displayed count of the candidate that was voted for.
                    var gid="#"+arg.data.gid;
                    $(gid).find('span').text(arg.data.count);
                }
                // status === false means nobody voted within the server's 10 s timeout.
                get_vote_count();
            },
            error:function () {
                // Without this handler a single failed request (server
                // restart, network blip) silently stops all further updates.
                // Retry after a short pause instead of hammering the server.
                setTimeout(get_vote_count, 3000);
            }
        });
    }

    // Cast one vote for the candidate with id `gid`.
    function vote(gid) {
        $.ajax({
            url:"/vote",
            type:"POST",
            data:{"gid":gid},
            // Was misspelled "dateType" (silently ignored by jQuery). The
            // endpoint answers with the plain string "ok", so the expected
            // type is text, not JSON.
            dataType:"text",
            success:function (arg) {
                // Nothing to do: the new count arrives through the long-poll channel.
            }
        });

    }
</script>
</body>
</html>
index.html

二、基於長輪詢、redis 實現

app02.py

# Real-time online voting implemented with redis (long polling)

import uuid
import redis
import json

from flask import Flask,render_template,request,session,redirect,jsonify

# Create the Flask application object for this module.
app=Flask(__name__)
# Key Flask uses to sign the session cookie.
# NOTE(review): hard-coded short literal — load it from configuration before
# any real deployment.
app.secret_key="drgs"
# Turns on Flask debug mode.
# NOTE(review): development only — confirm it is never enabled on a host
# reachable by others.
app.debug=True

# Module-wide Redis client pointing at a server on localhost:6379.
conn=redis.Redis(host="127.0.0.1",port=6379)

@app.before_request
def check_login():
    """Gate every request behind a login, except the login page itself.

    :return: a redirect to ``/login`` when the session has no
        ``user_info``; otherwise ``None`` so the request proceeds.
    """
    needs_login = request.path != "/login" and not session.get("user_info")
    if needs_login:
        return redirect("/login")
    return None


@app.route("/login",methods=["GET","POST"])
def login():
    """Show the login form (GET) or check the submitted credentials (POST).

    A successful login is given a fresh uuid4 identifier, which is pushed
    onto the Redis list ``"uuid"`` and stored in the session before
    redirecting to ``/index``.

    :return: the rendered login page, or a redirect to ``/index``.
    """
    if request.method == "GET":
        return render_template("login.html")

    name = request.form.get("user")
    password = request.form.get("pwd")
    if not (name == "cao" and password == "123"):
        return render_template("login.html", msg="用戶名或密碼錯誤")

    # Register the new identifier in Redis so votes can be fanned out to it.
    user_id = str(uuid.uuid4())
    conn.lpush("uuid", user_id)
    session["user_info"] = {"uid": user_id, "name": name}
    return redirect("/index")

# Candidates keyed by id (string keys, as received in the "gid" form field),
# each with a display name and a running vote count held in process memory.
VOTE_USER_LIST = {
    "1": {"name": "曹超", "count": 0},
    "2": {"name": "大仙", "count": 0},
}

@app.route("/index")
def index():
    """Render the voting page with the current candidate table."""
    candidates = VOTE_USER_LIST
    return render_template("index.html", vote_user_list=candidates)


@app.route("/get_vote_count")
def get_vote_count():
    """Long-poll endpoint backed by a per-user Redis list.

    Blocks for up to 10 seconds popping from the list keyed by the
    caller's uid.

    :return: JSON ``{"status": True, "data": {...}}`` with the decoded
        update, or ``{"status": False, "data": None}`` on timeout.
    """
    # Unique identifier assigned to this user at login.
    uid = session["user_info"]["uid"]

    popped = conn.brpop(uid, timeout=10)
    if not popped:
        return jsonify({"status": False, "data": None})

    # The second element of the brpop result carries the JSON payload;
    # decode it to text before parsing.
    payload_text = str(popped[1], encoding="utf-8")
    return jsonify({"status": True, "data": json.loads(payload_text)})


@app.route("/vote",methods=["POST"])
def vote():
    """Record one vote and push the new total onto every user's Redis list.

    Reads the candidate id from the ``gid`` form field.

    :return: ``"ok"`` on success, or an error message with HTTP 400 when
        ``gid`` is missing or does not name a known candidate.
    """
    gid = request.form.get('gid')
    candidate = VOTE_USER_LIST.get(gid)
    if candidate is None:
        # A missing or unknown gid used to raise KeyError (HTTP 500).
        return "invalid gid", 400

    new = candidate["count"] + 1
    candidate["count"] = new
    # Serialize once; the same JSON text is pushed to every list.
    data = json.dumps({"gid": gid, "count": new})

    # A stop index of -1 means "through the last element", so no separate
    # LLEN round trip is needed to read the whole list of uids.
    uid_list = conn.lrange('uuid', 0, -1)
    for uid in uid_list:
        conn.lpush(uid, data)
    return "ok"



# Script entry point: start Flask's built-in server on localhost:4000.
# threaded=True lets it serve requests concurrently, which matters here
# because /get_vote_count can block for up to 10 seconds per request.
if __name__=="__main__":
    app.run(host="127.0.0.1",port=4000,threaded=True)
View Code

templates 與第一節相同

三、基於 websocket

text.py

#基於websocket實現實時監聽數據

from geventwebsocket.handler import WebSocketHandler
from gevent.pywsgi import WSGIServer
from flask import Flask,render_template,request,session,redirect,jsonify
import pickle

# Create the Flask application object for this module.
app=Flask(__name__)


@app.route("/text")
def text():
    """Render the chat page."""
    page = render_template("text.html")
    return page

# Every currently connected websocket; each received message is relayed to
# all of them.
WS_LIST=[]
@app.route("/get_text")
def get_text():
    """Websocket endpoint that relays each received message to all clients.

    :return: ``"ok"`` once this client's connection has ended, or an error
        message with HTTP 400 when the request is not a websocket request.
    """
    # Imported locally so this function carries its own dependency; the
    # package is the one the module already uses for WebSocketHandler.
    from geventwebsocket.exceptions import WebSocketError

    ws = request.environ.get("wsgi.websocket")
    if ws is None:
        # Previously this only printed the message and then carried on
        # with ws=None, crashing on the first receive().
        return "請使用Websocket協議規範", 400

    # The websocket connection is established at this point.
    WS_LIST.append(ws)
    try:
        while True:
            message = ws.receive()
            # receive() returns None once the connection is closed. An empty
            # string is an ordinary message and must not end the session.
            if message is None:
                break

            # Broadcast over a snapshot, because dead peers are removed from
            # WS_LIST while looping.
            for peer in list(WS_LIST):
                try:
                    peer.send(message)
                except WebSocketError:
                    # Peer went away without a clean close; drop it so one
                    # dead socket cannot break the broadcast for the others.
                    if peer in WS_LIST:
                        WS_LIST.remove(peer)
    finally:
        # Always unregister and close this socket, even on an error
        # (the original misspelled close() as colse()).
        if ws in WS_LIST:
            WS_LIST.remove(ws)
        if not ws.closed:
            ws.close()

    return "ok"

# Script entry point: serve the app with gevent's WSGIServer on all
# interfaces, port 5000. WebSocketHandler is the request handler class, so
# websocket requests are handled (get_text reads "wsgi.websocket" from the
# WSGI environ).
if __name__=="__main__":
    http_server=WSGIServer(("0.0.0.0",5000),app,handler_class=WebSocketHandler)
    http_server.serve_forever()
View Code

text.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Title</title>

</head>
<body>

<h3>發送消息</h3>
<input id="msg" type="text" >
{# "valut" was a typo for "value", so the button label was never applied. #}
<input type="submit" value="發送" onclick="sendmsg()">

{# Incoming messages are appended here by the websocket handler script. #}
<div id="message-list">
    
</div>

<script src="/static/jquery-3.3.1.min.js"></script>
<script>
    <!--鏈接客戶端-->
    ws= new WebSocket("ws://192.168.21.40:5000/get_text");
    ws.onmessage=function (arg) {
        
        var tag=document.createElement("div");
        tag.className="msg-itm";
        tag.innerText=arg.data;
        $("#message-list").append(tag)
    };
    function sendmsg() {
        ws.send($("#msg").val())
        
    }


</script>


</body>
</html>
View Code
相關文章
相關標籤/搜索