使用asyncio實現redis客戶端

redis協議格式請參考,http://doc.redisfans.com/topic/protocol.htmlhtml

這裏簡單介紹下:python

*<參數數量> \r\n
$<參數 1 的字節數量> \r\n
<參數 1 的數據> \r\n
$<參數 N 的字節數量> \r\n
<參數 N 的數據> \r\n

發送給redis服務器時的數據要按照redis要求的協議格式發送,只有這樣redis服務器才能成功解析。redis

首先根據協議格式寫一個封包方法,代碼以下:服務器

    def format_command(self, commands):
        length = len(commands)
        command = "*{}\r\n".format(length)
        for v in commands:
            bytes = v.encode("utf-8")
            bytes_length = len(bytes)
            sub_command = "${}\r\n".format(bytes_length) + "{}\r\n".format(v)
            command += sub_command
        return command

看到format_command函數中的「*」和「$」符號了麼。其實就是根據commands列表中的數據而後按照redis協議格式封裝起來的。async

弄懂了如何安裝redis協議封裝數據以後,就能夠把數據發送到redis服務器了。tcp

asyncio的官方demo可參考:函數

https://docs.python.org/3/library/asyncio-stream.html#tcp-echo-client-using-streamsoop

下面就是完整的代碼,無其餘依賴,順利執行以後,能夠經過redis-cli命令行查看是否設置成功。spa



class AsyncRedis:

    def __init__(self, host, port, loop):
        self.host = host
        self.port = port
        self.loop = loop
        self.separator = "\r\n".encode()

    async def connect(self):
        reader, writer = await asyncio.open_connection(self.host, self.port, loop=self.loop)
        self.reader = reader
        self.writer = writer

    def format_command(self, commands):
        length = len(commands)
        command = "*{}\r\n".format(length)
        for v in commands:
            bytes = v.encode("utf-8")
            bytes_length = len(bytes)
            sub_command = "${}\r\n".format(bytes_length) + "{}\r\n".format(v)
            command += sub_command
        print(command)
        return command

    def execute_command(self, command):
        self.writer.write(command.encode("utf-8"))

    async def set(self, key, value):
        command = self.format_command(["SET", key, value])
        self.execute_command(command)
        ret, error = await self.wait_ret()
        print(ret)
        return ret

    async def hset(self, hash_key, key, value):
        command = self.format_command(["HSET", hash_key, key, value])
        self.execute_command(command)

    async def get(self, key):
        command = self.format_command(['GET', key])
        self.execute_command(command)
        ret = await self.wait_ret()
        return ret

    async def wait_ret(self):
        ret = await self.reader.readuntil(self.separator)
        ret = ret.decode()
        mark = ret[0:1]
        if mark == "$":
            pos = ret.index("\r\n")
            ret = ret[1:pos]
            ret = await self.reader.read(int(ret))
            ret = ret.decode()
            return ret, True
        elif mark == "+":
            pos = ret.index("\r\n")
            ret = ret[1:pos]
            return ret, True
        elif mark == "-":
            pos = ret.index("\r\n")
            ret = ret[1:pos]
            return ret, False

    async def close(self):
        self.writer.close()

import asyncio

async def NewRedis(loop):
    redis = AsyncRedis("127.0.0.1", 6379, loop)
    await redis.connect()
    # await redis.get("name")
    await redis.set("name", "雲想衣裳花想容,春風拂檻露華濃。\r\n 若非羣玉山頭見,會向瑤臺月下逢。")
loop = asyncio.get_event_loop()
loop.run_until_complete(NewRedis(loop))
loop.close()
相關文章
相關標籤/搜索