北京大数据培训电商用户行为分析之实时流量

模块创建和数据准备

在UserBehaviorAnalysis下新建一个mavenmodule作为子项目,命名为NetworkFlowAnalysis。在这个子模块中,我们同样并没有引入更多的依赖,所以也不需要改动pom文件。

在src/main/目录下,将默认源文件目录java改名为scala。将apache服务器的日志文件apache.log复制到资源文件目录src/main/resources下,我们将从这里读取数据。

当然,我们也可以仍然用UserBehavior.csv作为数据源,这时我们分析的就不是每一次对服务器的访问请求了,而是具体的页面浏览(“pv”)操作。

基于服务器log的热门页面浏览量统计

我们现在要实现的模块是“实时流量统计”。对于一个电商平台而言,用户登录的入口流量、不同页面的访问流量都是值得分析的重要数据,而这些数据,可以简单地从web服务器的日志中提取出来。

我们在这里先实现“热门页面浏览数”的统计,也就是读取服务器日志中的每一行log,统计在一段时间内用户访问每一个url的次数,然后排序输出显示。

具体做法为:每隔5秒,输出最近10分钟内访问量最多的前N个URL。可以看出,这个需求与之前“实时热门商品统计”非常类似,所以我们完全可以借鉴此前的代码。

在src/main/scala下创建NetworkFlow.scala文件,新建一个单例对象。定义样例类ApacheLogEvent,这是输入的日志数据流;另外还有UrlViewCount,这是窗口操作统计的输出数据类型。在main函数中创建StreamExecutionEnvironment并做配置,然后从apache.log文件中读取数据,并包装成ApacheLogEvent类型。

需要注意的是,原始日志中的时间是“dd/MM/yyyy:HH:mm:ss”的形式,需要定义一个DateTimeFormat将其转换为我们需要的时间戳格式:

.map(line={

vallinearray=line.split("")

valsdf=newSimpleDateFormat("dd/MM/yyyy:HH:mm:ss")

valtimestamp=sdf.parse(linearray(3)).getTime

ApacheLogEvent(linearray(0),linearray(2),timestamp,

linearray(5),linearray(6))

})

完整代码如下:

NetworkFlowAnalysis/src/main/scala/NetworkFlow.scala

caseclassApacheLogEvent(ip:String,userId:String,eventTime:Long,method:String,

url:String)

caseclassUrlViewCount(url:String,windowEnd:Long,count:Long)

objectNetworkFlow{

defmain(args:Array[String]):Unit={

valenv=StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

env.setParallelism(1)

valstream=env

//以window下为例,需替换成自己的路径

.readTextFile("YOUR_PATH\\resources\\apache.log")

.map(line={

vallinearray=line.split("")

valsimpleDateFormat=newSimpleDateFormat("dd/MM/yyyy:HH:mm:ss")

valtimestamp=simpleDateFormat.parse(linearray(3)).getTime

ApacheLogEvent(linearray(0),linearray(2),timestamp,linearray(5),

linearray(6))

})

.assignTimestampsAndWatermarks(new

BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent]

(Time.milliseconds()){

overridedefextractTimestamp(t:ApacheLogEvent):Long={

t.eventTime

}

})

.filter(data={

valpattern="^((?!\\.(css

js)$).)*$".r

(patternfindFirstIndata.url).nonEmpty

})

.keyBy("url")

.timeWindow(Time.minutes(10),Time.seconds(5))

.aggregate(newCountAgg(),newWindowResultFunction())

.keyBy(1)

.process(newTopNHotUrls(5))

.print()

env.execute("NetworkFlowJob")

}

classCountAggextendsAggregateFunction[ApacheLogEvent,Long,Long]{

overridedefcreateAccumulator():Long=0L

overridedefadd(apacheLogEvent:ApacheLogEvent,acc:Long):Long=acc+1

overridedefgetResult(acc:Long):Long=acc

overridedefmerge(acc1:Long,acc2:Long):Long=acc1+acc2

}

classWindowResultFunctionextendsWindowFunction[Long,UrlViewCount,Tuple,

TimeWindow]{

overridedefapply(key:Tuple,window:TimeWindow,aggregateResult:Iterable[Long],

collector:Collector[UrlViewCount]):Unit={

valurl:String=key.asInstanceOf[Tuple1[String]].f0

valcount=aggregateResult.iterator.next

collector.collect(UrlViewCount(url,window.getEnd,count))

}

}

classTopNHotUrls(topsize:Int)extendsKeyedProcessFunction[Tuple,UrlViewCount,

String]{

privatevarurlState:ListState[UrlViewCount]=_

overridedefopen(parameters:Configuration):Unit={

super.open(parameters)

valurlStateDesc=newListStateDescriptor[UrlViewCount]("urlState-state",

classOf[UrlViewCount])

urlState=getRuntimeContext.getListState(urlStateDesc)

}

overridedefprocessElement(input:UrlViewCount,context:

KeyedProcessFunction[Tuple,UrlViewCount,String]#Context,collector:

Collector[String]):Unit={

//每条数据都保存到状态中

urlState.add(input)

context.timerService.registerEventTimeTimer(input.windowEnd+1)

}

overridedefonTimer(timestamp:Long,ctx:KeyedProcessFunction[Tuple,UrlViewCount,

String]#OnTimerContext,out:Collector[String]):Unit={

//获取收到的所有URL访问量

valallUrlViews:ListBuffer[UrlViewCount]=ListBuffer()

importscala.collection.JavaConversions._

for(urlView-urlState.get){

allUrlViews+=urlView

}

//提前清除状态中的数据,释放空间

urlState.clear()

//按照访问量从大到小排序

valsortedUrlViews=allUrlViews.sortBy(_.count)(Ordering.Long.reverse)

.take(topSize)

//将排名信息格式化成String,便于打印

varresult:StringBuilder=newStringBuilder

result.append("====================================\n")

result.append("时间:").append(newTimestamp(timestamp-1)).append("\n")

for(i-sortedUrlViews.indices){

valcurrentUrlView:UrlViewCount=sortedUrlViews(i)

//e.g.No1:URL=/blog/tags/firefox?flav=rss20流量=55

result.append("No").append(i+1).append(":")

.append("URL=").append(currentUrlView.url)

.append("流量=").append(currentUrlView.count).append("\n")

}

result.append("====================================\n\n")

//控制输出频率,模拟实时滚动结果

Thread.sleep()

out.collect(result.toString)

}

}

}

基于埋点日志数据的网络流量统计

我们发现,从web服务器log中得到的url,往往更多的是请求某个资源地址(/*.js、/*.css),如果要针对页面进行统计往往还需要进行过滤。而在实际电商应用中,相比每个单独页面的访问量,我们可能更加关心整个电商网站的网络流量。

这个指标,除了合并之前每个页面的统计结果之外,还可以通过统计埋点日志数据中的“pv”行为来得到。

网站总浏览量(PV)的统计

衡量网站流量一个最简单的指标,就是网站的页面浏览量(PageView,PV)。用户每次打开一个页面便记录1次PV,多次打开同一页面则浏览量累计。一般来说,PV与来访者的数量成正比,但是PV并不直接决定页面的真实来访者数量,如同一个来访者通过不断的刷新页面,也可以制造出非常高的PV。

我们知道,用户浏览页面时,会从浏览器向网络服务器发出一个请求(Request),网络服务器接到这个请求后,会将该请求对应的一个网页(Page)发送给浏览器,从而产生了一个PV。所以我们的统计方法,可以是从web服务器的日志中去提取对应的页面访问然后统计,就向上一节中的做法一样;也可以直接从埋点日志中提取用户发来的页面请求,从而统计出总浏览量。

所以,接下来我们用UserBehavior.csv作为数据源,实现一个网站总浏览量的统计。我们可以设置滚动时间窗口,实时统计每小时内的网站PV。

在src/main/scala下创建PageView.scala文件,具体代码如下:

NetworkFlowAnalysis/src/main/scala/PageView.scala

caseclassUserBehavior(userId:Long,itemId:Long,categoryId:Int,behavior:String,

timestamp:Long)

objectPageView{

defmain(args:Array[String]):Unit={

valresourcesPath=getClass.getResource("/UserBehaviorTest.csv")

valenv=StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

env.setParallelism(1)

valstream=env.readTextFile(resourcesPath.getPath)

.map(data={

valdataArray=data.split(",")

UserBehavior(dataArray(0).toLong,dataArray(1).toLong,dataArray(2).toInt,

dataArray(3),dataArray(4).toLong)

})

.assignAscendingTimestamps(_.timestamp*)

.filter(_.behavior=="pv")

.map(x=("pv",1))

.keyBy(_._1)

.timeWindow(Time.seconds(60*60))

.sum(1)

.print()

env.execute("PageViewJob")

}

}

网站独立访客数(UV)的统计

在上节的例子中,我们统计的是所有用户对页面的所有浏览行为,也就是说,同一用户的浏览行为会被重复统计。而在实际应用中,我们往往还会


转载请注明:http://www.aierlanlan.com/rzdk/5133.html