MySQL-主从复制与读写分离
MySQL
# 读写分离的由来
秒杀业务
秒杀、抢购系统的业务特点:
非常多的人来参与,并发量很大
时间点集中
看看其它的商品,都多写少
数据库集群
一个主库,负责写入数据,我们称之为:写库
其它都是从库,负责读取数据,我们称之为:读库
# 数据库主从库理论
主从同步如何工作
# 步骤一
# 主:
修改配置 重启mysql 连接mysql,查看主状态
# 从:
修改配置 重启mysql
# 步骤二
# 主:
授权
# 从:
设置主从关系 查看从状态 出现error 修改配置 查看状态
# 解决方案
数据库集群,对我们的要求
- 读库和写库的数据一致
- 写数据必须写到写库
- 读数据必须到读库读
# 解决方案-应用层
优点:
- 多数据源切换方便,由程序自动完成
- 不需要引入中间件
- 理论上支持任何数据库
缺点:
- 由程序员完成,运维参与不到
- 不能做到动态增加数据源
解决方案:
- 驱动实现
com.mysql.jdbc.ReplicationDriverSharding-jdbc
- SpringAOP + mybatisplugin + 注解
- Spring动态数据源 + mybatisplugin
# 解决方案-中间件层
优点:
- 源程序不需要做任何改动就可以实现读写分离
- 动态添加数据源不需要重启程序
缺点:
- 程序依赖于中间件,会导致切换数据库变得困难
- 由中间件做了中转代理,性能有所下降
解决方案:
- MySQL-Proxy https://downloads.mysql.com/archives/proxy/
- Amoeba for MySQL http://docs.hexnova.com/amoeba/
- MyCat http://mycat.io/
- Atlas https://github.com/Qihoo360/Atlas
- OneProxy
现在有比使用中间件更好的解决方案,这就是运行在公有云上的,基于中间件技术+公有云
数据库打造成的分布式数据库。目前典型的有阿里云的DRDS/PetaData,腾讯云的DCDBF
ORTDSQL,以及UCloud最近推出的UDDB
# 实现
Spring + Mybatis Plugs 实现读写分离
如果你的后台结构是spring+mybatis
可以通过 spring的AbstractRoutingDataSource和 mybatis Plugin 拦截器
非常友好的实现读写分离,原有代码不需要任何改变
datasource.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd">
<context:component-scan base-package="com.study" />
<bean id="abstractDataSource" abstract="true" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<!-- 配置初始化大小、最小、最大 -->
<property name="initialSize" value="1" />
<property name="minIdle" value="10" />
<property name="maxActive" value="10" />
<!-- 配置获取连接等待超时的时间 -->
<property name="maxWait" value="60000" />
</bean>
<!-- 写库 -->
<bean id="dataSourceWrite" parent="abstractDataSource">
<!-- 基本属性 url、user、password -->
<property name="url" value="jdbc:mysql://allen.aliyun.com:3306/mybatis?serverTimezone=UTC&characterEncoding=utf-8&autoReconnect=true&allowMultiQueries=true" />
<property name="username" value="root" />
<property name="password" value="root@123456" />
</bean>
<!-- 读库 -->
<bean id="dataSourceRead" parent="abstractDataSource">
<!-- 基本属性 url、user、password -->
<property name="url" value="jdbc:mysql://allen.aliyun.com:3307/mybatis?serverTimezone=UTC&characterEncoding=utf-8&autoReconnect=true&allowMultiQueries=true" />
<property name="username" value="root" />
<property name="password" value="root@123456" />
</bean>
<!-- 动态数据源 -->
<bean id="dataSource" class="com.study.rwdb.DynamicDataSource">
<property name="writeDataSource" ref="dataSourceWrite"></property>
<property name="readDataSource" ref="dataSourceRead"></property>
</bean>
<tx:annotation-driven transaction-manager="transactionManager" />
<!-- 动态事务管理 -->
<bean id="transactionManager" class="com.study.rwdb.DynamicDataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
<!-- 配置sqlSessionFactory -->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<!-- 实例化sqlSessionFactory时需要使用上述配置好的数据源以及SQL映射文件 -->
<property name="dataSource" ref="dataSource" />
<!--configLocation属性指定mybatis的核心配置文件 -->
<property name="configLocation" value="classpath:mybatis-config.xml" />
<!-- 所有配置的mapper文件 -->
<property name="mapperLocations" value="classpath:com/study/mapper/*.xml" />
<!-- 所有的实体 -->
<property name="typeAliasesPackage" value="com.study.entity" />
<property name="plugins">
<array>
<bean class="com.study.rwdb.DynamicPlugin" />
</array>
</property>
</bean>
<!-- 配置扫描器 -->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<!-- 扫描包以及它的子包下的所有映射接口类 -->
<property name="basePackage" value="com.study.mapper" />
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
</bean>
</beans>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
DynamicDataSourceGlobal.java:
/**
* 读写操作对应数据源
*/
public enum DynamicDataSourceGlobal {
READ, WRITE;
}
1
2
3
4
5
6
2
3
4
5
6
DynamicDataSourceHolder.java:
/**
* 使用ThreadLocal技术来记录当前线程中的数据源的key
*/
public final class DynamicDataSourceHolder {
// 使用ThreadLocal记录当前线程的数据源key
private static final ThreadLocal<DynamicDataSourceGlobal> holder = new ThreadLocal<DynamicDataSourceGlobal>();
private DynamicDataSourceHolder() {
// ......
}
/**
* 设置数据源
*
* @param dataSource
*/
public static void putDataSource(DynamicDataSourceGlobal dataSource){
holder.set(dataSource);
}
/**
* 获取数据源
* @return
*/
public static DynamicDataSourceGlobal getDataSource(){
return holder.get();
}
/**
* 清理数据源
*/
public static void clearDataSource() {
holder.remove();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
DynamicDataSource.java:
import java.util.HashMap;
import java.util.Map;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
/**
* 获取数据源,用于动态切换数据源
* <p>
* <pre>
* 实现Spring提供的AbstractRoutingDataSource,只需要实现determineCurrentLookupKey方法即可,
* 由于DynamicDataSource是单例的,线程不安全的,所以采用ThreadLocal保证线程安全,
* 由DynamicDataSourceHolder完成。
* </pre>
*/
public class DynamicDataSource extends AbstractRoutingDataSource {
// 写数据源 3306
private Object writeDataSource;
// 读数据源 3307
private Object readDataSource;
@Override
public void afterPropertiesSet() {
if (this.writeDataSource == null) {
throw new IllegalArgumentException("Property 'writeDataSource' is required");
}
// 覆盖AbstractRoutingDataSource中的默认数据源为 write
setDefaultTargetDataSource(writeDataSource);
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put(DynamicDataSourceGlobal.WRITE.name(), writeDataSource);
if (readDataSource != null) {
// 如果读数据源不为空就压入map
targetDataSources.put(DynamicDataSourceGlobal.READ.name(), readDataSource);
}
// 设置目标数据源为map中压入的数据源
setTargetDataSources(targetDataSources);
super.afterPropertiesSet();
}
/**
* 获取与数据源相关的key 此key是Map<String,DataSource> resolvedDataSources 中与数据源绑定的key值
* 在通过determineTargetDataSource获取目标数据源时使用
*/
@Override
protected Object determineCurrentLookupKey() {
// 使用DynamicDataSourceHolder保证线程安全,并且得到当前线程中的数据源key
DynamicDataSourceGlobal dynamicDataSourceGlobal = DynamicDataSourceHolder.getDataSource();
if (dynamicDataSourceGlobal == null || dynamicDataSourceGlobal == DynamicDataSourceGlobal.WRITE) {
return DynamicDataSourceGlobal.WRITE.name();
}
return DynamicDataSourceGlobal.READ.name();
}
// get and set method
public void setWriteDataSource(Object writeDataSource) {
this.writeDataSource = writeDataSource;
}
public Object getWriteDataSource() {
return writeDataSource;
}
public Object getReadDataSource() {
return readDataSource;
}
public void setReadDataSource(Object readDataSource) {
this.readDataSource = readDataSource;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
DynamicPlugin.java:
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.executor.keygen.SelectKeyGenerator;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.mapping.SqlCommandType;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.support.TransactionSynchronizationManager;
/**
* Mybatis插件,通过拦截Executor
*/
@Intercepts({
@Signature(type = Executor.class, method = "update", args = { MappedStatement.class, Object.class }),
@Signature(type = Executor.class, method = "query", args = { MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class })
})
public class DynamicPlugin implements Interceptor {
private Logger log = LoggerFactory.getLogger(DynamicPlugin.class);
private static final String REGEX = ".*insert$|.*delete$|.*update$";
private static final Map<String, DynamicDataSourceGlobal> cacheMap = new ConcurrentHashMap<>();
@Override
public Object intercept(Invocation invocation) throws Throwable {
// 如果拥有事务上下文,则将连接绑定到事务上下文中
boolean synchronizationActive = TransactionSynchronizationManager.isSynchronizationActive();
if (!synchronizationActive) {
Object[] objects = invocation.getArgs();
MappedStatement ms = (MappedStatement) objects[0];
DynamicDataSourceGlobal dynamicDataSourceGlobal = null;
if ((dynamicDataSourceGlobal = cacheMap.get(ms.getId())) == null) {
// 读方法
if (ms.getSqlCommandType().equals(SqlCommandType.SELECT)) { // select * from user; update insert
// !selectKey 为自增id查询主键(SELECT LAST_INSERT_ID() )方法,使用主库
if (ms.getId().contains(SelectKeyGenerator.SELECT_KEY_SUFFIX)) {
dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;
} else {
BoundSql boundSql = ms.getSqlSource().getBoundSql(objects[1]);
String sql = boundSql.getSql().toLowerCase(Locale.CHINA).replaceAll("[\\t\\n\\r]", " ");
if (sql.matches(REGEX)) {
dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;
} else {
dynamicDataSourceGlobal = DynamicDataSourceGlobal.READ;
}
}
} else {
// 写方法
dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;
}
System.out.println("设置方法[{"+ms.getId()+"}] use [{"+dynamicDataSourceGlobal.name()+"}] Strategy, SqlCommandType [{"+ms.getSqlCommandType().name()+"}]..");
log.warn("设置方法[{}] use [{}] Strategy, SqlCommandType [{}]..", ms.getId(), dynamicDataSourceGlobal.name(), ms.getSqlCommandType().name());
// 把id(方法名)和数据源存入map,下次命中后就直接执行
cacheMap.put(ms.getId(), dynamicDataSourceGlobal);
}
DynamicDataSourceHolder.putDataSource(dynamicDataSourceGlobal);
}
return invocation.proceed();
}
@Override
public Object plugin(Object target) {
if (target instanceof Executor) {
return Plugin.wrap(target, this);
} else {
return target;
}
}
@Override
public void setProperties(Properties properties) {
//
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
DynamicDataSourceTransactionManager.java:
/**
* 自定义事务
*/
public class DynamicDataSourceTransactionManager extends DataSourceTransactionManager {
private static final long serialVersionUID = 1L;
/**
* 只读事务到读库,读写事务到写库
*
* @param transaction
* @param definition
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
// 设置数据源
boolean readOnly = definition.isReadOnly();
if (readOnly) {
DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.READ);
} else {
DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.WRITE);
}
super.doBegin(transaction, definition);
}
/**
* 清理本地线程的数据源
*
* @param transaction
*/
@Override
protected void doCleanupAfterCompletion(Object transaction) {
super.doCleanupAfterCompletion(transaction);
DynamicDataSourceHolder.clearDataSource();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36