import pandas as pd
import psycopg2
from psycopg2 import sql# 連接Redshift
conn = psycopg2.connect(host='your-cluster.endpoint.redshift.amazonaws.com',port=5439,dbname='dev',user='admin',password='your-password'
)# 權限檢查函數
def check_redshift_permissions(conn):"""獲取所有權限信息"""permissions = {'table_level': [],'column_level': [],'row_level': [],'data_masking': []}with conn.cursor() as cur:# 獲取表級權限cur.execute("""SELECT grantee, table_schema, table_name, privilege_type FROM information_schema.table_privilegesWHERE grantee != 'PUBLIC'""")permissions['table_level'] = cur.fetchall()# 獲取列級權限cur.execute("""SELECT grantee, table_schema, table_name, column_name, privilege_type FROM information_schema.column_privileges""")permissions['column_level'] = cur.fetchall()# 獲取行級權限(基于視圖定義)cur.execute("""SELECT viewname, definition FROM pg_views WHERE schemaname NOT IN ('pg_catalog', 'information_schema')""")for view in cur.fetchall():if ' WHERE ' in view[1].upper():permissions['row_level'].append((view[0],view[1].split('WHERE')[1].strip()))# 獲取數據掩碼函數cur.execute("""SELECT proname, prosrc FROM pg_proc WHERE proname LIKE 'mask%' OR proname LIKE 'dynamic_mask%'""")permissions['data_masking'] = cur.fetchall()return permissions# 獲取權限數據
permissions = check_redshift_permissions(conn)# 構建自然語言描述
permission_desc = []# 表級權限處理
table_df = pd.DataFrame(permissions['table_level'], columns=['角色', 'schema', '表名', '權限類型'])
for _, row in table_df.iterrows():desc = f"角色 {row['角色']} 在表 {row['schema']}.{row['表名']} 上擁有 {row['權限類型']} 權限"sql_stmt = f"GRANT {row['權限類型']} ON {row['schema']}.{row['表名']} TO {row['角色']};"permission_desc.append(('表級權限', desc, sql_stmt))# 列級權限處理
column_df = pd.DataFrame(permissions['column_level'],columns=['角色', 'schema', '表名', '列名', '權限類型'])
for _, row in column_df.iterrows():desc = f"角色 {row['角色']} 在表 {row['schema']}.{row['表名']} 的 {row['列名']} 列上擁有 {row['權限類型']} 權限"sql_stmt = f"GRANT {row['權限類型']}({row['列名']}) ON {row['schema']}.{row['表名']} TO {row['角色']};"permission_desc.append(('列級權限', desc, sql_stmt))# 行級權限處理
for view, condition in permissions['row_level']:desc = f"視圖 {view} 實施了行級過濾,條件: {condition.split('/*')[0].strip()}"sql_stmt = f"CREATE VIEW {view} AS SELECT ... WHERE {condition};" # 需要根據實際視圖定義補充permission_desc.append(('行級權限', desc, sql_stmt))# 數據掩碼處理
for func_name, func_def in permissions['data_masking']:desc = f"數據掩碼函數 {func_name} 實現規則: {func_def[:100]}..."sql_stmt = f"CREATE FUNCTION {func_name} ... \n{func_def};"permission_desc.append(('數據掩碼', desc, sql_stmt))# 打印結果
df = pd.DataFrame(permission_desc, columns=['權限類型', '描述', 'SQL示例'])
print("權限描述:")
print(df[['權限類型', '描述']].to_markdown(index=False))print("\n對應SQL語句示例:")
print(df[['權限類型', 'SQL示例']].to_markdown(index=False))conn.close()
輸出結果說明:
權限描述示例:
權限類型 | 描述 |
---|---|
表級權限 | 角色 sales_dept 在表 sales_data.orders 上擁有 SELECT 權限 |
列級權限 | 角色 hr_dept 在表 hr.employees 的 salary 列上擁有 SELECT 權限 |
行級權限 | 視圖 sales_apac_view 實施了行級過濾,條件: region = ‘APAC’ |
數據掩碼 | 數據掩碼函數 mask_ssn 實現規則: CASE WHEN … |
對應SQL語句示例:
權限類型 | SQL示例 |
---|---|
表級權限 | GRANT SELECT ON sales_data.orders TO sales_dept; |
列級權限 | GRANT SELECT(salary) ON hr.employees TO hr_dept; |
行級權限 | CREATE VIEW sales_apac_view AS SELECT … WHERE region = ‘APAC’; |
數據掩碼 | CREATE FUNCTION mask_ssn … |
完整權限控制SQL生成模板:
-- 角色體系
CREATE ROLE sales_dept;
CREATE ROLE hr_dept;-- 表級權限
GRANT SELECT ON TABLE sales_data.* TO sales_dept;
GRANT USAGE ON SCHEMA hr TO hr_dept;-- 列級權限
CREATE VIEW hr_limited_view AS
SELECT employee_id, name, department, CASE WHEN CURRENT_USER = 'hr_director' THEN salary ELSE NULL END AS salary
FROM employees;GRANT SELECT ON hr_limited_view TO hr_dept;-- 行級權限
CREATE VIEW sales_region_view AS
SELECT * FROM orders
WHERE region = CURRENT_SCHEMA();-- 動態數據掩碼
CREATE FUNCTION mask_email(email varchar) RETURNS varchar AS $$
BEGINRETURN regexp_replace(email, '(.)(.*)@', '\1***@');
END;
$$ LANGUAGE plpgsql;CREATE VIEW customer_masked_view AS
SELECT customer_id,mask_email(email) AS email,LEFT(phone, 3) || '****' AS phone
FROM customers;-- 權限組合
GRANT sales_dept TO user1;
GRANT hr_dept TO user2;
各權限類型說明:
-
表級權限:
- 控制對整張表的訪問
- 典型操作:SELECT/INSERT/UPDATE/DELETE
- 最佳實踐:通過角色授權,避免直接授予用戶
-
列級權限:
- 通過創建受限視圖實現
- 使用CASE語句動態控制列可見性
- 配合列加密保護敏感數據
-
行級權限:
- 使用視圖WHERE子句過濾數據
- 動態條件:CURRENT_USER/SESSION變量
- 可結合安全策略(Security Policy)
-
數據掩碼:
- 使用自定義函數實現動態脫敏
- 支持條件掩碼(根據用戶角色不同顯示不同數據)
- 常用方法:正則替換、數值模糊、部分隱藏
權限管理建議:
-
使用三層角色體系:
-- 組織級角色 CREATE ROLE org_analyst; -- 部門角色 CREATE ROLE dept_finance; -- 功能角色 CREATE ROLE sensitive_data_access;GRANT sensitive_data_access TO dept_finance; GRANT dept_finance TO org_analyst;
-
定期審計腳本:
-- 檢查權限分布 SELECT * FROM svv_user_grants; -- 查看列權限 SELECT * FROM svv_column_privileges; -- 審計數據訪問 SELECT * FROM svl_userlog;
-
自動化清理:
# 自動撤銷過期權限示例 def revoke_expired_permissions():expired_users = query_db("SELECT user_name FROM expired_users")for user in expired_users:execute_sql(f"REVOKE ALL PRIVILEGES ON ALL TABLES FROM {user}")