文章目录
01 引言
- 01 引言
- 02 File Sink介绍
- 03 File Sink案例演示
- 04 文末
在前面的博客,我们学习了Flink
的Streaming File Sink
了,有兴趣的同学可以参阅下:
- 《Flink教程(01)- Flink知识图谱》
- 《Flink教程(02)- Flink入门》
- 《Flink教程(03)- Flink环境搭建》
- 《Flink教程(04)- Flink入门案例》
- 《Flink教程(05)- Flink原理简单分析》
- 《Flink教程(06)- Flink批流一体API(Source示例)》
- 《Flink教程(07)- Flink批流一体API(Transformation示例)》
- 《Flink教程(08)- Flink批流一体API(Sink示例)》
- 《Flink教程(09)- Flink批流一体API(Connectors示例)》
- 《Flink教程(10)- Flink批流一体API(其它)》
- 《Flink教程(11)- Flink高级API(Window)》
- 《Flink教程(12)- Flink高级API(Time与Watermaker)》
- 《Flink教程(13)- Flink高级API(状态管理)》
- 《Flink教程(14)- Flink高级API(容错机制)》
- 《Flink教程(15)- Flink高级API(并行度)》
- 《Flink教程(16)- Flink Table与SQL》
- 《Flink教程(17)- Flink Table与SQL(案例与SQL算子)》
- 《Flink教程(18)- Flink阶段总结》
- 《Flink教程(19)- Flink高级特性(BroadcastState)》
- 《Flink教程(20)- Flink高级特性(双流Join)》
- 《Flink教程(21)- Flink高级特性(End-to-End Exactly-Once)》
- 《Flink教程(22)- Flink高级特性(异步IO)》
- 《Flink教程(23)- Flink高级特性(Streaming File Sink)》
本文主要讲解Flink
的高级特性其中之一的 File Sink。
参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html
新的 Data Sink API (Beta):
- 之前发布的 Flink 版本中[1],已经支持了 source connector 工作在流批两种模式下,因此在 Flink 1.12 中,社区着重实现了统一的 Data Sink API(FLIP-143)。新的抽象引入了 write/commit 协议和一个更加模块化的接口。Sink 的实现者只需要定义 what 和 how:SinkWriter,用于写数据,并输出需要 commit 的内容(例如,committables);Committer 和 GlobalCommitter,封装了如何处理 committables。框架会负责 when 和 where:即在什么时间,以及在哪些机器或进程中 commit。
这种模块化的抽象允许为 BATCH 和 STREAMING 两种执行模式,实现不同的运行时策略,以达到仅使用一种 sink 实现,也可以使两种模式都可以高效执行。Flink 1.12 中,提供了统一的 FileSink connector,以替换现有的 StreamingFileSink connector (FLINK-19758)。其它的 connector 也将逐步迁移到新的接口。
Flink 1.12的 FileSink 为批处理和流式处理提供了一个统一的接收器,它将分区文件写入Flink文件系统抽象所支持的文件系统。这个文件系统连接器为批处理和流式处理提供了相同的保证,它是现有流式文件接收器的一种改进。
03 File Sink案例演示/**
* @author : YangLinWei
* @createTime: 2022/3/9 9:15 上午
*/
public class FileSinkDemo {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10));
env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
//2.source
DataStreamSource lines = env.socketTextStream("node1", 9999);
//3.sink
//设置sink的前缀和后缀
//文件的头和文件扩展名
//prefix-xxx-.txt
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("prefix")
.withPartSuffix(".txt")
.build();
//设置sink的路径
String outputPath = "hdfs://node1:8020/FlinkFileSink/parquet";
final FileSink sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder("UTF-8"))
.withBucketAssigner(new DateTimeBucketAssigner())
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.withOutputFileConfig(config)
.build();
lines.sinkTo(sink).setParallelism(1);
env.execute();
}
}
04 文末
本文主要讲解Flink的高级特性其中之一的File Sink,谢谢大家的阅读,本文完!