What Apache Flink
Apache Flink 是一個(gè)==分布式大數(shù)據(jù)處理引擎==,可對(duì)==有限數(shù)據(jù)流和無限數(shù)據(jù)流==進(jìn)行==有狀態(tài)計(jì)算==??刹渴鹪?=各種集群環(huán)境==,對(duì)各種大小的數(shù)據(jù)規(guī)模進(jìn)行快速計(jì)算。
分布式大數(shù)據(jù)處理引擎
-
是一個(gè)分布式的、高可用的用于大數(shù)據(jù)處理的計(jì)算引擎
有限流和無限流
- 有限流:有始有終的數(shù)據(jù)流。即傳統(tǒng)意義上的批數(shù)據(jù),進(jìn)行批處理
-
無限流:有始無終的數(shù)據(jù)流。即現(xiàn)實(shí)生活中的流數(shù)據(jù),進(jìn)行流處理
有狀態(tài)計(jì)算
-
良好的狀態(tài)機(jī)制,進(jìn)行較好的容錯(cuò)處理和任務(wù)恢復(fù)。同時(shí)實(shí)現(xiàn) Exactly-Once 語義。
各種集群環(huán)境
-
可部署standalone、Flink on yarn、Flink on Mesos、Flink on k8s等等
Flink Application
Streams
數(shù)據(jù)在真實(shí)世界中是不停產(chǎn)生不停發(fā)出的,所以數(shù)據(jù)處理也應(yīng)該還原真實(shí),做到真正的流處理。而批處理則是流處理的特殊情況
- 即上面說的有限流和無限流,貼官網(wǎng)圖說明。
State
在流計(jì)算場景中,其實(shí)所有流計(jì)算本質(zhì)上都是增量計(jì)算(Incremental Processing)。
例如,計(jì)算前幾個(gè)小時(shí)或者一直以來的某個(gè)指標(biāo)(PV、UV等),計(jì)算完一條數(shù)據(jù)之后需要保存其計(jì)算結(jié)果即狀態(tài),以便和下一條計(jì)算結(jié)果合并。
另外,保留計(jì)算狀態(tài),進(jìn)行 CheckPoint 可以很好地實(shí)現(xiàn)流計(jì)算的容錯(cuò)和任務(wù)恢復(fù),也可以實(shí)現(xiàn)Exactly Once處理語義
Time
三類時(shí)間:
- Event Time:事件真實(shí)產(chǎn)生的時(shí)間
- Processing Time:事件被 Flink 程序處理的時(shí)間
- Ingestion Time:事件進(jìn)入到 Flink 程序的時(shí)間
API
API分三層,越接近SQL層,越抽象,靈活性越低,但更簡單易用。
- SQL/Table層:直接使用SQL進(jìn)行數(shù)據(jù)處理
- DataStream/DataSet API:最核心的API,對(duì)流數(shù)據(jù)進(jìn)行處理,可在其上實(shí)現(xiàn)自定義的WaterMark、Windows、State等操作
- ProcessFunction:也叫RunTime層,最底層的API,帶狀態(tài)的事件驅(qū)動(dòng)。
Flink Architecture
Data Pipeline Applications
即 real-time Stream ETL:流式ETL拆分。
通常,ETL都是通過定時(shí)任務(wù)調(diào)度SQL文件或者M(jìn)R任務(wù)來執(zhí)行的。在實(shí)時(shí)ETL場景中,將批量ETL邏輯寫到流處理中,分散計(jì)算壓力和提高計(jì)算結(jié)果的實(shí)時(shí)性。
多用于實(shí)時(shí)數(shù)倉、實(shí)時(shí)搜索引擎等
Data Analytics Applications
即數(shù)據(jù)分析,包括流式數(shù)據(jù)分析和批量數(shù)據(jù)分析。例如實(shí)時(shí)報(bào)表、實(shí)時(shí)大屏。
Event-driven Applications
即事件驅(qū)動(dòng)應(yīng)用,在一個(gè)有狀態(tài)的計(jì)算過程中,通常情況下都是將狀態(tài)保存在第三方系統(tǒng)(如Hbase Redis等)中。
而在Flink中,狀態(tài)是保存在內(nèi)部程序中,減少了狀態(tài)存取的不必要的I/O開銷,更大吞吐量和更低延時(shí)。
第一個(gè) Flink 程序
開發(fā)環(huán)境要求
主要是Java環(huán)境和Maven環(huán)境。Java要求JDK1.8,Maven要求3.0以上,開發(fā)工具推薦使用 ItelliJ IDEA,社區(qū)說法:Eclipse在Java和Scala混合編程下有問題,故不推薦。
代碼示例:
package source.streamDataSource; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class SocketWindowWordCount { public static void main(String[] args) throws Exception{ if(args.length!=2){ System.err.println("Usage:nSocketWindowWordCount hostname port"); } // 獲取程序參數(shù) String hostname = args[0]; int port = Integer.parseInt(args[1]); // 入口類,用于設(shè)置環(huán)境和參數(shù)等 StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); // 設(shè)置 Time 類型 see.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 從指定 IP 端口 讀取流數(shù)據(jù),返回一個(gè) DataStreamSource DataStreamSource<String> text = see.socketTextStream(hostname, port, "n", 5); // 在 DataStreamSource 上做操作即 transformation DataStream<Tuple2<String, Integer>> windowCount = text // flatMap , FlatMap接口的實(shí)現(xiàn):將獲取到的數(shù)據(jù)分割,并每個(gè)元素組合成 (word, count)形式 .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception { for (String word : value.split("\s")) { collector.collect(Tuple2.of(word, 1)); } } }) // 按位置指定key,進(jìn)行聚合操作 .keyBy(0) // 指定窗口大小 .timeWindow(Time.seconds(5)) // 在每個(gè)key上做sum // reduce 和 sum 的實(shí)現(xiàn) // .reduce(new ReduceFunction<Tuple2<String, Integer>>() { // @Override // public Tuple2<String, Integer> reduce(Tuple2<String, Integer> stringIntegerTuple2, Tuple2<String, Integer> t1) throws Exception { // return Tuple2.of(stringIntegerTuple2.f0, stringIntegerTuple2.f1+t1.f1); // } // }); .sum(1); // 一個(gè)線程執(zhí)行 windowCount.print().setParallelism(1); see.execute("Socket Window WordCount"); // 其他 transformation 操作示例 // windowCount // .map(new MapFunction<Tuple2<String,Integer>, String>() { // @Override // public String map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { // return stringIntegerTuple2.f0; // } // }) // .print(); // // text.filter(new FilterFunction<String>() { // @Override // public boolean filter(String s) throws Exception { // return s.contains("h"); // } // }) // .print(); // // SplitStream<String> split = text.split(new OutputSelector<String>() { // @Override // public Iterable<String> select(String value) { // ArrayList<String> strings = new ArrayList<>(); // if (value.contains("h")) // strings.add("Hadoop"); // else // strings.add("noHadoop"); // return strings; // // } // }); // // split.select("hadoop").print(); // split.select("noHadoop").map(new MapFunction<String, String>() { // @Override // public String map(String s) throws Exception { // // return s.toUpperCase(); // } // }).print(); } }