日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
ShardingContent的功能有哪些

這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)ShardingContent的功能有哪些,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

創(chuàng)新互聯(lián)建站從2013年開始,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項目網(wǎng)站制作、網(wǎng)站設(shè)計網(wǎng)站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元濟(jì)源做網(wǎng)站,已為上家服務(wù),為濟(jì)源各地企業(yè)和個人服務(wù),聯(lián)系電話:18980820575

成都創(chuàng)新互聯(lián)是專業(yè)的市南網(wǎng)站建設(shè)公司,市南接單;提供成都做網(wǎng)站、成都網(wǎng)站設(shè)計,網(wǎng)頁設(shè)計,網(wǎng)站設(shè)計,建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行市南網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊,希望更多企業(yè)前來合作!

ShardingContent主要做了那些功能呢?主要有兩部分:

  • 數(shù)據(jù)源分片元數(shù)據(jù)

        主要根據(jù)數(shù)據(jù)源連接獲取對應(yīng)的url,通過解析url參數(shù)來封裝數(shù)據(jù)源分片元數(shù)據(jù);數(shù)據(jù)源分片元數(shù)據(jù)主要后續(xù)SQL路由DCL(比如:授權(quán)、創(chuàng)建用戶等)操作使用

  • 表分片元數(shù)據(jù)

        主要根據(jù)數(shù)據(jù)節(jié)點來獲取真實表的元數(shù)據(jù);而表分片元數(shù)據(jù)主要后續(xù)SQL解析填充使用

源碼分析

1.ShardingContext構(gòu)造,主要分析ShardingTableMetaData

public  ShardingContext(final Map dataSourceMap, final ShardingRule shardingRule, final DatabaseType databaseType, final Properties props) throws SQLException {
        this.shardingRule = shardingRule;
        //獲取數(shù)據(jù)源原始元數(shù)據(jù)信息
        this.cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap);
        //數(shù)據(jù)源類型
        this.databaseType = databaseType;
        //sharding 配置參數(shù)
        //比如:sql打印、線程池大小配置等
        shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
        //Statement、PrepareStatement執(zhí)行線程池大小
        //一個分片數(shù)據(jù)源將使用獨(dú)立的線程池,它不會在同一個JVM中共享線程池甚至不同的數(shù)據(jù)源
        //默認(rèn)無限制
        int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
        //執(zhí)行引擎
        executeEngine = new ShardingExecuteEngine(executorSize);
        //數(shù)據(jù)源分片元數(shù)據(jù)
        //以MySQL為例,建立連接獲取mysql url,將解析后的url參數(shù)信息封裝到ShardingDataSourceMetaData
        ShardingDataSourceMetaData shardingDataSourceMetaData = new ShardingDataSourceMetaData(getDataSourceURLs(dataSourceMap), shardingRule, databaseType);
        //表分片元數(shù)據(jù)
        //以mysql為例,會建立連接獲取表的元信息(字段、字段類型、索引)
        ShardingTableMetaData shardingTableMetaData = new ShardingTableMetaData(getTableMetaDataInitializer(dataSourceMap, shardingDataSourceMetaData).load(shardingRule));
        //封裝數(shù)據(jù)源分片元數(shù)據(jù)、表分片元數(shù)據(jù)
        metaData = new ShardingMetaData(shardingDataSourceMetaData, shardingTableMetaData);
        //解析結(jié)果緩存
        parsingResultCache = new ParsingResultCache();
    }

//
    private TableMetaDataInitializer getTableMetaDataInitializer(final Map dataSourceMap, final ShardingDataSourceMetaData shardingDataSourceMetaData) {
        return new TableMetaDataInitializer(shardingDataSourceMetaData, executeEngine, new JDBCTableMetaDataConnectionManager(dataSourceMap),
                shardingProperties.getValue(ShardingPropertiesConstant.MAX_CONNECTIONS_SIZE_PER_QUERY),
                shardingProperties.getValue(ShardingPropertiesConstant.CHECK_TABLE_METADATA_ENABLED));
    }

2.加載TableMetaDataInitializer#load

    public TableMetaDataInitializer(final ShardingDataSourceMetaData shardingDataSourceMetaData, final ShardingExecuteEngine executeEngine, 
                                    final TableMetaDataConnectionManager connectionManager, final int maxConnectionsSizePerQuery, final boolean isCheckingMetaData) {
        //數(shù)據(jù)源分片元數(shù)據(jù)
        this.shardingDataSourceMetaData = shardingDataSourceMetaData;
        //數(shù)據(jù)源連接管理器
        this.connectionManager = connectionManager;
        //表元數(shù)據(jù)加載器
        tableMetaDataLoader = new TableMetaDataLoader(shardingDataSourceMetaData, executeEngine, connectionManager, maxConnectionsSizePerQuery, isCheckingMetaData);
    }

    /**
     * Load table meta data.
     *
     * @param logicTableName logic table name
     * @param shardingRule sharding rule
     * @return table meta data
     */
    @SneakyThrows
    public TableMetaData load(final String logicTableName, final ShardingRule shardingRule) {
        return tableMetaDataLoader.load(logicTableName, shardingRule);
    }
    
    /**
     * Load all table meta data.
     * 
     * @param shardingRule sharding rule
     * @return all table meta data
     */
    @SneakyThrows
    public Map load(final ShardingRule shardingRule) {
        Map result = new HashMap<>();
        //加載分片表
        result.putAll(loadShardingTables(shardingRule));
        //加載未分片表
        result.putAll(loadDefaultTables(shardingRule));
        return result;
    }
    
    private Map loadShardingTables(final ShardingRule shardingRule) throws SQLException {
        Map result = new HashMap<>(shardingRule.getTableRules().size(), 1);
        for (TableRule each : shardingRule.getTableRules()) {
            //加載邏輯表對應(yīng)真實表的元數(shù)據(jù)
            //邏輯表:表元數(shù)據(jù)
            result.put(each.getLogicTable(), tableMetaDataLoader.load(each.getLogicTable(), shardingRule));
        }
        return result;
    }
    
    private Map loadDefaultTables(final ShardingRule shardingRule) throws SQLException {
        Map result = new HashMap<>(shardingRule.getTableRules().size(), 1);
        //查詢默認(rèn)數(shù)據(jù)源,沒有則查找主庫
        Optional actualDefaultDataSourceName = shardingRule.findActualDefaultDataSourceName();
        if (actualDefaultDataSourceName.isPresent()) {
            //獲取所有表元數(shù)據(jù)
            //真實表:表元數(shù)據(jù)
            for (String each : getAllTableNames(actualDefaultDataSourceName.get())) {
                result.put(each, tableMetaDataLoader.load(each, shardingRule));
            }
        }
        return result;
    }
    
    private Collection getAllTableNames(final String dataSourceName) throws SQLException {
        Collection result = new LinkedHashSet<>();
        DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName);
        String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemaName();
        try (Connection connection = connectionManager.getConnection(dataSourceName);
             ResultSet resultSet = connection.getMetaData().getTables(catalog, getCurrentSchemaName(connection), null, new String[]{"TABLE"})) {
            while (resultSet.next()) {
                String tableName = resultSet.getString("TABLE_NAME");
                if (!tableName.contains("$") && !tableName.contains("/")) {
                    result.add(tableName);
                }
            }
        }
        return result;
    }
    
    private String getCurrentSchemaName(final Connection connection) throws SQLException {
        try {
            return connection.getSchema();
        } catch (final AbstractMethodError | SQLFeatureNotSupportedException ignore) {
            return null;
        }
    }

3.加載表元數(shù)據(jù)TableMetaDataLoader#load

    /**
     * Load table meta data.
     *
     * @param logicTableName logic table name
     * @param shardingRule sharding rule
     * @return table meta data
     * @throws SQLException SQL exception
     */
    public TableMetaData load(final String logicTableName, final ShardingRule shardingRule) throws SQLException {
        //獲取表元數(shù)據(jù)
        List actualTableMetaDataList = load(getDataNodeGroups(logicTableName, shardingRule), shardingRule.getShardingDataSourceNames());
        //檢查actualTableMetaDataList的元數(shù)據(jù)
        checkUniformed(logicTableName, actualTableMetaDataList);
        return actualTableMetaDataList.iterator().next();
    }
    
    private List load(final Map> dataNodeGroups, final ShardingDataSourceNames shardingDataSourceNames) throws SQLException {
        //將封裝好的數(shù)據(jù)節(jié)點組提交給執(zhí)行引擎執(zhí)行
        return executeEngine.groupExecute(getDataNodeGroups(dataNodeGroups), new ShardingGroupExecuteCallback() {
            
            @Override
            public Collection execute(final Collection dataNodes, final boolean isTrunkThread, final Map shardingExecuteDataMap) throws SQLException {
                String dataSourceName = dataNodes.iterator().next().getDataSourceName();
                DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName);
                String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemaName();
                return load(shardingDataSourceNames.getRawMasterDataSourceName(dataSourceName), catalog, dataNodes);
            }
        });
    }
    
    private Collection load(final String dataSourceName, final String catalog, final Collection dataNodes) throws SQLException {
        Collection result = new LinkedList<>();
        try (Connection connection = connectionManager.getConnection(dataSourceName)) {
            for (DataNode each : dataNodes) {
                //獲取表元數(shù)據(jù)
                result.add(createTableMetaData(connection, catalog, each.getTableName()));
            }
        }
        return result;
    }
    
    private Map> getDataNodeGroups(final String logicTableName, final ShardingRule shardingRule) {
        //根據(jù)邏輯表獲取對應(yīng)的數(shù)據(jù)源:真實表數(shù)據(jù)節(jié)點
        //比如:
        //ds_0 -> [ds_0:t_order_0, ds_0:t_order_1]
        //ds_1 -> [ds_1.t_order_0, ds_1.t_order_1]
        Map> result = shardingRule.getTableRule(logicTableName).getDataNodeGroups();
        //默認(rèn)false,設(shè)置為true會處理所有數(shù)據(jù)節(jié)點真實表
        if (isCheckingMetaData) {
            return result;
        }
        //返回一個數(shù)據(jù)節(jié)點即可
        String firstKey = result.keySet().iterator().next();
        return Collections.singletonMap(firstKey, Collections.singletonList(result.get(firstKey).get(0)));
    }

    /**
     * 將數(shù)據(jù)節(jié)點組封裝成分片執(zhí)行組
     *
     * @param dataNodeGroups 數(shù)據(jù)節(jié)點組
     * 
     *      ds_0 -> [ds_0:t_order_0, ds_0:t_order_1]
     * 
     * @return      */     private Collection> getDataNodeGroups(final Map> dataNodeGroups) {         Collection> result = new LinkedList<>();         //遍歷對應(yīng)數(shù)據(jù)源下的數(shù)據(jù)節(jié)點         for (Entry> entry : dataNodeGroups.entrySet()) {             //封裝分片執(zhí)行組ShardingExecuteGroup             result.addAll(getDataNodeGroups(entry.getValue()));         }         return result;     }          private Collection> getDataNodeGroups(final List dataNodes) {         Collection> result = new LinkedList<>();         //maxConnectionsSizePerQuery最大查詢連接數(shù)默認(rèn)為1         //將dataNodes換分Math.max份         for (List each : Lists.partition(dataNodes, Math.max(dataNodes.size() / maxConnectionsSizePerQuery, 1))) {             result.add(new ShardingExecuteGroup<>(each));         }         return result;     }          private TableMetaData createTableMetaData(final Connection connection, final String catalog, final String actualTableName) throws SQLException {         //判斷表是否存在         if (isTableExist(connection, catalog, actualTableName)) {             //封裝表元數(shù)據(jù)             return new TableMetaData(getColumnMetaDataList(connection, catalog, actualTableName), getLogicIndexes(connection, catalog, actualTableName));         }         return new TableMetaData(Collections.emptyList(), Collections.emptySet());     }          private boolean isTableExist(final Connection connection, final String catalog, final String actualTableName) throws SQLException {         try (ResultSet resultSet = connection.getMetaData().getTables(catalog, null, actualTableName, null)) {             return resultSet.next();         }     }     /**      * 獲取表字段元數(shù)據(jù)      *      * @param connection 連接      * @param catalog schema      * @param actualTableName 真實表      * @return      * @throws SQLException      */     private List getColumnMetaDataList(final Connection connection, final String catalog, final String actualTableName) throws SQLException {         List result = new LinkedList<>();         Collection primaryKeys = getPrimaryKeys(connection, catalog, actualTableName);         try (ResultSet resultSet = connection.getMetaData().getColumns(catalog, null, actualTableName, "%")) {             while (resultSet.next()) {                 String columnName = resultSet.getString("COLUMN_NAME");                 String columnType = resultSet.getString("TYPE_NAME");                 result.add(new ColumnMetaData(columnName, columnType, primaryKeys.contains(columnName)));             }         }         return result;     }     /**      * 獲取表主鍵      */     private Collection getPrimaryKeys(final Connection connection, final String catalog, final String actualTableName) throws SQLException {         Collection result = new HashSet<>();         try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(catalog, null, actualTableName)) {             while (resultSet.next()) {                 result.add(resultSet.getString("COLUMN_NAME"));             }         }         return result;     }     /**      * 獲取表索引      */     private Collection getLogicIndexes(final Connection connection, final String catalog, final String actualTableName) throws SQLException {         Collection result = new HashSet<>();         try (ResultSet resultSet = connection.getMetaData().getIndexInfo(catalog, catalog, actualTableName, false, false)) {             while (resultSet.next()) {                 Optional logicIndex = getLogicIndex(resultSet.getString("INDEX_NAME"), actualTableName);                 if (logicIndex.isPresent()) {                     result.add(logicIndex.get());                 }             }         }         return result;     }          private Optional getLogicIndex(final String actualIndexName, final String actualTableName) {         //索引要以`_tableName`命名,比如:         //idx_t_order         String indexNameSuffix = "_" + actualTableName;         if (actualIndexName.contains(indexNameSuffix)) {             return Optional.of(actualIndexName.replace(indexNameSuffix, ""));         }         return Optional.absent();     }

4.執(zhí)行ShardingExecuteEngine#groupExecute

    /**
     * Execute for group.
     *
     * @param inputGroups input groups
     * @param callback sharding execute callback
     * @param  type of input value
     * @param  type of return value
     * @return execute result
     * @throws SQLException throw if execute failure
     */
    public  List groupExecute(final Collection> inputGroups, final ShardingGroupExecuteCallback callback) throws SQLException {
        return groupExecute(inputGroups, null, callback, false);
    }
    
    /**
     * Execute for group.
     *
     * @param inputGroups input groups
     * @param firstCallback first sharding execute callback
     * @param callback sharding execute callback
     * @param serial whether using multi thread execute or not
     * @param  type of input value
     * @param  type of return value
     * @return execute result
     * @throws SQLException throw if execute failure
     */
    public  List groupExecute(
        final Collection> inputGroups, final ShardingGroupExecuteCallback firstCallback, final ShardingGroupExecuteCallback callback, final boolean serial)
        throws SQLException {
        if (inputGroups.isEmpty()) {
            return Collections.emptyList();
        }
        //serial: 串行
        //parallel: 并行
        return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
    }
    
    private  List serialExecute(final Collection> inputGroups, final ShardingGroupExecuteCallback firstCallback,
                                         final ShardingGroupExecuteCallback callback) throws SQLException {
        Iterator> inputGroupsIterator = inputGroups.iterator();
        ShardingExecuteGroup firstInputs = inputGroupsIterator.next();
        //單獨(dú)執(zhí)行第一個組
        //當(dāng)firstCallback不為空時使用firstCallback,否則使用callback
        List result = new LinkedList<>(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback));
        //遍歷執(zhí)行
        for (ShardingExecuteGroup each : Lists.newArrayList(inputGroupsIterator)) {
            result.addAll(syncGroupExecute(each, callback));
        }
        return result;
    }
    
    private  List parallelExecute(final Collection> inputGroups, final ShardingGroupExecuteCallback firstCallback,
                                           final ShardingGroupExecuteCallback callback) throws SQLException {
        Iterator> inputGroupsIterator = inputGroups.iterator();
        //獲取第一個組
        ShardingExecuteGroup firstInputs = inputGroupsIterator.next();
        //將剩余組提交到線程池中執(zhí)行
        Collection>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback);
        //執(zhí)行第一個組,合并同步執(zhí)行、異步執(zhí)行結(jié)果
        return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
    }

    /**
     * 異步執(zhí)行
     */
    private  Collection>> asyncGroupExecute(final List> inputGroups, final ShardingGroupExecuteCallback callback) {
        Collection>> result = new LinkedList<>();
        for (ShardingExecuteGroup each : inputGroups) {
            result.add(asyncGroupExecute(each, callback));
        }
        return result;
    }
    
    private  ListenableFuture> asyncGroupExecute(final ShardingExecuteGroup inputGroup, final ShardingGroupExecuteCallback callback) {
        final Map dataMap = ShardingExecuteDataMap.getDataMap();
        //提交到線程池
        return executorService.submit(new Callable>() {
            
            @Override
            public Collection call() throws SQLException {
                return callback.execute(inputGroup.getInputs(), false, dataMap);
            }
        });
    }

    /**
     * 同步執(zhí)行
     */
    private  Collection syncGroupExecute(final ShardingExecuteGroup executeGroup, final ShardingGroupExecuteCallback callback) throws SQLException {
        return callback.execute(executeGroup.getInputs(), true, ShardingExecuteDataMap.getDataMap());
    }
    
    private  List getGroupResults(final Collection firstResults, final Collection>> restFutures) throws SQLException {
        List result = new LinkedList<>(firstResults);
        for (ListenableFuture> each : restFutures) {
            try {
                result.addAll(each.get());
            } catch (final InterruptedException | ExecutionException ex) {
                return throwException(ex);
            }
        }
        return result;
    }

上述就是小編為大家分享的ShardingContent的功能有哪些了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


文章標(biāo)題:ShardingContent的功能有哪些
網(wǎng)站鏈接:http://www.dlmjj.cn/article/iijpdh.html