给通用策略添加黑名单个股池,永久剔除ST,退市风险警示股票。
# 通用策略黑名单模块:永久剔除 ST 及退市风险警示股票
## 一、实际应用场景描述
在量化投资策略的实际运行中,股票池的**纯净度**直接决定了策略的存活率。一个"通用策略"往往需要具备泛化能力,但在 A 股市场,有一类股票是无论模型预测收益多高,都不应触碰的——**ST(特别处理)和 *ST(退市风险警示)股票**。
### 典型场景
- **场景一:踩雷** 某量化策略模型基于动量因子选中了一只"妖股",入场后才发现该股因财务造假被标记为 *ST,随后股价从 5 元一路跌至 0.5 元退市,策略直接归零。
- **场景二:流动性陷阱** ST 股票涨跌幅限制仅为 5%(正常股票为 10% 或 20%),且流动性极差。策略发出的买入信号可能导致挂单数日无法成交,或卖出时无人接盘,造成巨大的隐性滑点成本。
- **场景三:回测过拟合** 如果在历史回测中未剔除 ST 股票,某些策略可能会因为 ST 股在摘帽前后的异常波动而产生虚假的高收益,导致实盘严重亏损。
---
## 二、引入痛点
| 痛点 | 具体表现 |
|------|----------|
| 🔴 **本金毁灭性风险** | ST 股票面临退市风险,一旦退市,投资者可能血本无归,这是任何收益都无法弥补的风险。 |
| 🔴 **流动性枯竭** | ST 板块交易清淡,策略的买卖单极易造成价格冲击,实际成交价与回测假设相去甚远。 |
| 🔴 **涨跌幅限制差异** | ST 股票每日涨跌幅限制为 5%,直接套用普通股票的涨停板策略或波动率模型会产生巨大偏差。 |
| 🔴 **数据污染** | 含 ST 股票的历史数据会扭曲因子分析(如市盈率、市净率异常),导致 AI 模型学错规律。 |
| 🟡 **临时停牌与戴帽** | 股票可能在策略运行期间突然被 ST,如果未能及时识别并剔除,策略将暴露在风险中。 |
---
## 三、核心逻辑讲解
### 3.1 什么是 ST 与退市风险警示?
- **ST (Special Treatment)**:公司连续两年亏损,或净资产低于注册资本,交易所对其股票进行特别处理。日涨跌幅限制为 5%。
- ***ST**:公司存在退市风险(如连续三年亏损、重大违法等)。若在规定期限内未改善,将被强制退市。
### 3.2 剔除逻辑设计
我们不仅要在选股时剔除当前是 ST 的股票,还要维护一个**黑名单池(Blacklist Pool)**,永久记录曾经或正在被 ST 的股票,防止其在摘帽后短期内再次被模型选中(因为摘帽初期波动剧烈且基本面存疑)。
**核心流程:**
1. **每日扫描**:获取全市场股票列表,识别出当前被标记 ST 或 *ST 的股票代码。
2. **黑名单更新**:将识别出的代码加入黑名单 Set 中(Set 数据结构保证 O(1) 查询效率)。
3. **过滤拦截**:在策略发出买入信号后、实际执行交易前,检查标的代码是否在黑名单中。
4. **卖出清理**:如果已持有的股票被 ST,触发即时清仓信号(若无法卖出,则标记为风险持仓)。
### 3.3 黑名单机制设计
- **黑名单存储**:使用 Python 的 `set` 或 `dict` 存储,便于快速查找。
- **持久化**:策略重启时,黑名单应能从本地文件(如 `blacklist.json`)加载,实现"永久记忆"。
- **生命周期**:除非人工审核移除,否则黑名单股票永久有效。
---
## 四、项目结构
strategy_blacklist/
├── README.md
├── requirements.txt
├── config.yaml # 策略全局配置
├── data/
│ └── stock_universe.csv # 示例股票池(含ST标记)
├── src/
│ ├── data_loader.py # 数据加载与预处理
│ ├── blacklist_manager.py # ★ 黑名单核心管理模块
│ ├── stock_filter.py # 股票筛选器(含黑名单过滤)
│ ├── strategy_engine.py # 通用策略引擎(对接黑名单)
│ ├── backtester.py # 回测框架
│ └── visualizer.py # 可视化工具
├── main.py # 主入口
└── compare_with_blacklist.py # 有/无黑名单对比实验
---
## 五、完整代码(模块化 + 清晰注释)
### `requirements.txt`
txt
pandas>=1.5
numpy>=1.21
pyyaml>=6.0
matplotlib>=3.5
seaborn>=0.12
---
### `config.yaml`
yaml
策略全局配置
data:
stock_universe_file: "data/stock_universe.csv"
start_date: "2018-01-01"
end_date: "2024-12-31"
★ 黑名单模块配置
blacklist:
enabled: true
关键词规则:名称中含以下关键词的视为黑名单
st_keywords:
- "ST"
- "*ST"
- "退" # 退市板块标识(如 退市XX)
是否包含曾今被 ST 过但现在摘帽的股票(建议 true)
include_former_st: true
黑名单持久化文件路径
storage_file: "data/blacklist.json"
通用策略参数
strategy:
initial_capital: 1000000
commission_rate: 0.0003
stamp_tax_rate: 0.001
max_positions: 5
回测参数
backtest:
benchmark: "equal_weight" # 等权基准
---
### `src/data_loader.py`
python
"""
data_loader.py
数据加载模块:负责读取股票基础信息及日线行情
"""
import pandas as pd
import numpy as np
from pathlib import Path
from typing import Optional, Tuple, Set
def load_stock_universe(filepath: str) -> pd.DataFrame:
"""
加载股票池数据
预期 CSV 格式:
code,name,industry,list_date,delist_date,status
000001,平安银行,金融,1991-04-03,,正常
000002,万科A,房地产,1991-01-29,,正常
000003,PT金田A,房地产,1991-01-02,2002-06-14,退市
...
600555,*ST海创,旅游,1992-12-02,,*ST
参数:
filepath: CSV 文件路径
返回:
DataFrame: 股票基础信息表
"""
df = pd.read_csv(filepath, parse_dates=['list_date', 'delist_date'])
required_cols = ['code', 'name', 'status']
for col in required_cols:
if col not in df.columns:
raise ValueError(f"股票池文件缺少必要列: {col}")
# 标准化股票代码格式(确保6位字符串)
df['code'] = df['code'].astype(str).str.zfill(6)
return df
def load_daily_prices(
stock_universe: pd.DataFrame,
start: Optional[str] = None,
end: Optional[str] = None
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
加载或生成日频价格数据
返回:
Tuple[收盘价矩阵, 成交量矩阵]
均为 DataFrame: index=date, columns=code
"""
# 模拟生成价格数据(实际项目中替换为真实数据加载逻辑)
dates = pd.date_range(start or '2018-01-01', end or '2024-12-31', freq='B')
codes = stock_universe['code'].tolist()
n = len(dates)
np.random.seed(42)
close_prices = pd.DataFrame(
np.cumprod(1 + np.random.normal(0.0003, 0.015, (n, len(codes))), axis=0) * 10,
index=dates,
columns=codes
)
volumes = pd.DataFrame(
np.random.randint(100000, 10000000, (n, len(codes))),
index=dates,
columns=codes
)
return close_prices, volumes
def identify_st_stocks(stock_universe: pd.DataFrame) -> Set[str]:
"""
识别股票池中的 ST 和 *ST 股票
逻辑:
1. 检查 status 字段是否包含 'ST' 或 '*ST'
2. 检查 name 字段是否以 'ST' 或 '*ST' 开头
参数:
stock_universe: 股票基础信息表
返回:
Set[str]: ST 股票代码集合
"""
st_codes = set()
for _, row in stock_universe.iterrows():
code = str(row['code']).zfill(6)
name = str(row.get('name', ''))
status = str(row.get('status', ''))
# 通过 status 字段判断
if 'ST' in status.upper():
st_codes.add(code)
continue
# 通过名称前缀判断
if name.startswith('ST') or name.startswith('*ST'):
st_codes.add(code)
continue
# 通过退市标记判断
if '退' in name or '退市' in name:
st_codes.add(code)
return st_codes
def get_delisted_stocks(stock_universe: pd.DataFrame) -> Set[str]:
"""
获取已退市股票列表
返回:
Set[str]: 已退市股票代码集合
"""
delisted = set()
for _, row in stock_universe.iterrows():
if pd.notna(row.get('delist_date')):
delisted.add(str(row['code']).zfill(6))
return delisted
---
### `src/blacklist_manager.py`(★ 核心模块)
python
"""
blacklist_manager.py
★ 黑名单管理模块:永久记录并剔除 ST、*ST 及退市风险股票
核心功能:
1. 自动识别 ST / *ST / 退市 股票
2. 黑名单持久化(JSON 文件),策略重启后仍有效
3. O(1) 时间复杂度查询(基于 Python set)
4. 支持手动添加/移除(人工审核入口)"""import jsonimport loggingfrom pathlib import Pathfrom typing import Set, Optional, List, Dictimport pandas as pd
配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(name)
class BlacklistManager:
"""
黑名单管理器
黑名单是一个 Set,存储所有被禁止交易的股票代码。
持久化到 JSON 文件,实现"永久记忆"。
"""
def __init__(
self,
storage_file: str = "data/blacklist.json",
st_keywords: Optional[List[str]] = None
):
"""
参数:
storage_file: 黑名单持久化文件路径
st_keywords: ST 识别关键词列表,默认 ['ST', '*ST', '退']
"""
self.storage_file = Path(storage_file)
self.st_keywords = st_keywords or ['ST', '*ST', '退']
# ★ 核心数据结构:黑名单代码集合
self._blacklist: Set[str] = set()
# 加载已保存的黑名单
self._load()
logger.info(f"黑名单管理器初始化完成,当前 {len(self._blacklist)} 只股票在黑名单中")
def _load(self):
"""从 JSON 文件加载黑名单"""
if self.storage_file.exists():
try:
with open(self.storage_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# 兼容旧格式:支持 list 或 dict 两种存储格式
if isinstance(data, list):
self._blacklist = set(data)
elif isinstance(data, dict):
self._blacklist = set(data.get('codes', []))
else:
self._blacklist = set()
logger.info(f"从 {self.storage_file} 加载了 {len(self._blacklist)} 只黑名单股票")
except (json.JSONDecodeError, Exception) as e:
logger.warning(f"加载黑名单失败: {e},使用空黑名单")
self._blacklist = set()
else:
self._blacklist = set()
def save(self):
"""将黑名单持久化到 JSON 文件"""
self.storage_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.storage_file, 'w', encoding='utf-8') as f:
json.dump({
'codes': sorted(list(self._blacklist)),
'count': len(self._blacklist),
'last_updated': pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')
}, f, ensure_ascii=False, indent=2)
logger.debug(f"黑名单已保存: {len(self._blacklist)} 只")
def scan_and_update(
self,
stock_universe: pd.DataFrame
) -> Dict[str, List[str]]:
"""
★ 核心方法:扫描股票池,识别新的 ST/退市股票并更新黑名单
参数:
stock_universe: 股票基础信息表
返回:
Dict: {
'newly_added': [新加入黑名单的股票代码列表],
'already_in_list': [已在黑名单中的股票代码列表]
}
"""
newly_added = []
already_in = []
for _, row in stock_universe.iterrows():
code = str(row['code']).zfill(6)
name = str(row.get('name', ''))
status = str(row.get('status', '')).upper()
# === 判断逻辑 ===
is_st = False
# 1. 通过 status 字段判断
if 'ST' in status:
is_st = True
# 2. 通过名称关键词判断
for keyword in self.st_keywords:
if keyword in name:
is_st = True
break
# 3. 已退市股票
if pd.notna(row.get('delist_date')):
is_st = True
# === 更新黑名单 ===
if is_st:
if code not in self._blacklist:
self._blacklist.add(code)
newly_added.append(code)
logger.warning(f"⚠️ 新增黑名单: {code} ({name}) - 状态: {status}")
else:
already_in.append(code)
# 持久化
if newly_added:
self.save()
logger.info(f"黑名单更新: 新增 {len(newly_added)} 只,当前共 {len(self._blacklist)} 只")
return {
'newly_added': newly_added,
'already_in_list': already_in,
'total_count': len(self._blacklist)
}
def is_blocked(self, code: str) -> bool:
"""
检查某只股票是否在黑名单中
参数:
code: 6 位股票代码
返回:
bool: True = 被禁止交易
"""
return str(code).zfill(6) in self._blacklist
def filter_blocked(
self,
candidates: pd.DataFrame
) -> pd.DataFrame:
"""
从候选股票池中过滤掉黑名单股票
参数:
candidates: 候选股票池(需包含 'code' 列)
返回:
DataFrame: 过滤后的股票池
"""
before = len(candidates)
mask = candidates['code'].apply(lambda c: not self.is_blocked(str(c).zfill(6)))
filtered = candidates[mask].copy()
removed = before - len(filtered)
if removed > 0:
blocked_codes = candidates[~mask]['code'].tolist()
logger.info(f"黑名单过滤: 移除了 {removed} 只股票 {blocked_codes}")
return filtered
def add(self, code: str, reason: str = "manual"):
"""手动添加股票到黑名单"""
code = str(code).zfill(6)
if code not in self._blacklist:
self._blacklist.add(code)
self.save()
logger.info(f"手动添加黑名单: {code} (原因: {reason})")
else:
logger.debug(f"股票 {code} 已在黑名单中")
def remove(self, code: str, reason: str = "manual_review"):
"""手动从黑名单移除(需人工审核)"""
code = str(code).zfill(6)
if code in self._blacklist:
self._blacklist.remove(code)
self.save()
logger.info(f"手动移除黑名单: {code} (原因: {reason})")
else:
logger.debug(f"股票 {code} 不在黑名单中")
def get_all(self) -> List[str]:
"""获取完整黑名单列表"""
return sorted(list(self._blacklist))
def clear(self):
"""清空黑名单(慎用)"""
count = len(self._blacklist)
self._blacklist.clear()
self.save()
logger.warning(f"黑名单已清空 ({count} 只被移除)")
def print_summary(self):
"""打印黑名单摘要"""
codes = self.get_all()
print(f"\n{'='*60}")
print(f" 黑名单摘要")
print(f"{'='*60}")
print(f" 黑名单股票总数: {len(codes)}")
if codes:
print(f" 前 20 只:")
for c in codes[:20]:
print(f" {c}")
if len(codes) > 20:
print(f" ... 共 {len(codes)} 只")
print(f"{'='*60}\n")
---
### `src/stock_filter.py`
python
"""
stock_filter.py
股票筛选器:在 AI 模型打分前,先过滤掉不合格标的
"""
import pandas as pd
from typing import Optional
from src.blacklist_manager import BlacklistManager
class StockFilter:
"""
通用股票预筛选器
在 AI 模型介入之前,先用规则过滤掉:
1. ★ 黑名单股票(ST、*ST、退市)
2. 停牌股票
3. 上市不满 N 天的新股
4. 流动性不足的股票
"""
def __init__(
self,
blacklist_mgr: BlacklistManager,
min_listing_days: int = 60, # 上市不满 60 天的新股剔除
min_daily_volume: float = 500000, # 日均成交量下限
exclude_suspended: bool = True # 是否剔除停牌股
):
self.blacklist_mgr = blacklist_mgr
self.min_listing_days = min_listing_days
self.min_daily_volume = min_daily_volume
self.exclude_suspended = exclude_suspended
def filter(
self,
stock_pool: pd.DataFrame,
current_date: pd.Timestamp,
price_data: Optional[pd.DataFrame] = None,
volume_data: Optional[pd.DataFrame] = None
) -> pd.DataFrame:
"""
执行完整筛选流程
参数:
stock_pool: 候选股票池
current_date: 当前日期
price_data: 价格数据(用于判断停牌)
volume_data: 成交量数据(用于判断流动性)
返回:
DataFrame: 筛选后的股票池
"""
filtered = stock_pool.copy()
original_count = len(filtered)
# ===== 筛选 1: ★ 黑名单过滤(最核心) =====
filtered = self.blacklist_mgr.filter_blocked(filtered)
# ===== 筛选 2: 新股过滤 =====
if 'list_date' in filtered.columns:
listing_days = (current_date - pd.to_datetime(filtered['list_date'])).dt.days
filtered = filtered[listing_days >= self.min_listing_days]
# ===== 筛选 3: 停牌过滤 =====
if self.exclude_suspended and price_data is not None:
# 当日无价格数据 = 停牌
valid_codes = [c for c in filtered['code'] if c in price_data.columns]
filtered = filtered[filtered['code'].isin(valid_codes)]
# ===== 筛选 4: 流动性过滤 =====
if volume_data is not None and self.min_daily_volume > 0:
# 取最近 20 日均成交量
recent_vol = volume_data.loc[:current_date].tail(20)
avg_vol = recent_vol.mean()
liquid_codes = [c for c in filtered['code'] if c in avg_vol.index and avg_vol[c] >= self.min_daily_volume]
filtered = filtered[filtered['code'].isin(liquid_codes)]
removed_count = original_count - len(filtered)
if removed_count > 0:
print(f" [筛选器] 移除了 {removed_count} 只不合格股票,剩余 {len(filtered)} 只")
return filtered
---
### `src/strategy_engine.py`
python
"""
strategy_engine.py
通用策略引擎:对接黑名单模块,在交易执行前进行拦截
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from src.blacklist_manager import BlacklistManager
from src.stock_filter import StockFilter
@dataclass
class Position:
"""持仓记录"""
code: str
open_date: pd.Timestamp
open_price: float
quantity: int
status: str = "open"
close_date: Optional[pd.Timestamp] = None
close_price: Optional[float] = None
pnl: float = 0.0
class UniversalStrategy:
"""
通用策略引擎
核心特性:
★ 所有买入操作前必须经过黑名单检查
★ 持有股票被 ST 时触发强制平仓
"""
def __init__(
self,
blacklist_mgr: BlacklistManager,
stock_filter: StockFilter,
initial_capital: float = 1_000_000,
commission_rate: float = 0.0003,
stamp_tax_rate: float = 0.001,
max_positions: int = 5
):
self.blacklist_mgr = blacklist_mgr
self.stock_filter = stock_filter
self.initial_capital = initial_capital
self.capital = initial_capital
self.commission_rate = commission_rate
self.stamp_tax_rate = stamp_tax_rate
self.max_positions = max_positions
# 策略状态
self.positions: Dict[str, Position] = {}
self.closed_positions: List[Position] = []
self.daily_nav: Dict[pd.Timestamp, float] = {}
# 统计
self.blocked_buy_attempts = 0 # 被黑名单拦截的买入尝试
self.forced_sells_st = 0 # 因 ST 强制平仓的次数
print(f"\n{'='*60}")
print(f" 通用策略引擎初始化")
print(f" 黑名单股票数: {len(self.blacklist_mgr.get_all())}")
print(f" 最大持仓数: {max_positions}")
print(f"{'='*60}\n")
def _calculate_position_size(self, price: float) -> int:
"""计算买入数量(整百股)"""
max_cap = self.capital / self.max_positions
qty = int(max_cap / price / 100) * 100
return max(qty, 0)
def _execute_buy(self, code: str, date: pd.Timestamp, price: float) -> bool:
"""执行买入"""
qty = self._calculate_position_size(price)
if qty <= 0:
return False
cost = qty * price * (1 + self.commission_rate)
if cost > self.capital:
return False
self.capital -= cost
self.positions[code] = Position(
code=code, open_date=date, open_price=price, quantity=qty
)
return True
def _execute_sell(self, code: str, date: pd.Timestamp, price: float, reason: str = "normal"):
"""执行卖出"""
pos = self.positions.pop(code, None)
if pos is None:
return
revenue = pos.quantity * price * (1 - self.commission_rate - self.stamp_tax_rate)
self.capital += revenue
pos.close_date = date
pos.close_price = price
pos.status = reason
pos.pnl = revenue - pos.quantity * pos.open_price
self.closed_positions.append(pos)
if reason == "st_forced_sell":
self.forced_sells_st += 1
print(f" ⚠️ 强制平仓: {code} 被 ST/退市,于 {date.strftime('%Y-%m-%d')} 卖出")
def check_and_handle_st_positions(self, date: pd.Timestamp, prices: pd.Series):
"""
★ 核心方法:检查持仓中是否有股票被 ST,有则强制平仓
"""
codes_to_check = list(self.positions.keys())
for code in codes_to_check:
if self.blacklist_mgr.is_blocked(code):
# 该持仓股票已被加入黑名单(ST/退市)
price = prices.get(code, None)
if price is not None and price > 0:
self._execute_sell(code, date, price, reason="st_forced_sell")
else:
# 停牌无法卖出,标记风险
print(f" ⚠️ {code} 被 ST 且停牌,无法卖出!持仓风险暴露中!")
def generate_signals(
self,
filtered_pool: pd.DataFrame,
date: pd.Timestamp,
prices: pd.Series
) -> pd.Series:
"""
生成买入信号
简化逻辑(实际项目中替换为 AI 模型打分):
- 随机给每只股票打个分
- 分数 > 0.5 的视为买入候选
返回:
Series: {code: signal_score},0 分 = 不买入
"""
np.random.seed(int(date.strftime('%Y%m%d')))
scores = pd.Series(0.0, index=filtered_pool['code'].values)
for code in filtered_pool['code'].values:
if code in prices and pd.notna(prices[code]):
scores[code] = np.random.uniform(0, 1)
return scores
def run_daily(
self,
date: pd.Timestamp,
stock_pool: pd.DataFrame,
prices: pd.Series,
volumes: Optional[pd.Series] = None,
price_data: Optional[pd.DataFrame] = None,
volume_data: Optional[pd.DataFrame] = None
):
"""
★ 每日策略执行(含黑名单检查)
参数:
date: 当前交易日
stock_pool: 当日股票池
prices: {code: close_price}
volumes: {code: volume}(可选)
price_data: 完整价格矩阵(用于停牌判断)
volume_data: 完整成交量矩阵(用于流动性判断)
"""
# ===== Step 0: 检查现有持仓是否被 ST =====
self.check_and_handle_st_positions(date, prices)
# ===== Step 1: 股票预筛选(含黑名单过滤) =====
filtered_pool = self.stock_filter.filter(
stock_pool, date, price_data, volume_data
)
if len(filtered_pool) == 0:
self._record_daily_nav(date, prices)
return
# ===== Step 2: 生成买入信号 =====
signals = self.generate_signals(filtered_pool, date, prices)
# ===== Step 3: 执行买入(黑名单二次校验) =====
if len(self.positions) < self.max_positions:
buy_candidates = signals[signals > 0.5].sort_values(ascending=False)
for code, score in buy_candidates.items():
if len(self.positions) >= self.max_positions:
break
if code in self.positions:
continue
# ★★★ 黑名单最终拦截 ★★★
if self.blacklist_mgr.is_blocked(code):
self.blocked_buy_attempts += 1
print(f" 🚫 黑名单拦截: {code} 被禁止买入(ST/退市风险)")
continue
if code in prices and pd.notna(prices[code]) and prices[code] > 0:
self._execute_buy(code, date, prices[code])
# ===== Step 4: 记录当日净值 =====
self._record_daily_nav(date, prices)
def _record_daily_nav(self, date: pd.Timestamp, prices: pd.Series):
"""记录当日组合净值"""
nav = self.capital
for code, pos in self.positions.items():
if code in prices and pd.notna(prices[code]):
nav += pos.quantity * prices[code]
self.daily_nav[date] = nav
def print_statistics(self):
"""打印策略统计"""
print(f"\n{'='*60}")
print(f" 策略运行统计")
print(f"{'='*60}")
print(f" 初始资金: ¥{self.initial_capital:,.0f}")
print(f" 期末资金: ¥{self.capital:,.2f}")
print(f" 已完成交易: {len(self.closed_positions)} 笔")
print(f" 当前持仓: {len(self.positions)} 只")
print(f" 被黑名单拦截: {self.blocked_buy_attempts} 次买入尝试")
print(f" ST 强制平仓: {self.forced_sells_st} 次")
print(f"{'='*60}\n")
# 打印黑名单
sel
本文代码仅供学习与技术交流,不构成任何投资建议,股市有风险,入市需谨慎!
利用AI解决实际问题,如果你觉得这个工具好用,欢迎关注长安牧笛!