تحليل بيانات صناديق الاستثمار المتداولة وصناديق الاستثمار المشتركة والممتلكات: المقدمة
- Claude Paugh
- قبل 4 أيام
- 4 دقائق قراءة
تاريخ التحديث: قبل 4 أيام
قبل عدة سنوات، بدأتُ مشروعًا جانبيًا اعتقدتُ أنه سيكون ممتعًا: جمع وتحميل ملفات هيئة الأوراق المالية والبورصات الأمريكية (SEC) الخاصة بصناديق الاستثمار المتداولة (ETFs) وصناديق الاستثمار المشتركة شهريًا. أردتُ أتمتة عملية جمع ملفات هيئة الأوراق المالية والبورصات الأمريكية (SEC) باستخدام ملفات الفهرس التي تُقدمها الشركات وتُحدّثها أثناء تقديمها لملفاتها.

عند البحث في كل مجلد يحمل تسمية السنة، يمكنك البحث حسب الربع. ستجد تحت كل ربع ملفات الفهرس الرئيسية.

يحتوي كل ملف فهرس رئيسي على جرد لجميع الملفات المُودعة خلال الفترة، بما في ذلك "نوع النموذج"، الذي يُخبرك بمحتوى الملف. على سبيل المثال، يحتوي نموذج "NPORT-P" على ملف حيازات صندوق الاستثمار المتداول/صندوق الاستثمار المشترك، وهو ما بدأت به. تتبع ملفات هيئة الأوراق المالية والبورصات الأمريكية نمط تخزين نظام الملفات، لذا بمجرد فهمه، يسهل برمجته.
بدأت بإنشاء جدولين: الأول لبيانات مرجع الملف (البيانات الوصفية من الملف) والجدول الثاني لحالة تنزيل الملف.

تتم عمليات التنزيل في خطوات متعددة:
نزّل ملف master.idx للسنة/الربع المطلوب. كنتُ أؤتمت هذه العملية سابقًا، لكن التغييرات التي طرأت على موقع هيئة الأوراق المالية والبورصات الأمريكية حالت دون ذلك.
شغّل البرنامج النصي "
import pandas as pd
from datetime import datetime
from ref_data.connect_util import getconn
from sqlalchemy import text
year_target = "2024"
quarter_target = "QTR3"
# read the dataset
df = pd.read_csv(f"/tmp/{year_target}/{quarter_target}/master.idx",
delimiter="|",
skiprows=1,
header=4,
low_memory="false")
df.CIK = df.CIK.astype(str)
df["Date Filed"] = df["Date Filed"].apply(pd.to_datetime)
df["Company Name"] = df["Company Name"].astype(str)
df["Filename"] = df["Filename"].astype(str)
# get DB connection
conn, conn_r = getconn()
# Filter each frame for portfolio filings
form_list: list = ["NPORT-P", "10-Q", "10-K", "8-A12B"]
df2 = df[df["Form Type"].isin(form_list)]
print(df2.count())
try:
for row in df2.itertuples():
CIK = row[1]
company = str(row[2]).replace("'", " ")
form_type = row[3]
dt_filed = row[4]
filename = row[5]
sql_count_str = (
f"SELECT COUNT(cik) FROM inbd_raw.filing_ref WHERE cik = '{CIK}' "
f"and date_filed = to_date('{dt_filed}', 'YYYY-MM-DD') "
f"and form_type = '{form_type}' "
f"and file_name = '{filename}'"
)
rec_count = conn.execute(text(sql_count_str))
row_result = rec_count.fetchall()
for rec in row_result:
count = rec.count
if count == 0:
sql_stmt_str = (
f"INSERT INTO inbd_raw.filing_ref(cik,company_name,form_type,date_filed,file_name)"
f"values ('{CIK}', '{company}', '{form_type}', to_date('{dt_filed}', 'YYYY-MM-DD'), '{filename}')"
)
# print(sql_stmt_str)
print(
f"Adding record for {CIK} and company {company} at: "
+ str(datetime.now())
)
cur = conn_r.cursor()
cur.execute(sql_stmt_str)
else:
print(f"{CIK} Record for {company} already exists, skipping...")
except Exception as e:
print("Exeception occurred...." + str(e))
exit(1)
finally:
if conn:
conn_r.commit()
conn_r.cursor().close()
conn_r.close()
print("PostgreSQL connection is closed")
else:
exit(0)
3. يتم ملء جدول file_ref بالبيانات

نظرًا لأن بيانات الجدول تستخدم مفاتيح بديلة، أي "المفاتيح الغبية"، كمنشئ للمفاتيح الأساسية، أقوم الآن بإضافة جميع القيم الخاصة بـ "filing_ref_sid" من جدول "filing_ref" إلى جدول "filing_ref_status" عبر SQL:
insert into filing_ref_status(filing_ref_sid)
select distinct filing_ref.filing_ref_sid from filing_ref
where filing_ref_sid not in
(select distinct filing_ref_sid from filing_ref_status);
هناك أعمدة متعددة يتم تعيينها افتراضيًا على "filing_ref_status"، لذلك أحتاج فقط إلى المفتاح الأساسي من "filing_ref".
البيانات الوصفية وبيانات التتبع جاهزة، لذا شغّلتُ البرنامج النصي "
تبدو التنزيلات مثل هذا:

الملفات المخزنة في المسار الذي يتبع بنية SEC:

يتم تقديم كل ملف بتنسيق XML كما يمكنك أن ترى أعلاه.
لقد قمت بإنشاء نص برمجي يحمل عنوان "
import datetime
import os
import shutil
import time
from DBSchema.convertXML import convertXML2JSON
from ref_data.connect_util import getconn
# Constants
FORM_TYPE = "NPORT-P"
DIRECTORY = "/Volumes/ExtShield/SecFundFilings/"
# Get DB connection
conn, conn_r = getconn()
def get_query_results(f_name: str):
sql_query_select = (
f"select fr.filing_ref_sid "
f"from inbd_raw.filing_ref fr "
f"where fr.form_type = '{FORM_TYPE}' and fr.file_name = '{f_name}'"
)
cur = conn_r.cursor()
cur.execute(sql_query_select)
return cur.fetchone()
def update_database_status(ref_sid: int):
sql_update = (
f"update inbd_raw.filing_ref_status "
f"set json_converted_ind = true, record_ts = current_timestamp, "
f"json_converted_ts = current_timestamp "
f"where filing_ref_sid = {ref_sid}"
)
cur = conn_r.cursor()
upd_stat = cur.execute(sql_update)
conn_r.commit()
return upd_stat
def getFundFileList(target_dir=None, target_size=None):
if os.path.exists(target_dir):
target_files: list = []
for root, dirs, files in os.walk(target_dir):
for file in files:
# Set utime to current time
file_path = root + "/" + file
file_path = str(file_path).replace("\\", "/")
target_files.append(file_path)
if len(target_files) == target_size:
return target_files
else:
print("Path does not exists: " + target_dir)
exit(1)
+
if __name__ == "__main__":
t_size = 5000
files = getFundFileList(target_dir="/Volumes/ExtShield/SecFundFilings/raw/", target_size=t_size)
l_count: int = 0
for file in files:
split_file = file.split("/")
file_end = str(split_file[-2])[:10]
file_middle = str(split_file[-2])[10:12]
file_start = str(split_file[-2])[12:]
file_parts = file_end + '-' + file_middle + '-' + file_start + '.txt'
file_name = split_file[-5] + "/" + split_file[-4] + "/" + split_file[-3] + "/" + file_parts
filing_sid = str(get_query_results(file_name)).replace('(', '').replace(')', '').replace(',', '').replace('--.txt', '.txt')
print(filing_sid)
time.sleep(0.1)
l_count = l_count + 1
sid = get_query_results(file_name)
print("File to be processed: " + file)
print("Processing file " + str(l_count) + " of " + str(t_size))
base_file = file.replace("/Volumes/ExtShield/SecFundFilings/raw/", "")
complete_file = "/Volumes/ExtShield/SetupFilings/setup_complete/" + base_file
target_path = complete_file.replace("primary_doc.xml", "")
if not os.path.exists(target_path):
os.makedirs(target_path)
shutil.copy(file, complete_file)
processing_dt = datetime.datetime.today()
status = convertXML2JSON(inputXMLFile=file, processing_dt=processing_dt)
print(str(status))
if status == 0:
print("Continue to next file...")
update_database_status(filing_sid)
os.remove(file)
continue
else:
print("Fund did not complete successfully: " + str(status))
exit(1)
سأنهي هذه التدوينة الأولية هنا، وسأستأنف العمل بعد تحويل الملفات إلى JSON. كما ترى في النص، يمكن أن يكون "target_size" للتحويل أي حجم، ويتم نقل ملفات المصدر الأصلية إلى موقع آخر أثناء المعالجة.
يمكنك تشغيل البرنامج النصي عدة مرات بأحجام أهداف مختلفة، أو إعادة تشغيله في حال حدوث عطل، وسيستأنف البرنامج من حيث توقف. يتم تحديث تحويل الملفات في جدول "filing_ref_status"، لذا فهو يعمل فقط على الملفات التي يمكنه العثور عليها ومطابقتها مع مُدخل في البيانات الوصفية.