玖叶教程网

前端编程开发入门

画像笔记5-Elasticsearch 的应用(elasticsearch 详解)

申明,自我学习用,非原创。

接上节,

如何根据用户的标签,快速找到用户的相关信息。

  • 建议通过二级索引(ES),筛选标签找到用户id,
  • 再基于用户ID在Hbase 中查找用户的详细信息。

在线接口在查询HBase中数据时,由于Hbase无法像关系数据库那样根据多种条件对数据进行筛选(类似SQL语言中的where筛选条件)。一般地Hbase需要建立二级索引来满足根据复杂条件查询数据的需求,本案中选择Elastic search 存储Hbase 索引数据。

在组合标签查询对应的用户人群场景中,首先通过组合标签的条件在Elasticsearch 中查询对应的索引数据,然后通过索引数据去HBase中批量获取rowkey对应的数据。(elastic search 中的documentid 和HBase 中的rowkey都设计为用户id)


基于Elastic search 存储的HBase二级索引方案

为了避免从Hive向Hbase灌入数据时缺失,在向HBase数据同步完成后,还需要校验HBase和Hive中数据量是否一致,如出现较大的波动则发送告警信息。

下面通过Python脚本来看该Hbase状态表数据校验逻辑:

#查询Hvie中数据

def check_Hive_data(data_date):

r = os.popen("Hive -S -e\ "select count(1) from dw.userprofile_usergroup_labels_all where data_date='"+data_date+" '\ "")

Hive_userid_count = r.read()

r.close()

Hive_count = str(int(Hive_userid_count))

print "Hive_result: " + str(Hive_count)

print "Hive select finished"

#查询HBase中数据

def check_Hbase_data(data_date):

r = os.popen("HBase org.apache.hadoop.HBase.mapreduce.RowCounter 'userprofile_labels'\ " 2>&1 |grep ROWS")

HBase_count = r.read().strip()[5:]

r.close()

print "Hbase result: " + str(Hbase_count)

print "Hbase select finished!"

#连接db,将查询结果插入表

db = MySQLdb.connect(host ="xx.xx.xx.xx",port=3306,user="username",passwd="password",db="xxx",charset="utf8")

cursor = db.cursor()

cursor.execute("INSERT INTO service_monitor(date,service_type,Hive_count,Hbase_count) VALUES('"+Datestr_"','advertisement',"+str(Hive_userid_count)+", "+str(Hbase_count)+")")

db.commit()

本案例中将userid 作为rowkey 存入HBase,一方面在组合标签的场景中可以支持条件查询多用户人群,另一方面可以支持单个用户标签的查询,例如查看某id用户身上的标签,

以便运营人员决定是否对其进行运营操作。

Hbase 在离线数仓环境的服务架构如下图所示:


图:Hbase离线数仓服务架构

Elastic search 存储架构:

Elasticsearch 是一个开源的分布式全文检索引擎,可以近乎实时地存储、检索数据。

而且扩展性很好,可以扩展到上百台服务器,处理PB级别的数据。

对于用户标签查询、用户人群计算、用户群多维度透视分析这类对响应时间要求较高的场景,也可以考虑选用Elastic search存储。

Elasticsearch 是面向文档型数据库,一条数据在这里就是一个文档,用json作为文档格式。为了更清晰的理解Elasticsearch查询的一些概念,将其和关系数据库的类型进行对照。

在关系型数据库中查询数据时,可通过选中数据库、表、行、列 来定位所查找的内容,在Elasticsearch中通过索引(index)、类型(type)、文档(document)、字段来定位查找内容。Elastic search的交互可以使用Java API,也可以使用HTTP的RESTful API方式。


应用场景:

基于Hbase的存储方案并没有解决数据的高效检索问题。在实际应用中,经常有根据特定的几个字段进行组合后检索的应用场景,而Hbase采用 rowkey 作为一级索引,不支持多条件查询如果要对库里的非rowkey 进行数据检索和查询,往往需要通过MapReduce 等分布式框架进行计算,时间延迟上会比较高,难以同时满足用户对于复杂条件查询高效率响应这两方面的需求。

主要查询过程包括:

  1. 在Elasticsearch 中存放用于检索条件的数据,并将rowkey也存储进去。
  2. 使用Elasticsearch 的API根据组合标签的条件查询出rowkey的集合。
  3. 使用上一步得到的rowkey去HBase数据库查询对应的结果。

Hbase数据存储数据的索引放在Elasticsearch中,实现了数据和索引的分离。在Elasticsearch 中documentid是文档的唯一标识,在HBase中rowkey是记录的唯一标识,在工程实践中,两者可同时选用用户在平台上得唯一标识(如 userid 或deviceid)作为rowkey 或 documentid,进而解决HBase和Elasticsearch 索引关联的问题。




通过scala 代码,把Hive 数据从总表中迁移到Elastic search中[略]:

提交命令:

"spark-submit --class com.example.HiveDataToEs --master yarn --deploy-mode client --executor-memory 2g --num-executors 50 --driver-memory 3g --executor-cores 2 spark-hive-to-es.jar 20190101"

#查询命令

GET userprofile/tags/_search

{

"size":0,

"aggs": {

"tagcounts": {

"terms":{

"field": "tags.ACTION_U_01_003"

}

}

}

}

}

methodurl地址描述

PUTlocalhost:9200/索引名称/类型名称/文档id创建文档(指定文档id)

POSTlocalhost:9200/索引名称/类型名称创建文档(随机文档id)

POSTlocalhost:9200/索引名称/类型名称/文档id/_update修改文档

DELETElocalhost:9200/索引名称/类型名称/文档id删除文档

GETlocalhost:9200/索引名称/类型名称/文档id查询文档通过文档id

POSTlocalhost:9200/索引名称/类型名称/_search查询所有数据

elasticsearch(集群)中可以包含多个索引(数据库),每个索引中可以包含多个类型(表),每个类型下又包含多个 个文档(行),每个文档中又包含多个字段(列)。

下面简单介绍下elasticsearch(ES)

1、添加数据

POST /db/user/1

{

"username": "wmyskxz1",

"password": "123456",

"age": "22"

}

POST /db/user/2

{

"username": "wmyskxz2",

"password": "123456",

"age": "22"

}

2、获取数据 GET

GET /carroll/user/1

3、修改数据

PUT /db/user/2

{

"username": "wmyskxz3",

"password": "123456",

"age": "22"

}

4、删除数据 DELETE

DELETE /db/user/1

————————————————

————————————————

版权声明:本文为CSDN博主「carroll18」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/qq_40722827/article/details/106287725

使用restfulapi 查询包含某个标签的用户量,可实时得到返回结果:



返回结果:


从返回结果中可以看到,用户总量(total)为100000000人,包含标签“ACTION_U_01_003”的用户有2500000人(doc_count).

#查询命令:

GET userprofile/_search

{

"query":{

"match_all":{}

}

}

查询结果如图3-27所示。


工程化案例

“用户人群”+ “人群分析”的功能解决方案

每天的ETL调度中,需要将Hive计算的标签数据导入ES中,如下图所示。

当标签完成“标签监控预警”后,将标签数据同步到ES中。



在与ES同步完成并通过校验后,向MySQL中维护的状态表中插入一条状态记录,

表示当前日期的Elasticsearch 数据可用,线上计算用户人群的接口则读取最近日期对应的数据。如果某天因为调度延迟等方面的原因,没有及时将当日数据导入Elasticsearch中,接口也能读取最近一天对应的数据,是一种可行的灾备方案:


数据同步完成后向MySQL状态表“elasticsearch_state”中插入记录,state字段为“0”,产出异常时为“1”。图3-29中,1月20日导入的数据出现异常,则“state” 状态字段置1,线上接口扫描该状态记录位后不读取1月20日数据,而是会读取最近的1月19日数据。



为了避免从Hive向Elastic search中灌入数据时发生数据缺失,在向状态表更新状态位前需要校验ES 和Hive中的数据量

是否一致。下面通过Python 脚本来看数据校验逻辑:

【略】

之后业务人员在画像产品端计算人群或透视分析人群时(如图所示)



通过Restful API 访问Elasticsearch进行计算(如图所示)。


发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言