日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
數(shù)據(jù)庫中間件MyCAT源碼分析——跨庫兩表Join

1. 概述

MyCAT 支持跨庫表 Join,目前版本僅支持跨庫兩表 Join。雖然如此,已經(jīng)能夠滿足我們大部分的業(yè)務(wù)場(chǎng)景。況且,Join 過多的表可能帶來的性能問題也是很麻煩的。

本文主要分享:

  1. 整體流程、調(diào)用順序圖
  2. 核心代碼的分析

前置閱讀:《MyCAT 源碼分析 —— 【單庫單表】查詢》。

OK,Let's Go。

2. 主流程

當(dāng)執(zhí)行跨庫兩表 Join SQL 時(shí),經(jīng)歷的大體流程如下:

SQL 上,需要添加注解 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ ${SQL} 。RouteService#route(...) 解析注解 mycat:catlet 后,路由給 HintCatletHandler 作進(jìn)一步處理。

HintCatletHandler 獲取注解對(duì)應(yīng)的 Catlet 實(shí)現(xiàn)類,io.mycat.catlets.ShareJoin 就是其中一種實(shí)現(xiàn)(目前也只有這一種實(shí)現(xiàn)),提供了跨庫兩表 Join 的功能。從類命名上看,ShareJoin 很大可能性后續(xù)會(huì)提供完整的跨庫多表的 Join 功能。

核心代碼如下:

 
 
 
 
  1. // HintCatletHandler.java
  2. public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema,
  3.                            int sqlType, String realSQL, String charset, ServerConnection sc,
  4.                            LayerCachePool cachePool, String hintSQLValue, int hintSqlType, Map hintMap)
  5.        throws SQLNonTransientException {
  6.    String cateletClass = hintSQLValue;
  7.    if (LOGGER.isDebugEnabled()) {
  8.        LOGGER.debug("load catelet class:" + hintSQLValue + " to run sql " + realSQL);
  9.    }
  10.    try {
  11.        Catlet catlet = (Catlet) MycatServer.getInstance().getCatletClassLoader().getInstanceofClass(cateletClass);
  12.        catlet.route(sysConfig, schema, sqlType, realSQL, charset, sc, cachePool);
  13.        catlet.processSQL(realSQL, new EngineCtx(sc.getSession2()));
  14.    } catch (Exception e) {
  15.        LOGGER.warn("catlet error " + e);
  16.        throw new SQLNonTransientException(e);
  17.    }
  18.    return null;

3. ShareJoin

目前支持跨庫兩表 Join。ShareJoin 將 SQL 拆分成左表 SQL 和 右表 SQL,發(fā)送給各數(shù)據(jù)節(jié)點(diǎn)執(zhí)行,匯總數(shù)據(jù)結(jié)果進(jìn)行合后返回。

偽代碼如下:

 
 
 
 
  1. // SELECT u.id, o.id FROM t_order o 
  2. // INNER JOIN t_user u ON o.uid = u.id
  3. // 【順序】查詢左表
  4. String leftSQL = "SELECT o.id, u.id FROM t_order o";
  5. List leftList = dn[0].select(leftSQL) + dn[1].select(leftSQL) + ... + dn[n].select(leftsql);
  6. // 【并行】查詢右表
  7. String rightSQL = "SELECT u.id FROM t_user u WHERE u.id IN (${leftList.uid})";
  8. for (dn : dns) { // 此處是并行執(zhí)行,使用回調(diào)邏輯
  9.     for (rightRecord : dn.select(rightSQL)) { // 查詢右表
  10.         // 合并結(jié)果
  11.         for (leftRecord : leftList) {
  12.             if (leftRecord.uid == rightRecord.id) {
  13.                 write(leftRecord + leftRecord.uid 拼接結(jié)果);
  14.             }
  15.         }
  16.     }

實(shí)際情況會(huì)更加復(fù)雜,我們接下來一點(diǎn)點(diǎn)往下看。

3.1 JoinParser

JoinParser 負(fù)責(zé)對(duì) SQL 進(jìn)行解析。整體流程如下:

舉個(gè)例子,/*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 解析后,TableFilter 結(jié)果如下:

  • tName :表名
  • tAlia :表自定義命名
  • where :過濾條件
  • order :排序條件
  • parenTable :左連接的 Join 的表名。t_user表 在 join屬性 的 parenTable 為 "o",即 t_order。
  • joinParentkey :左連接的 Join 字段
  • joinKey :join 字段。t_user表 在 join屬性 為 id。
  • join :子 tableFilter。即,該表連接的右邊的表。
  • parent :和 join屬性 相對(duì)。

看到此處,大家可能有疑問,為什么要把 SQL 解析成 TableFilter。JoinParser 根據(jù) TableFilter 生成數(shù)據(jù)節(jié)點(diǎn)執(zhí)行 SQL。代碼如下:

 
 
 
 
  1. // TableFilter.java
  2. public String getSQL() {
  3.    String sql = "";
  4.    // fields
  5.    for (Entry entry : fieldAliasMap.entrySet()) {
  6.        String key = entry.getKey();
  7.        String val = entry.getValue();
  8.        if (val == null) {
  9.            sql = unionsql(sql, getFieldfrom(key), ",");
  10.        } else {
  11.            sql = unionsql(sql, getFieldfrom(key) + " as " + val, ",");
  12.        }
  13.    }
  14.    // where
  15.    if (parent == null) {    // on/where 等于號(hào)左邊的表
  16.        String parentJoinKey = getJoinKey(true);
  17.        // fix sharejoin bug:
  18.        // (AbstractConnection.java:458) -close connection,reason:program err:java.lang.IndexOutOfBoundsException:
  19.        // 原因是左表的select列沒有包含 join 列,在獲取結(jié)果時(shí)報(bào)上面的錯(cuò)誤
  20.        if (sql != null && parentJoinKey != null &&
  21.                !sql.toUpperCase().contains(parentJoinKey.trim().toUpperCase())) {
  22.            sql += ", " + parentJoinKey;
  23.        }
  24.        sql = "select " + sql + " from " + tName;
  25.        if (!(where.trim().equals(""))) {
  26.            sql += " where " + where.trim();
  27.        }
  28.    } else {    // on/where 等于號(hào)右邊邊的表
  29.        if (allField) {
  30.            sql = "select " + sql + " from " + tName;
  31.        } else {
  32.            sql = unionField("select " + joinKey, sql, ",");
  33.            sql = sql + " from " + tName;
  34.            //sql="select "+joinKey+","+sql+" from "+tName;
  35.        }
  36.        if (!(where.trim().equals(""))) {
  37.            sql += " where " + where.trim() + " and (" + joinKey + " in %s )";
  38.        } else {
  39.            sql += " where " + joinKey + " in %s ";
  40.        }
  41.    }
  42.    // order
  43.    if (!(order.trim().equals(""))) {
  44.        sql += " order by " + order.trim();
  45.    }
  46.    // limit
  47.    if (parent == null) {
  48.        if ((rowCount > 0) && (offset > 0)) {
  49.            sql += " limit" + offset + "," + rowCount;
  50.        } else {
  51.            if (rowCount > 0) {
  52.                sql += " limit " + rowCount;
  53.            }
  54.        }
  55.    }
  56.    return sql;
  • 當(dāng) parent 為空時(shí),即on/where 等于號(hào)左邊的表。例如:select id, uid from t_order。
  • 當(dāng) parent 不為空時(shí),即on/where 等于號(hào)右邊的表。例如:select id, username from t_user where id in (1, 2, 3)。

3.2 ShareJoin.processSQL(...)

當(dāng) SQL 解析完后,生成左邊的表執(zhí)行的 SQL,發(fā)送給對(duì)應(yīng)的數(shù)據(jù)節(jié)點(diǎn)查詢數(shù)據(jù)。大體流程如下:

當(dāng) SQL 為 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 時(shí), sql = getSql() 的返回結(jié)果為 select id, uid from t_order。

生成左邊的表執(zhí)行的 SQL 后,順序順序順序發(fā)送給對(duì)應(yīng)的數(shù)據(jù)節(jié)點(diǎn)查詢數(shù)據(jù)。具體順序查詢是怎么實(shí)現(xiàn)的,我們來看下章 BatchSQLJob。

3.3 BatchSQLJob

EngineCtx 對(duì) BatchSQLJob 封裝,提供上層兩個(gè)方法:

  • executeNativeSQLSequnceJob :順序(非并發(fā))在每個(gè)數(shù)據(jù)節(jié)點(diǎn)執(zhí)行SQL任務(wù)
  • executeNativeSQLParallJob :并發(fā)在每個(gè)數(shù)據(jù)節(jié)點(diǎn)執(zhí)行SQL任務(wù)

核心代碼如下:

 
 
 
 
  1. // EngineCtx.java
  2. public void executeNativeSQLSequnceJob(String[] dataNodes, String sql,
  3.         SQLJobHandler jobHandler) {
  4.     for (String dataNode : dataNodes) {
  5.         SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,
  6.                 jobHandler, this);
  7.         bachJob.addJob(job, false);
  8.     }
  9. }
  10. public void executeNativeSQLParallJob(String[] dataNodes, String sql,
  11.         SQLJobHandler jobHandler) {
  12.     for (String dataNode : dataNodes) {
  13.         SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,
  14.                 jobHandler, this);
  15.         bachJob.addJob(job, true);
  16.     }

BatchSQLJob 通過執(zhí)行中任務(wù)列表、待執(zhí)行任務(wù)列表來實(shí)現(xiàn)順序/并發(fā)執(zhí)行任務(wù)。核心代碼如下:

 
 
 
 
  1. // BatchSQLJob.java
  2. /**
  3. * 執(zhí)行中任務(wù)列表
  4. */
  5. private ConcurrentHashMap runningJobs = new ConcurrentHashMap();
  6. /**
  7. * 待執(zhí)行任務(wù)列表
  8. */
  9. private ConcurrentLinkedQueue waitingJobs = new ConcurrentLinkedQueue();
  10. public void addJob(SQLJob newJob, boolean parallExecute) {
  11.    if (parallExecute) {
  12.        runJob(newJob);
  13.    } else {
  14.        waitingJobs.offer(newJob);
  15.        if (runningJobs.isEmpty()) { // 若無正在執(zhí)行中的任務(wù),則從等待隊(duì)列里獲取任務(wù)進(jìn)行執(zhí)行。
  16.            SQLJob job = waitingJobs.poll();
  17.            if (job != null) {
  18.                runJob(job);
  19.            }
  20.        }
  21.    }
  22. }
  23. public boolean jobFinished(SQLJob sqlJob) {
  24.     runningJobs.remove(sqlJob.getId());
  25.     SQLJob job = waitingJobs.poll();
  26.     if (job != null) {
  27.         runJob(job);
  28.         return false;
  29.     } else {
  30.         if (noMoreJobInput) {
  31.             return runningJobs.isEmpty() && waitingJobs.isEmpty();
  32.         } else {
  33.             return false;
  34.         }
  35.     }
  • 順序執(zhí)行時(shí),當(dāng) runningJobs 存在執(zhí)行中的任務(wù)時(shí),#addJob(...) 時(shí),不立即執(zhí)行,添加到 waitingJobs。當(dāng) SQLJob 完成時(shí),順序調(diào)用下一個(gè)任務(wù)。
  • 并發(fā)執(zhí)行時(shí),#addJob(...) 時(shí),立即執(zhí)行。

SQLJob SQL 異步執(zhí)行任務(wù)。其 jobHandler(SQLJobHandler) 屬性,在 SQL 執(zhí)行有返回結(jié)果時(shí),會(huì)進(jìn)行回調(diào),從而實(shí)現(xiàn)異步執(zhí)行。

在 ShareJoin 里,SQLJobHandler 有兩個(gè)實(shí)現(xiàn):ShareDBJoinHandler、ShareRowOutPutDataHandler。前者,左邊的表執(zhí)行的 SQL 回調(diào);后者,右邊的表執(zhí)行的 SQL 回調(diào)。

3.4 ShareDBJoinHandler

ShareDBJoinHandler,左邊的表執(zhí)行的 SQL 回調(diào)。流程如下:

  • #fieldEofResponse(...) :接收數(shù)據(jù)節(jié)點(diǎn)返回的 fields,放入內(nèi)存。
  • #rowResponse(...) :接收數(shù)據(jù)節(jié)點(diǎn)返回的 row,放入內(nèi)存。
  • #rowEofResponse(...) :接收完一個(gè)數(shù)據(jù)節(jié)點(diǎn)返回所有的 row。當(dāng)所有數(shù)據(jù)節(jié)點(diǎn)都完成 SQL 執(zhí)行時(shí),提交右邊的表執(zhí)行的 SQL 任務(wù),并行執(zhí)行,即圖中#createQryJob(...)。

當(dāng) SQL 為 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 時(shí), sql = getChildSQL() 的返回結(jié)果為 select id, username from t_user where id in (1, 2, 3)。

核心代碼如下:

 
 
 
 
  1. // ShareJoin.java
  2. private void createQryJob(int batchSize) {
  3.    int count = 0;
  4.    Map batchRows = new ConcurrentHashMap();
  5.    String theId = null;
  6.    StringBuilder sb = new StringBuilder().append('(');
  7.    String svalue = "";
  8.    for (Map.Entry e : ids.entrySet()) {
  9.        theId = e.getKey();
  10.        byte[] rowbyte = rows.remove(theId);
  11.        if (rowbyte != null) {
  12.            batchRows.put(theId, rowbyte);
  13.        }
  14.        if (!svalue.equals(e.getValue())) {
  15.            if (joinKeyType == Fields.FIELD_TYPE_VAR_STRING
  16.                    || joinKeyType == Fields.FIELD_TYPE_STRING) { // joinkey 為varchar
  17.                sb.append("'").append(e.getValue()).append("'").append(','); // ('digdeep','yuanfang')
  18.            } else { // 默認(rèn)joinkey為int/long
  19.                sb.append(e.getValue()).append(','); // (1,2,3)
  20.            }
  21.        }
  22.        svalue = e.getValue();
  23.        if (count++ > batchSize) {
  24.            break;
  25.        }
  26.    }
  27.    if (count == 0) {
  28.        return;
  29.    }
  30.    jointTableIsData = true;
  31.    sb.deleteCharAt(sb.length() - 1).append(')');
  32.    String sql = String.format(joinParser.getChildSQL(), sb);
  33.    getRoute(sql);
  34.    ctx.executeNativeSQLParallJob(getDataNodes(), sql, new ShareRowOutPutDataHandler(this, fields, joinindex, joinParser.getJoinRkey(), batchRows, ctx.getSession()));

3.5 ShareRowOutPutDataHandler

ShareRowOutPutDataHandler,右邊的表執(zhí)行的 SQL 回調(diào)。流程如下:

  • #fieldEofResponse(...) :接收數(shù)據(jù)節(jié)點(diǎn)返回的 fields,返回 header 給 MySQL Client。
  • #rowResponse(...) :接收數(shù)據(jù)節(jié)點(diǎn)返回的 row,匹配左表的記錄,返回合并后返回的 row 給 MySQL Client。
  • #rowEofResponse(...) :當(dāng)所有 row 都返回完后,返回 eof 給 MySQL Client。

核心代碼如下:

 
 
 
 
  1. // ShareRowOutPutDataHandler.java
  2. public boolean onRowData(String dataNode, byte[] rowData) {
  3.    RowDataPacket rowDataPkgold = ResultSetUtil.parseRowData(rowData, bfields);
  4.    //拷貝一份batchRows
  5.    Map batchRowsCopy = new ConcurrentHashMap();
  6.    batchRowsCopy.putAll(arows);
  7.    // 獲取Id字段,
  8.    String id = ByteUtil.getString(rowDataPkgold.fieldValues.get(joinR));
  9.    // 查找ID對(duì)應(yīng)的A表的記錄
  10.    byte[] arow = getRow(batchRowsCopy, id, joinL);
  11.    while (arow != null) {
  12.        RowDataPacket rowDataPkg = ResultSetUtil.parseRowData(arow, afields);//ctx.getAllFields());
  13.        for (int i = 1; i < rowDataPkgold.fieldCount; i++) {
  14.            // 設(shè)置b.name 字段
  15.            byte[] bname = rowDataPkgold.fieldValues.get(i);
  16.            rowDataPkg.add(bname);
  17.            rowDataPkg.addFieldCount(1);
  18.        }
  19.        // huangyiming add
  20.        MiddlerResultHandler middlerResultHandler = session.getMiddlerResultHandler();
  21.        if (null == middlerResultHandler) {
  22.            ctx.writeRow(rowDataPkg);
  23.        } else {
  24.            if (middlerResultHandler instanceof MiddlerQueryResultHandler) {
  25.                byte[] columnData = rowDataPkg.fieldValues.get(0);
  26.                if (columnData != null && columnData.length > 0) {
  27.                    String rowValue = new String(columnData);
  28.                    middlerResultHandler.add(rowValue);
  29.                }
  30.                //}
  31.            }
  32.        }
  33.        arow = getRow(batchRowsCopy, id, joinL);
  34.    }
  35.    return false;

4. 彩蛋

如下是本文涉及到的核心類,有興趣的同學(xué)可以翻一翻。

ShareJoin 另外不支持的功能:

  1. 只支持 inner join,不支持 left join、right join 等等連接。
  2. 不支持 order by。
  3. 不支持 group by 以及 相關(guān)聚合函數(shù)。
  4. 即使 join 左表的字段未聲明為返回 fields 也會(huì)返回。

恩,MyCAT 弱XA 源碼繼續(xù)走起!


標(biāo)題名稱:數(shù)據(jù)庫中間件MyCAT源碼分析——跨庫兩表Join
分享鏈接:http://www.dlmjj.cn/article/djphcii.html