技術資料

AIによる数法則発見の時系列データへの拡張と金融データへの応用 

もくじ 

必要なモジュール 

必要なモジュールバージョンインストール方法 必要なモジュールバージョンインストール方法
pandas1.5.3pip install pandas==1.5.3 numpy1.23.5pip install numpy==1.23.5
torch2.7.0+cu118https://pytorch.org/get-started/locally/ sympy1.14.0pip install sympy==1.14.0
symbolicregressionhttps://github.com/facebookresearch/symbolicregression requests2.32.3pip install requests==2.32.3
IPython  sys
os sys

バージョンやインストール方法を記述していないものはおそらくもともと入っているもの

symbolic regression

プログラム 

実行手順 

データの収集からはじめる.

データの収集には以下のプログラムを使う.

このプログラムではpythonのモジュールであるyfinanceを使ってデータを収集している. 

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import yfinance as yf
#investing.comから取得したデータ
#最新のデータダウンロードにはGoogle accountが必要

日本および米国の10年国債利回りなど一部のデータはyfinanceによって取得することはでき ないため,以下↓のウェブサイトからcsvファイルをダウンロードする.

               https://jp.investing.com/rates-bonds/japan-10-year-bond-yield-historical-data

jp_interest_rate = pd.read_csv("JP_InterestRate2Y.csv", index_col=0,  parse_dates=True)
us_interest_rate = pd.read_csv("US_interestRate2Y.csv", index_col=0, parse_dates=True)
jp_10Y = pd.read_csv("JP_10Y_rimawari.csv", index_col=0, parse_dates=True)
us_10Y = pd.read_csv("US_10Y_rimawari.csv", index_col=0, parse_dates=True)
owarineJP = jp_interest_rate[["終値"]]
owarineUS = us_interest_rate[["終値"]]
aligned_data = owarineJP.join(owarineUS, how='inner',lsuffix='_JP',rsuffix='_US')
aligned_data['金利差_JP-US'] = aligned_data['終値_JP'] - aligned_data['終値_US']
start_data = '2010-01-01'
end_data = '2025-10-01'
# S&P500、日経225、USD/JPYのデータを取得するためのコード
#アメリカの株式市場インデックスS&P500を取得
sp500_df = yf.download('^GSPC', start=start_data, end=end_data)
print("sp500_df columns:", sp500_df.columns)
if sp500_df.empty:
    print("Warning: sp500_df is empty!")
    sp500 = pd.Series(dtype=float) # 空のSeriesとして初期化
else:
    try:
        # MultiIndexの 'Close' 列を指定
        sp500 = sp500_df[('Close', '^GSPC')]
    except KeyError:
        print("KeyError: ('Close', '^GSPC') column not found in sp500_df. Available  columns:", sp500_df.columns)
       sp500 = pd.Series(dtype=float) # エラー時は空のSeries
print("sp500 data (Series):")
print(sp500.head())
print(f"Type of sp500: {type(sp500)}")
if isinstance(sp500, pd.Series):
    print(f"sp500 is a Series. Length: {len(sp500)}, Empty: {sp500.empty}, Ndim: {sp500.ndim}")
else:
    print(f"sp500 is NOT a Series. Value: {sp500}")
#日本の株式市場インデックス日経225を取得
nikkei225_df = yf.download('^N225', start=start_data, end=end_data)
print("\nnikkei225_df columns:", nikkei225_df.columns)
if nikkei225_df.empty:
    print("Warning: nikkei225_df is empty!")
    nikkei225 = pd.Series(dtype=float)
else:
    try:
        # MultiIndexの 'Close' 列を指定
        nikkei225 = nikkei225_df[('Close', '^N225')]
    except KeyError:
        print("KeyError: ('Close', '^N225') column not found in nikkei225_df.  Available columns:", nikkei225_df.columns)
       nikkei225 = pd.Series(dtype=float)
print("\nnikkei225 data (Series):")
print(nikkei225.head())
print(f"Type of nikkei225: {type(nikkei225)}")
if isinstance(nikkei225, pd.Series):
    print(f"nikkei225 is a Series. Length: {len(nikkei225)}, Empty:  {nikkei225.empty}, Ndim: {nikkei225.ndim}")
else:
    print(f"nikkei225 is NOT a Series. Value: {nikkei225}")
#USD/JPYの為替レートを取得
usd_jpy_df = yf.download('JPY=X', start=start_data, end=end_data)
print("\nusd_jpy_df columns:", usd_jpy_df.columns)
if usd_jpy_df.empty:
    print("Warning: usd_jpy_df is empty!")
    usd_jpy = pd.Series(dtype=float)
else:
    try:
        # MultiIndexの 'Close' 列を指定
        usd_jpy = usd_jpy_df[('Close', 'JPY=X')]
    except KeyError:
        print("KeyError: ('Close', 'JPY=X') column not found in usd_jpy_df.  Available columns:", usd_jpy_df.columns)
        usd_jpy = pd.Series(dtype=float)
print("\nusd_jpy data (Series):")
print(usd_jpy.head())
print(f"Type of usd_jpy: {type(usd_jpy)}")
if isinstance(usd_jpy, pd.Series):
    print(f"usd_jpy is a Series. Length: {len(usd_jpy)}, Empty: {usd_jpy.empty},  Ndim: {usd_jpy.ndim}")
else:
    print(f"usd_jpy is NOT a Series. Value: {usd_jpy}")


#データフレームに結合
# Seriesであることを確認し、空でないことを確認
is_sp500_valid = isinstance(sp500, pd.Series) and not sp500.empty
is_nikkei225_valid = isinstance(nikkei225, pd.Series) and not nikkei225.empty
is_usd_jpy_valid = isinstance(usd_jpy, pd.Series) and not usd_jpy.empty
if is_sp500_valid and is_nikkei225_valid and is_usd_jpy_valid:
    print("\nAll series are valid and non-empty. Attempting to create DataFrame.")
    print(f"Debug - Type of sp500 before DataFrame: {type(sp500)}")
    print(f"Debug - Type of nikkei225 before DataFrame: {type(nikkei225)}")
    print(f"Debug - Type of usd_jpy before DataFrame: {type(usd_jpy)}")
    
    try:
        combined_data = pd.DataFrame({
            'SP500': sp500,
            'Nikkei225': nikkei225,
            'USD/JPY': usd_jpy,
        }).dropna()
        print("\nCombined Data Head:")
        print(combined_data.head())
        print("\nCombined Data Info:")
        combined_data.info()
    except ValueError as e:
        print(f"\nValueError during DataFrame creation: {e}")
        print("Please check the types and contents of sp500, nikkei225, and usd_jpy above.")
    except Exception as e:
        print(f"\nAn unexpected error occurred during DataFrame creation: {e}")
else:
    print("\nError: One or more data series are not valid Pandas Series or are empty. Cannot create combined DataFrame.")
    print(f"SP500 valid: {is_sp500_valid}, Nikkei225 valid: {is_nikkei225_valid},  USD/JPY valid: {is_usd_jpy_valid}")
import pandas as pd
import yfinance as yf
# コモディティデータの取得とデータフレームへの格納
start_date_commodity = '2015-01-01'
end_date_commodity = '2025-01-01'
# WTI原油価格と金価格のティッカーシンボル
commodity_tickers = ['CL=F', 'GC=F'] # CL=F はWTI原油先物, GC=F は金先物
# データを一度にダウンロード
raw_commodity_data = yf.download(commodity_tickers, start=start_date_commodity,  end=end_date_commodity)
if not raw_commodity_data.empty:
    # 'Close' 価格のみを抽出
    # yf.download に複数のティッカーを渡すと、列がMultiIndexになるため、
    # raw_commodity_data['Close'] で各ティッカーの終値のDataFrameを取得できます。
   commodity_prices = raw_commodity_data['Close']
   
   # 列名を分かりやすいものに変更
   commodity_prices = commodity_prices.rename(columns={'CL=F': 'WTI_Oil', 'GC=F': 'Gold'})
   
   # NaNを含む行を削除 (いずれかのデータが存在しない日を除く)
   commodity_prices = commodity_prices.dropna()
   print("\nCommodity Data Head:")
   print(commodity_prices.head())
   print("\nCommodity Data Info:")
   commodity_prices.info()
   
   # 完成したデータフレームを表示
   commodity_data = commodity_prices
   print("\nFinal Commodity DataFrame:")
   print(commodity_data.head())
else:
   print("コモディティデータのダウンロードに失敗しました。")
   commodity_data = pd.DataFrame() # 空のDataFrameを作成
# 変数 commodity_data を次のセルで使えるようにする (Jupyter Notebookの場合)
commodity_data
owarineJP10Y = jp_10Y[["終値"]]
owarineUS10Y = us_10Y[["終値"]]
aligned_data10Y = owarineJP10Y.join(owarineUS10Y,  how='inner',lsuffix='_JP',rsuffix='_US')
aligned_data10Y['金利差_JP-US_10Y'] = aligned_data10Y['終値_JP'] -  aligned_data10Y['終値_US']
# 金利差が計算されたデータを確認
print("\nAligned Data with Interest Rate Difference Head:")
print(aligned_data10Y.head())
print("\nAligned Data with Interest Rate Difference Info:")
InterestRateDifferentials10Y = aligned_data10Y[["終値_JP","終値_US","金利差_JP-US_10Y"]]
InterestRateDifferentials10Y
vix = pd.read_csv("VIX.csv", index_col=0, parse_dates=True)
vix = vix["終値"]
import pandas as pd
# 各データフレームの準備と列名の調整
# VIXデータ (SeriesからDataFrameへ変換し、列名を'VIX'に)
if isinstance(vix, pd.Series):
   vix_df = vix.to_frame(name='VIX')
else: # もしvixが既にDataFrameで、適切な列名がついている場合はそのまま使用
     # 必要であればここで列名を確認・変更してください
   vix_df = vix
# 2年債金利差の列名を変更
InterestRateDifferentials = InterestRateDifferentials.rename(
   columns={'金利差_JP-US': '金利差_JP-US_2Y'}
)
# 10年債金利関連の列名を変更
InterestRateDifferentials10Y = InterestRateDifferentials10Y.rename(
    columns={
        '終値_JP': 'JP_10Y_Yield',
        '終値_US': 'US_10Y_Yield',
        '金利差_JP-US_10Y': '金利差_JP-US_10Y' # こちらは変更なしでも良いが、 明示的に
   }
)
# 結合するデータフレームのリストを作成
# combined_data: 'SP500', 'Nikkei225', 'USD/JPY'
# InterestRateDifferentials: '金利差_JP-US_2Y'
# InterestRateDifferentials10Y: 'JP_10Y_Yield', 'US_10Y_Yield', '金利差_JP- US_10Y'
# commodity_data: 'WTI_Oil', 'Gold'
# vix_df: 'VIX'
dataframes_to_join = [
   combined_data, # combined_data には 'USD/JPY' が含まれています
   InterestRateDifferentials,
   InterestRateDifferentials10Y,
   commodity_data,
   vix_df
]
# pd.concat を使用して、共通のインデックス(日付)で内部結合 (inner join)
# axis=1 は列方向に結合することを意味します
# join='inner' はすべてのデータフレームに存在するインデックスのみを保持します
for_predict_usdjpy = pd.concat(dataframes_to_join, axis=1, join='inner')
# 'USD/JPY' 列を削除する処理を削除またはコメントアウト
# if 'USD/JPY' in for_predict_usdjpy.columns:
#     for_predict_usdjpy = for_predict_usdjpy.drop(columns=['USD/JPY'])
#     print("\n'USD/JPY' column has been dropped.")
# else:
#     print("\n'USD/JPY' column not found in the DataFrame.") 
# 結果の確認
print("Combined DataFrame for Prediction (for_predict_usdjpy) Head:")
print(for_predict_usdjpy.head())
print("\nCombined DataFrame for Prediction (for_predict_usdjpy) Info:")
for_predict_usdjpy.info()
print("\nCombined DataFrame for Prediction (for_predict_usdjpy) Description:")
print(for_predict_usdjpy.describe()) 
# 完成したデータフレームを表示
for_predict_usdjpy
dataframe.png

用いるデータ 

データ項目時間足ダウンロード方法 データ項目時間足ダウンロード方法
SP500日足yfinanceによって取得.データ収集のプログラムにコードあり Nikkei225日足yfinanceによって取得,データ収集のプログラムにコードあり
USD/JPY日足yfinanceによって取得,データ収集のプログラムにコードあり 金利差(2年)日足investing.comから日本および米国の2年債券利回りを取得,そこから計算
symbolicregressionhttps://github.com/facebookresearch/symbolicregression requests2.32.3pip install requests==2.32.3
IPython  sys
os sys

分析のプログラム 

   import torch
   import numpy as np
   import sympy as sp
   import os, sys
   import symbolicregression
   import requests
   import pandas as pd
   import pathlib
   from IPython.display import display
   from sklearn.metrics import mean_squared_error
   import pathlib
   model_path = "model.pt"
   try:
       if not os.path.isfile(model_path):
           url = "https://dl.fbaipublicfiles.com/symbolicregression/model1.pt"
           r = requests.get(url, allow_redirects=True)
           open(model_path, 'wb').write(r.content)
       _posix_path_backup = None
       _posix_path_created = False # Flag to track if PosixPath was newly created by the patch
       if sys.platform == "win32": # Apply patch only on Windows
           if hasattr(pathlib, 'PosixPath'):
               _posix_path_backup = pathlib.PosixPath # Backup original PosixPath
               pathlib.PosixPath = pathlib.WindowsPath # Monkeypatch to WindowsPath
           else:
               # If pathlib.PosixPath does not exist, create it temporarily
               # as the pickled object might refer to 'pathlib.PosixPath' by name.
               pathlib.PosixPath = pathlib.WindowsPath
               _posix_path_created = True
       try: # Inner try for torch.load, which needs the patch
           if not torch.cuda.is_available():
               model = torch.load(model_path, map_location=torch.device('cpu'), weights_only=False)
           else:
               model = torch.load(model_path, weights_only=False)
               model = model.cuda()
           print(model.device)
           print("Model successfully loaded!")
       finally: # This finally block ensures the patch is reverted
           if sys.platform == "win32":
               if _posix_path_backup is not None:
                   pathlib.PosixPath = _posix_path_backup # Restore original PosixPath
               elif _posix_path_created:
                   delattr(pathlib, 'PosixPath') # Remove if it was newly created by the patch
   except Exception as e:
       print("ERROR: model not loaded! path was: {}".format(model_path))
       print(e)   

ここの数字をいじれば精度が変わります. 色々動かしてみたところこれくらいが計算速度,精度ともに良好. 対象によっては色々変えてみることを推奨します.

   est = symbolicregression.model.SymbolicTransformerRegressor(
                           model=model,
                           max_input_points=2000,
                           n_trees_to_refine=5000,
                           rescale=True
                           )

  data = pd.read_csv("C:/kenkyu/market_data/for_predict_usdjpy.csv",encoding="utf-8-sig") # ここでデータを読み込む

   import pandas as pd
   import numpy as np
   # データの読み込み
   file_path = "C:/kenkyu/market_data/for_predict_usdjpy.csv"
   data = pd.read_csv(file_path, encoding="utf-8-sig", index_col='Date', parse_dates=True)
   print("Data loaded successfully. Shape:", data.shape)
   print("Columns:", data.columns)
   # 目的変数と説明変数の設定
   y_target_name = 'USD/JPY'
   if y_target_name in data.columns:
       y_series = data[y_target_name]
       X_df = data.drop(columns=[y_target_name])
       print(f"\nTarget variable '{y_target_name}' found and separated.")
       print(f"Shape of X_df (features): {X_df.shape}")
       print(f"Shape of y_series (target): {y_series.shape}")
   else:
       print(f"Error: Target column '{y_target_name}' not found in the loaded data.")
       print("Please ensure 'USD/JPY' column exists in for_predict_usdjpy.csv or adjust y_target_name.")
       # エラーが発生した場合、後続の処理が失敗するため、ここで処理を中断するか、ダミーデータを設定します。
       # この例では、処理を進められるようにダミーデータを設定しますが、実際にはエラーを確認・修正してください。
       X_df = data.copy() # 全データをXとする(不適切だがエラー回避のため)
       y_series = pd.Series(np.random.rand(len(data)), index=data.index, name=y_target_name) # ダミーのy
       print("Using dummy y_series due to missing target column.")
   # NaN値の確認と処理 (もしあれば)
   # X_df と y_series を結合して NaN を一括で処理し、再度分離する
   temp_combined_df = X_df.join(y_series, how='inner') # inner join で日付が一致するもののみ
   if temp_combined_df.isnull().values.any():
       print("\nNaN values found. Dropping rows with any NaN.")
       rows_before_dropna = len(temp_combined_df)
       temp_combined_df.dropna(inplace=True)
       rows_after_dropna = len(temp_combined_df)
       print(f"Dropped {rows_before_dropna - rows_after_dropna} rows due to NaNs.")
   else:
       print("\nNo NaN values found in the combined X and y data based on common dates.")
   # NaN処理後のデータで再度Xとyを定義
   if not temp_combined_df.empty and y_target_name in temp_combined_df.columns:
       X_df_aligned = temp_combined_df.drop(columns=[y_target_name])
       y_series_aligned = temp_combined_df[y_target_name]
   else:
       print("Error: Data became empty after NaN handling or target column lost. Check data.")
       # Fallback to potentially problematic data to avoid crashing, but this needs investigation
       X_df_aligned = X_df
       y_series_aligned = y_series
   # NumPy配列に変換
   X_ts = X_df_aligned.values
   y_ts = y_series_aligned.values
   print("\nData prepared for Symbolic Regression:")
   print(f"X_ts shape: {X_ts.shape}")
   print(f"y_ts shape: {y_ts.shape}")
   if X_ts.shape[0] > 0 and y_ts.shape[0] > 0:
       print("\nFirst 5 rows of X_df_aligned (features):")
       print(X_df_aligned.head())
       print("\nFirst 5 values of y_series_aligned (target):")
       print(y_series_aligned.head())
       
       # グローバル変数に x, y を設定して、後続のセルで利用できるようにする
       x = X_ts
       y = y_ts
       print("\nGlobal variables 'x' and 'y' have been set for use in subsequent cells.")
   else:
       print("\nError: Not enough data after processing. Check for NaN issues or empty CSV.")
       # グローバル変数を空の配列などで初期化するか、エラーを発生させる
       x = np.array([])
       y = np.array([])
       print("Global variables 'x' and 'y' are empty due to data preparation issues.")
   import pandas as pd
   import numpy as np
   # データの読み込み
   file_path = "C:/kenkyu/market_data/for_predict_usdjpy.csv"
   data = pd.read_csv(file_path, encoding="utf-8-sig", index_col='Date', parse_dates=True)
   print("Data loaded successfully. Shape:", data.shape)
   print("Columns:", data.columns)

目的変数と説明変数を設定する. この変数を変えるだけではうまくいかないので時系列ずらしから変更してください

   # 目的変数と説明変数の設定
   y_target_name = 'USD/JPY'
   if y_target_name in data.columns:
       y_series_t = data[y_target_name].copy() # y(t) を保持
       X_df_t_others = data.drop(columns=[y_target_name]).copy() # y(t)以外を X_others(t) として保持
       
       print(f"\nTarget variable '{y_target_name}' (y(t)) found. Shape: {y_series_t.shape}")
       print(f"Other features X_others(t) found. Shape: {X_df_t_others.shape}")
   else:
       print(f"Error: Target column '{y_target_name}' not found in the loaded data.")
       # 適切なエラー処理 (現在のコードと同様)
       X_df_t_others = data.copy() 
       y_series_t = pd.Series(np.random.rand(len(data)), index=data.index, name=y_target_name)
       print("Using dummy y_series due to missing target column.")
   # --- ラグ特徴量の生成 ---
   # 1. 他の説明変数 X_others の t-1 の値を作成
   X_df_t_minus_1_others = X_df_t_others.shift(1)
   print("\nCreated lagged features for other variables (X_others at t-1). Shape:", X_df_t_minus_1_others.shape)
   # 2. y の t-1 の値 (USD/JPY_lag1) を作成
   y_series_t_minus_1 = y_series_t.shift(1).rename(f"{y_target_name}_lag1")
   print(f"Created lagged target variable ({y_target_name}_lag1 at t-1). Shape:", y_series_t_minus_1.shape)
   # 3. 全ての特徴量 (X_others(t-1) と y(t-1)) を結合して X_features(t-1) を作成
   #    ここで y_series_t_minus_1 (y(t-1)) が特徴量として追加される
   X_df_features_t_minus_1 = X_df_t_minus_1_others.join(y_series_t_minus_1)
   print("Combined all lagged features (X_others(t-1) and y(t-1)). Shape:", X_df_features_t_minus_1.shape)
   if f"{y_target_name}_lag1" not in X_df_features_t_minus_1.columns:
       print(f"Warning: {y_target_name}_lag1 was not successfully added to features.")
   # 4. y(t) と 全特徴量(t-1) を結合して、共通のインデックスでNaNを処理
   # y_series_t は t 시점のまま (これが目的変数)
   # X_df_features_t_minus_1 は t-1 시점の特徴量群
   # これらを結合し、ラグ生成で発生したNaNを持つ行を除去
   combined_df_for_regression = X_df_features_t_minus_1.join(y_series_t, how='inner') # y_series_t が y(t)
   print(f"\nShape before dropping NaN from combined (Features(t-1) and y(t)): {combined_df_for_regression.shape}")
   combined_df_for_regression.dropna(inplace=True)
   print(f"Shape after dropping NaN: {combined_df_for_regression.shape}")
   if not combined_df_for_regression.empty and y_target_name in combined_df_for_regression.columns:
       # X_df_final は y(t) を除いた全ての特徴量 (X_others(t-1) と y(t-1) を含む)
       X_df_final = combined_df_for_regression.drop(columns=[y_target_name])
       # y_series_final は y(t)
       y_series_final = combined_df_for_regression[y_target_name]
       
       # NumPy配列に変換
   X_ts_lagged_with_y_lag = X_df_final.values
   y_ts_aligned = y_series_final.values
   print("\nData prepared for Symbolic Regression with lagged features (including y(t-1)):")
   print(f"X_ts (features at t-1, including y(t-1)) shape: {X_ts_lagged_with_y_lag.shape}")
   print(f"y_ts (target at t) shape: {y_ts_aligned.shape}")
   print(f"Columns in X_df_final: {X_df_final.columns.tolist()}")
   # --- 追加のデバッグプリント ---
   print(f"Debug: Shape of X_ts_lagged_with_y_lag before global assignment: {X_ts_lagged_with_y_lag.shape}")
   print(f"Debug: Shape of y_ts_aligned before global assignment: {y_ts_aligned.shape}")
   if X_ts_lagged_with_y_lag.shape[0] > 0:
       print(f"Debug: First 5 rows of X_ts_lagged_with_y_lag: \n{X_ts_lagged_with_y_lag[:5]}")
   if y_ts_aligned.shape[0] > 0:
       print(f"Debug: First 5 values of y_ts_aligned: \n{y_ts_aligned[:5]}")
   # --- ここまで追加 ---
   if X_ts_lagged_with_y_lag.shape[0] > 0 and y_ts_aligned.shape[0] > 0:
           print("\nFirst 5 rows of X_df_final (features at t-1, including y(t-1)):")
           display(X_df_final.head())
           print("\nFirst 5 values of y_series_final (target at t):")
           display(y_series_final.head())
           
           # グローバル変数に x, y を設定して、後続のセルで利用できるようにする
           x = X_ts_lagged_with_y_lag # シンボリック回帰モデルへの入力X
           y = y_ts_aligned  # シンボリック回帰モデルへの入力y
           
           print("\nGlobal variables 'x' and 'y' have been set to include y(t-1) in x.")
   else:
       print("\nError: Not enough data after processing lags and NaNs.")
       x = np.array([])
       y = np.array([])
       print("Global variables 'x' and 'y' are empty.")
   # 後続のプロットセルで日付インデックスを使うために X_df_aligned と y_series_aligned を更新
   if 'X_df_final' in locals() and 'y_series_final' in locals():
       X_df_aligned = X_df_final.copy() # X_df_final には y(t-1) が含まれる
       y_series_aligned = y_series_final.copy() # y_series_final は y(t)
       print("\nUpdated X_df_aligned and y_series_aligned to reflect data with y(t-1) as a feature for potential plotting.")
   from sklearn.model_selection import train_test_split
   from sklearn.metrics import mean_squared_error, r2_score
   import matplotlib.pyplot as plt
   import sympy as sp # sympyがインポートされていることを確認
   import numpy as np # numpy をインポート
   # グローバル変数 x, y が前のセルで X_ts_lagged, y_ts_aligned に設定されていることを前提とする
   if 'x' not in globals() or 'y' not in globals() or x.shape[0] == 0:
       print("Error: Input data x or y is not defined or empty. Please run the data preparation cell first.")
       sympy_expr_full = None
       sympy_expr_train = None
   else:
       print(f"Using data for Symbolic Regression: x shape={x.shape}, y shape={y.shape}")
       # 1. モデルの学習 (全データを使用)
       print("\nFitting model on the entire dataset (lagged X, aligned y)...")
       est.fit(x, y)
       print("Model fitting complete.")
       # 2. 数式の取得と表示
       replace_ops = {"add": "+", "mul": "*", "sub": "-", "pow": "**"}
       try:
           model_str_full = est.retrieve_tree(with_infos=True)["relabed_predicted_tree"].infix()
           for op, replace_op in replace_ops.items():
               model_str_full = model_str_full.replace(op, replace_op)
           sympy_expr_full = sp.parse_expr(model_str_full)
           print("\nRetrieved symbolic expression (trained on full data):")
           display(sympy_expr_full)
       except Exception as e:
           print(f"Error retrieving or parsing symbolic expression for full data: {e}")
           sympy_expr_full = None
       # 3. 評価指標の計算 (全データ)
       if sympy_expr_full:
           try:
               num_features_full = x.shape[1]
               variables_full = [sp.Symbol(f'x_{i}') for i in range(num_features_full)]
               def safe_inv(x_val):
                   res = np.divide(1.0, x_val, out=np.full_like(x_val, np.nan), where=x_val!=0)
                   return res
               custom_modules = [{'inv': safe_inv}, 'numpy']
               func_full = sp.lambdify(variables_full, sympy_expr_full, modules=custom_modules)
               input_data_for_func_full = [x[:, i] for i in range(x.shape[1])]
               y_pred_full = func_full(*input_data_for_func_full)
               if isinstance(y_pred_full, (int, float)):
                   y_pred_full = np.full_like(y, y_pred_full, dtype=np.float64)
               y_pred_full = np.asarray(y_pred_full).flatten()
               finite_mask_full = np.isfinite(y_pred_full) & np.isfinite(y)
               y_clean_full = y[finite_mask_full]
               y_pred_clean_full = y_pred_full[finite_mask_full]
               if len(y_pred_clean_full) > 0 and len(y_clean_full) > 0:
                   if len(y_pred_clean_full) == len(y_clean_full):
                       mse_full = mean_squared_error(y_clean_full, y_pred_clean_full)
                       rmse_full = np.sqrt(mse_full)
                       r2_full = r2_score(y_clean_full, y_pred_clean_full)
                       print(f"\n--- Evaluation on Entire Dataset ---")
                       print(f"RMSE (finite values only): {rmse_full:.4f}")
                       print(f"R-squared (finite values only): {r2_full:.4f}")
                       if np.sum(~finite_mask_full) > 0:
                           print(f"Note: {np.sum(~finite_mask_full)} rows with non-finite values were excluded from calculation.")
                   else:
                       print("Warning: Length mismatch after cleaning non-finite values for full data. Metrics not calculated.")
               else:
                   print("No finite predictions available for metrics calculation on full data.")
           except Exception as e:
               print(f"Error during metrics calculation for full data: {e}")
       else:
           print("Skipping metrics for full data as symbolic expression was not retrieved.")
       # --- 時系列を考慮した訓練/テスト分割 ---
       split_ratio = 0.8
       split_index = int(len(x) * split_ratio)
       x_train_ts, x_test_ts = x[:split_index], x[split_index:]
       y_train_ts, y_test_ts = y[:split_index], y[split_index:]
       if 'X_df_aligned' in globals() and 'y_series_aligned' in globals() and \
       hasattr(X_df_aligned, 'index') and hasattr(y_series_aligned, 'index') and \
       len(X_df_aligned) == len(x) and len(y_series_aligned) == len(y):
           train_dates = X_df_aligned.index[:split_index]
           test_dates = X_df_aligned.index[split_index:]
           if not train_dates.empty and not test_dates.empty:
               print(f"\nTraining data covers dates from {train_dates.min()} to {train_dates.max()} (Length: {len(train_dates)})")
               print(f"Test data covers dates from {test_dates.min()} to {test_dates.max()} (Length: {len(test_dates)})")
           else:
               print("\nWarning: Date range for train/test is empty. Using numerical index for plotting.")
               train_dates = pd.RangeIndex(start=0, stop=len(x_train_ts))
               test_dates = pd.RangeIndex(start=len(x_train_ts), stop=len(x_train_ts) + len(x_test_ts))
       else:
           print("\nWarning: X_df_aligned or y_series_aligned not found, not having an index, or length mismatch. Using numerical index for plotting.")
           train_dates = pd.RangeIndex(start=0, stop=len(x_train_ts))
           test_dates = pd.RangeIndex(start=len(x_train_ts), stop=len(x_train_ts) + len(x_test_ts))
       print(f"\nTraining data shape: x_train_ts={x_train_ts.shape}, y_train_ts={y_train_ts.shape}")
       print(f"Test data shape: x_test_ts={x_test_ts.shape}, y_test_ts={y_test_ts.shape}")
       if len(x_train_ts) > 0 and len(x_test_ts) > 0:
           print("\nFitting model on training data (x_train_ts, y_train_ts)...")
           est.fit(x_train_ts, y_train_ts)
           print("Model fitting on training data complete.")
           sympy_expr_train = None
           func_train = None
           try:
               model_str_train = est.retrieve_tree(with_infos=True)["relabed_predicted_tree"].infix()
               for op, replace_op in replace_ops.items():
                   model_str_train = model_str_train.replace(op, replace_op)
               sympy_expr_train = sp.parse_expr(model_str_train)
               print(f"\nRetrieved symbolic expression (trained on training data):")
               display(sympy_expr_train)
               # --- 可読性向上処理ここから ---
               from sympy import Float
               def readable_expr_transform(expr, col_names, digits=3):
                   """
                   Sympy数式を直接操作して、定数の丸めと変数名の置換を行う(文字列化を回避)
                   """
                   # 1. 数式内のすべての浮動小数点数を丸める
                   try:
                       replacements = {n: n.round(digits) for n in expr.atoms(Float)}
                       rounded_expr = expr.xreplace(replacements)
                   except Exception as e:
                       print(f"Could not round floats: {e}")
                       rounded_expr = expr
                   # 2. x_i 形式の変数を実際の列名に置換する
                   try:
                       # インデックスの大きい順に置換(x_10をx_1で誤置換しないため)
                       subs_dict = {
                           sp.Symbol(f'x_{i}'): sp.Symbol(f'`{name}`')
                           for i, name in reversed(list(enumerate(col_names)))
                       }
                       final_expr = rounded_expr.subs(subs_dict)
                   except Exception as e:
                       print(f"Could not substitute variable names: {e}")
                       final_expr = rounded_expr
                   
                   return final_expr
               try:
                   if 'X_df_final' in globals() and sympy_expr_train is not None:
                       # 変換処理を適用
                       simplified_expr = readable_expr_transform(sympy_expr_train, X_df_final.columns.tolist())
                       
                       print("\n--- 可読性向上後の式(簡約・丸め・変数名反映)---")
                       display(simplified_expr)
                       print(sp.pretty(simplified_expr))
                   else:
                       print("Skipping readability improvement: column names or expression not available.")
               except Exception as e:
                   print(f"可読性向上処理でエラー: {e}")
                   if 'sympy_expr_train' in locals():
                       print("Original expression:", sympy_expr_train)
               # --- 可読性向上処理ここまで ---
               num_features_train = x_train_ts.shape[1]
               variables_train = [sp.Symbol(f'x_{i}') for i in range(num_features_train)]
               custom_modules_train = [{'inv': safe_inv}, 'numpy']
               func_train = sp.lambdify(variables_train, sympy_expr_train, modules=custom_modules_train)
               print("Expression from training data lambdified successfully.")
           except Exception as e:
               print(f"Error during expression retrieval or lambdify from training: {e}")
               # エラーが発生した場合、後続の処理が失敗しないように変数をNoneに設定
               func_train = None
               sympy_expr_train = None
           if func_train:
               y_train_pred_ts = None
               y_test_pred_ts = None
               y_train_ts_clean = y_train_ts
               y_train_pred_ts_clean = None
               y_test_ts_clean = y_test_ts
               y_test_pred_ts_clean = None
               try:
                   if x_train_ts.shape[0] > 0:
                       input_data_train_func = [x_train_ts[:, i] for i in range(x_train_ts.shape[1])]
                       y_train_pred_ts = func_train(*input_data_train_func)
                       if isinstance(y_train_pred_ts, (int, float)):
                           y_train_pred_ts = np.full_like(y_train_ts, y_train_pred_ts, dtype=np.float64)
                       y_train_pred_ts = np.asarray(y_train_pred_ts).flatten()
                       
                       finite_mask_train = np.isfinite(y_train_pred_ts) & np.isfinite(y_train_ts)
                       y_train_ts_clean = y_train_ts[finite_mask_train]
                       y_train_pred_ts_clean = y_train_pred_ts[finite_mask_train]
                       
                       if len(y_train_pred_ts_clean) > 0 and len(y_train_ts_clean) > 0:
                           mse_train = mean_squared_error(y_train_ts_clean, y_train_pred_ts_clean)
                           rmse_train = np.sqrt(mse_train)
                           r2_train = r2_score(y_train_ts_clean, y_train_pred_ts_clean)
                           print(f"\n--- Evaluation on Training Data ---")
                           print(f"RMSE (finite values only): {rmse_train:.4f}")
                           print(f"R-squared (finite values only): {r2_train:.4f}")
                           if np.sum(~finite_mask_train) > 0:
                               print(f"Note: {np.sum(~finite_mask_train)} rows with non-finite values were excluded from calculation.")
                       else:
                           print("No finite predictions available for metrics calculation on training data.")
               except Exception as e:
                   print(f"Error predicting or calculating metrics on training data: {e}")
                   y_train_pred_ts = None
               try:
                   if x_test_ts.shape[0] > 0:
                       input_data_test_func = [x_test_ts[:, i] for i in range(x_test_ts.shape[1])]
                       y_test_pred_ts = func_train(*input_data_test_func)
                       if isinstance(y_test_pred_ts, (int, float)):
                           y_test_pred_ts = np.full_like(y_test_ts, y_test_pred_ts, dtype=np.float64)
                       y_test_pred_ts = np.asarray(y_test_pred_ts).flatten()
                       finite_mask_test = np.isfinite(y_test_pred_ts) & np.isfinite(y_test_ts)
                       y_test_ts_clean = y_test_ts[finite_mask_test]
                       y_test_pred_ts_clean = y_test_pred_ts[finite_mask_test]
                       if len(y_test_pred_ts_clean) > 0 and len(y_test_ts_clean) > 0:
                           mse_test = mean_squared_error(y_test_ts_clean, y_test_pred_ts_clean)
                           rmse_test = np.sqrt(mse_test)
                           r2_test = r2_score(y_test_ts_clean, y_test_pred_ts_clean)
                           print(f"\n--- Evaluation on Test Data ---")
                           print(f"RMSE (finite values only): {rmse_test:.4f}")
                           print(f"R-squared (finite values only): {r2_test:.4f}")
                           if np.sum(~finite_mask_test) > 0:
                               print(f"Note: {np.sum(~finite_mask_test)} rows with non-finite values were excluded from calculation.")
                       else:
                           print("No finite predictions available for metrics calculation on test data.")
               except Exception as e:
                   print(f"Error predicting or calculating metrics on test data: {e}")
                   y_test_pred_ts = None
               # 7. グラフの描画
               plt.figure(figsize=(15, 7))
               if len(train_dates) == len(y_train_ts):
                   plt.plot(train_dates, y_train_ts, label='Actual Train USD/JPY', color='blue', alpha=0.7)
               if y_train_pred_ts is not None and len(train_dates) == len(y_train_pred_ts):
                   plt.plot(train_dates, y_train_pred_ts, label='Predicted Train USD/JPY', color='deepskyblue', linestyle='--')
               
               if len(test_dates) == len(y_test_ts):
                   plt.plot(test_dates, y_test_ts, label='Actual Test USD/JPY', color='green', alpha=0.7)
               if y_test_pred_ts is not None and len(test_dates) == len(y_test_pred_ts):
                   plt.plot(test_dates, y_test_pred_ts, label='Predicted Test USD/JPY', color='springgreen', linestyle='--')
               
               plt.title('USD/JPY Prediction: Actual vs. Predicted (Lagged Features)')
               plt.xlabel('Date')
               plt.ylabel('USD/JPY')
               plt.legend()
               plt.grid(True)
               plt.show()
               fig, axes = plt.subplots(1, 2, figsize=(14, 6), sharey=True)
               if y_train_pred_ts_clean is not None and len(y_train_pred_ts_clean) > 0:
                   axes[0].scatter(y_train_ts_clean, y_train_pred_ts_clean, alpha=0.5, label='Predictions', color='blue')
                   min_val_train = min(np.min(y_train_ts_clean), np.min(y_train_pred_ts_clean))
                   max_val_train = max(np.max(y_train_ts_clean), np.max(y_train_pred_ts_clean))
                   axes[0].plot([min_val_train, max_val_train], [min_val_train, max_val_train], 'r--', lw=2, label='Perfect Fit')
               else:
                   axes[0].text(0.5, 0.5, "No valid training predictions to plot.", ha='center', va='center')
               axes[0].set_xlabel("Actual Values (Train)")
               axes[0].set_ylabel("Predicted Values (Train)")
               axes[0].set_title("Training Data: Actual vs. Predicted")
               axes[0].legend()
               axes[0].grid(True)
               if y_test_pred_ts_clean is not None and len(y_test_pred_ts_clean) > 0:
                   axes[1].scatter(y_test_ts_clean, y_test_pred_ts_clean, alpha=0.5, label='Predictions', color='green')
                   min_val_test = min(np.min(y_test_ts_clean), np.min(y_test_pred_ts_clean))
                   max_val_test = max(np.max(y_test_ts_clean), np.max(y_test_pred_ts_clean))
                   axes[1].plot([min_val_test, max_val_test], [min_val_test, max_val_test], 'r--', lw=2, label='Perfect Fit')
               else:
                   axes[1].text(0.5, 0.5, "No valid test predictions to plot.", ha='center', va='center')
               axes[1].set_xlabel("Actual Values (Test)")
               axes[1].set_ylabel("Predicted Values (Test)")
               axes[1].set_title("Test Data: Actual vs. Predicted")
               axes[1].legend()
               axes[1].grid(True)
               
               plt.tight_layout()
               plt.show()
           else:
               print("\nSkipping prediction and plotting for train/test split because the function (func_train) could not be created.")
       else:
           print("\nSkipping train/test split evaluation as there is not enough data for both training and testing after splitting.")
   if 'sympy_expr_full' not in locals() or sympy_expr_full is None:
       print("\nSymbolic regression (full data) could not be performed due to data preparation issues or model fitting errors.")
   if 'sympy_expr_train' not in locals() or sympy_expr_train is None:
       print("\nSymbolic regression (train data) could not be performed due to data preparation issues or model fitting errors.")
kekka.png

分析には以下のフォルダの中にあるExample.ipynbを実行すればよい.


トップ   新規 一覧 検索 最終更新   ヘルプ   最終更新のRSS