日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
十分鐘教你寫一個(gè)數(shù)據(jù)庫

今天教大家借助一款框架快速實(shí)現(xiàn)一個(gè)數(shù)據(jù)庫,這個(gè)框架就是Calcite,下面會(huì)帶大家通過兩個(gè)例子快速教會(huì)大家怎么實(shí)現(xiàn),一個(gè)是可以通過 SQL 語句的方式可以直接查詢文件內(nèi)容,第二個(gè)是模擬 Mysql 查詢功能,以及最后告訴大家怎么實(shí)現(xiàn) SQL 查詢 Kafka 數(shù)據(jù)。

Calcite

Calcite 是一個(gè)用于優(yōu)化異構(gòu)數(shù)據(jù)源的查詢處理的可插拔基礎(chǔ)框架(他是一個(gè)框架),可以將任意數(shù)據(jù)(Any data, Anywhere)DML 轉(zhuǎn)換成基于 SQL 的 DML 引擎,并且我們可以選擇性的使用它的部分功能。

Calcite能干什么

  • 使用 SQL 訪問內(nèi)存中某個(gè)數(shù)據(jù)
  • 使用 SQL 訪問某個(gè)文件的數(shù)據(jù)
  • 跨數(shù)據(jù)源的數(shù)據(jù)訪問、聚合、排序等(例如 Mysql 和 Redis 數(shù)據(jù)源中的數(shù)據(jù)進(jìn)行join)

當(dāng)我們需要自建一個(gè)數(shù)據(jù)庫的時(shí)候,數(shù)據(jù)可以為任何格式的,比如text、word、xml、mysql、es、csv、第三方接口數(shù)據(jù)等等,我們只有數(shù)據(jù),我們想讓這些數(shù)據(jù)支持 SQL 形式動(dòng)態(tài)增刪改查。

另外,像Hive、Drill、Flink、Phoenix 和 Storm 等項(xiàng)目中,數(shù)據(jù)處理系統(tǒng)都是使用 Calcite 來做 SQL 解析和查詢優(yōu)化,當(dāng)然,還有部分用來構(gòu)建自己的 JDBC driver。

名詞解釋

Token

就是將標(biāo)準(zhǔn) SQL(可以理解為Mysql)關(guān)鍵詞以及關(guān)鍵詞之間的字符串截取出來,每一個(gè)token,會(huì)被封裝為一個(gè)SqlNode,SqlNode會(huì)衍生很多子類,比如Select會(huì)被封裝為SqlSelect,當(dāng)前 SqlNode 也能反解析為 SQL 文本。

RelDataTypeField

某個(gè)字段的名稱和類型信息

RelDataType

多個(gè) RelDataTypeField 組成了 RelDataType,可以理解為數(shù)據(jù)行

Table

一個(gè)完整的表的信息

Schema

所有元數(shù)據(jù)的組合,可以理解為一組 Table 或者庫的概念

開始使用

1. 引入包


org.apache.calcite
calcite-core

1.32.0

2. 創(chuàng)建model.json文件和表結(jié)構(gòu)csv

model.json 里面主要描述或者說告訴 Calcite 如何創(chuàng)建 Schema,也就是告訴框架怎么創(chuàng)建出庫。

{
"version": "1.0",
"defaultSchema": "CSV",
"schemas": [
{
"name": "CSV",
"type": "custom",
"factory": "csv.CsvSchemaFactory",
"operand": {
"directory": "csv"
}
}
]
}

接下來還需要定義一個(gè) csv 文件,用來定義表結(jié)構(gòu)。

NAME:string,MONEY:string
aixiaoxian,10000萬
xiaobai,10000萬
adong,10000萬
maomao,10000萬
xixi,10000萬
zizi,10000萬
wuwu,10000萬
kuku,10000萬

整個(gè)項(xiàng)目的結(jié)構(gòu)大概就是這樣:

3. 實(shí)現(xiàn)Schema的工廠類

在上述文件中指定的包路徑下去編寫 CsvSchemaFactory 類,實(shí)現(xiàn) SchemaFactory 接口,并且實(shí)現(xiàn)里面唯一的方法 create 方法,創(chuàng)建Schema(庫)。

public class CsvSchemaFactory implements SchemaFactory {

@Override
public Schema create(SchemaPlus parentSchema, String name,
Map operand) {
final String directory = (String) operand.get("directory");
File directoryFile = new File(directory);
return new CsvSchema(directoryFile, "scannable");
}
}

4. 自定義Schma類

有了 SchemaFactory,接下來需要自定義 Schema 類。

自定義的 Schema 需要實(shí)現(xiàn) Schema 接口,但是直接實(shí)現(xiàn)要實(shí)現(xiàn)的方法太多,我們?nèi)?shí)現(xiàn)官方的 AbstractSchema 類,這樣就只需要實(shí)現(xiàn)一個(gè)方法就行(如果有其他定制化需求可以實(shí)現(xiàn)原生接口)。

核心的邏輯就是createTableMap方法,用于創(chuàng)建出 Table 表。

他會(huì)掃描指定的Resource下面的所有 csv 文件,將每個(gè)文件映射成Table對(duì)象,最終以map形式返回,Schema接口的其他幾個(gè)方法會(huì)用到這個(gè)對(duì)象。

@Override
protected Map getTableMap() {
if (tableMap == null) {
tableMap = createTableMap();
}
return tableMap;
}
private Map createTableMap() {

final Source baseSource = Sources.of(directoryFile);

File[] files = directoryFile.listFiles((dir, name) -> {
final String nameSansGz = trim(name, ".gz");
return nameSansGz.endsWith(".csv");
});
if (files == null) {
System.out.println("directory " + directoryFile + " not found");
files = new File[0];
}

final ImmutableMap.Builder builder = ImmutableMap.builder();
for (File file : files) {
Source source = Sources.of(file);
final Source sourceSansCsv = source.trimOrNull(".csv");
if (sourceSansCsv != null) {
final Table table = createTable(source);
builder.put(sourceSansCsv.relative(baseSource).path(), table);
}
}
return builder.build();
}

5. 自定義 Table

Schema 有了,并且數(shù)據(jù)文件 csv 也映射成 Table 了,一個(gè) csv 文件對(duì)應(yīng)一個(gè) Table。

接下來我們?nèi)プ远x Table,自定義 Table 的核心是我們要定義字段的類型和名稱,以及如何讀取 csv文件。

先獲取數(shù)據(jù)類型和名稱,即單表結(jié)構(gòu),從csv文件頭中獲?。ó?dāng)前文件頭需要我們自己定義,包括規(guī)則我們也可以定制化)。

public abstract class CsvTable extends AbstractTable {
protected final Source source;
protected final @Nullable RelProtoDataType protoRowType;
private @Nullable RelDataType rowType;
private @Nullable List fieldTypes;


CsvTable(Source source, @Nullable RelProtoDataType protoRowType) {
this.source = source;
this.protoRowType = protoRowType;
}

@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
if (protoRowType != null) {
return protoRowType.apply(typeFactory);
}
if (rowType == null) {
rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
null);
}
return rowType;
}


public List getFieldTypes(RelDataTypeFactory typeFactory) {
if (fieldTypes == null) {
fieldTypes = new ArrayList<>();
CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
fieldTypes);
}
return fieldTypes;
}

public static RelDataType deduceRowType(JavaTypeFactory typeFactory,
Source source, @Nullable List fieldTypes) {
final List types = new ArrayList<>();
final List names = new ArrayList<>();
try (CSVReader reader = openCsv(source)) {
String[] strings = reader.readNext();
if (strings == null) {
strings = new String[]{"EmptyFileHasNoColumns:boolean"};
}
for (String string : strings) {
final String name;
final RelDataType fieldType;

final int colon = string.indexOf(':');
if (colon >= 0) {
name = string.substring(0, colon);
String typeString = string.substring(colon + 1);
Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString);
if (decimalMatcher.matches()) {
int precision = Integer.parseInt(decimalMatcher.group(1));
int scale = Integer.parseInt(decimalMatcher.group(2));
fieldType = parseDecimalSqlType(typeFactory, precision, scale);
} else {
switch (typeString) {
case "string":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
break;
case "boolean":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN);
break;
case "byte":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT);
break;
case "char":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR);
break;
case "short":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT);
break;
case "int":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER);
break;
case "long":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT);
break;
case "float":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL);
break;
case "double":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE);
break;
case "date":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE);
break;
case "timestamp":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP);
break;
case "time":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME);
break;
default:
LOGGER.warn(
"Found unknown type: {} in file: {} for column: {}. Will assume the type of "
+ "column is string.",
typeString, source.path(), name);
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
break;
}
}
} else {

name = string;
fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
}
names.add(name);
types.add(fieldType);
if (fieldTypes != null) {
fieldTypes.add(fieldType);
}
}
} catch (IOException e) {

}
if (names.isEmpty()) {
names.add("line");
types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
}
return typeFactory.createStructType(Pair.zip(names, types));
}
}

獲取文件中的數(shù)據(jù),上面把Table的表結(jié)構(gòu)字段名稱和類型都獲取到了以后,就剩最后一步了,獲取文件中的數(shù)據(jù)。我們需要自定義一個(gè)類,實(shí)現(xiàn) ScannableTable 接口,并且實(shí)現(xiàn)里面唯一的方法 scan 方法,其實(shí)本質(zhì)上就是讀文件,然后把文件的每一行的數(shù)據(jù)和上述獲取的 fileType 進(jìn)行匹配。

@Override
public Enumerable scan(DataContext root) {
JavaTypeFactory typeFactory = root.getTypeFactory();
final List fieldTypes = getFieldTypes(typeFactory);
final List fields = ImmutableIntList.identity(fieldTypes.size());
final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
return new AbstractEnumerable<@Nullable Object[]>() {
@Override
public Enumerator<@Nullable Object[]> enumerator() {

return new CsvEnumerator<>(source, cancelFlag, false, null,
CsvEnumerator.arrayConverter(fieldTypes, fields, false));
}
};
}


public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream,
@Nullable String @Nullable [] filterValues, RowConverter rowConverter) {
this.cancelFlag = cancelFlag;
this.rowConverter = rowConverter;
this.filterValues = filterValues == null ? null
: ImmutableNullableList.copyOf(filterValues);
try {

this.reader = openCsv(source);

this.reader.readNext();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public E current() {
return castNonNull(current);
}

@Override
public boolean moveNext() {
try {
outer:
for (; ; ) {
if (cancelFlag.get()) {
return false;
}
final String[] strings = reader.readNext();
if (strings == null) {
current = null;
reader.close();
return false;
}
if (filterValues != null) {
for (int i = 0; i < strings.length; i++) {
String filterValue = filterValues.get(i);
if (filterValue != null) {
if (!filterValue.equals(strings[i])) {
continue outer;
}
}
}
}
current = rowConverter.convertRow(strings);
return true;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

protected @Nullable Object convert(@Nullable RelDataType fieldType, @Nullable String string) {
if (fieldType == null || string == null) {
return string;
}
switch (fieldType.getSqlTypeName()) {
case BOOLEAN:
if (string.length() == 0) {
return null;
}
return Boolean.parseBoolean(string);
case TINYINT:
if (string.length() == 0) {
return null;
}
return Byte.parseByte(string);
case SMALLINT:
if (string.length() == 0) {
return null;
}
return Short.parseShort(string);
case INTEGER:
if (string.length() == 0) {
return null;
}
return Integer.parseInt(string);
case BIGINT:
if (string.length() == 0) {
return null;
}
return Long.parseLong(string);
case FLOAT:
if (string.length() == 0) {
return null;
}
return Float.parseFloat(string);
case DOUBLE:
if (string.length() == 0) {
return null;
}
return Double.parseDouble(string);
case DECIMAL:
if (string.length() == 0) {
return null;
}
return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string);
case DATE:
if (string.length() == 0) {
return nu
分享題目:十分鐘教你寫一個(gè)數(shù)據(jù)庫
文章分享:http://www.dlmjj.cn/article/dhjcihh.html