This article explains how to build a Maven pom file for Flink's Table and SQL APIs and then walks through a small program that uses them. The steps are kept simple and clear, so you can follow along and try them out yourself.

1、Build the pom file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flinksqldemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.8</scala.version>
        <kafka.version>0.10.2.1</kafka.version>
        <flink.version>1.12.0</flink.version>
        <hadoop.version>2.7.3</hadoop.version>
        <setting.scope>compile</setting.scope>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.72</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>29.0-jre</version>
        </dependency>
    </dependencies>
</project>
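The flink-table-planner-blink dependency above is what provides the Table and SQL support that the code in the next section relies on. With Flink 1.12 the blink planner is already the default, but it can also be requested explicitly when the table environment is created; the following is a small, untested sketch of that option, using the same env and StreamTableEnvironment that appear in the program below:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Explicitly ask for the blink planner in streaming mode (the Flink 1.12 default),
// instead of relying on StreamTableEnvironment.create(env) picking it implicitly.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);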
2、Write the code
package com.jd.data;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the input file; each line holds three comma-separated fields
        DataStreamSource<String> stream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt");
        // DataStreamSource<String> stream = env.socketTextStream("localhost", 8888);

        // Parse every line into a SensorReading POJO
        SingleOutputStreamOperator<SensorReading> map = stream.map(new MapFunction<String, SensorReading>() {
            public SensorReading map(String s) throws Exception {
                String[] split = s.split(",");
                return new SensorReading(split[0], split[1], split[2]);
            }
        });

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Using the Table API (see the expression-DSL sketch after this class)
        // Table table = tableEnv.fromDataStream(map);
        // table.printSchema();
        // Table select = table.select("a,b");

        // Using the SQL API: register the stream as a view and query it
        tableEnv.createTemporaryView("test", map);
        Table select = tableEnv.sqlQuery("select a, b from test");

        // Convert the result table back to an append-only stream of SensorReading2
        DataStream<SensorReading2> sensorReading2DataStream = tableEnv.toAppendStream(select, SensorReading2.class);
        sensorReading2DataStream.map(new MapFunction<SensorReading2, Object>() {
            @Override
            public Object map(SensorReading2 value) throws Exception {
                System.out.println(value.a + " " + value.b);
                return null;
            }
        });

        env.execute();
    }
}
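The commented-out Table API branch above uses the older string-based select("a,b") syntax. As a rough, untested sketch, the same projection can also be written with the expression DSL that Flink 1.12 provides in org.apache.flink.table.api.Expressions; it reuses the tableEnv and map variables from the example and is an illustration rather than part of the original program:

import static org.apache.flink.table.api.Expressions.$;

// Derive a Table from the POJO stream; columns a, b, c are inferred from SensorReading
Table table = tableEnv.fromDataStream(map);
table.printSchema();

// Project columns a and b with the expression DSL instead of the "a,b" string
Table select = table.select($("a"), $("b"));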
package com.jd.data;

public class SensorReading {
    public String a;
    public String b;
    public String c;

    public SensorReading() {
    }

    public SensorReading(String a, String b, String c) {
        this.a = a;
        this.b = b;
        this.c = c;
    }

    public String getA() {
        return a;
    }

    public void setA(String a) {
        this.a = a;
    }

    public String getB() {
        return b;
    }

    public void setB(String b) {
        this.b = b;
    }

    public String getC() {
        return c;
    }

    public void setC(String c) {
        this.c = c;
    }
}

package com.jd.data;
public class SensorReading2 {
    public String a;
    public String b;

    public SensorReading2() {
    }

    public SensorReading2(String a, String b) {
        this.a = a;
        this.b = b;
    }

    public String getA() {
        return a;
    }

    public void setA(String a) {
        this.a = a;
    }

    public String getB() {
        return b;
    }

    public void setB(String b) {
        this.b = b;
    }
}

Note: the POJO fields must be public and the class must have a no-argument constructor, otherwise Flink cannot map the table columns onto the class.
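If defining a dedicated result POJO such as SensorReading2 feels heavy, the conversion can also target Flink's generic Row type, which carries no such constraints. The following is a small, untested sketch that reuses the tableEnv and select variables from the example; the getField indexes assume the column order a, b of the query:

import org.apache.flink.types.Row;

// Convert the query result into a stream of generic rows instead of a POJO
DataStream<Row> rowStream = tableEnv.toAppendStream(select, Row.class);
rowStream.map(new MapFunction<Row, Object>() {
    @Override
    public Object map(Row value) throws Exception {
        // getField(0) / getField(1) correspond to columns a and b
        System.out.println(value.getField(0) + " " + value.getField(1));
        return null;
    }
});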
Thanks for reading. That covers building the pom file and using Flink's Table and SQL APIs; the details are best confirmed by running the code yourself.