K線連續漲跌統計與分析工具
1. 概述
本工具是一個用于分析金融時間序列數據(特別是K線數據)的Python腳本,主要功能是統計連續n根同方向K線后,第n+1根K線的漲跌情況。該工具不僅提供統計分析功能,還支持圖形化標記以驗證結果,幫助交易者和量化分析師識別市場中的特定模式。
2. 功能需求
- 統計連續n根陽線或陰線后,第n+1根K線的漲跌情況
- 支持自定義n值(連續K線數量)和m值(統計次數)
- 提供圖形化界面標記連續模式出現的位置
- 輸出詳細的統計報告
- 支持多種數據源輸入(CSV、數據庫、在線API等)
3. 實現代碼
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.patches import Rectangle
from typing import List, Dict, Tuple, Optional
from enum import Enum
import argparse
import os
import sys
from datetime import datetimeclass CandleDirection(Enum):"""K線方向枚舉"""BULLISH = 1 # 陽線BEARISH = -1 # 陰線NEUTRAL = 0 # 平盤class CandlePatternAnalyzer:"""K線模式分析器"""def __init__(self, data: pd.DataFrame, open_col: str = 'open',high_col: str = 'high',low_col: str = 'low',close_col: str = 'close',date_col: str = 'date'):"""初始化分析器參數:data: 包含K線數據的DataFrameopen_col: 開盤價列名high_col: 最高價列名low_col: 最低價列名close_col: 收盤價列名date_col: 日期列名"""self.data = data.copy()self.open_col = open_colself.high_col = high_colself.low_col = low_colself.close_col = close_colself.date_col = date_col# 預處理數據self._preprocess_data()def _preprocess_data(self):"""預處理數據,計算K線方向"""# 確保日期是datetime類型并設置為索引if not pd.api.types.is_datetime64_any_dtype(self.data[self.date_col]):self.data[self.date_col] = pd.to_datetime(self.data[self.date_col])self.data.set_index(self.date_col, inplace=True)# 計算K線方向self.data['direction'] = np.where(self.data[self.close_col] > self.data[self.open_col],CandleDirection.BULLISH.value,np.where(self.data[self.close_col] < self.data[self.open_col],CandleDirection.BEARISH.value,CandleDirection.NEUTRAL.value))# 計算漲跌幅 (百分比)self.data['pct_change'] = self.data[self.close_col].pct_change() * 100def _find_consecutive_directions(self, n: int) -> List[Tuple[int, int, CandleDirection]]:"""查找連續n根同方向K線的起始和結束位置參數:n: 連續K線數量返回:列表,每個元素是元組(start_index, end_index, direction)"""directions = self.data['direction'].valuessequences = []current_dir = Nonestart_idx = 0count = 0for i, dir_val in enumerate(directions):if dir_val == current_dir and dir_val != CandleDirection.NEUTRAL.value:count += 1else:if count >= n:sequences.append((start_idx, i-1, CandleDirection(current_dir)))current_dir = dir_val if dir_val != CandleDirection.NEUTRAL.value else Nonestart_idx = icount = 1 if current_dir is not None else 0# 檢查最后一段序列if count >= n:sequences.append((start_idx, len(directions)-1, CandleDirection(current_dir)))return sequencesdef analyze_consecutive_patterns(self, n: int, m: Optional[int] = None) -> Dict:"""分析連續n根同方向K線后第n+1根K線的表現參數:n: 連續K線數量m: 可選,只分析前m次出現的情況返回:包含分析結果的字典"""sequences = self._find_consecutive_directions(n)if m is not None:sequences = sequences[:m]results = {'total_bullish_sequences': 0,'total_bearish_sequences': 0,'bullish_sequences_next_up': 0,'bullish_sequences_next_down': 0,'bullish_sequences_next_neutral': 0,'bearish_sequences_next_up': 0,'bearish_sequences_next_down': 0,'bearish_sequences_next_neutral': 0,'sequences_details': [],'n': n,