基於大模型根據提示詞去寫SQL、執行SQL、返回結果並輸出報表,技術上是可行的,但為啥還要基於pandas去實現呢?
原因有以下幾點:
1、大模型無法滿足實時性輸出報表的需求;
2、使用大模型比較適合數據量比較大的場景,大模型主要輔助寫SQL;
3、使用pandas方便快捷,定制好各種模版後,功能也能通用。
代碼如下:
from flask import Flask, request, jsonify, render_template_string
import pandas as pd
from io import StringIO
import random

app = Flask(__name__)

# Module-level state written by the upload handler: the raw text of the most
# recently uploaded CSV file and the DataFrame parsed from it.
uploaded_csv_data = None
csv_dataframe = None
@app.route('/')
def index():html_content = '''<!DOCTYPE html><html><head><meta charset="UTF-8"><title>CSV 上傳與圖表分析</title><script src="https://cdn.jsdelivr.net/npm/chart.js"></script><style>body {font-family: Arial, sans-serif;margin: 20px;}.container {max-width: 800px;margin: 0 auto;}.row {display: flex;flex-wrap: wrap;justify-content: space-between;margin-bottom: 15px;}.row label {margin-right: 10px;white-space: nowrap;}.row select {flex: 1;margin-right: 10px;}.row button {margin-top: 10px;}canvas {margin-top: 20px;}</style></head><body><div class="container"><h2>上傳 CSV 文件</h2><div class="row"><input type="file" id="csvFile" accept=".csv"><button onclick="uploadCSV()">上傳文件</button></div><p id="uploadStatus"></p><h2>選擇列</h2><div class="row"><label for="xAxisColumn">X軸列:</label><select id="xAxisColumn"></select><label for="yAxisColumn">Y軸列:</label><select id="yAxisColumn"></select><label for="groupColumn">分組列:</label><select id="groupColumn"></select></div><h2>選擇圖表類型</h2><div class="row"><label for="chartType">圖表類型:</label><select id="chartType"><option value="bar">柱狀圖</option><option value="line">折線圖</option></select></div><div class="row"><button onclick="analyzeData()">生成圖表</button><button onclick="analyzeData('above')">顯示均值線上</button><button onclick="analyzeData('below')">顯示均值線下</button></div><canvas id="myChart" width="600" height="400"></canvas></div><script>var csvUploaded = false;var columns = [];function uploadCSV() {var fileInput = document.getElementById("csvFile");if (fileInput.files.length === 0) {alert("請選擇一個 CSV 文件!");return;}var file = fileInput.files[0];var reader = new FileReader();reader.onload = function(e) {var csvContent = e.target.result;fetch('/upload', {method: 'POST',headers: {'Content-Type': 'text/plain'},body: csvContent}).then(response => response.json()).then(data => {if (data.success) {document.getElementById("uploadStatus").innerText = "CSV 文件上傳成功!";csvUploaded = true;columns = data.columns;populateColumns();} else {alert("上傳失敗:" + 
data.error);}});};reader.readAsText(file);}function populateColumns() {var xAxisSelect = document.getElementById("xAxisColumn");var yAxisSelect = document.getElementById("yAxisColumn");var groupSelect = document.getElementById("groupColumn");columns.forEach(column => {var option = document.createElement("option");option.value = column;option.text = column;xAxisSelect.appendChild(option.cloneNode(true));yAxisSelect.appendChild(option.cloneNode(true));groupSelect.appendChild(option.cloneNode(true));});}function analyzeData(filter = '') {var xAxisColumn = document.getElementById("xAxisColumn").value;var yAxisColumn = document.getElementById("yAxisColumn").value;var groupColumn = document.getElementById("groupColumn").value;var chartType = document.getElementById("chartType").value;if (!csvUploaded) {alert("請先上傳 CSV 文件!");return;}if (!xAxisColumn || !yAxisColumn || !groupColumn) {alert("請選擇X軸列、Y軸列和分組列!");return;}fetch(`/analyze?xAxis=${encodeURIComponent(xAxisColumn)}&yAxis=${encodeURIComponent(yAxisColumn)}&group=${encodeURIComponent(groupColumn)}&chartType=${encodeURIComponent(chartType)}&filter=${filter}`).then(response => response.json()).then(data => {renderChart(data, chartType);});}function renderChart(chartData, chartType) {var ctx = document.getElementById('myChart').getContext('2d');if (window.myChartInstance) {window.myChartInstance.destroy();}window.myChartInstance = new Chart(ctx, {type: chartType,data: {labels: chartData.labels,datasets: chartData.datasets},options: {responsive: true,scales: {y: {beginAtZero: true}}}});}</script></body></html>'''return render_template_string(html_content)@app.route('/upload', methods=['POST'])
def upload():global uploaded_csv_data, csv_dataframecontent = request.data.decode('utf-8')try:# 將上傳的 CSV 文件內容解析為 DataFramedf = pd.read_csv(StringIO(content))uploaded_csv_data = content # 存儲原始數據(可選)csv_dataframe = df # 保存解析后的DataFrame供后續分析使用columns = df.columns.tolist() # 獲取列名return jsonify({'success': True, 'columns': columns})except Exception as e:return jsonify({'success': False, 'error': str(e)})@app.route('/get_unique_values')
def get_unique_values():global csv_dataframecolumn = request.args.get('column', '')if csv_dataframe is None or column not in csv_dataframe.columns:return jsonify({'uniqueValues': []})unique_values = csv_dataframe[column].dropna().unique().tolist()return jsonify({'uniqueValues': unique_values})@app.route('/analyze')
def analyze():
    """Aggregate the uploaded CSV into Chart.js datasets.

    Query parameters:
        xAxis: name of the date column used for the x axis.
        yAxis: name of the numeric column that is summed.
        group: name of the column whose values become one dataset each.
        chartType: Chart.js type for the group datasets (default ``'bar'``).
        filter: ``'above'`` / ``'below'`` keeps only rows whose y value is
            above / below the per-day mean; anything else keeps all rows.

    Returns:
        A JSON response ``{'labels': [...], 'datasets': [...]}``. Both lists
        are empty when no CSV has been uploaded. When the parameters do not
        fit the data, the response additionally carries an ``'error'``
        message and has status 400.
    """
    x_axis = request.args.get('xAxis', '')
    y_axis = request.args.get('yAxis', '')
    group = request.args.get('group', '')
    chart_type = request.args.get('chartType', 'bar')
    filter_type = request.args.get('filter', '')

    # Nothing uploaded yet: return an empty chart.
    if csv_dataframe is None:
        return jsonify({'labels': [], 'datasets': []})

    try:
        payload = _build_chart_payload(
            csv_dataframe, x_axis, y_axis, group, chart_type, filter_type
        )
    except ValueError as exc:
        return jsonify({'labels': [], 'datasets': [], 'error': str(exc)}), 400
    return jsonify(payload)


def _json_number(value):
    """Return *value* as a built-in float, or None when it is missing."""
    # NumPy integer scalars are not JSON serialisable and NaN is not valid
    # JSON, so everything is converted to float / None before jsonify.
    return None if pd.isna(value) else float(value)


def _build_chart_payload(df, x_axis, y_axis, group, chart_type, filter_type):
    """Build the ``labels`` / ``datasets`` dict for one chart request.

    Raises:
        ValueError: if a column is unknown, the three columns are not
            distinct, the y column is not numeric, or the x column cannot be
            parsed as dates.
    """
    selected = (x_axis, y_axis, group)
    missing = [name for name in selected if name not in df.columns]
    if missing:
        raise ValueError(
            "Unknown column(s): " + ", ".join(repr(name) for name in missing)
        )
    if len(set(selected)) < len(selected):
        raise ValueError("X axis, Y axis and group must be different columns")
    if not pd.api.types.is_numeric_dtype(df[y_axis]):
        raise ValueError(f"Column {y_axis!r} is not numeric")

    df = df[[x_axis, y_axis, group]].copy()
    try:
        # Labels are formatted per day, so timestamps are truncated to
        # midnight; otherwise they would never match the daily grid below.
        df[x_axis] = pd.to_datetime(df[x_axis]).dt.normalize()
    except (ValueError, TypeError, AttributeError) as exc:
        raise ValueError(
            f"Column {x_axis!r} cannot be parsed as dates: {exc}"
        ) from exc
    df = df.dropna(subset=[x_axis])

    # Sum of the y column per (day, group).
    totals = df.groupby([x_axis, group])[y_axis].sum().reset_index()
    if totals.empty:
        return {'labels': [], 'datasets': []}

    # One label per calendar day between the first and the last day, so that
    # days without data still occupy a slot on the x axis.
    all_dates = pd.date_range(
        start=totals[x_axis].min(), end=totals[x_axis].max(), freq='D'
    )
    # Mean of the per-group sums for each day.
    mean_by_date = totals.groupby(x_axis)[y_axis].mean()

    datasets = [{
        'label': '均值',
        # Aligned with all_dates so each point sits under its own label.
        'data': [_json_number(v) for v in mean_by_date.reindex(all_dates)],
        'borderColor': 'rgba(255, 0, 0, 1)',
        'borderWidth': 2,
        'borderDash': [5, 5],  # dashed line
        'fill': False,
        'spanGaps': True,  # keep the line connected across days without data
        'type': 'line',
    }]

    if filter_type in ('above', 'below'):
        # Each row is compared with the mean of its own day; days without a
        # mean fall back to 0.
        threshold = df[x_axis].map(mean_by_date).fillna(0)
        if filter_type == 'above':
            keep = df[y_axis] > threshold
        else:
            keep = df[y_axis] < threshold
        filtered_totals = (
            df[keep].groupby([x_axis, group])[y_axis].sum().reset_index()
        )
    else:
        filtered_totals = totals

    # sort=False keeps the groups in order of first appearance.
    for name, rows in filtered_totals.groupby(group, sort=False):
        per_day = rows.set_index(x_axis)[y_axis].reindex(all_dates)
        # NOTE(review): as in the original code, only values greater than 0
        # are plotted; zero or negative sums are sent as None (no point).
        data = [
            float(v) if pd.notna(v) and v > 0 else None for v in per_day
        ]
        red, green, blue = (random.randint(0, 255) for _ in range(3))
        datasets.append({
            'label': str(name),
            'data': data,
            'backgroundColor': f"rgba({red}, {green}, {blue}, 0.6)",
            'borderColor': f"rgba({red}, {green}, {blue}, 1)",
            'borderWidth': 1,
            'type': chart_type,  # chart type chosen by the user
        })

    return {
        'labels': all_dates.strftime('%Y-%m-%d').tolist(),
        'datasets': datasets,
    }


if __name__ == '__main__':
    # debug=True turns on Flask's reloader and interactive debugger; it is
    # intended for local development only.
    app.run(debug=True)