可以在此处找到项目的原代码 📍链接

# 背景介绍 (瞎鸡儿凑字数)

# 为什么要用强化学习?

股票交易是一种优先考虑短期利润而不是长期回报的投资形式。因此,在进行股票交易时,有必要具备专业知识。大多数股票投资者都知道基本面分析中最常用的财务数据。在此分析的基础上,他们可以进行量化交易。

近年来,越来越多的公司试图利用人工智能来建立机器学习模型,以此来改进他们的交易策略。

在我们的项目中,我们在股票市场的背景下,建立了强化学习模型,对比特币的交易进行实时决策。

股票市场包括历史价格序列和走势,可以看作是一个复杂的环境。我们的目标是培养一个能自己对买入和卖出股票做决定的代理单元。因此,强化学习算法可以有效地解决这个问题。

强化学习的目的是优化累积的未来奖励信号,以便为序列选择问题制定适当的策略。在研究了部分金融投资组合管理问题的深度强化学习框架后,发现部分 Q 学习算法,如

  • 异步优势演员 - 评论家算法 (Asynchronous Advantage Actor-Critical, A3C)
  • 信赖域策略优化算法 (Trust Region Policy Optimization, TRPO)

在某些条件下存在明显高估的动作价值,继而影响最终的性能。相比之下,DQN (Deep Q-network) 通过将 Q 学习与深度神经网络相结合来解决这些问题,不仅减少了观察到的高估值,而且在一些博弈理论上显著地提高了性能。

# 为什么要用流处理

在传统的投资组合管理问题中使用流处理的原因是为了解决两个重要的量化交易问题:

  • 数据数量上的短缺
  • 数据本身质量低

金融数据信息处理需要从海量的噪声中提取有意义的数据。由于金融市场上存在大量的噪音,人们最朴素期待的结果就是自己做出的决策事正确决策的概率大于 50%。

但事实总是不尽人意,通常来说,基于以往有限样本的旧数据进行的预测研究往往很难达到令人接受的程度。

故,金融和经济学领域的强化学习研究通常不会在 “大数据” 背景下进行。

为了解决上述问题,使我们的系统具有更现实的意义和更多的商业价值,我们构建了一个集成深度强化学习模型和 Kafka 流处理方法的系统,来获取和处理实时股票数据。可以同时处理多种加密货币数据。

"深度强化学习(DRL)模型"

# 强化学习模型

强化学习训练的一个重要步骤是 Q 值表的更新,其中 Bellman 方程起着至关重要的作用。Bellman 方程的主要思想是利用下一个状态信息 vt+1v_{t+1} 得到当前状态的 Q 值 vtv_t

Qπ(s,a)=Eπ[Rt+1+γQπ(St+1,At+1)St=s,At=a]Q_{\pi}(s,a) = E_{\pi}[R_{t+1}+\gamma Q_{\pi}(S_{t+1},A_{t+1})|S_t = s,A_t = a]

其中 Qπ(s,a)Q_{\pi}(s,a) 是基于当前状态 ss 和遵循策略 π\pi 的动作 aa 的 Q 值。Qπ(St+1,At+1)Q_{\pi}(S_{t+1},A_{t+1}) 是下一个状态 st+1s_{t+1} 和此状态下的动作 at+1a_{t+1} 给出的 Q 值。 Rt+1R_{t+1} 表示下一刻的奖励。γ\gamma 是从 0 到 1 的折扣因子

DQN 中有 2 个神经网络,其中一个是策略网络,另一个是价值网络,它可以将状态映射到 (动作,Q 值) 对。在 DQN 训练中,更新神经网络权值时,Bellman 方程起着重要的作用

DQN 的核心部分是嵌入式 CNN 模型。对于我们项目中的状态空间,7 个技术指标用来表示状态环境:
平衡价格股票 MACDRSICCIADX
在每次状态迭代中,Q 值表都会被更新,agent 代理可以根据这些信息学习相应的策略

对于训练和测试数据,我们使用了 2015-2022 年的 Kaggle 开源 candle chart 数据,包括 BTCUSDT 和 Ethusdt 的开盘价高点低点收盘价。由于加密货币每周 7 天,每天 24 小时运行,因此我们的输入数据为 5GB 的 7 年数据

数据

7 年票证数据除了量大、处理资源需求大之外,还包含了大量 2018-2021 年不需要的动荡数据和长期牛市数据,这可能会影响我们模型的训练结果

我们选择了几个典型的时间段进行综合整合,包括牛市,熊市和高动荡市场 (2017 - 2018,2019 - 2020,2020-2021)。价格历史数据集标记良好、并预先格式化

为了提高网络收敛的速度,我们将数据归一化 (0 到 1 之间)。然后我们用模型进行了几次训练试验,但结果并不令人满意。我们认为,仅仅使用原始的价格数据可能不够充分。技术指标可能比价格数据能够为市场提供更多的信息。因此,我们决定将其包含在我们的数据集中,以提高模型的性能。

RSI、MACD 和 BRAR 技术指标是根据我们从 kaggle 收到的原始数据,使用 technical-indicators-lib 生成的

结果

我们对模型的超参数进行了微调。使用了 ReLU 和 Adam。用 Btcusdt 历史数据 (2017 - 2018,2019 - 2020,2020-2021) 来训练模型,2022.01 至 2022.04 来测试预训练的模型。我们将初始收益定为 100 万美元。

这种策略每天交易约 18 次。曲线如下图所示:最大 draw down 为 8%,年收益率为 30%,sharpe ratio 为 2.6。上图显示了利润与交易时间的关系

# 流处理操作流程

一旦我们导出了经过训练的强化学习模型,我们就可以通过一系列技术工具构建应用程序来获得实时预测结果。 如下图所示

"总体技术栈流程图"

整个项目流程图可以简单地分为几个部分,包括

  • 数据采集与流数据转换
  • 流数据处理
  • 流数据预测
  • 数据可视化

# 环境配置准备

建议使用 Anaconda 创建虚拟环境
conda create -n your_env_name python=x.x

在虚拟环境中安装相应依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
keras==2.3.1
tensorflow==1.14.0
pandas==0.24.2
numpy==1.16.1
tqdm==4.24.0
docopt==0.6.2
coloredlogs==10.0
jupyterlab==1.0.1
altair==4.0.0
seaborn==0.9.0

cryptocompare
matplotlib

pip install -r requirement.txt

# 流处理软件初始化

这个项目是在个人电脑上运行的 demo,Windows 11 系统。实际运行可能在云平台或虚拟机 Linux 上效果更加,配置操作相应变化。

Kafka 依赖 ZooKeeper 作为分布式系统提供协调服务的工具,在启动 Kafka 服务前需要先启动 ZooKeeper 服务。
新版本 Kafka 内置了 ZooKeeper 服务(Kafka 2.8 之后),所以我们可以有两种 ZooKeeper 的启动方式
一种是单独下载 ZooKeeper,然后配置环境变量、启动服务
第二种是启动 Kafka 内置的 ZooKeeper 服务

# 单独搭建 ZooKeeper 环境并启动服务

  1. 官网下载:https://zookeeper.apache.org/
  2. 选择自己需要的版本,下载压缩包(本项目是 3.6.3 版本)
  3. 将压缩包存放在磁盘某位置,路径无中文
  4. 在 ZooKeeper 的解压缩目录中,找到 conf 文件夹,该文件夹包含了 ZooKeeper 的配置文件。复制 “zoo_sample.cfg"这个文件,重命名为"zoo.cfg”,修改相应配置项,如
    • dataDir:指定 ZooKeeper 存储数据的目录,默认为 /tmp/zookeeper
    • dataLogDir:指定 ZooKeeper 存储日志的目录
    • clientPort:指定客户端连接到 ZooKeeper 服务器的端口号,默认为 2181
    • tickTime:指定 ZooKeeper 服务器之间的心跳间隔时间(以毫秒为单位),默认为 2000
  5. 创建需要的 data 和 logs 文件夹在相应目录,也可以在 cmd 命令行里全局指定地址,如
mark:1
1
set ZOOKEEPER_LOGS_PATH=C:\tmp\zookeeper
  1. 编辑 path 系统变量,添加路径 %ZOOKEEPER_HOME%\bin ,ZOOKEEPER_HOME 为文件夹路径
  2. 打开 cmd 命令窗口,输入 zkServer,启动 ZooKeeper 服务。关闭窗口则结束服务

# 下载 Kafka

  1. 官网下载:http://kafka.apache.org/downloads
  2. 选择自己需要的版本,下载压缩包(本项目是 2.12-3.1.0 版本)

# 启动 Kafka 内置的 ZooKeeper 服务

  1. 将压缩包存放在磁盘某位置,路径无中文,新建 data 文件夹
  2. 进入 "config" 目录,用编辑软件打开 "zookeeper.properties" 文件,修改 "dataDir"
  3. 打开 cmd 命令窗口,进入 Kafka 解压包根目录,输入命令行启动 Kafka 内置的 ZooKeeper 服务。关闭 cmd 窗口即关闭服务
mark:1
1
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

Start and initialize Zookeeper

# 启动 Kafka

  1. 在 Kafka 解压包根目录下新建 "kafka-logs" 文件夹
  2. 进入 "config" 目录,用编辑软件打开 "server.properties" 文件,找到 "log.dirs" 配置,将其路径设置为我们新建的 "kafka-logs" 目录的路径。如果有修改 ZooKeeper 服务的默认端口或 ip, 可以同时编辑 “zookeeper.connect”,更改 ZooKeeper 服务的 ip 和端口号配置
  3. 打开 cmd 命令窗口,进入 Kafka 解压包根目录下,输入命令行,启动 Kafka。关闭 cmd 窗口即关闭服务
mark:1
1
.\bin\windows\kafka-server-start.bat .\config\server.properties

Start and initialize Kafka

可能存在的报错
  • 端口占用
    可以用 netstat -ano | findstr 2181 找到占用端口的 pid,然后用 taskkill /f /pid pid号 关闭即可

  • Kafka 执行完就 shutting down
    可能是 Kafka 集群 id 跟元数据 "meta.properties" 中存储的不一致,导致启动失败。只需要将 Kafka 配置文件 "server.properties" 中 "log.dirs" 配置的 "kafka-logs" 目录下的文件全部删除,重新执行 Kafka 的启动命令即可。ZooKeeper 同理

  • timeindex.cleaned 另一个程序正在使用此文件,进程无法访问
    可能由于非正常关闭 Kafka, 导致其无法完成对日志文件完成解锁。将 "kafka-logs" 路径下的文件全部删除重启即可

  • 目录层级太深或者是目录名字太长可能导致启动失败

Kafka 和 Spark Dstream 之间的版本问题:

  • Spark 2.4 之后,没有针对 Spark Streaming 的更新(已弃用),这意味着两者之间没有 API。
    from pyspark.streaming.kafka import KafkaUtils
  • Structured Streaming 成为主流流处理数据结构
  • 需要一个配置文件用于连接 Kafka 和 Spark Streaming,他的版本也关系到能否起到兼容效果。
  • 此外还有需要注意的有
    • Py4J 的兼容性问题
    • Java JDK 的兼容性问题
    • 高版本 Python 的兼容性问题
mark:1-3
1
2
3
java8_location = '%JAVA_HOME%\Java\jdk1.8.0_331' 
os.environ['JAVA_HOME'] = java8_location
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars spark-streaming-kafka-0-8-assembly_2.11-2.1.1.jar pyspark-shell'

使用 Cryptocompare API 来获取指定加密货币的实时价格以及相应的其他附加信息。原始数据以 {name, currency, amount, time} (加密货币名,对标货币名,货币量,时间)的字典形式打包

我们定义并初始化多个 Kafka 生产者,不同的价格信息将按其不同主题(其加密货币名称)分隔并存储在 Broker 上(这个 Broker 依赖 ZooKeeper 的分布式存储)。并发送日志记录当前实时数据情况

生产者
1
2
3
4
5
6
7
8
9
10
11
12
def initProducer():
# init an instance of KafkaProducer
print('Initializing Kafka producer at {}'.format(time.time()))
producer = KafkaProducer(bootstrap_servers=config['kafka_broker'], api_version=(0,11,5))
print('Initialized Kafka producer at {}'.format(time.time()))
return producer

def produceRecord(data, producer, topic, partition=0):
# act as a producer sending records on kafka
producer.send(topic=topic, partition=partition, value=bytes(str(data), 'utf-8'))
# debug \ message in prompt
print('Produce record to topic \'{0}\' at time {1}'.format(topic, time.time()))

Broker 充当生产者和消费者之间信息的渠道。 在这个过程中,所有这些组件都需要向 Zookeeper 注册。 Kafka 集群使用 Zookeeper 来管理 Kafka 配置并选举领导者。 所有主题和代理之间的映射由 Zookeeper 维护。 而当 Consumer Group 发生变化时,就会进行重新平衡。 由于我们仅在一台 PC 上运行演示,因此未应用分区配置。

消费者
1
2
3
4
5
6
7
8
9
10
11
def initConsumer(topic, timeout=1000):
# init an instance of KafkaConsumer
consumer = KafkaConsumer(topic, api_version=(0,11,5), bootstrap_servers=config['kafka_broker'], group_id=None,
auto_offset_reset='earliest', enable_auto_commit=False, consumer_timeout_ms=timeout)
return consumer

def produceRecord(data, producer, topic, partition=0):
# act as a producer sending records on kafka
producer.send(topic=topic, partition=partition, value=bytes(str(data), 'utf-8'))
# debug \ message in prompt
print('Produce record to topic \'{0}\' at time {1}'.format(topic, time.time()))

# Spark Dstream 流处理

Spark 作为 Consumer 获取 Broker 中的数据并转化为 Dstream。

Spark初始化
1
2
3
4
5
sc = SparkContext() 
# sc.setLogLevel('INFO')
ssc = StreamingContext(sc, 2)
# Instantiate a kafka stream for processing.
directKafkaStream = KafkaUtils.createDirectStream(ssc, [config['topic_1']], {'metadata.broker.list':config['kafka_broker']})

假设有几个用时间序列号标记的 Dstream。 为了消除由于我们获取高频数据而产生的异常干扰,我们设置了一个过滤操作来选择合格的候选者。原始 Dstream 被复制,以便一旦有新数据进来,就会与前一个数据进行比较。 如果它们之间的值差小于 3%,则可以允许进行进一步的操作,否则将被弃用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
data = []
time_get = []

print('Starting Apache Kafka consumers and producer')
consumer_1 = initConsumer(config['topic_1'])

while(len(data)<15):
records_1 = consumeRecord(consumer_1)
for record in records_1:
strings = time.strftime("%Y,%m,%d,%H,%M,%S", time.localtime(record["time"]))
t = strings.split(',')
numbers = [t[4], t[5]]
time_get.append(str(numbers[0]) + '-' + str(numbers[1]))
# time.sleep(1)
data.append(record["amount"])

initial_offset = data[1] - data[0]

经过初步选择后,对 Dstream 进行窗口操作。 窗口长度设置为 15 ,步幅 1。 一旦有新数据进来,旧记录就会消失。 一定程度上解决了数据短缺的问题。处理后的数据将被推送回 Broker,成为一个新的主题。

随后,该模型将从 Broker 获取数据作为新的 Consumer。 由于模型相对复杂,我们在利用模型进行预测时并没有完全采用流式处理。 我们没有使用 Spark MLlib 中的操作。

相反,从 Broker 中提取的数据将更改为 DataFrame 并由模型以正常方式读取。 最后,结果会形成折线图显示在前端页面。

此外,还初始化了另一组 Consumer 来代表不同加密货币的信息,以便在网站上显示价值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if request.is_websocket():
print('websocket connection....')
msg = request.websocket.wait() # 接收前端发来的消息
print(msg, type(msg), json.loads(msg)) # b'["1","2","3"]' <class 'bytes'> ['1', '2', '3']
consumer_1 = initConsumer(config['topic_1'])
consumer_2 = initConsumer(config['topic_2'])
consumer_3 = initConsumer(config['topic_3'])
while 1:
if msg:
records = {}
records[config['topic_1']] = consumeRecord(consumer_1)[0]
records[config['topic_2']] = consumeRecord(consumer_2)[0]
records[config['topic_3']] = consumeRecord(consumer_3)[0]
request.websocket.send(str(records).encode()) # 向客户端发送数据
time.sleep(0.5) # 每0.5秒发一次

重新整合字典的键后,将通过 WebSocket 传输到前端 HTML。 最后解析为 Json 格式进行可视化。
from dwebsocket.decorators import accept_websocket,require_websocket

# 命令整合

由于该项目涉及到的软件多,需要同时 / 相继运行多个命令,故写了 bat 批处理文件统筹运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
echo Preparing system
echo off
rmdir /q /s %ZOOKEEPER_LOGS_PATH%
rmdir /q /s %KAFKA_LOGS_PATH%
timeout 1
echo Starting Zookeeper
start cmd /k %KAFKA_PATH%\bin\windows\zookeeper-server-start.bat %KAFKA_PATH%\config\zookeeper.properties
timeout 5
echo Starting Kafka Server
start cmd /k %KAFKA_PATH%\bin\windows\kafka-server-start.bat %KAFKA_PATH%\config\server.properties
timeout 5
echo Starting Kafka Producer
start cmd /k %ANACONDA_PATH%/python.exe %THIS_PATH%/kafka/realTimeDataCollector.py /k
timeout 1
echo Starting Spark as Consumer
start cmd /k %ANACONDA_PATH%/python.exe %THIS_PATH%/trading-bot-master/eval.py /trading-bot-master/data/train.csv --model-name model_double-dqn_GOOG_50 --debug

echo Starting web server
start cmd /k %ANACONDA_PATH%/python.exe %THIS_PATH%/server/manage.py runserver

为了促进一点点代码健壮性和可复用性,乐。也写了配置文件统一修改参数

1
2
3
4
5
6
7
8
9
10
11
params = {
# crypto setup
'currency_1': 'BTC', # bitcoin
'currency_2': 'ETH', # ethereum
'currency_3': 'LINK', # chainlink
...
}
config = {
'kafka_broker': 'localhost:9092',
...
}

# 结果可视化

为了展示数据,我们设计了一个网站来展示一些图表和信息。

  1. 设置了几个旋转卡片来显示不同加密货币的信息。卡片的正面显示了加密货币的名称、实时价格和相应的时间点。背面是相应加密货币的介绍和网址链接。信息以秒为单位进行更新。更新速度和加密货币的数量可以调整。

cards

  1. 使用折线图显示强化学习模型的预测结果。x 轴表示对应的时间 (分 - 秒)。y 轴表示某种加密货币的实时值。由于我们输入的是窗口实时价格,它会根据概率计算输出下一个时间间隔的最佳动作,并给用户一些建议。一旦有下降的趋势,将会出现红色标志表示警告,提示卖出,而绿色标志这表示价格良好,可以买入。图像也以秒为单位进行更新。

graph

  1. 使用 Tradingview API 在页面中嵌入了一个 JavaScript 应用程序,以显示有关许多加密货币的更多信息。实现更多的功能。

tradingview

以下是 demo 视频展示

由于要快速传输实时数据,因此前端和后端之间的响应交互必须及时。因此,WebSocket 被用来解决这个问题。

websocket

中间件websocket
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 判断浏览器是否支持WebSocket,目前应该所有的浏览器都支持了.....
if ('WebSocket' in window) {
console.log('你的浏览器支持 WebSocket')
}
// 创建一个WebSocket对象:sk,并且建立与服务端的连接(服务端程序要跑着哦)
var sk = new WebSocket('ws://' + window.location.host + '/test/');
// 向服务端发送消息
sk.onopen = function () {
console.log('websocket connection successful...');
var l = ['1', '2', '3'];
sk.send(JSON.stringify(l));
};
// 接收服务端的消息,主要的业务逻辑也在这里完成
sk.onmessage = function (msg) {
// 业务逻辑
// console.log('from service message: ', msg.data);
data = JSON.parse(msg.data.replace(/\'/g, '"'));
$("#BTC_currency").html(data["topic_BTC"]["currency"])
$("#BTC_amount").html(data["topic_BTC"]["amount"])
$("#BTC_time").html(timestampToTime(data["topic_BTC"]["time"]))

console.log('from service message: ', msg.data);
// 由于服务端主动断开连接,这里也断开WebSocket连接
if (sk.readyState == WebSocket.CLOSED) sk.close();
};
// 完事就关闭WebSocket连接
sk.onclose = function (msg) {
console.log('websocket connection close...');
sk.close()
};
// 当WebSocket连接创建成功后,我们就可以向服务端发送数据了
if (sk.readyState == WebSocket.OPEN) sk.onopen();

# 还能做的一些事情

  1. 可以将所有的强化学习操作将被移植到 Spark Streaming。利用 Keras API 和 Spark MLlib,我们可以更好的去实现 ransform, pipelines and featurization,完全以 Streaming 的形式实现预测功能。

  2. 可以使用其他工具来更好地在 Spark 中呈现模型。例如,Analytics Zoo 是一个在 Spark 上运行 TensorFlow 的平台。与基础设施、Spark 数据结构和管道集成。BigDL 是一个开源的分布式深度学习框架。从而实现分布式模型训练和评估。

  3. 虽然我们尝试用 Kafka 和 Spark 来重现整个管道过程,但是分布式属性,这个最大的优势和特点,并没有表现出来。原因主要是我们只在一台计算机上部署应用程序。

    为了实现分布式的想法,在云平台托管应用程序将是一个不错的选择。在此之前,Docker 可以设置一个容器来打包应用程序。Kubernetes 更好。它可以管理一堆主机和节点。在每个节点中,它运行许多独立的 pod (容器集),这些 pod 可以实现我们应用程序的组件,如数据库,网站服务器。

    一旦我们将 Kubernetes 部署在某个平台上,比如 Amazon EKS 或 Google GKE,它就可以创建一个集群来组织我们设置的所有节点,并构建一个控制面板来管理它们。在一定程度上实现了分布式特性。

    节点可以执行不同的指令。例如,不同节点可以异步处理不同类型的加密货币,从而减少数据更新的延迟。

    另一方面,多个节点可以继续运行训练过程,并通过实时性持续优化模型的性能。在不影响前端网页正常使用的情况下,逐步更换旧模型。

    当任何其他用户想要查看特定加密货币的预测结果时,特定的主机会提供相应的结果。Kafka 的分区配置也可以在多个节点中使用。

更新于 阅读次数

请我喝[茶]~( ̄▽ ̄)~*

Miralce 微信支付

微信支付

Miralce 支付宝

支付宝