您当前的位置: 首页 >  flink

Bulut0907

暂无认证

  • 8浏览

    0关注

    346博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Flink的数据类型和序列化(Scala版)

Bulut0907 发布时间:2021-11-05 18:16:36 ,浏览量:8

目录
  • 1. 数据类型
    • 1.1 Tuple和case class
    • 1.2 POJOs类
    • 1.3 原生数据类型
    • 1.4 普通class
    • 1.5 Values
    • 1.6 Hadoop Writables
    • 1.7 Special Types
  • 2. TypeInformation类
    • 2.1 创建TypeInformation和TypeSerializer
    • 2.2 泛型参数用TypeInformation表示

1. 数据类型 1.1 Tuple和case class
package devBase

import org.apache.flink.api.scala.{ExecutionEnvironment,createTypeInformation}

case class Student(name:String, age:Int)

object DataTypeTest {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val case_input= env.fromElements(Student("LiMing",16), Student("Zhangsan",18))
    val tuple_input = env.fromElements(("LiMing",16),("Zhangsan",18))

    case_input.print()
    tuple_input.print()
  }

}

执行结果:

Student(LiMing,16)
Student(Zhangsan,18)
(LiMing,16)
(Zhangsan,18)
  • Tuple用TupleTypeInfo进行表示,case class用CaseClassTypeInfo进行表示
1.2 POJOs类

Flink处理POJOs类比普通的类更高效和易用,满足以下条件的类即是POJOs类:

  1. 类是访问权限是public
  2. 类有一个无参的默认构造器
  3. 类的字段访问权限都是public,且字段类型被Flink注册的序列化所支持
  • POJOs类在Flink中用PojoTypeInfo所表示,并用PojoSerializer进行序列化(可以配置用Kryo进行序列化)
package devBase

import org.apache.flink.api.scala.{ExecutionEnvironment,createTypeInformation}

class Student(name:String, age:Int) {
  def this() {
    this("default_name", 0)
  }

  override def toString: String = {
    s"name:${name}, age:${age}"
  }
}

object DataTypeTest {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val class_iput= env.fromElements(new Student("LiMing",16),
      new Student("Zhangsan",18))

    class_iput.print()
  }

}

执行结果:

name:LiMing, age:16
name:Zhangsan, age:18
1.3 原生数据类型

Flink支持所有Scala的原生数据类型,比如Int、String、Double; 用BasicTypeInfo进行表示

1.4 普通class
  • Flink支持不是POJOs类型的普通class(除了字段不能被序列化的class,比如字段类型为file pointers、I/O streams、native resources)
  • 不能访问普通class的字段
  • 使用Kryo对普通class进行序列化
1.5 Values

todo

1.6 Hadoop Writables

todo

1.7 Special Types

todo

2. TypeInformation类

scala的所有数据类型在Flink中都有对应的TypeInformation类,TypeInformation类对Scala的数据类型进行描述并生成序列化器

2.1 创建TypeInformation和TypeSerializer
package devBase

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment



object DataTypeTest {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // 或者在rich函数中通过getRuntimeContext.getExecutionConfig
    val config = senv.getConfig

    val stringInfo: TypeInformation[String] = createTypeInformation[String]
    val stringSerializer:TypeSerializer[String] = stringInfo.createSerializer(config)

    val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]
    val tupleSerializer:TypeSerializer[(String, Double)] = tupleInfo.createSerializer(config)


  }

}
2.2 泛型参数用TypeInformation表示

对于泛型参数,Flink并不知道具体的数据类型,可以参考下面:

package devBase

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.DataStream

object DataTypeTest {

  def selectFirst[T : TypeInformation](input: DataStream[(T, _)]) : DataStream[T] = {
    input.map(_._1)
  }

  def main(args: Array[String]): Unit = {


  }

}

关注
打赏
1664501120
查看更多评论
立即登录/注册

微信扫码登录

0.0823s