ETFs, fundos mútuos e análise de dados de ativos: introdução
- Claude Paugh
- há 4 dias
- 5 min de leitura
Atualizado: há 4 dias
Há alguns anos, comecei um projeto paralelo que achei que seria divertido: agregar e enviar registros mensais da SEC para ETFs e fundos mútuos. Eu queria automatizar o processo de compilação de registros da SEC usando registros de índice enviados por empresas e atualizá-los conforme eles eram registrados.

Ao pesquisar em cada pasta rotulada com o ano, você pode pesquisar por trimestre. Abaixo de cada trimestre você encontrará os principais arquivos de índice.

Cada arquivo de índice mestre contém uma lista de todos os arquivos arquivados durante aquele período, incluindo o "tipo de formulário", que indica o que o arquivo contém. Por exemplo, o formulário ENPORT-P contém o arquivo de participações em ETFs/fundos mútuos, que foi com o que comecei. Os registros da SEC seguem padrões de armazenamento do sistema de arquivos, portanto, uma vez compreendidos, são fáceis de programar.
Comecei criando duas tabelas: uma para os dados de referência do arquivo (metadados do arquivo) e a outra para o status do download do arquivo.

O download é feito em várias etapas:
Baixe o arquivo master.idx para o ano/trimestre desejado. Eu costumava automatizar esse processo, mas não consegui devido a mudanças no site da SEC.
Execute o script "
import pandas as pd
from datetime import datetime
from ref_data.connect_util import getconn
from sqlalchemy import text
year_target = "2024"
quarter_target = "QTR3"
# read the dataset
df = pd.read_csv(f"/tmp/{year_target}/{quarter_target}/master.idx",
delimiter="|",
skiprows=1,
header=4,
low_memory="false")
df.CIK = df.CIK.astype(str)
df["Date Filed"] = df["Date Filed"].apply(pd.to_datetime)
df["Company Name"] = df["Company Name"].astype(str)
df["Filename"] = df["Filename"].astype(str)
# get DB connection
conn, conn_r = getconn()
# Filter each frame for portfolio filings
form_list: list = ["NPORT-P", "10-Q", "10-K", "8-A12B"]
df2 = df[df["Form Type"].isin(form_list)]
print(df2.count())
try:
for row in df2.itertuples():
CIK = row[1]
company = str(row[2]).replace("'", " ")
form_type = row[3]
dt_filed = row[4]
filename = row[5]
sql_count_str = (
f"SELECT COUNT(cik) FROM inbd_raw.filing_ref WHERE cik = '{CIK}' "
f"and date_filed = to_date('{dt_filed}', 'YYYY-MM-DD') "
f"and form_type = '{form_type}' "
f"and file_name = '{filename}'"
)
rec_count = conn.execute(text(sql_count_str))
row_result = rec_count.fetchall()
for rec in row_result:
count = rec.count
if count == 0:
sql_stmt_str = (
f"INSERT INTO inbd_raw.filing_ref(cik,company_name,form_type,date_filed,file_name)"
f"values ('{CIK}', '{company}', '{form_type}', to_date('{dt_filed}', 'YYYY-MM-DD'), '{filename}')"
)
# print(sql_stmt_str)
print(
f"Adding record for {CIK} and company {company} at: "
+ str(datetime.now())
)
cur = conn_r.cursor()
cur.execute(sql_stmt_str)
else:
print(f"{CIK} Record for {company} already exists, skipping...")
except Exception as e:
print("Exeception occurred...." + str(e))
exit(1)
finally:
if conn:
conn_r.commit()
conn_r.cursor().close()
conn_r.close()
print("PostgreSQL connection is closed")
else:
exit(0)
3. A tabela file_ref está cheia de dados.

Como os dados da tabela usam chaves substitutas, ou seja, "chaves burras", como criadoras de chaves primárias, agora adiciono todos os valores "filling_ref_sid" da tabela "filling_ref" à tabela "filling_ref_status" via SQL:
insert into filing_ref_status(filing_ref_sid)
select distinct filing_ref.filing_ref_sid from filing_ref
where filing_ref_sid not in
(select distinct filing_ref_sid from filing_ref_status);
Há várias colunas definidas como "filling_ref_status" por padrão, então preciso apenas da chave primária "filling_ref".
Os metadados e os dados de rastreamento estavam prontos, então executei o script.
Os downloads ficam assim:

Arquivos armazenados em um caminho que segue a estrutura SEC:

Como você pode ver acima, cada arquivo é apresentado no formato XML.
Eu fiz um script intitulado "
import datetime
import os
import shutil
import time
from DBSchema.convertXML import convertXML2JSON
from ref_data.connect_util import getconn
# Constants
FORM_TYPE = "NPORT-P"
DIRECTORY = "/Volumes/ExtShield/SecFundFilings/"
# Get DB connection
conn, conn_r = getconn()
def get_query_results(f_name: str):
sql_query_select = (
f"select fr.filing_ref_sid "
f"from inbd_raw.filing_ref fr "
f"where fr.form_type = '{FORM_TYPE}' and fr.file_name = '{f_name}'"
)
cur = conn_r.cursor()
cur.execute(sql_query_select)
return cur.fetchone()
def update_database_status(ref_sid: int):
sql_update = (
f"update inbd_raw.filing_ref_status "
f"set json_converted_ind = true, record_ts = current_timestamp, "
f"json_converted_ts = current_timestamp "
f"where filing_ref_sid = {ref_sid}"
)
cur = conn_r.cursor()
upd_stat = cur.execute(sql_update)
conn_r.commit()
return upd_stat
def getFundFileList(target_dir=None, target_size=None):
if os.path.exists(target_dir):
target_files: list = []
for root, dirs, files in os.walk(target_dir):
for file in files:
# Set utime to current time
file_path = root + "/" + file
file_path = str(file_path).replace("\\", "/")
target_files.append(file_path)
if len(target_files) == target_size:
return target_files
else:
print("Path does not exists: " + target_dir)
exit(1)
+
if __name__ == "__main__":
t_size = 5000
files = getFundFileList(target_dir="/Volumes/ExtShield/SecFundFilings/raw/", target_size=t_size)
l_count: int = 0
for file in files:
split_file = file.split("/")
file_end = str(split_file[-2])[:10]
file_middle = str(split_file[-2])[10:12]
file_start = str(split_file[-2])[12:]
file_parts = file_end + '-' + file_middle + '-' + file_start + '.txt'
file_name = split_file[-5] + "/" + split_file[-4] + "/" + split_file[-3] + "/" + file_parts
filing_sid = str(get_query_results(file_name)).replace('(', '').replace(')', '').replace(',', '').replace('--.txt', '.txt')
print(filing_sid)
time.sleep(0.1)
l_count = l_count + 1
sid = get_query_results(file_name)
print("File to be processed: " + file)
print("Processing file " + str(l_count) + " of " + str(t_size))
base_file = file.replace("/Volumes/ExtShield/SecFundFilings/raw/", "")
complete_file = "/Volumes/ExtShield/SetupFilings/setup_complete/" + base_file
target_path = complete_file.replace("primary_doc.xml", "")
if not os.path.exists(target_path):
os.makedirs(target_path)
shutil.copy(file, complete_file)
processing_dt = datetime.datetime.today()
status = convertXML2JSON(inputXMLFile=file, processing_dt=processing_dt)
print(str(status))
if status == 0:
print("Continue to next file...")
update_database_status(filing_sid)
os.remove(file)
continue
else:
print("Fund did not complete successfully: " + str(status))
exit(1)
Encerrarei este post introdutório aqui e retomarei o trabalho após converter os arquivos para JSON. Como você pode ver no texto, o "target_size" para a conversão pode ser qualquer tamanho e os arquivos de origem originais são movidos para outro local durante o processamento.
Você pode executar o script várias vezes com tamanhos de destino diferentes ou reiniciá-lo se falhar e o script continuará de onde parou. As conversões de arquivos são atualizadas na tabela "filing_ref_status", de modo que ela só opera em arquivos que consegue encontrar e que correspondem à entrada nos metadados.