A Daily Active User (DAU) Counting Function
Published: 2019-06-21


Preface:

  When doing operations analytics, one of the most common metrics is the daily active user count (DAU). Conceptually it is just the deduplicated set of all users for the day, but in most cases the data we collect contains both logged-in users and anonymous users, and those two groups overlap. The usual approach is to self-join on cookie (web) or imei (mobile), work out how many users appear as both logged-in and anonymous, and then compute: DAU = logged-in users + anonymous users - users who appear as both.

  In practice this takes fairly complex HQL and runs slowly, so I developed a UDAF to do the job instead.
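The counting rule above can be illustrated with a small plain-Java sketch (the class name, method shape, and sample rows here are mine, for illustration only; the real implementation is the Hive UDAF further down):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DauSketch {
    // DAU = distinct logged-in uids + distinct anonymous imeis
    //       - imeis that were seen with both flag values
    public static long dau(int[] flags, String[] uids, String[] imeis) {
        Set<String> uidSet = new HashSet<>();           // logged-in users
        Set<String> imeiSet = new HashSet<>();          // anonymous users
        Map<String, Integer> imeiMap = new HashMap<>(); // imei -> 0, 1 or 2
        for (int i = 0; i < flags.length; i++) {
            int check = flags[i] == 1 ? 1 : 0;
            if (check == 1) {
                uidSet.add(uids[i]);
            } else {
                imeiSet.add(imeis[i]);
            }
            Integer prev = imeiMap.get(imeis[i]);
            if (prev == null) {
                imeiMap.put(imeis[i], check);
            } else if (prev < 2 && prev != check) {
                imeiMap.put(imeis[i], 2); // seen as both logged-in and anonymous
            }
        }
        int mix = 0;
        for (int v : imeiMap.values()) {
            if (v == 2) mix++;
        }
        return uidSet.size() + imeiSet.size() - mix;
    }

    public static void main(String[] args) {
        // uid3's device (imei3) also shows up anonymously,
        // so it is only counted once: 2 + 1 - 1 = 2
        int[] flags = {1, 1, 0};
        String[] uids = {"uid1", "uid3", ""};
        String[] imeis = {"imei1", "imei3", "imei3"};
        System.out.println(dau(flags, uids, imeis));
    }
}
```

The UDAF below implements exactly this logic, split across iterate/merge/terminate so it can run distributed.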

First, an explanation of how the function works:

/**
 * Counts DAU from the flag, uid and imei columns.
 * - flag == 1 : add the uid to the UID set (logged-in users)
 * - flag != 1 : add the imei|wyy to the IMEI set (anonymous users)
 * Every imei|wyy is also tracked in a Map; if both flag values 0 and 1
 * have been seen for the same imei|wyy, its value is set to 2,
 * otherwise it stays equal to the flag.
 *
 * Signature:
 *      dau_count(flag, uid, imei)
 * Parameters:
 *      flag: 1 (logged-in) or any other value (anonymous)
 *      uid:  user id
 *      imei: secondary user identifier (imei|wyy|cookie)
 *
 * Returns:
 *      bigint, the DAU value
 *
 * Example:
 *      > SELECT flag, uid, imei FROM test;
 *      1   uid1 imei1
 *      1   uid2 imei1
 *      0   uid3 imei3
 *
 *      > SELECT dau_count(flag, uid, imei) FROM test;
 *      3
 */

  The flag argument can also be produced by another UDF, as long as it decides whether a uid belongs to a logged-in user.
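For instance, a hypothetical Hive session (the jar path, table and column names here are assumptions, not from the original post) could register the function and derive the flag inline:

-- register the UDAF (jar path is hypothetical)
ADD JAR /path/to/dau_udaf.jar;
CREATE TEMPORARY FUNCTION dau_count AS 'yy.juefan.udaf.GenericDauCount';

-- derive flag on the fly: a non-empty uid means a logged-in user
SELECT dau_count(if(uid IS NOT NULL AND uid != '', 1, 0), uid, imei)
FROM access_log
WHERE dt = '2019-06-21';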

Here is the full code:

package yy.juefan.udaf;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

@Description(
        name = "dau_count",
        value = "_FUNC_(flag,uid,imei)")
public class GenericDauCount extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
            throws SemanticException {
        if (parameters.length != 3) {
            throw new UDFArgumentLengthException(
                    "Exactly 3 arguments are expected.");
        }
        if (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory() != PrimitiveCategory.INT) {
            throw new UDFArgumentTypeException(0,
                    "Only int argument is accepted, but "
                            + parameters[0].getTypeName() + " is passed");
        }
        if (((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory() != PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(1,
                    "Only string argument is accepted, but "
                            + parameters[1].getTypeName() + " is passed");
        }
        if (((PrimitiveTypeInfo) parameters[2]).getPrimitiveCategory() != PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(2,
                    "Only string argument is accepted, but "
                            + parameters[2].getTypeName() + " is passed");
        }
        return new GenericDauCountEvaluator();
    }

    public static class GenericDauCountEvaluator extends GenericUDAFEvaluator {
        // fields of the partial-result struct
        StructField uidSetField;
        StructField imeiSetField;
        StructField imeiMapField;
        StructObjectInspector map2red;
        // for PARTIAL1 and COMPLETE: inspectors for the raw input columns
        IntObjectInspector flagIO;
        StringObjectInspector uidIO;
        StringObjectInspector imeiIO;
        // for PARTIAL2 and FINAL: inspectors for the partial results
        StandardListObjectInspector uidSetIO;
        StandardListObjectInspector imeiSetIO;
        StandardMapObjectInspector imeiMapIO;

        private static class DivideAB implements AggregationBuffer {
            Set<String> uidSet;            // logged-in users
            Set<String> imeiSet;           // anonymous users
            Map<String, Integer> imeiMap;  // imei -> 0, 1 or 2 (2 = seen both)
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            DivideAB dab = new DivideAB();
            reset(dab);
            return dab;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            DivideAB dab = (DivideAB) agg;
            dab.uidSet = new HashSet<String>();
            dab.imeiSet = new HashSet<String>();
            dab.imeiMap = new HashMap<String, Integer>();
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {
            super.init(m, parameters);
            // input
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                // iterate() receives the raw columns
                assert (parameters.length == 3);
                flagIO = (IntObjectInspector) parameters[0];
                uidIO = (StringObjectInspector) parameters[1];
                imeiIO = (StringObjectInspector) parameters[2];
            } else {
                // merge() receives the partial-result struct
                map2red = (StructObjectInspector) parameters[0];
                uidSetField = map2red.getStructFieldRef("uidSet");
                imeiSetField = map2red.getStructFieldRef("imeiSet");
                imeiMapField = map2red.getStructFieldRef("imeiMap");
                uidSetIO = (StandardListObjectInspector) uidSetField
                        .getFieldObjectInspector();
                imeiSetIO = (StandardListObjectInspector) imeiSetField
                        .getFieldObjectInspector();
                imeiMapIO = (StandardMapObjectInspector) imeiMapField
                        .getFieldObjectInspector();
            }
            // output
            if (m == Mode.PARTIAL1 || m == Mode.PARTIAL2) {
                ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
                ArrayList<String> fname = new ArrayList<String>();
                foi.add(ObjectInspectorFactory.getStandardListObjectInspector(
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector));
                foi.add(ObjectInspectorFactory.getStandardListObjectInspector(
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector));
                foi.add(ObjectInspectorFactory.getStandardMapObjectInspector(
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                        PrimitiveObjectInspectorFactory.javaIntObjectInspector));
                fname.add("uidSet");
                fname.add("imeiSet");
                fname.add("imeiMap");
                return ObjectInspectorFactory.getStandardStructObjectInspector(
                        fname, foi);
            } else {
                return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
            }
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)
                throws HiveException {
            if (parameters.length != 3) {
                return;
            }
            DivideAB dab = (DivideAB) agg;
            int check = PrimitiveObjectInspectorUtils.getInt(parameters[0], flagIO);
            String uid = PrimitiveObjectInspectorUtils.getString(parameters[1], uidIO);
            String imei = PrimitiveObjectInspectorUtils.getString(parameters[2], imeiIO);
            if (check == 1) {
                // logged-in user
                dab.uidSet.add(uid);
            } else {
                // anonymous user
                dab.imeiSet.add(imei);
            }
            // track whether this imei has been seen with both flag values
            if (dab.imeiMap.containsKey(imei)) {
                int flag = dab.imeiMap.get(imei);
                if (flag < 2 && flag != check) {
                    dab.imeiMap.put(imei, 2);
                }
            } else {
                dab.imeiMap.put(imei, check);
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            DivideAB myagg = (DivideAB) agg;
            // ship the intermediate sets and map to the reducer
            Object[] partialResult = new Object[3];
            partialResult[0] = new ArrayList<String>(myagg.uidSet);
            partialResult[1] = new ArrayList<String>(myagg.imeiSet);
            partialResult[2] = new HashMap<String, Integer>(myagg.imeiMap);
            return partialResult;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial)
                throws HiveException {
            if (partial != null) {
                DivideAB dab = (DivideAB) agg;
                Object uidSet = map2red.getStructFieldData(partial, uidSetField);
                Object imeiSet = map2red.getStructFieldData(partial, imeiSetField);
                Object imeiMap = map2red.getStructFieldData(partial, imeiMapField);
                List<?> uidlist = uidSetIO.getList(uidSet);
                if (uidlist != null) {
                    for (Object obj : uidlist) {
                        dab.uidSet.add(obj.toString());
                    }
                }
                List<?> imeilist = imeiSetIO.getList(imeiSet);
                if (imeilist != null) {
                    for (Object obj : imeilist) {
                        dab.imeiSet.add(obj.toString());
                    }
                }
                Map<?, ?> imeimap = imeiMapIO.getMap(imeiMap);
                for (Entry<?, ?> ele : imeimap.entrySet()) {
                    String key = ele.getKey().toString();
                    int val = Integer.parseInt(ele.getValue().toString());
                    if (dab.imeiMap.containsKey(key)) {
                        int flag = dab.imeiMap.get(key);
                        if (flag < 2 && flag != val) {
                            dab.imeiMap.put(key, 2);
                        }
                    } else {
                        dab.imeiMap.put(key, val);
                    }
                }
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            DivideAB dab = (DivideAB) agg;
            // count imeis that appeared as both logged-in and anonymous
            int mix = 0;
            for (int val : dab.imeiMap.values()) {
                if (val == 2) {
                    mix++;
                }
            }
            // DAU = logged-in + anonymous - overlap
            return (long) (dab.uidSet.size() + dab.imeiSet.size() - mix);
        }
    }
}


Work is calling again, so I'm just posting the code for now; I'll write up the analysis next time.

Reposted from: https://www.cnblogs.com/juefan/p/3922787.html
