【引用官网】为每个分片查询维持一个独立的数据库连接,可以更加有效的利用多线程来提升执行效率。为每个数据库连接开启独立的线程,可以将I/O所产生的消耗并行处理。为此,ShardingSphere提出了连接模式(Connection Mode)的概念,并将其划分为内存限制模式(MEMORY_STRICTLY)和连接限制模式(CONNECTION_STRICTLY)这两种类型
- 内存限制模式:使用此模式的前提是,ShardingSphere对一次操作所耗费的数据库连接数量不做限制。如果实际执行的SQL需要对某数据库实例中的200张表做操作,则对每张表创建一个新的数据库连接,并通过多线程的方式并发处理,以达成执行效率最大化。并且在SQL满足条件情况下,优先选择流式归并,以防止出现内存溢出或避免频繁垃圾回收情况
- 连接限制模式:使用此模式的前提是,ShardingSphere严格控制对一次操作所耗费的数据库连接数量。如果实际执行的SQL需要对某数据库实例中的200张表做操作,那么只会创建唯一的数据库连接,并对其200张表串行处理。如果一次操作中的分片散落在不同的数据库,仍然采用多线程处理对不同库的操作,但每个库的每次操作仍然只创建一个唯一的数据库连接。这样即可防止一次请求对数据库连接占用过多所带来的问题。该模式始终选择内存归并
case: 本文主要以SELECT i.* FROM t_order o, t_order_item i WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2这样一个简单的查询语句,来分析ss大致如何执行sql。根据分片规则改写后,实际执行的sql应该是demo_ds_slave_0:SELECT i.* FROM t_order_0 o, t_order_item_0 i WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2
准备阶段
1.初始化PreparedStatementExecutor#init,封装Statement执行单元
public final class PreparedStatementExecutor extends AbstractStatementExecutor {
@Getter
private final boolean returnGeneratedKeys;
/**
 * Creates a prepared statement executor.
 *
 * @param resultSetType result set type, forwarded to the parent executor
 * @param resultSetConcurrency result set concurrency, forwarded to the parent executor
 * @param resultSetHoldability result set holdability, forwarded to the parent executor
 * @param returnGeneratedKeys whether statements are prepared with {@code Statement.RETURN_GENERATED_KEYS}
 * @param shardingConnection sharding connection, forwarded to the parent executor
 */
public PreparedStatementExecutor(final int resultSetType,
                                 final int resultSetConcurrency,
                                 final int resultSetHoldability,
                                 final boolean returnGeneratedKeys,
                                 final ShardingConnection shardingConnection) {
    super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
    this.returnGeneratedKeys = returnGeneratedKeys;
}
/**
 * Initialize executor.
 *
 * <p>Records the SQL statement of the route result, builds the execute groups for its
 * route units, and then caches the statements.</p>
 *
 * @param routeResult route result
 * @throws SQLException SQL exception
 */
public void init(final SQLRouteResult routeResult) throws SQLException {
    setSqlStatement(routeResult.getOptimizedStatement().getSQLStatement());
    // Turn the route units (data source name + SQL unit) into statement execute groups.
    Collection<ShardingExecuteGroup<StatementExecuteUnit>> preparedGroups = obtainExecuteGroups(routeResult.getRouteUnits());
    getExecuteGroups().addAll(preparedGroups);
    // Cache the statements and their parameters (per the original author's note).
    cacheStatements();
}
/**
 * Builds the statement execute groups for the given route units.
 *
 * <p>The work is delegated to the SQL execute prepare template; this method only supplies
 * the callback that hands out connections and creates the prepared statements.</p>
 *
 * @param routeUnits route units to prepare
 * @return statement execute unit groups
 * @throws SQLException SQL exception
 */
private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException {
    SQLExecutePrepareCallback prepareCallback = new SQLExecutePrepareCallback() {
        
        @Override
        public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
            // Connections come from the sharding connection held by the parent executor.
            return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
        }
        
        @Override
        public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException {
            PreparedStatement preparedStatement = createPreparedStatement(connection, routeUnit.getSqlUnit().getSql());
            return new StatementExecuteUnit(routeUnit, preparedStatement, connectionMode);
        }
    };
    return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, prepareCallback);
}
/**
 * Prepares a statement for the given SQL on the given connection.
 *
 * @param connection connection to prepare the statement on
 * @param sql SQL text to prepare
 * @return prepared statement; requests generated keys when {@code returnGeneratedKeys} is set,
 *     otherwise uses this executor's result set type, concurrency and holdability
 * @throws SQLException SQL exception
 */
@SuppressWarnings("MagicConstant")
private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
    if (returnGeneratedKeys) {
        return connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
    }
    return connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
}
... ...
}
2.执行封装Statement执行单元getSqlExecutePrepareTemplate().getExecuteUnitGroups
/**
 * Groups route units by data source, partitions each group across connections, and wraps
 * every SQL unit into a statement execute unit through the supplied callback.
 */
@RequiredArgsConstructor
public final class SQLExecutePrepareTemplate {
    
    // Per the original author's note: the maximum number of connections one query may use per database.
    // It drives both the partition size and the connection mode chosen below.
    private final int maxConnectionsSizePerQuery;
    
    /**
     * Get execute unit groups.
     *
     * @param routeUnits route units
     * @param callback SQL execute prepare callback
     * @return statement execute unit groups
     * @throws SQLException SQL exception
     */
    public Collection<ShardingExecuteGroup<StatementExecuteUnit>> getExecuteUnitGroups(final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        return getSynchronizedExecuteUnitGroups(routeUnits, callback);
    }
    
    /**
     * Builds the execute groups one data source at a time, in the order the data sources
     * first appear in the route units.
     */
    private Collection<ShardingExecuteGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(
            final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups = new LinkedList<>();
        // Each entry maps a data source name to the SQL units routed to it.
        for (Entry<String, List<SQLUnit>> each : getSQLUnitGroups(routeUnits).entrySet()) {
            String dataSourceName = each.getKey();
            List<SQLUnit> sqlUnits = each.getValue();
            executeGroups.addAll(getSQLExecuteGroups(dataSourceName, sqlUnits, callback));
        }
        return executeGroups;
    }
    
    /**
     * Collects the SQL units of the route units into per-data-source lists, keeping insertion order.
     */
    private Map<String, List<SQLUnit>> getSQLUnitGroups(final Collection<RouteUnit> routeUnits) {
        Map<String, List<SQLUnit>> result = new LinkedHashMap<>(routeUnits.size(), 1);
        for (RouteUnit each : routeUnits) {
            String dataSourceName = each.getDataSourceName();
            List<SQLUnit> sqlUnits = result.get(dataSourceName);
            if (null == sqlUnits) {
                // First SQL unit seen for this data source: start its list.
                sqlUnits = new LinkedList<>();
                result.put(dataSourceName, sqlUnits);
            }
            sqlUnits.add(each.getSqlUnit());
        }
        return result;
    }
    
    /**
     * Splits the SQL units of one data source into partitions, obtains one connection per
     * partition from the callback, and builds one execute group per partition.
     */
    private List<ShardingExecuteGroup<StatementExecuteUnit>> getSQLExecuteGroups(
            final String dataSourceName, final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        int unitCount = sqlUnits.size();
        // Partition size is ceil(unitCount / maxConnectionsSizePerQuery), but never less than 1.
        int partitionSize = unitCount / maxConnectionsSizePerQuery;
        if (0 != unitCount % maxConnectionsSizePerQuery) {
            partitionSize++;
        }
        partitionSize = Math.max(partitionSize, 1);
        // Example: sqlUnits = [1, 2, 3, 4, 5] with partition size 2 gives [[1, 2], [3, 4], [5]].
        List<List<SQLUnit>> partitions = Lists.partition(sqlUnits, partitionSize);
        // More SQL units than allowed connections means a connection must run several units,
        // so the connection-strictly mode is chosen; otherwise the memory-strictly mode is used.
        ConnectionMode connectionMode = unitCount > maxConnectionsSizePerQuery ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
        // One connection per partition.
        List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, partitions.size());
        List<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>();
        for (int index = 0; index < partitions.size(); index++) {
            // The i-th partition runs on the i-th connection.
            result.add(getSQLExecuteGroup(connectionMode, connections.get(index), dataSourceName, partitions.get(index), callback));
        }
        return result;
    }
    
    /**
     * Wraps every SQL unit of one partition into a statement execute unit bound to the given connection.
     */
    private ShardingExecuteGroup<StatementExecuteUnit> getSQLExecuteGroup(final ConnectionMode connectionMode, final Connection connection,
            final String dataSourceName, final List<SQLUnit> sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException {
        List<StatementExecuteUnit> executeUnits = new LinkedList<>();
        for (SQLUnit each : sqlUnitGroup) {
            // The callback creates the actual statement for this SQL unit.
            RouteUnit routeUnit = new RouteUnit(dataSourceName, each);
            executeUnits.add(callback.createStatementExecuteUnit(connection, routeUnit, connectionMode));
        }
        return new ShardingExecuteGroup<>(executeUnits);
    }
}
执行阶段
1.执行查询sql
public final class PreparedStatementExecutor extends AbstractStatementExecutor {
... ...
/**
 * Execute query.
 *
 * <p>Runs every prepared statement through a callback whose {@code executeSQL} delegates
 * to {@code getQueryResult}.</p>
 *
 * @return result set list
 * @throws SQLException SQL exception
 */
public List<QueryResult> executeQuery() throws SQLException {
    // Capture the caller's exception-thrown flag so the callback can carry it along.
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    return executeCallback(new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
        
        @Override
        protected QueryResult executeSQL(final RouteUnit routeUnit, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
            return getQueryResult(statement, connectionMode);
        }
    });
}
... ...
/**
 * Runs the given callback over the prepared execute groups and returns the collected results.
 *
 * @param executeCallback callback applied to each statement execute unit
 * @param <T> result type produced by the callback
 * @return results of the execution
 * @throws SQLException SQL exception
 */
protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
    List<T> executionResults = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback);
    // After execution, refresh the sharding metadata if needed
    // (the original author's note gives create/alter table as examples).
    refreshShardingMetaDataIfNeeded(connection.getShardingContext(), sqlStatement);
    return executionResults;
}
... ...
}
2.通过线程池分组执行,并回调callback
/**
 * Callback that executes each statement execute unit of a group, wrapping every execution
 * with an SQL execution hook.
 *
 * @param <T> result type produced per statement execute unit
 */
@RequiredArgsConstructor
public abstract class SQLExecuteCallback<T> implements ShardingGroupExecuteCallback<StatementExecuteUnit, T> {
    
    // Database type, used to resolve data source metadata from a connection URL.
    private final DatabaseType databaseType;
    
    // Flag handed to ExecutorExceptionHandler.setExceptionThrown before each unit runs.
    private final boolean isExceptionThrown;
    
    /**
     * Executes the given units one after another and collects their results in order.
     */
    @Override
    public final Collection<T> execute(final Collection<StatementExecuteUnit> statementExecuteUnits, final boolean isTrunkThread,
                                       final Map<String, Object> shardingExecuteDataMap) throws SQLException {
        Collection<T> results = new LinkedList<>();
        for (StatementExecuteUnit unit : statementExecuteUnits) {
            T unitResult = execute0(unit, isTrunkThread, shardingExecuteDataMap);
            results.add(unitResult);
        }
        return results;
    }
    
    /**
     * Executes a single unit between the hook's start and finish notifications.
     */
    private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException {
        // Apply the captured exception-thrown flag before running the unit.
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        // Resolve the data source metadata from the JDBC URL of the statement's connection.
        String url = statementExecuteUnit.getStatement().getConnection().getMetaData().getURL();
        DataSourceMetaData dataSourceMetaData = databaseType.getDataSourceMetaData(url);
        SQLExecutionHook hook = new SPISQLExecutionHook();
        try {
            hook.start(statementExecuteUnit.getRouteUnit(), dataSourceMetaData, isTrunkThread, shardingExecuteDataMap);
            T executed = executeSQL(statementExecuteUnit.getRouteUnit(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode());
            hook.finishSuccess();
            return executed;
        } catch (final SQLException ex) {
            hook.finishFailure(ex);
            // NOTE(review): handleException presumably rethrows or swallows depending on the flag set above; confirm.
            ExecutorExceptionHandler.handleException(ex);
            // Reached only when the handler did not rethrow.
            return null;
        }
    }
    
    /**
     * Executes the SQL of one unit; implemented by the concrete callback.
     *
     * @param routeUnit route unit being executed
     * @param statement statement to execute
     * @param connectionMode connection mode of the unit
     * @return execution result
     * @throws SQLException SQL exception
     */
    protected abstract T executeSQL(RouteUnit routeUnit, Statement statement, ConnectionMode connectionMode) throws SQLException;
}
3.执行executeSQL,即调用第1步中创建的callback的executeSQL方法,封装ResultSet
public final class PreparedStatementExecutor extends AbstractStatementExecutor {
... ...
/**
 * Executes the prepared statement and wraps its result set according to the connection mode.
 *
 * @param statement statement to execute; cast to {@code PreparedStatement}
 * @param connectionMode connection mode of the execute unit
 * @return a stream query result for {@code MEMORY_STRICTLY}, otherwise a memory query result
 * @throws SQLException SQL exception
 */
private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
    ResultSet resultSet = ((PreparedStatement) statement).executeQuery();
    ShardingRule shardingRule = getConnection().getShardingContext().getShardingRule();
    // Keep a reference to the result set on the executor.
    getResultSets().add(resultSet);
    if (ConnectionMode.MEMORY_STRICTLY == connectionMode) {
        return new StreamQueryResult(resultSet, shardingRule);
    }
    return new MemoryQueryResult(resultSet, shardingRule);
}
... ...
}
本文主要有以下几个重点:
1.封装数据源对应的sql单元,按数据源遍历执行
2.根据maxConnectionsSizePerQuery参数来计算sql单元分区
3.连接模式,根据maxConnectionsSizePerQuery参数、sql单元来计算具体使用内存限制模式还是连接限制模式
4.根据sql单元、连接模式来获取对应数据源连接
5.执行后根据连接模式封装QueryResult,流式Result还是内存Result