This article looks at what ShardingContext does and walks through the relevant source code. Hopefully you will come away with a clearer picture of it.
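So what does ShardingContext actually do? It has two main parts:
Data source sharding metadata
It obtains the URL from each data source connection and parses the URL parameters to build the data source sharding metadata. This metadata is used later when routing DCL statements (for example granting privileges or creating users).
Table sharding metadata
It loads the metadata of the actual tables based on the data nodes. This metadata is used later to fill in details during SQL parsing.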
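To make the "parse the URL" step concrete, here is a minimal sketch of pulling host, port and schema out of a MySQL-style JDBC URL. The class name `JdbcUrlInfo`, its fields, and the fallback port 3306 are assumptions made for illustration; this is not the parsing code ShardingSphere ships.

```java
import java.net.URI;

// Hypothetical holder for the pieces parsed out of a JDBC URL (not a ShardingSphere class).
final class JdbcUrlInfo {

    final String hostName;
    final int port;
    final String schemaName;

    JdbcUrlInfo(final String hostName, final int port, final String schemaName) {
        this.hostName = hostName;
        this.port = port;
        this.schemaName = schemaName;
    }

    // Assumes a URL shaped like jdbc:mysql://host:port/schema?param=value
    static JdbcUrlInfo parse(final String jdbcUrl) {
        if (!jdbcUrl.startsWith("jdbc:")) {
            throw new IllegalArgumentException("Not a JDBC URL: " + jdbcUrl);
        }
        // Dropping the "jdbc:" prefix leaves a regular hierarchical URI
        URI uri = URI.create(jdbcUrl.substring("jdbc:".length()));
        String path = null == uri.getPath() ? "" : uri.getPath();
        String schemaName = path.startsWith("/") ? path.substring(1) : path;
        // Assumption: fall back to the usual MySQL port when none is given
        int port = -1 == uri.getPort() ? 3306 : uri.getPort();
        return new JdbcUrlInfo(uri.getHost(), port, schemaName);
    }

    public static void main(final String[] args) {
        JdbcUrlInfo info = parse("jdbc:mysql://127.0.0.1:3306/demo_ds?useSSL=false");
        System.out.println(info.hostName + " " + info.port + " " + info.schemaName);
    }
}
```

With host and schema in hand, two data source names that point at the same database instance can be recognised as such, which is the kind of information instance-level statements need.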
Source code analysis
1. The ShardingContext constructor, focusing on ShardingTableMetaData
public ShardingContext(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final DatabaseType databaseType, final Properties props) throws SQLException {
    this.shardingRule = shardingRule;
    // Fetch the raw database metadata from the data sources
    this.cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap);
    // Database type
    this.databaseType = databaseType;
    // Sharding configuration properties, e.g. SQL logging, thread pool size
    shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
    // Thread pool size for Statement and PreparedStatement execution.
    // Each sharding data source uses its own thread pool; pools are not shared
    // between data sources, even inside the same JVM.
    // Unlimited by default.
    int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
    // Execute engine
    executeEngine = new ShardingExecuteEngine(executorSize);
    // Data source sharding metadata.
    // Taking MySQL as an example: open a connection, read the MySQL URL,
    // and wrap the parsed URL parameters in ShardingDataSourceMetaData.
    ShardingDataSourceMetaData shardingDataSourceMetaData = new ShardingDataSourceMetaData(getDataSourceURLs(dataSourceMap), shardingRule, databaseType);
    // Table sharding metadata.
    // Taking MySQL as an example: open a connection and read the table
    // metadata (columns, column types, indexes).
    ShardingTableMetaData shardingTableMetaData = new ShardingTableMetaData(getTableMetaDataInitializer(dataSourceMap, shardingDataSourceMetaData).load(shardingRule));
    // Wrap the data source sharding metadata and the table sharding metadata
    metaData = new ShardingMetaData(shardingDataSourceMetaData, shardingTableMetaData);
    // Parsing result cache
    parsingResultCache = new ParsingResultCache();
}

private TableMetaDataInitializer getTableMetaDataInitializer(final Map<String, DataSource> dataSourceMap, final ShardingDataSourceMetaData shardingDataSourceMetaData) {
    return new TableMetaDataInitializer(shardingDataSourceMetaData, executeEngine, new JDBCTableMetaDataConnectionManager(dataSourceMap),
            shardingProperties.<Integer>getValue(ShardingPropertiesConstant.MAX_CONNECTIONS_SIZE_PER_QUERY),
            shardingProperties.<Boolean>getValue(ShardingPropertiesConstant.CHECK_TABLE_METADATA_ENABLED));
}
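The constructor guards against a null `props` and then reads typed settings from it. A minimal sketch of that pattern follows. The class `ShardingPropsSketch`, its methods, and the three property key strings are assumptions for illustration; the defaults (0 for the executor size, 1 for the per-query connection limit, false for metadata checking) follow the comments in the listings rather than any verified constant table.

```java
import java.util.Properties;

// Hypothetical stand-in for a typed properties wrapper (not the real ShardingProperties).
final class ShardingPropsSketch {

    // Assumed key names, used only by this sketch
    static final String EXECUTOR_SIZE = "executor.size";
    static final String MAX_CONNECTIONS_SIZE_PER_QUERY = "max.connections.size.per.query";
    static final String CHECK_TABLE_METADATA_ENABLED = "check.table.metadata.enabled";

    private final Properties props;

    ShardingPropsSketch(final Properties props) {
        // Same null guard as in the constructor above
        this.props = null == props ? new Properties() : props;
    }

    // Returns the integer stored under key, or defaultValue when the key is absent
    int getInt(final String key, final int defaultValue) {
        String raw = props.getProperty(key);
        return null == raw ? defaultValue : Integer.parseInt(raw.trim());
    }

    // Returns the boolean stored under key, or defaultValue when the key is absent
    boolean getBoolean(final String key, final boolean defaultValue) {
        String raw = props.getProperty(key);
        return null == raw ? defaultValue : Boolean.parseBoolean(raw.trim());
    }

    public static void main(final String[] args) {
        ShardingPropsSketch defaults = new ShardingPropsSketch(null);
        System.out.println(defaults.getInt(EXECUTOR_SIZE, 0));
        System.out.println(defaults.getInt(MAX_CONNECTIONS_SIZE_PER_QUERY, 1));
        System.out.println(defaults.getBoolean(CHECK_TABLE_METADATA_ENABLED, false));
    }
}
```

Passing `null` is therefore equivalent to passing an empty `Properties`: every lookup falls through to its default.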
2. Loading: TableMetaDataInitializer#load
public TableMetaDataInitializer(final ShardingDataSourceMetaData shardingDataSourceMetaData, final ShardingExecuteEngine executeEngine,
                                final TableMetaDataConnectionManager connectionManager, final int maxConnectionsSizePerQuery, final boolean isCheckingMetaData) {
    // Data source sharding metadata
    this.shardingDataSourceMetaData = shardingDataSourceMetaData;
    // Data source connection manager
    this.connectionManager = connectionManager;
    // Table metadata loader
    tableMetaDataLoader = new TableMetaDataLoader(shardingDataSourceMetaData, executeEngine, connectionManager, maxConnectionsSizePerQuery, isCheckingMetaData);
}

/**
 * Load table meta data.
 *
 * @param logicTableName logic table name
 * @param shardingRule sharding rule
 * @return table meta data
 */
@SneakyThrows
public TableMetaData load(final String logicTableName, final ShardingRule shardingRule) {
    return tableMetaDataLoader.load(logicTableName, shardingRule);
}

/**
 * Load all table meta data.
 *
 * @param shardingRule sharding rule
 * @return all table meta data
 */
@SneakyThrows
public Map<String, TableMetaData> load(final ShardingRule shardingRule) {
    Map<String, TableMetaData> result = new HashMap<>();
    // Load sharded tables
    result.putAll(loadShardingTables(shardingRule));
    // Load non-sharded tables
    result.putAll(loadDefaultTables(shardingRule));
    return result;
}

private Map<String, TableMetaData> loadShardingTables(final ShardingRule shardingRule) throws SQLException {
    Map<String, TableMetaData> result = new HashMap<>(shardingRule.getTableRules().size(), 1);
    for (TableRule each : shardingRule.getTableRules()) {
        // Load the metadata of the actual tables behind each logic table.
        // Key: logic table, value: table metadata
        result.put(each.getLogicTable(), tableMetaDataLoader.load(each.getLogicTable(), shardingRule));
    }
    return result;
}

private Map<String, TableMetaData> loadDefaultTables(final ShardingRule shardingRule) throws SQLException {
    Map<String, TableMetaData> result = new HashMap<>(shardingRule.getTableRules().size(), 1);
    // Look up the default data source; if there is none, look up the master database
    Optional<String> actualDefaultDataSourceName = shardingRule.findActualDefaultDataSourceName();
    if (actualDefaultDataSourceName.isPresent()) {
        // Load the metadata of every table.
        // Key: actual table, value: table metadata
        for (String each : getAllTableNames(actualDefaultDataSourceName.get())) {
            result.put(each, tableMetaDataLoader.load(each, shardingRule));
        }
    }
    return result;
}

private Collection<String> getAllTableNames(final String dataSourceName) throws SQLException {
    Collection<String> result = new LinkedHashSet<>();
    DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName);
    String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemaName();
    try (Connection connection = connectionManager.getConnection(dataSourceName);
         ResultSet resultSet = connection.getMetaData().getTables(catalog, getCurrentSchemaName(connection), null, new String[]{"TABLE"})) {
        while (resultSet.next()) {
            String tableName = resultSet.getString("TABLE_NAME");
            // Skip table names that contain "$" or "/"
            if (!tableName.contains("$") && !tableName.contains("/")) {
                result.add(tableName);
            }
        }
    }
    return result;
}

private String getCurrentSchemaName(final Connection connection) throws SQLException {
    try {
        return connection.getSchema();
    } catch (final AbstractMethodError | SQLFeatureNotSupportedException ignore) {
        // Older drivers may not implement getSchema()
        return null;
    }
}
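The name filter inside getAllTableNames is easy to overlook, so here it is in isolation. This is a minimal sketch: the class `TableNameFilter` and its `filter` method are hypothetical names, and the input is a plain collection rather than a JDBC `ResultSet`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;

// Hypothetical helper that mirrors the name check in getAllTableNames.
final class TableNameFilter {

    // Keeps names without "$" or "/", preserving first-seen order and dropping duplicates
    static Collection<String> filter(final Collection<String> rawTableNames) {
        Collection<String> result = new LinkedHashSet<>();
        for (String each : rawTableNames) {
            if (!each.contains("$") && !each.contains("/")) {
                result.add(each);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        System.out.println(filter(Arrays.asList("t_order", "BIN$abc", "sys/tmp", "t_user")));
    }
}
```

The `LinkedHashSet` keeps the order in which the driver reported the tables while ensuring each name is loaded only once.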
3. Loading table metadata: TableMetaDataLoader#load
/**
 * Load table meta data.
 *
 * @param logicTableName logic table name
 * @param shardingRule sharding rule
 * @return table meta data
 * @throws SQLException SQL exception
 */
public TableMetaData load(final String logicTableName, final ShardingRule shardingRule) throws SQLException {
    // Load the table metadata
    List<TableMetaData> actualTableMetaDataList = load(getDataNodeGroups(logicTableName, shardingRule), shardingRule.getShardingDataSourceNames());
    // Check the metadata in actualTableMetaDataList
    checkUniformed(logicTableName, actualTableMetaDataList);
    return actualTableMetaDataList.iterator().next();
}

private List<TableMetaData> load(final Map<String, List<DataNode>> dataNodeGroups, final ShardingDataSourceNames shardingDataSourceNames) throws SQLException {
    // Submit the wrapped data node groups to the execute engine
    return executeEngine.groupExecute(getDataNodeGroups(dataNodeGroups), new ShardingGroupExecuteCallback<DataNode, TableMetaData>() {

        @Override
        public Collection<TableMetaData> execute(final Collection<DataNode> dataNodes, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException {
            String dataSourceName = dataNodes.iterator().next().getDataSourceName();
            DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName);
            String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemaName();
            return load(shardingDataSourceNames.getRawMasterDataSourceName(dataSourceName), catalog, dataNodes);
        }
    });
}

private Collection<TableMetaData> load(final String dataSourceName, final String catalog, final Collection<DataNode> dataNodes) throws SQLException {
    Collection<TableMetaData> result = new LinkedList<>();
    try (Connection connection = connectionManager.getConnection(dataSourceName)) {
        for (DataNode each : dataNodes) {
            // Load the table metadata
            result.add(createTableMetaData(connection, catalog, each.getTableName()));
        }
    }
    return result;
}

private Map<String, List<DataNode>> getDataNodeGroups(final String logicTableName, final ShardingRule shardingRule) {
    // Map each data source to the actual-table data nodes of the logic table, e.g.:
    // ds_0 -> [ds_0:t_order_0, ds_0:t_order_1]
    // ds_1 -> [ds_1.t_order_0, ds_1.t_order_1]
    Map<String, List<DataNode>> result = shardingRule.getTableRule(logicTableName).getDataNodeGroups();
    // isCheckingMetaData defaults to false; when true, the actual tables of all data nodes are processed
    if (isCheckingMetaData) {
        return result;
    }
    // Otherwise a single data node is enough
    String firstKey = result.keySet().iterator().next();
    return Collections.singletonMap(firstKey, Collections.singletonList(result.get(firstKey).get(0)));
}

/**
 * Wrap data node groups into sharding execute groups.
 *
 * @param dataNodeGroups data node groups, e.g.
 *                       ds_0 -> [ds_0:t_order_0, ds_0:t_order_1]
 * @return sharding execute groups
 */
private Collection<ShardingExecuteGroup<DataNode>> getDataNodeGroups(final Map<String, List<DataNode>> dataNodeGroups) {
    Collection<ShardingExecuteGroup<DataNode>> result = new LinkedList<>();
    // Iterate over the data nodes of each data source
    for (Entry<String, List<DataNode>> entry : dataNodeGroups.entrySet()) {
        // Wrap them in ShardingExecuteGroup instances
        result.addAll(getDataNodeGroups(entry.getValue()));
    }
    return result;
}

private Collection<ShardingExecuteGroup<DataNode>> getDataNodeGroups(final List<DataNode> dataNodes) {
    Collection<ShardingExecuteGroup<DataNode>> result = new LinkedList<>();
    // maxConnectionsSizePerQuery (maximum connections per query) defaults to 1.
    // Lists.partition takes a sublist size, so dataNodes is split into sublists of
    // at most max(dataNodes.size() / maxConnectionsSizePerQuery, 1) elements.
    for (List<DataNode> each : Lists.partition(dataNodes, Math.max(dataNodes.size() / maxConnectionsSizePerQuery, 1))) {
        result.add(new ShardingExecuteGroup<>(each));
    }
    return result;
}

private TableMetaData createTableMetaData(final Connection connection, final String catalog, final String actualTableName) throws SQLException {
    // Check whether the table exists
    if (isTableExist(connection, catalog, actualTableName)) {
        // Wrap the table metadata
        return new TableMetaData(getColumnMetaDataList(connection, catalog, actualTableName), getLogicIndexes(connection, catalog, actualTableName));
    }
    return new TableMetaData(Collections.<ColumnMetaData>emptyList(), Collections.<String>emptySet());
}

private boolean isTableExist(final Connection connection, final String catalog, final String actualTableName) throws SQLException {
    try (ResultSet resultSet = connection.getMetaData().getTables(catalog, null, actualTableName, null)) {
        return resultSet.next();
    }
}

/**
 * Get the column metadata of a table.
 *
 * @param connection connection
 * @param catalog schema
 * @param actualTableName actual table
 * @return column metadata list
 * @throws SQLException SQL exception
 */
private List<ColumnMetaData> getColumnMetaDataList(final Connection connection, final String catalog, final String actualTableName) throws SQLException {
    List<ColumnMetaData> result = new LinkedList<>();
    Collection<String> primaryKeys = getPrimaryKeys(connection, catalog, actualTableName);
    try (ResultSet resultSet = connection.getMetaData().getColumns(catalog, null, actualTableName, "%")) {
        while (resultSet.next()) {
            String columnName = resultSet.getString("COLUMN_NAME");
            String columnType = resultSet.getString("TYPE_NAME");
            result.add(new ColumnMetaData(columnName, columnType, primaryKeys.contains(columnName)));
        }
    }
    return result;
}

/**
 * Get the primary keys of a table.
 */
private Collection<String> getPrimaryKeys(final Connection connection, final String catalog, final String actualTableName) throws SQLException {
    Collection<String> result = new HashSet<>();
    try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(catalog, null, actualTableName)) {
        while (resultSet.next()) {
            result.add(resultSet.getString("COLUMN_NAME"));
        }
    }
    return result;
}

/**
 * Get the indexes of a table.
 */
private Collection<String> getLogicIndexes(final Connection connection, final String catalog, final String actualTableName) throws SQLException {
    Collection<String> result = new HashSet<>();
    try (ResultSet resultSet = connection.getMetaData().getIndexInfo(catalog, catalog, actualTableName, false, false)) {
        while (resultSet.next()) {
            Optional<String> logicIndex = getLogicIndex(resultSet.getString("INDEX_NAME"), actualTableName);
            if (logicIndex.isPresent()) {
                result.add(logicIndex.get());
            }
        }
    }
    return result;
}

private Optional<String> getLogicIndex(final String actualIndexName, final String actualTableName) {
    // The index name must carry the `_tableName` suffix, e.g. idx_t_order
    String indexNameSuffix = "_" + actualTableName;
    if (actualIndexName.contains(indexNameSuffix)) {
        return Optional.of(actualIndexName.replace(indexNameSuffix, ""));
    }
    return Optional.absent();
}
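Two small pieces of arithmetic and string handling in the loader are worth trying out on their own: how the data nodes of one data source are split into execute groups, and how a logic index name is derived from an actual index name. The sketch below uses only the JDK. `MetaDataLoadSketch`, `partition`, `group` and `logicIndex` are hypothetical names; `partition` is a plain-Java stand-in for Guava's `Lists.partition`, and `java.util.Optional` replaces the Guava `Optional` used in the listing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical helpers that mirror the grouping and index-name logic of the loader.
final class MetaDataLoadSketch {

    // Splits items into consecutive sublists of at most size elements
    static <T> List<List<T>> partition(final List<T> items, final int size) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            result.add(new ArrayList<>(items.subList(i, Math.min(i + size, items.size()))));
        }
        return result;
    }

    // Same sizing rule as the listing: sublist size = max(n / maxConnectionsSizePerQuery, 1)
    static <T> List<List<T>> group(final List<T> dataNodes, final int maxConnectionsSizePerQuery) {
        int groupSize = Math.max(dataNodes.size() / maxConnectionsSizePerQuery, 1);
        return partition(dataNodes, groupSize);
    }

    // Strips the "_" + actualTableName part from the index name, if present
    static Optional<String> logicIndex(final String actualIndexName, final String actualTableName) {
        String suffix = "_" + actualTableName;
        return actualIndexName.contains(suffix)
                ? Optional.of(actualIndexName.replace(suffix, ""))
                : Optional.empty();
    }

    public static void main(final String[] args) {
        List<String> nodes = Arrays.asList("ds_0.t_order_0", "ds_0.t_order_1", "ds_0.t_order_2", "ds_0.t_order_3");
        System.out.println(group(nodes, 1));
        System.out.println(group(nodes, 2));
        System.out.println(logicIndex("idx_t_order_0", "t_order_0"));
    }
}
```

Because the second argument is a sublist size rather than a group count, integer division can produce more groups than the connection limit in this sketch: four nodes with a limit of 3 give a size of 1 and therefore four groups.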
4. Execution: ShardingExecuteEngine#groupExecute
/**
 * Execute for group.
 *
 * @param inputGroups input groups
 * @param callback sharding execute callback
 * @param <I> type of input value
 * @param <O> type of return value
 * @return execute result
 * @throws SQLException throw if execute failure
 */
public <I, O> List<O> groupExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
    return groupExecute(inputGroups, null, callback, false);
}

/**
 * Execute for group.
 *
 * @param inputGroups input groups
 * @param firstCallback first sharding execute callback
 * @param callback sharding execute callback
 * @param serial whether using multi thread execute or not
 * @param <I> type of input value
 * @param <O> type of return value
 * @return execute result
 * @throws SQLException throw if execute failure
 */
public <I, O> List<O> groupExecute(
        final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback,
        final ShardingGroupExecuteCallback<I, O> callback, final boolean serial) throws SQLException {
    if (inputGroups.isEmpty()) {
        return Collections.emptyList();
    }
    // serial: run the groups one after another
    // parallel: run the groups concurrently
    return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
}

private <I, O> List<O> serialExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback,
                                     final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
    Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
    ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
    // Execute the first group on its own.
    // Use firstCallback when it is not null, otherwise use callback.
    List<O> result = new LinkedList<>(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback));
    // Execute the remaining groups in order
    for (ShardingExecuteGroup<I> each : Lists.newArrayList(inputGroupsIterator)) {
        result.addAll(syncGroupExecute(each, callback));
    }
    return result;
}

private <I, O> List<O> parallelExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback,
                                       final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
    Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
    // Take the first group
    ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
    // Submit the remaining groups to the thread pool
    Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback);
    // Execute the first group on the current thread, then merge the synchronous and asynchronous results
    return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
}

/**
 * Asynchronous execution.
 */
private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final List<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) {
    Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
    for (ShardingExecuteGroup<I> each : inputGroups) {
        result.add(asyncGroupExecute(each, callback));
    }
    return result;
}

private <I, O> ListenableFuture<Collection<O>> asyncGroupExecute(final ShardingExecuteGroup<I> inputGroup, final ShardingGroupExecuteCallback<I, O> callback) {
    final Map<String, Object> dataMap = ShardingExecuteDataMap.getDataMap();
    // Submit to the thread pool
    return executorService.submit(new Callable<Collection<O>>() {

        @Override
        public Collection<O> call() throws SQLException {
            return callback.execute(inputGroup.getInputs(), false, dataMap);
        }
    });
}

/**
 * Synchronous execution.
 */
private <I, O> Collection<O> syncGroupExecute(final ShardingExecuteGroup<I> executeGroup, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
    return callback.execute(executeGroup.getInputs(), true, ShardingExecuteDataMap.getDataMap());
}

private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
    List<O> result = new LinkedList<>(firstResults);
    for (ListenableFuture<Collection<O>> each : restFutures) {
        try {
            result.addAll(each.get());
        } catch (final InterruptedException | ExecutionException ex) {
            return throwException(ex);
        }
    }
    return result;
}
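The parallel path follows a simple pattern: hand every group except the first to the thread pool, run the first group on the calling thread, then collect the futures in submission order. Below is a minimal JDK-only sketch of that pattern. `GroupExecuteSketch` and its `parallelExecute` signature are hypothetical; it uses `java.util.function.Function` in place of ShardingGroupExecuteCallback, plain `Future` in place of Guava's `ListenableFuture`, and wraps failures in `IllegalStateException` instead of `SQLException`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch of the "first group inline, rest on the pool" pattern.
final class GroupExecuteSketch {

    static <I, O> List<O> parallelExecute(final List<List<I>> groups, final Function<List<I>, List<O>> callback, final ExecutorService pool) {
        if (groups.isEmpty()) {
            return Collections.emptyList();
        }
        Iterator<List<I>> iterator = groups.iterator();
        final List<I> first = iterator.next();
        // Submit every remaining group to the pool before doing any work inline
        List<Future<List<O>>> restFutures = new ArrayList<>();
        while (iterator.hasNext()) {
            final List<I> group = iterator.next();
            Callable<List<O>> task = () -> callback.apply(group);
            restFutures.add(pool.submit(task));
        }
        // Run the first group on the calling thread while the others are in flight
        List<O> result = new ArrayList<>(callback.apply(first));
        // Collect in submission order so the merged result is deterministic
        for (Future<List<O>> each : restFutures) {
            try {
                result.addAll(each.get());
            } catch (final InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            } catch (final ExecutionException ex) {
                throw new IllegalStateException(ex.getCause());
            }
        }
        return result;
    }
}
```

Running the first group inline means a single-group call never touches the pool at all, and the calling thread does useful work instead of only waiting on futures.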
That covers what ShardingContext does. If you have been wondering about the same thing, the analysis above should help you work through it.