第三讲 如何基于基本面筛选

  • Leo
  • 发布于 7小时前
  • 阅读 31

今天这堂课,开始要讲代码了,基于上一堂课,我们对交易系统有了最最基础的一个认识。接下来,我们就要结合着我们的代码来看一下,如何基于基本面进行选股。我们的策略是,先进行基本面分析,筛选出目前还在盈利的,负债水平较低,并且毛利率大于30%的股票。然后把这些股票作为我们的候选股,供我们下一步进行操作。

今天这堂课,开始要讲代码了,基于上一堂课,我们对交易系统有了最最基础的一个认识。接下来,我们就要结合着我们的代码来看一下,如何基于基本面进行选股。

我们的策略是,先进行基本面分析,筛选出目前还在盈利的,负债水平较低,并且毛利率大于30%的股票。然后把这些股票作为我们的候选股,供我们下一步进行操作。

第一步 采集数据

我们采集数据的平台其实有多个,比如QMT,比如TuShare,比如akShare

各个平台的差异性

数据平台 系统适配 局限性
QMT 只支持Win 交易数据最为完整,但是宏观数据,研报等数据需要额外采集
QMT 无平台限制 需要花钱购买积分,有频率限制
QMT 无平台限制 数据依赖爬虫,容易出现数据不全

本期我们只考虑交易数据和财务数据,因此我们的本次代码将以QMT作为数据源。

采集交易数据

第一步,主要是操作日线数据,捞取历史的日线数据,记住,一定要选择“前复权”,不然数据就失真,导致后面进行分析的时候,认为是重大利空,从而排除掉优质股票。

def download_all_stocks_data():
    """
    下载A股全量股票历史数据并保存到MySQL数据库
    """
    print("=" * 60)
    print("开始同步A股全量股票数据")
    print(f"日期范围:{DATA_START} 至 {DATA_END}")
    print("=" * 60)

    try:
        # 步骤1:下载全量数据
        print("\n步骤1:下载A股全量历史数据...")
        try:
            xtdata.download_history_data(
                stock_code='',  # 空表示全市场
                period='1d',
                start_time=DATA_START
            )
            print("下载完成,等待数据写入...")
            time.sleep(3)
        except Exception as e:
            print(f"下载数据时出现警告:{e}")
            print("继续尝试获取数据...")

        # 步骤2:获取股票列表
        print("\n步骤2:获取股票列表...")
        stock_list = get_all_stock_codes()
        if not stock_list:
            print("无法获取股票列表")
            return False

        # 步骤3:逐只股票处理数据
        print(f"\n步骤3:处理 {len(stock_list)} 只股票数据...")
        total_inserted = 0
        success_count = 0
        failed_count = 0

        for idx, stock_code in enumerate(stock_list, 1):
            try:
                print(f"[{idx}/{len(stock_list)}] 处理 {stock_code}...", end=' ')

                df = process_stock_data(stock_code)
                if df is not None and not df.empty:
                    affected_rows = insert_stock_data(df, stock_code)
                    total_inserted += affected_rows
                    success_count += 1
                    print(f"✓ ({len(df)} 条数据,新增 {affected_rows} 条)")
                else:
                    print("✗ (无数据)")
                    failed_count += 1

                # 每处理50只股票暂停一下,避免过度占用资源
                if idx % 50 == 0:
                    time.sleep(1)

            except Exception as e:
                print(f"✗ (错误: {str(e)[:30]})")
                failed_count += 1
                continue

        # 步骤4:显示统计信息
        print("\n" + "=" * 60)
        print("数据同步完成!")
        print(f"处理股票数:{len(stock_list)}")
        print(f"成功:{success_count},失败:{failed_count}")
        print(f"总新增数据条数:{total_inserted}")
        print(f"数据已保存到MySQL数据库 {DB_CONFIG['database']}.{TABLE_NAME}")
        print("=" * 60)

        return True

    except Exception as e:
        print(f"同步数据过程中发生错误:{e}")
        import traceback
        traceback.print_exc()
        return False

采集财务数据

def main():
    print("=" * 60)
    print("财务数据采集 (MiniQMT -> MySQL)")
    if SYNC_ALL_STOCKS:
        print(f"[全量模式] 采集{SECTOR}, 每批{BATCH_SIZE}只")
    else:
        print("[测试模式] 只采集贵州茅台")

    print("=" * 60)

    print("\n连接QMT数据服务...")
    xtdata.connect()
    print("  连接成功")

    # 获取股票列表
    if SYNC_ALL_STOCKS:
        print(f"\n获取 {SECTOR} 股票列表...")
        all_codes = xtdata.get_stock_list_in_sector(SECTOR)
        all_codes = [c for c in all_codes if '.' in str(c)]
        print(f"  共 {len(all_codes)} 只股票")
    else:
        all_codes = [TEST_STOCK]
        print(f"\n[测试模式] 只采集 {TEST_STOCK}")

    # 查询已采集的股票,跳过已有数据的
    print("查询数据库已有数据...")
    existing = get_existing_stocks()
    pending = [c for c in all_codes if c not in existing]

    print(f"  已采集: {len(existing)} 只, 待采集: {len(pending)} 只")

    if not pending:
        print("\n全部已采集完成,无需下载")
        _print_summary()
        return

    # 分批处理
    batches = [pending[i:i + BATCH_SIZE] for i in range(0, len(pending), BATCH_SIZE)]
    total_batches = len(batches)
    total_pending = len(pending)

    print(f"\n开始批量下载(共 {total_batches} 批, 每批最多 {BATCH_SIZE} 只)...")

    total_rows = 0
    total_ok = 0
    total_done_stocks = 0
    start_time = time.time()

    for i, batch in enumerate(batches):
        # 先打印当前批次状态,让用户知道正在处理
        sys.stdout.write(
            f"\r  批次 {i + 1}/{total_batches} 下载中... "
            f"({total_done_stocks}/{total_pending})    "
        )
        sys.stdout.flush()

        batch_rows, batch_ok = process_batch(batch)
        total_rows += batch_rows
        total_ok += batch_ok
        total_done_stocks += len(batch)

        elapsed = time.time() - start_time
        speed = total_done_stocks / elapsed if elapsed > 0 else 0
        eta = (total_pending - total_done_stocks) / speed if speed > 0 else 0

        sys.stdout.write(
            f"\r  批次 {i + 1}/{total_batches} 完成 | "
            f"进度 {total_done_stocks}/{total_pending} ({total_done_stocks * 100 / total_pending:.1f}%) | "
            f"{speed:.1f} 只/秒 | 剩余约 {eta:.0f}秒 | "
            f"写入 {total_rows:,} 条    "
        )
        sys.stdout.flush()

    print()

    elapsed = time.time() - start_time
    print("\n" + "=" * 60)
    print(f"财务数据采集完成! 耗时 {elapsed:.1f} 秒")
    print(f"  本次处理: {total_ok}/{total_pending} 只股票")
    print(f"  总写入: {total_rows:,} 条记录")

def process_batch(batch_codes):
    """批量下载 + 解析 + 写DB,返回 (写入总行数, 成功股票数)"""
    # 批量下载到本地缓存(一次调用覆盖整批所有报表)
    done = [False]
    def on_done(data):
        done[0] = True

    xtdata.download_financial_data2(
        stock_list=batch_codes,
        table_list=TABLE_LIST,
        start_time=DATA_START,
        end_time=DATA_END,
        callback=on_done
    )

    # 等待下载完成,最长120秒
    deadline = time.time() + 120
    while not done[0] and time.time() < deadline:
        time.sleep(0.5)
    # 下载完成后额外等待,确保缓存写入
    time.sleep(1)

    # 批量获取数据
    data = xtdata.get_financial_data(
        stock_list=batch_codes,
        table_list=TABLE_LIST,
        start_time=DATA_START,
        end_time=DATA_END,
        report_type='report_time'
    )

    if not data:
        return 0, 0

    # 逐只解析,统一写DB
    batch_rows = 0
    batch_ok = 0

    for code in batch_codes:
        records = extract_periods(data, code)
        if records:
            rows = []
            for rec in records:
                p = rec['report_date']
                report_date = f"{p[:4]}-{p[4:6]}-{p[6:8]}"
                rows.append((
                    code, report_date,
                    rec['revenue'], rec['net_profit'], rec['eps'],
                    rec['roe'], rec['roa'], rec['gross_margin'], rec['net_margin'],
                    rec['debt_ratio'], rec['current_ratio'],
                    rec['operating_cashflow'], rec['total_assets'], rec['total_equity'],
                    'qmt'
                ))
            execute_many(INSERT_SQL, rows)
            batch_rows += len(rows)
        batch_ok += 1

    return batch_rows, batch_ok

第二步 设定基本面指标

我们在进行股票筛选的时候,首先需要做的是,去掉ST,去掉还在亏损的股票。另外,考虑到盈利水平,通过净利润同比增长来进行判断。以下是我罗列的指标。

ROE >= 15%

ROE_MIN = 15

净利润同比 >= 10%

NETPROFIT_YOY_MIN = 10

毛利率 >= 30%

GROSSPROFIT_MARGIN_MIN = 30

资产负债率 <= 60%

DEBT_TO_ASSETS_MAX = 60

经营现金流/营收 >= 10%

OCF_TO_REVENUE_MIN = 10

#ROE >= 15%
ROE_MIN = 15
#净利润同比 >= 10%
NETPROFIT_YOY_MIN = 10
#毛利率 >= 30%
GROSSPROFIT_MARGIN_MIN = 30
#资产负债率 &lt;= 60%
DEBT_TO_ASSETS_MAX = 60
#经营现金流/营收 >= 10%
OCF_TO_REVENUE_MIN = 10

def fetch_financial_data():
    """
    从数据库查询符合条件的财务数据
    """

    try:
        conn = get_connection()
        cursor = conn.cursor()

        # 直接在SQL中应用所有筛选条件
        query = f"""
        WITH ranked_data AS (
            SELECT 
                stock_code,
                report_date,
                roe,
                gross_margin,
                debt_ratio,
                operating_cashflow,
                revenue,
                 -- 计算净利润同比增长率
                CASE 
                    WHEN LAG(net_profit) OVER (
                        PARTITION BY stock_code 
                        ORDER BY report_date
                    ) IS NOT NULL
                    AND LAG(net_profit) OVER (
                        PARTITION BY stock_code 
                        ORDER BY report_date
                    ) != 0
                    THEN ROUND(
                        (net_profit - LAG(net_profit) OVER (
                            PARTITION BY stock_code 
                            ORDER BY report_date
                        )) * 100.0 / 
                        ABS(LAG(net_profit) OVER (
                            PARTITION BY stock_code 
                            ORDER BY report_date
                        )), 2
                    )
                    ELSE NULL
                END AS net_profit_yoy,  -- 净利润同比增长率(%)
                CASE 
                    WHEN revenue IS NOT NULL 
                         AND revenue > 0 
                         AND operating_cashflow IS NOT NULL
                    THEN operating_cashflow * 1.0 / revenue * 100
                    ELSE NULL 
                END AS ocf_to_revenue_min,
                -- 为每个股票按报告期倒序排名
                ROW_NUMBER() OVER (
                    PARTITION BY stock_code 
                    ORDER BY report_date DESC
                ) AS rn
            FROM {TABLE_NAME}
            WHERE 1=1
                -- 基础数据校验
                AND roe IS NOT NULL
                AND gross_margin IS NOT NULL
                AND debt_ratio IS NOT NULL
                AND operating_cashflow IS NOT NULL
                AND revenue IS NOT NULL
        )
        SELECT 
            stock_code,
            report_date,
            roe,
            gross_margin,
            debt_ratio,
            operating_cashflow,
            revenue,
            ocf_to_revenue_min,
            net_profit_yoy
        FROM ranked_data
        WHERE rn = 1  -- 只取每个股票最近一期
            AND roe >= {ROE_MIN}
            AND gross_margin >= {GROSSPROFIT_MARGIN_MIN}
            AND debt_ratio &lt;= {DEBT_TO_ASSETS_MAX}
            AND ocf_to_revenue_min >= {OCF_TO_REVENUE_MIN}
            AND net_profit_yoy >= {NETPROFIT_YOY_MIN}
        ORDER BY roe DESC, stock_code
        """

        cursor.execute(query)
        columns = [desc[0] for desc in cursor.description]
        rows = cursor.fetchall()

        cursor.close()
        conn.close()

        if not rows:
            print(f"警告:数据库中没有符合条件的数据")
            return None

        # 转换为 DataFrame
        df = pd.DataFrame(rows, columns=columns)
        print(f"从数据库查询到 {len(df)} 只符合条件的股票")

        return df

    except Exception as e:
        print(f"查询数据库失败:{e}")
        if conn:
            conn.close()
        return None

第三步 筛选出符合指标的股票,然后导出到csv文件

 # 从数据库查询符合条件的数据
    selected = fetch_financial_data()
    if selected is None or selected.empty:
        print("错误:无法获取符合条件的财务数据")
        return

    # 创建输出目录
    os.makedirs(DATA_DIR, exist_ok=True)

    selected.to_csv(OUTPUT_FILE, index=False, encoding='utf-8-sig')
    print(f"\n筛选完成:共 {len(selected)} 只股票达标")
    print(f"已保存:{OUTPUT_FILE}")

最后我们可以得到如下的数据,按照ROE进行排序之后,我们 image.png

结尾

对比了几个平台的数据,就财务数据来说,TuShare的数据其实更全面一点,它很多指标都帮你进行了计算,因此,有条件的同学可以购买TuShare的积分,来获取财务数据。但是如果确实也不想买这个TuShare积分,我们其实自己来计算这个指标。

本讲采用代码的方式讲解了量化交易里面第一步,也是最重要的一步。收集历史数据,收集财务数据,完成简单筛选。后面会讲解基于技术面的指标,制定策略,然后进行筛选。

点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
Leo
Leo
江湖只有他的大名,没有他的介绍。