一个仿Redis的内存数据库(主要用来做命令解析)服务端,

 客户端使用的开源工具 : https://dom4j.github.io/     github:https://github.com/hehaoyuan/mini-Redis

Redis 简介:
Redis是一个开源的底层使用C语言编写的key-value内存数据库。可用于缓存数据、事件发布订阅、高速队列等场景,而且支持丰富的数据类型:string(字符串)、hash(哈希)、list(列表)、set(无序集合)、zset(有序集合)


使用场景:

使用场景
随着数据量的增长,MySQL已经满足不了大型互联网类应用的需求。因此,Redis基于内存存储数据,在某些场景下,就会大大提高效率。

缓存:对于热点数据,缓存以后可能读取数十万次,因此对于热点数据,不但缓存的价值非常大,而且当总数据量比较大的时候直接从数据库中查询会比较影响性能。例如:对经常需要查询且变动不是很频繁的数据,可以考虑基于Redis实现缓存。
会话缓存:Redis还可以进行会话缓存。例如:将web session存放在Redis中。
计数器:因为Redis具有原子性,所以在某些方面可以避免并发问题,比如:统计点击率、点赞率、收藏率等。
消息队列:Redis能作为一个很好的消息队列来使用,依赖List类型利用LPUSH命令将数据添加到链表头部,通过BRPOP命令将元素从链表尾部取出。
社交列表:社交属性相关的列表信息,例如,用户点赞列表、用户分享列表、用户收藏列表、用户关注列表、用户粉丝列表等,使用Hash类型数据结构是个不错的选择。
最新动态:按照时间顺序排列的最新动态,也是一个很好的应用,可以使用Sorted Set类型的分数权重存储时间戳进行排序。


Redis数据类型
String(字符串)

String是Redis的最基本数据结构,以一个键和一个值存储在Redis内部,类似java的Map结构,可以通过键去找值。

Hash(哈希)

Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象,类似Java里面的Map<String, Object>。

List(列表)

根据插入顺序排序的字符串元素的集合,底层实现是链表。

Set(集合)

Redis的Set是string类型的无序集合,它是通过HashTable实现的。

zset(sorted set:有序集合)

Redis zset和set一样也是string类型元素的集合,且不允许重复。但每个字符串元素与浮点数值相关联,称为分数,元素总是按其分数排序,可以检索一系列元素。

Redis协议规范
Redis客户端使用名为RESP(REdis序列化协议)的协议与Redis服务器通信。虽然该协议是专为Redis设计的,但它可以用于其他客户端 - 服务器软件项目。

RESP可以序列化不同的数据类型,如整数,字符串,数组,还有一种特定的错误类型。请求从客户端发送到Redis服务器,作为表示要执行的命令的参数的字符串数组,Redis使用特定于命令的数据类型进行回复。

注意:此处概述的协议仅用于客户端 - 服务器通信。Redis Cluster使用不同的二进制协议,以便在节点之间交换消息。

网络层:客户端连接到Redis服务器,创建到端口6379的TCP连接。

请求 - 响应模型:Redis接受由不同参数组成的命令。收到命令后,将对其进行协议解析,并将回复发送回客户端。

RESP协议说明
RESP协议是在Redis 1.2中引入的,但它成为了与Redis 2.0中的Redis服务器通信的标准方式。这是您应该在Redis客户端中实现的协议。

RESP实际上是一个支持以下数据类型的序列化协议:Simple Strings、Errors、Integers、Bulk Strings、Arrays。

RESP在Redis中用作请求 - 响应协议的方式如下:

客户端将命令作为Bulk Strings的RESP数组发送到Redis服务器,服务器根据命令实现回复一种RESP类型。在RESP中,某些数据的类型取决于第一个字节:

对于Simple Strings,回复的第一个字节是“+”
对于Errors,回复的第一个字节是“ - ”
对于Integers,回复的第一个字节是“:”
对于Bulk Strings,回复的第一个字节是“$”
对于Arrays,回复的第一个字节是“*”
此外,RESP能够使用指定的Bulk Strings或Array的特殊变体来表示Null值(如:"$-1\r\n" 和 "*-1\r\n" 都表示null)。在RESP中,协议的不同部分始终以“\ r \ n”(CRLF)结束。


结构原理图:

mini Redis(项目 二)-LMLPHP

需要的外包依赖:

    <dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
</dependency> <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>

第一步:

将客户端发到服务端的命令,输入流后,进行解码,

再将解完码后的对象根据Command命令编码,不同命令的类通过其类加载器,调用其自身将命令对应的操作输出流到缓存中

package com.hhy;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List; public class Protocol {
public static Object read(InputStream is) throws IOException {
return process(is);
} public static Command readCommand(InputStream is) throws Exception {
Object o = read(is);
// 作为 Server 来说,一定不会收到 "+OK\r\n"
if (!(o instanceof List)) {
throw new Exception("命令必须是 Array 类型");
} List<Object> list = (List<Object>)o;
if (list.size() < 1) {
throw new Exception("命令元素个数必须大于 1");
} Object o2 = list.remove(0);
if (!(o2 instanceof byte[])) {
throw new Exception("错误的命令类型");
} byte[] array = (byte[])o2;
String commandName = new String(array);
String className = String.format("com.hhy.commands.%sCommand", commandName.toUpperCase());
Class<?> cls = Class.forName(className);
//不属于这个接口
if (!Command.class.isAssignableFrom(cls)) {
throw new Exception("错误的命令");
}
Command command = (Command)cls.newInstance();
command.setArgs(list); return command;
} private static String processSimpleString(InputStream is) throws IOException {
return readLine(is);
} private static String processError(InputStream is) throws IOException {
return readLine(is);
} private static long processInteger(InputStream is) throws IOException {
return readInteger(is);
} private static byte[] processBulkString(InputStream is) throws IOException {
int len = (int)readInteger(is);
if (len == -1) {
// "$-1\r\n" ==> null
return null;
} byte[] r = new byte[len];
is.read(r, 0, len);
/*
for (int i = 0; i < len; i++) {
int b = is.read();
r[i] = (byte)b;
}
*/ // "$5\r\nhello\r\n";
is.read();
is.read(); return r;
} private static List<Object> processArray(InputStream is) throws IOException {
int len = (int)readInteger(is);
if (len == -1) {
// "*-1\r\n" ==> null
return null;
} List<Object> list = new ArrayList<>(len);
for (int i = 0; i < len; i++) {
try {
list.add(process(is));
} catch (RemoteException e) {
list.add(e);
}
} return list;
}
private static Object process(InputStream is) throws IOException {
int b = is.read();
if (b == -1) {
throw new RuntimeException("不应该读到结尾的");
} switch (b) {
case '+':
return processSimpleString(is);
case '-':
//考虑异常被当做对象写进数组,所以定义一个额外的对象类,否则就会抛出异常,不会吧异常当做对象
throw new RemoteException(processError(is));
case ':':
return processInteger(is);
case '$':
return processBulkString(is);
case '*':
return processArray(is);
default: throw new RuntimeException("不识别的类型");
}
} private static String readLine(InputStream is) throws IOException {
boolean needRead = true;
StringBuilder sb = new StringBuilder();
int b = -1;
while (true) {
if (needRead == true) {
b = is.read();
if (b == -1) {
throw new RuntimeException("不应该读到结尾的");
}
} else {
needRead = true;
} if (b == '\r') {
int c = is.read();
if (c == -1) {
throw new RuntimeException("不应该读到结尾的");
} if (c == '\n') {
break;
} if (c == '\r') {
sb.append((char) b);
b = c;
needRead = false;
} else {
sb.append((char) b);
sb.append((char) c);
}
} else {
sb.append((char)b);
}
}
return sb.toString();
} public static long readInteger(InputStream is) throws IOException {
boolean isNegative = false;
StringBuilder sb = new StringBuilder();
int b = is.read();
if (b == -1) {
throw new RuntimeException("不应该读到结尾");
} if (b == '-') {
isNegative = true;
} else {
sb.append((char)b);
} while (true) {
b = is.read();
if (b == -1) {
throw new RuntimeException("不应该读到结尾的");
} if (b == '\r') {
int c = is.read();
if (c == -1) {
throw new RuntimeException("不应该读到结尾的");
} if (c == '\n') {
break;
} throw new RuntimeException("没有读到\\r\\n");
} else {
sb.append((char)b);
}
} long v = Long.parseLong(sb.toString());
if (isNegative) {
v = -v;
} return v;
} public static void writeError(OutputStream os, String message) throws IOException {
os.write('-');
os.write(message.getBytes());
os.write("\r\n".getBytes());
} public static void writeInteger(OutputStream os, long v) throws IOException {
// v = 10
//:10\r\n // v = -1
//:-1\r\n os.write(':');
os.write(String.valueOf(v).getBytes());
os.write("\r\n".getBytes());
} public static void writeArray(OutputStream os, List<?> list) throws Exception {
os.write('*');
os.write(String.valueOf(list.size()).getBytes());
os.write("\r\n".getBytes());
for (Object o : list) {
if (o instanceof String) {
writeBulkString(os, (String)o);
} else if (o instanceof Integer) {
writeInteger(os, (Integer)o);
} else if (o instanceof Long) {
writeInteger(os, (Long)o);
} else {
throw new Exception("错误的类型");
}
}
} public static void writeBulkString(OutputStream os, String s) throws IOException {
byte[] buf = s.getBytes();
os.write('$');
os.write(String.valueOf(buf.length).getBytes());
os.write("\r\n".getBytes());
os.write(buf);
os.write("\r\n".getBytes());
} public static void writeNull(OutputStream os) throws IOException {
os.write('$');
os.write('-');
os.write('1');
os.write('\r');
os.write('\n');
}
}

命令的接口与实现类(只实现了两类,List与HashMap的增加与查询):

package com.hhy;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List; public interface Command {
void setArgs(List<Object> args); void run(OutputStream os) throws IOException;
}

lLPUSH:

package com.hhy.commands;

import com.hhy.Command;
import com.hhy.Database;
import com.hhy.Protocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.io.IOException;
import java.io.OutputStream;
import java.util.List; public class LPUSHCommand implements Command {
private static final Logger logger = LoggerFactory.getLogger(LPUSHCommand.class);
private List<Object> args; @Override
public void setArgs(List<Object> args) {
this.args = args;
} @Override
public void run(OutputStream os) throws IOException {
if (args.size() != 2) {
Protocol.writeError(os, "命令至少需要两个参数");
return;
}
String key = new String((byte[])args.get(0));
String value = new String((byte[])args.get(1));
logger.debug("运行的是 lpush 命令: {} {}", key, value); // 这种方式不是一个很好的线程同步的方式
List<String> list = Database.getList(key);
list.add(0, value); logger.debug("插入后数据共有 {} 个", list.size()); Protocol.writeInteger(os, list.size());
}
}

LRANGE:

package com.hhy.commands;

import com.hhy.Command;
import com.hhy.Database;
import com.hhy.Protocol; import java.io.IOException;
import java.io.OutputStream;
import java.util.List; public class LRANGECommand implements Command {
private List<Object> args; @Override
public void setArgs(List<Object> args) {
this.args = args;
} @Override
public void run(OutputStream os) throws IOException {
String key = new String((byte[])args.get(0));
int start = Integer.parseInt(new String((byte[])args.get(1)));
int end = Integer.parseInt(new String((byte[])args.get(2))); List<String> list = Database.getList(key);
if (end < 0) {
end = list.size() + end;
}
List<String> result = list.subList(start, end + 1);
try {
Protocol.writeArray(os, result);
} catch (Exception e) {
e.printStackTrace();
}
}
}

HGET:

package com.hhy.commands;

import com.hhy.Command;
import com.hhy.Database;
import com.hhy.Protocol; import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map; public class HGETCommand implements Command {
private List<Object> args; @Override
public void setArgs(List<Object> args) {
this.args = args;
} @Override
public void run(OutputStream os) throws IOException {
String key = new String((byte[])args.get(0));
String field = new String((byte[])args.get(1)); Map<String, String> hash = Database.getHashes(key);
String value = hash.get(field);
if (value != null) {
Protocol.writeBulkString(os, value);
} else {
Protocol.writeNull(os);
}
}
}

HSET:

package com.hhy.commands;

import com.hhy.Command;
import com.hhy.Database;
import com.hhy.Protocol; import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map; public class HSETCommand implements Command {
private List<Object> args; @Override
public void setArgs(List<Object> args) {
this.args = args;
} @Override
public void run(OutputStream os) throws IOException {
String key = new String((byte[])args.get(0));
String field = new String((byte[])args.get(1));
String value = new String((byte[])args.get(2));
Map<String, String> hash =Database.getHashes(key);
boolean isUpdate = hash.containsKey(field);
hash.put(field, value);
if (isUpdate) {
Protocol.writeInteger(os, 0);
} else {
Protocol.writeInteger(os, 1);
}
}
}

自定义异常类:

package com.hhy.exceptions;

public class RemoteException extends Exception {
public RemoteException() {
} public RemoteException(String message) {
super(message);
} public RemoteException(String message, Throwable cause) {
super(message, cause);
} public RemoteException(Throwable cause) {
super(cause);
} public RemoteException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

写到内存中去存储:

package com.hhy;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map; public class Database {
private static Map<String, List<String>> lists = new HashMap<>();
private static Map<String, Map<String, String>> hashes = new HashMap<>(); public static List<String> getList(String key) {
/*
List<String> list = lists.computeIfAbsent(key, k -> {
return new ArrayList<>();
});
*/ List<String> list = lists.get(key);
if (list == null) {
list = new ArrayList<>();
lists.put(key, list);
} return list;
} public static Map<String, String> getHashes(String key) {
Map<String, String> hash = hashes.get(key);
if (hash == null) {
hash = new HashMap<>();
hashes.put(key, hash);
} return hash;
}
}

多线程编程:

package com.hhy;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket; public class MutliThread implements Runnable { private Socket client; public MutliThread(Socket client) {
this.client = client;
} @Override
public void run() {
while (true) {
try {
InputStream inputStream = client.getInputStream();
OutputStream outputStream = client.getOutputStream(); while (true) {
Command command = Protocol.readCommand(inputStream);
command.run(outputStream);
}
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

server服务端:

package com.hhy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class Server {
private static final Logger logger = LoggerFactory.getLogger(Server.class); public static void main(String[] args) throws IOException{
int port = 6379; ServerSocket serverSocket = new ServerSocket(port);//ServerScoket监听端口 System.out.println("服务器等待连接..."+serverSocket.getLocalSocketAddress()); ExecutorService executorService = Executors.newFixedThreadPool(20); while(true){ Socket client = serverSocket.accept(); System.out.println("有客户端连接到服务器..."+client.getRemoteSocketAddress()); executorService.execute(new MutliThread(client));
}
}
}

如何根据二进制字节流解析出命令名称?
项目中实现了一个 Protocol 类,专门用于协议解析,将二进制流转为Java对象。

如何根据命令名称获取到命令所对应的对象?
根据命令名称找到对应的类名称,这里采用了约定俗成的办法,把类名称和命令名称起成一样的。
根据类名称获取指定的对象,很容易想到通过反射。具体做法是每次命令创建一个新的对象,相同的命令共用同一个对象(通过单例模式实现)。

05-08 15:01