日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
像Flink一樣使用Redis

Apache Flink和 Redis 是兩個(gè)強(qiáng)大的工具,可以一起使用來(lái)構(gòu)建可以處理大量數(shù)據(jù)的實(shí)時(shí)數(shù)據(jù)處理管道。Flink 為處理數(shù)據(jù)流提供了一個(gè)高度可擴(kuò)展和容錯(cuò)的平臺(tái),而 Redis 提供了一個(gè)高性能的內(nèi)存數(shù)據(jù)庫(kù),可用于存儲(chǔ)和查詢(xún)數(shù)據(jù)。在本文中,將探討如何使用 Flink 來(lái)使用異步函數(shù)調(diào)用 Redis,并展示如何使用它以非阻塞方式將數(shù)據(jù)推送到 Redis。

成都創(chuàng)新互聯(lián)長(zhǎng)期為上千余家客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開(kāi)放共贏平臺(tái),與合作伙伴共同營(yíng)造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為吳川企業(yè)提供專(zhuān)業(yè)的成都網(wǎng)站建設(shè)、做網(wǎng)站,吳川網(wǎng)站改版等技術(shù)服務(wù)。擁有10余年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開(kāi)發(fā)。

Redis的故事

“Redis:不僅僅是一個(gè)緩存

Redis 是一種功能強(qiáng)大的 NoSQL 內(nèi)存數(shù)據(jù)結(jié)構(gòu)存儲(chǔ),已成為開(kāi)發(fā)人員的首選工具。雖然它通常被認(rèn)為只是一個(gè)緩存,但 Redis 遠(yuǎn)不止于此。它可以作為數(shù)據(jù)庫(kù)、消息代理和緩存三者合一。

Redis 的優(yōu)勢(shì)之一是它的多功能性。它支持各種數(shù)據(jù)類(lèi)型,包括字符串、列表、集合、有序集合、哈希、流、HyperLogLogs 和位圖。Redis 還提供地理空間索引和半徑查詢(xún),使其成為基于位置的應(yīng)用程序的寶貴工具。

Redis 的功能超出了它的數(shù)據(jù)模型。它具有內(nèi)置的復(fù)制、Lua 腳本和事務(wù),并且可以使用 Redis Cluster 自動(dòng)分區(qū)數(shù)據(jù)。此外,Redis 通過(guò) Redis Sentinel 提供高可用性。

注意:在本文中,將更多地關(guān)注Redis集群模式

Redis 集群使用帶哈希槽的算法分片來(lái)確定哪個(gè)分片擁有給定的鍵并簡(jiǎn)化添加新實(shí)例的過(guò)程。同時(shí),它使用 Gossiping 來(lái)確定集群的健康狀況,如果主節(jié)點(diǎn)沒(méi)有響應(yīng),可以提升輔助節(jié)點(diǎn)以保持集群健康。必須有奇數(shù)個(gè)主節(jié)點(diǎn)和兩個(gè)副本才能進(jìn)行穩(wěn)健設(shè)置,以避免腦裂現(xiàn)象(集群無(wú)法決定提升誰(shuí)并最終做出分裂決定)

為了與 Redis 集群對(duì)話,將使用lettuce和 Redis Async Java 客戶端。

Flink 的故事

Apache Flink 是一個(gè)開(kāi)源、統(tǒng)一的流處理和批處理框架,旨在處理實(shí)時(shí)、高吞吐量和容錯(cuò)數(shù)據(jù)處理。它建立在 Apache Gelly 框架之上,旨在支持有界和無(wú)界流上的復(fù)雜事件處理和有狀態(tài)計(jì)算,它的快速之處在于其利用內(nèi)存中性能和異步檢查本地狀態(tài)。

故事的主人公

與數(shù)據(jù)庫(kù)的異步交互是流處理應(yīng)用程序的游戲規(guī)則改變者。通過(guò)這種方法,單個(gè)函數(shù)實(shí)例可以同時(shí)處理多個(gè)請(qǐng)求,從而允許并發(fā)響應(yīng)并顯著提高吞吐量。通過(guò)將等待時(shí)間與其他請(qǐng)求和響應(yīng)重疊,處理管道變得更加高效。

我們將以電商數(shù)據(jù)為例,計(jì)算24小時(shí)滑動(dòng)窗口中每個(gè)品類(lèi)的銷(xiāo)售額,滑動(dòng)時(shí)間為30秒,下沉到Redis,以便更快地查找下游服務(wù)。

充足的數(shù)據(jù)集

Category, TimeStamp
Electronics,1679832334
Furniture,1679832336
Fashion,1679832378
Food,16798323536

Flink Kafka 消費(fèi)者類(lèi)

package Aysnc_kafka_redis;

import AsyncIO.RedisSink;
import akka.japi.tuple.Tuple3;
import deserializer.Ecommdeserialize;
import model.Ecomm;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.concurrent.TimeUnit;

public class FlinkAsyncRedis {

public static void main(String[] args) throws Exception {


final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Ecommdeserialize jsonde = new Ecommdeserialize();

KafkaSource source = KafkaSource.builder()
.setTopics("{dummytopic}")
.setBootstrapServers("{dummybootstrap}")
.setGroupId("test_flink")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(jsonde)
.build();


DataStream orderData = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");


orderData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10)) {
@Override
public long extractTimestamp(Ecomm element) {
return element.getEventTimestamp(); // extract watermark column from stream
}
});

SingleOutputStreamOperator> aggregatedData = orderData.keyBy(Ecomm::getCategory)
.window(SlidingEventTimeWindows.of(Time.hours(24),Time.seconds(30)))
.apply((WindowFunction, String, TimeWindow>) (key, window, input, out) -> {
long count = 0;
for (Ecomm event : input) {
count++; // increment the count for each event in the window
}
out.collect(new Tuple3<>(key, window.getEnd(), count)); // output the category, window end time, and count
});


// calling async I/0 operator to sink data to redis in UnOrdered way
SingleOutputStreamOperator sinkResults = AsyncDataStream.unorderedWait(aggregatedData,new RedisSink(
"{redisClusterUrl}"),
1000, // the timeout defines how long an asynchronous operation take before it is finally considered failed
TimeUnit.MILLISECONDS,
100); //capacity This parameter defines how many asynchronous requests may be in progress at the same time.

sinkResults.print(); // print out the redis set response stored in the future for every key

env.execute("RedisAsyncSink"); // you will be able to see your job running on cluster by this name


}

}

Redis 設(shè)置鍵異步 I/0 運(yùn)算符

package AsyncIO;

import akka.japi.tuple.Tuple3;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import lombok.AllArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import scala.collection.immutable.List;

import java.util.ArrayList;
import java.util.Collections;

@AllArgsConstructor
public class RedisSink extends RichAsyncFunction, String> {

String redisUrl;

public RedisSink(String redisUrl){
this.redisUrl=redisUrl;
}

private transient RedisClusterClient client = null;
private transient StatefulRedisClusterConnection clusterConnection = null;
private transient RedisAdvancedClusterAsyncCommands asyncCall = null;


// method executes any operator-specific initialization
@Override
public void open(Configuration parameters) {
if (client == null ) {
client = RedisClusterClient.create(redisUrl);
}
if (clusterConnection == null) {
clusterConnection = client.connect();
}
if (asyncCall == null) {
asyncCall = clusterConnection.async();
}
}

// core logic to set key in redis using async connection and return result of the call via ResultFuture
@Override
public void asyncInvoke(Tuple3 stream, ResultFuture resultFuture) {

String productKey = stream.t1();
System.out.println("RedisKey:" + productKey); //for logging
String count = stream.t3().toString();
System.out.println("Redisvalue:" + count); //for logging
RedisFuture setResult = asyncCall.set(productKey,count);

setResult.whenComplete((result, throwable) -> {if(throwable!=null){
System.out.println("Callback from redis failed:" + throwable);
resultFuture.complete(new ArrayList<>());
}
else{
resultFuture.complete(new ArrayList(Collections.singleton(result)));
}});
}

// method closes what was opened during initialization to free any resources
// held by the operator (e.g. open network connections, io streams)
@Override
public void close() throws Exception {
client.close();
}

}

用例:

  • 數(shù)據(jù)科學(xué)模型可以使用流式傳輸?shù)?Redis 的數(shù)據(jù)來(lái)查找和生成更多在銷(xiāo)售季節(jié)經(jīng)常銷(xiāo)售的類(lèi)別的產(chǎn)品。
  • 它可用于在網(wǎng)頁(yè)上展示圖表和數(shù)字作為銷(xiāo)售統(tǒng)計(jì)數(shù)據(jù),以在用戶中產(chǎn)生積極購(gòu)買(mǎi)的動(dòng)力。

要點(diǎn):

  • Flink 為處理數(shù)據(jù)流提供了一個(gè)高度可擴(kuò)展和容錯(cuò)的平臺(tái),而 Redis 提供了一個(gè)高性能的內(nèi)存數(shù)據(jù)庫(kù),可用于存儲(chǔ)和查詢(xún)數(shù)據(jù)。
  • 異步編程可用于通過(guò)允許對(duì)外部系統(tǒng)(如 Redis)進(jìn)行非阻塞調(diào)用來(lái)提高數(shù)據(jù)處理管道的性能。
  • 兩者的結(jié)合可能有助于帶來(lái)實(shí)時(shí)數(shù)據(jù)決策文化。

網(wǎng)頁(yè)名稱(chēng):像Flink一樣使用Redis
網(wǎng)頁(yè)URL:http://www.dlmjj.cn/article/cdggdcs.html