您当前的位置: 首页 >  ar

Bulut0907

暂无认证

  • 7浏览

    0关注

    346博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

向Flink传递自定义参数的3种方式(withParameters、setGlobalJobParameters、ParameterTool)

Bulut0907 发布时间:2021-11-13 07:35:39 ,浏览量:7

目录
  • 1. devBase\WithParameters.scala
  • 2. devBase\GlobalJobParameters.scala
  • 3. devBase\ParameterToolTest.scala
  • 4. ParameterTool解决中文乱码问题

1. devBase\WithParameters.scala
package devBase

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation}
import org.apache.flink.configuration.{ConfigOptions, Configuration}



class WithParametersMapFunction extends RichMapFunction[String, String] {

  // Value of the "bigdata" key, delivered through the Configuration
  // that withParameters() hands to open().
  private var bigdata: String = null

  override def open(parameters: Configuration): Unit = {
    // Typed handle for the "bigdata" key; no default, so a missing key yields null.
    val bigdataOption = ConfigOptions.key("bigdata")
      .stringType()
      .noDefaultValue()

    bigdata = parameters.getString(bigdataOption)
  }

  override def map(value: String): String =
    // Replace the configured word with the literal "bigdata"; pass everything else through.
    if (value == bigdata) "bigdata" else value

}

object WithParameters {

  /** Demonstrates passing a Configuration to a rich function via withParameters(). */
  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    // Configuration delivered to WithParametersMapFunction.open().
    val parameters = new Configuration()
    parameters.setString("bigdata", "flink")

    val words: DataSet[String] = env.fromElements("flink", "spark", "hadoop")

    // withParameters() is batch-only; it applies to the preceding map operator
    // and forwards `parameters` to the rich function's open() method.
    val mapped = words
      .map(new WithParametersMapFunction())
      .withParameters(parameters)

    mapped.print()

  }

}

执行结果如下:

bigdata
spark
hadoop
2. devBase\GlobalJobParameters.scala
package devBase

import apiTest.WordSourceFunction
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation}
import org.apache.flink.configuration.{ConfigOptions, Configuration}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}


class GlobalJobParametersMapFunction extends RichMapFunction[String, String] {

  // Value of the "bigdata" key, read from the job-wide global parameters.
  private var bigdata: String = null

  override def open(parameters: Configuration): Unit = {
    // main() registered a Configuration via setGlobalJobParameters(), so the
    // downcast here mirrors exactly what was stored on the ExecutionConfig.
    val globalConf = getRuntimeContext
      .getExecutionConfig
      .getGlobalJobParameters
      .asInstanceOf[Configuration]

    // Typed handle for the "bigdata" key; no default, so a missing key yields null.
    val bigdataOption = ConfigOptions.key("bigdata")
      .stringType()
      .noDefaultValue()

    bigdata = globalConf.getString(bigdataOption)
  }

  override def map(value: String): String =
    // Prefix the configured word with "bigdata-"; pass everything else through.
    if (value == bigdata) "bigdata-" + value else value

}

object GlobalJobParameters {

  /** Demonstrates sharing a Configuration job-wide via setGlobalJobParameters(). */
  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    // Register the Configuration globally; every rich function can retrieve it
    // through getRuntimeContext.getExecutionConfig.getGlobalJobParameters.
    val globalConf = new Configuration()
    globalConf.setString("bigdata", "flink")
    senv.getConfig.setGlobalJobParameters(globalConf)
    // senv.getConfig.setGlobalJobParameters(parameterTool)

    val words: DataStream[String] = senv.addSource(new WordSourceFunction)

    val mapped = words.map(new GlobalJobParametersMapFunction())

    mapped.print()
    senv.execute()

  }

}

执行结果:

4> world
5> stream
6> table
7> sql
8> bigdata-flink
1> bigdata-flink
2> batch
......省略部分......
3. devBase\ParameterToolTest.scala
package devBase

import org.apache.flink.api.java.utils.ParameterTool

import java.io.{File, FileInputStream}
import scala.collection.JavaConversions.mapAsJavaMap


object ParameterToolTest {

  /**
   * Demonstrates the factory methods of Flink's ParameterTool: from a Scala
   * Map (implicitly converted to java.util.Map via the file's import), from a
   * properties file addressed by path / File / InputStream, from program
   * arguments, and from JVM system properties.
   */
  def main(args: Array[String]): Unit = {

    // --- from a Map ---
    val map = Map("id" -> "10", "name" -> "LiMing", "age" -> "10")
    val parameter_map = ParameterTool.fromMap(map)
    println("======map====== " + parameter_map.getRequired("name"))
    println("======map====== " + parameter_map.get("name", "default_name"))
    println("======map====== " + parameter_map.getInt("id", 0))
    println("======map====== " + parameter_map.getNumberOfParameters)
    println("======map====== " + parameter_map.getProperties)

    // --- from a properties file addressed by a path string ---
    val prop_path = "src/main/resources/parameterToolTest.properties"
    val parameter_prop_path = ParameterTool.fromPropertiesFile(prop_path)
    println("======path====== " + parameter_prop_path.get("name"))

    // --- from the same file addressed by java.io.File ---
    val prop_file = new File(prop_path)
    val parameter_prop_file = ParameterTool.fromPropertiesFile(prop_file)
    println("======file====== " + parameter_prop_file.get("name"))

    // --- from an InputStream; close it afterwards to avoid a file-handle leak
    //     (the original code never closed this stream) ---
    val prop_stream = new FileInputStream(prop_file)
    try {
      val parameter_prop_stream = ParameterTool.fromPropertiesFile(prop_stream)
      println("======stream====== " + parameter_prop_stream.get("name"))
    } finally {
      prop_stream.close()
    }

    // --- from program arguments, e.g. --input hdfs://xxx --output hdfs://xxx ---
    val parameter_args = ParameterTool.fromArgs(args)
    println("======args====== " + parameter_args.get("name"))

    // --- from JVM system properties ---
    val parameter_system = ParameterTool.fromSystemProperties()
    println("======system====== " + parameter_system.get("java.vm.version"))

  }

}

执行结果:

======map====== LiMing
======map====== LiMing
======map====== 10
======map====== 3
======map====== {age=10, name=LiMing, id=10}
======path====== LiMing
======file====== LiMing
======stream====== LiMing
======args====== null
======system====== 25.201-b09
4. ParameterTool解决中文乱码问题

使用ParameterTool读取properties配置文件时,如果文件中包含中文,默认读取出来会乱码。可以使用如下方法指定UTF-8编码读取

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.util.Properties
import org.apache.flink.api.java.utils.ParameterTool

object ParameterToolTest {

  /**
   * Loads a properties file from the classpath with an explicit UTF-8 reader
   * so that Chinese (non-ASCII) values are not garbled, then wraps the
   * properties in a ParameterTool.
   */
  def main(args: Array[String]): Unit = {

    val resourceProperties = new Properties()
    val resourceInputStream: InputStream =
      this.getClass.getClassLoader.getResourceAsStream("parameterToolTest.properties")
    // Fail with a clear message instead of an NPE when the resource is missing.
    require(resourceInputStream != null, "parameterToolTest.properties not found on classpath")

    // Properties.load(InputStream) assumes ISO-8859-1; wrapping the stream in a
    // UTF-8 reader is what prevents mojibake for Chinese values.
    val resourceBufferedReader: BufferedReader =
      new BufferedReader(new InputStreamReader(resourceInputStream, "UTF-8"))
    try {
      resourceProperties.load(resourceBufferedReader)
    } finally {
      resourceBufferedReader.close() // also closes the wrapped reader and stream
    }

    // Copy into a typed java.util.Map rather than the original unchecked
    // asInstanceOf cast of Properties to java.util.Map[String, String].
    val propertyMap = new java.util.HashMap[String, String]()
    val names = resourceProperties.stringPropertyNames().iterator()
    while (names.hasNext) {
      val key = names.next()
      propertyMap.put(key, resourceProperties.getProperty(key))
    }

    val parameterTool = ParameterTool.fromMap(propertyMap)
    println("======chinese stream====== " + parameterTool.get("chinese_name"))
  }

}

执行结果:

======chinese stream====== 李明
关注
打赏
1664501120
查看更多评论
立即登录/注册

微信扫码登录

0.1481s