Analyse des données sur les ETF, les fonds communs de placement et la richesse : une introduction
- Claude Paugh
- il y a 4 jours
- 5 min de lecture
Il y a quelques années, j'ai commencé un travail secondaire que je pensais être amusant : collecter et classer les documents mensuels de la SEC pour les ETF et les fonds communs de placement. Je souhaite automatiser le processus de compilation des documents déposés auprès de la SEC à l'aide des dépôts d'index des entreprises et les mettre à jour lors du dépôt.

Vous pouvez effectuer une recherche par trimestre en parcourant chaque dossier étiqueté avec une année. Sous chaque trimestre, vous trouverez le fichier d'index principal.

Chaque fichier d'index principal contient une liste de tous les fichiers archivés au cours de cette période, y compris un « type de formulaire » qui indique ce que contient le fichier. Par exemple, le formulaire ENPORT-P contient le fichier de titres ETF/fonds communs de placement avec lequel j'ai commencé. Les enregistrements SEC suivent les normes de stockage du système de fichiers et sont donc faciles à programmer une fois compris.
J'ai commencé par créer deux tables : une pour les données de référence du fichier (métadonnées du fichier) et une autre pour l'état de téléchargement du fichier.

Le téléchargement se déroule en plusieurs étapes :
Téléchargez le fichier « master.idx » pour l’année/le trimestre souhaité. J'ai tenté d'automatiser ce processus, mais je n'y suis pas parvenu en raison de modifications apportées au site Web de la SEC.
Exécution du script
import pandas as pd
from datetime import datetime
from ref_data.connect_util import getconn
from sqlalchemy import text
year_target = "2024"
quarter_target = "QTR3"
# read the dataset
df = pd.read_csv(f"/tmp/{year_target}/{quarter_target}/master.idx",
delimiter="|",
skiprows=1,
header=4,
low_memory="false")
df.CIK = df.CIK.astype(str)
df["Date Filed"] = df["Date Filed"].apply(pd.to_datetime)
df["Company Name"] = df["Company Name"].astype(str)
df["Filename"] = df["Filename"].astype(str)
# get DB connection
conn, conn_r = getconn()
# Filter each frame for portfolio filings
form_list: list = ["NPORT-P", "10-Q", "10-K", "8-A12B"]
df2 = df[df["Form Type"].isin(form_list)]
print(df2.count())
try:
for row in df2.itertuples():
CIK = row[1]
company = str(row[2]).replace("'", " ")
form_type = row[3]
dt_filed = row[4]
filename = row[5]
sql_count_str = (
f"SELECT COUNT(cik) FROM inbd_raw.filing_ref WHERE cik = '{CIK}' "
f"and date_filed = to_date('{dt_filed}', 'YYYY-MM-DD') "
f"and form_type = '{form_type}' "
f"and file_name = '{filename}'"
)
rec_count = conn.execute(text(sql_count_str))
row_result = rec_count.fetchall()
for rec in row_result:
count = rec.count
if count == 0:
sql_stmt_str = (
f"INSERT INTO inbd_raw.filing_ref(cik,company_name,form_type,date_filed,file_name)"
f"values ('{CIK}', '{company}', '{form_type}', to_date('{dt_filed}', 'YYYY-MM-DD'), '{filename}')"
)
# print(sql_stmt_str)
print(
f"Adding record for {CIK} and company {company} at: "
+ str(datetime.now())
)
cur = conn_r.cursor()
cur.execute(sql_stmt_str)
else:
print(f"{CIK} Record for {company} already exists, skipping...")
except Exception as e:
print("Exeception occurred...." + str(e))
exit(1)
finally:
if conn:
conn_r.commit()
conn_r.cursor().close()
conn_r.close()
print("PostgreSQL connection is closed")
else:
exit(0)
3. La table file_ref est pleine de données.

Étant donné que les données de la table utilisent une clé de substitution, c'est-à-dire une « clé muette », comme créateur de clé primaire, j'ajoute maintenant toutes les valeurs « filling_ref_sid » de la table « filling_ref » à la table « filling_ref_status » via SQL :
insert into filing_ref_status(filing_ref_sid)
select distinct filing_ref.filing_ref_sid from filing_ref
where filing_ref_sid not in
(select distinct filing_ref_sid from filing_ref_status);
Par défaut, plusieurs colonnes sont définies comme « filling_ref_status », donc je n’ai besoin que de la clé primaire « filling_ref ».
Les métadonnées et les données de suivi étaient prêtes, j'ai donc exécuté le script.
Le contenu du téléchargement est le suivant :

Fichiers avec des chemins stockés dans la structure SEC :

Comme vous pouvez le voir ci-dessus, chaque fichier est représenté au format XML.
J'ai un article intitulé «
import datetime
import os
import shutil
import time
from DBSchema.convertXML import convertXML2JSON
from ref_data.connect_util import getconn
# Constants
FORM_TYPE = "NPORT-P"
DIRECTORY = "/Volumes/ExtShield/SecFundFilings/"
# Get DB connection
conn, conn_r = getconn()
def get_query_results(f_name: str):
sql_query_select = (
f"select fr.filing_ref_sid "
f"from inbd_raw.filing_ref fr "
f"where fr.form_type = '{FORM_TYPE}' and fr.file_name = '{f_name}'"
)
cur = conn_r.cursor()
cur.execute(sql_query_select)
return cur.fetchone()
def update_database_status(ref_sid: int):
sql_update = (
f"update inbd_raw.filing_ref_status "
f"set json_converted_ind = true, record_ts = current_timestamp, "
f"json_converted_ts = current_timestamp "
f"where filing_ref_sid = {ref_sid}"
)
cur = conn_r.cursor()
upd_stat = cur.execute(sql_update)
conn_r.commit()
return upd_stat
def getFundFileList(target_dir=None, target_size=None):
if os.path.exists(target_dir):
target_files: list = []
for root, dirs, files in os.walk(target_dir):
for file in files:
# Set utime to current time
file_path = root + "/" + file
file_path = str(file_path).replace("\\", "/")
target_files.append(file_path)
if len(target_files) == target_size:
return target_files
else:
print("Path does not exists: " + target_dir)
exit(1)
+
if __name__ == "__main__":
t_size = 5000
files = getFundFileList(target_dir="/Volumes/ExtShield/SecFundFilings/raw/", target_size=t_size)
l_count: int = 0
for file in files:
split_file = file.split("/")
file_end = str(split_file[-2])[:10]
file_middle = str(split_file[-2])[10:12]
file_start = str(split_file[-2])[12:]
file_parts = file_end + '-' + file_middle + '-' + file_start + '.txt'
file_name = split_file[-5] + "/" + split_file[-4] + "/" + split_file[-3] + "/" + file_parts
filing_sid = str(get_query_results(file_name)).replace('(', '').replace(')', '').replace(',', '').replace('--.txt', '.txt')
print(filing_sid)
time.sleep(0.1)
l_count = l_count + 1
sid = get_query_results(file_name)
print("File to be processed: " + file)
print("Processing file " + str(l_count) + " of " + str(t_size))
base_file = file.replace("/Volumes/ExtShield/SecFundFilings/raw/", "")
complete_file = "/Volumes/ExtShield/SetupFilings/setup_complete/" + base_file
target_path = complete_file.replace("primary_doc.xml", "")
if not os.path.exists(target_path):
os.makedirs(target_path)
shutil.copy(file, complete_file)
processing_dt = datetime.datetime.today()
status = convertXML2JSON(inputXMLFile=file, processing_dt=processing_dt)
print(str(status))
if status == 0:
print("Continue to next file...")
update_database_status(filing_sid)
os.remove(file)
continue
else:
print("Fund did not complete successfully: " + str(status))
exit(1)
Je terminerai cet article d'introduction ici et continuerai après avoir converti le fichier en JSON. D'après le texte, on peut voir que la « target_size » de la conversion peut être arbitrairement grande et que le fichier source d'origine est déplacé vers un autre emplacement pendant le processus.
Vous pouvez exécuter le script plusieurs fois avec différentes tailles de cible ou le redémarrer si une erreur se produit. Le script continuera alors là où il s'est arrêté. La conversion de fichier est mise à jour dans la table filing_ref_status afin qu'elle soit appliquée uniquement aux fichiers qui correspondent à une entrée dans les métadonnées.