
ETFs, Mutual Funds, and Asset Data Analysis: An Introduction


A few years ago, I started what I thought would be an interesting side project: aggregating and analyzing the monthly SEC filings of ETFs and mutual funds. I wanted to automate the process of collecting SEC filings using the index files the SEC publishes for company submissions, updating as new filings come in.


Directory listings for SEC filings

The filings are organized in folders labeled by year, and within each year you can drill down by quarter. Under each quarter you will find the master index files.

Master index files for 2025 QTR1
Each master index file contains a list of every filing archived during that period, including a "Form Type" field that indicates what the filing contains. For example, NPORT-P forms contain ETF/mutual fund holdings filings, which is where I started. SEC records follow a consistent file-system storage convention, so once you understand it, it is straightforward to work with programmatically.
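
The index itself is pipe-delimited: a short preamble, a header row, a dashed separator, then one line per filing. A representative entry (the company and accession number here are made up) looks like:

CIK|Company Name|Form Type|Date Filed|Filename
--------------------------------------------------------------------------------
1234567|EXAMPLE FUND TRUST|NPORT-P|2024-08-29|edgar/data/1234567/0001234567-24-001234.txt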


I started by creating two tables: one for filing reference data (the filing metadata) and another to track filing download status.


Tables in PostgreSQL for metadata tracking of SEC filings
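
The DDL isn't shown above, but a minimal sketch can be reconstructed from the columns the scripts below reference (the downloaded_ind column and the exact types are my assumptions):

-- Sketch only: column list inferred from the scripts in this post.
create table inbd_raw.filing_ref (
    filing_ref_sid  bigserial primary key,  -- surrogate ("dumb") key
    cik             varchar(20),
    company_name    varchar(255),
    form_type       varchar(20),
    date_filed      date,
    file_name       varchar(255)
);

create table inbd_raw.filing_ref_status (
    filing_ref_sid     bigint primary key references inbd_raw.filing_ref,
    downloaded_ind     boolean default false,  -- assumed download-status flag
    json_converted_ind boolean default false,
    record_ts          timestamp default current_timestamp,
    json_converted_ts  timestamp
);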

The download happens in a few steps:

  1. Download the master.idx file for the desired year/quarter. I had automated this step at one point, but changes to the SEC website broke it; a sketch of the fetch follows below.
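
A minimal sketch of that fetch, assuming the current EDGAR full-index layout and the SEC's requirement that requests declare a User-Agent (the contact string below is a placeholder):

import os
import requests

year_target = "2024"
quarter_target = "QTR3"

# EDGAR publishes quarterly master index files at a predictable path.
url = f"https://www.sec.gov/Archives/edgar/full-index/{year_target}/{quarter_target}/master.idx"

# The SEC requires a descriptive User-Agent; substitute your own contact info.
headers = {"User-Agent": "Sample Company admin@example.com"}
resp = requests.get(url, headers=headers, timeout=30)
resp.raise_for_status()

os.makedirs(f"/tmp/{year_target}/{quarter_target}", exist_ok=True)
with open(f"/tmp/{year_target}/{quarter_target}/master.idx", "wb") as f:
    f.write(resp.content)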

  2. Run the loader script below:

import pandas as pd
from datetime import datetime
from ref_data.connect_util import getconn
from sqlalchemy import text

year_target = "2024"
quarter_target = "QTR3"
# read the dataset
df = pd.read_csv(f"/tmp/{year_target}/{quarter_target}/master.idx",
                 delimiter="|",
                 skiprows=1,
                 header=4,
                 low_memory=False)

df.CIK = df.CIK.astype(str)
df["Date Filed"] = df["Date Filed"].apply(pd.to_datetime)
df["Company Name"] = df["Company Name"].astype(str)
df["Filename"] = df["Filename"].astype(str)

# get DB connection
conn, conn_r = getconn()

# Filter each frame for portfolio filings
form_list: list = ["NPORT-P", "10-Q", "10-K", "8-A12B"]
df2 = df[df["Form Type"].isin(form_list)]
print(df2.count())

try:
    for row in df2.itertuples():
        CIK = row[1]
        company = str(row[2]).replace("'", " ")
        form_type = row[3]
        dt_filed = row[4]
        filename = row[5]

        sql_count_str = (
            f"SELECT COUNT(cik) FROM inbd_raw.filing_ref WHERE cik = '{CIK}' "
            f"and date_filed = to_date('{dt_filed}', 'YYYY-MM-DD') "
            f"and form_type = '{form_type}' "
            f"and file_name = '{filename}'"
        )

        rec_count = conn.execute(text(sql_count_str))
        row_result = rec_count.fetchall()

        for rec in row_result:
            count = rec.count

            if count == 0:
                sql_stmt_str = (
                    f"INSERT INTO inbd_raw.filing_ref(cik,company_name,form_type,date_filed,file_name)"
                    f"values ('{CIK}', '{company}', '{form_type}', to_date('{dt_filed}', 'YYYY-MM-DD'), '{filename}')"
                )

                # print(sql_stmt_str)
                print(
                    f"Adding record for {CIK} and company {company} at: "
                    + str(datetime.now())
                )
                cur = conn_r.cursor()
                cur.execute(sql_stmt_str)
            else:
                print(f"{CIK} Record for {company} already exists, skipping...")

except Exception as e:
    print("Exception occurred: " + str(e))
    exit(1)
finally:
    # Commit any pending inserts and release the connection.
    if conn_r:
        conn_r.commit()
        conn_r.close()
        print("PostgreSQL connection is closed")

  3. The filing_ref table is now populated with data.

Table data from filing_ref

Because the table uses a surrogate key, a "dumb key", as its primary key, I next seed the filing_ref_status table with every filing_ref_sid value from the filing_ref table via SQL:

insert into filing_ref_status(filing_ref_sid)
select distinct filing_ref.filing_ref_sid from filing_ref
      where filing_ref_sid not in
       (select distinct filing_ref_sid from filing_ref_status);

The other columns in filing_ref_status have defaults defined, so the primary key from filing_ref is the only value I need to supply.

  4. With the metadata and tracking data in place, I ran the download script (a sketch of what it might look like follows the screenshots below).

The downloaded files land in paths that mirror the SEC's directory structure:

Downloaded SEC Filings

As you can see above, each filing is delivered as XML.
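
The download script itself isn't reproduced in this post. A minimal sketch, assuming the path convention visible above (the accession number with dashes stripped becomes the directory name, and the holdings document is primary_doc.xml) and the assumed downloaded_ind status column:

import os
import requests
from ref_data.connect_util import getconn

HEADERS = {"User-Agent": "Sample Company admin@example.com"}  # placeholder contact
RAW_DIR = "/Volumes/ExtShield/SecFundFilings/raw/"

conn, conn_r = getconn()
cur = conn_r.cursor()
# Assumed status flag: pick NPORT-P filings not yet downloaded.
cur.execute(
    "select fr.filing_ref_sid, fr.file_name "
    "from inbd_raw.filing_ref fr "
    "join inbd_raw.filing_ref_status frs on frs.filing_ref_sid = fr.filing_ref_sid "
    "where fr.form_type = 'NPORT-P' and frs.downloaded_ind is not true"
)

for filing_ref_sid, file_name in cur.fetchall():
    # edgar/data/<cik>/<accession>.txt -> edgar/data/<cik>/<accession, no dashes>/primary_doc.xml
    folder, accession = file_name.rsplit("/", 1)
    accession_dir = accession.replace(".txt", "").replace("-", "")
    url = f"https://www.sec.gov/Archives/{folder}/{accession_dir}/primary_doc.xml"

    resp = requests.get(url, headers=HEADERS, timeout=30)
    resp.raise_for_status()

    target_dir = os.path.join(RAW_DIR, folder, accession_dir)
    os.makedirs(target_dir, exist_ok=True)
    with open(os.path.join(target_dir, "primary_doc.xml"), "wb") as f:
        f.write(resp.content)

    upd = conn_r.cursor()
    upd.execute(
        f"update inbd_raw.filing_ref_status set downloaded_ind = true "
        f"where filing_ref_sid = {filing_ref_sid}"
    )
    conn_r.commit()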


  5. I then wrote the conversion script shown below:

import datetime
import os
import shutil
import time
from DBSchema.convertXML import convertXML2JSON
from ref_data.connect_util import getconn

# Constants
FORM_TYPE = "NPORT-P"
DIRECTORY = "/Volumes/ExtShield/SecFundFilings/"

# Get DB connection
conn, conn_r = getconn()

def get_query_results(f_name: str):
    sql_query_select = (
        f"select fr.filing_ref_sid "
        f"from inbd_raw.filing_ref fr "
        f"where fr.form_type = '{FORM_TYPE}' and fr.file_name = '{f_name}'"
    )
    cur = conn_r.cursor()
    cur.execute(sql_query_select)
    return cur.fetchone()

def update_database_status(ref_sid: int):
    sql_update = (
        f"update inbd_raw.filing_ref_status "
        f"set json_converted_ind = true, record_ts = current_timestamp, "
        f"json_converted_ts = current_timestamp "
        f"where filing_ref_sid = {ref_sid}"
    )
    cur = conn_r.cursor()
    upd_stat = cur.execute(sql_update)
    conn_r.commit()
    return upd_stat

def getFundFileList(target_dir=None, target_size=None):
    if os.path.exists(target_dir):
        target_files: list = []
        for root, dirs, files in os.walk(target_dir):
            for file in files:
                # Build a normalized, forward-slash path for each file
                file_path = root + "/" + file
                file_path = str(file_path).replace("\\", "/")
                target_files.append(file_path)
                if len(target_files) == target_size:
                    return target_files
        return target_files
    else:
        print("Path does not exist: " + target_dir)
        exit(1)

if __name__ == "__main__":
    t_size = 5000
    files = getFundFileList(target_dir="/Volumes/ExtShield/SecFundFilings/raw/", target_size=t_size)
    l_count: int = 0

    for file in files:
        # The on-disk directory name is the accession number with its dashes
        # stripped; rebuild the dashed form to match master.idx's Filename column.
        split_file = file.split("/")
        accession_dir = str(split_file[-2])
        accession_no = accession_dir[:10] + "-" + accession_dir[10:12] + "-" + accession_dir[12:] + ".txt"
        file_name = split_file[-5] + "/" + split_file[-4] + "/" + split_file[-3] + "/" + accession_no
        result = get_query_results(file_name)
        filing_sid = result[0] if result else None
        print(filing_sid)
        time.sleep(0.1)
        l_count = l_count + 1
        print("File to be processed: " + file)
        print("Processing file " + str(l_count) + " of " + str(t_size))

        base_file = file.replace("/Volumes/ExtShield/SecFundFilings/raw/", "")
        complete_file = "/Volumes/ExtShield/SetupFilings/setup_complete/" + base_file
        target_path = complete_file.replace("primary_doc.xml", "")

        if not os.path.exists(target_path):
            os.makedirs(target_path)
        shutil.copy(file, complete_file)

        processing_dt = datetime.datetime.today()
        status = convertXML2JSON(inputXMLFile=file, processing_dt=processing_dt)
        print(str(status))

        if status == 0:
            print("Continue to next file...")
            update_database_status(filing_sid)
            os.remove(file)
            continue
        else:
            print("Fund did not complete successfully: " + str(status))
            exit(1)
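
The convertXML2JSON helper lives in my own DBSchema module and isn't shown here. As a rough stand-in only, a conversion along these lines could be built with the third-party xmltodict package (an assumption, not the actual implementation):

import json
import xmltodict  # assumption: any XML-to-dict library would work

def convertXML2JSON(inputXMLFile: str, processing_dt) -> int:
    """Sketch only: parse an NPORT-P primary_doc.xml and write it back out as JSON."""
    try:
        with open(inputXMLFile, "rb") as f:
            doc = xmltodict.parse(f)
        out_file = inputXMLFile.replace(".xml", ".json")
        with open(out_file, "w") as f:
            json.dump({"processing_dt": str(processing_dt), "filing": doc}, f)
        return 0  # the caller treats 0 as success
    except Exception as e:
        print("Conversion failed: " + str(e))
        return 1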

I will wrap up this introductory post here and continue once the filings have been converted to JSON. As the code shows, the conversion target_size can be any size, and the original source files are moved to another location during processing (copied, then removed on success).


You can run the script multiple times with different target sizes, or restart it after a failure, and it will pick up where it left off. Conversions are recorded in the filing_ref_status table, so the script only operates on files that match an entry in the metadata.
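
To see how much work remains before a re-run, a quick check against the status table might look like this (assuming json_converted_ind starts out false or null):

select count(*)
from inbd_raw.filing_ref_status
where json_converted_ind is not true;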


 
 
