Python中如何根据嵌套字典动态组装SQL语句?

StreamData = {
        "D1_RSET":{
            'sqltype':'int',
            'data':'61207'
        },
        "D2_DCK_time":{
            'sqltype': 'DATETIME',
            'data': '2018-9-9'
        },
        "D3_DCK_des":{
            'sqltype': 'varchar(10)',
            'data': 'TestABC'
        },
         "D4_Exp":{
            'sqltype': 'varchar(80)',
            'data': ''
        }
         "D5_stream_seq":{
            'sqltype': 'int',
            'data': '123'
        },
}

insertoTableRow = ‘’ insertoTableCol = ‘’

insertoTableSqlStr_Prefix = ‘INSERT INTO %s (’ % (tableName) insertoTableSqlStr = ‘’

for key1, value1 in StreamData.items():
	#字段字句组装,这一节写不出来,如何在第一层 for 判断出第二层的 data 字段有没有内容? 有内容才把第一层的 key 加入。。。
    insertoTableRow = insertoTableRow + key1 + ',' 
    #第二层 data 字段的组装???
    for key2, value2 in StreamData[key1].items():
        if key2 == ('data'):  #应该判断 data 的 value 有没有内容的。。。。
            insertoTableCol = insertoTableCol + value + ','

完成的状态如下:

insertoTableRow = “(” + insertoTableRowSection + “)” insertoTableCol = “(” + insertoTableColSection + ")’

#组装完成最后的 sql 语句 insertoTableSqlStr = ‘INSERT INTO %s (’ % (tableName) + insertoTableRow + ‘value’ + insertoTableCol

按照这个 StreamData 字典的例子,D4_Exp 是不用操作,因为它是空的。。。 完成组装的 SQL,写不出来。。。。冏。。。。

请教各位大大出手拉一把啊,谢谢。。。。。。


Python中如何根据嵌套字典动态组装SQL语句?

8 回复

……别写了,就算写出来,别人看着也费劲……为啥不把嵌套的 dict 先改成不嵌套的呢?


核心思路:递归遍历字典,动态生成SQL的SET或WHERE子句。

这里给你一个实用的函数,可以根据嵌套字典动态生成UPDATE语句的SET部分或SELECT语句的WHERE条件。关键是用递归处理嵌套结构,并用参数化查询防止SQL注入。

import sqlite3  # 示例用sqlite,其他数据库同理

def dict_to_sql_conditions(data, prefix='', connector='AND', for_update=False):
    """
    将嵌套字典转换为SQL条件片段和参数列表。
    
    Args:
        data: 嵌套字典,如 {'user': {'name': 'Alice', 'age': 30}, 'status': 'active'}
        prefix: 表别名或前缀,用于生成带点号的列名
        connector: 条件连接符 ('AND' 或 'OR')
        for_update: 是否为UPDATE语句生成SET子句
    
    Returns:
        (sql_fragment, params): SQL片段字符串和参数值列表
    """
    conditions = []
    params = []
    
    for key, value in data.items():
        full_key = f"{prefix}.{key}" if prefix else key
        
        if isinstance(value, dict):
            # 递归处理嵌套字典
            sub_sql, sub_params = dict_to_sql_conditions(
                value, 
                prefix=full_key if not for_update else '',  # UPDATE的SET不需要表前缀
                connector=connector,
                for_update=for_update
            )
            if sub_sql:
                conditions.append(sub_sql)
                params.extend(sub_params)
        else:
            if for_update:
                # UPDATE语句的SET子句: column = ?
                conditions.append(f"{full_key} = ?")
            else:
                # WHERE条件: column = ?
                conditions.append(f"{full_key} = ?")
            params.append(value)
    
    if for_update:
        # UPDATE的SET子句用逗号连接
        connector = ', '
    
    sql_fragment = f" {connector} ".join(conditions)
    return sql_fragment, params

# 使用示例1: 生成UPDATE语句
def generate_update_sql(table_name, data, where_conditions=None):
    """生成UPDATE语句"""
    set_clause, set_params = dict_to_sql_conditions(data, for_update=True)
    
    sql = f"UPDATE {table_name} SET {set_clause}"
    params = set_params
    
    if where_conditions:
        where_clause, where_params = dict_to_sql_conditions(where_conditions)
        sql += f" WHERE {where_clause}"
        params.extend(where_params)
    
    return sql, params

# 使用示例2: 生成SELECT的WHERE条件
def generate_select_sql(table_name, columns='*', where_conditions=None):
    """生成SELECT语句"""
    sql = f"SELECT {columns} FROM {table_name}"
    params = []
    
    if where_conditions:
        where_clause, params = dict_to_sql_conditions(where_conditions)
        sql += f" WHERE {where_clause}"
    
    return sql, params

# 实际使用例子
if __name__ == "__main__":
    # 示例数据
    user_data = {
        'profile': {
            'name': 'Alice',
            'age': 30
        },
        'account': {
            'email': 'alice@example.com',
            'status': 'active'
        }
    }
    
    # 1. 生成UPDATE语句
    update_sql, update_params = generate_update_sql(
        'users', 
        {'last_login': '2024-01-15', 'login_count': 5},
        where_conditions=user_data
    )
    print("UPDATE语句:")
    print(f"SQL: {update_sql}")
    print(f"参数: {update_params}")
    print()
    
    # 2. 生成SELECT语句
    select_sql, select_params = generate_select_sql(
        'users',
        columns='id, username, email',
        where_conditions=user_data
    )
    print("SELECT语句:")
    print(f"SQL: {select_sql}")
    print(f"参数: {select_params}")
    
    # 3. 实际执行示例
    conn = sqlite3.connect(':memory:')
    cursor = conn.cursor()
    
    # 创建测试表
    cursor.execute('''
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            username TEXT,
            email TEXT,
            status TEXT,
            last_login TEXT,
            login_count INTEGER
        )
    ''')
    
    # 使用生成的SQL执行查询
    cursor.execute(select_sql, select_params)
    results = cursor.fetchall()
    print(f"\n查询结果: {results}")

主要特点:

  1. 递归处理嵌套:自动展开多级字典结构
  2. 参数化查询:使用?占位符,避免SQL注入
  3. 灵活配置:通过for_update参数区分SET和WHERE子句
  4. 支持表别名:可通过prefix参数添加表名前缀

使用建议: 根据你的ORM或数据库驱动调整占位符格式(如PostgreSQL用%s,SQLite用?)。

,您说得很有道理,其实我今天上午也是这么想的。前面已经围绕着这个字典做了很多事情,要改也不是没办法改,只是本喵比较强迫症,看看这种情况下,是怎么样一种思路可以达到的呢。。。。^_^

弄这个真够蛋疼的,学习一下 sqlalchemy,peewee 之类的 ORM 不就得了,简单点的直接用 Django 的 ORM

insertoTableSqlStr_Prefix = 'INSERT INTO {tableName} ({colNames}) value ({colValues})'
colNames = []
colValues = []

def f(sqltype=“int”, data=None, **kwargs):
if not data:
return None

if sqltype == “int”:
return str(data) if data else ‘null’

return “”{}"".format(data)

for col_name, col_value in StreamData.items():
data = f(**col_value)
if data is None:
continue
colValues.append(data)
colNames.append(col_name)

print colValues, colNames

print insertoTableSqlStr_Prefix.format(tableName=‘test’, colNames=",".join(colNames), colValues=",".join(colValues))

这样写符合需要吗?

D4_Exp 是不用操作,因为它是空的. 空字符串和 null 还是不一样。 不一定它是个空字符串,就不需要处理。

<br><br>StreamData = {<br> "D1_RSET":{<br> 'sqltype':'int',<br> 'data':'61207'<br> },<br> "D2_DCK_time":{<br> 'sqltype': 'DATETIME',<br> 'data': '2018-9-9'<br> },<br> "D3_DCK_des":{<br> 'sqltype': 'varchar(10)',<br> 'data': 'TestABC'<br> },<br> "D4_Exp":{<br> 'sqltype': 'varchar(80)',<br> 'data': ''<br> },<br> "D5_stream_seq":{<br> 'sqltype': 'int',<br> 'data': '123'<br> },<br>}<br><br>insertoTableSqlStr_Prefix = 'INSERT INTO {tableName} ({colNames}) value ({colValues})'<br>colNames = []<br>colValues = []<br><br>def f(sqltype="int", data=None, **kwargs):<br> if not data:<br> return None<br><br> if sqltype == "int":<br> return str(data) if data else 'null'<br><br> return "\"{}\"".format(data)<br><br>for col_name, col_value in StreamData.items():<br> data = f(**col_value)<br> if data is None:<br> continue<br> colValues.append(data)<br> colNames.append(col_name)<br><br>print colValues, colNames<br><br>print insertoTableSqlStr_Prefix.format(tableName='test', colNames=",".join(colNames), colValues=",".join(colValues))<br><br><br>

了解过 SQL 注入吗?

没细看,试试 dataset 库

回到顶部