| フォルダ名 | 用途 | ファイル名 | ファイルの場所 |
| kawasehendou | 為替変動の影響分析 | 7to12_Yfinance.ipynb | kawasehendou |
| kabuka | 株価データのデータ収集 | 7to12_JPX401.ipynb | kabuka |
| sankakuka_2 | 産業連関表の三角化 | sankakuka_2020.py | sankakuka_2 |
| 3Dgraph | jsonファイルの作成 | 3Dgraph_kihon.ipynb | 3Dgraph |
| 3Dgraph | 3Dグラフを可視化するための実行ファイル | app_0114.py | 3Dgraph |
| 3Dgraph | javascriptで読み込む用のjsonファイル | output_toda_kihon_zenbu.json | 3Dgraph/static/json |
| 3Dgraph | 3Dグラフのデータの可視化 | index_0114.html | 3Dgraph/templates |
| 3Dgraph | 3Dグラフを作成するときのjavascriptのファイル | main2_0114.js | 3Dgraph/static |
「get_crime_data.py」を実行すると、富山県警が公開している「犯罪発生マップ」(http://www.machi-info.jp/machikado/police_pref_toyama/index.jsp)から犯罪発生データを取得し、「crimes.csv」として保存されます。
「hop_統計.ipynb」を実行すると、メッシュコードごとにデータを結合し「取引_件数付き.csv」に生成します。
# メッシュGeoJsonを,分かりやすい形に直す
_meshes = {}
for mesh in meshes:
meshcode = mesh["properties"]["code"]
coords = mesh["geometry"]["coordinates"][0]
north = coords[1][1]
south = coords[0][1]
east = coords[2][0]
west = coords[0][0]
center = [(east + west) / 2, (north + south) / 2]
_meshes[meshcode] = {
"north": north,
"south": south,
"east": east,
"west": west,
"center": center
}
meshes = _meshes
# 犯罪データの座標から,どのグリッドセルに含まれるかを求め
# 「mesh_code」列に格納する
# 読み込んだメッシュGeoJsonに含まれない犯罪データは除外
df_temp = df_crimes[0:0].copy()
df_temp["mesh_code"] = None
for index, row in df_crimes.iterrows():
for meshcode, coords in meshes.items():
if ((coords["south"] <= row["latitude"] < coords["north"]) and
(coords["west"] <= row["longitude"] < coords["east"])):
if not pd.isnull(row["datetime"]):
df_temp.loc[index] = pd.concat([
row,
pd.Series({
"mesh_code": meshcode
})
])
break
print(f"{df_crimes.index.get_loc(index)} / {len(df_crimes)}")
df_crimes = df_temp.copy()
df_crimes["datetime"] = pd.to_datetime(df_crimes["datetime"], format = "%Y/%m/%d %H:%M")
# CSVファイルの読み込み df_crimes = pd.read_csv(HERE / "犯罪_メッシュ入り.csv") #メッシュごとに件数をカウント mesh_counts = df_crimes['mesh_code'].value_counts().reset_index() mesh_counts.columns = ['mesh_code', '件数'] # 結果を表示 print(mesh_counts) # 結果をCSVファイルに保存する場合 mesh_counts.to_csv(HERE / "犯罪_メッシュ件数.csv", index=False, encoding='utf-8-sig')
「hop_メッシュ.ipynb」を実行すると、メッシュコードごとにデータを結合し「取引_統計データ付き.csv」に生成します。緯度・経度をもとに、500mメッシュ単位で犯罪データを分類できます。
「distance.ipynb」を実行すると、住宅地データ (取引_統計データ付き.csv) と犯罪データ (犯罪_メッシュ入り.csv) を読み込み、各住宅地から最寄りの犯罪発生地点までの最短距離を計算するために、geopy.distanceモジュールのgeodesic関数を使用します。この関数は、2点間の地球上での距離(メートル単位)を計算します。契約された土地から最短の距離で犯罪が起きた場所までの最短距離を「取引_統計データ付き.csv」に追加した「住宅地_犯罪_最短距離.csv」を生成します。
import pandas as pd
from geopy.distance import geodesic
# ファイルを読み込む
df_housing = pd.read_csv("C:/Users/nt011/Desktop/統計正しいほう/データセット
の作成/取引_統計データ付き.csv")
df_crimes = pd.read_csv("C:/Users/nt011/Desktop/統計正しいほう/データセット
の作成/犯罪_メッシュ入り.csv")
print(df_housing.columns)
# 1. 住宅地データの緯度経度情報を抽出
df_housing['latitude'] = df_housing['緯度']
df_housing['longitude'] = df_housing['経度']
# 2. 犯罪データの緯度経度情報を抽出
df_crimes['latitude'] = df_crimes['latitude']
df_crimes['longitude'] = df_crimes['longitude']
# 3.最短距離を計算する関数
def calculate_min_distance(lat1, lon1, df_target, lat_col, lon_col):
distances = df_target.apply(lambda row: geodesic((lat1, lon1),
(row[lat_col], row[lon_col])).meters, axis=1)
return distances.min()
# 4. 各住宅地から犯罪発生場所までの最短距離を計算
df_housing['min_distance_to_crime'] = df_housing.apply(lambda row:
calculate_min_distance(row['latitude'], row['longitude'], df_crimes, 'latitude', 'longitude'), axis=1)
# 距離が0になる住宅地の行を表示
df_housing[df_housing['min_distance_to_crime'] == 0]
# 必要なカラムを選択して新しいDataFrameに保存
df_result = df_housing[['mesh_code', '住所', '経度', '緯度', '最寄駅:名称', '最寄駅:距離(分)', '取引価格(総額)', '面積(㎡)', '取引価格(㎡単価)',
'土地の形状', '間口', '延床面積(㎡)','建築年数', '建物の構造',
'用途', '前面道路:方位', '前面道路:種類', '前面道路:幅員(m)', '都市計画', '建ぺい率(%)',
'容積率(%)', '取引時点', '件数', '犯罪発生率(件/250000)', '人口', '18歳未満人口割合',
'65歳以上人口割合', '外国人人口割合', '世帯数', '単身世帯割合', '核家族世帯割合', '正規労働者割合',
'非正規労働者割合', '最終学歴が中学以下の人口割合', '最終学歴が高校の人口割合', '最終学歴が大学以上の人口割合',
'居住年数5年未満の人口割合', '居住年数が20年以上の人口割合', '一戸建て割合', 'アパート・低中層マンション割合',
'高層マンション割合', 'コンビニ', '駅', '駐車場', '駐輪場', '金融機関', '旅館', 'ホテル', 'スーパー', 'ショッピングモール', 'デパート', '警察署/交番', '小学校', '中学校', '高校', '大学',
'保育園/幼稚園', 'レジャー施設','道路の面積比率', '建物の面積比率', '水の面積比率', '空き地の面積比率',
'ノード数(道路)', 'エッジ数(道路)', '密度(道路)', '平均次数(道路)', 'min_distance_to_crime']]
# 結果の保存
df_result.to_csv('C:/Users/nt011/Desktop/統計正しいほう/データセットの作成/
住宅地_犯罪_最短距離.csv', index=False, encoding='utf-8-sig')
作成したデータセットを回帰分析がかけられる形に整える。
「前処理.ipynb」を実行し、「住宅地_犯罪_最短距離.csv」から「建築年数」と「最寄駅:距離(分)」の列に関する欠損値を含む行を削除した「住宅地_犯罪_最短距離_欠損値削除後.csv」を生成します。
import pandas as pd
# CSVファイルのパス
file_path = r"C:\Users\nt011\Desktop\統計正しいほう\前処理\住宅地_犯罪_最短距離.csv"
# CSVファイルを読み込む
df = pd.read_csv(file_path)
# 欠損値を含む行を削除
df = df.dropna()
# 「建築年数」列に `-1` または `"VALUE"` を含む行を削除
df = df[~df["建築年数"].astype(str).isin(["-1", "#VALUE!"])]
# 「最寄駅:距離(分)」の値を変換
replace_dict = {
"30分~60分": 45,
"1H~1H30": 75,
"1H30~2H": 105
}
df["最寄駅:距離(分)"] = df["最寄駅:距離(分)"].replace(replace_dict)
# クリーンなデータを新しいCSVファイルとして保存
output_path = r"C:\Users\nt011\Desktop\統計正しいほう\前処理\住宅地_犯罪_最短距離_欠損値削除後.csv"
df.to_csv(output_path, index=False, encoding='utf-8-sig')
print("データ処理完了: 欠損値と指定された異常値を削除し、駅距離の値を変換しました。")
print(f"新しいCSVファイルが {output_path} に保存されました。")
生成した「住宅地_犯罪_最短距離_欠損値削除後.csv」から連続変数の外れ値を検出するカラムリストを定義し、データ型を数値に変換したのち検出された列を削除したリスト「外れ値削除.csv」を生成します。
import pandas as pd
# CSVファイルのパス
file_path = "C:/Users/nt011/Desktop/統計正しいほう/前処理/住宅地_犯罪_最短距
離_欠損値削除後.csv"
# CSVファイルを読み込む
df = pd.read_csv(file_path)
# 外れ値を検出する連続変数のカラムリスト
continuous_columns = [
"最寄駅:距離(分)", "取引価格(総額)", "面積(㎡)", "取引価格(㎡単価)",
"間口", "延床面積(㎡)", "建築年数", "前面道路:幅員(m)",
"建ぺい率(%)", "容積率(%)", "犯罪発生率(件/250000)", "人口",
"18歳未満人口割合", "65歳以上人口割合", "外国人人口割合", "世帯数",
"単身世帯割合", "核家族世帯割合", "正規労働者割合", "非正規労働者割合",
"最終学歴が中学以下の人口割合", "最終学歴が高校の人口割合", "最終学歴が大学以上の人口割合",
"居住年数5年未満の人口割合", "居住年数が20年以上の人口割合", "一戸建て割合",
"アパート・低中層マンション割合", "高層マンション割合", "道路の面積比率",
"建物の面積比率", "水の面積比率", "空き地の面積比率", "ノード数(道路)",
"エッジ数(道路)", "密度(道路)", "平均次数(道路)", "min_distance_to_crime"
]
# データ型を数値に変換(エラーが出たらNaNにする)
for column in continuous_columns:
if column in df.columns:
df[column] = pd.to_numeric(df[column], errors='coerce')
# 外れ値を持つ行のインデックスを格納するリスト
outlier_indices = set()
# 外れ値の検出と削除
for column in continuous_columns:
if column in df.columns:
# 欠損値を除外
df_clean = df[column].dropna()
# IQR(四分位範囲)の計算
Q1 = df_clean.quantile(0.25)
Q3 = df_clean.quantile(0.75)
IQR = Q3 - Q1
# 外れ値の閾値を設定
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# 外れ値のインデックスを取得
outliers = df[(df[column] < lower_bound) | (df[column] > upper_bound)]
outlier_indices.update(outliers.index)
# 外れ値の行を削除
df_cleaned = df.drop(index=outlier_indices)
# 結果を新しいCSVに保存
output_path = "C:/Users/nt011/Desktop/統計正しいほう/前処理/外れ値削除.csv"
df_cleaned.to_csv(output_path, encoding="utf-8-sig", index=False)
print(f"外れ値を削除したデータを {output_path} に保存しました。")
print(f"削除した行数: {len(outlier_indices)}")
print(f"最終データの行数: {df_cleaned.shape[0]}")
それぞれのカラムの要素を必要に応じてダミー変数化した後、データフレームを保存した「削除_ダミー後.csv」を生成します。
import pandas as pd
# CSVファイルを読み込み
df_data = pd.read_csv("C:/Users/nt011/Desktop/統計正しいほう/前処理/外れ値削除.csv")
# カラム名を取得
columns = df_data.columns.tolist()
# 建物の構造列に「RC」がある場合、「鉄筋コンクリート造」に変換
df_data['建物の構造'] = df_data['建物の構造'].replace('RC', '鉄筋コンクリート造')
# 最寄り駅名を二値化(列名を「最寄り駅名_富山」に変更)
index = columns.index("最寄駅:名称") # 変換前の位置を取得
df_data.insert(index + 1, '最寄り駅名_富山', df_data['最寄駅:名称'].apply(lambda x: 1 if x == '富山' else 0))
df_data = df_data.drop(columns=['最寄駅:名称']) # 元の列を削除
# 都市計画を二値化(列名を「都市計画_市街化調整区域」に変更)
index = columns.index("都市計画")
df_data.insert(index + 1, '都市計画_市街化調整区域', df_data['都市計画'].apply(lambda x: 1 if x == '市街化調整区域' else 0))
df_data = df_data.drop(columns=['都市計画'])
# 土地の形状を二値化(列名を「土地の形状_不整形」に変更)
index = columns.index("土地の形状")
df_data.insert(index + 1, '土地の形状_不整形', df_data['土地の形状'].apply(lambda x: 1 if x == '不整形' else 0))
df_data = df_data.drop(columns=['土地の形状'])
# 用途を二値化(列名を「用途_住宅」に変更)
index = columns.index("用途")
df_data.insert(index + 1, '用途_住宅', df_data['用途'].apply(lambda x: 1 if x == '住宅' else 0))
df_data = df_data.drop(columns=['用途'])
# 建物の構造を二値化(列名を「建物の構造_木造」に変更)
index = columns.index("建物の構造")
df_data.insert(index + 1, '建物の構造_木造', df_data['建物の構造'].apply(lambda x: 1 if x == '木造' else 0))
df_data = df_data.drop(columns=['建物の構造'])
# 「前面道路:方位」のダミー変数化(基準値「北西」を0に設定)
index = columns.index("前面道路:方位") # 変換前の位置を取得
dummies = pd.get_dummies(df_data['前面道路:方位'], prefix='前面道路:方位', dtype=int)
# 基準値は削除
baseline_value = '北西'
if f'前面道路:方位_{baseline_value}' in dummies.columns:
dummies = dummies.drop(columns=[f'前面道路:方位_{baseline_value}'])
# 挿入(前面道路:方位のダミー変数)
ordered_dummies = ['前面道路:方位_西', '前面道路:方位_東', '前面道路:方位_南西', '前面道路:方位_南東',
'前面道路:方位_南', '前面道路:方位_北東', '前面道路:方位_北']
# 順番に従って挿入
for col in ordered_dummies:
if col in dummies.columns:
df_data.insert(index + 1, col, dummies[col])
# 「前面道路:種類」のダミー変数化
road_types = ['市道', '私道', '県道']
# 順番を指定してダミー変数を挿入
for road in road_types:
df_data.insert(index + 1, f'前面道路:種類_{road}', (df_data['前面道路:種類'] == road).astype(int))
# 元の「前面道路:方位」および「前面道路:種類」列を削除
df_data = df_data.drop(columns=['前面道路:方位', '前面道路:種類'])
# 確認用にダミー変数化後のデータフレームを表示
print(df_data.head())
# ダミー変数化後のデータを保存
df_data.to_csv("C:/Users/nt011/Desktop/統計正しいほう/前処理/削除_ダミー後.csv",
index=False, encoding='utf-8-sig')
print("ダミー変数変換完了!データが保存されました。")
連続変数とダミー変数を手動で分類し、連続変数のみ二次項の作成をする。 交互作用項の作成したデータフレームを保存した「削除_変数作成後.csv」を生成します。
import pandas as pd
import numpy as np
from itertools import combinations
# CSVファイルのパスを指定
file_path = "C:/Users/nt011/Desktop/統計正しいほう/前処理/削除_ダミー後.csv"
# UTF-8で読み込む
df = pd.read_csv(file_path)
# データの先頭を表示
print(df.head())
# 連続変数とダミー変数を手動で分類
continuous_vars = [
'最寄駅:距離(分)', '面積(㎡)', '間口', '延床面積(㎡)', '建築年数', '前面道路:幅員(m)',
'建ぺい率(%)', '容積率(%)', '犯罪発生率(件/250000)', '人口', '18歳未満人口割合',
'65歳以上人口割合', '外国人人口割合', '世帯数', '単身世帯割合', '核家族世帯割合',
'正規労働者割合', '非正規労働者割合', '最終学歴が中学以下の人口割合', '最終学歴が高校の人口割合',
'最終学歴が大学以上の人口割合', '居住年数5年未満の人口割合', '居住年数が20年以上の人口割合',
'一戸建て割合', 'アパート・低中層マンション割合', 'コンビニ', '駅', '駐車場', '駐輪場',
'金融機関', '旅館', 'ホテル', 'スーパー', 'ショッピングモール',
'デパート', '警察署/交番', '小学校', '中学校', '高校', '大学', '保育園/幼稚園',
'レジャー施設','道路の面積比率', '建物の面積比率', '水の面積比率', '空き地の面積比率',
'ノード数(道路)', 'エッジ数(道路)', '密度(道路)', '平均次数(道路)', 'min_distance_to_crime'
]
# ダミー変数をリストとして定義
dummy_vars = [
'最寄り駅名_富山','土地の形状_不整形', '建物の構造_木造',
'用途_住宅', '前面道路:方位_北', '前面道路:方位_北東',
'前面道路:方位_南', '前面道路:方位_南東',
'前面道路:方位_南西', '前面道路:方位_東', '前面道路:方位_西',
'前面道路:種類_市道', '前面道路:種類_県道', '前面道路:種類_私道',
'都市計画_市街化調整区域'
]
# 二次項の作成(連続変数のみ)
for var in continuous_vars:
df[f'{var}^2'] = df[var] ** 2
# 交互作用項の作成
# 連続変数 × 連続変数
for var1, var2 in combinations(continuous_vars, 2):
df[f'{var1}×{var2}'] = df[var1] * df[var2]
# ダミー変数 × 連続変数
for dummy_var in dummy_vars:
for cont_var in continuous_vars:
df[f'{dummy_var}×{cont_var}'] = df[dummy_var] * df[cont_var]
# ダミー変数 × ダミー変数
for var1, var2 in combinations(dummy_vars, 2):
df[f'{var1}×{var2}'] = df[var1] * df[var2]
# CSVに保存
output_path = "C:/Users/nt011/Desktop/統計正しいほう/前処理/削除_変数作成後.csv"
df.to_csv(output_path, index=False, encoding='utf-8-sig')
print(f"処理が完了しました。生成されたデータは {output_path} に保存されました。")
Elastic Netは、L1 正則化(Lasso)と L2 正則化(Ridge)の組み合わせにより、多重共線性があるデータに対して安定した回帰分析を行う手法です。
Lasso(L1正則化):L1正則化は、回帰モデルの係数の一部をゼロにすることによって、特徴量選択を行います。これにより、重要な特徴量だけを残すことができますが、多重共線性が高い場合には不安定になることがあります。
Ridge(L2正則化):L2正則化は、回帰モデルの係数を小さくすることで、過学習を防ぎます。L2正則化は係数をゼロにすることはなく、むしろ係数の大きさを均等に抑える傾向がありますが、特徴量選択の機能はありません。
StandardScaler() を用いて X(説明変数)と y(目的変数)を標準化 しているため、変数のスケールによる影響を排除し、モデルの安定性を向上させています。
Adaptive Elastic Netを用いて土地の価格に影響する重要な特徴量の係数を取得し、影響の大きい特徴量を特定します。
重要な変数のみを抽出し、「選ばれた変数.csv」に保存
import numbers
import warnings
import cvxpy
import numpy as np
from asgl import ASGL
from sklearn.base import MultiOutputMixin
from sklearn.base import RegressorMixin
from sklearn.exceptions import ConvergenceWarning
from sklearn.linear_model import ElasticNet
from sklearn.linear_model._coordinate_descent import _alpha_grid
from sklearn.utils import check_X_y
from sklearn.utils.validation import check_is_fitted
class AdaptiveElasticNet(ASGL, ElasticNet, MultiOutputMixin, RegressorMixin):
"""
Objective function and parameters as described with modifications
to allow alpha1, alpha2, l1_ratio1, and l1_ratio2 customization.
"""
def __init__(
self,
alpha1=0.021544346900318846, # First-stage ElasticNet alpha
alpha2=0.0009443498043343188, # Second-stage ElasticNet alpha
*,
l1_ratio1=0.8889, # First-stage ElasticNet L1 ratio
l1_ratio2=0.778, # Second-stage ElasticNet L1 ratio
gamma=0.5, # Weight adjustment exponent
fit_intercept=True,
precompute=False,
max_iter=10000,
copy_X=True,
solver=None,
tol=None,
positive=False,
positive_tol=1e-3,
random_state=None,
eps_coef=1e-6,
verbose=True
):
params_asgl = dict(model="lm", penalization="asgl")
if solver is not None:
params_asgl["solver"] = solver
if tol is not None:
params_asgl["tol"] = tol
super().__init__(**params_asgl)
self.alpha1 = alpha1
self.alpha2 = alpha2
self.l1_ratio1 = l1_ratio1
self.l1_ratio2 = l1_ratio2
self.gamma = gamma
self.fit_intercept = fit_intercept
self.max_iter = max_iter
self.precompute = precompute
self.copy_X = copy_X
self.positive = positive
self.positive_tol = positive_tol
self.random_state = random_state
self.eps_coef = eps_coef
self.verbose = verbose
if not self.fit_intercept:
raise NotImplementedError
def fit(self, X, y, check_input=True):
if check_input:
X_copied = self.copy_X and self.fit_intercept
X, y = self._validate_data(
X,
y,
accept_sparse="csc",
order="F",
dtype=[np.float64, np.float32],
copy=X_copied,
multi_output=True,
y_numeric=True,
)
# 第一段階の ElasticNet 実行
enet_model_1 = self.elastic_net(self.l1_ratio1, alpha=self.alpha1)
enet_model_1.fit(X, y)
enet_coef = enet_model_1.coef_
# 重みの計算
weights = 1.0 / (np.maximum(np.abs(enet_coef), self.eps_coef) ** self.gamma)
# 第二段階の最適化
self.coef_, self.intercept_ = self._optimize_second_stage(X, y, weights)
# モデル属性を格納
self.enet_coef_ = enet_coef
self.weights_ = weights
return self
def predict(self, X):
check_is_fitted(self, ["coef_", "intercept_"])
return super(ElasticNet, self).predict(X)
def elastic_net(self, l1_ratio, **params):
"""
Create an ElasticNet model with the specified parameters.
"""
elastic_net = ElasticNet(l1_ratio=l1_ratio)
for k, v in self.get_params().items():
try:
elastic_net = elastic_net.set_params(**{k: v})
except ValueError:
pass # Ignore parameters not supported by ElasticNet
elastic_net = elastic_net.set_params(**params)
return elastic_net
def _optimize_second_stage(self, X, y, weights):
"""
Perform second-stage optimization with adaptive weights.
Returns
-------
coef : np.array, shape (n_features,)
intercept : float
"""
n_samples, n_features = X.shape
beta_variables = [cvxpy.Variable(n_features)]
model_prediction = 0.0
if self.fit_intercept:
beta_variables = [cvxpy.Variable(1)] + beta_variables
ones = cvxpy.Constant(np.ones((n_samples, 1)))
model_prediction += ones @ beta_variables[0]
# モデル予測
model_prediction += X @ beta_variables[1]
error = cvxpy.sum_squares(y - model_prediction) / (2 * n_samples)
l1_coefs = self.alpha2 * self.l1_ratio2
# 第二段階の正則化項
l1_penalty = cvxpy.Constant(l1_coefs * weights) @ cvxpy.abs(
beta_variables[1]
)
l2_penalty = (
cvxpy.Constant(self.alpha1 * (1 - self.l1_ratio1))
* cvxpy.sum_squares(beta_variables[1])
)
constraints = [b >= 0 for b in beta_variables] if self.positive else []
# 最適化問題の定義
problem = cvxpy.Problem(
cvxpy.Minimize(error + l1_penalty + l2_penalty),
constraints=constraints
)
problem.solve(solver="OSQP", max_iter=self.max_iter)
if problem.status != "optimal":
raise ConvergenceWarning(
f"Solver did not reach optimum (Status: {problem.status})"
)
beta_sol = np.concatenate([b.value for b in beta_variables], axis=0)
beta_sol[np.abs(beta_sol) < self.tol] = 0
intercept, coef = beta_sol[0], beta_sol[1:]
coef = np.maximum(coef, 0) if self.positive else coef
return coef, intercept
from sklearn.linear_model import ElasticNetCV
from sklearn.model_selection import GridSearchCV
from sklearn.datasets import make_regression
import numpy as np
from sklearn.model_selection import train_test_split
import pandas as pd
from sklearn.preprocessing import StandardScaler
# CSVファイルのパスを指定してください
file_path = "C:/Users/nt011/Desktop/統計正しいほう/変数選択/削除_変数作成後.csv"
# CSVファイルの読み込み
df = pd.read_csv(file_path) # 文字コードが異なる場合は、'utf-8' を他のエンコーディングに変更してください
# 特徴量とターゲットの分割
X = df.drop(columns=['取引価格(㎡単価)']) # 取引価格(㎡単価)をyとして分離
y = df['取引価格(㎡単価)'] # ターゲット変数
# 🔥 標準化の実施
scaler_X = StandardScaler()
scaler_y = StandardScaler()
X_scaled = scaler_X.fit_transform(X) # Xの標準化
y_scaled = scaler_y.fit_transform(y.values.reshape(-1, 1)).ravel() # yの標準化
# データ分割
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_scaled,
test_size=0.2, random_state=42)
# 訓練データの一部を使用
X_sample, _, y_sample, _ = train_test_split(X_train, y_train, test_size=0.8,
random_state=42)
# 第一段階: ElasticNetCVを使用した最適パラメータの導出
enet_cv = ElasticNetCV(
l1_ratio=np.linspace(0.0001, 1, 25), # l1_ratioの候補
alphas=np.logspace(-5, 0, 25), # alphaの候補
cv=5, # 交差検証の分割数
random_state=42,
n_jobs=-1,
verbose=1
)
enet_cv.fit(X_train, y_train)
# 第一段階の最適パラメータと係数を取得
alpha1_opt = enet_cv.alpha_
l1_ratio1_opt = enet_cv.l1_ratio_
enet_coef = enet_cv.coef_
print(f"第一段階の最適パラメータ: alpha1={alpha1_opt}, l1_ratio1=
{l1_ratio1_opt}")
# ベストパラメータをalpha2とl1_ratio2でそれぞれ格納 best_alpha2 = best_params['alpha2'] best_l1_ratio2 = best_params['l1_ratio2'] print(best_alpha2) print(best_l1_ratio2)
import numbers
import warnings
import cvxpy
import numpy as np
from asgl import ASGL
from sklearn.base import MultiOutputMixin
from sklearn.base import RegressorMixin
from sklearn.exceptions import ConvergenceWarning
from sklearn.linear_model import ElasticNet
from sklearn.linear_model._coordinate_descent import _alpha_grid
from sklearn.utils import check_X_y
from sklearn.utils.validation import check_is_fitted
class AdaptiveElasticNet(ASGL, ElasticNet, MultiOutputMixin, RegressorMixin):
"""
Objective function and parameters as described with modifications
to allow alpha1, alpha2, l1_ratio1, and l1_ratio2 customization.
"""
def __init__(
self,
alpha1=0.01333521432163324, # 第一段階で得られたalpha1を指定
alpha2=0.034807005884284134, # Second-stage ElasticNet alpha
*,
l1_ratio1=1, # First-stage ElasticNet L1 ratio
l1_ratio2=0.1667499999999998, # Second-stage ElasticNet L1 ratio
gamma=0.5, # Weight adjustment exponent
fit_intercept=True,
precompute=False,
max_iter=10000,
copy_X=True,
solver=None,
tol=None,
positive=False,
positive_tol=1e-3,
random_state=None,
eps_coef=1e-6,
verbose=True
):
params_asgl = dict(model="lm", penalization="asgl")
if solver is not None:
params_asgl["solver"] = solver
if tol is not None:
params_asgl["tol"] = tol
super().__init__(**params_asgl)
self.alpha1 = alpha1
self.alpha2 = alpha2
self.l1_ratio1 = l1_ratio1
self.l1_ratio2 = l1_ratio2
self.gamma = gamma
self.fit_intercept = fit_intercept
self.max_iter = max_iter
self.precompute = precompute
self.copy_X = copy_X
self.positive = positive
self.positive_tol = positive_tol
self.random_state = random_state
self.eps_coef = eps_coef
self.verbose = verbose
if not self.fit_intercept:
raise NotImplementedError
def fit(self, X, y, check_input=True):
if check_input:
X_copied = self.copy_X and self.fit_intercept
X, y = self._validate_data(
X,
y,
accept_sparse="csc",
order="F",
dtype=[np.float64, np.float32],
copy=X_copied,
multi_output=True,
y_numeric=True,
)
# 第一段階の ElasticNet 実行
enet_model_1 = self.elastic_net(self.l1_ratio1, alpha=self.alpha1)
enet_model_1.fit(X, y)
enet_coef = enet_model_1.coef_
# 重みの計算
weights = 1.0 / (np.maximum(np.abs(enet_coef), self.eps_coef) ** self.gamma)
# 第二段階の最適化
self.coef_, self.intercept_ = self._optimize_second_stage(X, y, weights)
# モデル属性を格納
self.enet_coef_ = enet_coef
self.weights_ = weights
return self
def predict(self, X):
check_is_fitted(self, ["coef_", "intercept_"])
return super(ElasticNet, self).predict(X)
def elastic_net(self, l1_ratio, **params):
"""
Create an ElasticNet model with the specified parameters.
"""
elastic_net = ElasticNet(l1_ratio=l1_ratio)
for k, v in self.get_params().items():
try:
elastic_net = elastic_net.set_params(**{k: v})
except ValueError:
pass # Ignore parameters not supported by ElasticNet
elastic_net = elastic_net.set_params(**params)
return elastic_net
def _optimize_second_stage(self, X, y, weights):
"""
Perform second-stage optimization with adaptive weights.
Returns
-------
coef : np.array, shape (n_features,)
intercept : float
"""
n_samples, n_features = X.shape
beta_variables = [cvxpy.Variable(n_features)]
model_prediction = 0.0
if self.fit_intercept:
beta_variables = [cvxpy.Variable(1)] + beta_variables
ones = cvxpy.Constant(np.ones((n_samples, 1)))
model_prediction += ones @ beta_variables[0]
# モデル予測
model_prediction += X @ beta_variables[1]
error = cvxpy.sum_squares(y - model_prediction) / (2 * n_samples)
l1_coefs = self.alpha2 * self.l1_ratio2
# 第二段階の正則化項
l1_penalty = cvxpy.Constant(l1_coefs * weights) @ cvxpy.abs(
beta_variables[1]
)
l2_penalty = (
cvxpy.Constant(self.alpha1 * (1 - self.l1_ratio1))
* cvxpy.sum_squares(beta_variables[1])
)
constraints = [b >= 0 for b in beta_variables] if self.positive else []
# 最適化問題の定義
problem = cvxpy.Problem(
cvxpy.Minimize(error + l1_penalty + l2_penalty),
constraints=constraints
)
problem.solve(solver="OSQP", max_iter=self.max_iter)
if problem.status != "optimal":
raise ConvergenceWarning(
f"Solver did not reach optimum (Status: {problem.status})"
)
beta_sol = np.concatenate([b.value for b in beta_variables], axis=0)
beta_sol[np.abs(beta_sol) < self.tol] = 0
intercept, coef = beta_sol[0], beta_sol[1:]
coef = np.maximum(coef, 0) if self.positive else coef
return coef, intercept
import pandas as pd
from sklearn.preprocessing import StandardScaler
# CSVファイルのパスを指定してください
file_path = "C:/Users/monda/OneDrive/デスクトップ/統計正しいほう/変数選択/削除_変数作成後.csv"
# CSVファイルの読み込み
df = pd.read_csv(file_path) # 文字コードが異なる場合は、'utf-8' を他のエンコー ディングに変更してください
# 特徴量とターゲットの分割
X = df.drop(columns=['取引価格(㎡単価)']) # 取引価格(㎡単価)をyとして分離
y = df['取引価格(㎡単価)'] # ターゲット変数
# 🔥 標準化の実施
scaler_X = StandardScaler()
scaler_y = StandardScaler()
X_scaled = scaler_X.fit_transform(X) # Xの標準化
y_scaled = scaler_y.fit_transform(y.values.reshape(-1, 1)).ravel() # yの標準化
model = AdaptiveElasticNet()
model.fit(X_scaled, y_scaled)
# 係数の表示(重要な特徴量を確認)
coef = model.coef_
print("特徴量の係数:", coef)
# 係数がゼロでない特徴量を選定
selected_features = X.columns[coef != 0]
print("選定された特徴量:", selected_features)
# 係数と特徴量名をデータフレームに変換
selected_features_df = pd.DataFrame({
'Feature': X.columns,
'Coefficient': coef
})
# 係数がゼロでない特徴量のみをフィルタリング
selected_features_df = selected_features_df[selected_features_df['Coefficient'] != 0]
# CSVファイルとして保存
output_file_path = "選ばれた変数.csv" # 保存先を指定してください
selected_features_df.to_csv(output_file_path, index=False, encoding='utf-8-sig')
# 日本語対応のエンコーディング
print(f"選定された特徴量と係数を {output_file_path} に保存しました。")
#ref(変数選択.png,,50%)
ヘドニックアプローチとは、財やサービスの価格を、それらが持つ特性(属性)の価値に基づいて分析する手法です。本手法では、商品の市場価格を構成要素ごとに分解し、各属性が価格に与える影響を定量的に評価します。特に、不動産市場や環境経済学において、住宅価格や土地価格の決定要因を解析するために広く活用されています。ヘドニック法においては、次のような関数で推定が行われます。
「hedo.ipynb」を実行すると、有意水準に基づいて有意性を判定する関数
import pandas as pd
# Excelファイルを読み込み
file_path = "c:/Users/nt011/Desktop/統計正しいほう/要因分析/位置情報_削除.csv"
df = pd.read_csv(file_path)
# 住所が含まれている列を特定して、住所ごとの出現回数を確認
address_column = '住所' # 住所列の名前がわからない場合はここを変更してください
if address_column in df.columns:
address_counts = df[address_column].dropna().value_counts() # 住所ごとの出現回数を集計
# 出現回数をデータフレームに変換
address_counts_df = address_counts.reset_index()
address_counts_df.columns = ['住所', '出現回数'] # 列名を設定
# 結果をCSVファイルに書き出し
output_file = "c:/Users/nt011/Desktop/統計正しいほう/要因分析/住所出現回数_削除.csv"
address_counts_df.to_csv(output_file, index=False, encoding='utf-8-sig')
# UTF-8エンコーディングで保存
print(f"住所ごとの出現回数がCSVファイルに保存されました: {output_file}")
else:
print("住所列が見つかりません")
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
# ファイルパスの指定
selected_features_path = "選ばれた変数_削除.csv"
all_data_path = "c:/Users/nt011/Desktop/統計正しいほう/要因分析/削除_変数作成後.csv"
location_data_path = "c:/Users/nt011/Desktop/統計正しいほう/要因分析/位置情報_削除.csv"
output_data_path = "選択された特徴量とデータ_削除.csv"
# 選定された変数のCSVファイルを読み込み
selected_features_df = pd.read_csv(selected_features_path, encoding='utf-8-sig')
selected_features = selected_features_df['Feature'].tolist()
# 元データを再度読み込み
all_data_df = pd.read_csv(all_data_path)
# 選定された説明変数と目的変数を抽出
df = all_data_df[selected_features + ['取引価格(㎡単価)']]
# 位置情報データを読み込み
location_df = pd.read_csv(location_data_path, usecols=['住所'])
# 元データと位置情報を結合(共通のキーがある場合)
df = pd.concat([df, location_df], axis=1)
# 統合データをCSVファイルとして保存
df.to_csv(output_data_path, index=False, encoding='utf-8-sig')
print(f"選定された変数と位置情報を含むデータを {output_data_path} に保存しました。")
import pandas as pd
# ファイルパス
count_file = "c:/Users/nt011/Desktop/統計正しいほう/要因分析/住所出現回数_削除.csv"
data_file = "C:/Users/nt011/Desktop/統計正しいほう/要因分析/選択された特徴量とデータ_削除.csv"
output_file = "C:/Users/nt011/Desktop/統計正しいほう/要因分析/ソート済みデータ_削除.csv"
# CSVファイルの読み込み
count_df = pd.read_csv(count_file, encoding='utf-8-sig')
data_df = pd.read_csv(data_file, encoding='utf-8-sig')
# 住所列の名前
address_column = '住所'
# 緯度・経度を除外(該当カラムがある場合)
drop_columns = ['緯度', '経度']
data_df = data_df.drop(columns=[col for col in drop_columns if col in
data_df.columns])
# 住所の出現回数をデータに統合
merged_df = data_df.merge(count_df, on=address_column, how='left')
# 出現回数の多い順にソート
sorted_df = merged_df.sort_values(by='出現回数', ascending=False).drop(columns=['出現回数'])
# ソート済みデータをCSVに保存
sorted_df.to_csv(output_file, encoding='utf-8-sig', index=False)
print(f"ソート済みデータを {output_file} に保存しました。")
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
import statsmodels.api as sm
# データの読み込み
file_path = "C:/Users/nt011/Desktop/統計正しいほう/要因分析/ソート済みデータ_削除.csv"
data = pd.read_csv(file_path)
# 目的変数と説明変数の設定
y = data['取引価格(㎡単価)'] # 取引価格を目的変数に
X = data.drop(columns=['取引価格(㎡単価)', '住所']) # 取引価格と住所以外を説明変数に
# 🔥 標準化の実施
scaler_X = StandardScaler()
scaler_y = StandardScaler()
X_scaled = scaler_X.fit_transform(X) # Xの標準化
y_scaled = scaler_y.fit_transform(y.values.reshape(-1, 1)).ravel() # yの標準化
# 線形回帰モデルの作成
model = LinearRegression()
model.fit(X_scaled, y)
# 回帰分析の結果を表示
X_scaled_with_const = sm.add_constant(X_scaled) # 定数項を追加
ols_model = sm.OLS(y, X_scaled_with_const).fit()
# 必要な情報を抽出
r_squared = ols_model.rsquared # 決定係数
coefficients = ols_model.params # 回帰係数(定数項も含む)
t_values = ols_model.tvalues # t値(定数項も含む)
p_values = ols_model.pvalues # p値(有意水準を判定するため)
variable_names = ['定数項'] + list(X.columns)
# 有意水準に基づいて有意性を判定する関数
def significance(p_value):
if p_value < 0.01:
return "1% 有意"
elif p_value < 0.05:
return "5% 有意"
elif p_value < 0.10:
return "10% 有意"
else:
return "有意でない"
# 結果を格納するリスト
results = []
# 回帰係数、t値、p値、有意性をリストに格納
for var_name, coef, t_val, p_val in zip(variable_names, coefficients, t_values, p_values):
significance_level = significance(p_val)
results.append([var_name, coef, t_val, p_val, significance_level])
# 結果をDataFrameに変換
results_df = pd.DataFrame(results, columns=["変数", "回帰係数", "t値", "p値", "有意性"])
# 予測値の計算
data["標準化予測取引価格(㎡単価)"] = ols_model.predict(X_scaled_with_const)
data["標準化実測取引価格(㎡単価)"] = y_scaled
# **結果を表示**
print("\n==== 回帰分析結果 ====")
print(f"決定係数 (R^2): {r_squared:.4f}\n")
print(results_df)
# 予測結果を含めたデータを保存
predicted_output_file_path = "C:/Users/nt011/Desktop/統計正しいほう/要因分析/予測結果_削除.csv"
data.to_csv(predicted_output_file_path, index=False, encoding="utf-8-sig")
# CSVファイルとして保存
output_file_path = "C:/Users/nt011/Desktop/統計正しいほう/要因分析/回帰分析結果_削除.csv"
results_df.to_csv(output_file_path, index=False, encoding="utf-8-sig")
print(f"回帰分析結果が{output_file_path}に保存されました。")