Spark 2.0 streaming 视频讲解(上海技术助理 段智华)
package com.dt.spark200;
import java.util.Arrays; import java.util.Iterator;
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.StreamingQuery;
public class Spark200StructuredStreaming {
public static void main(String[] args) { SparkSession spark = SparkSession .builder() .appName("JavaStructuredNetworkWordCount") .master("local") .config("spark.sql.warehouse.dir", "file:///G:/IMFBigDataSpark2016/IMFJavaWorkspace_Spark200/Spark200Demo/spark-warehouse") .getOrCreate(); // Create DataFrame representing the stream of input lines from connection to localhost:9999 Dataset lines = spark .readStream() .format("socket") .option("host", "pc") .option("port", 9999) .load();
// Split the lines into words Dataset words = lines .as(Encoders.STRING()) .flatMap( new FlatMapFunction() { @Override public Iterator call(String x) { return Arrays.asList(x.split(" ")).iterator(); } }, Encoders.STRING());
// Generate running word count Dataset wordCounts = words.groupBy("value").count(); StreamingQuery query = wordCounts.writeStream() .outputMode("complete") .format("console") .start();
query.awaitTermination(); while(true){} }
}
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.dt.spark200;
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.functions; import org.apache.spark.sql.streaming.StreamingQuery; import scala.Tuple2;
import java.sql.Timestamp; import java.util.ArrayList; import java.util.Iterator; import java.util.List;
/** * Counts words in UTF8 encoded, '\n' delimited text received from the network over a * sliding window of configurable duration. Each line from the network is tagged * with a timestamp that is used to determine the windows into which it falls. * * Usage: JavaStructuredNetworkWordCountWindowed * [] * and describe the TCP server that Structured Streaming * would connect to receive data. * gives the size of window, specified as integer number of seconds * gives the amount of time successive windows are offset from one another, * given in the same units as above. should be less than or equal to * . If the two are equal, successive windows have no overlap. If * is not provided, it defaults to . * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example * `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed * localhost 9999 []` * * One recommended , pair is 10, 5 */ public final class JavaStructuredNetworkWordCountWindowed {
public static void main(String[] args) throws Exception { /* if (args.length < 3) { System.err.println("Usage: JavaStructuredNetworkWordCountWindowed " + " []"); System.exit(1); }*/
//String host = args[0]; String host = "pc"; // int port = Integer.parseInt(args[1]); int port = 9999 ; // int windowSize = Integer.parseInt(args[2]); int windowSize = 30; //int slideSize = (args.length == 3) ? windowSize : Integer.parseInt(args[3]); int slideSize = 10; if (slideSize > windowSize) { System.err.println(" must be less than or equal to "); } String windowDuration = windowSize + " seconds"; String slideDuration = slideSize + " seconds";
SparkSession spark = SparkSession .builder() .appName("JavaStructuredNetworkWordCountWindowed") .master("local") .config("spark.sql.warehouse.dir", "file:///G:/IMFBigDataSpark2016/IMFJavaWorkspace_Spark200/Spark200Demo/spark-warehouse") .getOrCreate();
// Create DataFrame representing the stream of input lines from connection to host:port Dataset lines = spark .readStream() .format("socket") .option("host", host) .option("port", port) .option("includeTimestamp", true) .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));
// Split the lines into words, retaining timestamps Dataset words = lines.flatMap( new FlatMapFunction() { @Override public Iterator call(Tuple2 t) { List result = new ArrayList(); System.out.println("Tuple2 t "+ t + " t._1: " +t._1 + " t._2: " + t._2 ); for (String word : t._1.split(" ")) { System.out.println("new Tuple2(word, t._2) "+ word + " t._2: " + t._2 ); result.add(new Tuple2(word, t._2)); } return result.iterator(); } }, Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()) ).toDF("IMFword", "IMFtimestamp");
// Group the data by window and word and compute the count of each group Dataset windowedCounts = words.groupBy( functions.window(words.col("IMFtimestamp"), windowDuration, slideDuration), words.col("IMFword") ).count().orderBy("window");
// Start running the query that prints the windowed word counts to the console StreamingQuery query = windowedCounts.writeStream() .outputMode("complete") .format("console") .option("truncate", "false") .start();
query.awaitTermination(); } }