玖叶教程网

前端编程开发入门

使用 Python 增强 SQL 操作的 5 种方法


Python 和 SQL:携手并进,走得更远

尽管有所有关于查询性能优化的内容,但我发现,有时,增强 SQL 功能的最佳方法是将其与 Python 等脚本语言结合起来。

毫无疑问,虽然 SQL 是提取、操作和写入数据库的强大方法,但它缺乏脚本语言的灵活性和实用性,这使得某些操作(例如循环)几乎不可能。

此外,底层数据库的限制可能会降低性能或阻止执行消耗资源的查询。 例如,我曾经遇到过持续存在的过度元读取错误,我将在下面详细说明。

像 Python 这样的脚本语言提供了一种解决方法,它不仅仅可以取代您的 SQL 工作。 结合使用 Python 和 SQL 可以生成更强大、更高效、更清晰的脚本。

循环访问多个 SQL 表

Python 帮助增强 SQL 的最明显方法之一是将查询字符串合并到 Python 的循环结构中,以连续迭代多个查询。

使用 Python 中定义的变量,您可以创建基本查询并使用 SQL 文本和 Python 变量执行操作。

例如,假设我们正在尝试按大小获取 GCP 项目中包含的所有数据集和表格的列表。

在纯 SQL 中,您必须写:

SELECT * FROM `my_project.dataset_1.INFORMATION_SCHEMA`
UNION ALL 
SELECT * FROM `my_project.dataset_2.INFORMATION_SCHEMA`
UNION ALL 
SELECT * FROM `my_project.dataset_3.INFORMATION_SCHEMA`

现在,通过集成Python,我们可以避免一遍又一遍地手动编写这个查询。

from google.cloud import bigquerydatasets = ['dataset_1', 'dataset_2', 'dataset_3']bq_client = bigquery.Client()for dataset in datasets:
    get_datasets = bq_client.query("SELECT dataset_id, table_id,         size_bytes, ROUND(size_bytes / 10000000000), 2) AS gb_size 
FROM `"+dataset.dataset_id+"`.__TABLES__ GROUP BY 1, 2, 3")    tables = get_datasets.result()
    for table in tables:
        dataset_id = table.dataset
        table_id = table.table_id
        size = table.size_bytes
        gb_size = table.gb_size        print(dataset_id, table_id, size, gb_size)

尽管使用两个循环,这可能看起来有些复杂,但我们所做的只是循环访问数据集列表。

我们所更改的只是我们引用的数据集,本质上是创建与 UNION 查询相同的查询,但手动编写较少。

写下来,该操作将如下所示:

""" SELECT dataset_id, table_id, size_bytes, ROUND(size_bytes / 10000000000), 2) AS gb_size 
FROM `my_project.dataset_1.`__TABLES__ GROUP BY 1, 2, 3 """ """ SELECT dataset_id, table_id, size_bytes, ROUND(size_bytes / 10000000000), 2) AS gb_size 
FROM `my_project.dataset_2.`__TABLES__ GROUP BY 1, 2, 3 """""" SELECT dataset_id, table_id, size_bytes, ROUND(size_bytes / 10000000000), 2) AS gb_size 
FROM `my_project.dataset_3.`__TABLES__ GROUP BY 1, 2, 3 """

如果您是处理此实际用例的 BigQuery 用户,您还可以使用 bq_client.list_datasets() 函数生成数据集列表。

需要注意的是,可以在 SQL 中进行循环,但通常必须采取额外的步骤(例如定义变量和创建 UDF)来完成所需的操作。

自动化架构定义

如果你不能通过前面的例子看出,我讨厌多余的操作。 只要有可能,我会尝试使脚本更加高效,并避免多次编写某些内容。

BigQuery Python 客户端允许开发人员将架构定义为列表,稍后可以将其传递给加载函数。

我更喜欢在脚本中定义 BigQuery 架构,而不是默认自动检测,因为 GCP 默认为给定字段提供错误的数据类型,这让我很恼火。

如果您手动定义 BigQuery 架构,它可能如下所示:

schema = [
     bigquery.SchemaField("first_name", "STRING"),
     bigquery.SchemaField("last_name", "STRING"),
     bigquery.SchemaField("age", "INTEGER")]

对于一些领域来说这是可以的。 但当您处理需要 100 列或更多列的数据时,这会变得乏味。

我提出的解决方案是一个相对简单的 Python 函数,它以与我之前描述的循环类似的方式自动填充这些字段。

def create_schema(field_list: list, type_list: list):
    
    schema_list = []
    
    for fields, types in zip(field_list, type_list):
        schema = bigquery.SchemaField(fields, types)
        schema_list.append(schema)        return schema_list

该函数的输出将与上面定义的模式完全相同,但如果我不指定字段是否为 NULLABLE,它将默认为 NULLABLE。

请随意查看我之前的工作,了解该函数的更详细解释、如何调用它以及为什么此方法比手动定义更可取。

只需 1 行 Python 代码即可转换为数据框架

有一种优雅而简单的方法来创建从 SQL 查询派生的数据框架。

更好的是,它只需要一行 Python,特别是如果您将查询存储在外部配置文件中。

query = """ SELECT * FROM `my_project.dataset.table` """ query_job = bq_client.query(cfg.query).to_dataframe()

如果您想保存查询结果,您甚至可以在同一行中将其转换为 CSV。

query_job = bq_client.query(cfg.query).to_dataframe().to_csv('query_output.csv')

至少对我来说,这是一个比必须从 SQL 引擎的 UI 导出或下载报告更简化的过程。

解决 SQL 环境限制

我工作中的第一个“大”项目是自动审计并随后删除数据仓库中未使用的表。

正如您可以想象的那样,这个过程涉及大量元数据。

您可能不知道,BigQuery 对每个作业允许的元读取数量施加了限制。

我经常遇到警告和错误,告诉我无法运行查询,因为它尝试了太多元读取。

起初,我尝试将我的工作分成两个单独的 CTE,但由于它们在同一个查询中运行,我仍然会遇到相同的错误。

然后我的一位高级工程师建议我应该在 Python 中分块运行该东西并使用 Pandas 进行连接。

这种方法非常有效,我最终将整个脚本转换为 Pandas,仅将查询保留为原始数据源。

如果您运行的查询过于消耗资源,请考虑将其拆分为多个部分,在 Python 中运行并使用 Pandas 重新连接。

因为我引用的 CTE 长达数百行并涉及 50 多个元读取,所以我会要求您暂停您的怀疑,只考虑以下示例中操作的查询部分。

query_1 = """ SELECT * FROM a_resource_consuming_cte_1 """ query_2 = """ SELECT * FROM a_resource_consuming_cte_2 """ query_1_df = bq_client.query(query_1).to_dataframe()
query_2_df = bq_client.query(query_2).to_dataframe()final_df = pd.concat([query_1_df, query_2_df]

除了在 Pandas 中进行联接之外,请记住,您可以在 Pandas 中执行几乎所有 SQL 操作,包括更简化的重复数据删除过程。

追加/截断

我过去曾写过相关文章,但我对 BigQuery 的一个主要抱怨是它缺乏对 APPEND/TRUNCATE 操作的支持。

我的意思是我可以将记录添加到表中或覆盖它们。 目前,BigQuery 确实包含一个允许开发人员指定两者的参数。

因此,如果您只想覆盖特定时间范围内的 SQL 表,则需要发挥一点创意。

值得庆幸的是,结合 Python 和 SQL 将使我们能够进行该操作。

在开始编写代码之前,我们先讨论一下为什么需要每天覆盖行。

假设您有一个每天更新多次的电子表格,并且在每天结束时,您希望上传当天日期的条目结果。

由于工作表每天可以编辑多次,因此简单地附加数据会产生重复的行,从而导致数据混乱。

理想情况下,您希望在加载数据时消除任何重复项。

最简单的方法是将 CRUD 语句与 Python/Pandas 代码配对,该代码将创建我们想要覆盖的数据的子集。

crud_statement = """ DELETE FROM table WHERE date = CURRENT_DATE() """ bq_client.query(crud_statement)df = df[(df['date'] == date.today())]bq_client.load_table_from_dataframe(df, job_config)

反过来,这将使您的数据幂等并反映每次运行脚本时的实时更改。

回顾与要点

将 Python 等脚本语言与 SQL 相结合,可以为仅使用 SQL 无法完成的操作开辟新的可能性。

由于许多数据作业不仅需要 SQL 知识,还至少需要 Python 等脚本语言的中级知识,因此您必须了解并希望能够体会到 SQL 和 Python 相结合的强大功能。

SQL 和 Python 的一些用例包括:

循环动态变量

自动执行繁

琐的数据库任务,例如模式自动化

转换为数据框并导出结果

解决环境造成的资源限制

执行自定义加载操作,例如追加/截断

当您继续学习 SQL 时,我鼓励您考虑如何将 Python 的处理能力与 SQL 的精度和实用性结合起来的用例。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言