ETF、共同基金和资产数据分析:简介
- Claude Paugh
- 4天前
- 讀畢需時 4 分鐘
已更新:4天前
几年前,我开始了一个我认为很有趣的副业:汇总并提交 ETF 和共同基金的每月 SEC 文件。我希望使用公司提交的指数文件来自动化编制 SEC 文件的过程,并在提交时进行更新。

通过搜索每个标有年份的文件夹,您可以按季度进行搜索。在每个季度下方,您将找到主要的索引文件。

每个主索引文件都包含该期间存档的所有文件的列表,包括指示文件包含内容的“表单类型”。例如,ENPORT-P 表格包含 ETF/共同基金持股文件,这是我开始使用的文件。 SEC 记录遵循文件系统存储标准,因此一旦理解,就很容易进行编程。
我首先创建了两个表:一个用于文件参考数据(文件元数据),另一个用于文件下载状态。

下载分为几个步骤完成:
下载所需年份/季度的 master.idx 文件。我曾经自动化过这个过程,但由于 SEC 网站的变化而无法实现。
运行脚本“
import pandas as pd
from datetime import datetime
from ref_data.connect_util import getconn
from sqlalchemy import text
year_target = "2024"
quarter_target = "QTR3"
# read the dataset
df = pd.read_csv(f"/tmp/{year_target}/{quarter_target}/master.idx",
delimiter="|",
skiprows=1,
header=4,
low_memory="false")
df.CIK = df.CIK.astype(str)
df["Date Filed"] = df["Date Filed"].apply(pd.to_datetime)
df["Company Name"] = df["Company Name"].astype(str)
df["Filename"] = df["Filename"].astype(str)
# get DB connection
conn, conn_r = getconn()
# Filter each frame for portfolio filings
form_list: list = ["NPORT-P", "10-Q", "10-K", "8-A12B"]
df2 = df[df["Form Type"].isin(form_list)]
print(df2.count())
try:
for row in df2.itertuples():
CIK = row[1]
company = str(row[2]).replace("'", " ")
form_type = row[3]
dt_filed = row[4]
filename = row[5]
sql_count_str = (
f"SELECT COUNT(cik) FROM inbd_raw.filing_ref WHERE cik = '{CIK}' "
f"and date_filed = to_date('{dt_filed}', 'YYYY-MM-DD') "
f"and form_type = '{form_type}' "
f"and file_name = '{filename}'"
)
rec_count = conn.execute(text(sql_count_str))
row_result = rec_count.fetchall()
for rec in row_result:
count = rec.count
if count == 0:
sql_stmt_str = (
f"INSERT INTO inbd_raw.filing_ref(cik,company_name,form_type,date_filed,file_name)"
f"values ('{CIK}', '{company}', '{form_type}', to_date('{dt_filed}', 'YYYY-MM-DD'), '{filename}')"
)
# print(sql_stmt_str)
print(
f"Adding record for {CIK} and company {company} at: "
+ str(datetime.now())
)
cur = conn_r.cursor()
cur.execute(sql_stmt_str)
else:
print(f"{CIK} Record for {company} already exists, skipping...")
except Exception as e:
print("Exeception occurred...." + str(e))
exit(1)
finally:
if conn:
conn_r.commit()
conn_r.cursor().close()
conn_r.close()
print("PostgreSQL connection is closed")
else:
exit(0)
3.file_ref表已满数据。

由于表数据使用代理键,即“哑键”,作为主键创建者,我现在通过 SQL 将“filling_ref”表中的所有“filling_ref_sid”值添加到“filling_ref_status”表中:
insert into filing_ref_status(filing_ref_sid)
select distinct filing_ref.filing_ref_sid from filing_ref
where filing_ref_sid not in
(select distinct filing_ref_sid from filing_ref_status);
默认情况下有多个列定义为“filling_ref_status”,所以我只需要主键“filling_ref”。
元数据和跟踪数据已经准备好了,所以我运行了脚本。
下载内容如下:

按照 SEC 结构存储的路径的文件:

正如您上面看到的,每个文件都以 XML 格式呈现。
我写了一篇题为“
import datetime
import os
import shutil
import time
from DBSchema.convertXML import convertXML2JSON
from ref_data.connect_util import getconn
# Constants
FORM_TYPE = "NPORT-P"
DIRECTORY = "/Volumes/ExtShield/SecFundFilings/"
# Get DB connection
conn, conn_r = getconn()
def get_query_results(f_name: str):
sql_query_select = (
f"select fr.filing_ref_sid "
f"from inbd_raw.filing_ref fr "
f"where fr.form_type = '{FORM_TYPE}' and fr.file_name = '{f_name}'"
)
cur = conn_r.cursor()
cur.execute(sql_query_select)
return cur.fetchone()
def update_database_status(ref_sid: int):
sql_update = (
f"update inbd_raw.filing_ref_status "
f"set json_converted_ind = true, record_ts = current_timestamp, "
f"json_converted_ts = current_timestamp "
f"where filing_ref_sid = {ref_sid}"
)
cur = conn_r.cursor()
upd_stat = cur.execute(sql_update)
conn_r.commit()
return upd_stat
def getFundFileList(target_dir=None, target_size=None):
if os.path.exists(target_dir):
target_files: list = []
for root, dirs, files in os.walk(target_dir):
for file in files:
# Set utime to current time
file_path = root + "/" + file
file_path = str(file_path).replace("\\", "/")
target_files.append(file_path)
if len(target_files) == target_size:
return target_files
else:
print("Path does not exists: " + target_dir)
exit(1)
+
if __name__ == "__main__":
t_size = 5000
files = getFundFileList(target_dir="/Volumes/ExtShield/SecFundFilings/raw/", target_size=t_size)
l_count: int = 0
for file in files:
split_file = file.split("/")
file_end = str(split_file[-2])[:10]
file_middle = str(split_file[-2])[10:12]
file_start = str(split_file[-2])[12:]
file_parts = file_end + '-' + file_middle + '-' + file_start + '.txt'
file_name = split_file[-5] + "/" + split_file[-4] + "/" + split_file[-3] + "/" + file_parts
filing_sid = str(get_query_results(file_name)).replace('(', '').replace(')', '').replace(',', '').replace('--.txt', '.txt')
print(filing_sid)
time.sleep(0.1)
l_count = l_count + 1
sid = get_query_results(file_name)
print("File to be processed: " + file)
print("Processing file " + str(l_count) + " of " + str(t_size))
base_file = file.replace("/Volumes/ExtShield/SecFundFilings/raw/", "")
complete_file = "/Volumes/ExtShield/SetupFilings/setup_complete/" + base_file
target_path = complete_file.replace("primary_doc.xml", "")
if not os.path.exists(target_path):
os.makedirs(target_path)
shutil.copy(file, complete_file)
processing_dt = datetime.datetime.today()
status = convertXML2JSON(inputXMLFile=file, processing_dt=processing_dt)
print(str(status))
if status == 0:
print("Continue to next file...")
update_database_status(filing_sid)
os.remove(file)
continue
else:
print("Fund did not complete successfully: " + str(status))
exit(1)
我将在这里结束这篇介绍性文章,并在将文件转换为 JSON 后继续工作。从文中可以看出,转换的“target_size”可以是任意大小,并且在处理过程中将原始源文件移动到另一个位置。
您可以使用不同的目标大小多次运行该脚本,或者如果失败则重新启动它,并且脚本将从中断处继续运行。文件转换在“filing_ref_status”表中更新,因此它仅对与元数据中的条目匹配的文件进行操作。