```
$ tailf flink-csap-taskmanager-0-XXXX.log
2018-05-03 10:07:53,718 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-4c371de9-0f85-4889-b4d9-4a522641549c
2018-05-03 10:07:53,725 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor at akka://flink/user/taskmanager#-524742300.
2018-05-03 10:07:53,725 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager data connection information: 2c358d6f38949f9aae31c5bddb0cc1dc @ LY1F-R021707-VM14.local (dataPort=55234)
2018-05-03 10:07:53,726 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 1 task slot(s).
2018-05-03 10:07:53,727 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 111/1024/1024 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]
2018-05-03 10:07:53,730 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@localhost:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds)
2018-05-03 10:07:53,848 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka.tcp://flink@localhost:6123/user/jobmanager), starting network stack and library cache.
2018-05-03 10:07:53,851 INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be localhost/127.0.0.1:52382. Starting BLOB cache.
2018-05-03 10:07:53,858 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Created BLOB cache storage directory /tmp/blobStore-c07b9e80-41f0-490f-8126-7008144c4b0b
2018-05-03 10:07:53,861 INFO org.apache.flink.runtime.blob.TransientBlobCache - Created BLOB cache storage directory /tmp/blobStore-e0d1b687-1c47-41c4-b5bc-10ceaa39e778
```
```scala
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

  def main(args: Array[String]): Unit = {

    // the port to connect to
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception =>
        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        return
    }

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text = env.socketTextStream("localhost", port, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)
}
```
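The `.timeWindow(Time.seconds(5), Time.seconds(1))` call in the job above creates a sliding window. Each window spans 5 seconds and a new one starts every second, so a single word can be counted in up to 5 / 1 = 5 overlapping windows.

The sketch below shows that assignment in plain Scala, with no Flink dependency. It is loosely modeled on how Flink aligns sliding windows to the slide interval. `SlidingWindowSketch`, `Window` and `assignWindows` are hypothetical names. The sketch assumes a window offset of 0 and non-negative millisecond timestamps (with Flink's default processing-time setting, the timestamp would be the wall-clock time at which the word is processed).

```scala
// Hypothetical sketch (plain Scala, no Flink): which sliding windows a single
// timestamp falls into. Assumes a window offset of 0 and non-negative
// millisecond timestamps.
object SlidingWindowSketch {

  // A window covers [start, end): start inclusive, end exclusive.
  final case class Window(start: Long, end: Long)

  def assignWindows(timestampMs: Long, sizeMs: Long, slideMs: Long): List[Window] = {
    // Start of the most recent slide-aligned window that contains the timestamp.
    val lastStart = timestampMs - timestampMs % slideMs
    // Step back one slide at a time while the window still covers the timestamp.
    Iterator
      .iterate(lastStart)(_ - slideMs)
      .takeWhile(start => start > timestampMs - sizeMs)
      .map(start => Window(start, start + sizeMs))
      .toList
  }

  def main(args: Array[String]): Unit = {
    // A word processed at t = 12.3 s with the demo's 5 s size and 1 s slide.
    assignWindows(12300L, 5000L, 1000L).foreach(println)
  }
}
```

Running `main` prints five windows, from `Window(12000,17000)` down to `Window(8000,13000)`. If the size equals the slide (a tumbling window), each timestamp lands in exactly one window.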
This demo listens on a port and runs a word count over the words typed into that port.
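The computation done inside each window can be reproduced on an in-memory list of lines:

- split each line on whitespace;
- pair every word with a count of 1;
- group by word;
- sum the counts.

The sketch below is a plain-Scala illustration, not Flink code. `WordCountSketch` and `countWords` are hypothetical names, and `groupBy` stands in for `keyBy("word")` plus `sum("count")`. It treats all input as a single window.

```scala
// Hypothetical sketch: the per-window word count from the demo, applied to an
// in-memory batch of lines instead of a Flink socket stream.
object WordCountSketch {

  final case class WordWithCount(word: String, count: Long)

  def countWords(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(_.split("\\s"))        // same whitespace split as the Flink job
      .map(w => WordWithCount(w, 1L)) // pair every word with a count of 1
      .groupBy(_.word)                // plays the role of keyBy("word")
      .map { case (word, group) => word -> group.map(_.count).sum } // like sum("count")

  def main(args: Array[String]): Unit =
    countWords(Seq("hello", "hello", "word", "world")).toSeq.sortBy(_._1).foreach {
      case (word, count) => println(s"$word : $count")
    }
}
```

If all four input lines fell into the same window, this would print `hello : 2`, `word : 1` and `world : 1`. In the real job, the counts depend on which windows each word lands in.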
To run the demo, first open a terminal window and start feeding data into the port:
```
$ nc -l 9001
hello
hello
word
world
```
Then run the demo, which connects to the port and counts the words that arrive:
```shell
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9001
```
Once the job is running, the word counts appear in the TaskManager's `.out` file:
```
$ more flink-csap-taskmanager-0-XXX.out.1
hello : 1
hello : 1
word : 1
world : 1
```
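`print()` writes each record's `toString` to the TaskManager's stdout, which is what ends up in the `.out` file. A Scala case class prints as `WordWithCount(hello,1)` by default. The `hello : 1` layout above therefore suggests that the bundled jar was built from Flink's Java version of this example rather than from the Scala code shown earlier.

If you build the Scala version yourself and want the same output format, one option is to override `toString`. The sketch below assumes that approach. `OutputFormatSketch` is a hypothetical wrapper name, not part of Flink.

```scala
// Sketch: by default a Scala case class prints as WordWithCount(hello,1);
// overriding toString yields the "word : count" layout seen in the .out file.
object OutputFormatSketch {

  final case class WordWithCount(word: String, count: Long) {
    override def toString: String = s"$word : $count"
  }

  def main(args: Array[String]): Unit =
    println(WordWithCount("hello", 1L)) // prints: hello : 1
}
```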