在业务场景下,我们要把业务的操作记录最好汇总成报表。比如某个单据的修改人,修改时间,提交人,提交时间等。
最简单的做法就是直接一张表,每次有事件直接更新进去,例如这样
字段 | 类型 |
---|---|
id | int |
business_id | int |
submit_at | timestamp |
submit_by | int |
modify_at | timestamp |
modify_by | int |
但是,直接更新有一个问题,就是历史记录会丢失,并且如果并发修改多个字段,会因为锁表导致修改失败。
那么我们调整下方案,做一个日志记录表。然后通过job把时间记录聚合到报表。
字段 | 类型 |
---|---|
id | bigint |
event_type | varchar |
event_at | timestamp |
event_by | int |
这样做最大的好处就是 数据都是新增的 不存在锁表的问题,不用担心并发。
再加一个执行记录表,记录当前的最大的id
字段 | 类型 |
---|---|
id | int |
event_id | bigint |
job_exec_at | timestamp |
上代码
BusinessStack
@Data
@SuperBuilder
public class BusinessStack {
private Integer id;
private Date submitAt;
private Integer submitBy;
private Date modifyAt;
private Integer modifyBy;
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
BusinessStack that = (BusinessStack) o;
return new EqualsBuilder()
.append(id, that.id)
.isEquals();
}
@Override public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(id)
.toHashCode();
}
}
EventLog
@Data
@SuperBuilder
public class EventLog {
private Long id;
private Integer businessId;
private String eventType;
private Date eventAt;
private Integer eventBy;
}
EventTrigger
@Data
@SuperBuilder
public class EventTrigger {
private Integer id;
private Long eventLogId;
private Date jobExecAt;
}
EventLogService
@AllArgsConstructor
@Service
public class EventLogService {
private final JdbcTemplate jdbcTemplate;
/**
* 分页查询ID之后的数据
*/
public List<EventLog> queryEventLogs( Long id,int limit){
String sql = "select * from event_log where id>? order id asc limit ? ";
RowMapper<EventLog> rowMapper=new BeanPropertyRowMapper<>(EventLog.class);
return jdbcTemplate.query(sql,new Object[]{id,limit},rowMapper);
}
}
EventTriggerService
@AllArgsConstructor
@Service
public class EventTriggerService {
private final JdbcTemplate jdbcTemplate;
public EventTrigger queryLatestOne(){
String sql = "select * from event_trigger limit 1";
RowMapper<EventTrigger> rowMapper=new BeanPropertyRowMapper<>(EventTrigger.class);
return jdbcTemplate.queryForObject(sql,rowMapper);
}
}
BussinessStackService
@AllArgsConstructor
@Service
public class BusinessStackService {
private final JdbcTemplate jdbcTemplate;
public List<BusinessStack> queryByIds(List<Integer> ids){
String sql ="select * from business_stack where id in (?)";
RowMapper<BusinessStack> rowMapper=new BeanPropertyRowMapper<>(BusinessStack.class);
return jdbcTemplate.query(sql,new Object[]{ids},rowMapper);
}
@Transactional
public void sync(Long id,List<BusinessStack> inserts,List<BusinessStack> updates){
String sql = "update event_trigger set event_log_id=?,job_exec_at=? where event_log_id<?";
Date date = (Date) Date.from(Instant.now());
int res = jdbcTemplate.update(sql,new Object[]{id,date,id});
if(res==0){
return;
}
if(!CollectionUtils.isEmpty(inserts)){
insertBatch(inserts);
}
if(!CollectionUtils.isEmpty(updates)){
updateBatch(updates);
}
}
public void insertBatch(List<BusinessStack> businessStacks){
String sql = "insert into business_stack(submit_at,submit_by,modify_at,modify_by) values (?,?,?,?)";
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override public void setValues(PreparedStatement ps, int i) throws SQLException {
BusinessStack businessStack = businessStacks.get(i);
ps.setDate(1, (Date) businessStack.getSubmitAt());
ps.setInt(2,businessStack.getSubmitBy());
ps.setDate(3, (Date) businessStack.getModifyAt());
ps.setInt(4,businessStack.getModifyBy());
}
@Override public int getBatchSize() {
return businessStacks.size();
}
});
}
public void updateBatch(List<BusinessStack> businessStacks){
String sql = "update business_stack set submit_at=?,submit_by=?,modify_at=?,modify_by=? where id=?";
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override public void setValues( PreparedStatement ps, int i) throws SQLException {
BusinessStack businessStack = businessStacks.get(i);
ps.setDate(1, (Date) businessStack.getSubmitAt());
ps.setInt(2,businessStack.getSubmitBy());
ps.setDate(3, (Date) businessStack.getModifyAt());
ps.setInt(4,businessStack.getModifyBy());
ps.setInt(5,businessStack.getId());
}
@Override public int getBatchSize() {
return businessStacks.size();
}
});
}
}
JobSchedule
@AllArgsConstructor
public class JobSchedule {
private EventLogService eventLogService;
private EventTriggerService eventTriggerService;
private BusinessStackService businessStackService;
public void job() {
int limit = 100;
int errorCount = 0;
boolean hasNext = true;
while (hasNext) {
try{
EventTrigger eventTrigger = eventTriggerService.queryLatestOne();
if (Objects.isNull(eventTrigger)) {
break;
}
List<EventLog> eventLogs = eventLogService.queryEventLogs(eventTrigger.getEventLogId(), limit);
if(CollectionUtils.isEmpty(eventLogs)){
hasNext = false;
break;
}
if(eventLogs.size()<limit){
hasNext = false;
}
Long maxId = eventLogs.get(eventLogs.size()-1).getId();
Map<Integer, BusinessStack> map = assemble(eventLogs);
List<BusinessStack> businessStacks = businessStackService.queryByIds(Lists.newArrayList(map.keySet()));
List<BusinessStack> inserts = Lists.newArrayList();
inserts.addAll(map.values());
List<BusinessStack> updates = Lists.newArrayList();
updates.addAll(map.values());
inserts.retainAll(businessStacks) ;//
updates.removeAll(inserts);
businessStackService.sync(maxId,inserts,updates);
} catch (Exception e){
errorCount++;
if(errorCount>10){
hasNext = false;
}
}
}
}
private Map<Integer, BusinessStack> assemble(List<EventLog> eventLogs) {
if (CollectionUtils.isEmpty(eventLogs)) {
return new HashMap<>();
}
Map<Integer, BusinessStack> map = Maps.newHashMap();
eventLogs.forEach(eventLog -> {
BusinessStack businessStack = map.get(eventLog.getBusinessId());
if (Objects.nonNull(businessStack)) {
fillValues(businessStack, eventLog);
} else {
businessStack = BusinessStack.builder().id(eventLog.getBusinessId()).build();
fillValues(businessStack, eventLog);
map.put(eventLog.getBusinessId(), businessStack);
}
});
return map;
}
public void fillValues(BusinessStack businessStack, EventLog eventLog) {
switch (eventLog.getEventType()) {
case "SUBMIT":
businessStack.setSubmitAt(eventLog.getEventAt());
businessStack.setSubmitBy(eventLog.getEventBy());
break;
case "MODIFY":
businessStack.setModifyAt(eventLog.getEventAt());
businessStack.setModifyBy(eventLog.getEventBy());
break;
default:
break;
}
}
}
然后在Job上去加个schedule就定时去执行就可以了。
有人会萌萌的问,为什么不在数据库直接做好聚合,而要去内存中处理?那么问题来了,聚合就要group by ,而group by 就要先排序,排序就要全表扫描,而这种日志表一般又是很大的,会占用数据库很多资源。另外,这种处理方式的理念有点类似flink的time window,是滚动处理的。每次读取的数据不大,对应用的影响的很小的,而且,现在的应用部署,不管是基于k8s 还是用守护进程,即时极端情况出现了问题,也是经常重启而已,用户是无感的,但是你把数据库拖慢了,用户是有感知的。所以,为什么要了解运维。
还有人萌萌的说,发生并发了怎么办?首先,如果是分布式的应用,可以考虑加个分布式锁,另外,event_trigger表的event_log_id这个字段,在正常情况下是会呈增长趋势的。在更新的时候,可以把这个字段当成乐观锁来用。不是一定要叫version才是乐观锁,水煮蛋会吃,茶叶蛋就不会吃了?
当然,这个方案只是一个简单实现,如果不考虑历史记录,用个消息队列,都解决了。