Flink项目(看了就会)
Flink基本项目WordCount
一, Flink批处理WordCount
使用maven先配置好pom文件
然后在项目目录上创建input包
然后在input包中创建word文件word.txt
并在文件中添加一下内容
hello woed
hello linux
hello mysql
然后在src的运行目录下创建一个目录,然后创建scala文件命名为BatcWordCount
具体代码为
package flink
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}
object BatchWordCount {
def main(args: Array[String]): Unit = {
// 1 创建一个执行环境
val env=ExecutionEnvironment.getExecutionEnvironment
// 2 读取文本文件数据
val lineDataSet = env.readTextFile("input/word.txt")
// 3 对数据集进行转换处理
val wordAndOne = lineDataSet.flatMap( _.split(" ") ).map(word => (word , 1))
// 4 按照单词进行分组
val wordAndOneGroup = wordAndOne.groupBy(0)
// 5 对分组数据进行sum聚合统计
val sum = wordAndOneGroup.sum(1)
//6 打印输出
sum.print()
}
}
运行结果为:
二, Flink有界流处理WordCount
使用maven先配置好pom文件
然后在项目目录上创建input包
然后在input包中创建word文件word.txt
并在文件中添加一下内容
hello woed
hello linux
hello mysql
然后在src的运行目录下创建一个目录,然后创建scala文件命名为BoundeddStreamWordCount
具体代码为:
package flink
import org.apache.flink.streaming.api.scala._
object BoundeddStreamWordCount {
def main(args: Array[String]): Unit = {
// 1 创建一个流式执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2 读取文本文件数据
val lineDataStream = env.readTextFile("input/word.txt")
// 3 对数据集进行转换处理
val wordAndOne = lineDataStream.flatMap( _.split(" ") ).map(word => (word , 1))
// 4 按照单词进行分组
val wordAndOneGroup = wordAndOne.keyBy(_._1)
// 5 对分组数据进行sum聚合统计
val sum = wordAndOneGroup.sum(1)
//6 打印输出
sum.print()
//7 执行当前任务
env.execute()
}
}
运行结果为:
三, Flink无界流处理WordCount
在src的运行目录下创建一个目录,然后创建scala文件命名为 StreamWordCount
具体代码为:
package flink
import org.apache.flink.streaming.api.scala._
object StreamWordCount {
def main(args: Array[String]): Unit = {
// 1 创建一个流式执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2 读取socket文本流数据
val lineDataStream = env.socketTextStream("bigdata1",7777)
// 3 对数据集进行转换处理
val wordAndOne = lineDataStream.flatMap( _.split(" ") ).map(word => (word , 1))
// 4 按照单词进行分组
val wordAndOneGroup = wordAndOne.keyBy(_._1)
// 5 对分组数据进行sum聚合统计
val sum = wordAndOneGroup.sum(1)
//6 打印输出
sum.print()
//7 执行当前任务
env.execute()
}
}
首先在命名端口名称bigdtata1上执行
nc -lk 7777
然后连接好后在idea上执行程序StreamWordCount
然后会发现并没有任何的输出,这主要是因为在bigdata1中的7777端口上没有任何的数据到来
然后再端口7777中输入
hello word
然后再到idea中查看会发现产生了结果
然后继续往7777端口中输入
hello flink
进入idea中查看数据会继续生成