What Apache Flink
Apache Flink 是一個==分布式大數(shù)據(jù)處理引擎==,可對==有限數(shù)據(jù)流和無限數(shù)據(jù)流==進(jìn)行==有狀態(tài)計算==??刹渴鹪?=各種集群環(huán)境==,對各種大小的數(shù)據(jù)規(guī)模進(jìn)行快速計算。
分布式大數(shù)據(jù)處理引擎
-
是一個分布式的、高可用的用于大數(shù)據(jù)處理的計算引擎
有限流和無限流
- 有限流:有始有終的數(shù)據(jù)流。即傳統(tǒng)意義上的批數(shù)據(jù),進(jìn)行批處理
-
無限流:有始無終的數(shù)據(jù)流。即現(xiàn)實生活中的流數(shù)據(jù),進(jìn)行流處理
有狀態(tài)計算
-
良好的狀態(tài)機(jī)制,進(jìn)行較好的容錯處理和任務(wù)恢復(fù)。同時實現(xiàn) Exactly-Once 語義。
各種集群環(huán)境
-
可部署standalone、Flink on yarn、Flink on Mesos、Flink on k8s等等
Flink Application
Streams
數(shù)據(jù)在真實世界中是不停產(chǎn)生不停發(fā)出的,所以數(shù)據(jù)處理也應(yīng)該還原真實,做到真正的流處理。而批處理則是流處理的特殊情況
- 即上面說的有限流和無限流,貼官網(wǎng)圖說明。
State
在流計算場景中,其實所有流計算本質(zhì)上都是增量計算(Incremental Processing)。
例如,計算前幾個小時或者一直以來的某個指標(biāo)(PV、UV等),計算完一條數(shù)據(jù)之后需要保存其計算結(jié)果即狀態(tài),以便和下一條計算結(jié)果合并。
另外,保留計算狀態(tài),進(jìn)行 CheckPoint 可以很好地實現(xiàn)流計算的容錯和任務(wù)恢復(fù),也可以實現(xiàn)Exactly Once處理語義
Time
三類時間:
- Event Time:事件真實產(chǎn)生的時間
- Processing Time:事件被 Flink 程序處理的時間
- Ingestion Time:事件進(jìn)入到 Flink 程序的時間
API
API分三層,越接近SQL層,越抽象,靈活性越低,但更簡單易用。
- SQL/Table層:直接使用SQL進(jìn)行數(shù)據(jù)處理
- DataStream/DataSet API:最核心的API,對流數(shù)據(jù)進(jìn)行處理,可在其上實現(xiàn)自定義的WaterMark、Windows、State等操作
- ProcessFunction:也叫RunTime層,最底層的API,帶狀態(tài)的事件驅(qū)動。
Flink Architecture
Data Pipeline Applications
即 real-time Stream ETL:流式ETL拆分。
通常,ETL都是通過定時任務(wù)調(diào)度SQL文件或者M(jìn)R任務(wù)來執(zhí)行的。在實時ETL場景中,將批量ETL邏輯寫到流處理中,分散計算壓力和提高計算結(jié)果的實時性。
多用于實時數(shù)倉、實時搜索引擎等
Data Analytics Applications
即數(shù)據(jù)分析,包括流式數(shù)據(jù)分析和批量數(shù)據(jù)分析。例如實時報表、實時大屏。
Event-driven Applications
即事件驅(qū)動應(yīng)用,在一個有狀態(tài)的計算過程中,通常情況下都是將狀態(tài)保存在第三方系統(tǒng)(如Hbase Redis等)中。
而在Flink中,狀態(tài)是保存在內(nèi)部程序中,減少了狀態(tài)存取的不必要的I/O開銷,更大吞吐量和更低延時。
第一個 Flink 程序
開發(fā)環(huán)境要求
主要是Java環(huán)境和Maven環(huán)境。Java要求JDK1.8,Maven要求3.0以上,開發(fā)工具推薦使用 ItelliJ IDEA,社區(qū)說法:Eclipse在Java和Scala混合編程下有問題,故不推薦。
代碼示例:
package source.streamDataSource; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class SocketWindowWordCount { public static void main(String[] args) throws Exception{ if(args.length!=2){ System.err.println("Usage:nSocketWindowWordCount hostname port"); } // 獲取程序參數(shù) String hostname = args[0]; int port = Integer.parseInt(args[1]); // 入口類,用于設(shè)置環(huán)境和參數(shù)等 StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); // 設(shè)置 Time 類型 see.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 從指定 IP 端口 讀取流數(shù)據(jù),返回一個 DataStreamSource DataStreamSource<String> text = see.socketTextStream(hostname, port, "n", 5); // 在 DataStreamSource 上做操作即 transformation DataStream<Tuple2<String, Integer>> windowCount = text // flatMap , FlatMap接口的實現(xiàn):將獲取到的數(shù)據(jù)分割,并每個元素組合成 (word, count)形式 .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception { for (String word : value.split("\s")) { collector.collect(Tuple2.of(word, 1)); } } }) // 按位置指定key,進(jìn)行聚合操作 .keyBy(0) // 指定窗口大小 .timeWindow(Time.seconds(5)) // 在每個key上做sum // reduce 和 sum 的實現(xiàn) // .reduce(new ReduceFunction<Tuple2<String, Integer>>() { // @Override // public Tuple2<String, Integer> reduce(Tuple2<String, Integer> stringIntegerTuple2, Tuple2<String, Integer> t1) throws Exception { // return Tuple2.of(stringIntegerTuple2.f0, stringIntegerTuple2.f1+t1.f1); // } // }); .sum(1); // 一個線程執(zhí)行 windowCount.print().setParallelism(1); see.execute("Socket Window WordCount"); // 其他 transformation 操作示例 // windowCount // .map(new MapFunction<Tuple2<String,Integer>, String>() { // @Override // public String map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { // return stringIntegerTuple2.f0; // } // }) // .print(); // // text.filter(new FilterFunction<String>() { // @Override // public boolean filter(String s) throws Exception { // return s.contains("h"); // } // }) // .print(); // // SplitStream<String> split = text.split(new OutputSelector<String>() { // @Override // public Iterable<String> select(String value) { // ArrayList<String> strings = new ArrayList<>(); // if (value.contains("h")) // strings.add("Hadoop"); // else // strings.add("noHadoop"); // return strings; // // } // }); // // split.select("hadoop").print(); // split.select("noHadoop").map(new MapFunction<String, String>() { // @Override // public String map(String s) throws Exception { // // return s.toUpperCase(); // } // }).print(); } }