日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
13.sparkstreaming之快速入門-創(chuàng)新互聯(lián)

簡(jiǎn)介

Spark Streaming是Spark核心API的擴(kuò)展,可以實(shí)現(xiàn)可伸縮、高吞吐量、具備容錯(cuò)機(jī)制的實(shí)時(shí)流時(shí)數(shù)據(jù)的處理。支持多種數(shù)據(jù)源,比如Kafka、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets。

網(wǎng)站前端頁(yè)面設(shè)計(jì)會(huì)用DIV+CSS架構(gòu),布局出來(lái)的網(wǎng)站外觀簡(jiǎn)潔大氣。HTML靜態(tài),html5+CSS3網(wǎng)站,自適應(yīng)電腦、手機(jī)、平板,符合用戶體驗(yàn)的習(xí)慣,更容易與用戶產(chǎn)生互動(dòng)。專業(yè)網(wǎng)絡(luò)公司的服務(wù)理念是“高性價(jià)比建站,讓企業(yè)網(wǎng)站具備營(yíng)銷價(jià)值,促進(jìn)長(zhǎng)期合作共贏模式”。

可以使用諸如map、reduce、join和window等高級(jí)函數(shù)進(jìn)行復(fù)雜算法(比如,機(jī)器學(xué)習(xí)和圖計(jì)算)的處理。最后還可以將處理結(jié)果存儲(chǔ)到文件系統(tǒng),數(shù)據(jù)庫(kù)和儀表盤。

13.spark streaming之快速入門

架構(gòu)與抽象

抽象

Spark Streaming接收實(shí)時(shí)流的數(shù)據(jù),并根據(jù)一定的時(shí)間間隔拆分成一批批的數(shù)據(jù),然后通過(guò)Spark Engine處理這些批數(shù)據(jù),最終得到處理后的一批批結(jié)果數(shù)據(jù)。

13.spark streaming之快速入門

Spark Streaming提供了一個(gè)叫做DStream(discretized stream,離散流)的抽象概念,DStream由一系列的RDD組成,表示每個(gè)批次中連續(xù)的數(shù)據(jù)流。DStream可以從輸入源(比如,Kafka、Flume、Kinesis等)中創(chuàng)建,也可以從其他DStream中使用高級(jí)算子操作轉(zhuǎn)換生成。

13.spark streaming之快速入門

DStream的所有操作其實(shí)都是對(duì)DStream中所有RDD的操作。比如,在單詞統(tǒng)計(jì)案例中,flatMap轉(zhuǎn)化操作會(huì)應(yīng)用到每個(gè)行RDD上來(lái)生成單詞RDD。

13.spark streaming之快速入門

架構(gòu)

13.spark streaming之快速入門

  • Receiver:Spark Streaming內(nèi)置的數(shù)據(jù)流接收器或自定義接收器,用于從數(shù)據(jù)源接收源源不斷的數(shù)據(jù)流。

  • CurrentBuffer:用于緩存輸入流接收器接收的數(shù)據(jù)流。

  • BlockIntervalTimer:一個(gè)定時(shí)器,用于將CurrentBuffer中緩存的數(shù)據(jù)流封裝為Block后放入blocksForPushing隊(duì)列中。

  • BlocksForPushing:待處理的Block

  • BlockPushingThread:此線程每隔100毫秒從BlocksForPushing隊(duì)列中取出一個(gè)Block存入存儲(chǔ)系統(tǒng),并緩存到ReceivedBlockQueue隊(duì)列中。

  • Block Batch:Block批次,按照批次時(shí)間間隔,從ReceivedBlockQueue隊(duì)列中獲取一批Block。

  • JobGenerator:Job生成器,用于給每一批Block生成一個(gè)Job。

DStream 轉(zhuǎn)化操作

DStream轉(zhuǎn)化操作分為無(wú)狀態(tài)(stateless)和有狀態(tài)(stateful)兩種。

  • 無(wú)狀態(tài)轉(zhuǎn)化操作中,每個(gè)批次的處理不依賴于之前批次的數(shù)據(jù)。

  • 有狀態(tài)轉(zhuǎn)化操作需要使用之前批次的數(shù)據(jù)或中間結(jié)果來(lái)計(jì)算當(dāng)前批次的數(shù)據(jù)。

無(wú)狀態(tài)轉(zhuǎn)化操作

無(wú)狀態(tài)轉(zhuǎn)化操作就是把簡(jiǎn)單的RDD轉(zhuǎn)化操作應(yīng)用到每個(gè)批次上,轉(zhuǎn)化DStream中的每個(gè)RDD。

常用的無(wú)狀態(tài)轉(zhuǎn)化操作

函數(shù)名稱 作用 scala示例
map()對(duì)DStream中的每個(gè)元素應(yīng)用指定函數(shù),返回由各元素輸出的元素組成的DStreamds.map(x => x+1)
flatMap()對(duì)DStream中的每個(gè)元素應(yīng)用指定函數(shù),返回由各元素輸出的迭代器組成的DStreamds.flatMap(x => x.split(" "))
filter返回由給定DStream中通過(guò)篩選的元素組成的DStreamds.filter(x => x!=1)
repartition()改變DStream的分區(qū)數(shù)ds.repartition(10)
reduceByKey將每個(gè)批次中鍵相同的記錄聚合ds.reduceByKey((x,y) => x+y)
groupByKey將每個(gè)批次中的記錄根據(jù)鍵分組ds.groupByKey()
  • 使用map()和reduceByKey()在每個(gè)時(shí)間區(qū)間中對(duì)日志根據(jù)IP地址進(jìn)行計(jì)數(shù)。

    • scala
    //假設(shè)ApacheAccessingLog是用來(lái)從Apache日志中解析條目的工具類
    val accessLogDStream = logData.map(line => ApacheAccessingLog.parseFromLogLine(line))
    val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1)
    val ipCountsDStream = ipDStream.reduceByKey((x,y) => x+y)
    • java
    //假設(shè)ApacheAccessingLog是用來(lái)從Apache日志中解析條目的工具類
    static final class IpTuple implements PairFunction {
        public Tuple2 call(ApacheAccessLog log) {
            return new Tuple2<>(log.getIpAddress(), 1L);
        }
    }
    
    JavaDStream accessLogDStream = logData.map(new ParseFromLogLine());
    JavaPairDStream ipDStream = accessLogDStream.mapToPair(new IpTuple());
    JavaPairDStream(String, Long) ipCountsDStream = ipDStream.reduceByKey(new LongSumReducer());
  • 以IP地址為鍵,將請(qǐng)求計(jì)數(shù)的數(shù)據(jù)和傳輸數(shù)據(jù)量的數(shù)據(jù)連接起來(lái)

    • scala
    val ipBytesDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), entry.getContentSize()))
    val ipBytesSumDStream = ipBytesDStream.reduceByKey((x,y) => x+y)
    val ipBytesRequestCountDStream = ipCountsDStream.join(ipBytesSumDStream)
    • java
    JavaPairDStream ipBytesDStream = accessLogsDStream.mapToPair(new IpContentTuple());
    JavaPairDStream ipBytesSumDStream = ipBytesDStream.reduceByKey(new LongSumReducer());
    JavaPairDStream> ipBytesRequestCountDStream = ipCountsDStream.join(ipBytesSumDStream);
  • 使用transform()操作實(shí)現(xiàn)自定義轉(zhuǎn)化操作,從日志記錄中提取異常值。

    • scala
    val outlierDStream = accessLogsDStream.transform{
        rdd => extractOutliers(rdd)
    }
    • java
    JavaPairDStream ipRawDStream = accessLogsDStream.transform(
        new Function, JavaRDD>() {
            public JavaPairRDD call(JavaRDD rdd) {
                return extractOutliers(rdd);
            }
        }
    );

有狀態(tài)轉(zhuǎn)化操作

DStream的有狀態(tài)轉(zhuǎn)化操作是跨時(shí)間區(qū)間跟蹤數(shù)據(jù)的操作,先前批次的數(shù)據(jù)也被用來(lái)在新的批次中計(jì)算結(jié)果。

有狀態(tài)轉(zhuǎn)化操作主要有兩種類型:滑動(dòng)窗口和updateStateByKey()。前者以一個(gè)時(shí)間階段為滑動(dòng)窗口進(jìn)行操作,后者用來(lái)跟蹤每個(gè)鍵的狀態(tài)變化。

設(shè)置檢查點(diǎn)

有狀態(tài)轉(zhuǎn)化操作需要在StreamingContext中打開檢查點(diǎn)機(jī)制確保容錯(cuò)性。

ssc.checkpoint("hdfs://...")
基于窗口的轉(zhuǎn)化操作
簡(jiǎn)介

基于窗口的操作會(huì)在一個(gè)比StreamingContext批次間隔更長(zhǎng)的時(shí)間范圍內(nèi),通過(guò)整合多個(gè)批次的結(jié)果,計(jì)算出整個(gè)窗口的結(jié)果。

基于窗口的轉(zhuǎn)化操作需要兩個(gè)參數(shù),分別是窗口時(shí)長(zhǎng)和滑動(dòng)時(shí)長(zhǎng)。兩者都是批次間隔的整數(shù)倍。

  • 窗口時(shí)長(zhǎng):控制每次計(jì)算最近的windowDuration/batchInterval個(gè)批次的數(shù)據(jù)。

  • 滑動(dòng)步長(zhǎng):默認(rèn)值與批次間隔相等。用來(lái)控制對(duì)新DStream進(jìn)行計(jì)算的時(shí)間間隔。
簡(jiǎn)單案例
  • 使用window()對(duì)窗口進(jìn)行計(jì)數(shù)

    • scala
    val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10))
    val windowCounts = accessLogsWindow.count()
    • java
    JavaDStream accessLogsWindow = accessLogsDStream.window(Durations.seconds(30), Duration.seconds(10));
    JavaDStream windowCounts = accessLogsWindow.count();
  • 使用reduceByKeyAndWindow對(duì)每個(gè)IP地址的訪問(wèn)量計(jì)數(shù)

    • scala
    val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
    val ipCountDStream = ipDStream.reduceByKeyAndWindow(
        {(x,y) => x+y}, //加入新進(jìn)入窗口的批次中的元素
        {(x,y) => x-y}, //移除離開窗口的老批次中的元素
        Seconds(30), //窗口時(shí)長(zhǎng)
        Seconds(10) //滑動(dòng)步長(zhǎng)
    )
    • java
    class ExtractIp extends PairFunction {
        public Tuple2 call(ApacheAccessLog entry) {
            return new Tuple2(entry.getIpAddress(), 1L);
        }
    }
    
    class AddLongs extends Function2() {
        public Long call(Long v1, Long v2) {
            return v1 + v2;
        }
    }
    
    class SubtractLongs extends Function2() {
        public Long call(Long v1, Long v2) {
            return v1 - v2;
        }
    }
    
    JavaPairDStream ipAddressPairDStream = accessLogsDStream.mapToPair(new ExtractIp());
    JavaPairDStream ipCountDStream = ipAddressPairDStream.reduceByKeyAndWindow(
        new AddLongs(), //加上新進(jìn)入窗口的批次中的元素
        new SubtractLongs(), //移除離開窗口的老批次中的元素
        Durations.seconds(30), //窗口時(shí)長(zhǎng)
        Durations.seconds(10) //滑動(dòng)步長(zhǎng)
    )
  • 使用countByWindow和countByValueAndWindow對(duì)窗口計(jì)數(shù)

    • scala

      val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}
      val ipAddre***equestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))
      val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
    • java
    JavaDStream ip = accessLogsDStream.map(new Function() {
       public String call(ApacheAccessLog entry) {
            return entry.getIpAddress();
       }
    });
    
    JavaDStream requestCount = accessLogsDStream.countByWindow(Dirations.seconds(30), Durations.seconds(10));
    JavaPairDStream ipAddre***equestCount = ip.countByValueAndWindow(Dirations.seconds(30), Durations.seconds(10));
updateStateByKey轉(zhuǎn)化操作
簡(jiǎn)介

updateStateByKey提供了跨批次維護(hù)狀態(tài)的功能,用于鍵值對(duì)形式的DStream。

updateStateByKey提供了一個(gè)update(events, oldState)函數(shù),接收與某鍵相關(guān)的事件及該鍵之前對(duì)應(yīng)的狀態(tài),返回該鍵對(duì)應(yīng)的新狀態(tài)。

  • events:當(dāng)前批次中收到的事件列表
  • oldState:一個(gè)可選的狀態(tài)對(duì)象,存放在Option內(nèi);如果一個(gè)鍵沒(méi)有之前的狀態(tài),這個(gè)值為空。
  • newState:由函數(shù)返回,也以O(shè)ption形式存在;可以返回一個(gè)空的Option表示刪除該狀態(tài)。
簡(jiǎn)單案例

使用updateStateByKey()跟蹤日志消息中各HTTP響應(yīng)代碼的計(jì)數(shù)。

  • scala
def updateRunningSum(values: Seq[Long], state: Option[Long]) = {
    Some(state.getOrElse(0L) + values.size)
}

val responseCodeDStream = accessLogsDStream.map(log => (log.getResponseCode(), 1L))
val responseCodeCountDStream = responseCodeDStream.updateStateByKey(updateRunningSum _)
  • java
class UpdateRunningSum implements Function2, Optional, Optional> {
    public Optional call(List nums, Optional current) {
        long sum = current.or(0L);
        return Optional.of(sum + nums.size());
    }
};

JavaPairDStream responseCodeCountDStream = accessLogsDStream.mapToPair(
    new PairFunction() {
        public Tuple2 call(ApacheAccessLog log) {
            return new Tuple2(log.getResponseCode(), 1L);
        }
    }
).updateStateByKey(new UpdateRunningSum());

DStream 行動(dòng)操作

DStream行動(dòng)操作同RDD的行動(dòng)操作。比如,將DStream保存為SequenceFile文件。

  • scala
val writableIpAddre***equestCount = ipAddre***equestCount.map{
    (ip, count) => 
  • java
JavaPairDStream writableDStream = ipDStream.mapToPair(
    new PairFunction, Text, LongWritable>() {
        public Tuple2 call(Tuple2 e) {
            return new Tuple2(new Text(e._1()), new LongWritable(e._2()));
        }
    }
);

writableDStream.saveAsHadoopFiles("outputDir", "txt", Text.class, LongWritable.class, SequenceFileOutputFormat.class);

忠于技術(shù),熱愛分享。歡迎關(guān)注公眾號(hào):java大數(shù)據(jù)編程,了解更多技術(shù)內(nèi)容。

13.spark streaming之快速入門

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。


文章名稱:13.sparkstreaming之快速入門-創(chuàng)新互聯(lián)
本文路徑:http://www.dlmjj.cn/article/epggs.html