集群分发脚本
#!/bin/bash
#使用方法:
#第一个参数输入文件路径
#如果只同步到一个服务器上就只要在第二个参数上填写服务器编号就行
#如果是要同步一个范围的服务器,那么写上开始节点和结束节点就行
num=$#
if [ $num -lt 2 ]
then
echo '参数小于二,参数至少要两个'
else
path=$1
if [ -e $path ]
then
fname=`basename $path`
dname=`cd -P $(dirname $path);pwd`
me=`whoami`
if [ $num -gt 2 ]
then
startNode=$2
endNode=$3
for((i=startNode;i < endNode+1;i++))
do
echo "======同步hadoop$i======"
rsync -avl $dname/$fname $me@hadoop$i:$dname/
done
else
echo "======同步hadoop$2======"
rsync -avl $dname/$fname $me@hadoop$2:$dname/
fi
else
echo "没有$dname/$fname这个文件或目录或等等"
fi
fi
集群查看脚本
xcall
for i in hadoop101 hadoop102 hadoop103
do
echo =============$i=============
ssh $i "$*"
done
集群启动脚本
cluster.sh
#!/bin/bash
case $1 in
"start"){
echo "==============集群统一启动中============"
echo "=======启动zookeeper======="
zk start
echo "=====启动hadoop===="
hdp start
echo "====启动Kafka======="
kfk start
echo "======启动采集Flume====="
f1 start
echo "========启动消费Flume========"
f2 start
};;
"stop"){
echo "=========停止集群========="
echo "=======停止消费Flume======"
f2 stop
echo "=======停止采集Flume======"
f1 stop
echo "=======停止Kafka======"
kfk stop
echo "=======停止Hadoop======"
hdp stop
echo "=======停止Zookeeper======"
zk stop
};;
esac
zk
#!/bin/bash
case $1 in
"start")
for i in hadoop101 hadoop102 hadoop103
do
echo "===========$i=========="
ssh $i "zkServer.sh start"
done
;;
"stop")
for i in hadoop101 hadoop102 hadoop103
do
echo "===========$i=========="
ssh $i "zkServer.sh stop"
done
;;
"status")
for i in hadoop101 hadoop102 hadoop103
do
echo "===========$i=========="
ssh $i "zkServer.sh status"
done
;;
esac
hdp
#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Args Input..."
exit ;
fi
case $1 in
"start")
echo " =================== 启动 hadoop集群 ==================="
echo " --------------- 启动 hdfs ---------------"
ssh hadoop101 "start-dfs.sh"
echo " --------------- 启动 yarn ---------------"
ssh hadoop102 "start-yarn.sh"
echo " --------------- 启动 historyserver ---------------"
ssh hadoop101 "mapred --daemon start historyserver"
;;
"stop")
echo " =================== 关闭 hadoop集群 ==================="
echo " --------------- 关闭 historyserver ---------------"
ssh hadoop101 "mapred --daemon stop historyserver"
echo " --------------- 关闭 yarn ---------------"
ssh hadoop102 "stop-yarn.sh"
echo " --------------- 关闭 hdfs ---------------"
ssh hadoop101 "stop-dfs.sh"
;;
*)
echo "Input Args Error..."
;;
esac
kfk
#!/bin/bash
case $1 in
"start"){
for i in hadoop101 hadoop102 hadoop103
do
echo "================starting $i kafka==========="
ssh $i "kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties"
done
};;
"stop"){
for i in hadoop101 hadoop102 hadoop103
do
echo "=============stoping $i kafka==========="
ssh $i "kafka-server-stop.sh stop"
done
};;
esac
f1
#! /bin/bash
case $1 in
"start"){
for i in hadoop101 hadoop102
do
echo " --------启动 $i 采集flume-------"
ssh $i "nohup flume-ng agent --conf-file $FLUME_HOME/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/EBDW/flume/log1.txt 2>&1 &"
done
};;
"stop"){
for i in hadoop101 hadoop102
do
echo " --------停止 $i 采集flume-------"
ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
done
};;
esac
f2
#消费flume集群操作脚本
#! /bin/bash
case $1 in
"start"){
for i in hadoop103
do
echo " --------启动 $i 消费flume-------"
ssh $i "nohup flume-ng agent --conf-file $FLUME_HOME/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE > $FLUME_HOME/log2.txt 2>&1 &"
done
};;
"stop"){
for i in hadoop103
do
echo " --------停止 $i 消费flume-------"
ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
done
};;
esac
将业务数据从mysql传递到hdfs上
mysql_to_hdfs_init(首日同步脚本)
#! /bin/bash
APP=gmall
# sqoop=/opt/module/sqoop/bin/sqoop
if [ -n "$2" ] ;then
do_date=$2
else
echo "请传入日期参数"
exit
fi
import_data(){
sqoop import \
--connect jdbc:mysql://hadoop101:3306/$APP \
--username root \
--password 764991 \
--target-dir /origin_data/$APP/db/$1/$do_date \
--delete-target-dir \
--query "$2 where \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by '\t' \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'
hadoop jar /opt/module/EBDW/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /origin_data/$APP/db/$1/$do_date
}
import_order_info(){
import_data order_info "select
id,
total_amount,
order_status,
user_id,
payment_way,
delivery_address,
out_trade_no,
create_time,
operate_time,
expire_time,
tracking_no,
province_id,
activity_reduce_amount,
coupon_reduce_amount,
original_total_amount,
feight_fee,
feight_fee_reduce
from order_info"
}
import_coupon_use(){
import_data coupon_use "select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time,
expire_time
from coupon_use"
}
import_order_status_log(){
import_data order_status_log "select
id,
order_id,
order_status,
operate_time
from order_status_log"
}
import_user_info(){
import_data "user_info" "select
id,
login_name,
nick_name,
name,
phone_num,
email,
user_level,
birthday,
gender,
create_time,
operate_time
from user_info"
}
import_order_detail(){
import_data order_detail "select
id,
order_id,
sku_id,
sku_name,
order_price,
sku_num,
create_time,
source_type,
source_id,
split_total_amount,
split_activity_amount,
split_coupon_amount
from order_detail"
}
import_payment_info(){
import_data "payment_info" "select
id,
out_trade_no,
order_id,
user_id,
payment_type,
trade_no,
total_amount,
subject,
payment_status,
create_time,
callback_time
from payment_info"
}
import_comment_info(){
import_data comment_info "select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
create_time
from comment_info"
}
import_order_refund_info(){
import_data order_refund_info "select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
refund_status,
create_time
from order_refund_info"
}
import_sku_info(){
import_data sku_info "select
id,
spu_id,
price,
sku_name,
sku_desc,
weight,
tm_id,
category3_id,
is_sale,
create_time
from sku_info"
}
import_base_category1(){
import_data "base_category1" "select
id,
name
from base_category1"
}
import_base_category2(){
import_data "base_category2" "select
id,
name,
category1_id
from base_category2"
}
import_base_category3(){
import_data "base_category3" "select
id,
name,
category2_id
from base_category3"
}
import_base_province(){
import_data base_province "select
id,
name,
region_id,
area_code,
iso_code,
iso_3166_2
from base_province"
}
import_base_region(){
import_data base_region "select
id,
region_name
from base_region"
}
import_base_trademark(){
import_data base_trademark "select
id,
tm_name
from base_trademark"
}
import_spu_info(){
import_data spu_info "select
id,
spu_name,
category3_id,
tm_id
from spu_info"
}
import_favor_info(){
import_data favor_info "select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from favor_info"
}
import_cart_info(){
import_data cart_info "select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time,
source_type,
source_id
from cart_info"
}
import_coupon_info(){
import_data coupon_info "select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
limit_num,
taken_count,
start_time,
end_time,
operate_time,
expire_time
from coupon_info"
}
import_activity_info(){
import_data activity_info "select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from activity_info"
}
import_activity_rule(){
import_data activity_rule "select
id,
activity_id,
activity_type,
condition_amount,
condition_num,
benefit_amount,
benefit_discount,
benefit_level
from activity_rule"
}
import_base_dic(){
import_data base_dic "select
dic_code,
dic_name,
parent_code,
create_time,
operate_time
from base_dic"
}
import_order_detail_activity(){
import_data order_detail_activity "select
id,
order_id,
order_detail_id,
activity_id,
activity_rule_id,
sku_id,
create_time
from order_detail_activity"
}
import_order_detail_coupon(){
import_data order_detail_coupon "select
id,
order_id,
order_detail_id,
coupon_id,
coupon_use_id,
sku_id,
create_time
from order_detail_coupon"
}
import_refund_payment(){
import_data refund_payment "select
id,
out_trade_no,
order_id,
sku_id,
payment_type,
trade_no,
total_amount,
subject,
refund_status,
create_time,
callback_time
from refund_payment"
}
import_sku_attr_value(){
import_data sku_attr_value "select
id,
attr_id,
value_id,
sku_id,
attr_name,
value_name
from sku_attr_value"
}
import_sku_sale_attr_value(){
import_data sku_sale_attr_value "select
id,
sku_id,
spu_id,
sale_attr_value_id,
sale_attr_id,
sale_attr_name,
sale_attr_value_name
from sku_sale_attr_value"
}
case $1 in
"order_info")
import_order_info
;;
"base_category1")
import_base_category1
;;
"base_category2")
import_base_category2
;;
"base_category3")
import_base_category3
;;
"order_detail")
import_order_detail
;;
"sku_info")
import_sku_info
;;
"user_info")
import_user_info
;;
"payment_info")
import_payment_info
;;
"base_province")
import_base_province
;;
"base_region")
import_base_region
;;
"base_trademark")
import_base_trademark
;;
"activity_info")
import_activity_info
;;
"cart_info")
import_cart_info
;;
"comment_info")
import_comment_info
;;
"coupon_info")
import_coupon_info
;;
"coupon_use")
import_coupon_use
;;
"favor_info")
import_favor_info
;;
"order_refund_info")
import_order_refund_info
;;
"order_status_log")
import_order_status_log
;;
"spu_info")
import_spu_info
;;
"activity_rule")
import_activity_rule
;;
"base_dic")
import_base_dic
;;
"order_detail_activity")
import_order_detail_activity
;;
"order_detail_coupon")
import_order_detail_coupon
;;
"refund_payment")
import_refund_payment
;;
"sku_attr_value")
import_sku_attr_value
;;
"sku_sale_attr_value")
import_sku_sale_attr_value
;;
"all")
import_base_category1
import_base_category2
import_base_category3
import_order_info
import_order_detail
import_sku_info
import_user_info
import_payment_info
import_base_region
import_base_province
import_base_trademark
import_activity_info
import_cart_info
import_comment_info
import_coupon_use
import_coupon_info
import_favor_info
import_order_refund_info
import_order_status_log
import_spu_info
import_activity_rule
import_base_dic
import_order_detail_activity
import_order_detail_coupon
import_refund_payment
import_sku_attr_value
import_sku_sale_attr_value
;;
esac
mysql_to_hdfs(每日同步脚本)
#! /bin/bash
APP=gmall
#sqoop=/opt/module/sqoop/bin/sqoop
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d '-1 day' +%F`
fi
import_data(){
sqoop import \
--connect jdbc:mysql://hadoop101:3306/$APP \
--username root \
--password 764991 \
--target-dir /origin_data/$APP/db/$1/$do_date \
--delete-target-dir \
--query "$2 and \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by '\t' \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'
hadoop jar /opt/module/EBDW/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /origin_data/$APP/db/$1/$do_date
}
import_order_info(){
import_data order_info "select
id,
total_amount,
order_status,
user_id,
payment_way,
delivery_address,
out_trade_no,
create_time,
operate_time,
expire_time,
tracking_no,
province_id,
activity_reduce_amount,
coupon_reduce_amount,
original_total_amount,
feight_fee,
feight_fee_reduce
from order_info
where (date_format(create_time,'%Y-%m-%d')='$do_date'
or date_format(operate_time,'%Y-%m-%d')='$do_date')"
}
import_coupon_use(){
import_data coupon_use "select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time,
expire_time
from coupon_use
where (date_format(get_time,'%Y-%m-%d')='$do_date'
or date_format(using_time,'%Y-%m-%d')='$do_date'
or date_format(used_time,'%Y-%m-%d')='$do_date'
or date_format(expire_time,'%Y-%m-%d')='$do_date')"
}
import_order_status_log(){
import_data order_status_log "select
id,
order_id,
order_status,
operate_time
from order_status_log
where date_format(operate_time,'%Y-%m-%d')='$do_date'"
}
import_user_info(){
import_data "user_info" "select
id,
login_name,
nick_name,
name,
phone_num,
email,
user_level,
birthday,
gender,
create_time,
operate_time
from user_info
where (DATE_FORMAT(create_time,'%Y-%m-%d')='$do_date'
or DATE_FORMAT(operate_time,'%Y-%m-%d')='$do_date')"
}
import_order_detail(){
import_data order_detail "select
id,
order_id,
sku_id,
sku_name,
order_price,
sku_num,
create_time,
source_type,
source_id,
split_total_amount,
split_activity_amount,
split_coupon_amount
from order_detail
where DATE_FORMAT(create_time,'%Y-%m-%d')='$do_date'"
}
import_payment_info(){
import_data "payment_info" "select
id,
out_trade_no,
order_id,
user_id,
payment_type,
trade_no,
total_amount,
subject,
payment_status,
create_time,
callback_time
from payment_info
where (DATE_FORMAT(create_time,'%Y-%m-%d')='$do_date'
or DATE_FORMAT(callback_time,'%Y-%m-%d')='$do_date')"
}
import_comment_info(){
import_data comment_info "select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
create_time
from comment_info
where date_format(create_time,'%Y-%m-%d')='$do_date'"
}
import_order_refund_info(){
import_data order_refund_info "select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
refund_status,
create_time
from order_refund_info
where date_format(create_time,'%Y-%m-%d')='$do_date'"
}
import_sku_info(){
import_data sku_info "select
id,
spu_id,
price,
sku_name,
sku_desc,
weight,
tm_id,
category3_id,
is_sale,
create_time
from sku_info where 1=1"
}
import_base_category1(){
import_data "base_category1" "select
id,
name
from base_category1 where 1=1"
}
import_base_category2(){
import_data "base_category2" "select
id,
name,
category1_id
from base_category2 where 1=1"
}
import_base_category3(){
import_data "base_category3" "select
id,
name,
category2_id
from base_category3 where 1=1"
}
import_base_province(){
import_data base_province "select
id,
name,
region_id,
area_code,
iso_code,
iso_3166_2
from base_province
where 1=1"
}
import_base_region(){
import_data base_region "select
id,
region_name
from base_region
where 1=1"
}
import_base_trademark(){
import_data base_trademark "select
id,
tm_name
from base_trademark
where 1=1"
}
import_spu_info(){
import_data spu_info "select
id,
spu_name,
category3_id,
tm_id
from spu_info
where 1=1"
}
import_favor_info(){
import_data favor_info "select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from favor_info
where 1=1"
}
import_cart_info(){
import_data cart_info "select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time,
source_type,
source_id
from cart_info
where 1=1"
}
import_coupon_info(){
import_data coupon_info "select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
limit_num,
taken_count,
start_time,
end_time,
operate_time,
expire_time
from coupon_info
where 1=1"
}
import_activity_info(){
import_data activity_info "select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from activity_info
where 1=1"
}
import_activity_rule(){
import_data activity_rule "select
id,
activity_id,
activity_type,
condition_amount,
condition_num,
benefit_amount,
benefit_discount,
benefit_level
from activity_rule
where 1=1"
}
import_base_dic(){
import_data base_dic "select
dic_code,
dic_name,
parent_code,
create_time,
operate_time
from base_dic
where 1=1"
}
import_order_detail_activity(){
import_data order_detail_activity "select
id,
order_id,
order_detail_id,
activity_id,
activity_rule_id,
sku_id,
create_time
from order_detail_activity
where date_format(create_time,'%Y-%m-%d')='$do_date'"
}
import_order_detail_coupon(){
import_data order_detail_coupon "select
id,
order_id,
order_detail_id,
coupon_id,
coupon_use_id,
sku_id,
create_time
from order_detail_coupon
where date_format(create_time,'%Y-%m-%d')='$do_date'"
}
import_refund_payment(){
import_data refund_payment "select
id,
out_trade_no,
order_id,
sku_id,
payment_type,
trade_no,
total_amount,
subject,
refund_status,
create_time,
callback_time
from refund_payment
where (DATE_FORMAT(create_time,'%Y-%m-%d')='$do_date'
or DATE_FORMAT(callback_time,'%Y-%m-%d')='$do_date')"
}
import_sku_attr_value(){
import_data sku_attr_value "select
id,
attr_id,
value_id,
sku_id,
attr_name,
value_name
from sku_attr_value
where 1=1"
}
import_sku_sale_attr_value(){
import_data sku_sale_attr_value "select
id,
sku_id,
spu_id,
sale_attr_value_id,
sale_attr_id,
sale_attr_name,
sale_attr_value_name
from sku_sale_attr_value
where 1=1"
}
case $1 in
"order_info")
import_order_info
;;
"base_category1")
import_base_category1
;;
"base_category2")
import_base_category2
;;
"base_category3")
import_base_category3
;;
"order_detail")
import_order_detail
;;
"sku_info")
import_sku_info
;;
"user_info")
import_user_info
;;
"payment_info")
import_payment_info
;;
"base_province")
import_base_province
;;
"activity_info")
import_activity_info
;;
"cart_info")
import_cart_info
;;
"comment_info")
import_comment_info
;;
"coupon_info")
import_coupon_info
;;
"coupon_use")
import_coupon_use
;;
"favor_info")
import_favor_info
;;
"order_refund_info")
import_order_refund_info
;;
"order_status_log")
import_order_status_log
;;
"spu_info")
import_spu_info
;;
"activity_rule")
import_activity_rule
;;
"base_dic")
import_base_dic
;;
"order_detail_activity")
import_order_detail_activity
;;
"order_detail_coupon")
import_order_detail_coupon
;;
"refund_payment")
import_refund_payment
;;
"sku_attr_value")
import_sku_attr_value
;;
"sku_sale_attr_value")
import_sku_sale_attr_value
;;
"all")
import_base_category1
import_base_category2
import_base_category3
import_order_info
import_order_detail
import_sku_info
import_user_info
import_payment_info
import_base_trademark
import_activity_info
import_cart_info
import_comment_info
import_coupon_use
import_coupon_info
import_favor_info
import_order_refund_info
import_order_status_log
import_spu_info
import_activity_rule
import_base_dic
import_order_detail_activity
import_order_detail_coupon
import_refund_payment
import_sku_attr_value
import_sku_sale_attr_value
;;
esac
将数据从hdfs转入ods层
将业务数据从hdfs层转入ods层(首日同步脚本)
#!/bin/bash
APP=gmall
if [ -n "$2" ] ;then
do_date=$2
else
echo "请传入日期参数"
exit
fi
ods_order_info="
load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table ${APP}.ods_order_info partition(dt='$do_date');"
ods_order_detail="
load data inpath '/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table ${APP}.ods_order_detail partition(dt='$do_date');"
ods_sku_info="
load data inpath '/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table ${APP}.ods_sku_info partition(dt='$do_date');"
ods_user_info="
load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table ${APP}.ods_user_info partition(dt='$do_date');"
ods_payment_info="
load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table ${APP}.ods_payment_info partition(dt='$do_date');"
ods_base_category1="
load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table ${APP}.ods_base_category1 partition(dt='$do_date');"
ods_base_category2="
load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table ${APP}.ods_base_category2 partition(dt='$do_date');"
ods_base_category3="
load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table ${APP}.ods_base_category3 partition(dt='$do_date'); "
ods_base_trademark="
load data inpath '/origin_data/$APP/db/base_trademark/$do_date' OVERWRITE into table ${APP}.ods_base_trademark partition(dt='$do_date'); "
ods_activity_info="
load data inpath '/origin_data/$APP/db/activity_info/$do_date' OVERWRITE into table ${APP}.ods_activity_info partition(dt='$do_date'); "
ods_cart_info="
load data inpath '/origin_data/$APP/db/cart_info/$do_date' OVERWRITE into table ${APP}.ods_cart_info partition(dt='$do_date'); "
ods_comment_info="
load data inpath '/origin_data/$APP/db/comment_info/$do_date' OVERWRITE into table ${APP}.ods_comment_info partition(dt='$do_date'); "
ods_coupon_info="
load data inpath '/origin_data/$APP/db/coupon_info/$do_date' OVERWRITE into table ${APP}.ods_coupon_info partition(dt='$do_date'); "
ods_coupon_use="
load data inpath '/origin_data/$APP/db/coupon_use/$do_date' OVERWRITE into table ${APP}.ods_coupon_use partition(dt='$do_date'); "
ods_favor_info="
load data inpath '/origin_data/$APP/db/favor_info/$do_date' OVERWRITE into table ${APP}.ods_favor_info partition(dt='$do_date'); "
ods_order_refund_info="
load data inpath '/origin_data/$APP/db/order_refund_info/$do_date' OVERWRITE into table ${APP}.ods_order_refund_info partition(dt='$do_date'); "
ods_order_status_log="
load data inpath '/origin_data/$APP/db/order_status_log/$do_date' OVERWRITE into table ${APP}.ods_order_status_log partition(dt='$do_date'); "
ods_spu_info="
load data inpath '/origin_data/$APP/db/spu_info/$do_date' OVERWRITE into table ${APP}.ods_spu_info partition(dt='$do_date'); "
ods_activity_rule="
load data inpath '/origin_data/$APP/db/activity_rule/$do_date' OVERWRITE into table ${APP}.ods_activity_rule partition(dt='$do_date');"
ods_base_dic="
load data inpath '/origin_data/$APP/db/base_dic/$do_date' OVERWRITE into table ${APP}.ods_base_dic partition(dt='$do_date'); "
ods_order_detail_activity="
load data inpath '/origin_data/$APP/db/order_detail_activity/$do_date' OVERWRITE into table ${APP}.ods_order_detail_activity partition(dt='$do_date'); "
ods_order_detail_coupon="
load data inpath '/origin_data/$APP/db/order_detail_coupon/$do_date' OVERWRITE into table ${APP}.ods_order_detail_coupon partition(dt='$do_date'); "
ods_refund_payment="
load data inpath '/origin_data/$APP/db/refund_payment/$do_date' OVERWRITE into table ${APP}.ods_refund_payment partition(dt='$do_date'); "
ods_sku_attr_value="
load data inpath '/origin_data/$APP/db/sku_attr_value/$do_date' OVERWRITE into table ${APP}.ods_sku_attr_value partition(dt='$do_date'); "
ods_sku_sale_attr_value="
load data inpath '/origin_data/$APP/db/sku_sale_attr_value/$do_date' OVERWRITE into table ${APP}.ods_sku_sale_attr_value partition(dt='$do_date'); "
ods_base_province="
load data inpath '/origin_data/$APP/db/base_province/$do_date' OVERWRITE into table ${APP}.ods_base_province;"
ods_base_region="
load data inpath '/origin_data/$APP/db/base_region/$do_date' OVERWRITE into table ${APP}.ods_base_region;"
case $1 in
"ods_order_info"){
hive -e "$ods_order_info"
};;
"ods_order_detail"){
hive -e "$ods_order_detail"
};;
"ods_sku_info"){
hive -e "$ods_sku_info"
};;
"ods_user_info"){
hive -e "$ods_user_info"
};;
"ods_payment_info"){
hive -e "$ods_payment_info"
};;
"ods_base_category1"){
hive -e "$ods_base_category1"
};;
"ods_base_category2"){
hive -e "$ods_base_category2"
};;
"ods_base_category3"){
hive -e "$ods_base_category3"
};;
"ods_base_trademark"){
hive -e "$ods_base_trademark"
};;
"ods_activity_info"){
hive -e "$ods_activity_info"
};;
"ods_cart_info"){
hive -e "$ods_cart_info"
};;
"ods_comment_info"){
hive -e "$ods_comment_info"
};;
"ods_coupon_info"){
hive -e "$ods_coupon_info"
};;
"ods_coupon_use"){
hive -e "$ods_coupon_use"
};;
"ods_favor_info"){
hive -e "$ods_favor_info"
};;
"ods_order_refund_info"){
hive -e "$ods_order_refund_info"
};;
"ods_order_status_log"){
hive -e "$ods_order_status_log"
};;
"ods_spu_info"){
hive -e "$ods_spu_info"
};;
"ods_activity_rule"){
hive -e "$ods_activity_rule"
};;
"ods_base_dic"){
hive -e "$ods_base_dic"
};;
"ods_order_detail_activity"){
hive -e "$ods_order_detail_activity"
};;
"ods_order_detail_coupon"){
hive -e "$ods_order_detail_coupon"
};;
"ods_refund_payment"){
hive -e "$ods_refund_payment"
};;
"ods_sku_attr_value"){
hive -e "$ods_sku_attr_value"
};;
"ods_sku_sale_attr_value"){
hive -e "$ods_sku_sale_attr_value"
};;
"ods_base_province"){
hive -e "$ods_base_province"
};;
"ods_base_region"){
hive -e "$ods_base_region"
};;
"all"){
hive -e "$ods_order_info$ods_order_detail$ods_sku_info$ods_user_info$ods_payment_info$ods_base_category1$ods_base_category2$ods_base_category3$ods_base_trademark$ods_activity_info$ods_cart_info$ods_comment_info$ods_coupon_info$ods_coupon_use$ods_favor_info$ods_order_refund_info$ods_order_status_log$ods_spu_info$ods_activity_rule$ods_base_dic$ods_order_detail_activity$ods_order_detail_coupon$ods_refund_payment$ods_sku_attr_value$ods_sku_sale_attr_value$ods_base_province$ods_base_region"
};;
esac
将业务数据从hdfs转入ods层(每日同步脚本)
#!/bin/bash
APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d "-1 day" +%F`
fi
ods_order_info="
load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table ${APP}.ods_order_info partition(dt='$do_date');"
ods_order_detail="
load data inpath '/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table ${APP}.ods_order_detail partition(dt='$do_date');"
ods_sku_info="
load data inpath '/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table ${APP}.ods_sku_info partition(dt='$do_date');"
ods_user_info="
load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table ${APP}.ods_user_info partition(dt='$do_date');"
ods_payment_info="
load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table ${APP}.ods_payment_info partition(dt='$do_date');"
ods_base_category1="
load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table ${APP}.ods_base_category1 partition(dt='$do_date');"
ods_base_category2="
load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table ${APP}.ods_base_category2 partition(dt='$do_date');"
ods_base_category3="
load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table ${APP}.ods_base_category3 partition(dt='$do_date'); "
ods_base_trademark="
load data inpath '/origin_data/$APP/db/base_trademark/$do_date' OVERWRITE into table ${APP}.ods_base_trademark partition(dt='$do_date'); "
ods_activity_info="
load data inpath '/origin_data/$APP/db/activity_info/$do_date' OVERWRITE into table ${APP}.ods_activity_info partition(dt='$do_date'); "
ods_cart_info="
load data inpath '/origin_data/$APP/db/cart_info/$do_date' OVERWRITE into table ${APP}.ods_cart_info partition(dt='$do_date'); "
ods_comment_info="
load data inpath '/origin_data/$APP/db/comment_info/$do_date' OVERWRITE into table ${APP}.ods_comment_info partition(dt='$do_date'); "
ods_coupon_info="
load data inpath '/origin_data/$APP/db/coupon_info/$do_date' OVERWRITE into table ${APP}.ods_coupon_info partition(dt='$do_date'); "
ods_coupon_use="
load data inpath '/origin_data/$APP/db/coupon_use/$do_date' OVERWRITE into table ${APP}.ods_coupon_use partition(dt='$do_date'); "
ods_favor_info="
load data inpath '/origin_data/$APP/db/favor_info/$do_date' OVERWRITE into table ${APP}.ods_favor_info partition(dt='$do_date'); "
ods_order_refund_info="
load data inpath '/origin_data/$APP/db/order_refund_info/$do_date' OVERWRITE into table ${APP}.ods_order_refund_info partition(dt='$do_date'); "
ods_order_status_log="
load data inpath '/origin_data/$APP/db/order_status_log/$do_date' OVERWRITE into table ${APP}.ods_order_status_log partition(dt='$do_date'); "
ods_spu_info="
load data inpath '/origin_data/$APP/db/spu_info/$do_date' OVERWRITE into table ${APP}.ods_spu_info partition(dt='$do_date'); "
ods_activity_rule="
load data inpath '/origin_data/$APP/db/activity_rule/$do_date' OVERWRITE into table ${APP}.ods_activity_rule partition(dt='$do_date');"
ods_base_dic="
load data inpath '/origin_data/$APP/db/base_dic/$do_date' OVERWRITE into table ${APP}.ods_base_dic partition(dt='$do_date'); "
ods_order_detail_activity="
load data inpath '/origin_data/$APP/db/order_detail_activity/$do_date' OVERWRITE into table ${APP}.ods_order_detail_activity partition(dt='$do_date'); "
ods_order_detail_coupon="
load data inpath '/origin_data/$APP/db/order_detail_coupon/$do_date' OVERWRITE into table ${APP}.ods_order_detail_coupon partition(dt='$do_date'); "
ods_refund_payment="
load data inpath '/origin_data/$APP/db/refund_payment/$do_date' OVERWRITE into table ${APP}.ods_refund_payment partition(dt='$do_date'); "
ods_sku_attr_value="
load data inpath '/origin_data/$APP/db/sku_attr_value/$do_date' OVERWRITE into table ${APP}.ods_sku_attr_value partition(dt='$do_date'); "
ods_sku_sale_attr_value="
load data inpath '/origin_data/$APP/db/sku_sale_attr_value/$do_date' OVERWRITE into table ${APP}.ods_sku_sale_attr_value partition(dt='$do_date'); "
ods_base_province="
load data inpath '/origin_data/$APP/db/base_province/$do_date' OVERWRITE into table ${APP}.ods_base_province;"
ods_base_region="
load data inpath '/origin_data/$APP/db/base_region/$do_date' OVERWRITE into table ${APP}.ods_base_region;"
case $1 in
"ods_order_info"){
hive -e "$ods_order_info"
};;
"ods_order_detail"){
hive -e "$ods_order_detail"
};;
"ods_sku_info"){
hive -e "$ods_sku_info"
};;
"ods_user_info"){
hive -e "$ods_user_info"
};;
"ods_payment_info"){
hive -e "$ods_payment_info"
};;
"ods_base_category1"){
hive -e "$ods_base_category1"
};;
"ods_base_category2"){
hive -e "$ods_base_category2"
};;
"ods_base_category3"){
hive -e "$ods_base_category3"
};;
"ods_base_trademark"){
hive -e "$ods_base_trademark"
};;
"ods_activity_info"){
hive -e "$ods_activity_info"
};;
"ods_cart_info"){
hive -e "$ods_cart_info"
};;
"ods_comment_info"){
hive -e "$ods_comment_info"
};;
"ods_coupon_info"){
hive -e "$ods_coupon_info"
};;
"ods_coupon_use"){
hive -e "$ods_coupon_use"
};;
"ods_favor_info"){
hive -e "$ods_favor_info"
};;
"ods_order_refund_info"){
hive -e "$ods_order_refund_info"
};;
"ods_order_status_log"){
hive -e "$ods_order_status_log"
};;
"ods_spu_info"){
hive -e "$ods_spu_info"
};;
"ods_activity_rule"){
hive -e "$ods_activity_rule"
};;
"ods_base_dic"){
hive -e "$ods_base_dic"
};;
"ods_order_detail_activity"){
hive -e "$ods_order_detail_activity"
};;
"ods_order_detail_coupon"){
hive -e "$ods_order_detail_coupon"
};;
"ods_refund_payment"){
hive -e "$ods_refund_payment"
};;
"ods_sku_attr_value"){
hive -e "$ods_sku_attr_value"
};;
"ods_sku_sale_attr_value"){
hive -e "$ods_sku_sale_attr_value"
};;
"all"){
hive -e "$ods_order_info$ods_order_detail$ods_sku_info$ods_user_info$ods_payment_info$ods_base_category1$ods_base_category2$ods_base_category3$ods_base_trademark$ods_activity_info$ods_cart_info$ods_comment_info$ods_coupon_info$ods_coupon_use$ods_favor_info$ods_order_refund_info$ods_order_status_log$ods_spu_info$ods_activity_rule$ods_base_dic$ods_order_detail_activity$ods_order_detail_coupon$ods_refund_payment$ods_sku_attr_value$ods_sku_sale_attr_value"
};;
esac
将用户行为数据从hdfs转入ods层
#!/bin/bash
# 定义变量方便修改
APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
echo ================== 日志日期为 $do_date ==================
sql="
load data inpath '/origin_data/$APP/log/topic_log/$do_date' into table ${APP}.ods_log partition(dt='$do_date');
"
hive -e "$sql"
hadoop jar /opt/module/EBDW/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /warehouse/$APP/ods/ods_log/dt=$do_date
将dos层数据传入dim层
ods_to_dim_db_init(首日同步脚本)
#!/bin/bash
APP=gmall
if [ -n "$2" ] ;then
do_date=$2
else
echo "请传入日期参数"
exit
fi
dim_user_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_user_info partition(dt='9999-99-99')
select
id,
login_name,
nick_name,
md5(name),
md5(phone_num),
md5(email),
user_level,
birthday,
gender,
create_time,
operate_time,
'$do_date',
'9999-99-99'
from ${APP}.ods_user_info
where dt='$do_date';
"
dim_sku_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
with
sku as
(
select
id,
price,
sku_name,
sku_desc,
weight,
is_sale,
spu_id,
category3_id,
tm_id,
create_time
from ${APP}.ods_sku_info
where dt='$do_date'
),
spu as
(
select
id,
spu_name
from ${APP}.ods_spu_info
where dt='$do_date'
),
c3 as
(
select
id,
name,
category2_id
from ${APP}.ods_base_category3
where dt='$do_date'
),
c2 as
(
select
id,
name,
category1_id
from ${APP}.ods_base_category2
where dt='$do_date'
),
c1 as
(
select
id,
name
from ${APP}.ods_base_category1
where dt='$do_date'
),
tm as
(
select
id,
tm_name
from ${APP}.ods_base_trademark
where dt='$do_date'
),
attr as
(
select
sku_id,
collect_set(named_struct('attr_id',attr_id,'value_id',value_id,'attr_name',attr_name,'value_name',value_name)) attrs
from ${APP}.ods_sku_attr_value
where dt='$do_date'
group by sku_id
),
sale_attr as
(
select
sku_id,
collect_set(named_struct('sale_attr_id',sale_attr_id,'sale_attr_value_id',sale_attr_value_id,'sale_attr_name',sale_attr_name,'sale_attr_value_name',sale_attr_value_name)) sale_attrs
from ${APP}.ods_sku_sale_attr_value
where dt='$do_date'
group by sku_id
)
insert overwrite table ${APP}.dim_sku_info partition(dt='$do_date')
select
sku.id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.is_sale,
sku.spu_id,
spu.spu_name,
sku.category3_id,
c3.name,
c3.category2_id,
c2.name,
c2.category1_id,
c1.name,
sku.tm_id,
tm.tm_name,
attr.attrs,
sale_attr.sale_attrs,
sku.create_time
from sku
left join spu on sku.spu_id=spu.id
left join c3 on sku.category3_id=c3.id
left join c2 on c3.category2_id=c2.id
left join c1 on c2.category1_id=c1.id
left join tm on sku.tm_id=tm.id
left join attr on sku.id=attr.sku_id
left join sale_attr on sku.id=sale_attr.sku_id;
"
dim_base_province="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_base_province
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.iso_3166_2,
bp.region_id,
br.region_name
from ${APP}.ods_base_province bp
join ${APP}.ods_base_region br on bp.region_id = br.id;
"
dim_coupon_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_coupon_info partition(dt='$do_date')
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
limit_num,
taken_count,
start_time,
end_time,
operate_time,
expire_time
from ${APP}.ods_coupon_info
where dt='$do_date';
"
dim_activity_rule_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_activity_rule_info partition(dt='$do_date')
select
ar.id,
ar.activity_id,
ai.activity_name,
ar.activity_type,
ai.start_time,
ai.end_time,
ai.create_time,
ar.condition_amount,
ar.condition_num,
ar.benefit_amount,
ar.benefit_discount,
ar.benefit_level
from
(
select
id,
activity_id,
activity_type,
condition_amount,
condition_num,
benefit_amount,
benefit_discount,
benefit_level
from ${APP}.ods_activity_rule
where dt='$do_date'
)ar
left join
(
select
id,
activity_name,
start_time,
end_time,
create_time
from ${APP}.ods_activity_info
where dt='$do_date'
)ai
on ar.activity_id=ai.id;
"
case $1 in
"dim_user_info"){
hive -e "$dim_user_info"
};;
"dim_sku_info"){
hive -e "$dim_sku_info"
};;
"dim_base_province"){
hive -e "$dim_base_province"
};;
"dim_coupon_info"){
hive -e "$dim_coupon_info"
};;
"dim_activity_rule_info"){
hive -e "$dim_activity_rule_info"
};;
"all"){
hive -e "$dim_user_info$dim_sku_info$dim_coupon_info$dim_activity_rule_info$dim_base_province"
};;
esac
ods_to_dim_db(每日同步脚本)
#!/bin/bash
APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d "-1 day" +%F`
fi
dim_user_info="
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
with
tmp as
(
select
old.id old_id,
old.login_name old_login_name,
old.nick_name old_nick_name,
old.name old_name,
old.phone_num old_phone_num,
old.email old_email,
old.user_level old_user_level,
old.birthday old_birthday,
old.gender old_gender,
old.create_time old_create_time,
old.operate_time old_operate_time,
old.start_date old_start_date,
old.end_date old_end_date,
new.id new_id,
new.login_name new_login_name,
new.nick_name new_nick_name,
new.name new_name,
new.phone_num new_phone_num,
new.email new_email,
new.user_level new_user_level,
new.birthday new_birthday,
new.gender new_gender,
new.create_time new_create_time,
new.operate_time new_operate_time,
new.start_date new_start_date,
new.end_date new_end_date
from
(
select
id,
login_name,
nick_name,
name,
phone_num,
email,
user_level,
birthday,
gender,
create_time,
operate_time,
start_date,
end_date
from ${APP}.dim_user_info
where dt='9999-99-99'
and start_date<'$do_date'
)old
full outer join
(
select
id,
login_name,
nick_name,
md5(name) name,
md5(phone_num) phone_num,
md5(email) email,
user_level,
birthday,
gender,
create_time,
operate_time,
'$do_date' start_date,
'9999-99-99' end_date
from ${APP}.ods_user_info
where dt='$do_date'
)new
on old.id=new.id
)
insert overwrite table ${APP}.dim_user_info partition(dt)
select
nvl(new_id,old_id),
nvl(new_login_name,old_login_name),
nvl(new_nick_name,old_nick_name),
nvl(new_name,old_name),
nvl(new_phone_num,old_phone_num),
nvl(new_email,old_email),
nvl(new_user_level,old_user_level),
nvl(new_birthday,old_birthday),
nvl(new_gender,old_gender),
nvl(new_create_time,old_create_time),
nvl(new_operate_time,old_operate_time),
nvl(new_start_date,old_start_date),
nvl(new_end_date,old_end_date),
nvl(new_end_date,old_end_date) dt
from tmp
union all
select
old_id,
old_login_name,
old_nick_name,
old_name,
old_phone_num,
old_email,
old_user_level,
old_birthday,
old_gender,
old_create_time,
old_operate_time,
old_start_date,
cast(date_add('$do_date',-1) as string),
cast(date_add('$do_date',-1) as string) dt
from tmp
where new_id is not null and old_id is not null;
"
dim_sku_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
with
sku as
(
select
id,
price,
sku_name,
sku_desc,
weight,
is_sale,
spu_id,
category3_id,
tm_id,
create_time
from ${APP}.ods_sku_info
where dt='$do_date'
),
spu as
(
select
id,
spu_name
from ${APP}.ods_spu_info
where dt='$do_date'
),
c3 as
(
select
id,
name,
category2_id
from ${APP}.ods_base_category3
where dt='$do_date'
),
c2 as
(
select
id,
name,
category1_id
from ${APP}.ods_base_category2
where dt='$do_date'
),
c1 as
(
select
id,
name
from ${APP}.ods_base_category1
where dt='$do_date'
),
tm as
(
select
id,
tm_name
from ${APP}.ods_base_trademark
where dt='$do_date'
),
attr as
(
select
sku_id,
collect_set(named_struct('attr_id',attr_id,'value_id',value_id,'attr_name',attr_name,'value_name',value_name)) attrs
from ${APP}.ods_sku_attr_value
where dt='$do_date'
group by sku_id
),
sale_attr as
(
select
sku_id,
collect_set(named_struct('sale_attr_id',sale_attr_id,'sale_attr_value_id',sale_attr_value_id,'sale_attr_name',sale_attr_name,'sale_attr_value_name',sale_attr_value_name)) sale_attrs
from ${APP}.ods_sku_sale_attr_value
where dt='$do_date'
group by sku_id
)
insert overwrite table ${APP}.dim_sku_info partition(dt='$do_date')
select
sku.id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.is_sale,
sku.spu_id,
spu.spu_name,
sku.category3_id,
c3.name,
c3.category2_id,
c2.name,
c2.category1_id,
c1.name,
sku.tm_id,
tm.tm_name,
attr.attrs,
sale_attr.sale_attrs,
sku.create_time
from sku
left join spu on sku.spu_id=spu.id
left join c3 on sku.category3_id=c3.id
left join c2 on c3.category2_id=c2.id
left join c1 on c2.category1_id=c1.id
left join tm on sku.tm_id=tm.id
left join attr on sku.id=attr.sku_id
left join sale_attr on sku.id=sale_attr.sku_id;
"
dim_base_province="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_base_province
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.iso_3166_2,
bp.region_id,
bp.name
from ${APP}.ods_base_province bp
join ${APP}.ods_base_region br on bp.region_id = br.id;
"
dim_coupon_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_coupon_info partition(dt='$do_date')
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
limit_num,
taken_count,
start_time,
end_time,
operate_time,
expire_time
from ${APP}.ods_coupon_info
where dt='$do_date';
"
dim_activity_rule_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_activity_rule_info partition(dt='$do_date')
select
ar.id,
ar.activity_id,
ai.activity_name,
ar.activity_type,
ai.start_time,
ai.end_time,
ai.create_time,
ar.condition_amount,
ar.condition_num,
ar.benefit_amount,
ar.benefit_discount,
ar.benefit_level
from
(
select
id,
activity_id,
activity_type,
condition_amount,
condition_num,
benefit_amount,
benefit_discount,
benefit_level
from ${APP}.ods_activity_rule
where dt='$do_date'
)ar
left join
(
select
id,
activity_name,
start_time,
end_time,
create_time
from ${APP}.ods_activity_info
where dt='$do_date'
)ai
on ar.activity_id=ai.id;
"
case $1 in
"dim_user_info"){
hive -e "$dim_user_info"
};;
"dim_sku_info"){
hive -e "$dim_sku_info"
};;
"dim_base_province"){
hive -e "$dim_base_province"
};;
"dim_coupon_info"){
hive -e "$dim_coupon_info"
};;
"dim_activity_rule_info"){
hive -e "$dim_activity_rule_info"
};;
"all"){
hive -e "$dim_user_info$dim_sku_info$dim_coupon_info$dim_activity_rule_info"
};;
esac
将ods层数据同步到dwd层
ods_to_dwd_log
#!/bin/bash
APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d "-1 day" +%F`
fi
dwd_start_log="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_start_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.ts')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.start') is not null;"
dwd_page_log="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_page_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.source_type'),
get_json_object(line,'$.ts')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.page') is not null;"
dwd_action_log="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_action_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.source_type'),
get_json_object(action,'$.action_id'),
get_json_object(action,'$.item'),
get_json_object(action,'$.item_type'),
get_json_object(action,'$.ts')
from ${APP}.ods_log lateral view ${APP}.explode_json_array(get_json_object(line,'$.actions')) tmp as action
where dt='$do_date'
and get_json_object(line,'$.actions') is not null;"
dwd_display_log="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_display_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.source_type'),
get_json_object(line,'$.ts'),
get_json_object(display,'$.display_type'),
get_json_object(display,'$.item'),
get_json_object(display,'$.item_type'),
get_json_object(display,'$.order'),
get_json_object(display,'$.pos_id')
from ${APP}.ods_log lateral view ${APP}.explode_json_array(get_json_object(line,'$.displays')) tmp as display
where dt='$do_date'
and get_json_object(line,'$.displays') is not null;"
dwd_error_log="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_error_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.source_type'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.actions'),
get_json_object(line,'$.displays'),
get_json_object(line,'$.ts'),
get_json_object(line,'$.err.error_code'),
get_json_object(line,'$.err.msg')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.err') is not null;"
case $1 in
dwd_start_log )
hive -e "$dwd_start_log"
;;
dwd_page_log )
hive -e "$dwd_page_log"
;;
dwd_action_log )
hive -e "$dwd_action_log"
;;
dwd_display_log )
hive -e "$dwd_display_log"
;;
dwd_error_log )
hive -e "$dwd_error_log"
;;
all )
hive -e "$dwd_start_log$dwd_page_log$dwd_action_log$dwd_display_log$dwd_error_log"
;;
esac
azkaban启动脚本
case $1 in
"start")
echo "===============开始启动azkaban"
for i in hadoop101 hadoop102 hadoop103
do
echo "============$i的azkaban-exec启动中==========="
ssh $i 'cd /opt/module/EBDW/azkaban/azkaban-exec && bin/start-exec.sh'
ssh $i "cd /opt/module/EBDW/azkaban/azkaban-exec && curl -G $i:12321/executor?action=activate"
done
;;
"stop")
echo "===============开始启动azkaban"
for i in hadoop101 hadoop102 hadoop103
do
echo "============$i的azkaban-exec启动中==========="
ssh $i "cd /opt/module/EBDW/azkaban/azkaban-exec && bin/shutdown-exec.sh"
done
;;
esac
但是暂时不知道为什么,在执行激活命令的时候出现了端口请求拒绝的反馈.
只能分别手动执行激活命令
# curl 就是符合URL规则的文件传输工具
curl -G hadoop101:12321/executor?action=activate && echo
curl -G hadoop102:12321/executor?action=activate && echo
curl -G hadoop103:12321/executor?action=activate && echo
azkaban-web端口: 8081
url: hadoop101:8081
user: tzk
password: tzk
superset
Superset-web端口: 8787
# 1.进入conda设置的superset环境
conda activate superset
# 退出环境代码为:
conda deactivate
# 启动Superset服务
gunicorn --workers 5 --timeout 120 --bind hadoop101:8787 "superset.app:create_app()" --daemon
# 关闭Superset
ps -ef | awk '/gunicorn/ && !/awk/{print $2}' | xargs kill -9
# 也可以像下面这样写
ps -ef|grep superset|grep -v grep|awk '{print$2}'|xargs kill -9
user: Superset init
password: 764991
Superset.sh
#!/bin/bash
superset_status(){
result=`ps -ef | awk '/gunicorn/ && !/awk/{print $2}' | wc -l`
if [[ $result -eq 0 ]]; then
return 0
else
return 1
fi
}
superset_start(){
source ~/.bashrc
superset_status >/dev/null 2>&1
if [[ $? -eq 0 ]]; then
conda activate superset ; gunicorn --workers 5 --timeout 120 --bind hadoop101:8787 --daemon 'superset.app:create_app()'
else
echo "superset正在运行"
fi
}
superset_stop(){
superset_status >/dev/null 2>&1
if [[ $? -eq 0 ]]; then
echo "superset未在运行"
else
ps -ef | awk '/gunicorn/ && !/awk/{print $2}' | xargs kill -9
fi
}
case $1 in
start )
echo "启动Superset"
superset_start
;;
stop )
echo "停止Superset"
superset_stop
;;
restart )
echo "重启Superset"
superset_stop
superset_start
;;
status )
superset_status >/dev/null 2>&1
if [[ $? -eq 0 ]]; then
echo "superset未在运行"
else
echo "superset正在运行"
fi
esac
hiveserver2
hive --service hiveserver2>/dev/null 2>&1 &
日志数据生成脚本
#!/bin/bash
for i in hadoop101 hadoop102; do
echo "========== $i =========="
ssh $i "cd /opt/module/EBDW/applog/; java -jar gmall2020-mock-log-2021-01-22.jar >/dev/null 2>&1 &"
done