top of page

ईटीएफ, म्यूचुअल फंड और संपत्ति डेटा विश्लेषण: परिचय

  • लेखक की तस्वीर: Claude Paugh
    Claude Paugh
  • 4 दिन पहले
  • 5 मिनट पठन

अपडेट करने की तारीख: 4 दिन पहले

कई साल पहले, मैंने एक साइड प्रोजेक्ट शुरू किया था जिसके बारे में मुझे लगा कि यह मजेदार होगा: ईटीएफ और म्यूचुअल फंड के लिए मासिक एसईसी फाइलिंग को एकत्रित करना और अपलोड करना। मैं कम्पनियों द्वारा प्रस्तुत सूचकांक फाइलिंग का उपयोग करके एसईसी फाइलिंग को संकलित करने की प्रक्रिया को स्वचालित करना चाहता था, तथा फाइल करते समय उसे अद्यतन करना चाहता था।


Directory listings for SEC filings
Directory listings for SEC filings

वर्ष के साथ लेबल किए गए प्रत्येक फ़ोल्डर को खोजते समय, आप तिमाही के अनुसार खोज सकते हैं। प्रत्येक तिमाही के अंतर्गत आपको मुख्य सूचकांक फ़ाइलें मिलेंगी।

Master index files for 2025 QTR1
Master index files for 2025 QTR1






प्रत्येक मास्टर इंडेक्स फ़ाइल में उस अवधि के दौरान दायर की गई सभी फ़ाइलों की सूची होती है, जिसमें "फ़ॉर्म प्रकार" भी शामिल होता है, जो आपको बताता है कि फ़ाइल में क्या है। उदाहरण के लिए, फॉर्म एनपोर्ट-पी में ईटीएफ/म्यूचुअल फंड होल्डिंग्स फाइल होती है, जिससे मैंने शुरुआत की थी। एसईसी फाइलिंग फाइल सिस्टम भंडारण पैटर्न का पालन करती है, इसलिए एक बार समझ लेने के बाद, उन्हें प्रोग्राम करना आसान होता है।


मैंने दो तालिकाएँ बनाकर शुरुआत की: एक फ़ाइल संदर्भ डेटा (फ़ाइल से मेटाडेटा) के लिए और दूसरी फ़ाइल डाउनलोड स्थिति के लिए।


Tables in PostgreSQL for metadata tracking of SEC filings
Tables in PostgreSQL for metadata tracking of SEC filings

डाउनलोड कई चरणों में किया जाता है:

  1. इच्छित वर्ष/तिमाही के लिए master.idx फ़ाइल डाउनलोड करें। मैं इस प्रक्रिया को स्वचालित कर देता था, लेकिन SEC वेबसाइट में परिवर्तन के कारण ऐसा नहीं हो सका।

  2. स्क्रिप्ट चलाएँ "

import pandas as pd
from datetime import datetime
from ref_data.connect_util import getconn
from sqlalchemy import text

year_target = "2024"
quarter_target = "QTR3"
# read the dataset
df = pd.read_csv(f"/tmp/{year_target}/{quarter_target}/master.idx",
                 delimiter="|",
                 skiprows=1,
                 header=4,
                 low_memory="false")

df.CIK = df.CIK.astype(str)
df["Date Filed"] = df["Date Filed"].apply(pd.to_datetime)
df["Company Name"] = df["Company Name"].astype(str)
df["Filename"] = df["Filename"].astype(str)

# get DB connection
conn, conn_r = getconn()

# Filter each frame for portfolio filings
form_list: list = ["NPORT-P", "10-Q", "10-K", "8-A12B"]
df2 = df[df["Form Type"].isin(form_list)]
print(df2.count())

try:
    for row in df2.itertuples():
        CIK = row[1]
        company = str(row[2]).replace("'", " ")
        form_type = row[3]
        dt_filed = row[4]
        filename = row[5]

        sql_count_str = (
            f"SELECT COUNT(cik) FROM inbd_raw.filing_ref WHERE cik = '{CIK}' "
            f"and date_filed = to_date('{dt_filed}', 'YYYY-MM-DD') "
            f"and form_type = '{form_type}' "
            f"and file_name = '{filename}'"
        )

        rec_count = conn.execute(text(sql_count_str))
        row_result = rec_count.fetchall()

        for rec in row_result:
            count = rec.count

            if count == 0:
                sql_stmt_str = (
                    f"INSERT INTO inbd_raw.filing_ref(cik,company_name,form_type,date_filed,file_name)"
                    f"values ('{CIK}', '{company}', '{form_type}', to_date('{dt_filed}', 'YYYY-MM-DD'), '{filename}')"
                )

                # print(sql_stmt_str)
                print(
                    f"Adding record for {CIK} and company {company} at: "
                    + str(datetime.now())
                )
                cur = conn_r.cursor()
                cur.execute(sql_stmt_str)
            else:
                print(f"{CIK} Record for {company} already exists, skipping...")

except Exception as e:
    print("Exeception occurred...." + str(e))
    exit(1)
finally:
    if conn:
        conn_r.commit()
        conn_r.cursor().close()
        conn_r.close()
        print("PostgreSQL connection is closed")
    else:
        exit(0)
    

3. file_ref तालिका डेटा से भरी हुई है।

Table data from filing_ref
Table data from filing_ref

चूंकि तालिका डेटा प्राथमिक कुंजियों के निर्माता के रूप में सरोगेट कुंजियों, यानी "गूंगा कुंजियों" का उपयोग करता है, इसलिए अब मैं "filling_ref" तालिका से "filling_ref_sid" के सभी मानों को SQL के माध्यम से "filling_ref_status" तालिका में जोड़ता हूं:

insert into filing_ref_status(filing_ref_sid)
select distinct filing_ref.filing_ref_sid from filing_ref
      where filing_ref_sid not in
       (select distinct filing_ref_sid from filing_ref_status);

ऐसे कई कॉलम हैं जो डिफ़ॉल्ट रूप से "filling_ref_status" पर सेट होते हैं, इसलिए मुझे केवल "filling_ref" से प्राथमिक कुंजी की आवश्यकता है।

  1. मेटाडेटा और ट्रैकिंग डेटा तैयार था, इसलिए मैंने स्क्रिप्ट चलायी।

डाउनलोड इस प्रकार दिखते हैं:

SEC संरचना का अनुसरण करने वाले पथ में संग्रहीत फ़ाइलें:

Downloaded SEC Filings
Downloaded SEC Filings

जैसा कि आप ऊपर देख सकते हैं, प्रत्येक फ़ाइल XML प्रारूप में प्रस्तुत की गई है।


  1. मैंने एक स्क्रिप्ट बनाई जिसका शीर्षक था "

import datetime
import os
import shutil
import time
from DBSchema.convertXML import convertXML2JSON
from ref_data.connect_util import getconn

# Constants
FORM_TYPE = "NPORT-P"
DIRECTORY = "/Volumes/ExtShield/SecFundFilings/"

# Get DB connection
conn, conn_r = getconn()

def get_query_results(f_name: str):
    sql_query_select = (
        f"select fr.filing_ref_sid "
        f"from inbd_raw.filing_ref fr "
        f"where fr.form_type = '{FORM_TYPE}' and fr.file_name = '{f_name}'"
    )
    cur = conn_r.cursor()
    cur.execute(sql_query_select)
    return cur.fetchone()

def update_database_status(ref_sid: int):
    sql_update = (
        f"update inbd_raw.filing_ref_status "
        f"set json_converted_ind = true, record_ts = current_timestamp, "
        f"json_converted_ts = current_timestamp "
        f"where filing_ref_sid = {ref_sid}"
    )
    cur = conn_r.cursor()
    upd_stat = cur.execute(sql_update)
    conn_r.commit()
    return upd_stat

def getFundFileList(target_dir=None, target_size=None):
    if os.path.exists(target_dir):
        target_files: list = []
        for root, dirs, files in os.walk(target_dir):
            for file in files:
                # Set utime to current time
                file_path = root + "/" + file
                file_path = str(file_path).replace("\\", "/")
                target_files.append(file_path)
                if len(target_files) == target_size:
                    return target_files
    else:
        print("Path does not exists: " + target_dir)
        exit(1)
+
if __name__ == "__main__":
    t_size = 5000
    files = getFundFileList(target_dir="/Volumes/ExtShield/SecFundFilings/raw/", target_size=t_size)
    l_count: int = 0

    for file in files:
        split_file = file.split("/")
        file_end = str(split_file[-2])[:10]
        file_middle = str(split_file[-2])[10:12]
        file_start = str(split_file[-2])[12:]
        file_parts = file_end + '-' + file_middle + '-' + file_start + '.txt'
        file_name = split_file[-5] + "/" + split_file[-4] + "/" + split_file[-3] + "/" + file_parts
        filing_sid = str(get_query_results(file_name)).replace('(', '').replace(')', '').replace(',', '').replace('--.txt', '.txt')
        print(filing_sid)
        time.sleep(0.1)
        l_count = l_count + 1
        sid = get_query_results(file_name)
        print("File to be processed: " + file)
        print("Processing file " + str(l_count) + " of " + str(t_size))

        base_file = file.replace("/Volumes/ExtShield/SecFundFilings/raw/", "")
        complete_file = "/Volumes/ExtShield/SetupFilings/setup_complete/" + base_file
        target_path = complete_file.replace("primary_doc.xml", "")

        if not os.path.exists(target_path):
            os.makedirs(target_path)
        shutil.copy(file, complete_file)

        processing_dt = datetime.datetime.today()
        status = convertXML2JSON(inputXMLFile=file, processing_dt=processing_dt)
        print(str(status))

        if status == 0:
            print("Continue to next file...")
            update_database_status(filing_sid)
            os.remove(file)
            continue
        else:
            print("Fund did not complete successfully: " + str(status))
            exit(1)

मैं इस प्रारंभिक पोस्ट को यहीं समाप्त करूंगा, और फ़ाइलों को JSON में परिवर्तित करने के बाद काम फिर से शुरू करूंगा। जैसा कि आप पाठ में देख सकते हैं, रूपांतरण के लिए "target_size" का आकार कोई भी हो सकता है, और मूल स्रोत फ़ाइलें प्रसंस्करण के दौरान किसी अन्य स्थान पर ले जाई जाती हैं।


आप स्क्रिप्ट को विभिन्न लक्ष्य आकारों के साथ कई बार चला सकते हैं, या क्रैश होने पर इसे पुनः आरंभ कर सकते हैं, और स्क्रिप्ट वहीं से पुनः आरंभ हो जाएगी जहां से इसे छोड़ा गया था। फ़ाइल रूपांतरण को "filing_ref_status" तालिका में अद्यतन किया जाता है, इसलिए यह केवल उन फ़ाइलों पर काम करता है जिन्हें यह ढूंढ सकता है और मेटाडेटा में प्रविष्टि से मिलान कर सकता है।


 
 

बेडफोर्ड, एमए 01730

bottom of page