背景
在数据仓库中写 SQL
代码获取所需数据,是数据分析师日常工作的一部分。
在一些情况下,需要批量执行多段 SQL,每段 SQL 之间只是某个变量不同。
在传统关系型数据仓库系统中都有存储过程
功能,可以使用变量声明和循环控制来执行代码; Hadoop 大数据平台上的 Hive 数据仓库中,一般通过 shell 脚本来实现变量定义和循环控制。
通常来说,存储过程或shell脚本的权限只对偏工程的开发人员开放;分析师即使有权限,也不一定熟悉储过程或shell脚本。
数据分析师除了求助工程师外,要自行解决该问题,有如下三种方法:
- 重复多次复制粘贴代码,手动修改不同的部分:适合执行次数较少的情况。
- 用 Excel 下拉填充变量功能,外加字符串拼接的方式实现:变量部分通常是日期或数字等自增变量,通过 Excel 下拉填充生成;把变量前的文本、变量、变量后的文本,通过字符串拼接方式合并成完整的 SQL。
- 在编程环境如 Python 或 R 语言中,通过变量、循环控制来处理字符串,生成批量 SQL。
这里以R语言为例,实现第三种方法。
目标
有如下一段 SQL 代码,是对某个历史拉链表的某个状态做每日数量统计。
获取历史上某天某个字段当时的状态,在历史拉链表中需要限定开始日期小于等于某天,并且结束日期大于等于某天。
现在要补充历史上某段范围内的统计数据,需要改变 SQL 中“某天”这个日期值。
这里每天的统计值,需要每天一个独立 SQL,N 天就是 N 个 SQL,每个 SQL 中的日期不同。
需要说明的是,从历史拉链表的设计与该具体的业务需求来看,无法通过只用一个 SQL 实现该统计结果。
备注:select
之前的 insert
语句,是将查询结果写入到分区表中,是 Hive SQL 中的写法。
insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-14')
select appoint_stronger, count(case_id) as case_num
from edw.collection_case_strength_h
where to_date(start_time) <= '2018-06-14'
and to_date(end_time) >= '2018-06-14'
group by appoint_stronger
;
R语言实现
初始化 SQL 字符串
该段 SQL 代码,直接作为一个字符向量即可,只包含一个元素的向量,也就是一个长字符串,换行会自动转换为 \n
换行符合。
sql <- "insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-14')
select appoint_stronger, count(case_id) as case_num
from edw.collection_case_strength_h
where to_date(start_time) <= '2018-06-14'
and to_date(end_time) >= '2018-06-14'
group by appoint_stronger
;"
其中需要改变的字符变量是 2018-06-14
。
当字符串较长时,直接使用查找加替换的方式处理,会比将字符串拆分成多个部分再拼接的方法更简洁。
批量生成SQL语句
生成序列的 seq()
函数,也可以用来生成日期序列。
对应的参数 from
和 to
需要输入的是日期型数据。
作为演示,这里只补充该示例日期的前三天数据。
# 生成日期序列
seq_date <- seq(from = as.Date("2018-06-11"), to = as.Date("2018-06-13"), by = 1)
用序列的第一个日期,替换 SQL 中的初始化日期。
library(stringr) # 载入字符串处理函数包
# 替换值要求字符型故将日期类型转换
sql_res <- str_replace_all(sql, "2018-06-14", as.character(seq_date[1]))
# 结果显示到屏幕上:特殊符号如换行符保持还是特殊符号而不会真正换行
print(sql_res)
## [1] "insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-11')\nselect appoint_stronger, count(case_id) as case_num\nfrom edw.collection_case_strength_h\nwhere to_date(start_time) <= '2018-06-11'\nand to_date(end_time) >= '2018-06-11'\ngroup by appoint_stronger\n;"
# 输出到屏幕上
cat(sql_res)
## insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-11')
## select appoint_stronger, count(case_id) as case_num
## from edw.collection_case_strength_h
## where to_date(start_time) <= '2018-06-11'
## and to_date(end_time) >= '2018-06-11'
## group by appoint_stronger
## ;
for 循环
如果上面第一字符串替换后的SQL结果是正确的,就可以将其转为 for
循环,并将其打印到屏幕上。
library(stringr)
# 生成日期序列,并转为字符型
seq_date <- seq(from = as.Date("2018-06-11"), to = as.Date("2018-06-13"), by = 1) %>%
as.character()
# 循环执行
for (i in (seq_date)) {
sql %>%
str_replace_all( "2018-06-14", i) %>%
cat # 这里 cat 和 cat() 均可,能识别出来是一个函数
cat("\n\n") # 每段 SQL 后空换两行,便于分隔
}
## insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-11')
## select appoint_stronger, count(case_id) as case_num
## from edw.collection_case_strength_h
## where to_date(start_time) <= '2018-06-11'
## and to_date(end_time) >= '2018-06-11'
## group by appoint_stronger
## ;
##
## insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-12')
## select appoint_stronger, count(case_id) as case_num
## from edw.collection_case_strength_h
## where to_date(start_time) <= '2018-06-12'
## and to_date(end_time) >= '2018-06-12'
## group by appoint_stronger
## ;
##
## insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-13')
## select appoint_stronger, count(case_id) as case_num
## from edw.collection_case_strength_h
## where to_date(start_time) <= '2018-06-13'
## and to_date(end_time) >= '2018-06-13'
## group by appoint_stronger
## ;
如果要将结果输出到文件中,只需要指定文件名,并将使用追加而非覆盖的方式即可。
library(stringr)
# 生成日期序列,并转为字符型
seq_date <- seq(from = as.Date("2018-06-11"), to = as.Date("2018-06-13"), by = 1) %>%
as.character()
# 循环执行
for (i in (seq_date)) {
sql %>%
str_replace_all( "2018-06-14", i) %>%
cat(file = "output.txt", append = TRUE)
# 参数说明:file 指定文件,
# 参数说明:append 为 TRUE 是将内容追加到该文件的后面
cat("\n\n", file = "output.txt", append = TRUE)
}
向量化运算
for
循环是最先想到的方法,但是在 R 中使用向量化运算才是最方便的。
seq_date
有三个元素(长度为3), sql
和 "2018-06-14"
都是只有一个元素的向量(长度为1)。
在 str_replace_all()
函数作用下,sql
会循环补齐为具有相同内容的三个元素向量(重复三次);类似地,"2018-06-14"
也遵循循环补齐
的原则对应生成长度为 3 的重复向量; str_replace_all()
最终的结果就变成了三个符合目标的 SQL
语句。
在每个 SQL
语句之后补充两个换行符,再将其输出到控制台也就是屏幕上,或者输出到文件中。
# 日期序列
seq_date <- seq(from = as.Date("2018-06-11"), to = as.Date("2018-06-13"), by = 1) %>%
as.character()
# 向量化
str_replace_all(sql, "2018-06-14", seq_date) %>%
str_c("\n\n") %>%
cat()
## insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-11')
## select appoint_stronger, count(case_id) as case_num
## from edw.collection_case_strength_h
## where to_date(start_time) <= '2018-06-11'
## and to_date(end_time) >= '2018-06-11'
## group by appoint_stronger
## ;
##
## insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-12')
## select appoint_stronger, count(case_id) as case_num
## from edw.collection_case_strength_h
## where to_date(start_time) <= '2018-06-12'
## and to_date(end_time) >= '2018-06-12'
## group by appoint_stronger
## ;
##
## insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-13')
## select appoint_stronger, count(case_id) as case_num
## from edw.collection_case_strength_h
## where to_date(start_time) <= '2018-06-13'
## and to_date(end_time) >= '2018-06-13'
## group by appoint_stronger
## ;
补充案例
删除某段时间的分区
library(stringr)
sql <- "alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '$dt');"
seq_date <- seq.Date(from = as.Date("2018-05-24"), to = as.Date("2018-06-02"), by = 1) %>%
as.character()
for (i in seq_date) {
sql %>%
str_replace_all("\\$dt", i) %>% # 这里的 "$" 要通过 "\\" 来转义
str_c("\n") %>%
cat()
}
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-24');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-25');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-26');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-27');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-28');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-29');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-30');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-31');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-06-01');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-06-02');
总结
该实例虽只是用来处理 SQL
语句,但其本质是处理字符串,适用于其他重复生成批量文本的情况。