您当前的位置: 首页 >  flink

顧棟

暂无认证

  • 2浏览

    0关注

    227博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

【Flink学习】入门教程之DataStream API 简介

顧棟 发布时间:2022-01-26 17:39:31 ,浏览量:2

文章目录
  • DataStream API 简介
    • Java tuples 和 POJOs
      • Tuples
      • POJOs
    • Scala tuples 和 case classes
    • 一个完整的示例
      • Stream execution environment 流执行环境
      • Basic stream sources 基本的 stream source
      • Basic stream sinks 基本的 stream sink
      • Debugging
    • Hands-on 实践

DataStream API 简介

官网原文https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/learn-flink/datastream_api/

流处理,顾名思义,需要将数据转换成流的形式进行处理。那么什么可以转换成流呢?答案是Flink 的 Java 和 Scala DataStream API 可以将任何可序列化的对象转化为流。

Flink 自带的序列化器有:

  • 基本类型,即 String、Long、Integer、Boolean、Array
  • 复合类型:Tuples、POJOs 和 Scala case classes

同样也支持Kryo和Avro序列化器。

Java tuples 和 POJOs

Flink 的原生序列化器可以高效地操作 tuples 和 POJOs

Tuples

对于 Java,Flink 自带有 Tuple0Tuple25 类型。

POJOs

如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):

  • 该类是公有且独立的(没有非静态内部类)
  • 该类有公有的无参构造函数
  • 类(及父类)中所有的所有不被 static、transient 修饰的属性要么是公有的(且不被 final 修饰),要么是包含公有的 getter 和 setter 方法,这些方法遵循 Java bean 命名规范。
public class Person {
    public String name;  
    public Integer age;  
    public Person() {}
    public Person(String name, Integer age) {  
        . . .
    }
}  

Person person = new Person("Fred Flintstone", 35);
Scala tuples 和 case classes

如果你了解 Scala,那一定知道 tuple 和 case class。

【Scala学习】函数式编程续 case类

【Scala学习】之Tuples和OOP 示例

一个完整的示例
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;

public class Example {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
                new Person("Pebbles", 2));

        DataStream adults = flintstones.filter(new FilterFunction() {
            @Override
            public boolean filter(Person person) throws Exception {
                return person.age >= 18;
            }
        });

        adults.print();

        env.execute();
    }

    public static class Person {
        public String name;
        public Integer age;
        public Person() {}

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        }
    }
}
Stream execution environment 流执行环境

每个 Flink 应用都需要有执行环境,在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment

DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。

注意,如果没有调用 execute(),应用就不会运行。

在这里插入图片描述

此分布式运行时取决于你的应用是否是可序列化的。它还要求所有依赖对集群中的每个节点均可用。

Basic stream sources 基本的 stream source

上述示例用 env.fromElements(...) 方法构造 DataStream 。这样将简单的流放在一起是为了方便用于原型或测试。StreamExecutionEnvironment 上还有一个 fromCollection(Collection) 方法。因此,你可以这样做:

List people = new ArrayList();

people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));

DataStream flintstones = env.fromCollection(people);

另一个获取数据到流中的便捷方法是用 socket

DataStream lines = env.socketTextStream("localhost", 9999)

或读取文件

DataStream lines = env.readTextFile("file:///path");

在真实的应用中,最常用的数据源是那些支持低延迟,高吞吐并行读取以及重复(高性能和容错能力为先决条件)的数据源,例如 Apache Kafka,Kinesis 和各种文件系统。REST API 和数据库也经常用于增强流处理的能力(stream enrichment)。

Basic stream sinks 基本的 stream sink

上述示例用 adults.print() 打印其结果到 task manager 的日志中(如果运行在 IDE 中时,将追加到你的 IDE 控制台)。它会对流中的每个元素都调用 toString() 方法。

输出看起来类似于

1> Fred: age 35
2> Wilma: age 35

1>2> 指出输出来自哪个 sub-task(即 thread)

Debugging

在生产中,应用程序将在远程集群或一组容器中运行。如果集群或容器挂了,这就属于远程失败。JobManager 和 TaskManager 日志对于调试此类故障非常有用,但是更简单的是 Flink 支持在 IDE 内部进行本地调试。你可以设置断点,检查局部变量,并逐行执行代码。如果想了解 Flink 的工作原理和内部细节,查看 Flink 源码也是非常好的方法。

Hands-on 实践

练习解析

关注
打赏
1663402667
查看更多评论
立即登录/注册

微信扫码登录

0.0604s