All Projects → tlhhup → litemall-dw

tlhhup / litemall-dw

Licence: other
基于开源Litemall电商项目的大数据项目,包含前端埋点(openresty+lua)、后端埋点;数据仓库(五层)、实时计算和用户画像。大数据平台采用CDH6.3.2(已使用vagrant+ansible脚本化),同时也包含了Azkaban的workflow。

Programming Languages

java
68154 projects - #9 most used programming language
javascript
184084 projects - #8 most used programming language
Vue
7211 projects
scala
5932 projects
SCSS
7915 projects
CSS
56736 projects

Projects that are alternatives of or similar to litemall-dw

God Of Bigdata
专注大数据学习面试,大数据成神之路开启。Flink/Spark/Hadoop/Hbase/Hive...
Stars: ✭ 6,008 (+16588.89%)
Mutual labels:  hive, hbase, flume, flink, azkaban
Bigdata Notes
大数据入门指南 ⭐
Stars: ✭ 10,991 (+30430.56%)
Mutual labels:  hive, hbase, flume, azkaban
Szt Bigdata
深圳地铁大数据客流分析系统🚇🚄🌟
Stars: ✭ 826 (+2194.44%)
Mutual labels:  hive, clickhouse, hbase, flink
BigInsights-on-Apache-Hadoop
Example projects for 'BigInsights for Apache Hadoop' on IBM Bluemix
Stars: ✭ 21 (-41.67%)
Mutual labels:  hive, hbase, spark-streaming, oozie
cloud
云计算之hadoop、hive、hue、oozie、sqoop、hbase、zookeeper环境搭建及配置文件
Stars: ✭ 48 (+33.33%)
Mutual labels:  hive, hbase, flume, oozie
dockerfiles
Multi docker container images for main Big Data Tools. (Hadoop, Spark, Kafka, HBase, Cassandra, Zookeeper, Zeppelin, Drill, Flink, Hive, Hue, Mesos, ... )
Stars: ✭ 29 (-19.44%)
Mutual labels:  hive, hbase, flink
xxhadoop
Data Analysis Using Hadoop/Spark/Storm/ElasticSearch/MachineLearning etc. This is My Daily Notes/Code/Demo. Don't fork, Just star !
Stars: ✭ 37 (+2.78%)
Mutual labels:  hive, hbase, spark-streaming
TitanDataOperationSystem
最好的大数据项目。《Titan数据运营系统》,本项目是一个全栈闭环系统,我们有用作数据可视化的web系统,然后用flume-kafaka-flume进行日志的读取,在hive设计数仓,编写spark代码进行数仓表之间的转化以及ads层表到mysql的迁移,使用azkaban进行定时任务的调度,使用技术:Java/Scala语言,Hadoop、Spark、Hive、Kafka、Flume、Azkaban、SpringBoot,Bootstrap, Echart等;
Stars: ✭ 62 (+72.22%)
Mutual labels:  hive, flume, azkaban
aaocp
一个对用户行为日志进行分析的大数据项目
Stars: ✭ 53 (+47.22%)
Mutual labels:  hive, hbase, flume
Flink Learning
flink learning blog. http://www.54tianzhisheng.cn/ 含 Flink 入门、概念、原理、实战、性能调优、源码解析等内容。涉及 Flink Connector、Metrics、Library、DataStream API、Table API & SQL 等内容的学习案例,还有 Flink 落地应用的大型项目案例(PVUV、日志存储、百亿数据实时去重、监控告警)分享。欢迎大家支持我的专栏《大数据实时计算引擎 Flink 实战与性能优化》
Stars: ✭ 11,378 (+31505.56%)
Mutual labels:  clickhouse, hbase, flink
Bdp Dataplatform
大数据生态解决方案数据平台:基于大数据、数据平台、微服务、机器学习、商城、自动化运维、DevOps、容器部署平台、数据平台采集、数据平台存储、数据平台计算、数据平台开发、数据平台应用搭建的大数据解决方案。
Stars: ✭ 456 (+1166.67%)
Mutual labels:  hive, hbase, flink
wasp
WASP is a framework to build complex real time big data applications. It relies on a kind of Kappa/Lambda architecture mainly leveraging Kafka and Spark. If you need to ingest huge amount of heterogeneous data and analyze them through complex pipelines, this is the framework for you.
Stars: ✭ 19 (-47.22%)
Mutual labels:  solr, hbase, spark-streaming
BigData-News
基于Spark2.2新闻网大数据实时系统项目
Stars: ✭ 36 (+0%)
Mutual labels:  hive, hbase, flume
bigdata-fun
A complete (distributed) BigData stack, running in containers
Stars: ✭ 14 (-61.11%)
Mutual labels:  solr, hbase, flume
Repository
个人学习知识库涉及到数据仓库建模、实时计算、大数据、Java、算法等。
Stars: ✭ 92 (+155.56%)
Mutual labels:  hive, hbase, flink
Springboot Templates
springboot和dubbo、netty的集成,redis mongodb的nosql模板, kafka rocketmq rabbit的MQ模板, solr solrcloud elasticsearch查询引擎
Stars: ✭ 100 (+177.78%)
Mutual labels:  hive, solr, hbase
Bigdataguide
大数据学习,从零开始学习大数据,包含大数据学习各阶段学习视频、面试资料
Stars: ✭ 817 (+2169.44%)
Mutual labels:  hive, hbase, flink
np-flink
flink详细学习实践
Stars: ✭ 26 (-27.78%)
Mutual labels:  clickhouse, hbase, flink
Real-time-log-analysis-system
🐧基于spark streaming+flume+kafka+hbase的实时日志处理分析系统(分为控制台版本和基于springboot、Echarts等的Web UI可视化版本)
Stars: ✭ 31 (-13.89%)
Mutual labels:  hbase, spark-streaming, flume
dpkb
大数据相关内容汇总,包括分布式存储引擎、分布式计算引擎、数仓建设等。关键词:Hadoop、HBase、ES、Kudu、Hive、Presto、Spark、Flink、Kylin、ClickHouse
Stars: ✭ 123 (+241.67%)
Mutual labels:  hive, hbase, flink

项目说明

基于litemall构建的数据仓库

核心模块

  1. 数据仓库(离线/实时)
  2. 推荐系统
  3. 用户画像

使用技术

  1. 大数据
    1. Hadoop
    2. hive on tez
    3. sqoop
    4. spark
    5. kafka
    6. azkaban
  2. 后端
    1. spring boot
    2. mysql
    3. redis
    4. mongo
    5. openresty

运行环境

  1. Hadoop主机
    1. Hadoop:3.1.2
    2. hive:3.1.2
    3. tez:0.10.1 结合对应的hive版本进行源码编译
    4. sqoop:1.4.7
    5. spark $\ge$2.4.7
  2. Kafka
    1. zookeeper:3.6.1
    2. Kafka:2.13-2.6.0
  3. CDH:6.3.2

数据准备

埋点

处理用户行为数据,采用json数据格式

  1. openresty采集数据发送到kafka
    1. 创建topic

       kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 3 --topic litemall-action 
      
    2. 配置nginx

      1. 生成工作目录

         mkdir -p {litemall/logs,litemall/conf,litemall/luas}
        
      2. 配置nginx

         worker_processes  1;
         error_log logs/error.log;
         events {
             worker_connections 1024;
         }
         
         user root root; # 定义用户名、用户组,防止权限问题
         
         http {
             # 设置共享字典
             lua_shared_dict kafka_data 10m;
             resolver 192.168.241.20; #使用能解析kafka域名的dns
             server {
                 listen 80;
                 location / {
                     default_type text/html;
                     content_by_lua_block {
                         ngx.say("<p>hello, world</p>")
                     }
                 }
         
         	location /process{
         	   default_type application/json;
         	   content_by_lua_file /root/project/litemall/luas/kafka_test.lua;
         	}
         
             }
         }
        
      3. 发送到kafka

         local cjson=require "cjson"
         local producer = require "resty.kafka.producer"
         
         local broker_list = {
             { host = "192.168.241.180", port = 9092 },
             { host = "192.168.241.181", port = 9092 },
             { host = "192.168.241.182", port = 9092 },
         }
         
         local topic="litemall-action"
         local partitions=3
         
         local shared_data = ngx.shared.kafka_data
         local partitionNum=shared_data:get("count")
         if not partitionNum then
             shared_data:set("count",1)
             partitionNum=shared_data:get("count")
         end
         local key = ""..(partitionNum%partitions)
         shared_data:incr("count",1)
         
         ngx.req.read_body()
         local request_body = ngx.req.get_body_data()
         if request_body then
              local p = producer:new(broker_list)
             local offset, err = p:send(topic, key, request_body)
         end
         
         ngx.say(cjson.encode{code = 200, msg = request_body})
        
  2. 页面埋点
    1. 不同日志的处理:监听生命周期函数
      1. 启动日志:在main.js通过绑定vue对象的mounted事件来处理,保证只有一次请求
      2. 事件日志
        1. main.js通过绑定vue对象的mountedbeforeDestory来启动或清除定时器
        2. 在定时器中发送事件日志,消除频繁请求日志服务器
        3. 采用数组来记录事件,并在发送之后清空
    2. 跨域的处理
      1. 前端通过代理处理(修改vue.config.js配置文件,vue cli 3.0+)

         devServer: {
             port:6255,
             proxy:{
               '/process': {
                 target: process.env.VUE_LOG_BASE_API,
                 ws: true,
                 changeOrigin: true //允许跨域
               },
               '/wx': {
                 target: process.env.VUE_APP_BASE_API,
                 ws: true,
                 changeOrigin: true
               }
             }
           },
        
      2. 后端通过nginx处理

         location /process{
            default_type application/json;
            # 添加响应头,在opthons请求返回的响应中,从而在下个请求中携带
            add_header Access-Control-Allow-Credentials true;
            add_header Access-Control-Allow-Headers *;
              add_header Access-Control-Allow-Origin *;
            content_by_lua_file /root/project/litemall/luas/kafka_test.lua;
         }
        

预处理

将数据导入hdfs

hdfs dfs -mkdir -p {/original_data/litemall/db,/original_data/litemall/log}
  1. 将事件日志导入hdfs

    1. 自定义flume的interceptor将数据拆分为start和event类型的数据,并存储到hdfs中不同的文件夹
    2. 打包之后的jar放入到flume的plugins.d/interceptor/lib文件夹下
      1. plugins.d文件夹用于集成第三方的plugin
    3. flume守护进程问题
      1. 原因:启动配置参数-c设置不正确

      2. 解决:

         FLUME_HOME=/opt/apache-flume-1.9.0-bin
         # 需要配置到flume安装路径下的conf文件夹
         nohup $FLUME_HOME/bin/flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf/litemall_kafka_to_hdfs.conf --name a1 -Dflume.root.logger=INFO,console >$FLUME_HOME/logs/litemall 2>&1 &
        
    4. 处理hive报错:Cannot obtain block length for LocatedBlock
      1. 在跑Azkaban任务的时候出现以上错误,其原因是:文件处于openforwrite的状态,没有关闭
      2. 解决:
        1. 查看文件状态

           hdfs fsck dir -files
          
        2. 查看路径下是否有openforwrite的文件

           hdfs fsck dir -openforwrite
          
        3. 释放租约

           hdfs debug recoverLease -path filepath [-retries retry-num]
          
      3. 分析
        1. 没有关闭flume重启了hdfs
  2. 将业务数据导入hdfs

    1. sqoop将mysql中tinyint的数据在HDFS 上面显示的true、false及在hive中需要使用boolean存储
      1. 原因:jdbc会把tinyint 认为是java.sql.Types.BIT
      2. 解决:jdbc:mysql://localhost/test?tinyInt1isBit=false
    2. sqoop:源数据含有默认换行符导致的数据不正确(出现数据错位)
      1. 解决:import添加参数

         # Drops \n, \r, and \01 from string fields when importing to Hive.
         --hive-drop-import-delims
        
    3. 支付和退款数据
      1. 因该数据都存在于订单表中,所以进行特殊处理
        1. 支付:除了分区数据以外,同时添加pay_time作为过滤条件
        2. 退款:除了分区数据以外,同时添加refund_time作为过滤条件
  3. 自定义hive函数来解析events事件日志

    1. hive-function:中定义UDF和UDTF

      1. 将打包后的jar包上传到hdfs

         hdfs dfs -put hive-function.jar /user/hive/jars
        
    2. 注册全局函数

       # udf
       create function litemall.parse_json_object as 'org.tlh.litemall.udf.ParseJsonObject' using jar 'hdfs:///user/hive/jars/hive-function.jar'	
       # udtf
       create function litemall.extract_event_type as 'org.tlh.litemall.udtf.ExtractEventType' using jar 'hdfs:///user/hive/jars/hive-function.jar'
      
  4. 对于分区表,新增加列,重新覆盖写已有的分区,新增列为null

    1. 原因:由于分区元数据缺少新添加的字段导致的

    2. 解决:往出错的分区元数据中添加这个列

       alter table dwd_fact_cart_info partition(dt = '2020-12-04') add columns(add_time string comment '加购时间');	
      
  5. 加载时间维度数据

     LOAD DATA  INPATH '/date_info.txt' OVERWRITE INTO TABLE dwd_dim_date_info;
    

导出数据

  1. 将数据导出到mysql数据重复
    1. 对于以日期为基础的数据
      1. 如果只有一条,则mysql中日期列设置为主键
      2. 如果是多条,则hive中需要以日期进行分区
  2. 目前通过Sqoop从Hive的parquet抽数到关系型数据库的时候会报kitesdk找不到文件的错,这是Sqoop已知的问题
    1. 将需要导出的表存储方式采用默认的textfile

推荐系统

加载hive中的数据

  1. 启动hiveserver

     hive --service hiveserver2
    
  2. spark配置通过域名通信

     conf.set("dfs.client.use.datanode.hostname", "true")
    
  3. 注意事项

    1. mac环境下读取hive中lzo压缩的数据
      1. 使用homebrew安装lzo、lzop

         brew install lzop lzo
        
      2. 编译安装hadoop-lzo

         git clone [email protected]:twitter/hadoop-lzo.git
         mvn clean package install -DskipTests
        

用户画像

环境准备

  1. 创建画像namespace

     create_namespace 'litemall_profile'	
    

将用户标签导入solr

  1. 开启hbase表的复制功能

    # 禁用表
    disable 'litemall_profile:user_profile'
    # 开启表复制
    alter 'litemall_profile:user_profile',{NAME=>'cf',REPLICATION_SCOPE =>1}
    # 启用表
    enable 'litemall_profile:user_profile'
    
  2. 创建solr的collection

    1. 生成配置

       solrctl instancedir --generate $HOME/hbase_indexer/litemall
      
    2. 定义schema

       cd $HOME/hbase_indexer/litemall/conf
       # 修改managed-schema配置,添加如下内容
       vim managed-schema
       
       <field name="gender" type="string" indexed="true" stored="true"/>
       <field name="age" type="string" indexed="true" stored="true"/>
       <field name="consumptionAbility" type="string" indexed="true" stored="true"/>
       <field name="consumption" type="string" indexed="true" stored="true"/>
       <field name="coupon" type="string" indexed="true" stored="true"/>
       <field name="discountRate" type="string" indexed="true" stored="true"/>
       <field name="singleOrderMax" type="string" indexed="true" stored="true"/>
       <field name="orderFrequency" type="string" indexed="true" stored="true"/>
       <field name="pct" type="string" indexed="true" stored="true"/>
       <field name="refundRate" type="string" indexed="true" stored="true"/>
       <field name="rfm" type="string" indexed="true" stored="true"/>
       <field name="psm" type="string" indexed="true" stored="true"/>
       <field name="brandFavor" type="string" indexed="true" stored="true"/>
      
    3. 创建solr的collection并上传配置

       # 上传配置
       solrctl instancedir --create litemall $HOME/hbase_indexer/litemall
       # 创建collection
       solrctl collection --create litemall -s 3 -r 3 -m 9
      
  3. 创建Lily HBase Indexer 配置

    1. 定义morphline 和 hbase的映射配置

       cd $HOME/hbase_indexer/litemall
       vim morphline-hbase-mapper.xml
       
       <?xml version="1.0"?>
       <!--
                table: hbase中表名
       -->
       <indexer table="litemall_profile:user_profile" mapper="com.ngdata.hbaseindexer.morphline.MorphlineResultToSolrMapper">
          <!-- 设置morphlines文件的路径:绝对路径或相对路径 -->
          <param name="morphlineFile" value="morphlines.conf"/>
       
          <!-- 设置使用的是那个ETL -->
          <param name="morphlineId" value="userProfile"/>
       </indexer>
      
    2. 创建Morphline配置文件(主要用于导入历史数据),定义ETL(hbase-solr的转换关系),同时将userProfile的配置信息通过CDH控制台页面进行添加(Lily HBase Indexer->配置->Morphlines 文件),注册一个新的ETL

       cd $HOME/hbase_indexer/litemall
       vim morphlines.conf
       
       # 内容如下
       SOLR_LOCATOR : {
       	  # Name of solr collection
       	  collection : collection
       	
       	  # ZooKeeper ensemble
       	  zkHost : "$ZK_HOST"
       	}
       	
       	
       	morphlines : [
       	{
       	id : userProfile
       	importCommands : ["org.kitesdk.**", "com.ngdata.**"]
       	
       	commands : [
       	  {
       	    extractHBaseCells {
       	      mappings : [
       	        {
       	          inputColumn : "cf:gender"
       	          outputField : "gender"
       	          type : string
       	          source : value
       	        }
       	        {
       	          inputColumn : "cf:age"
       	          outputField : "age"
       	          type : string
       	          source : value
       	        }
       	        {
       	          inputColumn : "cf:consumptionAbility"
       	          outputField : "consumptionAbility"
       	          type : string
       	          source : value
       	        }
       	        {
       	          inputColumn : "cf:consumption"
       	          outputField : "consumption"
       	          type : string
       	          source : value
       	        }
       	        {
       	          inputColumn : "cf:coupon"
       	          outputField : "coupon"
       	          type : string
       	          source : value
       	        }
       	        {
       	          inputColumn : "cf:discountRate"
       	          outputField : "discountRate"
       	          type : string
       	          source : value
       	        }
       	        {
       	          inputColumn : "cf:singleOrderMax"
       	          outputField : "singleOrderMax"
       	          type : string
       	          source : value
       	        }
       	        {
       	          inputColumn : "cf:orderFrequency"
       	          outputField : "orderFrequency"
       	          type : string
       	          source : value
       	        }
       	        {
       	          inputColumn : "cf:pct"
       	          outputField : "pct"
       	          type : string
       	          source : value
       	        }
       	        {
       	          inputColumn : "cf:refundRate"
       	          outputField : "refundRate"
       	          type : string
       	          source : value
       	        }
       	        {
       	          inputColumn : "cf:rfm"
       	          outputField : "rfm"
       	          type : string
       	          source : value
       	        }
       	        {
       	          inputColumn : "cf:psm"
       	          outputField : "psm"
       	          type : string
       	          source : value
       	        }
       	        {
       	          inputColumn : "cf:brandFavor"
       	          outputField : "brandFavor"
       	          type : string
       	          source : value
       	        }
       	      ]
       	    }
       	  }
       	
       	
       	  { logDebug { format : "output record: {}", args : ["@{}"] } }
       	]
       }
       ]
      
    3. 注册Lily HBase Indexer ConfigurationLily HBase Indexer Service

       hbase-indexer add-indexer \
       --name litemallIndexer \
       --indexer-conf $HOME/hbase_indexer/litemall/morphline-hbase-mapper.xml \
       --connection-param solr.zk=cdh-master:2181,cdh-slave-3:2181,cdh-slave-2:2181/solr \
       --connection-param solr.collection=litemall \
       --zookeeper cdh-master:2181,cdh-slave-3:2181,cdh-slave-2:2181
      
    4. 查看Indexer

       hbase-indexer list-indexers
      
  4. 导入历史数据

     hadoop --config /etc/hadoop/conf \
     jar /opt/cloudera/parcels/CDH/lib/hbase-solr/tools/hbase-indexer-mr-*-job.jar \
     --conf /etc/hbase/conf/hbase-site.xml -Dmapreduce.map.java.opts="-Xmx512m" \
     -Dmapreduce.reduce.java.opts="-Xmx512m" \
     --morphline-file $HOME/hbase_indexer/litemall/morphlines.conf \
     --hbase-indexer-file $HOME/hbase_indexer/litemall/morphline-hbase-mapper.xml \
     --zk-host cdh-master:2181,cdh-slave-3:2181,cdh-slave-2:2181/solr \
     --collection litemall \
     --go-live
    
  5. solr数据备份与恢复(将其他集群中的数据导入)

    1. 备份其他集群的数据

      1. 创建快照

         solrctl collection --create-snapshot litemall_bk -c litemall
        
      2. 创建备份目录

         hdfs dfs -mkdir -p /backup/litemall_16
         # 修改权限
         sudo -u hdfs hdfs dfs -chown solr:solr /backup/litemall_16
        
      3. 准备导出(目的导出元数据合索引数据)

         solrctl collection --prepare-snapshot-export litemall_bk -c litemall -d /backup/litemall_16
        
      4. 导出

         solrctl collection --export-snapshot litemall_bk -s /backup/litemall_16 -d hdfs://cdh-master:8020/backup/bk
        
      5. 复制到本地

         hdfs dfs -copyToLocal /backup/bk litemall
         tar -zcvf litemall.tar.gz litemall/
        
    2. 恢复到当前集群

      1. 创建目录上传到出的数据

         hdfs dfs -mkdir -p /path/to/restore-staging
         hdfs dfs -copyFromLocal litemall /path/to/restore-staging
        
      2. 数据恢复

        solrctl collection --restore litemall -l /path/to/restore-staging/litemall -b litemall_bk -i restore-litemall
        
      3. 查看状态

        solrctl collection --request-status restore-litemall
        

注意事项

  1. CDH中使用lzo压缩,本地读取数据问题(报No LZO codec found, cannot run.错误)
    1. 原因:在hadoop-common包中使用的是SPI来加载解压缩方式,默认配置中并不包含lzo的配置

    2. 解决:添加core-site.xml文件,并添加lzo解压缩配置

       <?xml version="1.0" encoding="UTF-8"?>
       <configuration>
           <property>
               <name>io.compression.codecs</name>
               <value>
                   org.apache.hadoop.io.compress.DefaultCodec,
                   org.apache.hadoop.io.compress.GzipCodec,
                   org.apache.hadoop.io.compress.BZip2Codec,
                   org.apache.hadoop.io.compress.DeflateCodec,
                   org.apache.hadoop.io.compress.SnappyCodec,
                   org.apache.hadoop.io.compress.Lz4Codec,
                   com.hadoop.compression.lzo.LzoCodec,
                   com.hadoop.compression.lzo.LzopCodec
               </value>
           </property>
       </configuration>	 	
      

术语

  1. 支付转换率:所选时间内,支付买家数除以访客数(支付买家数/访客数),即访客转化为支付买家的比例 $$ 支付转换率={支付买家数 \over 访客数} $$
  2. 客单价:每一个顾客平均购买商品的金额,也即是平均交易金额 $$ 客单价={销售额 \over 成交顾客数} $$

致谢

本项目基于或参考以下项目:

  1. litemall:又一个小商场系统,Spring Boot后端 + Vue管理员前端 + 微信小程序用户前端 + Vue用户移动端
Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].