4 min read

R语言实例:生成批量SQL

背景

在数据仓库中写 SQL 代码获取所需数据,是数据分析师日常工作的一部分。

在一些情况下,需要批量执行多段 SQL,每段 SQL 之间只是某个变量不同。

在传统关系型数据仓库系统中都有存储过程功能,可以使用变量声明和循环控制来执行代码; Hadoop 大数据平台上的 Hive 数据仓库中,一般通过 shell 脚本来实现变量定义和循环控制。

通常来说,存储过程或shell脚本的权限只对偏工程的开发人员开放;分析师即使有权限,也不一定熟悉储过程或shell脚本。

数据分析师除了求助工程师外,要自行解决该问题,有如下三种方法:

  1. 重复多次复制粘贴代码,手动修改不同的部分:适合执行次数较少的情况。
  2. 用 Excel 下拉填充变量功能,外加字符串拼接的方式实现:变量部分通常是日期或数字等自增变量,通过 Excel 下拉填充生成;把变量前的文本、变量、变量后的文本,通过字符串拼接方式合并成完整的 SQL。
  3. 在编程环境如 Python 或 R 语言中,通过变量、循环控制来处理字符串,生成批量 SQL。

这里以R语言为例,实现第三种方法。

目标

有如下一段 SQL 代码,是对某个历史拉链表的某个状态做每日数量统计。

获取历史上某天某个字段当时的状态,在历史拉链表中需要限定开始日期小于等于某天,并且结束日期大于等于某天。

现在要补充历史上某段范围内的统计数据,需要改变 SQL 中“某天”这个日期值。

这里每天的统计值,需要每天一个独立 SQL,N 天就是 N 个 SQL,每个 SQL 中的日期不同。

需要说明的是,从历史拉链表的设计与该具体的业务需求来看,无法通过只用一个 SQL 实现该统计结果。

备注:select 之前的 insert 语句,是将查询结果写入到分区表中,是 Hive SQL 中的写法。

insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-14')
select appoint_stronger, count(case_id) as case_num
from edw.collection_case_strength_h
where to_date(start_time) <= '2018-06-14'
and to_date(end_time) >= '2018-06-14'
group by appoint_stronger
;

R语言实现

初始化 SQL 字符串

该段 SQL 代码,直接作为一个字符向量即可,只包含一个元素的向量,也就是一个长字符串,换行会自动转换为 \n 换行符合。

sql <- "insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-14')
select appoint_stronger, count(case_id) as case_num
from edw.collection_case_strength_h
where to_date(start_time) <= '2018-06-14'
and to_date(end_time) >= '2018-06-14'
group by appoint_stronger
;"

其中需要改变的字符变量是 2018-06-14

当字符串较长时,直接使用查找加替换的方式处理,会比将字符串拆分成多个部分再拼接的方法更简洁。

批量生成SQL语句

生成序列的 seq() 函数,也可以用来生成日期序列。

对应的参数 fromto 需要输入的是日期型数据。

作为演示,这里只补充该示例日期的前三天数据。

# 生成日期序列
seq_date <- seq(from = as.Date("2018-06-11"), to = as.Date("2018-06-13"), by = 1)

用序列的第一个日期,替换 SQL 中的初始化日期。

library(stringr) # 载入字符串处理函数包

# 替换值要求字符型故将日期类型转换
sql_res <- str_replace_all(sql, "2018-06-14", as.character(seq_date[1])) 

# 结果显示到屏幕上:特殊符号如换行符保持还是特殊符号而不会真正换行
print(sql_res) 
## [1] "insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-11')\nselect appoint_stronger, count(case_id) as case_num\nfrom edw.collection_case_strength_h\nwhere to_date(start_time) <= '2018-06-11'\nand to_date(end_time) >= '2018-06-11'\ngroup by appoint_stronger\n;"

# 输出到屏幕上
cat(sql_res) 
## insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-11')
## select appoint_stronger, count(case_id) as case_num
## from edw.collection_case_strength_h
## where to_date(start_time) <= '2018-06-11'
## and to_date(end_time) >= '2018-06-11'
## group by appoint_stronger
## ;

for 循环

如果上面第一字符串替换后的SQL结果是正确的,就可以将其转为 for 循环,并将其打印到屏幕上。

library(stringr)

# 生成日期序列,并转为字符型
seq_date <- seq(from = as.Date("2018-06-11"), to = as.Date("2018-06-13"), by = 1) %>%
  as.character()

# 循环执行
for (i in (seq_date)) {

  sql %>%
    str_replace_all( "2018-06-14", i) %>% 
    cat # 这里 cat 和 cat() 均可,能识别出来是一个函数
  
  cat("\n\n") # 每段 SQL 后空换两行,便于分隔
  
}
## insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-11')
## select appoint_stronger, count(case_id) as case_num
## from edw.collection_case_strength_h
## where to_date(start_time) <= '2018-06-11'
## and to_date(end_time) >= '2018-06-11'
## group by appoint_stronger
## ;
## 
## insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-12')
## select appoint_stronger, count(case_id) as case_num
## from edw.collection_case_strength_h
## where to_date(start_time) <= '2018-06-12'
## and to_date(end_time) >= '2018-06-12'
## group by appoint_stronger
## ;
## 
## insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-13')
## select appoint_stronger, count(case_id) as case_num
## from edw.collection_case_strength_h
## where to_date(start_time) <= '2018-06-13'
## and to_date(end_time) >= '2018-06-13'
## group by appoint_stronger
## ;

如果要将结果输出到文件中,只需要指定文件名,并将使用追加而非覆盖的方式即可。

library(stringr)

# 生成日期序列,并转为字符型
seq_date <- seq(from = as.Date("2018-06-11"), to = as.Date("2018-06-13"), by = 1) %>%
  as.character()

# 循环执行
for (i in (seq_date)) {

  sql %>%
    str_replace_all( "2018-06-14", i) %>% 
    cat(file = "output.txt", append = TRUE)
    # 参数说明:file 指定文件,
    # 参数说明:append 为 TRUE 是将内容追加到该文件的后面
  
  cat("\n\n", file = "output.txt", append = TRUE)
  
}

向量化运算

for 循环是最先想到的方法,但是在 R 中使用向量化运算才是最方便的。

seq_date 有三个元素(长度为3), sql"2018-06-14" 都是只有一个元素的向量(长度为1)。

str_replace_all() 函数作用下,sql循环补齐为具有相同内容的三个元素向量(重复三次);类似地,"2018-06-14" 也遵循循环补齐的原则对应生成长度为 3 的重复向量; str_replace_all()最终的结果就变成了三个符合目标的 SQL 语句。

在每个 SQL 语句之后补充两个换行符,再将其输出到控制台也就是屏幕上,或者输出到文件中。

# 日期序列
seq_date <- seq(from = as.Date("2018-06-11"), to = as.Date("2018-06-13"), by = 1) %>%
  as.character()

# 向量化
str_replace_all(sql, "2018-06-14", seq_date) %>% 
  str_c("\n\n") %>%
  cat()
## insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-11')
## select appoint_stronger, count(case_id) as case_num
## from edw.collection_case_strength_h
## where to_date(start_time) <= '2018-06-11'
## and to_date(end_time) >= '2018-06-11'
## group by appoint_stronger
## ;
## 
##  insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-12')
## select appoint_stronger, count(case_id) as case_num
## from edw.collection_case_strength_h
## where to_date(start_time) <= '2018-06-12'
## and to_date(end_time) >= '2018-06-12'
## group by appoint_stronger
## ;
## 
##  insert overwrite table rpt.rpt_collection_appoint_stronger_in_daily partition (dt = '2018-06-13')
## select appoint_stronger, count(case_id) as case_num
## from edw.collection_case_strength_h
## where to_date(start_time) <= '2018-06-13'
## and to_date(end_time) >= '2018-06-13'
## group by appoint_stronger
## ;

补充案例

删除某段时间的分区

library(stringr)

sql <- "alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '$dt');"

seq_date <- seq.Date(from = as.Date("2018-05-24"), to = as.Date("2018-06-02"), by = 1) %>%
  as.character()
  
for (i in seq_date) {
  
  sql %>%
    str_replace_all("\\$dt", i) %>%  # 这里的 "$" 要通过 "\\" 来转义
    str_c("\n") %>%
    cat()
}
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-24');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-25');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-26');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-27');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-28');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-29');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-30');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-05-31');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-06-01');
## alter table rpt.rpt_collection_ccms_oacase_list drop if exists partition (dt = '2018-06-02');

总结

该实例虽只是用来处理 SQL 语句,但其本质是处理字符串,适用于其他重复生成批量文本的情况。