本文实现Redis的协议层,协议层负责解析指令,然后将指令交给核心database执行
echo database用来测试协议层的代码
https://github.com/csgopher/go-redis
数组:以"*"开头,后跟成员个数
interface/resp/conn.go
// Connection is the client-connection abstraction declared for the protocol
// layer (interface/resp/conn.go).
type Connection interface {
	// Write takes the bytes to deliver and reports an error on failure.
	// NOTE(review): presumably writes to the underlying client socket —
	// confirm against the concrete implementation.
	Write(data []byte) error
	// SelectDB stores dbNum as the database index selected on this connection.
	SelectDB(dbNum int)
	// GetDBIndex returns the database index associated with this connection.
	GetDBIndex() int
}
interface/resp/reply.go
// Reply is anything the server can send back to a client.
type Reply interface {
// ToBytes returns the reply serialized in the format required by the RESP
// protocol.
ToBytes() []byte
}
resp/reply/consts.go
// PongReply is the constant PONG reply.
type PongReply struct{}

// pongBytes is the RESP simple-string encoding of PONG.
var pongBytes = []byte("+PONG\r\n")

// ToBytes returns the RESP encoding of the PONG reply.
//
// It must return pongBytes; returning the (empty-reply) noBytes buffer would
// send nothing to the client.
func (r *PongReply) ToBytes() []byte {
	return pongBytes
}
定义五种回复:回复pong,ok,null,空数组,空
resp/reply/reply.go
// ErrorReply is a reply that is also a Go error: it exposes the error text
// through Error and its wire form through ToBytes.
type ErrorReply interface {
	// error contributes the Error() string method.
	error
	// ToBytes returns the reply serialized for the wire.
	ToBytes() []byte
}
ErrorReply:定义错误接口
resp/reply/errors.go
// unknownErrBytes is the pre-encoded wire form of UnknownErrReply.
var unknownErrBytes = []byte("-Err unknown\r\n")

// UnknownErrReply is the reply used for an unknown error.
type UnknownErrReply struct{}

// Error returns the error text without the leading '-' or trailing CRLF.
func (*UnknownErrReply) Error() string {
	return "Err unknown"
}

// ToBytes returns the shared pre-encoded error line.
func (*UnknownErrReply) ToBytes() []byte {
	return unknownErrBytes
}
// ArgNumErrReply reports that a command was called with the wrong number of
// arguments.
type ArgNumErrReply struct {
	// Cmd is the command name interpolated into the error message.
	Cmd string
}

// ToBytes returns the error as a RESP error line ('-' prefix, CRLF suffix).
// It is derived from Error so the two representations cannot drift apart.
func (r *ArgNumErrReply) ToBytes() []byte {
	return []byte("-" + r.Error() + "\r\n")
}

// Error returns the error text without protocol framing.
func (r *ArgNumErrReply) Error() string {
	return "ERR wrong number of arguments for '" + r.Cmd + "' command"
}

// MakeArgNumErrReply builds an ArgNumErrReply for cmd.
//
// The result is a pointer because both methods have pointer receivers and the
// declared return type is *ArgNumErrReply.
func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
	return &ArgNumErrReply{
		Cmd: cmd,
	}
}
// SyntaxErrReply is the reply used for a syntax error.
type SyntaxErrReply struct{}

// syntaxErrBytes is the pre-encoded wire form of SyntaxErrReply.
var syntaxErrBytes = []byte("-Err syntax error\r\n")

// theSyntaxErrReply is the shared instance handed out by MakeSyntaxErrReply.
// It is stored as a pointer so it matches the constructor's return type.
var theSyntaxErrReply = &SyntaxErrReply{}

// MakeSyntaxErrReply returns the shared SyntaxErrReply instance.
func MakeSyntaxErrReply() *SyntaxErrReply {
	return theSyntaxErrReply
}

// ToBytes returns the shared pre-encoded error line.
func (r *SyntaxErrReply) ToBytes() []byte {
	return syntaxErrBytes
}

// Error returns the error text without protocol framing.
func (r *SyntaxErrReply) Error() string {
	return "Err syntax error"
}
// wrongTypeErrBytes is the pre-encoded wire form of WrongTypeErrReply.
var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")

// WrongTypeErrReply is the reply used when an operation is applied to a key
// holding the wrong kind of value.
type WrongTypeErrReply struct{}

// Error returns the error text without protocol framing.
func (*WrongTypeErrReply) Error() string {
	return "WRONGTYPE Operation against a key holding the wrong kind of value"
}

// ToBytes returns the shared pre-encoded error line.
func (*WrongTypeErrReply) ToBytes() []byte {
	return wrongTypeErrBytes
}
// ProtocolErrReply reports a protocol-level error.
type ProtocolErrReply struct {
	// Msg is the detail text placed between single quotes in the message.
	Msg string
}

// ToBytes returns the error as a RESP error line ('-' prefix, CRLF suffix).
// It is derived from Error so both forms always carry the same text.
func (r *ProtocolErrReply) ToBytes() []byte {
	return []byte("-" + r.Error() + "\r\n")
}

// Error returns the error text without protocol framing. The quote opened
// before Msg is closed after it, matching the text emitted by ToBytes.
func (r *ProtocolErrReply) Error() string {
	return "ERR Protocol error: '" + r.Msg + "'"
}
// CRLF is the line terminator of the protocol.
var CRLF = "\r\n"

// nullBulkReplyBytes holds the bytes "$-1". The literal carries no trailing
// CRLF, so a user of this value has to add the terminator itself.
var nullBulkReplyBytes = []byte("$-1")
// BulkReply is a reply carrying a single string.
type BulkReply struct {
	// Arg is the payload of the reply.
	Arg []byte
}

// MakeBulkReply wraps arg in a BulkReply.
//
// The result is a pointer (matching the declared return type) and is
// populated through Arg, the only field BulkReply declares.
func MakeBulkReply(arg []byte) *BulkReply {
	return &BulkReply{
		Arg: arg,
	}
}
// IsErrorReply reports whether reply serializes to a RESP error, i.e. whether
// its encoded form starts with '-'.
//
// A reply whose encoding is empty is not an error; the length check keeps the
// index expression from panicking on such replies.
func IsErrorReply(reply resp.Reply) bool {
	encoded := reply.ToBytes()
	return len(encoded) > 0 && encoded[0] == '-'
}
BulkReply:回复一个字符串;MultiBulkReply:回复字符串数组;StatusReply:状态回复;IntReply:数字回复;StandardErrReply:标准错误回复;IsErrorReply:判断是否为错误回复;ToBytes:将字符串转成RESP协议规定的格式
resp/parser/parser.go
// Payload is the unit delivered on the channel returned by ParseStream; it
// represents data sent to the server by a client.
type Payload struct {
// Data is the parsed message.
Data resp.Reply
// Err carries an error instead of data.
// NOTE(review): presumably set when reading or parsing fails — confirm in
// parse0.
Err error
}
// readState is the parser's bookkeeping for the message currently being read.
type readState struct {
	// readingMultiLine tells whether single-line or multi-line data is being
	// parsed.
	readingMultiLine bool
	// expectedArgsCount is the number of arguments that should be read.
	expectedArgsCount int
	// msgType is the message type.
	msgType byte
	// args holds the message content collected so far.
	args [][]byte
	// bulkLen is the data length.
	bulkLen int64
}

// finished reports whether parsing of the current message is complete: some
// arguments are expected AND exactly that many have been collected.
func (s *readState) finished() bool {
	return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
}
// ParseStream starts parsing reader in a separate goroutine and returns the
// receive side of the unbuffered channel on which parse0 is given the
// opportunity to publish payloads.
func ParseStream(reader io.Reader) <-chan *Payload {
	payloads := make(chan *Payload)
	go parse0(reader, payloads)
	return payloads
}
// parse0 is the worker started by ParseStream; it receives the input reader
// and the send side of the payload channel.
// NOTE(review): the body is elided ("......") in this excerpt, so this
// declaration is a placeholder and not compilable as shown.
func parse0(reader io.Reader, ch chan<- *Payload) {
......
}
Payload结构体:客户端给我们发的数据
readState结构体:
readingMultiLine:解析单行还是多行数据
expectedArgsCount:应该读取的参数个数
msgType:消息类型
args:消息内容
bulkLen:数据长度
finished方法:判断解析是否完成;ParseStream方法:异步解析数据后放入管道,返回管道数据
// readLine takes the buffered reader and the current parser state and returns
// a message, a boolean flag and an error.
// NOTE(review): the body shown here looks truncated. Nothing is ever read
// from bufReader, err is declared but never used (which Go rejects at compile
// time), and the only statement in the if-branch assigns 0 to bulkLen when it
// already is 0. As written the function always returns (nil, false, nil).
// Restore the read logic from the original repository before relying on it.
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
var msg []byte
var err error
if state.bulkLen == 0 {
state.bulkLen = 0
}
return msg, false, nil
}
// parseMultiBulkHeader parses the header line of an array ("*" followed by
// the member count and CRLF) and records the expected number of lines and the
// related fields in state.
//
// It returns a protocol error when msg is too short to contain a count or the
// count is not an unsigned decimal number.
func parseMultiBulkHeader(msg []byte, state *readState) error {
	// The slice below drops the type byte and the two trailing bytes, so at
	// least three bytes are required to avoid an out-of-range panic.
	if len(msg) < 3 {
		return errors.New("protocol error: " + string(msg))
	}
	expectedLine, err := strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
	if err != nil {
		return errors.New("protocol error: " + string(msg))
	}
	if expectedLine == 0 {
		// Empty array: nothing further to collect.
		state.expectedArgsCount = 0
		return nil
	}
	state.msgType = msg[0]
	state.readingMultiLine = true
	state.expectedArgsCount = int(expectedLine)
	state.args = make([][]byte, 0, expectedLine)
	return nil
}
parseMultiBulkHeader:解析数组的头部,设置期望的行数和相关参数.parseBulkHeader:解析多行字符串的头部.
// parseSingleLineReply takes one raw line and is declared to return a reply
// and an error; it strips the trailing CRLF and switches on the first byte.
// NOTE(review): this snippet is damaged. The braces do not balance (the
// function is never closed), the '+' case refers to state and line, neither
// of which is a parameter or local here, and a bare "return nil" does not
// match the two declared results. The tail appears to belong to a different
// function. Restore both functions from the original repository.
func parseSingleLineReply(msg []byte) (resp.Reply, error) {
str := strings.TrimSuffix(string(msg), "\r\n")
var result resp.Reply
switch msg[0] {
case '+': // status reply
if state.bulkLen <= 0 { // null bulk in multi bulks
state.args = append(state.args, line)
}
return nil
}
// parse0 parses commands and sends each result out through ch.
// NOTE(review): only the opening of nine send statements survived in this
// excerpt; every "Payload{" literal is left unterminated, so the body is not
// valid Go. Note also that ch carries *Payload while the visible literals are
// Payload values. Restore the body from the original repository.
func parse0(reader io.Reader, ch chan<- *Payload) {
ch <- Payload{
ch <- Payload{
ch <- Payload{
ch <- Payload{
ch <- Payload{
ch <- Payload{
ch <- Payload{
ch <- Payload{
ch <- Payload{
}
}
}
parse0:解析指令,解析完成后通过channel发出去
resp/connection/conn.go
// Connection is the concrete connection type of resp/connection/conn.go.
// NOTE(review): the struct's field list is missing from this excerpt. The
// single line inside the braces is an assignment statement (it stores dbNum
// in c.selectedDB), which cannot appear in a struct body and looks like the
// remainder of a method. Restore the declaration from the original
// repository.
type Connection struct {
c.selectedDB = dbNum
}
resp/handler/handler.go
// unknownErrReplyBytes is a pre-encoded "-ERR unknown" error line.
var unknownErrReplyBytes = []byte("-ERR unknown\r\n")
// RespHandler is the protocol-layer handler; it hands parsed commands to the
// core database for execution.
type RespHandler struct {
// activeConn is a concurrent map.
// NOTE(review): presumably the set of currently open client connections —
// confirm against the handler methods.
activeConn sync.Map
// db is the database commands are executed against.
db databaseface.Database
// closing is an atomic boolean flag.
// NOTE(review): atomic.Boolean is not a type of the standard sync/atomic
// package, so this is presumably a project-local atomic package — confirm
// the import.
closing atomic.Boolean
}
// MakeHandler creates a RespHandler backed by the echo database.
// NOTE(review): this snippet is damaged. The RespHandler literal is never
// completed, and the lines after it ("return true", "})", h.db.Close(),
// "return nil") refer to a receiver h that does not exist here; they appear
// to be the tail of another method. Restore both functions from the original
// repository.
func MakeHandler() *RespHandler {
var db databaseface.Database
db = database.NewEchoDatabase()
return RespHandler{
return true
})
h.db.Close()
return nil
}
RespHandler:和之前的echo类似,加了核心层的db.exec执行解析的指令
interface/database/database.go
// CmdLine is an alias for a command expressed as a two-dimensional byte
// slice.
type CmdLine = [][]byte

// Database is the interface of the core layer.
type Database interface {
	// Exec executes the command in args on behalf of client and returns the
	// reply. CmdLine is an alias of [][]byte, so implementations may spell
	// the parameter type either way.
	Exec(client resp.Connection, args CmdLine) resp.Reply
	// AfterClientClose performs the clean-up that follows the closing of c.
	AfterClientClose(c resp.Connection)
	// Close closes the database.
	Close()
}
// DataEntity represents a piece of Redis data, such as a string, list or set.
type DataEntity struct {
// Data holds the value; it is untyped so any kind of value can be stored.
Data interface{}
}
Exec:核心层的执行;AfterClientClose:关闭之后的善后方法;CmdLine:二维字节数组的指令别名;DataEntity:表示Redis的数据,包括string, list, set等等
database/echo_database.go
// EchoDatabase is a stateless database used to test the protocol layer.
type EchoDatabase struct {
}

// NewEchoDatabase returns a new EchoDatabase.
//
// The address of the literal is returned so the value matches the declared
// *EchoDatabase result type.
func NewEchoDatabase() *EchoDatabase {
	return &EchoDatabase{}
}
// Exec echoes the command back: the received args are wrapped with
// reply.MakeMultiBulkReply and returned unchanged. client is not used.
func (EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
	var echoed resp.Reply = reply.MakeMultiBulkReply(args)
	return echoed
}
// AfterClientClose only logs that it was called; the echo database keeps no
// per-client state, and c is not used.
func (EchoDatabase) AfterClientClose(c resp.Connection) {
	logger.Info(
		"EchoDatabase AfterClientClose",
	)
}
// Close only logs that it was called; the echo database holds nothing to
// release.
func (EchoDatabase) Close() {
	logger.Info(
		"EchoDatabase Close",
	)
}
echo_database:测试协议层;Exec:指令解析后,再使用MakeMultiBulkReply包装一下返回去
main.go
// Start the TCP server with the RESP handler and log the error if it fails.
// NOTE(review): the tcp.Config literal is truncated in this excerpt. Only the
// tail of one expression ("config.Properties.Port),") is left, with no field
// name and an unmatched closing parenthesis, so the call is not valid Go as
// shown. Restore the config fields from the original repository.
err := tcp.ListenAndServeWithSignal(
tcp.Config{
config.Properties.Port),
},
handler.MakeHandler())
if err != nil {
logger.Error(err)
}
main改成刚才写的:handler.MakeHandler()
以上就是土嘎嘎小编大虾米为大家整理的相关主题介绍。如果您觉得小编更新的文章对您有用,那就是对我们最大的鼓励和动力,不要忘记将本站分享给您身边的朋友哦!