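In recent years, China's e-commerce has grown rapidly, with transaction volume repeatedly setting new records. E-commerce applications keep expanding and deepening across sectors, related service industries are booming, the supporting infrastructure keeps improving, and the drive and capacity for innovation keep growing. E-commerce is integrating deeply with the real economy and entering a stage of large-scale development. Its influence on economic and social life keeps increasing, and it is becoming a new engine of China's economic growth.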
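According to the China E-Commerce Research Center, China's e-commerce market reached a transaction volume of 7.85 trillion yuan by the end of 2012, up 30.83% year on year. B2B e-commerce accounted for 6.25 trillion yuan of this, up 27%. For the whole of 2011, China's e-commerce transaction volume was 6 trillion yuan, up 33%, and its share of GDP rose to 13%. In 2012, e-commerce's share of GDP reached 15%.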
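Technical characteristics of the e-commerce industry
New technologies
A broad range of technologies
Distributed systems
High concurrency, clustering, load balancing (Nginx), high availability (standby machines)
Massive data volumes
Complex business logic
System security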
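Pinyougou Mall (品优购) is a comprehensive B2B2C platform, similar to JD.com and Tmall. The site uses a merchant-onboarding model. A merchant submits an application to join the platform, and the platform reviews its qualifications. Once approved, the merchant gets its own admin back end for entering product information. Products can be published after the platform has reviewed them.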
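Pinyougou Mall consists mainly of three parts: the storefront, the operator back end, and the merchant admin back end.
The storefront mainly includes the following: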
*Figure: Storefront*
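Operator back end: the management back end used by the platform operator's operations staff. It mainly covers merchant review, brand management, specification management, template management, product category management, product review, ad type management, ad management, order queries, and merchant settlement.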
*Figure: Back end*
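Merchant admin back end: the back end that onboarded merchants use for management. Its main functions are product management, order queries and statistics, and fund settlement.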
*Figure: Merchant admin back end*
The business processes covered by this data warehouse fall into two main categories.
GMV (Gross Merchandise Volume): the site's transaction amount, which here includes both paid and unpaid amounts.
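A minimal Python sketch of the GMV definition above. It assumes GMV is the sum of `totalMoney` over all orders. The text does not say which amount column is used, so that choice is an assumption. The `Order` record is hypothetical and carries only the fields needed here. Unpaid orders (`isPay = 0`) count toward GMV exactly like paid ones.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Iterable


@dataclass
class Order:
    """Hypothetical subset of an itcast_orders row."""
    order_id: int
    total_money: Decimal  # totalMoney: order amount including shipping
    is_pay: int           # isPay: 1 = paid, 0 = unpaid


def gmv(orders: Iterable[Order]) -> Decimal:
    """GMV: every order counts, whether or not it has been paid."""
    return sum((o.total_money for o in orders), Decimal("0"))


def paid_amount(orders: Iterable[Order]) -> Decimal:
    """For comparison: only the paid orders."""
    return sum((o.total_money for o in orders if o.is_pay == 1), Decimal("0"))


if __name__ == "__main__":
    sample = [
        Order(1, Decimal("100.00"), 1),
        Order(2, Decimal("59.90"), 0),  # unpaid, but still part of GMV
        Order(3, Decimal("20.10"), 1),
    ]
    print(gmv(sample))          # 180.00
    print(paid_amount(sample))  # 120.10
```

Using `Decimal` rather than `float` mirrors the `decimal(11,2)` money columns in the source tables and avoids rounding drift when summing many amounts.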
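This 100-billion-scale data warehouse project imitates Alibaba's Double 11 (Singles' Day) big-screen dashboard and implements offline analysis of Internet e-commerce metrics. It also follows the design ideas and principles of the data warehouse on Alibaba's big data platform. The project teaches the following three core skills:
1. The concepts of a data warehouse and how one is built
2. The functions, use cases, and common technology stack of an offline data warehouse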
*Figure: Offline project architecture*
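Business data volume
Data in HDFS grows by about 40 GB per day on average. With 3 replicas, that amounts to roughly 120 GB of growth per day. When the data is stored in Hive tables, it is written in Parquet format with Snappy compression.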
Hardware resources
Number of machines: 30
CPU: 24 cores
Memory: 128 GB
Disk: 4 TB
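The following is a back-of-the-envelope capacity check based on the figures above. Several assumptions are not stated in the text:

- Units are decimal (1 TB = 1000 GB).
- The 4 TB disk is per machine and fully usable by HDFS.
- The 40 GB/day figure is taken at face value, with no extra savings from Parquet/Snappy.

```python
# Back-of-the-envelope storage estimate from the data-volume and hardware figures.
# Assumptions: decimal units, 4 TB usable per machine, 40 GB/day taken at face value.
DAILY_GROWTH_GB = 40      # new data per day
REPLICATION = 3           # HDFS replicas
MACHINES = 30
DISK_PER_MACHINE_TB = 4


def daily_stored_gb(daily_gb: float = DAILY_GROWTH_GB, replicas: int = REPLICATION) -> float:
    """Disk consumed per day once replication is included."""
    return daily_gb * replicas


def cluster_capacity_gb(machines: int = MACHINES, disk_tb: float = DISK_PER_MACHINE_TB) -> float:
    """Raw disk capacity of the whole cluster in GB."""
    return machines * disk_tb * 1000


def days_until_full() -> float:
    """Days of growth the raw capacity can absorb (ignores OS, temp and intermediate data)."""
    return cluster_capacity_gb() / daily_stored_gb()


if __name__ == "__main__":
    print(daily_stored_gb())                # 120
    print(cluster_capacity_gb())            # 120000
    print(days_until_full())                # 1000.0
    print(daily_stored_gb() * 365 / 1000)   # 43.8 (TB per year)
```

Under these assumptions the cluster absorbs roughly 1000 days of growth. In practice, intermediate and temporary data will shorten that.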
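Order table (`itcast_orders`):

Field name | Data type | Description |
---|---|---|
orderId | bigint(11) | Order ID |
orderNo | varchar(20) | Order number |
userId | bigint(11) | User ID |
orderStatus | tinyint(4) | Order status. -3: rejected by user; -2: unpaid; -1: cancelled by user; 0: awaiting shipment; 1: in delivery; 2: receipt confirmed by user |
goodsMoney | decimal(11,2) | Goods amount |
deliverType | tinyint(4) | Delivery method |
deliverMoney | decimal(11,2) | Shipping fee |
totalMoney | decimal(11,2) | Order amount (including shipping) |
realTotalMoney | decimal(11,2) | Actual order amount (after discounts) |
payType | tinyint(4) | Payment method. 0: unknown; 1: Alipay; 2: WeChat Pay; 3: cash; 4: other |
payFrom | varchar(20) | Payment source |
isPay | tinyint(4) | Whether paid |
areaId | int(11) | Lowest-level area ID |
areaIdPath | varchar(255) | Area ID path |
userName | varchar(20) | Recipient name |
userAddressId | int(11) | Recipient address ID |
userAddress | varchar(255) | Recipient address |
userPhone | char(20) | Recipient phone |
orderScore | int(11) | Points earned from the order |
isInvoice | tinyint(4) | Whether an invoice is needed. 1: yes; 0: no |
invoiceClient | varchar(255) | Invoice title |
orderRemarks | varchar(255) | Order remarks |
orderSrc | tinyint(4) | Order source. 0: mall; 1: WeChat; 2: mobile site; 3: Android app; 4: iOS app; 5: ordering device |
needPay | decimal(11,2) | Amount due |
payRand | int(11) | Currency unit |
orderType | int(11) | Order type |
isRefund | tinyint(4) | Whether refunded |
isAppraise | tinyint(4) | Whether reviewed |
cancelReason | int(11) | Cancellation reason ID |
rejectReason | int(11) | ID of the reason the user rejected the delivery |
rejectOtherReason | varchar(255) | Rejection reason |
isClosed | tinyint(4) | Whether the order is completed |
orderunique | varchar(50) | Order serial number |
isFromCart | tinyint(1) | Whether the order came from the shopping cart. 0: direct order; 1: shopping cart |
receiveTime | varchar(25) | Time received |
deliveryTime | varchar(25) | Time shipped |
tradeNo | varchar(100) | Online payment transaction serial number |
dataFlag | tinyint(4) | Order validity flag |
createTime | varchar(25) | Order time |
settlementId | int(11) | Settlement status; a value greater than 0 is the settlement ID |
commissionFee | decimal(11,2) | Commission receivable on the order |
scoreMoney | decimal(11,2) | Amount deducted using points |
useScore | int(11) | Points spent |
extraJson | text | Extra information |
noticeDeliver | tinyint(3) | Shipping reminder. 0: not reminded; 1: reminded |
invoiceJson | text | Invoice information |
lockCashMoney | decimal(11,2) | Locked withdrawal amount |
payTime | varchar(25) | Payment time |
isBatch | tinyint(4) | Whether it is a combined order |
totalPayFee | int(11) | Total payment amount |
modifiedTime | timestamp | Update time |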
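When analyzing order facts, the coded columns usually need readable labels. Here is a minimal Python sketch of such a lookup. It is built only from the `orderStatus` and `payType` code lists in the table above. The function name `describe_order` is hypothetical.

```python
# Lookup tables copied from the orderStatus and payType descriptions of itcast_orders.
ORDER_STATUS = {
    -3: "rejected by user",
    -2: "unpaid",
    -1: "cancelled by user",
    0: "awaiting shipment",
    1: "in delivery",
    2: "receipt confirmed by user",
}

PAY_TYPE = {0: "unknown", 1: "Alipay", 2: "WeChat Pay", 3: "cash", 4: "other"}


def describe_order(order_status: int, pay_type: int) -> str:
    """Turn the numeric codes of an order row into a readable label."""
    status = ORDER_STATUS.get(order_status, f"unrecognized status {order_status}")
    payment = PAY_TYPE.get(pay_type, f"unrecognized payment type {pay_type}")
    return f"{status} / {payment}"


if __name__ == "__main__":
    print(describe_order(-2, 1))  # unpaid / Alipay
    print(describe_order(2, 2))   # receipt confirmed by user / WeChat Pay
```

Falling back to an "unrecognized" label, rather than raising, keeps a report running if the source system introduces a new code.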
Order detail table (`itcast_order_goods`):

Field name | Data type | Description |
---|---|---|
ogId | bigint(11) | Order line item (goods) ID |
orderId | bigint(11) | Order ID |
goodsId | bigint(11) | Goods ID |
goodsNum | bigint(11) | Quantity |
goodsPrice | decimal(11,2) | Goods price |
goodsSpecId | int(11) | Goods specification ID |
goodsSpecNames | varchar(500) | Goods specification list |
goodsName | varchar(200) | Goods name |
goodsImg | varchar(150) | Goods image |
extraJson | text | Extra information |
goodsType | tinyint(4) | Goods type |
commissionRate | decimal(11,2) | Goods commission rate |
goodsCode | varchar(20) | Goods code |
promotionJson | text | Promotion information |
createTime | varchar(20) | Creation time |
Goods table (`itcast_goods`):

Field name | Data type | Description |
---|---|---|
goodsId | bigint(11) | Goods ID |
goodsSn | varchar(20) | Goods number |
productNo | varchar(20) | Item number |
goodsName | varchar(200) | Goods name |
goodsImg | varchar(150) | Goods image |
shopId | bigint(11) | Shop ID |
goodsType | tinyint(4) | Goods type |
marketPrice | decimal(11,2) | Market price |
shopPrice | decimal(11,2) | Shop price |
warnStock | bigint(11) | Stock warning threshold |
goodsStock | bigint(11) | Total stock |
goodsUnit | char(10) | Unit |
goodsTips | text | Promotion information |
isSale | tinyint(4) | Whether listed for sale. 0: not listed; 1: listed |
isBest | tinyint(4) | Whether a featured product. 0: no; 1: yes |
isHot | tinyint(4) | Whether a best seller. 0: no; 1: yes |
isNew | tinyint(4) | Whether a new product. 0: no; 1: yes |
isRecom | tinyint(4) | Whether recommended. 0: no; 1: yes |
goodsCatIdPath | varchar(255) | Goods category ID path, catId1_catId2_catId3 |
goodsCatId | int(11) | Last-level goods category ID |
shopCatId1 | int(11) | First-level shop goods category ID |
shopCatId2 | int(11) | Second-level shop goods category ID |
brandId | int(11) | Brand ID |
goodsDesc | text | Goods description |
goodsStatus | tinyint(4) | Goods status. -1: violation; 0: pending review; 1: approved |
saleNum | int(11) | Total sales |
saleTime | varchar(25) | Listing time |
visitNum | int(11) | Visit count |
appraiseNum | int(11) | Review count |
isSpec | tinyint(4) | Whether it has specifications. 0: no; 1: yes |
gallery | text | Goods gallery |
goodsSeoKeywords | varchar(200) | Goods SEO keywords |
illegalRemarks | varchar(255) | Status note, usually the rejection reason |
dataFlag | tinyint(4) | Deletion flag. -1: deleted; 1: valid |
createTime | varchar(25) | Creation time |
isFreeShipping | tinyint(4) | |
goodsSerachKeywords | text | Goods search keywords |
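`goodsCatIdPath` stores all three category levels in a single string (`catId1_catId2_catId3`), and `goodsCatId` repeats the last level. The Python below is a minimal sketch that splits the path back into per-level IDs. The sample IDs are made up. Tolerating a trailing underscore is a defensive assumption about the stored format, not something the text states.

```python
from typing import Optional, Tuple

CatIds = Tuple[Optional[int], Optional[int], Optional[int]]


def parse_cat_path(path: str) -> CatIds:
    """Split a goodsCatIdPath like "334_338_339" into (level-1, level-2, level-3) IDs.

    Empty segments are ignored, so a trailing underscore ("334_338_339_") is tolerated.
    Shorter paths are padded with None.
    """
    ids = [int(part) for part in path.split("_") if part.strip()]
    if len(ids) > 3:
        raise ValueError(f"expected at most 3 category levels, got {len(ids)}: {path!r}")
    padded = ids + [None] * (3 - len(ids))
    return padded[0], padded[1], padded[2]


if __name__ == "__main__":
    print(parse_cat_path("334_338_339"))   # (334, 338, 339)
    print(parse_cat_path("334_338_339_"))  # (334, 338, 339)
    print(parse_cat_path("334_338"))       # (334, 338, None)
```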
Shop table (`itcast_shops`):

Field name | Data type | Description |
---|---|---|
shopId | int(11) | Shop ID, auto-increment |
shopSn | varchar(20) | |
userId | int(11) | Shop contact person ID |
areaIdPath | varchar(255) | |
areaId | int(11) | |
isSelf | tinyint(4) | |
shopName | varchar(100) | Shop name |
shopkeeper | varchar(50) | |
telephone | varchar(20) | Contact phone |
shopCompany | varchar(255) | Merchant's legal entity name |
shopImg | varchar(150) | Logo image |
shopTel | varchar(40) | Merchant phone |
shopQQ | varchar(50) | Contact QQ |
shopWangWang | varchar(50) | |
shopAddress | varchar(255) | Merchant address |
bankId | int(11) | |
bankNo | varchar(20) | |
bankUserName | varchar(50) | |
isInvoice | tinyint(4) | |
invoiceRemarks | varchar(255) | |
serviceStartTime | bigint(20) | Service start time |
serviceEndTime | bigint(20) | Service end time |
freight | int(11) | |
shopAtive | tinyint(4) | |
shopStatus | tinyint(4) | Shop status |
statusDesc | varchar(255) | |
dataFlag | tinyint(4) | |
createTime | date | |
shopMoney | decimal(11,2) | |
lockMoney | decimal(11,2) | |
noSettledOrderNum | int(11) | |
noSettledOrderFee | decimal(11,2) | |
paymentMoney | decimal(11,2) | |
bankAreaId | int(11) | |
bankAreaIdPath | varchar(100) | |
applyStatus | tinyint(4) | |
applyDesc | varchar(255) | |
applyTime | datetime | |
applyStep | tinyint(4) | |
shopNotice | varchar(300) | Shop announcement |
rechargeMoney | decimal(11,2) | Recharge amount |
longitude | decimal(10,7) | |
latitude | decimal(10,7) | |
mapLevel | int(11) | |
BDcode | varchar(16) | Code of the company's managing staff member |
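Goods category table (`itcast_goods_cats`):

Field name | Description |
---|---|
catId | Category ID |
parentId | Parent ID |
catName | Category name |
isShow | Whether shown |
isFloor | Whether shown as a floor (homepage section) |
catSort | Sort order |
dataFlag | Deletion flag |
createTime | Creation time |
commissionRate | Goods commission rate |
catImg | |
subTitle | Floor subtitle |
simpleName | Short name |
seoTitle | Category SEO title |
seoKeywords | Category SEO keywords |
seoDes | Category SEO description |
catListTheme | Goods category list theme |
detailTheme | Goods detail theme |
mobileCatListTheme | Mobile goods category list theme |
mobileDetailTheme | Mobile goods detail theme |
wechatCatListTheme | WeChat goods category list theme |
wechatDetailTheme | WeChat goods detail theme |
cat_level | Category level (3 levels in total) |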
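Organization table (`itcast_org`):

Field name | Description |
---|---|
orgId | Organization ID |
parentId | Parent ID |
orgName | Organization name |
orgLevel | Organization level. 1: headquarters and regional departments; 2: departments under headquarters and their sub-departments; 3: specific working departments |
managerCode | Manager's employee ID |
isdelete | Deletion flag. 1: deleted; 0: valid |
createTime | Creation time |
updateTime | Last modified time |
isShow | Whether shown. 0: yes; 1: no |
orgType | Organization type. 0: CEO office; 1: R&D; 2: sales; 3: operations; 4: product |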
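Both the organization table and the goods category table model a hierarchy through `parentId`. The organization table has up to three levels (`orgLevel` 1 to 3). The following is a minimal Python sketch that walks `parentId` links from one organization up to the top. It assumes top-level rows have `parentId = 0`, which the text does not state. The function name and IDs are hypothetical.

```python
from typing import Dict, List, Optional

ROOT_PARENT_ID = 0  # assumption: top-level organizations have parentId = 0


def ancestor_chain(org_id: int, parent_of: Dict[int, int]) -> List[int]:
    """Follow parentId links from org_id up to the top-level organization.

    Returns [org_id, parent, grandparent, ...]. Raises ValueError on a cycle.
    An org missing from parent_of is treated as having no parent.
    """
    chain: List[int] = []
    current: Optional[int] = org_id
    while current is not None and current != ROOT_PARENT_ID:
        if current in chain:
            raise ValueError(f"cycle detected at orgId {current}")
        chain.append(current)
        current = parent_of.get(current)
    return chain


if __name__ == "__main__":
    # orgId -> parentId, made-up IDs for a three-level hierarchy
    parents = {1: 0, 10: 1, 100: 10}
    print(ancestor_chain(100, parents))  # [100, 10, 1]
```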
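Order refund table (`itcast_order_refunds`):

Field name | Data type | Description |
---|---|---|
id | int(11) | Auto-increment ID |
orderId | int(11) | Order ID |
goodsId | int(11) | Goods ID |
refundTo | int(11) | User receiving the refund |
refundReson | int(11) | ID of the user's refund reason |
refundOtherReson | varchar(255) | User's refund reason |
backMoney | decimal(11,2) | Refund amount |
refundTradeNo | varchar(100) | Refund serial number |
refundRemark | varchar(500) | Refund remarks |
refundTime | varchar(25) | Refund time |
shopRejectReason | varchar(255) | Reason the shop declined the refund |
refundStatus | tinyint(4) | Refund status |
createTime | varchar(25) | Time the user requested the refund |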
User table (`itcast_users`):

Field name | Data type | Description |
---|---|---|
userId | int(11) | User ID |
loginName | varchar(20) | Login name |
loginSecret | int(11) | Login credential |
loginPwd | varchar(50) | Login password |
userType | tinyint(4) | User type |
userSex | tinyint(4) | Gender |
userName | varchar(100) | User name |
trueName | varchar(100) | Real name |
brithday | date | Birthday |
userPhoto | varchar(200) | Avatar |
userQQ | varchar(20) | QQ number |
userPhone | char(11) | Mobile number |
userEmail | varchar(50) | Email |
userScore | int(11) | Points |
userTotalScore | int(11) | Total points |
lastIP | varchar(16) | Last login IP |
lastTime | datetime | Last login time |
userFrom | tinyint(4) | Registration channel |
userMoney | decimal(11,2) | Balance |
lockMoney | decimal(11,2) | Locked balance |
userStatus | tinyint(4) | User status |
dataFlag | tinyint(4) | Data status |
createTime | datetime | Creation time |
payPwd | varchar(100) | Payment password |
rechargeMoney | decimal(11,2) | Recharge amount |
isInform | tinyint(4) | Whether to receive notifications |
User address table (`itcast_user_address`):

Field name | Data type | Description |
---|---|---|
addressId | int(11) | Address ID |
userId | int(11) | User ID |
userName | varchar(50) | User name |
otherName | varchar(50) | Address type |
userPhone | varchar(20) | Contact number |
areaIdPath | varchar(255) | Area ID path |
areaId | int(11) | Area ID |
userAddress | varchar(255) | Address |
isDefault | tinyint(4) | Whether this is the default address |
dataFlag | tinyint(4) | Data status |
createTime | datetime | Creation time |
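Payment method table (`itcast_payments`):

Field name | Data type | Description |
---|---|---|
id | int(11) | Unique ID |
payCode | varchar(20) | Payment type code |
payName | varchar(255) | Payment type name |
payDesc | text | Description |
payOrder | int(11) | Display order |
payConfig | text | Configuration |
enabled | tinyint(4) | Whether enabled |
isOnline | tinyint(4) | Whether online |
payFor | varchar(100) | |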
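Upload the file 资料\mysql建表语句\10tables.sql (materials\MySQL table-creation statements\10tables.sql) to the Linux server. Then log in to MySQL and run the file with the `source` command to create the database and tables: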
```shell
mysql -uroot -p
```

Then, at the `mysql>` prompt:

```sql
source /root/sql/10tables.sql;
```
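Store each layer in its own database
Naming rules
ODS-layer tables use the same names as the tables in the source database
DW-layer tables:
the fact_ prefix denotes fact tables
the dim_ prefix denotes dimension tables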
Create the layered databases:

```sql
create database itcast_ods;
create database itcast_dw;
create database itcast_ads;
```
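ODS-layer Hive DDL script: 资料\hiveods层建表语句\ods_create_table.sql (materials\Hive ODS-layer table-creation statements\ods_create_table.sql)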
```sql
-- Create the ODS-layer order table
drop table if exists `itcast_ods`.`itcast_orders`;
create EXTERNAL table `itcast_ods`.`itcast_orders`(
  orderId bigint, orderNo string, shopId bigint, userId bigint, orderStatus bigint,
  goodsMoney double, deliverType bigint, deliverMoney double, totalMoney double, realTotalMoney double,
  payType bigint, isPay bigint, areaId bigint, userAddressId bigint, areaIdPath string,
  userName string, userAddress string, userPhone string, orderScore bigint, isInvoice bigint,
  invoiceClient string, orderRemarks string, orderSrc bigint, needPay double, payRand bigint,
  orderType bigint, isRefund bigint, isAppraise bigint, cancelReason bigint, rejectReason bigint,
  rejectOtherReason string, isClosed bigint, goodsSearchKeys string, orderunique string, receiveTime string,
  deliveryTime string, tradeNo string, dataFlag bigint, createTime string, settlementId bigint,
  commissionFee double, scoreMoney double, useScore bigint, orderCode string, extraJson string,
  orderCodeTargetId bigint, noticeDeliver bigint, invoiceJson string, lockCashMoney double, payTime string,
  isBatch bigint, totalPayFee bigint, modifiedTime string
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');

-- Create the ODS-layer order detail table
drop table if exists `itcast_ods`.`itcast_order_goods`;
create EXTERNAL table `itcast_ods`.`itcast_order_goods`(
  ogId bigint, orderId bigint, goodsId bigint, goodsNum bigint, goodsPrice double, payPrice double,
  goodsSpecId bigint, goodsSpecNames string, goodsName string, goodsImg string, extraJson string,
  goodsType bigint, commissionRate double, goodsCode string, promotionJson string, createtime string
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');

-- Create the ODS-layer shop table
drop table if exists `itcast_ods`.`itcast_shops`;
create EXTERNAL table `itcast_ods`.`itcast_shops`(
  shopId bigint, shopSn string, userId bigint, areaIdPath string, areaId bigint, isSelf bigint,
  shopName string, shopkeeper string, telephone string, shopCompany string, shopImg string,
  shopTel string, shopQQ string, shopWangWang string, shopAddress string, bankId bigint,
  bankNo string, bankUserName string, isInvoice bigint, invoiceRemarks string,
  serviceStartTime bigint, serviceEndTime bigint, freight bigint, shopAtive bigint,
  shopStatus bigint, statusDesc string, dataFlag bigint, createTime string, shopMoney double,
  lockMoney double, noSettledOrderNum bigint, noSettledOrderFee double, paymentMoney double,
  bankAreaId bigint, bankAreaIdPath string, applyStatus bigint, applyDesc string, applyTime string,
  applyStep bigint, shopNotice string, rechargeMoney double, longitude double, latitude double,
  mapLevel bigint, BDcode string, modifyTime string
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');

-- Create the ODS-layer goods table
drop table if exists `itcast_ods`.`itcast_goods`;
create EXTERNAL table `itcast_ods`.`itcast_goods`(
  goodsId bigint, goodsSn string, productNo string, goodsName string, goodsImg string,
  shopId bigint, goodsType bigint, marketPrice double, shopPrice double, warnStock bigint,
  goodsStock bigint, goodsUnit string, goodsTips string, isSale bigint, isBest bigint,
  isHot bigint, isNew bigint, isRecom bigint, goodsCatIdPath string, goodsCatId bigint,
  shopCatId1 bigint, shopCatId2 bigint, brandId bigint, goodsDesc string, goodsStatus bigint,
  saleNum bigint, saleTime string, visitNum bigint, appraiseNum bigint, isSpec bigint,
  gallery string, goodsSeoKeywords string, illegalRemarks string, dataFlag bigint,
  createTime string, isFreeShipping bigint, goodsSerachKeywords string, modifyTime string
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');

-- Create the ODS-layer organization table
drop table if exists `itcast_ods`.`itcast_org`;
create EXTERNAL table `itcast_ods`.`itcast_org`(
  orgId bigint, parentId bigint, orgName string, orgLevel bigint, managerCode string,
  isdelete bigint, createTime string, updateTime string, isShow bigint, orgType bigint
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');

-- Create the ODS-layer goods category table
drop table if exists `itcast_ods`.`itcast_goods_cats`;
create EXTERNAL table `itcast_ods`.`itcast_goods_cats`(
  catId bigint, parentId bigint, catName string, isShow bigint, isFloor bigint, catSort bigint,
  dataFlag bigint, createTime string, commissionRate double, catImg string, subTitle string,
  simpleName string, seoTitle string, seoKeywords string, seoDes string, catListTheme string,
  detailTheme string, mobileCatListTheme string, mobileDetailTheme string,
  wechatCatListTheme string, wechatDetailTheme string, cat_level bigint
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');

-- Create the ODS-layer user table
drop table if exists `itcast_ods`.`itcast_users`;
create EXTERNAL table `itcast_ods`.`itcast_users`(
  userId bigint, loginName string, loginSecret bigint, loginPwd string, userType bigint,
  userSex bigint, userName string, trueName string, brithday string, userPhoto string,
  userQQ string, userPhone string, userEmail string, userScore bigint, userTotalScore bigint,
  lastIP string, lastTime string, userFrom bigint, userMoney double, lockMoney double,
  userStatus bigint, dataFlag bigint, createTime string, payPwd string, rechargeMoney double,
  isInform bigint
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');

-- Create the ODS-layer refund table
drop table if exists `itcast_ods`.`itcast_order_refunds`;
create EXTERNAL table `itcast_ods`.`itcast_order_refunds`(
  id bigint, orderId bigint, goodsId bigint, refundTo bigint, refundReson bigint,
  refundOtherReson string, backMoney double, refundTradeNo string, refundRemark string,
  refundTime string, shopRejectReason string, refundStatus bigint, createTime string,
  modifiedTime string
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');

-- Create the ODS-layer address table
drop table if exists `itcast_ods`.`itcast_user_address`;
create EXTERNAL table `itcast_ods`.`itcast_user_address`(
  addressId bigint, userId bigint, userName string, otherName string, userPhone string,
  areaIdPath string, areaId bigint, userAddress string, isDefault bigint, dataFlag bigint,
  createTime string
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');

-- Create the ODS-layer payment method table
drop table if exists `itcast_ods`.`itcast_payments`;
create EXTERNAL table `itcast_ods`.`itcast_payments`(
  id bigint, payCode string, payName string, payDesc string, payOrder bigint,
  payConfig string, enabled bigint, isOnline bigint, payFor string
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
```
*Figure: Full extraction configuration*

*Figure: Configuring the transformation's named parameters*
```sql
msck repair table itcast_ods.itcast_orders;
msck repair table itcast_ods.itcast_goods;
msck repair table itcast_ods.itcast_order_goods;
msck repair table itcast_ods.itcast_shops;
msck repair table itcast_ods.itcast_goods_cats;
msck repair table itcast_ods.itcast_org;
msck repair table itcast_ods.itcast_order_refunds;
msck repair table itcast_ods.itcast_users;
msck repair table itcast_ods.itcast_user_address;
msck repair table itcast_ods.itcast_payments;
```
*Figure: Repairing partitions*
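The `msck repair table` statements are presumably needed because Kettle writes the Parquet files straight into `dt=...` directories on HDFS, so the Hive metastore does not yet know about those partitions. As we understand it, the command scans the table's location and registers any partition directories that are missing from the metastore. The Python below is a simplified model of that behavior, not Hive's implementation. The function name is hypothetical, the directory listing is passed in rather than read from HDFS, and only `dt=yyyyMMdd` names are considered.

```python
import re
from typing import Iterable, List, Set

# Only directories named like dt=20190909 are treated as partitions.
PARTITION_DIR = re.compile(r"^dt=\d{8}$")


def missing_partitions(table_dirs: Iterable[str], registered: Set[str]) -> List[str]:
    """Simplified model of MSCK REPAIR TABLE for a table partitioned by dt.

    table_dirs: names of the subdirectories found under the table's HDFS location.
    registered: partition specs the metastore already knows about.
    Returns the specs that would be added, in sorted order.
    """
    return sorted(
        name for name in table_dirs
        if PARTITION_DIR.match(name) and name not in registered
    )


if __name__ == "__main__":
    on_hdfs = ["dt=20190909", "dt=20190910", "_SUCCESS", "tmp"]
    in_metastore = {"dt=20190909"}
    print(missing_partitions(on_hdfs, in_metastore))  # ['dt=20190910']
```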
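```sql
-- Kettle Table Input query (runs against MySQL, hence the %Y%m%d format)
-- ${dt} is the transformation's named parameter
SELECT
*
FROM itcast_orders
WHERE DATE_FORMAT(createtime, '%Y%m%d') <= '${dt}';
```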
*Figure: Component diagram*

*Figure: Field selection*

*Figure: File location*

*Figure: Parquet Output configuration*
Check that all the data was loaded correctly:

```sql
select * from itcast_ods.itcast_orders limit 2;
select * from itcast_ods.itcast_goods limit 2;
select * from itcast_ods.itcast_order_goods limit 2;
select * from itcast_ods.itcast_shops limit 2;
select * from itcast_ods.itcast_goods_cats limit 2;
select * from itcast_ods.itcast_org limit 2;
select * from itcast_ods.itcast_order_refunds limit 2;
select * from itcast_ods.itcast_users limit 2;
select * from itcast_ods.itcast_user_address limit 2;
select * from itcast_ods.itcast_payments limit 2;
```
Note:
```sql
-- Create the DW-layer goods dimension table (zipper table)
DROP TABLE IF EXISTS `itcast_dw`.`dim_goods`;
CREATE TABLE `itcast_dw`.`dim_goods`(
  goodsId bigint, goodsSn string, productNo string, goodsName string, goodsImg string,
  shopId bigint, goodsType bigint, marketPrice double, shopPrice double, warnStock bigint,
  goodsStock bigint, goodsUnit string, goodsTips string, isSale bigint, isBest bigint,
  isHot bigint, isNew bigint, isRecom bigint, goodsCatIdPath string, goodsCatId bigint,
  shopCatId1 bigint, shopCatId2 bigint, brandId bigint, goodsDesc string, goodsStatus bigint,
  saleNum bigint, saleTime string, visitNum bigint, appraiseNum bigint, isSpec bigint,
  gallery string, goodsSeoKeywords string, illegalRemarks string, dataFlag bigint,
  createTime string, isFreeShipping bigint, goodsSerachKeywords string, modifyTime string,
  -- Date this version became valid
  dw_start_date string,
  -- Date this version stopped being valid ('9999-12-31' marks the current version)
  dw_end_date string
)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
```
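1. Initial full load
2. Incremental load (for a single day, e.g. 2019-09-10)

Steps (initial full load):
Use Kettle to extract the goods data last modified on or before 20190909 into the ODS layer.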
```sql
SELECT *
FROM itcast_ods.itcast_goods
WHERE DATE_FORMAT(modifyTime, '%Y%m%d') <= '20190909';
```
Use Spark SQL to load the full data set into the DW-layer dimension table.
```sql
-- Number of partitions used for shuffles (default 200)
set spark.sql.shuffle.partitions=1;

-- Use Spark SQL to load the full data set into the DW-layer dimension table
insert overwrite table `itcast_dw`.`dim_goods`
select
  goodsId, goodsSn, productNo, goodsName, goodsImg, shopId, goodsType, marketPrice,
  shopPrice, warnStock, goodsStock, goodsUnit, goodsTips, isSale, isBest, isHot, isNew,
  isRecom, goodsCatIdPath, goodsCatId, shopCatId1, shopCatId2, brandId, goodsDesc,
  goodsStatus, saleNum, saleTime, visitNum, appraiseNum, isSpec, gallery, goodsSeoKeywords,
  illegalRemarks, dataFlag, createTime, isFreeShipping, goodsSerachKeywords, modifyTime,
  -- Version start date: the modification date, or the creation date if never modified
  case when modifyTime is not null
       then from_unixtime(unix_timestamp(modifyTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd')
       else from_unixtime(unix_timestamp(createTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd')
  end as dw_start_date,
  -- Every row loaded here is the current (open-ended) version
  '9999-12-31' as dw_end_date
from `itcast_ods`.`itcast_goods` t
where dt = '20190909';
```
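The `CASE` expression above decides when each version starts. The Python below is a minimal sketch of the same rule, useful for checking expected values by hand. It assumes timestamps are strings in `yyyy-MM-dd HH:mm:ss` format, as the SQL does. One difference: Spark's `unix_timestamp` returns NULL for a string it cannot parse (at least with default settings), whereas `strptime` raises `ValueError`.

```python
from datetime import datetime
from typing import Optional, Tuple

TS_FORMAT = "%Y-%m-%d %H:%M:%S"  # Python equivalent of 'yyyy-MM-dd HH:mm:ss'
OPEN_END_DATE = "9999-12-31"


def dw_start_date(modify_time: Optional[str], create_time: str) -> str:
    """Start date of a version: the modification date, else the creation date."""
    source = modify_time if modify_time is not None else create_time
    return datetime.strptime(source, TS_FORMAT).strftime("%Y-%m-%d")


def new_version_dates(modify_time: Optional[str], create_time: str) -> Tuple[str, str]:
    """(dw_start_date, dw_end_date) for a row loaded as the current version."""
    return dw_start_date(modify_time, create_time), OPEN_END_DATE


if __name__ == "__main__":
    print(dw_start_date(None, "2019-09-01 12:00:00"))                   # 2019-09-01
    print(dw_start_date("2019-09-05 08:30:00", "2019-09-01 12:00:00"))  # 2019-09-05
```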
Steps (incremental load):
Use Kettle to extract the goods data created or modified on 20190910 into the ODS layer.
```sql
SELECT *
FROM itcast_goods
WHERE DATE_FORMAT(modifyTime, '%Y%m%d') = '${dt}';
```
Write Spark SQL to update the historical data.
```sql
-- Update the historical data: close the current version of every goodsId
-- that appears in the 2019-09-10 increment
select
  dw.goodsId, dw.goodsSn, dw.productNo, dw.goodsName, dw.goodsImg, dw.shopId, dw.goodsType,
  dw.marketPrice, dw.shopPrice, dw.warnStock, dw.goodsStock, dw.goodsUnit, dw.goodsTips,
  dw.isSale, dw.isBest, dw.isHot, dw.isNew, dw.isRecom, dw.goodsCatIdPath, dw.goodsCatId,
  dw.shopCatId1, dw.shopCatId2, dw.brandId, dw.goodsDesc, dw.goodsStatus, dw.saleNum,
  dw.saleTime, dw.visitNum, dw.appraiseNum, dw.isSpec, dw.gallery, dw.goodsSeoKeywords,
  dw.illegalRemarks, dw.dataFlag, dw.createTime, dw.isFreeShipping, dw.goodsSerachKeywords,
  dw.modifyTime, dw.dw_start_date,
  case when dw.dw_end_date = '9999-12-31' and ods.goodsId is not null
       then '2019-09-09'
       else dw.dw_end_date
  end as dw_end_date
from `itcast_dw`.`dim_goods` dw
left join (select * from `itcast_ods`.`itcast_goods` where dt = '20190910') ods
  on dw.goodsId = ods.goodsId;
```
Write Spark SQL to get the current day's data.
```sql
-- Today's data: each changed or new goods row becomes a new open-ended version
select
  goodsId, goodsSn, productNo, goodsName, goodsImg, shopId, goodsType, marketPrice,
  shopPrice, warnStock, goodsStock, goodsUnit, goodsTips, isSale, isBest, isHot, isNew,
  isRecom, goodsCatIdPath, goodsCatId, shopCatId1, shopCatId2, brandId, goodsDesc,
  goodsStatus, saleNum, saleTime, visitNum, appraiseNum, isSpec, gallery, goodsSeoKeywords,
  illegalRemarks, dataFlag, createTime, isFreeShipping, goodsSerachKeywords, modifyTime,
  case when modifyTime is not null
       then from_unixtime(unix_timestamp(modifyTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd')
       else from_unixtime(unix_timestamp(createTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd')
  end as dw_start_date,
  '9999-12-31' as dw_end_date
from `itcast_ods`.`itcast_goods`
where dt = '20190910';
```
Merge the historical data and the current day's data into a temporary table.
```sql
-- Merge the historical data and the current day's data into a temporary table
drop table if exists `itcast_dw`.`tmp_dim_goods_history`;
create table `itcast_dw`.`tmp_dim_goods_history` as
select
  dw.goodsId, dw.goodsSn, dw.productNo, dw.goodsName, dw.goodsImg, dw.shopId, dw.goodsType,
  dw.marketPrice, dw.shopPrice, dw.warnStock, dw.goodsStock, dw.goodsUnit, dw.goodsTips,
  dw.isSale, dw.isBest, dw.isHot, dw.isNew, dw.isRecom, dw.goodsCatIdPath, dw.goodsCatId,
  dw.shopCatId1, dw.shopCatId2, dw.brandId, dw.goodsDesc, dw.goodsStatus, dw.saleNum,
  dw.saleTime, dw.visitNum, dw.appraiseNum, dw.isSpec, dw.gallery, dw.goodsSeoKeywords,
  dw.illegalRemarks, dw.dataFlag, dw.createTime, dw.isFreeShipping, dw.goodsSerachKeywords,
  dw.modifyTime, dw.dw_start_date,
  case when dw.dw_end_date = '9999-12-31' and ods.goodsId is not null
       then '2019-09-09'
       else dw.dw_end_date
  end as dw_end_date
from `itcast_dw`.`dim_goods` dw
left join (select * from `itcast_ods`.`itcast_goods` where dt = '20190910') ods
  on dw.goodsId = ods.goodsId
union all
select
  goodsId, goodsSn, productNo, goodsName, goodsImg, shopId, goodsType, marketPrice,
  shopPrice, warnStock, goodsStock, goodsUnit, goodsTips, isSale, isBest, isHot, isNew,
  isRecom, goodsCatIdPath, goodsCatId, shopCatId1, shopCatId2, brandId, goodsDesc,
  goodsStatus, saleNum, saleTime, visitNum, appraiseNum, isSpec, gallery, goodsSeoKeywords,
  illegalRemarks, dataFlag, createTime, isFreeShipping, goodsSerachKeywords, modifyTime,
  case when modifyTime is not null
       then from_unixtime(unix_timestamp(modifyTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd')
       else from_unixtime(unix_timestamp(createTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd')
  end as dw_start_date,
  '9999-12-31' as dw_end_date
from `itcast_ods`.`itcast_goods`
where dt = '20190910';
```
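Load the merged historical and current-day data into the history zipper table.

```sql
-- Overwrite the zipper table with the merged historical and current-day data
insert overwrite table `itcast_dw`.`dim_goods`
select * from `itcast_dw`.`tmp_dim_goods_history`;
```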
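The merge above is a type-2 slowly changing dimension update (the "zipper table"). Below is a small in-memory Python model of the same two-part logic, useful for reasoning about edge cases. `GoodsVersion` and `merge_zipper` are hypothetical, and a single `shop_price` column stands in for the full column list. As in the SQL, the close date is the day before the increment's date.

```python
from dataclasses import dataclass, replace
from typing import List

OPEN_END_DATE = "9999-12-31"


@dataclass(frozen=True)
class GoodsVersion:
    """One row of a simplified dim_goods zipper table (hypothetical)."""
    goods_id: int
    shop_price: float   # stands in for all the business columns
    dw_start_date: str  # 'yyyy-MM-dd'
    dw_end_date: str = OPEN_END_DATE


def merge_zipper(history: List[GoodsVersion], todays_rows: List[GoodsVersion],
                 close_date: str) -> List[GoodsVersion]:
    """Mirror of the tmp_dim_goods_history query.

    Part 1 (left join): keep every historical row, but close the open version of any
    goods_id present in today's increment by setting dw_end_date = close_date.
    Part 2 (union all): append today's rows as new open versions.
    """
    changed_ids = {row.goods_id for row in todays_rows}
    merged = [
        replace(v, dw_end_date=close_date)
        if v.dw_end_date == OPEN_END_DATE and v.goods_id in changed_ids
        else v
        for v in history
    ]
    merged.extend(todays_rows)
    return merged


if __name__ == "__main__":
    history = [
        GoodsVersion(1, 10.0, "2019-09-01"),
        GoodsVersion(2, 20.0, "2019-09-03"),
    ]
    today = [
        GoodsVersion(2, 18.0, "2019-09-10"),  # price change for an existing item
        GoodsVersion(3, 5.0, "2019-09-10"),   # brand-new item
    ]
    for row in merge_zipper(history, today, close_date="2019-09-09"):
        print(row)
```

The SQL writes the merge result to `tmp_dim_goods_history` first and only then overwrites `dim_goods`. Presumably this avoids reading from and overwriting the same table in a single statement.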
```sql
-- Get the goods data as of 2019-09-09
select * from `itcast_dw`.`dim_goods` where dw_start_date <= '2019-09-09' and dw_end_date >= '2019-09-09' limit 10;
-- View the zipper-table history of one goodsId
select * from `itcast_dw`.`dim_goods` where goodsId = 100134;
```
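The two verification queries follow the standard zipper-table access patterns: a snapshot as of a given day, and the full history of a single key. Here is a minimal Python sketch of both. The `Row` type is a hypothetical, trimmed-down `dim_goods` row, and the sample values are made up.

```python
from typing import List, NamedTuple


class Row(NamedTuple):
    """Hypothetical, trimmed-down dim_goods row."""
    goods_id: int
    shop_price: float
    dw_start_date: str  # 'yyyy-MM-dd'
    dw_end_date: str    # 'yyyy-MM-dd', '9999-12-31' for the current version


def snapshot(rows: List[Row], day: str) -> List[Row]:
    """Versions valid on `day`: dw_start_date <= day <= dw_end_date.

    ISO 'yyyy-MM-dd' strings sort chronologically, so plain string comparison
    works here just as it does in the Hive query.
    """
    return [r for r in rows if r.dw_start_date <= day <= r.dw_end_date]


def history_of(rows: List[Row], goods_id: int) -> List[Row]:
    """All versions of one item, oldest first."""
    return sorted((r for r in rows if r.goods_id == goods_id), key=lambda r: r.dw_start_date)


if __name__ == "__main__":
    rows = [
        Row(100134, 99.0, "2019-09-01", "2019-09-09"),
        Row(100134, 89.0, "2019-09-10", "9999-12-31"),
    ]
    print(snapshot(rows, "2019-09-09"))  # only the 99.0 version
    print(history_of(rows, 100134))      # both versions, oldest first
```

Because a closed version ends the day before its successor starts, at most one version of a key matches any given day.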
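Both the order table and the order refund table change status over time. When we sync them into the warehouse as periodic fact tables, we therefore need to record those status changes, so we again use zipper tables. The order detail table does not change over time, so it does not need a zipper table.
Implementation steps for the order and order-refund zipper tables: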
```sql
-- Create the DW-layer order fact table (partitioned by day)
DROP TABLE IF EXISTS itcast_dw.fact_orders;
create table itcast_dw.fact_orders(
  orderId bigint, orderNo string, shopId bigint, userId bigint, orderStatus bigint,
  goodsMoney double, deliverType bigint, deliverMoney double, totalMoney double, realTotalMoney double,
  payType bigint, isPay bigint, areaId bigint, userAddressId bigint, areaIdPath string,
  userName string, userAddress string, userPhone string, orderScore bigint, isInvoice bigint,
  invoiceClient string, orderRemarks string, orderSrc bigint, needPay double, payRand bigint,
  orderType bigint, isRefund bigint, isAppraise bigint, cancelReason bigint, rejectReason bigint,
  rejectOtherReason string, isClosed bigint, goodsSearchKeys string, orderunique string, receiveTime string,
  deliveryTime string, tradeNo string, dataFlag bigint, createTime string, settlementId bigint,
  commissionFee double, scoreMoney double, useScore bigint, orderCode string, extraJson string,
  orderCodeTargetId bigint, noticeDeliver bigint, invoiceJson string, lockCashMoney double, payTime string,
  isBatch bigint, totalPayFee bigint, modifiedTime string,
  dw_start_date string, dw_end_date string
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');

-- Temporary order table
DROP TABLE IF EXISTS itcast_dw.tmp_fact_orders;
create table itcast_dw.tmp_fact_orders(
  orderId bigint, orderNo string, shopId bigint, userId bigint, orderStatus bigint,
  goodsMoney double, deliverType bigint, deliverMoney double, totalMoney double, realTotalMoney double,
  payType bigint, isPay bigint, areaId bigint, userAddressId bigint, areaIdPath string,
  userName string, userAddress string, userPhone string, orderScore bigint, isInvoice bigint,
  invoiceClient string, orderRemarks string, orderSrc bigint, needPay double, payRand bigint,
  orderType bigint, isRefund bigint, isAppraise bigint, cancelReason bigint, rejectReason bigint,
  rejectOtherReason string, isClosed bigint, goodsSearchKeys string, orderunique string, receiveTime string,
  deliveryTime string, tradeNo string, dataFlag bigint, createTime string, settlementId bigint,
  commissionFee double, scoreMoney double, useScore bigint, orderCode string, extraJson string,
  orderCodeTargetId bigint, noticeDeliver bigint, invoiceJson string, lockCashMoney double, payTime string,
  isBatch bigint, totalPayFee bigint, modifiedTime string,
  dw_start_date string, dw_end_date string
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');

-- Create the order refund fact table (partitioned by day)
drop table if exists `itcast_dw`.`fact_order_refunds`;
create table `itcast_dw`.`fact_order_refunds`(
  id bigint, orderId bigint, goodsId bigint, refundTo bigint, refundReson bigint,
  refundOtherReson string, backMoney double, refundTradeNo string, refundRemark string,
  refundTime string, shopRejectReason string, refundStatus bigint, createTime string,
  modifiedTime string, dw_start_date string, dw_end_date string
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');

-- Temporary refund table
drop table if exists `itcast_dw`.`tmp_fact_order_refunds`;
create table `itcast_dw`.`tmp_fact_order_refunds`(
  id bigint, orderId bigint, goodsId bigint, refundTo bigint, refundReson bigint,
  refundOtherReson string, backMoney double, refundTradeNo string, refundRemark string,
  refundTime string, shopRejectReason string, refundStatus bigint, createTime string,
  modifiedTime string, dw_start_date string, dw_end_date string
)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
```
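We enable Hive dynamic partitioning and partition the data by the `createtime` field, so that orders created on the same day land in the same partition.

```sql
-- Enable dynamic partitioning (the default is false in older Hive versions)
set hive.exec.dynamic.partition=true;
-- Allow every partition column to be dynamic. In strict mode at least one static partition is required
set hive.exec.dynamic.partition.mode=nonstrict;
```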
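With dynamic partitioning, Hive takes the partition value from the last column of the `SELECT`, here `date_format(createtime, 'yyyyMMdd')`. The Python below is a minimal sketch of how rows are routed to `dt=...` partitions. It assumes `createTime` strings in `yyyy-MM-dd HH:mm:ss` format, and the sample rows are made up. Because `createTime` never changes, every later version of an order lands in the same partition as its first version. This is why the merge step keeps `dw.dt` for historical rows.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, List


def partition_value(create_time: str) -> str:
    """Python equivalent of date_format(createtime, 'yyyyMMdd') for 'yyyy-MM-dd HH:mm:ss' input."""
    return datetime.strptime(create_time, "%Y-%m-%d %H:%M:%S").strftime("%Y%m%d")


def group_by_partition(rows: Iterable[dict]) -> Dict[str, List[int]]:
    """Which orderIds land in which dt=... partition under dynamic partitioning."""
    partitions: Dict[str, List[int]] = defaultdict(list)
    for row in rows:
        partitions[f"dt={partition_value(row['createTime'])}"].append(row["orderId"])
    return dict(partitions)


if __name__ == "__main__":
    orders = [
        {"orderId": 1, "createTime": "2019-09-08 10:00:00"},
        {"orderId": 2, "createTime": "2019-09-09 23:59:59"},
        {"orderId": 3, "createTime": "2019-09-08 18:30:00"},
    ]
    print(group_by_partition(orders))  # {'dt=20190908': [1, 3], 'dt=20190909': [2]}
```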
Order table: load data from the ODS layer into the DW layer
```sql
insert overwrite table itcast_dw.fact_orders partition (dt)
select
  orderId, orderNo, shopId, userId, orderStatus, goodsMoney, deliverType, deliverMoney,
  totalMoney, realTotalMoney, payType, isPay, areaId, userAddressId, areaIdPath, userName,
  userAddress, userPhone, orderScore, isInvoice, invoiceClient, orderRemarks, orderSrc,
  needPay, payRand, orderType, isRefund, isAppraise, cancelReason, rejectReason,
  rejectOtherReason, isClosed, goodsSearchKeys, orderunique, receiveTime, deliveryTime,
  tradeNo, dataFlag, createTime, settlementId, commissionFee, scoreMoney, useScore,
  orderCode, extraJson, orderCodeTargetId, noticeDeliver, invoiceJson, lockCashMoney,
  payTime, isBatch, totalPayFee, modifiedTime,
  -- Version start date
  date_format(modifiedTime, 'yyyy-MM-dd') as dw_start_date,
  -- Version end date
  '9999-12-31' as dw_end_date,
  -- Dynamic partition column: Hive reads the last column of the query
  -- and writes each row into the matching partition.
  -- Here the data is partitioned by the order's creation date
  date_format(createtime, 'yyyyMMdd')
from itcast_ods.itcast_orders
where dt = "20190909";
```
Order refund table: load data from the ODS layer into the DW layer
```sql
insert overwrite table itcast_dw.fact_order_refunds partition (dt)
select
  id, orderId, goodsId, refundTo, refundReson, refundOtherReson, backMoney,
  refundTradeNo, refundRemark, refundTime, shopRejectReason, refundStatus,
  createTime, modifiedTime,
  date_format(modifiedTime, 'yyyy-MM-dd') as dw_start_date,
  '9999-12-31' as dw_end_date,
  -- Partition by the refund's creation date
  date_format(createTime, 'yyyyMMdd')
from itcast_ods.itcast_order_refunds
where dt = "20190909";
```
Extract the data for 20190910: the query selects the order and order-refund rows whose `modifiedTime` falls on 20190910.

*Figure: Extracting the 20190910 data*
SQL for the Table Input step:

```sql
SELECT *
FROM itcast_orders
WHERE DATE_FORMAT(modifiedTime, '%Y%m%d') = '${dt}';
```
*Figure: Field selection step*

*Figure: Parquet Output step*
```sql
-- Merge the order increment with the history into the temporary zipper table
insert overwrite table itcast_dw.tmp_fact_orders partition (dt)
select
  dw.orderId, dw.orderNo, dw.shopId, dw.userId, dw.orderStatus, dw.goodsMoney, dw.deliverType,
  dw.deliverMoney, dw.totalMoney, dw.realTotalMoney, dw.payType, dw.isPay, dw.areaId,
  dw.userAddressId, dw.areaIdPath, dw.userName, dw.userAddress, dw.userPhone, dw.orderScore,
  dw.isInvoice, dw.invoiceClient, dw.orderRemarks, dw.orderSrc, dw.needPay, dw.payRand,
  dw.orderType, dw.isRefund, dw.isAppraise, dw.cancelReason, dw.rejectReason,
  dw.rejectOtherReason, dw.isClosed, dw.goodsSearchKeys, dw.orderunique, dw.receiveTime,
  dw.deliveryTime, dw.tradeNo, dw.dataFlag, dw.createTime, dw.settlementId, dw.commissionFee,
  dw.scoreMoney, dw.useScore, dw.orderCode, dw.extraJson, dw.orderCodeTargetId,
  dw.noticeDeliver, dw.invoiceJson, dw.lockCashMoney, dw.payTime, dw.isBatch, dw.totalPayFee,
  dw.modifiedTime, dw.dw_start_date,
  -- Update end_date: close the open version of every order that changed on 2019-09-10
  case when ods.orderid is not null and dw.dw_end_date = '9999-12-31'
       then '2019-09-09'
       else dw.dw_end_date
  end as dw_end_date,
  -- Column needed for dynamic partitioning: historical rows keep their existing partition
  dw.dt
from itcast_dw.fact_orders dw
left join (select * from itcast_ods.itcast_orders where dt = '20190910') ods
  on dw.orderid = ods.orderid
union all
-- Insert today's new versions
select
  orderId, orderNo, shopId, userId, orderStatus, goodsMoney, deliverType, deliverMoney,
  totalMoney, realTotalMoney, payType, isPay, areaId, userAddressId, areaIdPath, userName,
  userAddress, userPhone, orderScore, isInvoice, invoiceClient, orderRemarks, orderSrc,
  needPay, payRand, orderType, isRefund, isAppraise, cancelReason, rejectReason,
  rejectOtherReason, isClosed, goodsSearchKeys, orderunique, receiveTime, deliveryTime,
  tradeNo, dataFlag, createTime, settlementId, commissionFee, scoreMoney, useScore,
  orderCode, extraJson, orderCodeTargetId, noticeDeliver, invoiceJson, lockCashMoney,
  payTime, isBatch, totalPayFee, modifiedTime,
  -- Version start date
  date_format(modifiedTime, 'yyyy-MM-dd') as dw_start_date,
  -- Version end date
  '9999-12-31' as dw_end_date,
  -- Dynamic partition column (last column of the query), partitioned by order creation date
  date_format(createtime, 'yyyyMMdd')
from itcast_ods.itcast_orders
where dt = "20190910";

-- Copy the temporary table back into itcast_dw.fact_orders
insert overwrite table itcast_dw.fact_orders partition (dt)
select * from itcast_dw.tmp_fact_orders;

-- Verify: query the zipper table
select * from itcast_dw.fact_orders limit 5;

-- Merge the order-refund increment with the history and overwrite the DW-layer temporary zipper table
insert overwrite table itcast_dw.tmp_fact_order_refunds partition (dt)
select
  dw.id, dw.orderId, dw.goodsId, dw.refundTo, dw.refundReson, dw.refundOtherReson, dw.backMoney,
  dw.refundTradeNo, dw.refundRemark, dw.refundTime, dw.shopRejectReason, dw.refundStatus,
  dw.createTime, dw.modifiedTime, dw.dw_start_date,
  case when ods.id is not null and dw.dw_end_date = '9999-12-31'
       then '2019-09-09'
       else dw.dw_end_date
  end as dw_end_date,
  dw.dt
from itcast_dw.fact_order_refunds dw
left join (select * from itcast_ods.itcast_order_refunds where dt = "20190910") ods
  on dw.id = ods.id
union all
select
  id, orderId, goodsId, refundTo, refundReson, refundOtherReson, backMoney,
  refundTradeNo, refundRemark, refundTime, shopRejectReason, refundStatus,
  createTime, modifiedTime,
  date_format(modifiedTime, 'yyyy-MM-dd') as dw_start_date,
  '9999-12-31' as dw_end_date,
  date_format(createTime, 'yyyyMMdd')
from itcast_ods.itcast_order_refunds
where dt = "20190910";

-- Overwrite the refund zipper table with the merged data from the temporary table
insert overwrite table itcast_dw.fact_order_refunds partition (dt)
select * from itcast_dw.tmp_fact_order_refunds;

-- Verify the data
select * from itcast_dw.fact_order_refunds limit 5;
```
```sql
-- Merge the data for the 11th
insert overwrite table itcast_dw.tmp_fact_orders partition (dt)
select
  dw.orderId, dw.orderNo, dw.shopId, dw.userId, dw.orderStatus, dw.goodsMoney, dw.deliverType,
  dw.deliverMoney, dw.totalMoney, dw.realTotalMoney, dw.payType, dw.isPay, dw.areaId,
  dw.userAddressId, dw.areaIdPath, dw.userName, dw.userAddress, dw.userPhone, dw.orderScore,
  dw.isInvoice, dw.invoiceClient, dw.orderRemarks, dw.orderSrc, dw.needPay, dw.payRand,
  dw.orderType, dw.isRefund, dw.isAppraise, dw.cancelReason, dw.rejectReason,
  dw.rejectOtherReason, dw.isClosed, dw.goodsSearchKeys, dw.orderunique, dw.receiveTime,
  dw.deliveryTime, dw.tradeNo, dw.dataFlag, dw.createTime, dw.settlementId, dw.commissionFee,
  dw.scoreMoney, dw.useScore, dw.orderCode, dw.extraJson, dw.orderCodeTargetId,
  dw.noticeDeliver, dw.invoiceJson, dw.lockCashMoney, dw.payTime, dw.isBatch, dw.totalPayFee,
  dw.modifiedTime, dw.dw_start_date,
  -- Update end_date
  case when ods.orderid is not null and dw.dw_end_date = '9999-12-31'
       then '2019-09-10'
       else dw.dw_end_date
  end
```
Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.