Redis-基础篇


一、Redis简介

1.认识NoSQL

1.1 什么是NoSQL


  • NoSQL最常见的解释是”non-relational“,泛指非关系型的数据库,很多人也说它是”Not Only SQL

  • NoSQL 不依赖业务逻辑方式存储,而以简单的key-value模式存储。因此大大的增加了数据库的扩展能力。

  • 区别于关系数据库,它们不保证关系数据的ACID特性

  • 常见的NoSQL数据库有:RedisMemCacheMongoDB

1.2 NoSQL适用场景

  • 对数据高并发的读写

  • 海量数据的读写

  • 对数据高可扩展性的

1.3 NoSQL不适用场景

  • 需要事务支持

  • 基于sql的结构化查询存储,处理复杂的关系,需要即席查询。

1.4 NoSQL与SQL的差异


SQL NoSQL
数据结构 结构化 非结构化
数据关联 关联的 无关联的
查询方式 SQL查询 非SQL
事务特性 ACID BASE
存储方式 磁盘 内存
扩展性 垂直 水平
使用场景 1)数据结构固定
2)相关业务对数据安全性、一致性要求较高
1)数据结构不固定
2)对一致性、安全性要求不高
3)对性能要求

2.认识Redis

Redis诞生于2009年全称是Remote Dictionary Server,远程词典服务器,是一个基于内存的键值型NoSQL数据库。

Redis的特征:

  • 键值(key-value)型,value支持多种不同数据结构,功能丰富
  • 单线程,每个命令具备原子性
  • 低延迟,速度快(基于内存、IO多路复用、良好的编码)
  • 支持数据持久化
  • 支持主从集群、分片集群
  • 支持多语言客户端

二、Redis安装

1.Mac安装Redis

1.1 安装redis

brew install redis

1.2 启动Redis

# 启动
brew services start redis
==> Successfully started `redis` (label: homebrew.mxcl.redis)
# 关闭
brew services stop redis

# 连接redis
redis-cli

# 关闭连接,并退出
127.0.0.1:6379> shutdown
not connected> quit

1.3 其它操作

远程连接Redis

redis-cli -h xxx.xx.xx.xx -p 6379 -a 123456
  • -h xxx.xx.xx.xx:指定要连接的redis节点的IP地址,默认是127.0.0.1
  • -p 6379:指定要连接的redis节点的端口,默认是6379
  • -a 123456:指定redis的访问密码

指定密码

#方法1
#在配置文件中配置requirepass的密码(当redis重启后密码依然有效)。
requirepass foobared 修改成 : requirepass  123321

#方法2(当redis重启后密码无效)。
127.0.0.1:6379>config set requirepass 123321 #设置密码
Ok
127.0.0.1:6379>config get requirepass #查看密码


127.0.0.1:6379> auth 123321 #指定密码(登录时未指定密码可以用此命令制定密码)
Ok

心跳测试

127.0.0.1:6379> ping 
PONG

2.Linux安装Redis


本次安装Redis是基于Linux系统下安装的,因此需要一台Linux服务器或者虚拟机。如果您使用的是自己购买的服务器,请提前开放6379端口,避免后续出现的莫名其妙的错误!

2.1 安装依赖


Redis是基于C语言编写的,因此首先需要安装Redis所需要的gcc依赖

yum install -y gcc tcl

安装成功如下图所示:

2.2 安装Redis


redis-6.2.6.tar上传至/usr/local/src目录

在xShell中cd/usr/local/src目录执行以下命令进行解压操作

tar -xzf redis-6.2.6.tar.gz

解压成功后依次执行以下命令

cd redis-6.2.6
make
make install

安装成功后打开/usr/local/bin目录(该目录为Redis默认的安装目录)

2.3 启动Redis

Redis的启动方式有很多种,例如:前台启动后台启动开机自启

前台启动(不推荐)


这种启动属于前台启动,会阻塞整个会话窗口,窗口关闭或者按下CTRL + C则Redis停止。不推荐使用。

安装完成后,在任意目录输入redis-server命令即可启动Redis

redis-server

启动成功如下图所示

后台启动(不推荐)


如果要让Redis以后台方式启动,则必须修改Redis配置文件,配置文件所在目录就是之前我们解压的安装包下

因为我们要修改配置文件,因此我们需要先将原文件备份一份

cd /usr/local/src/redis-6.2.6
cp redis.conf redis.conf.bck

然后修改redis.conf文件中的一些配置

# 允许访问的地址,默认是127.0.0.1,会导致只能在本地访问。修改为0.0.0.0则可以在任意IP访问,生产环境不要设置为0.0.0.0
bind 0.0.0.0
# 守护进程,修改为yes后即可后台运行
daemonize yes 
# 密码,设置后访问Redis必须输入密码
requirepass 1325

Redis其他常用配置

# 监听的端口
port 6379
# 工作目录,默认是当前目录,也就是运行redis-server时的命令,日志、持久化等文件会保存在这个目录
dir .
# 数据库数量,设置为1,代表只使用1个库,默认有16个库,编号0~15
databases 1
# 设置redis能够使用的最大内存
maxmemory 512mb
# 日志文件,默认为空,不记录日志,可以指定日志文件名
logfile "redis.log"

启动Redis

# 进入redis安装目录 
cd /usr/local/src/redis-6.2.6
# 启动
redis-server redis.conf

停止Redis服务

# 通过kill命令直接杀死进程
kill -9 redis进程id
# 利用redis-cli来执行 shutdown 命令,即可停止 Redis 服务,
# 因为之前配置了密码,因此需要通过 -a 来指定密码
redis-cli -a 132537 shutdown

开机自启(推荐)


我们也可以通过配置来实现开机自启

首先,新建一个系统服务文件

vi /etc/systemd/system/redis.service

将以下命令粘贴进去

[Unit]
Description=redis-server
After=network.target

[Service]
Type=forking
ExecStart=/usr/local/bin/redis-server /usr/local/src/redis-6.2.6/redis.conf
PrivateTmp=true

[Install]
WantedBy=multi-user.target

然后重载系统服务

systemctl daemon-reload

现在,我们可以用下面这组命令来操作redis了

# 启动
systemctl start redis
# 停止
systemctl stop redis
# 重启
systemctl restart redis
# 查看状态
systemctl status redis

执行下面的命令,可以让redis开机自启

systemctl enable redis

三、Redis命令

通过Redis的中文文档学习:http://www.redis.cn/commands.html

通过菜鸟教程官网来学习:https://www.runoob.com/redis/redis-keys.html

Redis是一个key-value的数据库,key一般是String类型,不过value的类型多种多样

1.通用命令

指令 描述
keys 查看符合模板的所有key,不建议在生产环境设备上使用
del 删除一个指定的key
exists 判断key是否存在
expire 给一个key设置有效期,有效期到期时该key会被自动删除
ttl 查看一个KEY的剩余有效期,-1表示永不过期,-2表示已过期
type 查看你的key是什么类型
unlink 根据value选择非阻塞删除
select 切换数据库
dbsize 查看当前数据库的key的数量
flushdb 清空当前库
flushall 通杀全部库

可以通过help [command] 可以查看一个命令的具体用法!

2.基本类型

2.1 字符串-String

String类型是Redis最基本的数据类型,一个Redis中字符串value最多可以是512M

String的常见命令

命令 描述
set 添加或者修改已经存在的一个String类型的键值对
get 根据key获取String类型的value
mset 批量添加多个String类型的键值对
mget 根据多个key获取多个String类型的value
Strlen 获得value的长度
incr/decr 让一个整型的key自增/自减1
incrby/decrby 让一个整型的key自增/自减,并指定步长,例如:incrby num 2 让num值自增2
incrbyfloat 让一个浮点类型的数字自增并指定步长
setnx 添加一个String类型的键值对,前提是这个key不存在,才执行
setex 添加一个String类型的键值对,并且指定有效期
getrange 获得value的范围,例如:getrange name 0 4
setrange 从指定位置覆盖key存储的value,例如:setrange name 0 jian
getset 以新换旧,设置了新值同时获得旧值。例如:getset name xiaojian

数据结构:String的数据结构为简单动态字符串(Simple Dynamic String,缩写SDS)。是可以修改的字符串,内部结构实现上类似于Java的ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配.

2.2 哈希-Hash

hash是一个string类型的field和value的映射表,hash特别适合用于存储对象。类似Java里面的HashMap

每个 hash 可以存储 232 - 1 键值对(40多亿)。

Hash的常见命令

命令 描述
hset key field value 添加或者修改hash类型key的field的值
hget key field 获取一个hash类型key的field的值
hmset hmset 和 hset 效果相同 ,4.0之后hmset可以弃用了
hmget 批量获取多个hash类型key的field的值
hgetall 获取一个hash类型的key中的所有的field和value
hkeys 获取一个hash类型的key中的所有的field
hvals 获取一个hash类型的key中的所有的value
hincrby 让一个hash类型key的字段值自增并指定步长
hsetnx 添加一个hash类型的key的field值,前提是这个field不存在,否则不执行
127.0.0.1:6379> hset userkey name "jack" age 18 birth "2004-07"
(integer) 3
127.0.0.1:6379> hgetall userkey
1) "name"
2) "jack"
3) "age"
4) "18"
5) "birth"
6) "2004-07"

数据结构

Hash类型对应的数据结构是两种:ziplist(压缩列表),hashtable(哈希表)。当field-value长度较短且个数较少时,使用ziplist,否则使用hashtable。

2.3 列表-List

Redis中的List类型与Java中的LinkedList类似,可以看做是一个双向链表结构。既可以支持正向检索和也可以支持反向检索。

一个列表最多可以包含 232 - 1 个元素 (4294967295, 每个列表超过40亿个元素)。

特征也与LinkedList类似:

  • 有序
  • 元素可以重复
  • 插入和删除快
  • 查询速度一般

常用来存储一个有序数据,例如:朋友圈点赞列表,评论列表等。

List的常见命令

命令 描述
lpush key element … 向列表左侧插入一个或多个元素
lpop key 移除并返回列表左侧的第一个元素
rpush key element … 向列表右侧插入一个或多个元素
rpop key 移除并返回列表右侧的第一个元素
lrange key star end 返回一段角标范围内的所有元素
blpop和brpop 与LPOP和RPOP类似,只不过在没有元素时等待指定时间,而不是直接返回nil
rpoplpush <key1><key2> 从<key1>列表右边吐出一个值,插到<key2>列表左边。
lindex <key><index> 按照索引下标获得元素(从左到右)
llen <key> 获得列表长度
lrem <key><n><value> 从左边删除n个value
lset <key><index><value> 将列表key下标为index的值替换成value

数据结构

List的数据结构为快速链表quickList和压缩链表ziplist。

当列表元素较少的情况下会使用一块连续的内存存储,这个结构是ziplist。它将所有的元素紧挨着一起存储,分配的是一块连续的内存。

当数据量比较多的时候才会改成quicklist。因为普通的链表需要的附加指针空间太大,会比较浪费空间。比如这个列表里存的只是int类型的数据,结构上还需要两个额外的指针prev和next。

将链表和ziplist结合起来组成了quicklist。也就是将多个ziplist使用双向指针串起来使用。这样既满足了快速的插入删除性能,又不会出现太大的空间冗余。

思考问题

  • 如何利用List结构模拟一个栈?

    • 先进后出,入口和出口在同一边
  • 如何利用List结构模拟一个队列?

    • 先进先出,入口和出口在不同边
  • 如何利用List结构模拟一个阻塞队列?

    • 入口和出口在不同边
    • 出队时采用BLPOP或BRPOP

2.4 集合-Set

Redis的Set是string类型的无序集合。它底层其实是一个value为null的hash表,与Java中的HashSet类似,因此具备与HashSet类似的特征

  • 无序
  • 元素不可重复
  • 查找快
  • 支持交集、并集、差集等功能

Set的常见命令有

命令 描述
sadd key member … 向set中添加一个或多个元素
srem key member … 移除set中的指定元素
scard key 返回set中元素的个数
sismember key member 判断一个元素是否存在于set中
smembers key 获取set中的所有元素
sinter key1 key2 … 求key1与key2的交集
sdiff key1 key2 … 求key1与key2的差集
sunion key1 key2 .. 求key1和key2的并集
spop key 随机从该集合中吐出一个值
srandmember <key><n> 随机从该集合中取出n个值。不会从集合中删除
smove <source><des>value 把集合中一个值从一个集合移动到另一个集合

交集、差集、并集图示

数据结构

Set数据结构是dict字典,字典是用哈希表实现的。Java中HashSet的内部实现使用的是HashMap,只不过所有的value都指向同一个对象。Redis的set结构也是一样,它的内部也使用hash结构,所有的value都指向同一个内部值。

2.5 有序集合-ZSet

Redis的ZSet(SortedSet)是一个可排序的set集合,与Java中的TreeSet有些类似,但底层数据结构却差别很大。SortedSet中的每一个元素都带有一个score属性,可以基于score属性对元素排序,底层的实现是一个跳表(SkipList)加 hash表。

SortedSet具备下列特性:

  • 可排序
  • 元素不重复
  • 查询速度快

因为SortedSet的可排序特性,经常被用来实现排行榜这样的功能。

SortedSet的常见命令

命令 描述
zadd key score member 添加一个或多个元素到sorted set ,如果已经存在则更新其score值
zrem key member 删除sorted set中的一个指定元素
zscore key member 获取sorted set中的指定元素的score值
zrankkey member 获取sorted set 中的指定元素的排名
zcard key 获取sorted set中的元素个数
zcount key min max 统计score值在给定范围内的所有元素的个数
zincrby key increment member 让sorted set中的指定元素自增,步长为指定的increment值
zrange key min max 按照score排序后,获取指定排名范围内的元素
zrangebyscore key min max 按照score排序后,获取指定score范围内的元素
zdiff、zinter、zunion 求差集、交集、并集

注意:所有的排名默认都是升序,如果要降序则在命令的Z后面添加REV即可

数据结构

SortedSet(zset)是Redis提供的一个非常特别的数据结构,一方面它等价于Java的数据结构Map<String, Double>,可以给每一个元素value赋予一个权重score,另一方面它又类似于TreeSet,内部的元素会按照权重score进行排序,可以得到每个元素的名次,还可以通过score的范围来获取元素的列表。

zset底层使用了两个数据结构

(1)hash,hash的作用就是关联元素value和权重score,保障元素value的唯一性,可以通过元素value找到相应的score值。

(2)跳跃表,跳跃表的目的在于给元素value排序,根据score的范围获取元素列表。

3.特殊类型

3.1 Bitmaps

简介

Redis 提供了 Bitmaps 可以实现对位的操作,可以把 Bitmaps 想象成一个以位为单位的数组, 数组的每个单元只能存储 0 和 1, 数组的下标在 Bitmaps 中叫做偏移量。

Bitmaps的常见命令

  • setbit <key> <offset> <value> 设置Bitmaps中某个偏移量的值(0或1)
  • getbit <key> <offset> 获取Bitmaps中某个偏移量的值
  • bitcount <key> [start end] 统计字符串从start字节到end字节比特值为1的数量
  • bitop and(or/not/xor) <destkey> [key…] 可以做多个Bitmaps的and(交集) 、 or(并集) 、 not(非) 、 xor(异或) 操作并将结果保存在destkey中。
  • BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
BITFIELD key GET encoding offset|[OVERFLOW WRAP|SAT|FAIL] SET encoding offset value|INCRBY encoding offset increment 

#  GET查询 SET修改 INCRBY自增
#  encoding 设置符号位和操作长度,u代表无符号,i代表有符号
#  offset 偏移量,从第几位开始

# 查询bit数组中从0位开始的2位,返回10进制
BITFIELD key GET u2 0
# 例如数据是11100,则返回3
  • BITFIELD_RO :查询BitMap中bit数组,并以十进制形式返回,RO:READ_ONLY
# 功能跟BITFIELD的查询功能一样
BITFIELD_RO key GET encoding offset [GET encoding offset ...]

实例1

每个独立用户是否访问过网站,结果存放在Bitmaps中, 将访问的用户记做1, 没有访问的用户记做0, 用偏移量作为用户的id。 假设现在有20个用户,userid=1, 6, 11, 15, 19的用户对网站进行了访问, 那么当前Bitmaps初始化结果如图

users:20220717代表2022-07-17这天的独立访问用户的Bitmaps

127.0.0.1:6379> setbit users:20220717 1 1
(integer) 0
127.0.0.1:6379> setbit users:20220717 6 1
(integer) 0
127.0.0.1:6379> setbit users:20220717 11 1
(integer) 0
127.0.0.1:6379> setbit users:20220717 15 1
(integer) 0
127.0.0.1:6379> setbit users:20220717 19 1
(integer) 0

获取id=6,8的用户是否在2022-07-17这天访问过, 返回0说明没有访问过,返回1说明访问过

127.0.0.1:6379> getbit users:20220717 6
(integer) 1
127.0.0.1:6379> getbit users:20220717 8
(integer) 0

计算2022-07-17这天的独立访问用户数量

127.0.0.1:6379> bitcount users:20220717
(integer) 5

start和end代表起始和结束字节数, 下面计算用户id在第1个字节到第3个字节之间的独立访问用户数, 对应的用户id是11, 15, 19。

127.0.0.1:6379> bitcount users:20220717 1 3
(integer) 3

举例: K1 【01000001 01000000 00000000 00100001】,对应【0,1,2,3】

  • bitcount K1 1 2 : 统计下标1、2字节组中bit=1的个数,即 01000000 00000000 –》1

  • bitcount K1 1 3 : 统计下标1、3字节组中bit=1的个数,即01000000 00000000 00100001 –》3

  • bitcount K1 0 -2 : 统计下标0到下标倒数第2,字节组中bit=1的个数,即01000001 01000000 00000000 –》3

实例2

2022-07-02 日访问网站的userid=1,2,5,9。

setbit users:20220702 1 1

setbit users:20220702 2 1

setbit users:20220702 5 1

setbit users:20220702 9 1

2022-07-03 日访问网站的userid=0,1,4,9。

setbit users:20220703 0 1

setbit users:20220703 1 1

setbit users:20220703 4 1

setbit users:20220703 9 1

计算出两天都访问过网站的用户数量

127.0.0.1:6379> bitop and users:and:20220702_03 users:20220702 users:20220703
(integer) 2
127.0.0.1:6379> bitcount users:and:20220702_03
(integer) 2

计算出任意一天都访问过网站的用户数量(例如月活跃就是类似这种) , 可以使用or求并集

127.0.0.1:6379> bitop or users:or:20220702_03 users:20220702 users:20220703
(integer) 2
127.0.0.1:6379> bitcount users:or:20220702_03
(integer) 6

Bitmaps与set对比

假设网站有1亿用户, 每天独立访问的用户有5千万, 如果每天用集合类型和Bitmaps分别存储活跃用户可以得到表

数据类型 每个用户id占用空间 需要存储的用户量 全部内存量
集合类型 64位 5千万 64位*5千万 = 400MB
Bitmaps 1位 1亿 1位*1亿 = 12.5MB

很明显, 这种情况下使用Bitmaps能节省很多的内存空间, 尤其是随着时间推移节省的内存是非常可观的

数据类型 一天 一个月 一年
集合类型 400MB 12GB 144GB
Bitmaps 12.5MB 375MB 4.5GB

但Bitmaps并不是万金油, 假如该网站每天的独立访问用户很少, 例如只有10万(大量的僵尸用户) , 那么两者的对比如下表所示, 很显然, 这时候使用Bitmaps就不太合适了, 因为基本上大部分位都是0。

数据类型 每个userid占用空间 需要存储的用户量 全部内存量
集合类型 64位 10万 64位*10万 = 800KB
Bitmaps 1位 1亿 1位* 1亿 = 12.5MB

3.2 HyperLogLog

简介

在工作当中,我们经常会遇到与统计相关的功能需求,比如统计网站PV(PageView页面访问量),可以使用Redis的incr、incrby轻松实现。但像UV(UniqueVisitor,独立访客)、独立IP数、搜索记录数等需要去重和计数的问题如何解决?这种求集合中不重复元素个数的问题称为基数问题

解决基数问题有很多种方案:

  • (1)数据存储在MySQL表中,使用distinct count计算不重复个数

  • (2)使用Redis提供的hash、set、bitmaps等数据结构来处理

以上的方案结果精确,但随着数据不断增加,导致占用空间越来越大,当数据集非常大时是不切实际的。

能否能够降低一定的精度来平衡存储空间?Redis推出了HyperLogLog

  • HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的。

  • 在 Redis 里面,每个 HyperLogLog 键只需要花费 12KB 内存,就可以计算接近 264 个不同元素的基数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。

  • 但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。

什么是基数?

比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8}, 基数(不重复元素)为5。 基数估计就是在误差可接受的范围内,快速计算基数。

HyperLogLog的常见命令

  • pfadd <key> <element> [element …] 添加指定元素到 HyperLogLog 中
127.0.0.1:6379> pfadd hll "redis"
(integer) 1
127.0.0.1:6379> pfadd hll "mysql"
(integer) 1
127.0.0.1:6379> pfadd hll "redis"
(integer) 0
  • pfcount <key> [key …] 计算HLL的近似基数,可以计算多个HLL,比如用HLL存储每天的UV,计算一周的UV可以使用7天的UV合并计算即可
127.0.0.1:6379> pfcount hll
(integer) 2

127.0.0.1:6379> pfadd hll2 "mongodb"
(integer) 1
127.0.0.1:6379> pfadd hll2 "redis"
(integer) 1
127.0.0.1:6379> pfcount hll hll2
(integer) 3
  • pfmerge <destkey> <sourcekey> [sourcekey …] 将一个或多个HLL合并后的结果存储在另一个HLL中,比如每月活跃用户可以使用每天的活跃用户来合并计算可得
127.0.0.1:6379> pfmerge hllsum hll hll2
OK
127.0.0.1:6379> pfcount hllsum
(integer) 3

3.3 Geospatial

简介

Redis 3.2 中增加了对GEO类型的支持。GEO,Geographic,地理信息的缩写。该类型就是元素的2维坐标,在地图上就是经纬度。redis基于该类型,提供了经纬度设置,查询,范围查询,距离查询,经纬度Hash等常见操作。

命令

  • geoadd <key> < longitude> <latitude> <member> [longitude latitude member…] 添加地理位置(经度,纬度,名称)
geoadd china:city 121.47 31.23 shanghai
geoadd china:city 106.50 29.53 chongqing 114.05 22.52 shenzhen 116.38 39.90 beijing
  • geopos <key><member> [member…] 获得指定地区的坐标值
127.0.0.1:6379> geopos china:city shanghai
1) 1) "121.47000163793563843"
   2) "31.22999903975783553"
  • geodist <key> <member1><member2> [m|km|ft|mi] 获取两个位置之间的直线距离
    • m 表示单位为米[默认值]
    • km 表示单位为千米
    • ft 表示单位为英尺
    • mi 表示单位为英里
127.0.0.1:6379> geodist china:city shanghai beijing km
"1068.1535"
  • georadius <key> <longitude> <latitude> radius m|km|ft|mi 以给定的经纬度为中心,找出某一半径内的元素
127.0.0.1:6379> georadius china:city 110 30 1000 km
1) "chongqing"
2) "shenzhen"

四、Jedis

1.引入依赖

<!--引入Jedis依赖-->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.2.0</version>
</dependency>

<!--引入单元测试依赖-->
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>5.8.2</version>
    <scope>test</scope>
</dependency>

2.与Redis建立连接

public class JedisTest {
    public static void main(String[] args) {
        // 获取连接
        Jedis jedis = new Jedis("127.0.0.1",6379);
        // 如果 Redis 服务设置了密码,需要下面这行,没有就不需要
        // jedis.auth("123456"); 
        // 选择库(默认是下标为0的库)
        jedis.select(0);
        System.out.println("连接成功");
        //查看服务是否运行
        System.out.println("服务正在运行: "+jedis.ping());
        jedis.close();
    }
}

3.测试相关数据类型

Jedis-API: Key

jedis.set("k1", "v1");
jedis.set("k2", "v2");
jedis.set("k3", "v3");
Set<String> keys = jedis.keys("*");
System.out.println(keys.size());
for (String key : keys) {
    System.out.println(key);
}
System.out.println(jedis.exists("k1"));
System.out.println(jedis.ttl("k1"));                
System.out.println(jedis.get("k1"));

Jedis-API: String

jedis.mset("str1","v1","str2","v2","str3","v3");
System.out.println(jedis.mget("str1","str2","str3"));

Jedis-API: List

List<String> list = jedis.lrange("mylist",0,-1);
for (String element : list) {
    System.out.println(element);
}

Jedis-API: set

jedis.sadd("orders", "order01");
jedis.sadd("orders", "order02");
jedis.sadd("orders", "order03");
jedis.sadd("orders", "order04");
Set<String> smembers = jedis.smembers("orders");
for (String order : smembers) {
System.out.println(order);
}
jedis.srem("orders", "order02");

Jedis-API: hash

jedis.hset("hash1","userName","lisi");
System.out.println(jedis.hget("hash1","userName"));
Map<String,String> map = new HashMap<String,String>();
map.put("telphone","13735679666");
map.put("address","beijing");
map.put("email","abc@163.com");
jedis.hmset("hash2",map);
List<String> result = jedis.hmget("hash2", "telphone","email");
for (String element : result) {
    System.out.println(element);
}

Jedis-API: zset

jedis.zadd("zset01", 100d, "z3");
jedis.zadd("zset01", 90d, "l4");
jedis.zadd("zset01", 80d, "w5");
jedis.zadd("zset01", 70d, "z6");

List<String> zrange = jedis.zrange("zset01", 0, -1);
for (String e : zrange) {
  System.out.println(e);
}

4.Jedis连接池


Jedis本身是线程不安全的,并且频繁的创建和销毁连接会有性能损耗,因此推荐大家使用Jedis连接池代替Jedis的直连方式

public class JedisConnectionFactory {
    private static final JedisPool jedisPool;

    static {
        //配置连接池
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(8);
        jedisPoolConfig.setMaxIdle(8);
        jedisPoolConfig.setMinIdle(0);
        jedisPoolConfig.setMaxWaitMillis(200);
        //创建连接池对象
        jedisPool = new JedisPool(jedisPoolConfig,"127.0.0.1",6379,1000,"132537");
    }

    public static Jedis getJedis(){
       return jedisPool.getResource();
    }
}

5、实例—手机验证码

要求:
1、输入手机号,点击发送后随机生成6位数字码,2分钟有效
2、输入验证码,点击验证,返回成功或失败
3、每个手机号每天只能输入3次

思路:

  1. 生成随机6位数字验证码:Random
  2. 验证码在2分钟内有效:把验证码放到redis里面,设置过期时间120秒
  3. 判断验证码是否一致:从redis获取验证码和输入的验证码进行比较
  4. 每个手机每天只能发送3次验证码:incr每次发送后+1,大于2的时候,提交不能发送

生成六位的验证码:

//1.生成6位数字验证码
public static String getCode() {
    Random random = new Random();
    String code = "";
    for(int i=0;i<6;i++) {
        int rand = random.nextInt(10);
        code += rand;
    }
    return code;
}

验证码只能发送三次:

//2 每个手机每天只能发送三次,验证码放到redis中,设置过期时间120
public static void verifyCode(String phone) {
    //连接redis
    Jedis jedis = new Jedis("127.0.0.1",6379);

    //拼接key
    //手机发送次数key
    String countKey = "VerifyCode"+phone+":count";
    //验证码key
    String codeKey = "VerifyCode"+phone+":code";

    //每个手机每天只能发送三次
    String count = jedis.get(countKey);
    if(count == null) {
        //没有发送次数,第一次发送
        //设置发送次数是1
        jedis.setex(countKey,24*60*60,"1");
    } else if(Integer.parseInt(count)<=2) {
        //发送次数+1
        jedis.incr(countKey);
    } else if(Integer.parseInt(count)>2) {
        //发送三次,不能再发送
        System.out.println("今天发送次数已经超过三次");
        jedis.close();
    }

    //发送验证码放到redis里面
    String vcode = getCode();
    jedis.setex(codeKey,120,vcode);//120秒
    jedis.close();
}

判断验证码是否一致:

//3 验证码校验
public static void judgeCode(String phone,String code) {
    //从redis获取验证码
    Jedis jedis = new Jedis("127.0.0.1",6379);
    //验证码key
    String codeKey = "VerifyCode"+phone+":code";
    String redisCode = jedis.get(codeKey);
    //判断
    if(redisCode.equals(code)) {
        System.out.println("成功");
    }else {
        System.out.println("失败");
    }
    jedis.close();
}

完整功能代码展示

public class PhoneCode {

    public static void main(String[] args) {
        //模拟验证码发送
        verifyCode("13678765435");

        //模拟验证码校验
        //judgeCode("13678765435","217173");
    }


    //3 验证码校验
    public static void judgeCode(String phone,String code) {
        //从redis获取验证码
        Jedis jedis = new Jedis("127.0.0.1",6379);
        //验证码key
        String codeKey = "VerifyCode"+phone+":code";
        String redisCode = jedis.get(codeKey);
        //判断
        if(redisCode.equals(code)) {
            System.out.println("成功");
        }else {
            System.out.println("失败");
        }
        jedis.close();
    }

    //2 每个手机每天只能发送三次,验证码放到redis中,设置过期时间120
    public static void verifyCode(String phone) {
        //连接redis
        Jedis jedis = new Jedis("127.0.0.1",6379);

        //拼接key
        //手机发送次数key
        String countKey = "VerifyCode"+phone+":count";
        //验证码key
        String codeKey = "VerifyCode"+phone+":code";

        //每个手机每天只能发送三次
        String count = jedis.get(countKey);
        if(count == null) {
            //没有发送次数,第一次发送
            //设置发送次数是1
            jedis.setex(countKey,24*60*60,"1");//有效期1天
        } else if(Integer.parseInt(count)<=2) {
            //发送次数+1
            jedis.incr(countKey);
        } else if(Integer.parseInt(count)>2) {
            //发送三次,不能再发送
            System.out.println("今天发送次数已经超过三次");
            jedis.close();
            return;//超过三次之后就会自动退出不会再发送了,不添加这一行,即使显示发送次数,但还会有验证码还是会改变
        }

        //发送验证码放到redis里面
        String vcode = getCode();//调用生成的验证码
        jedis.setex(codeKey,120,vcode);//设置生成的验证码只有120秒的时间
        jedis.close();
    }

    //1 生成6位数字验证码,code是验证码
    public static String getCode() {
        Random random = new Random();
        String code = "";
        for(int i=0;i<6;i++) {
            int rand = random.nextInt(10);
            code += rand;
        }
        return code;
    }
}

五、Redis与Spring Boot整合

1、引入redis相关依赖

<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!--连接池依赖-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>

2、编写配置文件

application.yml 版本

spring:
  redis:
    host: 127.0.0.1 #指定redis所在的host
    port: 6379  #指定redis的端口
    password: 123456  #设置redis密码
    lettuce:
      pool:
        max-active: 8 #最大连接数
        max-idle: 8 #最大空闲数
        min-idle: 0 #最小空闲数
        max-wait: 100ms #连接等待时间

application.properties 版本

#Redis服务器地址
spring.redis.host=127.0.0.1
#Redis服务器连接端口
spring.redis.port=6379
#Redis数据库索引(默认为0)
spring.redis.database= 0
#连接超时时间(毫秒)
spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0

3、编写测试类

@SpringBootTest
class SpringRedisApplicationTests {
    @Resource
    private RedisTemplate redisTemplate;
    @Test
    void testString() {
        // 1.通过RedisTemplate获取操作String类型的ValueOperations对象
        ValueOperations ops = redisTemplate.opsForValue();
        // 2.插入一条数据
        ops.set("name","jianjian");
        // 3.获取数据
        String name = (String) ops.get("name");
        System.out.println("name = " + name);
    }
}

4、RedisSerializer配置


RedisTemplate可以接收任意Object作为值写入Redis,只不过写入前会把Object序列化为字节形式,默认是采用JDK序列化,得到的结果是这样的

缺点:

  • 可读性差
  • 内存占用较大

那么如何解决以上的问题呢?我们可以通过自定义RedisTemplate序列化的方式来解决。

编写一个配置类RedisConfig

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
        // 1.创建RedisTemplate对象
        RedisTemplate<String ,Object> redisTemplate = new RedisTemplate<>();
        // 2.设置连接工厂
        redisTemplate.setConnectionFactory(factory);

        // 3.创建序列化对象
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();

        // 4.设置key和hashKey采用String的序列化方式
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);

        // 5.设置value和hashValue采用json的序列化方式
        redisTemplate.setValueSerializer(genericJackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(genericJackson2JsonRedisSerializer);

        return redisTemplate;
    }
}

此时我们已经将RedisTemplate的key设置为String序列化,value设置为Json序列化的方式,再来执行方法测试

由于我们设置的value序列化方式是Json的,因此我们可以直接向redis中插入一个对象

@Test
void testSaveUser() {
    redisTemplate.opsForValue().set("user:100", new User("Vz", 21));
    User user = (User) redisTemplate.opsForValue().get("user:100");
    System.out.println("User = " + user);
}

尽管Json序列化可以满足我们的需求,但是依旧存在一些问题。

如上图所示,为了在反序列化时知道对象的类型,JSON序列化器会将类的class类型写入json结果中,存入Redis,会带来额外的内存开销。

那么我们如何解决这个问题呢?我们可以通过下文的StringRedisTemplate来解决这个问题。

5、StringRedisTemplate配置

为了节省内存空间,我们并不会使用JSON序列化器来处理value,而是统一使用String序列化器,要求只能存储String类型的key和value。当需要存储Java对象时,手动完成对象的序列化和反序列化。

Spring默认提供了一个StringRedisTemplate类,它的key和value的序列化方式默认就是String方式。省去了我们自定义RedisTemplate的过程

编写一个测试类使用StringRedisTemplate来执行以下方法

@SpringBootTest
class RedisStringTemplateTest {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Test
    void testSaveUser() throws JsonProcessingException {
        // 1.创建一个Json序列化对象
        ObjectMapper objectMapper = new ObjectMapper();
        // 2.将要存入的对象通过Json序列化对象转换为字符串
        String userJson1 = objectMapper.writeValueAsString(new User("Vz", 21));
        // 3.通过StringRedisTemplate将数据存入redis
        stringRedisTemplate.opsForValue().set("user:100",userJson1);
        // 4.通过key取出value
        String userJson2 = stringRedisTemplate.opsForValue().get("user:100");
        // 5.由于取出的值是String类型的Json字符串,因此我们需要通过Json序列化对象来转换为java对象
        User user = objectMapper.readValue(userJson2, User.class);
        // 6.打印结果
        System.out.println("user = " + user);
    }
}

执行完毕回到Redis的图形化客户端查看结果

六、事务 & 锁

  • Redis 事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。

  • 事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

  • Redis 事务的主要作用就是串联多个命令防止别的命令插队。

1.multi、exec、discard

从输入 Multi 命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入 Exec 后,Redis 会将之前的命令队列中的命令依次执行。组队的过程中可以通过discard来放弃组队。

127.0.0.1:6379[1]> multi
OK
127.0.0.1:6379[1](TX)> set k1 v1
QUEUED
127.0.0.1:6379[1](TX)> set k2 v2
QUEUED
127.0.0.1:6379[1](TX)> exec
1) OK
2) OK

2.错误处理

组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消。

127.0.0.1:6379[1]> multi
OK
127.0.0.1:6379[1](TX)> set m1 v1
QUEUED
127.0.0.1:6379[1](TX)> set m2
(error) ERR wrong number of arguments for 'set' command
127.0.0.1:6379[1](TX)> set m3 v3
QUEUED
127.0.0.1:6379[1](TX)> exec
(error) EXECABORT Transaction discarded because of previous errors.

如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。

127.0.0.1:6379[1]> multi
OK
127.0.0.1:6379[1](TX)> set m1 v1
QUEUED
127.0.0.1:6379[1](TX)> incr m1 #错误语句
QUEUED
127.0.0.1:6379[1](TX)> set m2 v2
QUEUED
127.0.0.1:6379[1](TX)> exec
1) OK
2) (error) ERR value is not an integer or out of range
3) OK

3.事务冲突的问题

想想一个场景:有很多人有你的账户,同时去参加双十一抢购

一个请求想给金额减8000

一个请求想给金额减5000

一个请求想给金额减1000

这样就会导致事务冲突,如何解决呢?

3.1 悲观锁

悲观锁(Pessimistic Lock), 每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会 block 直到它拿到锁。

悲观锁可以实现对于数据的串行化执行,比如syn,和lock都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等

3.2 乐观锁

乐观锁(Optimistic Lock),每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量

乐观锁:会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过。

乐观锁的典型代表:就是CAS(Compare And Swap),利用cas进行无锁化机制加锁,var5 是操作前读取的内存值,while中的var1+var2 是预估值,如果预估值 == 内存值,则代表中间没有被人修改过,此时就将新值去替换内存值,其中do while 是为了在操作失败时,再次进行自旋操作,即把之前的逻辑再操作一次。

int var5;
do {
    var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;

3.3 watch & unwatch

在执行 multi 之前,先执行 watch key1 [key2],可以监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。

127.0.0.1:6379[1]> watch balence
OK
127.0.0.1:6379[1]> multi
OK
127.0.0.1:6379[1](TX)> decrby balence 10
QUEUED
127.0.0.1:6379[1](TX)> incrby debt 10
QUEUED
127.0.0.1:6379[1](TX)> exec
1) (integer) -10
2) (integer) 10

unwatch 取消 WATCH 命令对所有 key 的监视。如果在执行 WATCH 命令之后,EXEC 命令或DISCARD 命令先被执行了的话,那么就不需要再执行 UNWATCH 了。

4.Redis事务三特性

  • 单独的隔离操作

    • 事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
  • 没有隔离级别的概念

    • 队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行
  • 不保证原子性

    • 事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

七、持久化

1.RDB

RDB(Redis DataBase) 在指定的时间间隔内内存中的数据集快照写入磁盘, 也就是 Snapshot 快照,它恢复时是将快照文件直接读到内存里

RDB执行步骤

  • Redis 会单独创建(fork)一个子进程来进行持久化

    • Fork 的作用是复制一个与当前进程一样的进程。新进程的所有数据(变量、环境变量、程序计数器等) 数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程
    • 在Linux程序中,fork()会产生一个和父进程完全相同的子进程,但子进程在此后多会exec系统调用,出于效率考虑,Linux中引入了“写时复制技术
    • 一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程。
  • 先将数据写入到一个临时文件中

  • 待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件(dump.rdb)

    • 在redis.conf中配置持久化文件名称,默认为dump.rdb
    • rdb文件的保存路径,也可以修改。默认为Redis启动时命令行所在的目录下

整个过程中,主进程是不进行任何 IO 操作的,这就确保了极高的性能,RDB的缺点是最后一次持久化后的数据可能丢失

触发RDB策略

a>配置文件中默认的快照配置

其中 save 3600 1的含义是:每 3600 秒时,至少有 1 个 key 变化,则触发RDB;save 300 10和save 60 10000同理。

b>通过 savebgsave 命令触发RDB策略

127.0.0.1:6379>save
127.0.0.1:6379>bgsave

可以通过lastsave 命令获取最后一次成功执行快照的时间
127.0.0.1:6379>lastsave

save

  • 优点:节约系统资源

  • 缺点:直接调用 rdbSave ,阻塞 Redis 主进程,直到保存完成为止。在主进程阻塞期间,服务器不能处理客户端的任何请求。

bgsave

  • 优点:fork 出一个子进程,子进程负责调用 rdbSave ,并在保存完成之后向主进程发送信号,通知保存已完成。 Redis 服务器在BGSAVE 执行期间仍然可以继续处理客户端的请求

  • 缺点:由于会fork一个进程,因此更消耗内存

优缺点

优点

  • 适合大规模的数据恢复

  • 对数据完整性和一致性要求不高更适合使用

  • 节省磁盘空间

  • 恢复速度快

缺点

  • Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑

  • 虽然Redis在fork时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能。

  • 在备份周期在一定间隔时间做一次备份,所以如果Redis意外down掉的话,就会丢失最后一次快照后的所有修改。

2.AOF

AOF(Append Only File):以日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录), 只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作

AOF执行步骤

  • 客户端的请求写命令会被append追加到AOF缓冲区内;

  • AOF缓冲区根据AOF持久化策略[always,everysec,no]将操作sync同步到磁盘的AOF文件中;

    • appendfsync always 始终同步,每次Redis的写入都会立刻记入日志;性能较差但数据完整性比较好
    • appendfsync everysec 每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能丢失。
    • appendfsync no 不主动进行同步,把同步时机交给操作系统。
  • AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite重写,压缩AOF文件容量;

  • Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的;

触发AOF策略

  • AOF默认不开启,需要在配置文件中设置开启

    • 修改默认的appendonly no,改为yes
    • 如遇到AOF文件损坏,通过 /usr/local/bin/redis-check-aof –fix appendonly.aof 进行恢复
  • 可以在redis.conf中配置文件名称,默认为 appendonly.aof

  • AOF文件的保存路径,同RDB的路径一致。

Rewrite压缩

AOF采用文件追加方式,文件会越来越大为避免出现此种情况,新增了重写机制, 当AOF文件的大小超过所设定的阈值时,Redis就会启动AOF文件的内容压缩, 只保留可以恢复数据的最小指令集.可以使用命令bgrewriteaof

优缺点

优点

  • 备份机制更稳健,丢失数据概率更低。

  • 可读的日志文本,通过操作AOF稳健,可以处理误操作。

缺点

  • 比起RDB占用更多的磁盘空间。

  • 恢复备份速度要慢。

  • 每次读写都同步的话,有一定的性能压力。

  • 存在个别Bug,造成恢复不能。

3.总结

AOF和RDB同时开启,redis听谁的?

AOF和RDB同时开启,系统默认取AOF的数据(数据不会存在丢失)

AOF和RDB用哪个好?

官方推荐两个都启用。

如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那 RDB 方式要比AOF方式更加的高效。不建议单独用 AOF,因为可能会出现Bug。如果只是做纯内存缓存,可以都不用。

八、主从复制

1.简介

主机(master)数据更新后根据配置和策略, 自动同步到备/从机(slaver)的机制,Master 以写为主,Slaver 以读为主

  • 可实现读写分离,性能扩展

  • 可实现容灾快速恢复

主从复制原理

  • 1、当从连接上主服务器之后,从服务器向主服务器发送数据同步消息;

  • 2、主服务器接到从服务器发送过来同步消息,把主服务器数据持久化为 rdb 文件,把 rdb 文件发送给从服务器,从服务器拿到 rdb 进行读取;(全量复制)

    • 全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。
  • 3、之后每次主服务器进行写操作之后,和从服务器进行数据同步(增量复制)(即第一次需要从服务器请求,之后每次主服务器主动同步)

    • 增量复制:Master继续将新的所有收集到的修改命令依次传给slave,完成同步

    • 从机挂掉,重新连接主机,自动执行一次全量复制

2.模拟一主二仆

模拟三台 redis 服务器(6379、6380、6381),主机(6379)从机(6380、6381)

我的 redis 配置文件路径: /opt/homebrew/etc/redis.conf

  • 1.新建文件夹 myredis
$ pwd
/Users/jianjian
$ mkdir myredis
$ cd  /myredis
$ pwd
/Users/jianjian/myredis
  • 2.复制 redis.conf 配置文件到该文件夹
$ cp /opt/homebrew/etc/redis.conf  ~/myredis/redis.conf
  • 3.配置一主两从,创建三个配置文件
$ vi redis6379.conf

include /Users/jianjian/myredis/redis.conf
pidfile /var/run/redis_6379.pid
port 6379
dbfilename dump6379.rdb

$ vi redis6380.conf

include /Users/jianjian/myredis/redis.conf
pidfile /var/run/redis_6380.pid
port 6380
dbfilename dump6380.rdb

$ vi redis6381.conf

include /Users/jianjian/myredis/redis.conf
pidfile /var/run/redis_6381.pid
port 6381
dbfilename dump6381.rdb
  • 4.启动三台redis服务器
$ redis-server redis6379.conf
$ redis-server redis6380.conf
$ redis-server redis6381.conf
  • 5.查看系统进程,看看三台服务器是否启动
$ ps -ef | grep redis

501 85018 80912   0  3:54PM ttys000    0:00.60 redis-server 127.0.0.1:6380
501 86115 85019   0  3:56PM ttys001    0:00.49 redis-server 127.0.0.1:6379
501 86631 86116   0  3:57PM ttys002    0:00.35 redis-server 127.0.0.1:6381
  • 6.查看三台主机运行情况
$ redis-cli -p 6379
127.0.0.1:6379> info replication

# Replication
role:master
connected_slaves:0
......
$ redis-cli -p 6380
127.0.0.1:6380> info replication

# Replication
role:master
connected_slaves:0
......
$ redis-cli -p 6381
127.0.0.1:6381> info replication

# Replication
role:master
connected_slaves:0
......
  • 7.配置从机
    • slaveof <ip><port> 成为某个实例的从服务器
# 在6380和6381上执行: slaveof 127.0.0.1 6379

#变为从机
$ redis-cli -p 6380
127.0.0.1:6380>slaveof 127.0.0.1 6379
127.0.0.1:6380> info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
......

#变为从机
$ redis-cli -p 6381
127.0.0.1:6381>slaveof 127.0.0.1 6379
127.0.0.1:6381> info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
  • 8、在主机上写,在从机上可以读取数据
# 主机写入数据
$ redis-cli -p 6379
127.0.0.1:6379> set k1 v1
127.0.0.1:6379> keys *
1) "k1"
# 从机可以读取数据
$ redis-cli -p 6380
127.0.0.1:6380> keys *
1) "k1"
# 从机可以读取数据
$ redis-cli -p 6381
127.0.0.1:6381> get k1
1) "v1"

:如果主机挂掉,重启就行,一切如初。从机重启需重设:slaveof 127.0.0.1 6379

可以将配置增加到文件中。永久生效。

Q&A

slave1、slave2是从头开始复制还是从切入点开始复制?比如从k4进来,那之前的k1,k2,k3是否也可以复制?

从头开始复制;可以

从机是否可以写?set可否?

不可以

主机shutdown后情况如何?从机是上位还是原地待命?

主机挂掉,从机原地待命

主机又回来了后,主机新增记录,从机还能否顺利复制?

可以

其中一台从机down后情况如何?依照原有它能跟上大部队吗?

从机挂掉重启后需要重设:slaveof 127.0.0.1 6379

3.薪火相传

上一个 Slave 可以是下一个 slave 的Master,slave 同样可以接收其他 slave 的连接和同步请求,那么该 slave 作为了链条中下一个的 master,可以有效减轻 master 的写压力,去中心化降低风险。

$ redis-cli -p 6380
127.0.0.1:6380>slaveof 127.0.0.1 6379

$ redis-cli -p 6381
127.0.0.1:6381>slaveof 127.0.0.1 6380

4.反客为主

即 主机挂掉之后,备机上位成为主机,保证服务正常进行

当一个 master 宕机后,后面的 slave 可以立刻升为 master,其后面的 slave 不用做任何修改。用 slaveof no one 将从机变为主机。

# master 宕机
$ redis-cli -p 6379
127.0.0.1:6379> shutdown
# slave1升为master
$ redis-cli -p 6380
127.0.0.1:6380> slave no one
127.0.0.1:6380> set k2 v2
$ redis-cli -p 6381
127.0.0.1:6381> get k2
"v2"

5.哨兵模式

反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从机转换为主机

配置哨兵

  • 新建 sentinel.conf 文件,写入如下内容
$ vi sentinel.conf

sentinel monitor mymaster 127.0.0.1 6379 1

其中 mymaster 为监控对象起的服务器名称, 1 为至少有多少个哨兵同意迁移的数量。

启动哨兵

$ redis-sentinel sentinel.conf 

当主机挂掉,从机选举中产生新的主机

$ 127.0.0.1:6379> shutdown

这里显示6380变成了主机,原主机6379重启后会变为从机。

哪个从机会被选举为主机呢?根据优先级别:replica-priority

  • 优先级在redis.conf中默认:replica-priority 100 值,越小优先级越高

  • 偏移量是指获得原主机数据最全的

  • 每个redis实例启动后都会随机生成一个40位的runid

九、集群

Redis 集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N。Redis 集群通过分区(partition)来提供一定程度的可用性(availability): 即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。

Redis 集群提供了以下好处

  • 实现扩容

  • 分摊压力

  • 无中心配置相对简单

Redis 集群的不足

  • 多键操作是不被支持的

  • 多键的Redis事务是不被支持的。lua脚本不被支持

  • 由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至redis cluster,需要整体迁移而不是逐步过渡,复杂度较大。

1.模拟集群

模拟6台 redis 服务器,6379、6380、6381、6389、6390、6391

主机(6379、6380、6381);从机(6389、6390、6391),将他们加入一个集群中

  • 配置 redis6379.conf 文件,拷贝 5 个 redis6379.conf 文件
$ vi redis6379.conf

include /Users/jianjian/myredis/redis.conf
pidfile /var/run/redis_6379.pid
port 6379
dbfilename dump6379.rdb
## cluster配置修改
dir "/Users/jianjian/myredis/redis_cluster"
logfile "/Users/jianjian/myredis/redis_cluster/redis_err_6379.log"
# 打开集群模式
cluster-enabled yes
# 设定节点配置文件名
cluster-config-file nodes-6379.conf
# 设定节点失联时间,超过该时间(毫秒),集群自动进行主从切换
cluster-node-timeout 15000
$ cp redis6379.conf redis6380.conf
$ cp redis6379.conf redis6381.conf
$ cp redis6379.conf redis6389.conf
$ cp redis6379.conf redis6390.conf
$ cp redis6379.conf redis6391.conf
  • 使用查找替换修改拷贝的5个文件
#将redis6380.conf中的6379替换为6380
$ vi redis6380.conf

:%s/6379/6380  
  • 启动 6个 redis 服务

  • 将六个节点合成一个集群

合成之前,请确保所有redis实例启动后,nodes-xxxx.conf文件都生成正常。

$ redis-cli --cluster create --cluster-replicas 1 192.168.11.101:6379 192.168.11.101:6380 192.168.11.101:6381 192.168.11.101:6389 192.168.11.101:6390 192.168.11.101:6391

此处不要用127.0.0.1, 请用真实IP地址;--cluster-replicas 1 表示采用最简单的方式配置集群,一台主机,一台从机,正好三组。

  • 采用集群策略连接,设置数据会自动切换到相应的写主机
$ redis-cli -c -p 6379
  • 查看集群信息
127.0.0.1:6379> cluster nodes

2.slots

什么是slots

一个 Redis 集群包含 16384 个插槽(hash slot), 数据库中的每个键都属于这 16384 个插槽的其中一个, 集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。

集群中的每个节点负责处理一部分插槽。 举个例子, 如果一个集群可以有主节点, 其中:

节点 A 负责处理 0 号至 5460 号插槽。

节点 B 负责处理 5461 号至 10922 号插槽。

节点 C 负责处理 10923 号至 16383 号插槽。

在集群中录入值

在redis-cli每次录入、查询键值,redis都会计算出该key应该送往的插槽 slot ,如果不是该客

户端对应服务器的插槽,redis会报错,并告知应前往的redis实例地址和端口。

redis-cli客户端提供了 –c 参数实现自动重定向。如 redis-cli -c –p 6379 登入后,再录入、查询键值对可以自动重定向。

  • 不在一个slot下的键值,是不能使用mget,mset等多键操作。

  • 可以通过{}来定义组的概念,从而使key中{}内相同内容的键值对放到一个slot中去。

  • 查询集群中的值

CLUSTER GETKEYSINSLOT <slot><count> 返回 count 个 slot 槽中的键

3.故障恢复

如果主节点下线?从节点能否自动升为主节点?注意:15秒超时

主节点恢复后,主从关系会如何?

主节点回来变成从机。

如果所有某一段插槽的主从节点都宕掉,redis服务是否还能继续?

  • 如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为yes ,那么 ,整个集群都挂掉

  • 如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为no ,那么,该插槽数据全都不能使用,也无法存储。

4.集群的 Jedis 开发

即使连接的不是主机,集群会自动切换主机存储。主机写,从机读。

无中心化主从集群。无论从哪台主机写的数据,其他主机上都能读到数据。

public class JedisClusterTest {
  public static void main(String[] args) { 
     Set<HostAndPort>set =new HashSet<HostAndPort>();
     set.add(new HostAndPort("192.168.31.211",6379));
     JedisCluster jedisCluster=new JedisCluster(set);
     jedisCluster.set("k1", "v1");
     System.out.println(jedisCluster.get("k1"));
  }
}

十、应用问题

1.缓存穿透

问题描述

key 对应的数据在数据源并不存在,每次针对此 key 的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。比如用一个不存在的用户 id 获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。

  • 1、应用服务器压力变大了
  • 2、redis 命中率降低
  • 3、 一直查询数据库

一个一定不存在缓存及查询不到的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。

解决方案

  • 1、对空值缓存:如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟

  • 2、设置可访问的名单(白名单):使用 bitmaps 类型定义一个可以访问的名单,名单 id作为 bitmaps 的偏移量,每次访问和 bitmap 里面的 id 进行比较,如果访问 id 不在bitmaps 里面,进行拦截,不允许访问。

  • 3、采用布隆过滤器:布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。将所有可能存在的数据哈希到一个足够大的布隆过滤器中,一个一定不存在的数据会被这个布隆过滤器拦截掉,从而避免了对底层存储系统的查询压力。

  • 4、进行实时监控:当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务

2.缓存击穿

问题描述

key 对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端 DB 加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端 DB 压垮。

  • 1、数据库访问压力瞬时增加
  • 2、redis 某个 key 过期了,大量访问使用这个key
  • 3、redis 正常运行

key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。

解决方案

  • 1、预先设置热门数据:在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长

  • 2、实时调整:现场监控哪些数据热门,实时调整key的过期时长

  • 3、使用锁

    • 1.就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db。
    • 2.先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX)去set一个mutex key
    • 3.当操作返回成功时,再进行load db的操作,并回设缓存,最后删除mutex key;
    • 4.当操作返回失败,证明有线程在load db,当前线程睡眠一段时间再重试整个get缓存的方法。

3.缓存雪崩

问题描述

key 对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端 DB 加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。缓存雪崩与缓存击穿的区别在于这里针对很多 key 缓存,前者则是某一个 key。

  • 1、数据库压力变大服务器崩溃
  • 2、在极少时间段,查询大量 key 的集中过期

缓存失效时的雪崩效应对底层系统的冲击非常可怕!

解决方案

  • 1、 构建多级缓存架构:nginx缓存 + redis缓存 +其他缓存(ehcache等)

  • 2、使用锁或队列:用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况

  • 3、 设置过期标志更新缓存:记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。

  • 4、将缓存失效时间分散开:比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

4.分布式锁

问题描述

随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

解决方案

分布式锁主流的实现方案:

  • 1.基于数据库实现分布式锁

  • 2.基于缓存(Redis等)

  • 3.基于Zookeeper

每一种分布式锁解决方案都有各自的优缺点:

  • 性能:redis最高

  • 可靠性:zookeeper最高

这里,我们就基于redis实现分布式锁。

$ 127.0.0.1:6379>  set key value EX second
$ 127.0.0.1:6379>  set sku:1:info “OK” NX PX 10000
  • EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
  • PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。
  • NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
  • XX :只在键已经存在时,才对键进行设置操作。

@GetMapping("testLock")
public void testLock(){
    //1获取锁,setne
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
    //2获取锁成功、查询num的值
    if(lock){
        Object value = redisTemplate.opsForValue().get("num");
        //2.1判断num为空return
        if(StringUtils.isEmpty(value)){
            return;
        }
        //2.2有值就转成成int
        int num = Integer.parseInt(value+"");
        //2.3把redis的num加1
        redisTemplate.opsForValue().set("num", ++num);
        //2.4释放锁,del
        redisTemplate.delete("lock");

    }else{
        //3获取锁失败、每隔0.1秒再获取
        try {
            Thread.sleep(100);
            testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

优化之设置锁的过期时间

问题:setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放

解决:设置过期时间,自动释放锁。

设置过期时间有两种方式:

  • 1.首先想到通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)
  • 2.在set时指定过期时间(推荐)

优化之UUID防误删

场景:如果业务逻辑的执行时间是7s。执行流程如下

  1. index1业务逻辑没执行完,3秒后锁被自动释放。

  2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。

  3. index3获取到锁,执行业务逻辑

  4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。最终等于没锁的情况。

解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁

优化之LUA脚本保证删除的原子性

问题:删除操作缺乏原子性。

场景:

  1. index1执行删除时,查询到的lock值确实和uuid相等
uuid=v1
set(lock,uuid);

  1. index1执行删除前,lock刚好过期时间已到,被redis自动释放

在redis中没有了lock,没有了锁。

  1. index2获取了lock

index2线程获取到了cpu的资源,开始执行方法

uuid=v2
set(lock,uuid);
  1. index1执行删除,此时会把index2的lock删除

index1 因为已经在方法中了,所以不需要重新上锁。index1有执行的权限。index1已经比较完成了,这个时候,开始执行

删除的index2的锁!

@GetMapping("testLockLua")
public void testLockLua() {
    //1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
    String uuid = UUID.randomUUID().toString();
    //2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
    String skuId = "25"; // 访问skuId 为25号的商品 100008348542
    String locKey = "lock:" + skuId; // 锁住的是每个商品的数据

    // 3 获取锁
    Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);

    // 第一种: lock 与过期时间中间不写任何的代码。
    // redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
    // 如果true
    if (lock) {
        // 执行的业务逻辑开始
        // 获取缓存中的num 数据
        Object value = redisTemplate.opsForValue().get("num");
        // 如果是空直接返回
        if (StringUtils.isEmpty(value)) {
            return;
        }
        // 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
        int num = Integer.parseInt(value + "");
        // 使num 每次+1 放入缓存
        redisTemplate.opsForValue().set("num", String.valueOf(++num));
        /*使用lua脚本来锁*/
        // 定义lua 脚本
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        // 使用redis执行lua执行
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(script);
        // 设置一下返回值类型 为Long
        // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
        // 那么返回字符串与0 会有发生错误。
        redisScript.setResultType(Long.class);
        // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
        redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
    } else {
        // 其他线程等待
        try {
            // 睡眠
            Thread.sleep(1000);
            // 睡醒了之后,调用方法。
            testLockLua();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

总结

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  • 互斥性。在任意时刻,只有一个客户端能持有锁。
  • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
  • 加锁和解锁必须具有原子性。

同一时间只有一个人有锁,而且开锁解锁都是同一个人,不会死锁

十一、6.0新功能

1.ACL

简介

Redis ACL是Access Control List(访问控制列表)的缩写,该功能允许根据可以执行的命令和可以访问的键来限制某些连接。

在Redis 5版本之前,Redis 安全规则只有密码控制 还有通过 rename 来调整高危命令比如 flushdb , KEYS* , shutdown 等。Redis 6 则提供ACL的功能对用户进行更细粒度的权限控制 :

(1)接入权限:用户名和密码

(2)可以执行的命令

(3)可以操作的 KEY

命令

  • acl list 展现用户权限列表

  • acl cat 查看添加权限指令类别,加参数类型名可以查看类型下具体命令
  • acl whoami 查看当前用户

  • acl setuser 创建和编辑用户ACL

下面是有效ACL规则的列表。某些规则只是用于激活或删除标志,或对用户ACL执行给定更改的单个单词。其他规则是字符前缀,它们与命令或类别名称、键模式等连接在一起。

  • 创建新用户默认权限
acl setuser 用户名

  • 设置有用户名、密码、ACL权限、并启用的用户
acl setuser user2 on >password ~cached:* +get

  • 切换用户,验证权限
auth user2 password

2.IO多线程

Redis 6终于支持多线程了,告别单线程了吗?

IO 多线程其实指客户端交互部分网络IO交互处理模块多线程,而非执行命令多线程。Redis6执行命令依然是单线程。

原理架构

Redis 6 加入多线程,但跟 Memcached 这种从 IO处理到数据访问多线程的实现模式有些差异。Redis 的多线程部分只是用来处理网络数据的读写和协议解析,执行命令仍然是单线程。之所以这么设计是不想因为多线程而变得复杂,需要去控制 key、lua、事务,LPUSH/LPOP 等等的并发问题。整体的设计大体如下:

另外,多线程IO默认也是不开启的,需要在配置文件中配置

io-threads-do-reads yes 

io-threads 4

3.Cluster

之前老版 Redis 想要搭集群需要单独安装 ruby 环境,Redis 5 将 redis-trib.rb 的功能集成到 redis-cli 。另外官方 redis-benchmark 工具开始支持 cluster 模式了,通过多线程的方式对多个分片进行压测。

4.其它功能

1、RESP3新的 Redis 通信协议:优化服务端与客户端之间通信

2、Client side caching客户端缓存:基于 RESP3 协议实现的客户端缓存功能。为了进一步提升缓存的性能,将客户端经常访问的数据cache到客户端。减少TCP网络交互。

3、Proxy集群代理模式:Proxy 功能,让 Cluster 拥有像单实例一样的接入方式,降低大家使用cluster的门槛。不过需要注意的是代理不改变 Cluster 的功能限制,不支持的命令还是不会支持,比如跨 slot 的多Key操作。

4、Modules API:Redis 6中模块API开发进展非常大,因为Redis Labs为了开发复杂的功能,从一开始就用上Redis模块。Redis可以变成一个框架,利用Modules来构建不同系统,而不需要从头开始写然后还要BSD许可。Redis一开始就是一个向编写各种系统开放的平台。

参考

Sponsor❤️

您的支持是我不断前进的动力,如果您感觉本文对您有所帮助的话,可以考虑打赏一下本文,用以维持本博客的运营费用,拒绝白嫖,从你我做起!🥰🥰🥰

支付宝 微信

文章作者: 简简
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 简简 !
评论
填上邮箱会收到评论回复提醒哦!!!
 上一篇
Redis-实战篇 Redis-实战篇
⓪系统简介 短信登录:使用Redis共享session来实现 商户查询缓存:理解缓存击穿,缓存穿透,缓存雪崩等问题 优惠卷秒杀 学会Redis的计数器功能, 结合Lua完成高性能的redis操作 学会Redis分布式锁的原理,包括R
2022-08-08
下一篇 
SSM整合 SSM整合
今天开始学习我自己总结的 Java-学习路线 中的《SSM整合》,小简从 0 开始学 Java 知识,并不定期更新所学笔记,期待一年后的蜕变吧!<有同样想法的小伙伴,可以联系我一起交流学习哦!> 🚩时间安排:预计7天更新完
2022-05-26
  目录