您当前的位置: 首页 >  ar

段智华

暂无认证

  • 3浏览

    0关注

    1232博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Spark 2.0 streaming 视频讲解

段智华 发布时间:2016-09-16 19:08:39 ,浏览量:3

 

Spark 2.0 streaming 视频讲解(上海技术助理 段智华)

 

 

 

 

 

 

 

 

package com.dt.spark200;

import java.util.Arrays; import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.StreamingQuery;

 

public class Spark200StructuredStreaming {

 public static void main(String[] args) {      SparkSession spark = SparkSession        .builder()        .appName("JavaStructuredNetworkWordCount")        .master("local")        .config("spark.sql.warehouse.dir", "file:///G:/IMFBigDataSpark2016/IMFJavaWorkspace_Spark200/Spark200Demo/spark-warehouse")        .getOrCreate();         // Create DataFrame representing the stream of input lines from connection to localhost:9999   Dataset lines = spark     .readStream()     .format("socket")     .option("host", "pc")     .option("port", 9999)     .load();

  // Split the lines into words   Dataset words = lines       .as(Encoders.STRING())       .flatMap(           new FlatMapFunction() {             @Override             public Iterator call(String x) {               return Arrays.asList(x.split(" ")).iterator();             }           }, Encoders.STRING());

  // Generate running word count   Dataset wordCounts = words.groupBy("value").count();      StreamingQuery query = wordCounts.writeStream()       .outputMode("complete")       .format("console")       .start();

    query.awaitTermination();   while(true){}     }

}

 

 

 

 

 

/*  * Licensed to the Apache Software Foundation (ASF) under one or more  * contributor license agreements.  See the NOTICE file distributed with  * this work for additional information regarding copyright ownership.  * The ASF licenses this file to You under the Apache License, Version 2.0  * (the "License"); you may not use this file except in compliance with  * the License.  You may obtain a copy of the License at  *  *    http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing, software  * distributed under the License is distributed on an "AS IS" BASIS,  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  * See the License for the specific language governing permissions and  * limitations under the License.  */ package com.dt.spark200;

import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.functions; import org.apache.spark.sql.streaming.StreamingQuery; import scala.Tuple2;

import java.sql.Timestamp; import java.util.ArrayList; import java.util.Iterator; import java.util.List;

/**  * Counts words in UTF8 encoded, '\n' delimited text received from the network over a  * sliding window of configurable duration. Each line from the network is tagged  * with a timestamp that is used to determine the windows into which it falls.  *  * Usage: JavaStructuredNetworkWordCountWindowed  *   []  * and describe the TCP server that Structured Streaming  * would connect to receive data.  * gives the size of window, specified as integer number of seconds  * gives the amount of time successive windows are offset from one another,  * given in the same units as above. should be less than or equal to  * . If the two are equal, successive windows have no overlap. If  * is not provided, it defaults to .  *  * To run this on your local machine, you need to first run a Netcat server  *    `$ nc -lk 9999`  * and then run the example  *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed  *    localhost 9999 []`  *  * One recommended , pair is 10, 5  */ public final class JavaStructuredNetworkWordCountWindowed {

  public static void main(String[] args) throws Exception {  /*   if (args.length < 3) {       System.err.println("Usage: JavaStructuredNetworkWordCountWindowed " +         " []");       System.exit(1);     }*/

    //String host = args[0];     String host = "pc";   //  int port = Integer.parseInt(args[1]);     int port = 9999 ;    // int windowSize = Integer.parseInt(args[2]);     int windowSize = 30;     //int slideSize = (args.length == 3) ? windowSize : Integer.parseInt(args[3]);     int slideSize = 10;     if (slideSize > windowSize) {       System.err.println(" must be less than or equal to ");     }     String windowDuration = windowSize + " seconds";     String slideDuration = slideSize + " seconds";

    SparkSession spark = SparkSession       .builder()       .appName("JavaStructuredNetworkWordCountWindowed")       .master("local")    .config("spark.sql.warehouse.dir", "file:///G:/IMFBigDataSpark2016/IMFJavaWorkspace_Spark200/Spark200Demo/spark-warehouse")       .getOrCreate();

    // Create DataFrame representing the stream of input lines from connection to host:port     Dataset lines = spark       .readStream()       .format("socket")       .option("host", host)       .option("port", port)       .option("includeTimestamp", true)       .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));

    // Split the lines into words, retaining timestamps     Dataset words = lines.flatMap(       new FlatMapFunction() {         @Override         public Iterator call(Tuple2 t) {           List result = new ArrayList();           System.out.println("Tuple2 t  "+ t + "  t._1:  " +t._1 + "   t._2:   " + t._2  );           for (String word : t._1.split(" ")) {            System.out.println("new Tuple2(word, t._2) "+ word +   "   t._2:   " + t._2  );                                     result.add(new Tuple2(word, t._2));           }           return result.iterator();         }       },       Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())     ).toDF("IMFword", "IMFtimestamp");

    // Group the data by window and word and compute the count of each group     Dataset windowedCounts = words.groupBy(       functions.window(words.col("IMFtimestamp"), windowDuration, slideDuration),       words.col("IMFword")     ).count().orderBy("window");

    // Start running the query that prints the windowed word counts to the console     StreamingQuery query = windowedCounts.writeStream()       .outputMode("complete")       .format("console")       .option("truncate", "false")       .start();

    query.awaitTermination();   } }

 

 

 

关注
打赏
1659361485
查看更多评论
立即登录/注册

微信扫码登录

0.1015s