【Mquant】4:量化投研配置本地数据库

admin

量化投研配置本地数据库是量化投资领域中非常重要的一环。通过搭建本地数据库,可以方便地存储和管理股票市场的日频数据,为后续的因子计算和策略研究提供支持。

1. 数据库搭建

首先我们抛出一个问题:真的需要数据库吗?

前段时间跑了一个回测代码(这里用的是mysql):用海龟策略跑2011年到2022年12年的分钟线数据,源数据是csv文件,我们将其存储到数据库过程就非常耗时,从数据库读出来也耗费了很长时间,大概有20多分钟的样子,但是如果我们将csv文件转化为pickle二进制文件,从硬盘读取到内存只用2分钟,而且内存占用量也下降了2倍多。如果说用金融时序数据库的话速度可能会快一些。

缺点:IO操作非常耗时,占用硬盘空间大。 优点:方便数据管理添加维护、框架集成,用户操作方便。

为了简化数据库搭建的过程,我们采用Veighna框架自带的数据管理模块来作为本地数据库管理工具。

2. PostgreSQL

PostgreSQL是特性更为丰富的开源关系型数据库,只推荐熟手使用。相比于MySQL,其特点如下:

采用多进程结构;支持通过扩展插件来新增功能。

Windows上PostgreSQL安装配置教程:https://blog.csdn.net/my1324/article/details/103226622

3. Veighna数据库架构介绍

我们首先进入到vnpy\trader\database.py这个python文件中

3.1 数据结构类

主要是BarOverview类和TickOverview类,这两个类封装了底层接口数据结构,提供数据视图,用来区分存储的数据。

3.2 BaseDatabase(ABC)数据库基类

BaseDatebase(ABC):是数据库基类,封装了常用的数据保存加载删除等抽象方法,由子类实现。

定义了8个抽象方法,继承了基类的子类都必须实现这些方法,从命名中我们也可以很清楚的知道每个方法的意思。

返回的数据类型,比如BarData,这种自定义的数据结构是在vnpy\trader\object.py文件中定义的。

3.3 get_database()获取数据库服务模块

def get_database() -> BaseDatabase:

""""""

# Return database object if already inited

global database

if database:

return database

# Read database related global setting

database_name: str = SETTINGS["database.name"]

module_name: str = f"vnpy_{database_name}"

# Try to import database module

try:

module: ModuleType = import_module(module_name)

except ModuleNotFoundError:

print(f"找不到数据库驱动{module_name},使用默认的SQLite数据库")

module: ModuleType = import_module("vnpy_sqlite")

# Create database object from module

database = module.Database()

return database

可以看到,Veighna是从SETTINGS配置文件里面读取设置的database名称,通过import_module()导入该数据库模块,所以我们需要提前下载好对应的vnpy_{}包,否则会默认使用vnpy_sqlite包作为项目数据库服务模块。

本文使用的是postgresql数据库,因此需要运行下面这行代码。

pip install vnpy_PostgreSQL Postgres

3.4 vnpy_postgresql数据库实例模块

首先找到vnpy_postgresql文件夹,一般在你下载的虚拟环境envs\vnpy\Lib\site-packages\vnpy_postgresql下。 可以使用pycharm中File——Settings——Project:xxx——project Structure导入这个模块到你的项目结构下。 我们点击postgresql_database.py这个文件,看到它的结构如下。 其实Veighna就是使用peewee这个python的ORM框架对数据结构进行进一步封装,对数据库的操作映射成对类、对象的操作,避免了我们直接写在SQL语句。看到这边,相信你对Veighna的数据管理已经有了直观的理解,下面我们就进行具体的配置操作。

4.具体配置

4.1 创建.vntrader文件

.vntrader和run.py一定是同级目录 程序加载setting里面的配置信息都是从这个文件夹里面的json文件获取。

def _get_trader_dir(temp_name: str) -> Tuple[Path, Path]:

"""

Get path where trader is running in.

"""

cwd: Path = Path.cwd()

temp_path: Path = cwd.joinpath(temp_name)

# If .vntrader folder exists in current working directory,

# then use it as trader running path.

if temp_path.exists():

return cwd, temp_path

# Otherwise use home path of system.

home_path: Path = Path.home()

temp_path: Path = home_path.joinpath(temp_name)

# Create .vntrader folder under home path if not exist.

if not temp_path.exists():

temp_path.mkdir()

return home_path, temp_path

TRADER_DIR, TEMP_DIR = _get_trader_dir(".vntrader")

sys.path.append(str(TRADER_DIR))

我们可以在vnpy\trader\utility.py里面找到这串代码,意思就是从当前工作目录获取.vntrader文件,添加到系统环境变量里面。如果当前环境没有这个文件,就会到Path.home()里面找,也就是你c盘用户目录C:\Users\Administrator

4.2 运行下面代码run.py

from vnpy.event import EventEngine

from vnpy.trader.engine import MainEngine

from vnpy.trader.ui import MainWindow, create_qapp

from vnpy_datamanager import DataManagerApp

def main():

"""Start VeighNa Trader"""

qapp = create_qapp()

event_engine = EventEngine()

main_engine = MainEngine(event_engine)

main_engine.add_app(DataManagerApp)

main_window = MainWindow(main_engine, event_engine)

main_window.showMaximized()

qapp.exec()

if __name__ == "__main__":

main()

第一次运行报错:

找不到数据库驱动vnpy_{},使用默认的SQLite数据库 没关系,这是因为我们在.vntrader里面没有配置要使用的数据库,我们将下面这个文件保存到.vntrader文件夹下面。

vt_setting.json

{

"font.family": "微软雅黑",

"font.size": 8,

"log.active": true,

"log.level": 50,

"log.console": true,

"log.file": true,

"email.server": "smtp.qq.com",

"email.port": 465,

"email.username": "",

"email.password": "",

"email.sender": "",

"email.receiver": "",

"datafeed.name": "",

"datafeed.username": "",

"datafeed.password": "",

"database.timezone": "Asia/Shanghai",

"database.name": "postgresql",

"database.database": "vnpy",

"database.host": "localhost",

"database.port": 5432,

"database.user":"postgres",

"database.password": "123456"

}

在SQL Shell(psql)下创建数据库

重启run.py文件

5.数据下载

5.1 环境配置

5.1.1 安装vnpy_binance

pip install vnpy_binance

vnpy_binance下面有三个接口,使用时需要注意本接口:

只支持全仓保证金模式只支持单向持仓模式

from vnpy_datamanager import DataManagerApp

from vnpy.event import EventEngine

from vnpy.trader.engine import MainEngine

from vnpy.trader.ui import MainWindow, create_qapp

from vnpy_binance import (

BinanceSpotGateway, # 现货交易

BinanceUsdtGateway, # 合约交易

BinanceInverseGateway # 用于对接币安反向合约的交易接口

)

def main():

"""Start VeighNa Trader"""

qapp = create_qapp()

event_engine = EventEngine()

main_engine = MainEngine(event_engine)

main_engine.add_app(DataManagerApp)

main_engine.add_gateway(BinanceSpotGateway)

main_window = MainWindow(main_engine, event_engine)

main_window.showMaximized()

qapp.exec()

if __name__ == "__main__":

main()

5.1.1 API配置

记住自己的API和Secret key 并且不要泄露出去,否则会有资金安全风险。

5.1.3 启动项目

首先 pip install requests这个包

运行上面代码后,发现报错如下: 在trader下的constant的Exchange类下添加一个常量

当我们看到这个界面后说明配置完成!

5.2 下载币安合约数据

from datetime import datetime, timedelta

from typing import List

from vnpy_binance import BinanceSpotGateway,BinanceUsdtGateway

from vnpy.event import EventEngine, Event

from vnpy.trader.constant import Exchange, Interval

from vnpy.trader.database import get_database, BarOverview

from vnpy.trader.event import EVENT_LOG

from vnpy.trader.object import HistoryRequest

from vnpy.trader.utility import load_json

setting = {

"key": "",

"secret": "",

"服务器": "REAL",

"代理地址": "",

"代理端口":

}

"""

一键下载币安现货的行情数据

"""

# 用于初始化引擎和数据库

class BinanceData:

def __init__(self, EventEngine):

self.binance = BinanceUsdtGateway(EventEngine, gateway_name="BINANCE_USDT")

self.binance.connect(setting=setting)

self.database = get_database()

self.proxies = {'https': '127.0.0.1:22307'}

def is_symbol_existed(self, symbol: str, interval: Interval, start: datetime, end: datetime) -> bool:

"""判断下载的数据是否重复出现在数据库中"""

bar_overview: List[BarOverview] = self.database.get_bar_overview()

for bar_view in bar_overview:

if bar_view.symbol != symbol:

continue

if bar_view.symbol == symbol and bar_view.interval != interval:

continue

if bar_view.symbol == symbol and bar_view.interval == interval:

if end < bar_view.start or start > bar_view.end:

continue

else:

return True

return False

def data_to_db(self, symbol: str, interval: Interval, start: datetime, end: datetime):

if self.is_symbol_existed(symbol, interval, start, end):

return

# 获取新数据

print(f"{datetime.now()}正在获取--{symbol}--k线数据")

req = HistoryRequest(

symbol=symbol,

exchange=Exchange.BINANCE,

start=start,

end=end,

interval=interval

)

bars = self.binance.query_history(req)

self.database.save_bar_data(bars)

print(f"{datetime.now()}--{symbol}--k线数据已被存入数据库!")

if __name__ == '__main__':

event = EventEngine()

binance = BinanceData(event)

event.start()

event.register(EVENT_LOG, lambda event: print(event.data))

symbols = ["BLURUSDT"]

for symbol in symbols:

binance.data_to_db(symbol, interval=Interval.HOUR, start=datetime(2015, 10, 5),

end=datetime.now())

event.stop()

5.3 下载币安现货数据

from datetime import datetime, timedelta

from typing import List

from vnpy_binance import BinanceSpotGateway

from vnpy.event import EventEngine, Event

from vnpy.trader.constant import Exchange, Interval

from vnpy.trader.database import get_database, BarOverview

from vnpy.trader.event import EVENT_LOG

from vnpy.trader.object import HistoryRequest

from vnpy.trader.utility import load_json

setting = {

"key": "",

"secret": "",

"服务器": "REAL",

"代理地址": "",

"代理端口":

}

"""

一键下载币安现货的行情数据

"""

# 用于初始化引擎和数据库

class BinanceData:

def __init__(self, EventEngine):

self.binance = BinanceSpotGateway(EventEngine, gateway_name="BINANCE_SPOT")

self.binance.connect(setting=setting)

self.database = get_database()

self.proxies = {'https': '127.0.0.1:22307'}

def is_symbol_existed(self, symbol: str, interval: Interval, start: datetime, end: datetime) -> bool:

"""判断下载的数据是否重复出现在数据库中"""

bar_overview: List[BarOverview] = self.database.get_bar_overview()

for bar_view in bar_overview:

if bar_view.symbol != symbol:

continue

if bar_view.symbol == symbol and bar_view.interval != interval:

continue

if bar_view.symbol == symbol and bar_view.interval == interval:

if end < bar_view.start or start > bar_view.end:

continue

else:

return True

return False

def data_to_db(self, symbol: str, interval: Interval, start: datetime, end: datetime):

if self.is_symbol_existed(symbol, interval, start, end):

return

# 获取新数据

print(f"{datetime.now()}正在获取--{symbol}--k线数据")

req = HistoryRequest(

symbol=symbol,

exchange=Exchange.BINANCE,

start=start,

end=end,

interval=interval

)

bars = self.binance.query_history(req)

self.database.save_bar_data(bars)

print(f"{datetime.now()}--{symbol}--k线数据已被存入数据库!")

if __name__ == '__main__':

event = EventEngine()

binance = BinanceData(event)

event.start()

event.register(EVENT_LOG, lambda event: print(event.data))

symbols = ["blurusdt"]

for symbol in symbols:

binance.data_to_db(symbol, interval=Interval.MINUTE, start=datetime(2015, 10, 5),

end=datetime.now()-timedelta(days=1))

event.stop()

5.4 下载期货行情数据

from datetime import datetime

from typing import List

from vnpy_rqdata.rqdata_datafeed import RqdataDatafeed

from vnpy.trader.constant import Exchange, Interval

from vnpy.trader.database import get_database, BarOverview

from vnpy.trader.object import HistoryRequest

import rqdatac

# 初始化

rqdataDatafeed = RqdataDatafeed()

rqdataDatafeed.init()

database = get_database()

def query_traffic():

# 获取流量使用情况

traffic_info = rqdatac.user.get_quota()

print(f"使用流量:{traffic_info['bytes_used'] / (2 ** 20)} Mb \n"

f"剩余流量:{(traffic_info['bytes_limit'] - traffic_info['bytes_used']) / (2 ** 20)} Mb ")

return (traffic_info['bytes_limit'] - traffic_info['bytes_used']) / (2 ** 20)

def is_symbol_existed(database, symbol: str, interval: Interval, start: datetime, end: datetime) -> bool:

"""判断下载的数据是否重复出现在数据库中"""

bar_overview: List[BarOverview] = database.get_bar_overview()

for bar_view in bar_overview:

if bar_view.symbol != symbol:

continue

if bar_view.symbol == symbol and bar_view.interval != interval:

continue

if bar_view.symbol == symbol and bar_view.interval == interval:

if end < bar_view.start or start > bar_view.end:

continue

else:

return True

return False

if __name__ == '__main__':

vt_symbols = [

"i99.DCE", "j99.DCE", "jm99.DCE", "rb99.SHFE", "hc99.SHFE"

]

for vt_symbol in vt_symbols:

symbol, exchange = vt_symbol.split(".")

start = datetime(2010, 1, 1)

end = datetime.now()

interval = Interval.DAILY

if is_symbol_existed(database, symbol, interval, start, end):

print(f"{symbol}的{Interval.MINUTE}数据重复出现在数据库中")

print("-" * 40)

continue

if query_traffic() < 30:

print("-" * 40)

break

historyReq = HistoryRequest(

symbol=symbol,

exchange=Exchange(exchange),

start=start,

end=end,

interval=interval

)

bars = rqdataDatafeed.query_bar_history(historyReq)

database.save_bar_data(bars)

print(f"{vt_symbol}下载完成!")

print()

6.保存成本地csv文件

为了后面方便投研分析以及数据备份的需要,我也给出了数据转化成csv文件保存的代码。

import datetime

import os.path

from typing import List

from vnpy.trader.constant import Exchange, Interval

from vnpy_datamanager import ManagerEngine

from vnpy.trader.database import BarOverview

from vnpy.trader.engine import MainEngine, EventEngine

def output_data_to_csv(engine, exchange, interval):

bar_view:List[BarOverview] = engine.get_bar_overview()

for bar_ in bar_view:

bar_dict = bar_.__dict__.get("__data__")

if bar_dict.get("interval") == interval and bar_dict.get(

"exchange") == exchange:

print(

f"--------正在保存{bar_dict.get('symbol')}这个文件------------")

# 保存成csv文件

filepath = rf"D:\market\feature\{interval.value}"

if not os.path.exists(filepath):

os.makedirs(filepath)

filepath_name = os.path.join(filepath,

f"{bar_dict.get('symbol')}.{bar_dict.get('exchange').value}.csv")

flag = manage_engine.output_data_to_csv(file_path=filepath_name,

exchange=exchange,

symbol=bar_dict.get(

"symbol"),

interval=interval,

start=bar_dict.get(

"start"),

end=bar_dict.get("end"))

if flag:

print(f"{datetime.datetime.now()},{filepath_name}文件保存成功!")

else:

print(f"{datetime.datetime.now()},{filepath_name}文件保存失败!")

if __name__ == '__main__':

main_engine = MainEngine()

event_engine = EventEngine()

manage_engine = ManagerEngine(main_engine, event_engine)

output_data_to_csv(manage_engine,

exchange=Exchange.SHFE, interval=Interval.DAILY)

main_engine.close()

7. 总结

文章介绍了在量化投资中配置本地数据库的重要性,并使用Veighna框架和PostgreSQL数据库来搭建和配置本地数据库。首先讨论了是否真的需要数据库以及数据库的优缺点。介绍了Veighna框架和PostgreSQL数据库的特点和安装配置方法。接下来详细介绍了Veighna框架中的数据库架构和相关类的功能。文章最后给出了具体的配置步骤,包括创建.vntrader文件夹和配置vt_setting.json文件,并演示了如何使用run.py运行程序。提供了配置本地数据库的详细指南,帮助量化投资者方便地存储和管理市场数据,为后续的因子计算和策略研究提供支持。

Copyright © 2088 南美洲世界杯预选赛程_世界杯2 - ycfcjt.com All Rights Reserved.
友情链接