概述
应公司安全管理部门政策要求,需要实现文件上传/下载操作的日志记录,经过分析需要在目前平台上基于springboot搭建一套服务供其他应用具体业务调用,其中该服务涉及到的技术支撑:AOP实现异常处理、queue+spring-scheduler异步执行定时任务、Fegin组件进行服务间通信(通过拦截器设置请求头中token认证信息)
功能实现
AOP进行异常处理
(另外基于AOP实现了RBAC-基于角色的的访问控制)
package test.bs.web.aspect;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import test.common.exception.*;
import test.common.util.Page;
import test.common.util.R;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.ModelAndView;
import javax.validation.ConstraintViolationException;
import java.lang.reflect.Method;
/**
*
* @ClassName: ExceptionHandlerAspect
* @Description: 统一异常处理
* @author test
*
*/
@Aspect
@Component
public class ExceptionHandlerAspect {
private Logger logger = LoggerFactory.getLogger(ExceptionHandlerAspect.class);
private ObjectMapper om = new ObjectMapper();
/**
* @Title: inController
* @Description: 以test开头, controller结尾的包名下任何类的所有方法
*/
@Pointcut("execution(* test..*.controller..*.*(..))")
public void inController() {}
/**
* @Title: inViewController
* @Description: 以test开头, view结尾的包及子包名下任何类的所有方法法(Controller与view 不能相互包含关键字)
*/
@Pointcut("execution(* test..*.view_controller..*.*(..))")
public void inViewController() {}
/**
*
* @Title: handleAPIControllerMethod
* @Description: api 异常处理器
* @param pjp
* @return
* @throws Throwable
*/
@Around("inController()")
public Object handleAPIControllerMethod(ProceedingJoinPoint pjp) throws Throwable {
Object r;
try {
r = pjp.proceed(pjp.getArgs());
} catch(Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
r = handleAPIControllerException(pjp, e);
}
return r;
}
@Around("inViewController()")
public ModelAndView handleViewControllerMethod(ProceedingJoinPoint pjp) throws Throwable{
ModelAndView mv;
try {
mv = (ModelAndView)pjp.proceed(pjp.getArgs());
} catch(Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
mv = handleViewControllerException(pjp, e);
}
return mv;
}
/**
*
* @Title: handleAPIControllerException
* @Description: 具体API异常处理
* @param pjp
* @param e
* @return
* @throws IllegalAccessException
* @throws InstantiationException
*/
@SuppressWarnings("unchecked")
private Object handleAPIControllerException(ProceedingJoinPoint pjp, Throwable e) throws InstantiationException, IllegalAccessException {
Object r = null;
MethodSignature signature = (MethodSignature) pjp.getSignature();
// 获取method对象
Method method = signature.getMethod();
// 获取方法的返回值的类型
Class returnType= method.getReturnType();
// 根据自定义常见的异常抛出异常信息
R<String> eR = getExceptionMessage(e);
if( returnType.equals(R.class) ) {
r = new R(null, eR.getMsg(), eR.getCode());
} else if( returnType.equals(Page.class)) {
r = new Page(null, eR.getMsg(), eR.getCode());
} else {
r = returnType.newInstance(); // 约定必须有默认的构造函数
}
return r;
}
/**
*
* @Title: handleViewControllerException
* @Description: 捕获ModelAndView 异常
* @param pjp
* @param e
* @return
*/
private ModelAndView handleViewControllerException(ProceedingJoinPoint pjp, Throwable e) {
ModelAndView mv = new ModelAndView();
mv.setViewName("error");
// 根据自定义常见的异常抛出异常信息
R<String> eR = getExceptionMessage(e);
mv.addObject("status", eR.getCode())
.addObject("exception", e.getClass())
.addObject("error", eR.getData())
.addObject("message", eR.getMsg());
return mv;
}
/**
*
* @Title: getExceptionMessage
* @Description: 获取异常信息
* @param e
* @return
*/
private R<String> getExceptionMessage(Throwable e) {
R<String> rst = new R<>();
if( e instanceof NullPointerException ){
rst.setCode(R.FAIL);
rst.setData(null);
rst.setMsg(e.getMessage());
} else if (e instanceof RequestParameter400Exception) {
rst.setCode(R.INPUT_ERROR_400);
rst.setData(null);
rst.setMsg("The system has rejected your operation for the following reason:" + e.getMessage());
} else if( e instanceof AuthorizationFailedException) {
rst.setCode(R.NO_LOGIN);
rst.setData(null);
rst.setMsg("The system has rejected your operation for the following reason:" + e.getMessage());
} else if(e instanceof NoPermissioinException) {
rst.setCode(R.NO_PERMISSION);
rst.setData(null);
rst.setMsg("The system has rejected your operation for the following reason:" + e.getMessage());
} else if( e instanceof NotFoundException) {
rst.setCode(R.NOT_FOUND);
rst.setData(null);
rst.setMsg("The system has rejected your operation for the following reason:" + e.getMessage());
}
...
return rst;
}
}
queue+任务调度Scheduler
- 保存文件操作历史记录到队列
package test.bs.internal.ws.queue;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import test.bs.pojo.po.HistoryRecord;
/**
*
* @ClassName: SaveHistoryRecordsQueue
* @Description: 暂保存插入数据库DB的历史记录的队列
*
*/
public class SaveHistoryRecordsQueue {
/**
* 队列大小
*/
private static int QUEUE_MAX_SIZE = 80000;
/**
* 共享队列,使用线程安全的Queue,设置队列最大大小=80000:若不设置则超过内存大小则会导致应用因内存不足而崩溃
*/
private volatile Queue<FileOpHisRecords> pendingTodoQueue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
private static final Object lockedObj = new Object();
/**
* 默认单例,使用volatile以标识获取当前变量时需要从内存获取:volatile的双重检测同步延迟载入模式
*/
private static volatile SaveHistoryRecordsQueue INSTANCE;
private SaveHistoryRecordsQueue() {
if(pendingTodoQueue == null) {
pendingTodoQueue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
}
}
// volatile的双重检测同步延迟载入模式
public static SaveHistoryRecordsQueue getInstance() {
if(INSTANCE == null) {
synchronized (lockedObj) {
if(INSTANCE == null) {
INSTANCE = new SaveHistoryRecordsQueue();
}
}
}
return INSTANCE;
}
/**
*
* @Title: getPendingTodoQueue
* @Description: 获取待处理的队列
* @return
*/
public Queue<FileOpHisRecords> getPendingTodoQueue() {
return pendingTodoQueue;
}
}
- 通过使用spring-schedule定时任务调度方式来异步执行队列中的任务以减少DB端的并发负载
package test.bs.internal.ws.schedule;
import java.util.Queue;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import test.bs.internal.ws.queue.SaveHistoryRecordsQueue;
import test.bs.pojo.po.FileOpHisRecords;
import test.bs.service.FileOpHisRecordsService;
/**
* @ClassName: SaveHistoryRecordsScheduler
* @Description: 实现异步保存文件操作历史记录排程以减少 DB 的并发负载
*/
@Component
@RefreshScope
public class SaveHistoryRecordsScheduler {
private Logger logger = LoggerFactory.getLogger(SaveHistoryRecordsScheduler.class);
// 每执行一次排程最多保存操作历史记录的数量
@Value("${test.bs.per.max.counter:30}")
private final int maxProcessCounter = 30;
@Resource
private FileOpHisRecordsService fileOpHisRecordsService; //实现历史记录插入DB的服务
@Autowired
private ObjectMapper om;
// 设置固定任务间隔频率(每次执行均以上次执行结束时间为参考间隔2s时间再执行)
@Scheduled(fixedDelay = 2000) //设置2s秒钟執行一次
protected void fixedRateSchedule() {
Queue<FileOpHisRecords> queue = PendingToSaveFileOperationHistoryQueue.getInstance().getPendingTodoQueue();
if (queue != null && !queue.isEmpty()) {
int counter = 0;
FileOpHisRecords todoItem = null;
boolean insertSelective = false;
while (!queue.isEmpty() && counter < maxProcessCounter) {
counter++;
todoItem = queue.peek(); //取队列最先添加队列中的元素(队列遵循先进先出)
if (todoItem != null) {
try {
insertSelective = fileOpHisRecordsService.insertSelective(todoItem);
if (!insertSelective) {
// 打印日志
logger.error("save operation historyRecord to database fail, todoItem: {}", om.writeValueAsString(todoItem));
}
} catch(Exception e) {
logger.error("save operation historyRecord fail", e);
}
}
queue.poll(); //移除当前元素
}
}
}
}
Fegin组件进行服务调用
- 涉及保存操作历史记录业务的客户端应用需要添加fegin组件以实现远程调用
- 添加Fegin接口
package test.bs.service.feign;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import test.bs.dto.request.FileOpHisDTO;
import test.bs.service.feign.impl.FileOpHisRecordFeignFallbackImpl;
import test.bs.service.feign.interceptor.TokenInterceptor;
import test.common.util.R;
/**
* @ClassName: FileOpHisRecordFeign
* @Description: Fegin接口
* configuration = TokenInterceptor.class:TokenInterceptor作为请求拦截器,用于服*
* 务间请求时进行安全认证(token)
*
* 注意:SaveHistoryRecordsScheduler保存操作记录方法位于test-internal-ws服务中,并且请求的url*
* 是/test/fileHistoryRecord/addHistoryRecord
*/
@FeignClient(value = "test-internal-ws", fallback = FileOpHisRecordFeignFallbackImpl.class, configuration = TokenInterceptor.class)
public interface FileOpHisRecordFeign {
@PostMapping("/test/fileHistoryRecord/addHistoryRecord")
R<Boolean> addFileHistoryRecordsOperate(@RequestBody FileOpHisDTO dto);
}
- 对于平台系统内部web应用(Web应用会自动带token访问),则可以通过Feign关联具体的微服务直接访问,此时Feign发送请求时会先通过拦截器类TokenInterceptor携带Token进行安全认证; 其他情景下发送请求时,则需要在Http请求过程中设置授权头(指定Token)
-
package test.bs.service.feign.interceptor; import java.util.Enumeration; import java.util.LinkedHashMap; import java.util.Map; import javax.servlet.http.HttpServletRequest; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import feign.RequestInterceptor; import feign.RequestTemplate; /** * * @ClassName: TokenInterceptor * @Description: Feign默认请求拦截器 * 请求 Header * Key: Authorization * Value: Bearer + 有效的JWT-Token。 */ @Component public class TokenInterceptor implements RequestInterceptor { private final static Logger logger = LoggerFactory.getLogger(TokenInterceptor.class); @Autowired ITokenService tokenService; @Override public void apply(RequestTemplate template) { forwordIpInfo(template); //fegin请求时在请求头添加客户端IP信息 // 判断请求头中是否含有token if (template.headers().containsKey("Authorization") || template.headers().containsKey("authorization") || template.headers().containsKey("AUTHORIZATION")) { logger.info("token existed"); return; } try { String token = null; HttpServletRequest httpServletRequest = getHttpServletRequest(); if( httpServletRequest != null ) { //从请求上下文获取token token = getHeaders(getHttpServletRequest()).get("authorization"); //使用小写 } if(StringUtils.isBlank(token) ) { // SSO(Single Sign On)单点登录逻辑 token = tokenService.getOauthTokenStr(); if( token != null ) { String authHeaderStr = "Bearer " + token; template.header("Bearer ", authHeaderStr); //请求头加入token } else { logger.error("get token fail , the token is null"); } logger.info("get token from sso, token: {}", token); } else { //上下文获取到的token直接加入到请求头 template.header("Authorization", token); logger.info("get token from current user"); } } catch(Exception e) { e.printStackTrace(); logger.error("Get token Exception", e); } } /** * * @Title: forwordIpInfo * @Description: 请求头添加客户端IP信息 * @param template */ private void forwordIpInfo(RequestTemplate template) { HttpServletRequest httpServletRequest = getHttpServletRequest(); if( httpServletRequest != null ) { Map<String, String> headers = getHeaders(getHttpServletRequest()); template.header("x-forwarded-for", headers.get("x-forwarded-for")); template.header("x-real-ip", headers.get("x-real-ip")); } } private HttpServletRequest getHttpServletRequest() { try { return ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); } catch (Exception e) { return null; } } private Map<String, String> getHeaders(HttpServletRequest request) { Map<String, String> map = new LinkedHashMap<>(); if( request == null ) { return map; } Enumeration<String> enumeration = request.getHeaderNames(); while (enumeration.hasMoreElements()) { String key = enumeration.nextElement(); String value = request.getHeader(key); map.put(key, value); } return map; } }
-
- 实现Fegin接口备用类
-
package test.bs.service.feign.impl; import test.bs.dto.request.FileOpHisDTO; import test.bs.service.feign.FileOpHisRecordFeign; import test.common.util.R; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; /** * @ClassName: FileOpHisRecordFeignFallbackImpl * @Description: 备用类--当Feign客户端调用远程服务失败时,会使用这个备用类 */ @Component public class FileOpHisRecordFeignFallbackImpl implements FileOpHisRecordFeign { private final Logger logger = LoggerFactory.getLogger(FileOpHisRecordFeignFallbackImpl.class); @Override public R<Boolean> addFileHistoryRecordsOperate(FileOpHisDTO dto) { logger.error("Call【test-internal-ws -> addFileHistoryRecordsOperate】exception"); return new R<Boolean>(false, "Call【test-internal-ws -> addFileHistoryRecordsOperate】exception", R.FAIL); } }
-
- Feign调用
在需要调用该方法的业务直接调fileOpHisFeign.addFileHistoryRecordsOperate(historyRecordDto)即可实现将文件操作记录存储到数据库,后续可以通过查询将记录以web方式进行展示~
扩展
-
Fegin如何实现负载均衡?
- Fegin的负载均衡是通过集成Ribbon来实现的,Ribbon是Netflix开源的一个客户端负载均衡器,可以与Fegin无缝集成,为Fegin提供负载均衡能力。
- Ribbon发起请求流程
- 首先Ribbon在发起请求前,会从"服务中心"获取服务列表,然后按照一定的负载均衡策略发起请求,从而实现客户端的负载均衡.Ribbon本身也会缓存一份"服务提供者"清单并维护他的有效性,若发现"服务提供者"不可用,则会重新从"服务中心"获取有效的"服务提供者"清单来及时更新
- Ribbon发起请求流程
- Fegin的负载均衡是通过集成Ribbon来实现的,Ribbon是Netflix开源的一个客户端负载均衡器,可以与Fegin无缝集成,为Fegin提供负载均衡能力。
- Fegin如何实现认证传递?
- 因为微服务之间通信是不会携带请求相关信息的,所以当我们需要在服务间传递安全认证信息时,常见做法是使用拦截器传递认证信息,我司是通过实现RequestInterceptor接口来定义TokenInterceptor拦截器,在拦截器里将认证信息添加到请求头中,然后将其注册到Fegin的配置中来实现的(详细可见上面添加Fegin接口处代码描述!!!)
- Fegin如何设置超时和重试?
feign:
hystrix:
enabled: true
httpclinet: #use okhttp will better
enabled: false
okhttp:
enabled: true
ribbon:
ReadTimeout: 15000 # 15s 读超时
ConnectTimeout: 15000 # 15s 连接超时
MaxAutoRetries: 0 #重试次数,我司不允许重试,因为有可能涉及到幂等性问题
MaxAutoRetriesNextServer: 1
MaxTotalConnections: 200 # okhttp及http-client最大连接数量默认为200
MaxConnectionsPerHost: 50 # http-client下默认每台主机默认连接数量为50, okhttp无此配置