今天这堂课,开始要讲代码了,基于上一堂课,我们对交易系统有了最最基础的一个认识。接下来,我们就要结合着我们的代码来看一下,如何基于基本面进行选股。我们的策略是,先进行基本面分析,筛选出目前还在盈利的,负债水平较低,并且毛利率大于30%的股票。然后把这些股票作为我们的候选股,供我们下一步进行操作。
今天这堂课,开始要讲代码了,基于上一堂课,我们对交易系统有了最最基础的一个认识。接下来,我们就要结合着我们的代码来看一下,如何基于基本面进行选股。
我们的策略是,先进行基本面分析,筛选出目前还在盈利的,负债水平较低,并且毛利率大于30%的股票。然后把这些股票作为我们的候选股,供我们下一步进行操作。
我们采集数据的平台其实有多个,比如QMT,比如TuShare,比如akShare
| 数据平台 | 系统适配 | 局限性 |
|---|---|---|
| QMT | 只支持Win | 交易数据最为完整,但是宏观数据,研报等数据需要额外采集 |
| QMT | 无平台限制 | 需要花钱购买积分,有频率限制 |
| QMT | 无平台限制 | 数据依赖爬虫,容易出现数据不全 |
本期我们只考虑交易数据和财务数据,因此我们的本次代码将以QMT作为数据源。
第一步,主要是操作日线数据,捞取历史的日线数据,记住,一定要选择“前复权”,不然数据就失真,导致后面进行分析的时候,认为是重大利空,从而排除掉优质股票。
def download_all_stocks_data():
"""
下载A股全量股票历史数据并保存到MySQL数据库
"""
print("=" * 60)
print("开始同步A股全量股票数据")
print(f"日期范围:{DATA_START} 至 {DATA_END}")
print("=" * 60)
try:
# 步骤1:下载全量数据
print("\n步骤1:下载A股全量历史数据...")
try:
xtdata.download_history_data(
stock_code='', # 空表示全市场
period='1d',
start_time=DATA_START
)
print("下载完成,等待数据写入...")
time.sleep(3)
except Exception as e:
print(f"下载数据时出现警告:{e}")
print("继续尝试获取数据...")
# 步骤2:获取股票列表
print("\n步骤2:获取股票列表...")
stock_list = get_all_stock_codes()
if not stock_list:
print("无法获取股票列表")
return False
# 步骤3:逐只股票处理数据
print(f"\n步骤3:处理 {len(stock_list)} 只股票数据...")
total_inserted = 0
success_count = 0
failed_count = 0
for idx, stock_code in enumerate(stock_list, 1):
try:
print(f"[{idx}/{len(stock_list)}] 处理 {stock_code}...", end=' ')
df = process_stock_data(stock_code)
if df is not None and not df.empty:
affected_rows = insert_stock_data(df, stock_code)
total_inserted += affected_rows
success_count += 1
print(f"✓ ({len(df)} 条数据,新增 {affected_rows} 条)")
else:
print("✗ (无数据)")
failed_count += 1
# 每处理50只股票暂停一下,避免过度占用资源
if idx % 50 == 0:
time.sleep(1)
except Exception as e:
print(f"✗ (错误: {str(e)[:30]})")
failed_count += 1
continue
# 步骤4:显示统计信息
print("\n" + "=" * 60)
print("数据同步完成!")
print(f"处理股票数:{len(stock_list)}")
print(f"成功:{success_count},失败:{failed_count}")
print(f"总新增数据条数:{total_inserted}")
print(f"数据已保存到MySQL数据库 {DB_CONFIG['database']}.{TABLE_NAME}")
print("=" * 60)
return True
except Exception as e:
print(f"同步数据过程中发生错误:{e}")
import traceback
traceback.print_exc()
return False
def main():
print("=" * 60)
print("财务数据采集 (MiniQMT -> MySQL)")
if SYNC_ALL_STOCKS:
print(f"[全量模式] 采集{SECTOR}, 每批{BATCH_SIZE}只")
else:
print("[测试模式] 只采集贵州茅台")
print("=" * 60)
print("\n连接QMT数据服务...")
xtdata.connect()
print(" 连接成功")
# 获取股票列表
if SYNC_ALL_STOCKS:
print(f"\n获取 {SECTOR} 股票列表...")
all_codes = xtdata.get_stock_list_in_sector(SECTOR)
all_codes = [c for c in all_codes if '.' in str(c)]
print(f" 共 {len(all_codes)} 只股票")
else:
all_codes = [TEST_STOCK]
print(f"\n[测试模式] 只采集 {TEST_STOCK}")
# 查询已采集的股票,跳过已有数据的
print("查询数据库已有数据...")
existing = get_existing_stocks()
pending = [c for c in all_codes if c not in existing]
print(f" 已采集: {len(existing)} 只, 待采集: {len(pending)} 只")
if not pending:
print("\n全部已采集完成,无需下载")
_print_summary()
return
# 分批处理
batches = [pending[i:i + BATCH_SIZE] for i in range(0, len(pending), BATCH_SIZE)]
total_batches = len(batches)
total_pending = len(pending)
print(f"\n开始批量下载(共 {total_batches} 批, 每批最多 {BATCH_SIZE} 只)...")
total_rows = 0
total_ok = 0
total_done_stocks = 0
start_time = time.time()
for i, batch in enumerate(batches):
# 先打印当前批次状态,让用户知道正在处理
sys.stdout.write(
f"\r 批次 {i + 1}/{total_batches} 下载中... "
f"({total_done_stocks}/{total_pending}) "
)
sys.stdout.flush()
batch_rows, batch_ok = process_batch(batch)
total_rows += batch_rows
total_ok += batch_ok
total_done_stocks += len(batch)
elapsed = time.time() - start_time
speed = total_done_stocks / elapsed if elapsed > 0 else 0
eta = (total_pending - total_done_stocks) / speed if speed > 0 else 0
sys.stdout.write(
f"\r 批次 {i + 1}/{total_batches} 完成 | "
f"进度 {total_done_stocks}/{total_pending} ({total_done_stocks * 100 / total_pending:.1f}%) | "
f"{speed:.1f} 只/秒 | 剩余约 {eta:.0f}秒 | "
f"写入 {total_rows:,} 条 "
)
sys.stdout.flush()
print()
elapsed = time.time() - start_time
print("\n" + "=" * 60)
print(f"财务数据采集完成! 耗时 {elapsed:.1f} 秒")
print(f" 本次处理: {total_ok}/{total_pending} 只股票")
print(f" 总写入: {total_rows:,} 条记录")
def process_batch(batch_codes):
"""批量下载 + 解析 + 写DB,返回 (写入总行数, 成功股票数)"""
# 批量下载到本地缓存(一次调用覆盖整批所有报表)
done = [False]
def on_done(data):
done[0] = True
xtdata.download_financial_data2(
stock_list=batch_codes,
table_list=TABLE_LIST,
start_time=DATA_START,
end_time=DATA_END,
callback=on_done
)
# 等待下载完成,最长120秒
deadline = time.time() + 120
while not done[0] and time.time() < deadline:
time.sleep(0.5)
# 下载完成后额外等待,确保缓存写入
time.sleep(1)
# 批量获取数据
data = xtdata.get_financial_data(
stock_list=batch_codes,
table_list=TABLE_LIST,
start_time=DATA_START,
end_time=DATA_END,
report_type='report_time'
)
if not data:
return 0, 0
# 逐只解析,统一写DB
batch_rows = 0
batch_ok = 0
for code in batch_codes:
records = extract_periods(data, code)
if records:
rows = []
for rec in records:
p = rec['report_date']
report_date = f"{p[:4]}-{p[4:6]}-{p[6:8]}"
rows.append((
code, report_date,
rec['revenue'], rec['net_profit'], rec['eps'],
rec['roe'], rec['roa'], rec['gross_margin'], rec['net_margin'],
rec['debt_ratio'], rec['current_ratio'],
rec['operating_cashflow'], rec['total_assets'], rec['total_equity'],
'qmt'
))
execute_many(INSERT_SQL, rows)
batch_rows += len(rows)
batch_ok += 1
return batch_rows, batch_ok
我们在进行股票筛选的时候,首先需要做的是,去掉ST,去掉还在亏损的股票。另外,考虑到盈利水平,通过净利润同比增长来进行判断。以下是我罗列的指标。
ROE_MIN = 15
NETPROFIT_YOY_MIN = 10
GROSSPROFIT_MARGIN_MIN = 30
DEBT_TO_ASSETS_MAX = 60
OCF_TO_REVENUE_MIN = 10
#ROE >= 15%
ROE_MIN = 15
#净利润同比 >= 10%
NETPROFIT_YOY_MIN = 10
#毛利率 >= 30%
GROSSPROFIT_MARGIN_MIN = 30
#资产负债率 <= 60%
DEBT_TO_ASSETS_MAX = 60
#经营现金流/营收 >= 10%
OCF_TO_REVENUE_MIN = 10
def fetch_financial_data():
"""
从数据库查询符合条件的财务数据
"""
try:
conn = get_connection()
cursor = conn.cursor()
# 直接在SQL中应用所有筛选条件
query = f"""
WITH ranked_data AS (
SELECT
stock_code,
report_date,
roe,
gross_margin,
debt_ratio,
operating_cashflow,
revenue,
-- 计算净利润同比增长率
CASE
WHEN LAG(net_profit) OVER (
PARTITION BY stock_code
ORDER BY report_date
) IS NOT NULL
AND LAG(net_profit) OVER (
PARTITION BY stock_code
ORDER BY report_date
) != 0
THEN ROUND(
(net_profit - LAG(net_profit) OVER (
PARTITION BY stock_code
ORDER BY report_date
)) * 100.0 /
ABS(LAG(net_profit) OVER (
PARTITION BY stock_code
ORDER BY report_date
)), 2
)
ELSE NULL
END AS net_profit_yoy, -- 净利润同比增长率(%)
CASE
WHEN revenue IS NOT NULL
AND revenue > 0
AND operating_cashflow IS NOT NULL
THEN operating_cashflow * 1.0 / revenue * 100
ELSE NULL
END AS ocf_to_revenue_min,
-- 为每个股票按报告期倒序排名
ROW_NUMBER() OVER (
PARTITION BY stock_code
ORDER BY report_date DESC
) AS rn
FROM {TABLE_NAME}
WHERE 1=1
-- 基础数据校验
AND roe IS NOT NULL
AND gross_margin IS NOT NULL
AND debt_ratio IS NOT NULL
AND operating_cashflow IS NOT NULL
AND revenue IS NOT NULL
)
SELECT
stock_code,
report_date,
roe,
gross_margin,
debt_ratio,
operating_cashflow,
revenue,
ocf_to_revenue_min,
net_profit_yoy
FROM ranked_data
WHERE rn = 1 -- 只取每个股票最近一期
AND roe >= {ROE_MIN}
AND gross_margin >= {GROSSPROFIT_MARGIN_MIN}
AND debt_ratio <= {DEBT_TO_ASSETS_MAX}
AND ocf_to_revenue_min >= {OCF_TO_REVENUE_MIN}
AND net_profit_yoy >= {NETPROFIT_YOY_MIN}
ORDER BY roe DESC, stock_code
"""
cursor.execute(query)
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
cursor.close()
conn.close()
if not rows:
print(f"警告:数据库中没有符合条件的数据")
return None
# 转换为 DataFrame
df = pd.DataFrame(rows, columns=columns)
print(f"从数据库查询到 {len(df)} 只符合条件的股票")
return df
except Exception as e:
print(f"查询数据库失败:{e}")
if conn:
conn.close()
return None
# 从数据库查询符合条件的数据
selected = fetch_financial_data()
if selected is None or selected.empty:
print("错误:无法获取符合条件的财务数据")
return
# 创建输出目录
os.makedirs(DATA_DIR, exist_ok=True)
selected.to_csv(OUTPUT_FILE, index=False, encoding='utf-8-sig')
print(f"\n筛选完成:共 {len(selected)} 只股票达标")
print(f"已保存:{OUTPUT_FILE}")
最后我们可以得到如下的数据,按照ROE进行排序之后,我们

对比了几个平台的数据,就财务数据来说,TuShare的数据其实更全面一点,它很多指标都帮你进行了计算,因此,有条件的同学可以购买TuShare的积分,来获取财务数据。但是如果确实也不想买这个TuShare积分,我们其实自己来计算这个指标。
本讲采用代码的方式讲解了量化交易里面第一步,也是最重要的一步。收集历史数据,收集财务数据,完成简单筛选。后面会讲解基于技术面的指标,制定策略,然后进行筛选。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!