# Create the stock database
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2018-09-04 14:34:59
# @Author : Michael Li
# @Version : $V2.0$

import datetime
import logging
import random
import threading
from queue import Empty, LifoQueue
from time import sleep

import numpy as np
import pandas as pd
import pymssql
import tushare as ts
from sqlalchemy import create_engine
# The packages imported above are required; install any missing one with
# `pip install <package>`.


class stock(object):
    """Download Tushare Pro market data and append it to SQL Server tables.

    Connection settings, table names and the date range are plain instance
    attributes set in ``__init__``.  Two LIFO queues hold pending work
    (stock codes and trading dates) so that several worker threads can
    share it.
    """

    def __init__(self):
        # Database connection settings used to build the SQLAlchemy URL.
        self.host = '127.0.0.1'
        self.user = 'sa'
        self.password = 'test'
        self.port = 3306  # port; not referenced by the URL built in insertMysql
        self.database = 'stock'
        self.Daily_tableName = 'daily'
        self.Daily_basic_tableName = 'daily_basic'
        # Start and end dates are only used for range queries; getDatetime()
        # returns the current date when a single day is needed.
        self.startTime = '20100101'  # start date, YYYYMMDD
        self.endTime = datetime.datetime.now().strftime('%Y%m%d')  # end date
        # Work queues that feed the worker threads.
        self.ts_code_queue = LifoQueue()  # stock codes for getDaily_code
        self.trade_cal_queue = LifoQueue()  # trading dates for getDaily_date
        # The token is left empty here and must be filled in by the user.
        ts.set_token('')
        self.pro = ts.pro_api()

    def log(self):
        """Attach a UTF-8 file handler to the root logger and log a start line."""
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)
        handler = logging.FileHandler(r'D:/abc.log', encoding='utf-8')
        formatter = logging.Formatter(
            "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logging.info('開始獲取%s的數據', self.getDatetime())

    def insertMysql(self, tableName, data):
        """Append the DataFrame ``data`` to the table ``tableName``.

        Despite the method name, the connection URL targets SQL Server
        through the ``mssql+pymssql`` driver.  The original author's note
        says the tables should be created in the database beforehand.

        Args:
            tableName: Name of the destination table.
            data: ``pandas.DataFrame`` whose rows are appended; the frame's
                index is not written.
        """
        engine = create_engine('mssql+pymssql://%s:%s@%s/%s' % (
            self.user, self.password, self.host, self.database))
        try:
            data.to_sql(tableName, con=engine, if_exists='append', index=False)
        finally:
            # One engine is built per call, so release its connection pool
            # instead of leaking it.
            engine.dispose()

    def getDatetime(self):
        """Return today's date as a ``YYYYMMDD`` string."""
        return datetime.datetime.now().strftime('%Y%m%d')

    @staticmethod
    def _drain(work_queue):
        """Yield items from ``work_queue`` until it is empty.

        ``get_nowait`` is used rather than an ``empty()`` check followed by
        a blocking ``get``: with several workers another thread can take the
        last item between the two calls, which would leave this thread
        blocked and then failing with ``queue.Empty``.
        """
        while True:
            try:
                yield work_queue.get_nowait()
            except Empty:
                return

    @staticmethod
    def _parse_trade_date(daily):
        """Convert the ``trade_date`` column of ``daily`` to datetimes in place.

        The values are parsed as ``YYYYMMDD`` strings, the same layout this
        class uses for every date it sends to the API.  Returns ``daily``.
        """
        daily['trade_date'] = pd.to_datetime(daily['trade_date'], format='%Y%m%d')
        return daily

    def getTrade_cal(self):
        """Queue every open trading day between startTime and endTime.

        Queries the ``trade_cal`` endpoint with ``exchange='SZSE'`` and
        ``is_open=1``, then pushes each ``cal_date`` value onto
        ``trade_cal_queue``.
        Official docs: https://tushare.pro/document/2?doc_id=26
        """
        calendar = self.pro.query('trade_cal', exchange='SZSE',
                                  start_date=self.startTime,
                                  end_date=self.endTime, is_open=1)
        for cal_date in calendar['cal_date'].tolist():
            self.trade_cal_queue.put(cal_date)
        logging.info('Trade_cal 隊列中共計:%s 條信息', self.trade_cal_queue.qsize())

    def getList(self):
        """Queue stock codes taken from the ``stock_basic`` endpoint.

        Only the first 20 codes of the result are queued.  The original
        author notes that this method is not used often.
        """
        stock_basic = self.pro.query(
            'stock_basic', exchange_id='', is_hs='N',
            fields='ts_code,symbol,name,fullname,list_date,list_status')
        stock_basic.set_index('ts_code', inplace=True)
        for ts_code in stock_basic.index.tolist()[0:20]:
            self.ts_code_queue.put(ts_code)
        logging.info('共計%s條數據', self.ts_code_queue.qsize())

    def getDaily_code(self):
        """Fetch and store daily bars for every code in ``ts_code_queue``.

        Each code is queried over startTime..endTime and the result is
        appended to the daily table.  Returns once the queue is empty.
        Official docs: https://tushare.pro/document/2?doc_id=27
        """
        logging.info('子線程(%s)啟動', threading.current_thread().name)
        for code in self._drain(self.ts_code_queue):
            daily = self.pro.query('daily', ts_code=code,
                                   start_date=self.startTime,
                                   end_date=self.endTime)
            self.insertMysql(self.Daily_tableName, self._parse_trade_date(daily))
            # Pause between requests, as the original code does.
            sleep(random.randint(1, 2))
        logging.info('------>%s的getDaily_code數據全部搞定<------', self.getDatetime())

    def getDaily_date(self):
        """Fetch and store daily bars for every date in ``trade_cal_queue``.

        Intended to run in several threads at once; each thread returns
        when the shared queue is empty.
        Official docs: https://tushare.pro/document/2?doc_id=27
        """
        logging.info('線程(%s)啟動', threading.current_thread().name)
        for date in self._drain(self.trade_cal_queue):
            # Request exactly one trading day.  Passing the date as
            # start_date would ask for an open-ended range beginning that
            # day, so successive queue items would fetch overlapping rows.
            daily = self.pro.query('daily', trade_date=date)
            self.insertMysql(self.Daily_tableName, self._parse_trade_date(daily))
            logging.info('--> %s 的數據共計:%s 條 <--', date, len(daily))
            # Pause between requests, as the original code does.
            sleep(random.randint(1, 2))
        logging.info('線程(%s)結束', threading.current_thread().name)

    def getDaily_basic(self):
        """Fetch today's ``daily_basic`` row for one stock and store it.

        Missing values are replaced with 0 before the insert.
        Official docs: https://tushare.pro/document/2?doc_id=32
        """
        # NOTE(review): ts_code has no exchange suffix here (e.g. '.SZ');
        # confirm the API accepts this form.
        daily_basic = self.pro.query('daily_basic', ts_code='002427',
                                     trade_date=self.getDatetime())
        daily_basic = daily_basic.fillna(0)
        self.insertMysql(self.Daily_basic_tableName, daily_basic)
        logging.info('------>%s的Daily_basic的數據搞定<------', self.getDatetime())

    def main(self):
        """Run the whole job: logging, daily_basic, then per-date downloads."""
        self.log()
        self.getDaily_basic()
        self.getTrade_cal()  # a date range or a single day, per startTime/endTime
        # Start all four workers before waiting on any of them, so they run
        # concurrently and every one of them is awaited.
        workers = [
            threading.Thread(target=self.getDaily_date,
                             name='getDaily_date %d 號進程' % (number))
            for number in range(1, 5)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()


if __name__ == '__main__':
    # Use a separate name for the instance so the class is not shadowed.
    stock_job = stock()
    stock_job.main()