web项目功能案例总结
Common模块
Utils
org.springframework.util.StringUtils;
登录
基于口令
outh2.0
QQ、微信、github等登录微软
分布式单点登录
mall: 客户端保留用户token,通过请求发送回来,当前用户信息保存在Redis中
redis中
-
设置
REDIS_DATABASE + ":" + REDIS_KEY_AUTH_CODE + ":" + telephone
为authKey的key(验证码) -
设置
REDIS_DATABASE + ":" + REDIS_KEY_MEMBER + ":" + member.getId()
为member对象的key
需要获取当前用户的时候,从前端传来的token转换成userDto对象,用userDto.getId()得到memberId,然后拼接成Redis中的key获取到member
管理/授权
绪论
主流的权限模型主要分为以下五种:
- ACL模型:访问控制列表
- DAC模型:自主访问控制
- MAC模型:强制访问控制
- ABAC模型:基于属性的访问控制
- RBAC模型:基于角色的权限访问控制
主体:用户
客体:对象
Access Control List,ACL是最早的、最基本的一种访问控制机制,是基于客体进行控制的模型,在其他模型中也有ACL的身影。为了解决相同权限的用户挨个配置的问题,后来也采用了用户组的方式。
原理:每一个客体都有一个列表,列表中记录的是哪些主体可以对这个客体做哪些行为,非常简单。
缺点:当主体的数量较多时,配置和维护工作就会成本大、易出错。
Discretionary Access Control,DAC是ACL的一种拓展。
原理:在ACL模型的基础上,允许主体可以将自己拥有的权限自主地授予其他主体,所以权限可以任意传递。
例如:常见于文件系统,LINUX,UNIX、WindowsNT版本的操作系统都提供DAC的支持。
缺点:对权限控制比较分散,例如无法简单地将一组文件设置统一的权限开放给指定的一群用户。主体的权限太大,无意间就可能泄露信息。
Mandatory Access Control,MAC模型中主要的是双向验证机制。常见于机密机构或者其他等级观念强烈的行业,如军用和市政安全领域的软件。
原理:主体有一个权限标识,客体也有一个权限标识,而主体能否对该客体进行操作取决于双方的权限标识的关系。
例如:将军分为上将>中将>少将,军事文件保密等级分为绝密>机密>秘密,规定不同军衔仅能访问不同保密等级的文件,如少将只能访问秘密文件;当某一账号访问某一文件时,系统会验证账号的军衔,也验证文件的保密等级,当军衔和保密等级相对应时才可以访问。
缺点:控制太严格,实现工作量大,缺乏灵活性。
Attribute-Based Access Control,能很好地解决RBAC的缺点,在新增资源时容易维护。
原理:通过动态计算一个或一组属性是否满足某种机制来授权,是一种很灵活的权限模型,可以按需实现不同颗粒度的权限控制。
属性通常有四类:
- 主体属性,如用户年龄、性别等;
- 客体属性,如一篇文章等;
- 环境属性,即空间限制、时间限制、频度限制;
- 操作属性,即行为类型,如读写、只读等。
Role-Based Access Control,核心在于用户只和角色关联,而角色代表对了权限,是一系列权限的集合。
RBAC三要素:
- 用户:系统中所有的账户
- 角色:一系列权限的集合(如:管理员,开发者,审计管理员等)
- 权限:菜单,按钮,数据的增删改查等详细权限。
在RBAC中,权限与角色相关联,用户通过成为适当角色的成员而得到这些角色的权限。
优点:便于角色划分,更灵活的授权管理;最小颗粒度授权;
RBAC模型可以分为:RBAC0、RBAC1、RBAC2、RBAC3 四个阶段,一般公司使用RBAC0的模型就可以。另外,RBAC0相当于底层逻辑,后三者都是在RBAC0模型上的拔高。
RBAC0模型
用户和角色、角色和权限多对多关系。
简单来说就是一个用户拥有多个角色,一个角色可以被多个用户拥有,这是用户和角色的多对多关系;同样的,角色和权限也是如此。
RBAC1模型
相对于RBAC0模型,增加了角色分级的逻辑,类似于树形结构,下一节点继承上一节点的所有权限,如role1根节点下有role1.1和role1.2两个子节点
RBAC2模型
基于RBAC0模型,对角色增加了更多约束条件。
如角色互斥,如不能兼任。
如角色数量限制,例如:一个角色专门为公司CEO创建的,最后发现公司有10个人拥有CEO角色,一个公司有10个CEO?
- 举点儿政治官员例子,权当学习官员
RBAC2 模型主要是为了增加角色赋予的限制条件,这也符合权限系统的目标:权责明确,系统使用安全、保密。
RBAC3模型
同样是基于RBAC0模型,但是综合了RBAC1和RBAC2的所有特点
蝶灭玩加减法呢你!
动态权限
之前静态权限:@PreAuthorize("hasAuthority('pms:product:create')")
接下来我们详细介绍下如何使用Spring Security实现基于路径的动态权限。
首先我们需要创建一个过滤器,用于实现动态权限控制,所有的鉴权操作都会在super.beforeInvocation(fi)
中进行。
/**
* 动态权限过滤器,用于实现基于路径的动态权限过滤
*/
public class DynamicSecurityFilter extends AbstractSecurityInterceptor implements Filter {
@Autowired
private DynamicSecurityMetadataSource dynamicSecurityMetadataSource;
@Autowired
private IgnoreUrlsConfig ignoreUrlsConfig;
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest) servletRequest;
FilterInvocation fi = new FilterInvocation(servletRequest, servletResponse, filterChain);
//OPTIONS请求直接放行
if(request.getMethod().equals(HttpMethod.OPTIONS.toString())){
fi.getChain().doFilter(fi.getRequest(), fi.getResponse());
return;
}
//白名单请求直接放行
PathMatcher pathMatcher = new AntPathMatcher();
for (String path : ignoreUrlsConfig.getUrls()) {
if(pathMatcher.match(path,request.getRequestURI())){
fi.getChain().doFilter(fi.getRequest(), fi.getResponse());
return;
}
}
//此处会调用AccessDecisionManager中的decide方法进行鉴权操作
InterceptorStatusToken token = super.beforeInvocation(fi);
try {
fi.getChain().doFilter(fi.getRequest(), fi.getResponse());
} finally {
super.afterInvocation(token, null);
}
}
}
接下来我们需要自己实现SecurityMetadataSource接口的getAttributes方法,用于获取当前访问路径所需资源。
/**
* 动态权限数据源,用于获取动态权限规则
* Created by macro on 2020/2/7.
*/
public class DynamicSecurityMetadataSource implements FilterInvocationSecurityMetadataSource {
private static Map<String, ConfigAttribute> configAttributeMap = null;
@Autowired
private DynamicSecurityService dynamicSecurityService;
@PostConstruct
public void loadDataSource() {
configAttributeMap = dynamicSecurityService.loadDataSource();
}
public void clearDataSource() {
configAttributeMap.clear();
configAttributeMap = null;
}
@Override
public Collection<ConfigAttribute> getAttributes(Object o) throws IllegalArgumentException {
if (configAttributeMap == null) this.loadDataSource();
List<ConfigAttribute> configAttributes = new ArrayList<>();
//获取当前访问的路径
String url = ((FilterInvocation) o).getRequestUrl();
String path = URLUtil.getPath(url);
PathMatcher pathMatcher = new AntPathMatcher();
Iterator<String> iterator = configAttributeMap.keySet().iterator();
//获取访问该路径所需资源
while (iterator.hasNext()) {
String pattern = iterator.next();
if (pathMatcher.match(pattern, path)) {
configAttributes.add(configAttributeMap.get(pattern));
}
}
// 未设置操作请求权限,返回空集合
return configAttributes;
}
}
验证
登录后的验证,常见技术如cookie、session、jwt
验证码
生成验证码时,将自定义的Redis键值加上手机号生成一个Redis的key,以验证码为value存入到Redis中,并设置过期时间(这里为120s)。校验验证码时根据手机号码来获取Redis里面存储的验证码,并与传入的验证码进行比对。
自定义redis key
redis:
key:
prefix:
authCode: "portal:authCode:"
expire:
authCode: 120 # 验证码超期时间
import com.macro.mall.tiny.common.api.CommonResult;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.StringUtils;
@Service
public class UmsMemberServiceImpl implements UmsMemberService {
@Autowired
private RedisService redisService;
@Value("${redis.key.prefix.authCode}")
private String REDIS_KEY_PREFIX_AUTH_CODE;
@Value("${redis.key.expire.authCode}")
private Long AUTH_CODE_EXPIRE_SECONDS;
@Override
public CommonResult generateAuthCode(String telephone) {
StringBuilder sb = new StringBuilder();
Random random = new Random();
for (int i = 0; i < 6; i++) {
sb.append(random.nextInt(10));
}
//验证码绑定手机号并存储到redis
redisService.set(REDIS_KEY_PREFIX_AUTH_CODE + telephone, sb.toString());
redisService.expire(REDIS_KEY_PREFIX_AUTH_CODE + telephone, AUTH_CODE_EXPIRE_SECONDS);
return CommonResult.success(sb.toString(), "获取验证码成功");
}
//对输入的验证码进行校验
@Override
public CommonResult verifyAuthCode(String telephone, String authCode) {
if (StringUtils.isEmpty(authCode)) {
return CommonResult.failed("请输入验证码");
}
String realAuthCode = redisService.get(REDIS_KEY_PREFIX_AUTH_CODE + telephone);
boolean result = authCode.equals(realAuthCode);
if (result) {
return CommonResult.success(null, "验证码校验成功");
} else {
return CommonResult.failed("验证码不正确");
}
}
}
分页查询
pageHelper
limit
会导致全表查询
https://www.db-fiddle.com/f/3JSpBxVgcqL3W2AzfRNCyq/1?ref=hackernoon.com
左边的 Schema SQL 将插入 10 万行数据,右边有一个性能很差的查询和一个较好的解决方案。
只需单击顶部的 Run,就可以比较它们的执行时间。第一个查询的运行时间至少是第二个查询的 30 倍。
数据越多,情况就越糟。看看我对 10 万行数据进行的 PoC:
https://github.com/IvoPereira/Efficient-Pagination-SQL-PoC?ref=hackernoon.com
大于号
where id > 10 limit 20
这是一种基于指针的分页。
你要在本地保存上一次接收到的主键 (通常是一个 ID) 和 LIMIT,而不是 OFFSET 和 LIMIT,那么每一次的查询可能都与此类似。
两种limit的执行过程
上面的两种查询方式。对应 limit offset, size
和 limit size
两种方式。
而其实 limit size
,相当于 limit 0, size
。也就是从0开始取size条数据。
也就是说,两种方式的区别在于offset是否为0。
我们先来看下limit sql的内部执行逻辑。
Mysql架构
mysql内部分为server层和存储引擎层。一般情况下存储引擎都用innodb。
server层有很多模块,其中需要关注的是执行器是用于跟存储引擎打交道的组件。
执行器可以通过调用存储引擎提供的接口,将一行行数据取出,当这些数据完全符合要求(比如满足其他where条件),则会放到结果集中,最后返回给调用mysql的客户端(go、java写的应用程序)。
叶子结点里放的信息会根据当前的索引是主键还是非主键有所不同。
- 如果是主键索引,它的叶子节点会存放完整的行数据信息。
- 如果是非主键索引,那它的叶子节点则会存放主键,如果想获得行数据信息,则需要再跑到主键索引去拿一次数据,这叫回表。
- 啥意思,主键和非主键有不同的索引树?
limit子句中的offset会导致先查找全部结果再抛弃前面offset的。
假如我们将sql语句修改成下面这样。
select * from 表名 where id >=(select id from 表名 order by id limit 6000000, 1) order by id limit 10;
上面这条sql语句,里面先执行子查询 select id from page order by id limit 6000000, 1
, 这个操作,其实也是将在innodb中的主键索引中获取到6000000+1
条数据,然后server层会抛弃前6000000条,只保留最后一条数据的id。
但不同的地方在于,在返回server层的过程中,只会拷贝数据行内的id这一列,而不会拷贝所有列,当数据量较大时,这部分的耗时还是比较明显的。
3s的查询优化到了1.5s,很挫,,
基于非主键索引的limit执行过程
上面提到的是主键索引的执行过程,我们再来看下基于非主键索引的limit执行过程。
比如下面的sql语句
select * from page order by user_name limit 0, 10;
server层会调用innodb的接口,在innodb里的非主键索引中获取到第0条数据对应的主键id后,回表到主键索引中找到对应的完整行数据,然后返回给server层,server层将其放到结果集中,返回给客户端。
而当offset>0时,且offset的值较小时,逻辑也类似,区别在于,offset>0时会丢弃前面的offset条数据。
也就是说非主键索引的limit过程,比主键索引的limit过程,多了个回表的消耗。
但当offset变得非常大时,比如600万,此时执行explain。
非主键索引offset值超大时走全表扫描
可以看到type那一栏显示的是ALL,也就是全表扫描。
这是因为server层的优化器,会在执行器执行sql语句前,判断下哪种执行计划的代价更小。
很明显,优化器在看到非主键索引的600w次回表之后,摇了摇头,还不如全表一条条记录去判断算了,于是选择了全表扫描。
因此,当limit offset过大时,非主键索引查询非常容易变成全表扫描。是真·性能杀手。
这种情况也能通过一些方式去优化。比如
select * from page t1, (select id from page order by user_name limit 6000000, 100) t2 WHERE t1.id = t2.id;
通过select id from page order by user_name limit 6000000, 100
。先走innodb层的user_name非主键索引取出id,因为只拿主键id,不需要回表,所以这块性能会稍微快点,在返回server层之后,同样抛弃前600w条数据,保留最后的100个id。然后再用这100个id去跟t1表做id匹配,此时走的是主键索引,将匹配到的100条行数据返回。这样就绕开了之前的600w条数据的回表。
当然,跟上面的case一样,还是没有解决要白拿600w条数据然后抛弃的问题,这也是非常挫的优化。
像这种,当offset变得超大时,比如到了百万千万的量级,问题就突然变得严肃了。
这里就产生了个专门的术语,叫深度分页。
深度分页问题
深度分页问题,是个很恶心的问题,恶心就恶心在,这个问题,它其实无解。
不管你是用mysql还是es,你都只能通过一些手段去"减缓"问题的严重性。
遇到这个问题,我们就该回过头来想想。
为什么我们的代码会产生深度分页问题?
它背后的原始需求是什么,我们可以根据这个做一些规避。
如果你是想取出全表的数据
有些需求是这样的,我们有一张数据库表,但我们希望将这个数据库表里的所有数据取出,异构到es,或者hive里,这时候如果直接执行
select * from page;
这个sql一执行,狗看了都摇头。
因为数据量较大,mysql根本没办法一次性获取到全部数据,妥妥超时报错。
于是不少mysql小白会通过limit offset size
分页的形式去分批获取,刚开始都是好的,等慢慢地,哪天数据表变得奇大无比,就有可能出现前面提到的深度分页问题。
这种场景是最好解决的。
我们可以将所有的数据根据id主键进行排序,然后分批次取,将当前批次的最大id作为下次筛选的条件进行查询。
可以看下伪代码
batch获取数据
这个操作,可以通过主键索引,每次定位到id在哪,然后往后遍历100个数据,这样不管是多少万的数据,查询性能都很稳定。
batch分批获取user表
如果是给用户做分页展示
如果深度分页背后的原始需求只是产品经理希望做一个展示页的功能,比如商品展示页,那么我们就应该好好跟产品经理battle一下了。
什么样的翻页,需要翻到10多万以后,这明显是不合理的需求。
是不是可以改一下需求,让它更接近用户的使用行为?
比如,我们在使用谷歌搜索时看到的翻页功能。
一般来说,谷歌搜索基本上都在20页以内,作为一个用户,我就很少会翻到第10页之后。
作为参考。
如果我们要做搜索或筛选类的页面的话,就别用mysql了,用es,并且也需要控制展示的结果数,比如一万以内,这样不至于让分页过深。
如果因为各种原因,必须使用mysql。那同样,也需要控制下返回结果数量,比如数量1k以内。
这样就能勉强支持各种翻页,跳页(比如突然跳到第6页然后再跳到第106页)。
但如果能从产品的形式上就做成不支持跳页会更好,比如只支持上一页或下一页。
上下页的形式
这样我们就可以使用上面提到的start_id方式,采用分批获取,每批数据以start_id为起始位置。这个解法最大的好处是不管翻到多少页,查询速度永远稳定。
变成像抖音那样只能上划或下划,专业点,叫瀑布流。
总结
limit offset, size
比limit size
要慢,且offset的值越大,sql的执行速度越慢。- 当offset过大,会引发深度分页问题,目前不管是mysql还是es都没有很好的方法去解决这个问题。只能通过限制查询数量或分批获取的方式进行规避。
- 遇到深度分页的问题,多思考其原始需求,大部分时候是不应该出现深度分页的场景的,必要时多去影响产品经理。
- 如果数据量很少,比如1k的量级,且长期不太可能有巨大的增长,还是用
limit offset, size
的方案吧,整挺好,能用就行。
方法1: 直接使用数据库提供的SQL语句
- 语句样式: MySQL中,可用如下方法: SELECT * FROM 表名称 LIMIT M,N
- 适应场景:适用于数据量较少的情况(元组百/千级)
- 原因/缺点:全表扫描,速度会很慢 且 有的数据库结果集返回不稳定(如某次返回1,2,3,另外的一次返回2,1,3)。Limit限制的是从结果集的M位置处取出N条输出,其余抛弃。
方法2: 建立主键或唯一索引, 利用索引(假设每页10条)
语句样式: MySQL中,可用如下方法: SELECT * FROM 表名称 WHERE id_pk > (pageNum*10) LIMIT M
- 适应场景: 适用于数据量多的情况(元组数上万)
- 原因: 索引扫描,速度会很快。
有朋友提出: 因为数据查询出来并不是按照pk_id排序的,所以会有漏掉数据的情况,只能选择方法3。
方法3:基于索引再排序
语句样式: MySQL中,可用如下方法:
SELECT * FROM 表名称
WHERE id_pk > (pageNum*10)
ORDER BY id_pk ASC LIMIT M
- 适应场景: 适用于数据量多的情况(元组数上万). 最好ORDER BY后的列对象是主键或唯一所以,使得ORDERBY操作能利用索引被消除但结果集是稳定的(稳定的含义,参见方法1)
- 原因: 索引扫描,速度会很快.
但MySQL的排序操作,只有ASC没有DESC(DESC是假的,未来会做真正的DESC,期待…)。
方法4:基于索引使用prepare
第一个问号表示pageNum,第二个?表示每页元组数。
- 语句样式: MySQL中,可用如下方法:
PREPARE stmt_name FROM SELECT * FROM 表名称 WHERE id_pk > (?* ?) ORDER BY id_pk ASC LIMIT M
- 适应场景: 大数据量
- 原因: 索引扫描,速度会很快。
prepare语句又比一般的查询语句快一点。
方法5:利用MySQL支持ORDER操作可以利用索引快速定位部分元组,避免全表扫描。
比如: 读第1000到1019行数据(pk是主键/唯一键)。
SELECT * FROM your_table
WHERE pk>=1000
ORDER BY pk ASC
LIMIT 0,20
方法6:利用"子查询/连接+索引"快速定位元组的位置,然后再读取元组。
利用子查询示例:
SELECT * FROM your_table
WHERE id <=
(SELECT id FROM your_table
ORDER BY id desc
LIMIT ($page-1)*$pagesize
ORDER BY id desc
LIMIT $pagesize
利用连接示例:
SELECT * FROM your_table AS t1
JOIN (
SELECT id FROM your_table
ORDER BY id desc LIMIT ($page-1)*$pagesize
) AS t2
WHERE t1.id <= t2.id
ORDER BY t1.id desc LIMIT $pagesize;
mysql大数据时使用limit分页,随着页码的增大,查询效率越低下。
测试实验
1、直接用limit start, count分页语句, 也是我程序中用的方法:
count当起始页较小时,查询没有性能问题,我们分别看下从10, 100, 1000, 10000开始分页的执行时间(每页取20条)。
select * from product limit start。
当起始页较小时,查询没有性能问题,我们分别看下从10, 100, 1000, 10000开始分页的执行时间(每页取20条)。
如下:
select * from product limit 10, 20;//0.016秒
select * from product limit 100, 20;//0.016秒
select * from product limit 1000, 20;//0.047秒
select * from product limit 10000, 20;//0.094秒
我们已经看出随着起始记录的增加,时间也随着增大, 这说明分页语句limit跟起始页码是有很大关系的,那么我们把起始记录改为40w看下(也就是记录的一半左右).
select * from product limit 400000, 20;//3.229秒
再看我们取最后一页记录的时间
select * from product limit 866613, 20;//37.44秒
像这种分页最大的页码页显然这种时间是无法忍受的。
从中我们也能总结出两件事情:
- limit语句的查询时间与起始记录的位置成正比
- mysql的limit语句是很方便,但是对记录很多的表并不适合直接使用。
2、对limit分页问题的性能优化方法
利用表的覆盖索引来加速分页查询
我们都知道,利用了索引查询的语句中如果只包含了那个索引列(覆盖索引),那么这种情况会查询很快。
因为利用索引查找有优化算法,且数据就在查询索引上面,不用再去找相关的数据地址了,这样节省了很多时间。另外Mysql中也有相关的索引缓存,在并发高的时候利用缓存就效果更好了。
在我们的例子中,我们知道id字段是主键,自然就包含了默认的主键索引。现在让我们看看利用覆盖索引的查询效果如何。
这次我们之间查询最后一页的数据(利用覆盖索引,只包含id列),如下:
select id from product limit 866613, 20;//0.2秒
相对于查询了所有列的37.44秒,提升了大概100多倍的速度。
那么如果我们也要查询所有列,有两种方法,一种是id>=的形式,另一种就是利用join,看下实际情况:
SELECT * FROM product
WHERE ID > =(
select id from product limit 866613, 1)
limit 20
查询时间为0.2秒!
另一种写法
SELECT * FROM product a JOIN
(select id from product limit 866613, 20) b
ON a.ID = b.id
查询时间为0.3秒!
3、复合索引优化方法
MySql 性能到底能有多高?MySql 这个数据库绝对是适合dba级的高手去玩的,一般做一点1万篇新闻的小型系统怎么写都可以,用xx框架可以实现快速开发。可是数据量到了10万,百万至千万,他的性能还能那么高吗?一点小小的失误,可能造成整个系统的改写,甚至使系统无法正常运行!好了,不那么多废话了。
用事实说话,看例子:
数据表 collect ( id, title ,info ,vtype) 就这4个字段,其中 title 用定长,info 用text, id 是主键,vtype是tinyint,vtype是索引。这是一个基本的新闻系统的简单模型。现在往里面填充数据,填充10万篇新闻。最后collect 为 10万条记录,数据库表占用约1.6G。
OK ,看下面这条sql语句:
select id,title from collect limit 1000,10;
很快;基本上0.01秒就OK,再看下面的
select id,title from collect limit 90000,10;
从9万条开始分页,结果?
8-9秒完成,my god 哪出问题了?其实要优化这条数据,网上找得到答案。看下面一条语句:
select id from collect order by id limit 90000,10;
很快,0.04秒就OK。为什么?因为用了id主键做索引当然快。网上的写法是:
select id,title from collect where id>=
(select id from collect
order by id limit 90000,1)
limit 10;
这就是用了id做索引的结果。可是问题复杂那么一点点,就完了。看下面的语句
select id from collect
where vtype=1
order by id limit 90000,10; //很慢,用了8-9秒!
到了这里我相信很多人会和我一样,有崩溃感觉!vtype 做了索引了啊?怎么会慢呢?vtype做了索引是不错,你直接
select id from collect
where vtype=1 limit 1000,10;
是很快的,基本上0.05秒,可是提高90倍,从9万开始,那就是0.05*90=4.5秒的速度了。和测试结果8-9秒到了一个数量级。
从这里开始有人提出了分表的思路,这个和dis #cuz 论坛是一样的思路。思路如下:
建一个索引表:t (id,title,vtype) 并设置成定长,然后做分页,分页出结果再到 collect 里面去找info 。是否可行呢?实验下就知道了。
10万条记录到 t(id,title,vtype) 里,数据表大小20M左右。用
select id from t
where vtype=1
order by id limit 90000,10;
很快了。基本上0.1-0.2秒可以跑完。为什么会这样呢?我猜想是因为collect 数据太多,所以分页要跑很长的路。limit 完全和数据表的大小有关的。其实这样做还是全表扫描,只是因为数据量小,只有10万才快。OK, 来个疯狂的实验,加到100万条,测试性能。加了10倍的数据,马上t表就到了200多M,而且是定长。还是刚才的查询语句,时间是0.1-0.2秒完成!分表性能没问题?
错!因为我们的limit还是9万,所以快。给个大的,90万开始
select id from t
where vtype=1
order by id limit 900000,10;
看看结果,时间是1-2秒!why ?
分表了时间还是这么长,非常之郁闷!有人说定长会提高limit的性能,开始我也以为,因为一条记录的长度是固定的,mysql 应该可以算出90万的位置才对啊?可是我们高估了mysql 的智能,他不是商务数据库,事实证明定长和非定长对limit影响不大?怪不得有人说discuz到了100万条记录就会很慢,我相信这是真的,这个和数据库设计有关!
难道MySQL 无法突破100万的限制吗???到了100万的分页就真的到了极限?
答案是:NO 为什么突破不了100万是因为不会设计mysql造成的。下面介绍非分表法,来个疯狂的测试!一张表搞定100万记录,并且10G 数据库,如何快速分页!
好了,我们的测试又回到 collect表,开始测试结论是:
30万数据,用分表法可行,超过30万他的速度会慢到你无法忍受!当然如果用分表+我这种方法,那是绝对完美的。但是用了我这种方法后,不用分表也可以完美解决!
答案就是:复合索引!有一次设计mysql索引的时候,无意中发现索引名字可以任取,可以选择几个字段进来,这有什么用呢?
开始的
select id from collect
order by id limit 90000,10;
这么快就是因为走了索引,可是如果加了where 就不走索引了。抱着试试看的想法加了 search(vtype,id) 这样的索引。
然后测试
select id from collect
where vtype=1 limit 90000,10;
非常快!0.04秒完成!
再测试:
select id ,title from collect
where vtype=1 limit 90000,10;
非常遗憾,8-9秒,没走search索引!
再测试:search(id,vtype),还是select id 这个语句,也非常遗憾,0.5秒。
综上:如果对于有where 条件,又想走索引用limit的,必须设计一个索引,将where 放第一位,limit用到的主键放第2位,而且只能select 主键!
完美解决了分页问题了。可以快速返回id就有希望优化limit,按这样的逻辑,百万级的limit应该在0.0x秒就可以分完。看来mysql语句的优化和索引是非常重要的!
日志模块
分为操作日志和运行日志
操作日志
业务日志,记录业务操作
自定义注解,可以存在数据库中
需要数据操作日志的可以看看这两个开源项目,可以解决从xx改为xx。
https://github.com/qqxx6661/logRecord
https://github.com/mouzt/mzt-biz-log
系统日志
日志框架
ELK
docker系统搭建
首先我们需要搭建ELK日志收集系统,这里使用在Docker环境下安装的方式。
- 安装并运行Elasticsearch容器,使用如下命令即可;
docker run -p 9200:9200 -p 9300:9300 --name elasticsearch \
-e "discovery.type=single-node" \
-e "cluster.name=elasticsearch" \
-e "ES_JAVA_OPTS=-Xms512m -Xmx1024m" \
-v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-v /mydata/elasticsearch/data:/usr/share/elasticsearch/data \
-d elasticsearch:7.17.3
- 启动时会发现
/usr/share/elasticsearch/data
目录没有访问权限,只需要修改/mydata/elasticsearch/data
目录的权限,再重新启动即可;
chmod 777 /mydata/elasticsearch/data/
- 安装并运行Logstash容器,使用如下命令即可,
logstash.conf
文件地址:https://github.com/macrozheng/mall/blob/master/document/elk/logstash.conf
docker run --name logstash -p 4560:4560 -p 4561:4561 -p 4562:4562 -p 4563:4563 \
--link elasticsearch:es \
-v /mydata/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
-d logstash:7.17.3
- 进入容器内部,安装
json_lines
插件;
docker exec -it logstash /bin/bash
logstash-plugin install logstash-codec-json_lines
- 安装并运行Kibana容器,使用如下命令即可;
docker run --name kibana -p 5601:5601 \
--link elasticsearch:es \
-e "elasticsearch.hosts=http://es:9200" \
-d kibana:7.17.3
- ELK日志收集系统启动完成后,就可以访问Kibana的界面了,访问地址:http://192.168.3.105:5601
日志规范
- error:错误日志,指比较严重的错误,对正常业务有影响,需要运维配置监控的;
- warn:警告日志,一般的错误,对业务影响不大,但是需要开发关注;
- info:信息日志,记录排查问题的关键信息,如调用时间、出参入参等等;
- debug:用于开发DEBUG的,关键逻辑里面的运行时数据;
- trace:最详细的信息,一般这些信息只记录到日志文件中。
日志要打印出方法的入参、出参、返回值(可以设置为debug级)
日志格式
理想的日志格式,应当包括这些最基本的信息:如当前时间戳(一般毫秒精确度)、日志级别,线程名字等等。在logback日志里可以这么配置:
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} %-5level [%thread][%logger{0}] %m%n</pattern>
</encoder>
</appender>
对于trace/debug这些比较低的日志级别,必须进行日志级别的开关判断。
if (log.isDebugEnabled()) {
log.debug("userId is: {}", user.getId());
}
不能直接使用日志系统(Log4j、Logback)中的 API,而是使用日志框架SLF4J中的API。
SLF4J 是门面模式的日志框架,有利于维护和各个类的日志处理方式统一,并且可以在保证不修改代码的情况下,很方便的实现底层日志框架的更换。
建议使用参数占位{},而不是用+拼接。
{}是替换?字符串拼接耗时
异步输出日志,以logback为例:
<appender name="FILE_ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="ASYNC"/>
</appender>
禁止在线上环境开启debug
e.getMessage()
不会记录详细的堆栈异常信息,只会记录错误基本描述信息,不利于排查问题。
e.printStackTrace()打印出的堆栈日志跟业务代码日志是交错混合在一起的,通常排查异常日志不太方便。
e.printStackTrace()语句产生的字符串记录的是堆栈信息,如果信息太长太多,字符串常量池所在的内存块没有空间了,即内存满了,那么,用户的请求就卡住啦
打印e
记录异常的不要抛异常
日志文件分离,方法多样,如access.log,或者error级别error.log、按业务分、等等
日志收集原理
日志收集系统的原理是这样的,首先应用集成了Logstash插件,通过TCP向Logstash传输日志。Logstash接收到日志后根据日志类型将日志存储到Elasticsearch的不同索引上去,Kibana从Elasticsearch中读取日志,然后我们就可以在Kibana中进行可视化日志分析了
前后端通信
文件上传
防止重复提交
常见方法:自定义注解@RepeatSubmit
ruoyi?配置拦截器,比较参数和时间,比较sessionID
也有切面类+redis的
//设置Redis的key
Boolean absent = redisTemplate.opsForValue().setIfAbsent(key,"userInfo", annotation.limit(), TimeUnit.SECONDS);
Assert.isTrue(absent, "请勿重复提交");
https://www.cnblogs.com/goloving/p/16026003.html
新版ruoyi-cloud:前端防止重复提交
在axios的拦截器里
异常处理
@RestControllerAdvice是一个组合注解,由@ControllerAdvice、@ResponseBody组成,而@ControllerAdvice继承了@Component,因此@RestControllerAdvice本质上是个Component,用于定义@ExceptionHandler,@InitBinder和@ModelAttribute方法,适用于所有使用@RequestMapping方法。
https://blog.csdn.net/user2025/article/details/105458842
如果在类上注解了@RestControllerAdvice方法,则可以在该类的方法上添加@ExceptionHandler、@InitBinder、@ModelAttribute注解
- @ExceptionHandler:用于指定异常处理方法。当与@RestControllerAdvice配合使用时,用于全局处理控制器里的异常。
- @InitBinder:用来设置WebDataBinder,用于自动绑定前台请求参数到Model中。
- @ModelAttribute:本来作用是绑定键值对到Model中,当与@ControllerAdvice配合使用时,可以让全局的@RequestMapping都能获得在此处设置的键值对
定义全局异常处理类时,是使用@RestControllerAdvice还是@ControllerAdvice,要看返回的数据是不是全是json
@ControllerAdvice还可以返回
ModelAndView
对象,如传统前后端不分离应用,错误页面(如404)在后端的情景
支付系统
支付中心系统对内为各个业务线提供统一的支付、退款等服务,对外对接三方支付或银行服务实现资金的流转。如下图:
支付流程
上图展示了用户支付的主要流程,分为三个步骤:
- 用户在业务订单确认页,唤起收银台页面。
- 用户在收银台页面选择支付方式,确认支付,显示第三方支付页面,输入密码,进行真实支付行为。
- 系统处理用户支付结果,并通知给用户及各个相关系统。
下面详细说下这三个步骤:
1. 唤起商户收银台
- 用户在订单确认页点击“去支付“按钮,调用收银台支付下单接口。
- 收银台将订单信息缓存并入库,然后将订单标识拼装到收银台URL上返回给订单系统。
- 订单系统接收到收银台地址跳转到收银台页面。
上图展示了两个业务线(景区业务线,酒店业务线)唤起的收银台页面,大概可以分为三个区域:
页面上部分显示的是支付剩余时间和应付金额;
中间部分是订单信息,根据收银台定义的数据格式,业务线动态传递过来的;
剩余部分展示的是支付渠道,支付渠道也是业务线根据自己的需求在支付后台管理系统配置的,想要哪些支付方式以及它们的顺序都可以自定义。
2. 用户确认支付
- 用户在收银台页面选择支付方式(支付宝支付,微信支付,银行卡支付等),点击立即支付按钮,
- 调用支付中心创单接口,支付中心调用三方支付创单接口,同步返回支付信息,支付中心对返回参数进行处理,返回给收银台,
- 收银台携带支付中心返回的参数,调用三方接口,唤起三方收银台,
- 用户输入密码,立即支付。
3. 支付结果处理
- 三方系统进行扣款处理,返回收银台结果(目前微信支付返回支付中,支付宝返回支付终态(支付成功或支付失败)),
以下几个步骤是异步执行的,不分先后。
- 收银台拿到三方返回的结果,确认用户已经支付,则分配定时任务轮询查询(注意超时时间)后台支付结果,拿到终态之后跳转到相应页面,
- 三方系统支付成功后会通知支付中心结果,支付中心做好自身逻辑处理,异步通知订单系统,然后返回三方系统通知结果,
- 如果长时间未收到三方支付结果的通知,为了防止掉单,支付中心会发起主动查询来获取支付最终结果,以保证支付结果的及时更新。
- 当然业务线订单系统为了防止支付系统出现异步通知问题,也可以定时轮询支付中心的支付状态,防止掉单。(图中未画)
支付中心系统一些问题及解决方案
1. 支付订单超时关闭问题
如果用户长时间没有支付,一般都会有一个超时时间(如上图商户收银台的支付剩余时间),到达这个超时时间支付单会自动关闭。实现此需求有很多方式,比如:
1. 轮询 DB
定时轮询DB,取出达到超时时间且在支付中的数据,然后执行关闭逻辑。
缺点:1. 存在延迟,取决于定时任务的频率。2. 影响数据库性能。
2. JDK 延时队列(DelayQueue)和时间轮算法
这两种的算法的实现方式自行搜索。
共同的缺点是 1. 数据易丢失,由于数据存储在内存中,服务重启后数据全部消失。2. 有内存限制,如果数据量过大,会出现OOM异常。
3. RocketMQ 延时队列
RocketMQ 支持消息延时发送,社区版不支持任意等级的延迟,目前默认支持18个延时等级:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
比如支付单30分钟过期,在支付单创建成功后发送延迟消息(延时等级为 16),消费者在30分钟后会拉取到该消息然后执行关闭逻辑。
RocketMQ 延时队列,无论在数据安全性和及时性都有明显的优势,但是目前社区版没有支持任意级别的延迟。
目前我们使用的是 RocketMQ 延时队列实现的订单关闭。
2. 保证支付结果实时性
三方支付系统支付成功后99.9%的情况下都会回调通知我们,但也难免有意外,比如三方延迟回调或者三方系统宕机,为了保证支付结果的实时性,三方支付也要求我们不能完全依赖于回调接口,所以我们需要定时的调用主动查询接口来查询三方的支付结果。这里我们也是使用的 RocketMQ 延时队列实现的:
- 调用三方支付创单成功后,发送<支付主动查询>延时MQ消息。
- 消费消息,判断支付状态是否到达终态,如果到达终态,则返回处理成功,否则调用三方支付查询接口,如果支付成功则处理成功业务,返回处理成功。
- 如果客户未支付则判断是否达到最大的重试次数,如果达到最大重试次数则停止<支付主动查询>的重试,否则解析重试规则,发送下一轮的延时消息。
有三个重要参数,这些参数可以放到配置中心或者配置库中,
// 初始延迟级别,对应RocketMQ延时等级,比如3对应的延时时间就是10s
private Integer queryInitLevel = 3;
// 重试次数
private Integer queryCount = 6;
// 重试级别,对应RocketMQ延时等级,5s,10s,30s,1m,10m,20m
private String queryDelayLevels = 2,3,4,5,14,15;
支付创单成功后发送延时消息:
public void payQueryTask(String orderNo) {
PayQueryMessage payQueryMessage = new PayQueryMessage();
payQueryMessage.setOrderNo(orderNo);
RetryMessage<PayQueryMessage> retryMessage = new RetryMessage<>();
retryMessage.setTotalCount(queryCount);
retryMessage.setDelayLevels(queryDelayLevels);
retryMessage.setTopic(TopicConst.PAY_QUERY_TOPIC);
retryMessage.setEventType(RetryEventTypeEnum.PAY_QUERY);
retryMessage.setEventDesc(RetryEventTypeEnum.PAY_QUERY.getDesc());
retryMessage.setData(payQueryMessage);
log.info("{} - 发送消息, retryMessage: {}", LOG_DESC, retryMessage);
rocketMqProducer.asyncSend(retryMessage.getTopic(), JsonUtil.toJson(retryMessage),
CodeEnum.codeOf(RocketMQDelayLevelEnum.class, queryInitLevel).orElse(RocketMQDelayLevelEnum.FiveSeconds), LOG_DESC);
}
判断的是否继续执行任务:
public void sendDelayRetry(RetryMessage<?> retryMessage) {
int currentCount;
retryMessage.setCurrentCount(currentCount = retryMessage.getCurrentCount() + 1);
// 重试达到最大次数
if (currentCount > retryMessage.getTotalCount()) {
log.warn("{} - 达到最大次数-{}, 停止重试! retryMessage: {}", retryMessage.getEventDesc(), retryMessage.getTotalCount(), JsonUtil.toJson(retryMessage));
return;
}
log.info("{} - 发送重试消息-{}/{}, retryMessage: {}", retryMessage.getEventDesc(), retryMessage.getCurrentCount(), retryMessage.getTotalCount(), JsonUtil.toJson(retryMessage));
int delayLevel = Integer.parseInt(retryMessage.getDelayLevels().split(",")[retryMessage.getCurrentCount() - 1]);
rocketMqProducer.asyncSend(retryMessage.getTopic(), retryMessage,
CodeEnum.codeOf(RocketMQDelayLevelEnum.class, delayLevel).orElse(RocketMQDelayLevelEnum.FiveSeconds), retryMessage.getEventDesc()+", 发送重试消息");
}
3. 支付结果通知上游容错
在回调通知上游系统支付结果时,可能会回调失败,比如网络异常或上游系统发生短时故障,如果发生这种情况我们单靠简单的重试是无法完全解决问题的。为了尽可能的通知成功,我们需要针对没有通知成功的数据,每隔一段时间通知一次,那这个需求和我们上一个问题差不多,所以可以复用我们的延时重试框架。
流程和保证支付结果实时
的差不多,不再赘述。
支付中心系统中设计模式的应用
模板方法
模板方法模式思想:定义一个操作中的算法的骨架,而将一些步骤延迟到子类中。
模板方法使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。
简单的理解就是定义一个模版方法,然后子类实现模版方法中的抽象方法实现个性化的需求。
就支付而言,无论何种支付产品,都是走的同一个支付流程,那我们就可以定义一个支付流程的模板,然后每种支付产品实现这个模板中特定步骤来实现自己的特定需求。
策略
策略模式主要思想:定义一系列的算法,把它们一个个封装起来,并且使它们可相互替换。
在支付系统中,支付结果主动查询需要查询不同的渠道,比如支付宝,微信,银联等,每个渠道查询的方式和参数不尽相同,可以将每种渠道查询封装成不同的策略类,然后根据查询条件来调用不同的策略类。
查询策略有两个策略接口,callChannel
功能是组装查询参数和查询三方,execute
是处理三方返回的结果统一为支付中心状态。(因callChannel
有其他地方共用所以分开了两个方法)。
Spring 下使用策略模式,在项目启动时,将所有的策略类加载到Map中,然后使用时直接在Map中获取。
@Component
public class PayQueryStrategyContext {
private final Map<String, PayQueryStrategy> payQueryStrategyMap = Maps.newConcurrentMap();
public PayQueryStrategyContext(Map<String, PayQueryStrategy> payQueryStrategyMap) {
this.payQueryStrategyMap.clear();
payQueryStrategyMap.forEach(this.payQueryStrategyMap::put);
}
public PayQueryStrategy getPayQuery(@NotNull String channelCode) {
return this.payQueryStrategyMap.get(OperationTypeConst.Pay_Query + channelCode);
}
}
防重复
为了防止掉单,这里可以这样处理:
1、支付订单增加一个中间状态“支付中”,当同一个订单去支付的时候,先检查有没有状态为“支付中”的支付流水,当然支付(prepay)的时候要加个锁。支付完成以后更新支付流水状态的时候再讲其改成“支付成功”状态。
2、支付中心这边要自己定义一个超时时间(比如:30秒),在此时间范围内如果没有收到支付成功回调,则应调用接口主动查询支付结果,比如10s、20s、30s查一次,如果在最大查询次数内没有查到结果,应做异常处理
3、支付中心收到支付结果以后,将结果同步给业务系统,可以发MQ,也可以直接调用,直接调用的话要加重试(比如:SpringBoot Retry)
4、无论是支付中心,还是业务应用,在接收支付结果通知时都要考虑接口幂等性,消息只处理一次,其余的忽略
5、业务应用也应做超时主动查询支付结果
对于上面说的超时主动查询可以在发起支付的时候将这些支付订单放到一张表中,用定时任务去扫
为了防止订单重复提交,可以这样处理:
1、创建订单的时候,用订单信息计算一个哈希值,判断redis中是否有key,有则不允许重复提交,没有则生成一个新key,放到redis中设置个过期时间,然后创建订单。其实就是在一段时间内不可重复相同的操作
- 为什么不直接用订单号之类的?hash不是还有可能冲突吗
附上微信支付最佳实践:
文件上传
springboot的tomcat默认10m大小
private final Map<String, PayQueryStrategy> payQueryStrategyMap = Maps.newConcurrentMap();
public PayQueryStrategyContext(Map<String, PayQueryStrategy> payQueryStrategyMap) {
this.payQueryStrategyMap.clear();
payQueryStrategyMap.forEach(this.payQueryStrategyMap::put);
}
public PayQueryStrategy getPayQuery(@NotNull String channelCode) {
return this.payQueryStrategyMap.get(OperationTypeConst.Pay_Query + channelCode);
}
}
防重复
为了防止掉单,这里可以这样处理:
1、支付订单增加一个中间状态“支付中”,当同一个订单去支付的时候,先检查有没有状态为“支付中”的支付流水,当然支付(prepay)的时候要加个锁。支付完成以后更新支付流水状态的时候再讲其改成“支付成功”状态。
2、支付中心这边要自己定义一个超时时间(比如:30秒),在此时间范围内如果没有收到支付成功回调,则应调用接口主动查询支付结果,比如10s、20s、30s查一次,如果在最大查询次数内没有查到结果,应做异常处理
3、支付中心收到支付结果以后,将结果同步给业务系统,可以发MQ,也可以直接调用,直接调用的话要加重试(比如:SpringBoot Retry)
4、无论是支付中心,还是业务应用,在接收支付结果通知时都要考虑接口幂等性,消息只处理一次,其余的忽略
5、业务应用也应做超时主动查询支付结果
对于上面说的超时主动查询可以在发起支付的时候将这些支付订单放到一张表中,用定时任务去扫
为了防止订单重复提交,可以这样处理:
1、创建订单的时候,用订单信息计算一个哈希值,判断redis中是否有key,有则不允许重复提交,没有则生成一个新key,放到redis中设置个过期时间,然后创建订单。其实就是在一段时间内不可重复相同的操作
- 为什么不直接用订单号之类的?hash不是还有可能冲突吗
附上微信支付最佳实践:
文件上传
springboot的tomcat默认10m大小