Python 实现 DMI 指标计算:股票技术分析的利器系列(5)
- 介绍
- 算法解释
- 代码
- rolling函数介绍
- 计算 MTR
- max 函数
- sum 函数
- 计算 HD
- 计算 LD
- 计算 DMP
- 计算 DMM
- 计算 PDI、MDI 和 ADX
- 完整代码
介绍
先看看官方介绍:
DMI (趋向指标)
用法
1.PDI线从下向上突破MDI线,显示有新多头进场,为买进信号;
2.PDI线从上向下跌破MDI线,显示有新空头进场,为卖出信号;
3.ADX值持续高于前一日时,市场行情将维持原趋势;
4.ADX值递减,降到20以下,且横向行进时,市场气氛为盘整;
5.ADX值从上升倾向转为下降时,表明行情即将反转。
算法解释
MTR:= SUM(MAX(MAX(HIGH-LOW,ABS(HIGH-REF(CLOSE,1))),ABS(LOW-REF(CLOSE,1))),N);
HD := HIGH-REF(HIGH,1);
LD := REF(LOW,1)-LOW;
DMP:= SUM(IF(HD>0 AND HD>LD,HD,0),N);
DMM:= SUM(IF(LD>0 AND LD>HD,LD,0),N);
PDI: DMP*100/MTR;
MDI: DMM*100/MTR;
ADX: MA(ABS(MDI-PDI)/(MDI+PDI)*100,M);
ADXR:(ADX+REF(ADX,M))/2;
代码
rolling函数介绍
rolling
函数通常与其他函数(如 mean
、sum
、std
等)一起使用,以计算滚动统计量,例如滚动均值、滚动总和等。
以下是 rolling
函数的基本语法:
DataFrame.rolling(window, min_periods=None, center=False, win_type=None, on=None, axis=0, closed=None)
window
: 用于计算统计量的窗口大小。min_periods
: 每个窗口最少需要的非空观测值数量。center
: 确定窗口是否居中,默认为False
。win_type
: 窗口类型,例如None
、boxcar
、triang
等,默认为None
。on
: 在数据帧中执行滚动操作的列,默认为None
,表示对整个数据帧执行操作。axis
: 执行滚动操作的轴,默认为0
,表示按列执行操作。closed
: 确定窗口的哪一端是闭合的,默认为None
。
计算 MTR
MTR
是真实范围指标(True Range),它是用来衡量股票或其他金融资产价格波动性的指标。
# 计算 MTR
df['HL'] = df['HIGH'] - df['LOW']
df['HC'] = abs(df['HIGH'] - df['CLOSE'].shift(1))
df['LC'] = abs(df['LOW'] - df['CLOSE'].shift(1))
df['MAX1'] = df[['HL', 'HC']].max(axis=1)
df['MAX2'] = df[['MAX1', 'LC']].max(axis=1)
df['MTR'] = df['MAX2'].rolling(window=N).sum()
df['HL'] = df['HIGH'] - df['LOW']
:计算每日的价格范围(High - Low)。df['HC'] = abs(df['HIGH'] - df['CLOSE'].shift(1))
:计算当日的最高价与前一日收盘价的差的绝对值。df['LC'] = abs(df['LOW'] - df['CLOSE'].shift(1))
:计算当日的最低价与前一日收盘价的差的绝对值。df['MAX1'] = df[['HL', 'HC']].max(axis=1)
:取每日价格范围与 HC 列的最大值。df['MAX2'] = df[['MAX1', 'LC']].max(axis=1)
:取 MAX1 列与 LC 列的最大值。df['MTR'] = df['MAX2'].rolling(window=N).sum()
:对 MTR 列进行滚动窗口计算,窗口大小为 N,每日的 MTR 是该日及前 N-1 日的 MAX2 列的总和。
max 函数
.max(axis=1)
函数是 Pandas DataFrame 中的函数,用于沿着指定的轴(这里是轴1,即列)计算最大值。例如,df[['HL', 'HC']].max(axis=1)
将计算 DataFrame 中 ‘HL’ 和 ‘HC’ 列对应行的最大值。
sum 函数
.sum()
函数是用于对指定轴(默认是轴0,即行)上的元素求和。在这里,.rolling(window=N).sum()
表示对滚动窗口中的元素进行求和,窗口大小为 N。
计算 HD
HD
是真实高点(True High),它表示当日的最高价与前一日的最高价之差。
# 计算 HD
df['HD'] = df['HIGH'] - df['HIGH'].shift(1)
df['HD'] = df['HIGH'] - df['HIGH'].shift(1)
:这行代码计算了每日最高价与前一日最高价的差,得到了真实高点。使用 .shift(1)
函数是为了获取前一日的数据,从而与当日的最高价相减得到差值。
计算 LD
LD
是真实低点(True Low),它表示当日的最低价与前一日的最低价之差。
# 计算 LD
df['LD'] = df['LOW'].shift(1) - df['LOW']
df['LD'] = df['LOW'].shift(1) - df['LOW']
:这行代码计算了每日最低价与前一日最低价的差,得到了真实低点。使用 .shift(1)
函数是为了获取前一日的数据,从而与当日的最低价相减得到差值。
计算 DMP
DMP
(上升动向值)是用来衡量股票或其他金融资产价格上涨趋势的指标。具体来说,DMP
是在一定时间内(这里使用滚动窗口大小为 N
)价格上涨幅度的累积。一般情况下,它是通过比较当日的真实高点(HD
)与真实低点(LD
)来确定是否存在上升趋势。
# 计算 DMP
df['DMP'] = df.apply(lambda x: x['HD'] if (x['HD'] > 0 and x['HD'] > x['LD']) else 0, axis=1).rolling(window=N).sum()
- 对于每一行数据,使用
apply()
函数应用一个匿名函数(lambda x
)。 - 在匿名函数中,根据条件
x['HD'] > 0 and x['HD'] > x['LD']
来判断是否满足上升动向的条件。如果当日的真实高点(HD)大于 0 且大于真实低点(LD),则将该行的 HD 作为 DMP 的值,否则设为 0。 - 最后,使用
.rolling(window=N).sum()
对 DMP 列进行滚动窗口求和,窗口大小为 N,得到了每日的 DMP 值。
计算 DMM
DMM
(下降动向值)是用来衡量股票或其他金融资产价格下跌趋势的指标。与DMP
相似,DMM
是在一定时间内(这里使用滚动窗口大小为N
)价格下跌幅度的累积。它通常通过比较当日的真实低点(LD
)与真实高点(HD
)来确定是否存在下降趋势。
# 计算 DMM
df['DMM'] = df.apply(lambda x: x['LD'] if (x['LD'] > 0 and x['LD'] > x['HD']) else 0, axis=1).rolling(window=N).sum()
- 对于每一行数据,使用
apply()
函数应用一个匿名函数(lambda x
)。 - 在匿名函数中,根据条件
x['LD'] > 0 and x['LD'] > x['HD']
来判断是否满足下降动向的条件。如果当日的真实低点(LD)大于0且大于真实高点(HD),则将该行的LD作为DMM的值,否则设为0。 - 最后,使用
.rolling(window=N).sum()
对DMM列进行滚动窗口求和,窗口大小为N,得到了每日的DMM值。
计算 PDI、MDI 和 ADX
指标名称 | 描述 |
---|---|
PDI(Positive Directional Indicator) | 正向指标,用于衡量上升趋势的强度。它是上升动向值(DMP)与真实范围(MTR)的比值,通常用百分比表示。 |
MDI(Negative Directional Indicator) | 负向指标,用于衡量下降趋势的强度。它是下降动向值(DMM)与真实范围(MTR)的比值,通常用百分比表示。 |
ADX(Average Directional Index) | 平均动向指数,用于衡量趋势的强度。它是 PDI 和 MDI 之间的差的绝对值与它们之和的比值,然后再乘以 100,得到的结果就是 DX(方向指标)值。最后,使用滚动窗口计算 DX 值的平均值,得到了 ADX。 |
# 计算 PDI、MDI 和 ADX
df['PDI'] = df['DMP'] * 100 / df['MTR']
df['MDI'] = df['DMM'] * 100 / df['MTR']
df['DX'] = (df['PDI'] - df['MDI']).abs() / (df['PDI'] + df['MDI']) * 100
df['ADX'] = df['DX'].rolling(window=M).mean()
df['PDI'] = df['DMP'] * 100 / df['MTR']
:计算了正向指标(PDI),即上升动向值与真实范围的比值,再乘以100得到百分比。
df['MDI'] = df['DMM'] * 100 / df['MTR']
:计算了负向指标(MDI),即下降动向值与真实范围的比值,再乘以100得到百分比。
df['DX'] = (df['PDI'] - df['MDI']).abs() / (df['PDI'] + df['MDI']) * 100
:计算了方向指标(DX),即 PDI 和 MDI 之间的差的绝对值与它们之和的比值,再乘以 100。
df['ADX'] = df['DX'].rolling(window=M).mean()
:最后,计算了平均动向指数(ADX),使用滚动窗口计算 DX 值的平均值,窗口大小为 M。
完整代码
import pandas as pd
data = {
'CLOSE': 填每日收盘的数据,
'HIGH': 填每日最高的数据,
'LOW': 填每日最低的数据
}
df = pd.DataFrame(data)
N = 14
M = 6
# 计算 MTR
df['HL'] = df['HIGH'] - df['LOW']
df['HC'] = abs(df['HIGH'] - df['CLOSE'].shift(1))
df['LC'] = abs(df['LOW'] - df['CLOSE'].shift(1))
df['MAX1'] = df[['HL', 'HC']].max(axis=1)
df['MAX2'] = df[['MAX1', 'LC']].max(axis=1)
df['MTR'] = df['MAX2'].rolling(window=N).sum()
# 计算 HD
df['HD'] = df['HIGH'] - df['HIGH'].shift(1)
# 计算 LD
df['LD'] = df['LOW'].shift(1) - df['LOW']
# 计算 DMP
df['DMP'] = df.apply(lambda x: x['HD'] if (x['HD'] > 0 and x['HD'] > x['LD']) else 0, axis=1).rolling(window=N).sum()
# 计算 DMM
df['DMM'] = df.apply(lambda x: x['LD'] if (x['LD'] > 0 and x['LD'] > x['HD']) else 0, axis=1).rolling(window=N).sum()
# 计算 PDI、MDI 和 ADX
df['PDI'] = df['DMP'] * 100 / df['MTR']
df['MDI'] = df['DMM'] * 100 / df['MTR']
df['DX'] = (df['PDI'] - df['MDI']).abs() / (df['PDI'] + df['MDI']) * 100
df['ADX'] = df['DX'].rolling(window=M).mean()
# 计算 ADXR
df['ADXR'] = (df['ADX'] + df['ADX'].shift(M)) / 2
# 删除中间计算用的列
df.drop(['LOW', 'HL', 'HC', 'LC', 'MAX1', 'MAX2', 'HD', 'LD', 'DX'], axis=1, inplace=True)
print(df)