top of page

ETF, fondos mutuos y análisis de datos de activos: Introducción

Actualizado: hace 4 días

Hace varios años, comencé un proyecto paralelo que pensé que sería divertido: agregar y cargar presentaciones mensuales ante la SEC de ETF y fondos mutuos. Quería automatizar el proceso de compilación de presentaciones ante la SEC utilizando presentaciones de índices enviadas por las empresas y actualizarlas a medida que se presentaban.


Directory listings for SEC filings
Directory listings for SEC filings

Al buscar en cada carpeta etiquetada con el año, puede buscar por trimestre. Debajo de cada trimestre encontrarás los principales archivos de índice.

Master index files for 2025 QTR1
Master index files for 2025 QTR1






Cada archivo de índice maestro contiene una lista de todos los archivos archivados durante ese período, incluido el "tipo de formulario", que indica qué contiene el archivo. Por ejemplo, el formulario ENPORT-P contiene el archivo de tenencias de ETF/fondos mutuos, que es con lo que comencé. Las presentaciones ante la SEC siguen patrones de almacenamiento del sistema de archivos, por lo que una vez entendidas, son fáciles de programar.


Comencé creando dos tablas: una para los datos de referencia del archivo (metadatos del archivo) y la otra para el estado de descarga del archivo.


Tables in PostgreSQL for metadata tracking of SEC filings
Tables in PostgreSQL for metadata tracking of SEC filings

La descarga se realiza en varios pasos:

  1. Descargue el archivo master.idx para el año/trimestre deseado. Solía automatizar este proceso, pero no pude debido a cambios en el sitio web de la SEC.

  2. Ejecutar el script "

import pandas as pd
from datetime import datetime
from ref_data.connect_util import getconn
from sqlalchemy import text

year_target = "2024"
quarter_target = "QTR3"
# read the dataset
df = pd.read_csv(f"/tmp/{year_target}/{quarter_target}/master.idx",
                 delimiter="|",
                 skiprows=1,
                 header=4,
                 low_memory="false")

df.CIK = df.CIK.astype(str)
df["Date Filed"] = df["Date Filed"].apply(pd.to_datetime)
df["Company Name"] = df["Company Name"].astype(str)
df["Filename"] = df["Filename"].astype(str)

# get DB connection
conn, conn_r = getconn()

# Filter each frame for portfolio filings
form_list: list = ["NPORT-P", "10-Q", "10-K", "8-A12B"]
df2 = df[df["Form Type"].isin(form_list)]
print(df2.count())

try:
    for row in df2.itertuples():
        CIK = row[1]
        company = str(row[2]).replace("'", " ")
        form_type = row[3]
        dt_filed = row[4]
        filename = row[5]

        sql_count_str = (
            f"SELECT COUNT(cik) FROM inbd_raw.filing_ref WHERE cik = '{CIK}' "
            f"and date_filed = to_date('{dt_filed}', 'YYYY-MM-DD') "
            f"and form_type = '{form_type}' "
            f"and file_name = '{filename}'"
        )

        rec_count = conn.execute(text(sql_count_str))
        row_result = rec_count.fetchall()

        for rec in row_result:
            count = rec.count

            if count == 0:
                sql_stmt_str = (
                    f"INSERT INTO inbd_raw.filing_ref(cik,company_name,form_type,date_filed,file_name)"
                    f"values ('{CIK}', '{company}', '{form_type}', to_date('{dt_filed}', 'YYYY-MM-DD'), '{filename}')"
                )

                # print(sql_stmt_str)
                print(
                    f"Adding record for {CIK} and company {company} at: "
                    + str(datetime.now())
                )
                cur = conn_r.cursor()
                cur.execute(sql_stmt_str)
            else:
                print(f"{CIK} Record for {company} already exists, skipping...")

except Exception as e:
    print("Exeception occurred...." + str(e))
    exit(1)
finally:
    if conn:
        conn_r.commit()
        conn_r.cursor().close()
        conn_r.close()
        print("PostgreSQL connection is closed")
    else:
        exit(0)
    

3. La tabla file_ref está llena de datos.

Table data from filing_ref
Table data from filing_ref

Dado que los datos de la tabla utilizan claves sustitutas, es decir, "claves tontas", como creadoras de claves primarias, ahora agrego todos los valores de "filling_ref_sid" de la tabla "filling_ref" a la tabla "filling_ref_status" a través de SQL:

insert into filing_ref_status(filing_ref_sid)
select distinct filing_ref.filing_ref_sid from filing_ref
      where filing_ref_sid not in
       (select distinct filing_ref_sid from filing_ref_status);

Hay varias columnas que están configuradas como "filling_ref_status" de forma predeterminada, por lo que solo necesito la clave principal de "filling_ref".

  1. Los metadatos y los datos de seguimiento estaban listos, así que ejecuté el script.

Las descargas se ven así:

Archivos almacenados en una ruta que sigue la estructura SEC:

Downloaded SEC Filings
Downloaded SEC Filings

Como puede ver arriba, cada archivo se presenta en formato XML.


  1. Hice un guión titulado "

import datetime
import os
import shutil
import time
from DBSchema.convertXML import convertXML2JSON
from ref_data.connect_util import getconn

# Constants
FORM_TYPE = "NPORT-P"
DIRECTORY = "/Volumes/ExtShield/SecFundFilings/"

# Get DB connection
conn, conn_r = getconn()

def get_query_results(f_name: str):
    sql_query_select = (
        f"select fr.filing_ref_sid "
        f"from inbd_raw.filing_ref fr "
        f"where fr.form_type = '{FORM_TYPE}' and fr.file_name = '{f_name}'"
    )
    cur = conn_r.cursor()
    cur.execute(sql_query_select)
    return cur.fetchone()

def update_database_status(ref_sid: int):
    sql_update = (
        f"update inbd_raw.filing_ref_status "
        f"set json_converted_ind = true, record_ts = current_timestamp, "
        f"json_converted_ts = current_timestamp "
        f"where filing_ref_sid = {ref_sid}"
    )
    cur = conn_r.cursor()
    upd_stat = cur.execute(sql_update)
    conn_r.commit()
    return upd_stat

def getFundFileList(target_dir=None, target_size=None):
    if os.path.exists(target_dir):
        target_files: list = []
        for root, dirs, files in os.walk(target_dir):
            for file in files:
                # Set utime to current time
                file_path = root + "/" + file
                file_path = str(file_path).replace("\\", "/")
                target_files.append(file_path)
                if len(target_files) == target_size:
                    return target_files
    else:
        print("Path does not exists: " + target_dir)
        exit(1)
+
if __name__ == "__main__":
    t_size = 5000
    files = getFundFileList(target_dir="/Volumes/ExtShield/SecFundFilings/raw/", target_size=t_size)
    l_count: int = 0

    for file in files:
        split_file = file.split("/")
        file_end = str(split_file[-2])[:10]
        file_middle = str(split_file[-2])[10:12]
        file_start = str(split_file[-2])[12:]
        file_parts = file_end + '-' + file_middle + '-' + file_start + '.txt'
        file_name = split_file[-5] + "/" + split_file[-4] + "/" + split_file[-3] + "/" + file_parts
        filing_sid = str(get_query_results(file_name)).replace('(', '').replace(')', '').replace(',', '').replace('--.txt', '.txt')
        print(filing_sid)
        time.sleep(0.1)
        l_count = l_count + 1
        sid = get_query_results(file_name)
        print("File to be processed: " + file)
        print("Processing file " + str(l_count) + " of " + str(t_size))

        base_file = file.replace("/Volumes/ExtShield/SecFundFilings/raw/", "")
        complete_file = "/Volumes/ExtShield/SetupFilings/setup_complete/" + base_file
        target_path = complete_file.replace("primary_doc.xml", "")

        if not os.path.exists(target_path):
            os.makedirs(target_path)
        shutil.copy(file, complete_file)

        processing_dt = datetime.datetime.today()
        status = convertXML2JSON(inputXMLFile=file, processing_dt=processing_dt)
        print(str(status))

        if status == 0:
            print("Continue to next file...")
            update_database_status(filing_sid)
            os.remove(file)
            continue
        else:
            print("Fund did not complete successfully: " + str(status))
            exit(1)

Terminaré esta publicación introductoria aquí y reanudaré el trabajo después de convertir los archivos a JSON. Como puede ver en el texto, el "target_size" para la conversión puede ser cualquier tamaño y los archivos fuente originales se mueven a otra ubicación durante el procesamiento.


Puede ejecutar el script varias veces con distintos tamaños de destino o reiniciarlo si falla y el script se reanudará desde donde lo dejó. Las conversiones de archivos se actualizan en la tabla "filing_ref_status", por lo que solo opera en archivos que puede encontrar y que coinciden con la entrada en los metadatos.


Bedford, Massachusetts 01730

bottom of page