模块创建和数据准备
在UserBehaviorAnalysis下新建一个mavenmodule作为子项目,命名为NetworkFlowAnalysis。在这个子模块中,我们同样并没有引入更多的依赖,所以也不需要改动pom文件。
在src/main/目录下,将默认源文件目录java改名为scala。将apache服务器的日志文件apache.log复制到资源文件目录src/main/resources下,我们将从这里读取数据。
当然,我们也可以仍然用UserBehavior.csv作为数据源,这时我们分析的就不是每一次对服务器的访问请求了,而是具体的页面浏览(“pv”)操作。
基于服务器log的热门页面浏览量统计
我们现在要实现的模块是“实时流量统计”。对于一个电商平台而言,用户登录的入口流量、不同页面的访问流量都是值得分析的重要数据,而这些数据,可以简单地从web服务器的日志中提取出来。
我们在这里先实现“热门页面浏览数”的统计,也就是读取服务器日志中的每一行log,统计在一段时间内用户访问每一个url的次数,然后排序输出显示。
具体做法为:每隔5秒,输出最近10分钟内访问量最多的前N个URL。可以看出,这个需求与之前“实时热门商品统计”非常类似,所以我们完全可以借鉴此前的代码。
在src/main/scala下创建NetworkFlow.scala文件,新建一个单例对象。定义样例类ApacheLogEvent,这是输入的日志数据流;另外还有UrlViewCount,这是窗口操作统计的输出数据类型。在main函数中创建StreamExecutionEnvironment并做配置,然后从apache.log文件中读取数据,并包装成ApacheLogEvent类型。
需要注意的是,原始日志中的时间是“dd/MM/yyyy:HH:mm:ss”的形式,需要定义一个DateTimeFormat将其转换为我们需要的时间戳格式:
.map(line={
vallinearray=line.split("")
valsdf=newSimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
valtimestamp=sdf.parse(linearray(3)).getTime
ApacheLogEvent(linearray(0),linearray(2),timestamp,
linearray(5),linearray(6))
})
完整代码如下:
NetworkFlowAnalysis/src/main/scala/NetworkFlow.scala
caseclassApacheLogEvent(ip:String,userId:String,eventTime:Long,method:String,
url:String)
caseclassUrlViewCount(url:String,windowEnd:Long,count:Long)
objectNetworkFlow{
defmain(args:Array[String]):Unit={
valenv=StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
valstream=env
//以window下为例,需替换成自己的路径
.readTextFile("YOUR_PATH\\resources\\apache.log")
.map(line={
vallinearray=line.split("")
valsimpleDateFormat=newSimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
valtimestamp=simpleDateFormat.parse(linearray(3)).getTime
ApacheLogEvent(linearray(0),linearray(2),timestamp,linearray(5),
linearray(6))
})
.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent]
(Time.milliseconds()){
overridedefextractTimestamp(t:ApacheLogEvent):Long={
t.eventTime
}
})
.filter(data={
valpattern="^((?!\\.(css
js)$).)*$".r
(patternfindFirstIndata.url).nonEmpty
})
.keyBy("url")
.timeWindow(Time.minutes(10),Time.seconds(5))
.aggregate(newCountAgg(),newWindowResultFunction())
.keyBy(1)
.process(newTopNHotUrls(5))
.print()
env.execute("NetworkFlowJob")
}
classCountAggextendsAggregateFunction[ApacheLogEvent,Long,Long]{
overridedefcreateAccumulator():Long=0L
overridedefadd(apacheLogEvent:ApacheLogEvent,acc:Long):Long=acc+1
overridedefgetResult(acc:Long):Long=acc
overridedefmerge(acc1:Long,acc2:Long):Long=acc1+acc2
}
classWindowResultFunctionextendsWindowFunction[Long,UrlViewCount,Tuple,
TimeWindow]{
overridedefapply(key:Tuple,window:TimeWindow,aggregateResult:Iterable[Long],
collector:Collector[UrlViewCount]):Unit={
valurl:String=key.asInstanceOf[Tuple1[String]].f0
valcount=aggregateResult.iterator.next
collector.collect(UrlViewCount(url,window.getEnd,count))
}
}
classTopNHotUrls(topsize:Int)extendsKeyedProcessFunction[Tuple,UrlViewCount,
String]{
privatevarurlState:ListState[UrlViewCount]=_
overridedefopen(parameters:Configuration):Unit={
super.open(parameters)
valurlStateDesc=newListStateDescriptor[UrlViewCount]("urlState-state",
classOf[UrlViewCount])
urlState=getRuntimeContext.getListState(urlStateDesc)
}
overridedefprocessElement(input:UrlViewCount,context:
KeyedProcessFunction[Tuple,UrlViewCount,String]#Context,collector:
Collector[String]):Unit={
//每条数据都保存到状态中
urlState.add(input)
context.timerService.registerEventTimeTimer(input.windowEnd+1)
}
overridedefonTimer(timestamp:Long,ctx:KeyedProcessFunction[Tuple,UrlViewCount,
String]#OnTimerContext,out:Collector[String]):Unit={
//获取收到的所有URL访问量
valallUrlViews:ListBuffer[UrlViewCount]=ListBuffer()
importscala.collection.JavaConversions._
for(urlView-urlState.get){
allUrlViews+=urlView
}
//提前清除状态中的数据,释放空间
urlState.clear()
//按照访问量从大到小排序
valsortedUrlViews=allUrlViews.sortBy(_.count)(Ordering.Long.reverse)
.take(topSize)
//将排名信息格式化成String,便于打印
varresult:StringBuilder=newStringBuilder
result.append("====================================\n")
result.append("时间:").append(newTimestamp(timestamp-1)).append("\n")
for(i-sortedUrlViews.indices){
valcurrentUrlView:UrlViewCount=sortedUrlViews(i)
//e.g.No1:URL=/blog/tags/firefox?flav=rss20流量=55
result.append("No").append(i+1).append(":")
.append("URL=").append(currentUrlView.url)
.append("流量=").append(currentUrlView.count).append("\n")
}
result.append("====================================\n\n")
//控制输出频率,模拟实时滚动结果
Thread.sleep()
out.collect(result.toString)
}
}
}
基于埋点日志数据的网络流量统计
我们发现,从web服务器log中得到的url,往往更多的是请求某个资源地址(/*.js、/*.css),如果要针对页面进行统计往往还需要进行过滤。而在实际电商应用中,相比每个单独页面的访问量,我们可能更加关心整个电商网站的网络流量。
这个指标,除了合并之前每个页面的统计结果之外,还可以通过统计埋点日志数据中的“pv”行为来得到。
网站总浏览量(PV)的统计
衡量网站流量一个最简单的指标,就是网站的页面浏览量(PageView,PV)。用户每次打开一个页面便记录1次PV,多次打开同一页面则浏览量累计。一般来说,PV与来访者的数量成正比,但是PV并不直接决定页面的真实来访者数量,如同一个来访者通过不断的刷新页面,也可以制造出非常高的PV。
我们知道,用户浏览页面时,会从浏览器向网络服务器发出一个请求(Request),网络服务器接到这个请求后,会将该请求对应的一个网页(Page)发送给浏览器,从而产生了一个PV。所以我们的统计方法,可以是从web服务器的日志中去提取对应的页面访问然后统计,就向上一节中的做法一样;也可以直接从埋点日志中提取用户发来的页面请求,从而统计出总浏览量。
所以,接下来我们用UserBehavior.csv作为数据源,实现一个网站总浏览量的统计。我们可以设置滚动时间窗口,实时统计每小时内的网站PV。
在src/main/scala下创建PageView.scala文件,具体代码如下:
NetworkFlowAnalysis/src/main/scala/PageView.scala
caseclassUserBehavior(userId:Long,itemId:Long,categoryId:Int,behavior:String,
timestamp:Long)
objectPageView{
defmain(args:Array[String]):Unit={
valresourcesPath=getClass.getResource("/UserBehaviorTest.csv")
valenv=StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
valstream=env.readTextFile(resourcesPath.getPath)
.map(data={
valdataArray=data.split(",")
UserBehavior(dataArray(0).toLong,dataArray(1).toLong,dataArray(2).toInt,
dataArray(3),dataArray(4).toLong)
})
.assignAscendingTimestamps(_.timestamp*)
.filter(_.behavior=="pv")
.map(x=("pv",1))
.keyBy(_._1)
.timeWindow(Time.seconds(60*60))
.sum(1)
.print()
env.execute("PageViewJob")
}
}
网站独立访客数(UV)的统计
在上节的例子中,我们统计的是所有用户对页面的所有浏览行为,也就是说,同一用户的浏览行为会被重复统计。而在实际应用中,我们往往还会