本研究の目的は,ヘドニック・アプローチにおける限界を克服し,価格形成におけるより 正確で信頼性の高い評価手法を提供することである.ヘドニックモデルでは,多重共線性, 欠落変数のバイアス,交互作用効果および非線形効果の取り扱いに問題がある[2].これら は不動産市場や消費財の価格評価において誤差を生じさせる要因となる.本研究は,これ らの問題を解決するために,スパース推定法を活用した新たなアプローチを提案する. 従来のヘドニックモデルは,そのシンプルな線形構造と主要変数の関係を明確に示す点 で強力であったが,現実の市場では非線形な相互作用や高次の効果が絡むため,これらを 十分に捉えることができないことが多かった.また,異なる要因が相互作用を持つ場合,そ の効果を正確に評価するためには,より複雑なモデルが求められる.そこで本研究では, ElasticNetおよびその進化形であるAdaptiveElasticNet(AEN)を用い,従来の限界を克服 することを目指す. ElasticNetはL1およびL2正則化を組み合わせることで,多重共線性の問題に対応する と同時に,重要な変数の選択を行う能力を持つ.これにより,データ中で重要な特徴を捉 え,効果的な変数選択を実現できる.また,AENは,従来のElasticNetをさらに発展させ, データの特性に応じて正則化パラメータを動的に調整する.この適応的なアプローチは,複 雑なデータ構造に対して非常に効果的であり,非線形性や交互作用を持つデータにも適用 できる.これにより,不動産市場や消費財の価格形成における微細な変動や相互作用を捉 え,価格予測の精度を向上させることが可能となる. さらに,本研究では,従来の線形回帰モデルでは捉えきれなかった非線形性や交互作用 効果をスパース推定法を活用することで反映させることができる.非線形回帰や機械学習 技術と組み合わせることにより,より精緻な価格形成モデルが構築され,従来の方法では 見過ごされがちな要因を捉えることが可能となる.例えば,不動産市場における立地や面 積の相互作用,消費財におけるブランド力と機能性の複合的影響など,これらの複雑な関 係性を反映させることで,より現実的な市場分析が行えるようになる. また,こうした高度な統計手法や機械学習技術の活用により,従来のヘドニック・アプ ローチを大きく強化することができる.特に,従来のアプローチでは捉えきれなかった価 格形成における微細な要因を,スパース推定を用いることで明示化し,より高精度な価格 評価を可能にする.これにより,不動産市場や消費財市場における価格形成メカニズムの 理解が深まり,実務的な応用に貢献することが期待される.
本研究で使用するファイルはすべてGoogleDriveにある。
user:iie.lab.tpu.2324@gmail.com
「マイドライブ」→「学生」→「24_o3中島」→「保存期間5年」→「3)卒論(プログラム)」の中にある。
「引継ぎ用.zip」をダウンロードする。
ファイルの中身は以下のようになっている。
これを上から順に実行していく過程を以下の章に示す。
| フォルダ名 | 用途 | ファイル名 | ファイルの場所 |
| データセットの作成 | 犯罪データの取得 | get_crime_data.py | データセットの作成\get_crime_data.py |
| データセットの作成 | 統計データなどの作成 | hop_統計.ipynb | データセットの作成\hop_統計.ipynb |
| データセットの作成 | 取引データのメッシュ変換 | 緯度経度.py | データセットの作成\緯度経度.py |
| データセットの作成 | 各データを取引データにまとめる | hop_メッシュ.ipynb | データセットの作成\hop_メッシュ.ipynb |
| データセットの作成 | 犯罪場所との最短距離 | distance.ipynb | データセットの作成\distance.ipynb |
| 前処理 | データセットの前処理 | 前処理.ipynb | 前処理\前処理.ipynb |
| 変数選択 | 変数の選択とハイパーパラメータのチューニング | 変数選択__削除.ipynb | 変数選択\変数選択_削除.ipynb |
| 要因分析 | 選択された変数を用いたヘドニック・アプローチによる要因分析 | hedo_削除.ipynb | 要因分析\hedo_削除.ipynb |
コードの変更点はディレクトリを変更するだけで大丈夫.
pythonのバージョンは12.0.0を用いる.
使用したモジュールのバージョンは下になります。(すべては使用しておりません)
#ref(): File not found: "モジュール1.png" at page "中島さん卒論_backup"
#ref(): File not found: "モジュール2.png" at page "中島さん卒論_backup"
#ref(): File not found: "モジュール3.png" at page "中島さん卒論_backup"
#ref(): File not found: "モジュール4.png" at page "中島さん卒論_backup"
「get_crime_data.py」を実行すると、富山県警が公開している「犯罪発生マップ」(http://www.machi-info.jp/machikado/police_pref_toyama/index.jsp)から犯罪発生データを取得し、「crimes.csv」として保存されます。
建築年の年号を西暦にするために以下の式を書いた列を挿入する。
=IF(LEFT(M2,2)="令和", 2018 + MID(M2,3,LEN(M2)-3), IF(LEFT(M2,2)="平成", 1988 + MID(M2,3,LEN(M2)-3), IF(LEFT(M2,2)="昭和", 1925 + MID(M2,3,LEN(M2)-3), "エラー")))
次に取引時点から西暦を抜き出す。
=LEFT(W2,4)
そして、取引時点の西暦から建築年の西暦を引くことで建築年数を算出する。
「hop_統計.ipynb」を実行すると、取引データのメッシュごとの統計データの作成が行われる. NAVITIMEからのスクレイピングや地図画像からのデータ作成を行い、メッシュごとのデータを最終的にdata_by_mesh.csvに格納する.
df_torihiki = pd.read_csv( HERE / "df_housing_toyamacityメッシュ入り.csv", index_col = "住所", encoding = 'cp932')
meshes = df_torihiki["mesh_code"].astype(str).tolist()
作成する統計データは以下のようになる。
df_estats_block = pd.read_csv( DATA_DIR / "estats_block.csv", index_col = "KEY_CODE")
df_estats_mesh = pd.read_csv( DATA_DIR / "estats_mesh.csv", index_col = "KEY_CODE")
df_selected_block = pd.DataFrame() df_selected_mesh = pd.DataFrame()
for line in stats_data_text.splitlines():
formula, label = line.rsplit(", ", 2)
if len( formula.split(" ") ) == 1:
if formula in df_estats_block.columns:
df_selected_block.loc[:, label] = df_estats_block.loc[:, formula]
if formula in df_estats_mesh.columns:
df_selected_mesh.loc[:, label] = df_estats_mesh.loc[:, formula]
elif len( formula.split(" ") ) >= 2:
contains_keys = re.findall("T.........", formula)
if contains_keys[0] in df_estats_block.columns:
for key in contains_keys:
formula = formula[:formula.find(key)] + "df_estats_block.loc[:, '" + \
formula[formula.find(key) : formula.find(key) + len(key)] + \
"']" + formula[formula.find(key) + len(key):]
exec(f"df_selected_block.loc[:, '{label}'] = {formula}")
elif contains_keys[0] in df_estats_mesh.columns:
for key in contains_keys:
formula = formula[:formula.find(key)] + "df_estats_mesh.loc[:, '" + \
formula[formula.find(key) : formula.find(key) + len(key)] + \
"']" + formula[formula.find(key) + len(key):]
exec(f"df_selected_mesh.loc[:, '{label}'] = {formula}")
df_selected_mesh = df_selected_mesh.fillna(0)
df_selected_block = df_selected_block.fillna(0)
df_selected_mesh df_selected_block
# ここで,NAVITIMEからスクレイピング
df_buildings = pd.DataFrame()
def address2coords(address: str):
for trials in range(5):
res = requests.get(
urllib.parse.quote(
string = f"https://msearch.gsi.go.jp/address-search/AddressSearch?q={address}",
safe = "://?="
)
)
if res.status_code == 200:
break
else:
time.sleep(5 * trials)
else:
raise Exception("国土地理院のジオコーディングAPIに接続できません。")
result = res.json()
time.sleep(0.5)
# サーバに負担をかけないように0.5秒待つ
if len(result) == 0:
return None
else:
return result[0]["geometry"]["coordinates"]
# 複数ある場合でも,0番目を採用
def get_building_info(category): df_buildings = pd.DataFrame(columns = ["name", "address", "lng", "lat"])
for i in range(1, sys.maxsize):
# "while True"と同義
if i == 1:
url = f"https://www.navitime.co.jp/category/{category}/16201/"
else:
url = f"https://www.navitime.co.jp/category/{category}/16201/?page={i}"
for trials in range(5):
res = requests.get(url)
if res.status_code == 200:
break
else:
time.sleep(5)
else:
break
# NAVITIME から 5 回連続で取得できなかったら、終了
soup = BeautifulSoup(res.text, "html.parser")
buildings = []
for element in soup.find_all(class_ = "spot-name-text"):
content = element.get_text()
buildings += [{"name": content}]
count = 0
for element in soup.find_all(class_ = "spot-detail-value-text"):
content = element.get_text()
matches = re.findall(r"(北海道|東京都|大阪府|京都府|神奈川県|和歌山県|鹿児島県|..県)(.{1,6}[市郡区町村])", content)
# 完璧ではないが、これで住所かを判定
if len(matches) > 0:
address = content
coords = address2coords(address)
print(address, coords)
buildings[count]["address"] = address
if coords == None:
buildings[count]["lng"] = None
buildings[count]["lat"] = None
else:
buildings[count]["lng"] = coords[0]
buildings[count]["lat"] = coords[1]
count += 1
for building in buildings:
df_buildings = add_dict_to_df(building, df_buildings)
return df_buildings
for line in building_data_text.splitlines():
code, label = line.rsplit(", ", 2)
if not (DATA_DIR / f"{code}.csv").exists():
df_this_buildings = get_building_info(code)
df_this_buildings.to_csv(DATA_DIR / f"{code}.csv", index = False)
else:
df_this_buildings = pd.read_csv(DATA_DIR / f"{code}.csv")
df_buildings[label] = None
for index, row in df_this_buildings.iterrows():
if not index in df_buildings.index:
df_buildings.loc[index] = pd.Series(dtype='float64')
df_buildings.at[index, label] = f"{row['lng']},{row['lat']}"
df_buildings
# のちに使う関数たち
def get_distance_between(point1, point2): point1[0] = point1[0] * math.pi / 180 point1[1] = point1[1] * math.pi / 180 point2[0] = point2[0] * math.pi / 180 point2[1] = point2[1] * math.pi / 180 dx = point2[0] - point1[0] dy = point2[1] - point1[1] p = (point2[1] + point1[1]) / 2 rx = 6378137.0 ry = 6356752.314245 e = math.sqrt( (rx**2 - ry**2) / rx**2 ) w = math.sqrt( 1 - (e**2 * math.sin(p)**2 ) ) n = rx / w m = rx * (1 - e**2) / w**3 return math.sqrt( (dy * m)**2 + (dx * n * math.cos(p))**2 )
def get_min_distance(lng, lat, targets):
min_distance = float("infinity")
for coords in targets:
if not pd.isna(coords):
_lng, _lat = coords.split(",")
distance = get_distance_between([lng, lat], [float(_lng),
float(_lat)])
distance = round(distance, 1)
if distance < min_distance:
min_distance = distance
return min_distance
def get_contains(north, south, east, west, targets):
count = 0
for coords in targets:
if not pd.isna(coords):
lng, lat = coords.split(",")
if (west < float(lng) <= east) and (south < float(lat) <= north):
count += 1
return count
def get_features_from_map(lng, lat, zoom_level, use_local = False):
data_to_return = {}
def write_access_log():
access_log_file = HERE / DATA_DIR / "mapimgs" / "access_log.txt"
if not access_log_file.exists():
with access_log_file.open(mode = "w") as f:
f.write(",".join(["datetime", "xtile", "ytile", "zoom_level"]))
f.write("\n")
with access_log_file.open(mode = "a") as f:
f.write(",".join([dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
str(xtile), str(ytile), str(zoom_level)]))
f.write("\n")
def deg2num(lat_deg, lon_deg, zoom):
lat_rad = math.radians(lat_deg)
n = 2.0 ** zoom
xtile = int((lon_deg + 180.0) / 360.0 * n)
ytile = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
return (xtile, ytile)
xtile, ytile = deg2num(lat, lng, zoom_level)
file_to_road_or_save = HERE / "data" / "mapimgs" / f"
{zoom_level}_{xtile}_{ytile}.png"
if use_local:
if file_to_road_or_save.exists():
tmp = file_to_road_or_save.read_bytes()
tmp = np.frombuffer(tmp, dtype = np.uint8)
mapimg = cv2.imdecode(tmp, flags = cv2.IMREAD_COLOR)
else:
use_local = False
if not use_local:
USERNAME = "p0ne2x48"
ACCESS_TOKEN = "pk.eyJ1IjoicDBuZTJ4NDgiLCJhIjoiY2w5cTN1eHp3MDYzbjNvazkxbDFjYm9rOSJ9.yeycMGAU4VoObRgPtZBTiQ"
# STYLE_ID = "cl9q5ujk2000915mapgbgghlp"
STYLE_ID = "clddd9b43000w01pk9i95un88"
API_URL = f"https://api.mapbox.com/styles/v1/{USERNAME}/{STYLE_ID}/tiles/{zoom_level}/{xtile}/{ytile}?access_token={ACCESS_TOKEN}"
session = requests.Session()
retries = Retry(total = 5, backoff_factor = 1, status_forcelist = [500, 502, 503, 504])
session.mount("https://", HTTPAdapter(max_retries = retries))
res = session.get(API_URL)
write_access_log()
tmp = res.content
file_to_road_or_save.write_bytes(tmp)
tmp = np.frombuffer(tmp, dtype = np.uint8)
mapimg = cv2.imdecode(tmp, flags = cv2.IMREAD_COLOR)
data_to_return.update( get_features_density(mapimg) ) data_to_return.update( get_road_features(mapimg) )
return data_to_return
def get_features_density(mapimg):
COLOR_SET = {
"道路の面積比率": [255, 255, 255],
"建物の面積比率": [0, 0, 255],
"水の面積比率": [255, 0, 0],
"空き地の面積比率": [0, 255, 0]
# "commercial_area": [255, 0, 255],
# "education": [0, 0, 255],
# "industrial_area": [0, 255, 255],
# "park": [255, 0, 0]
}
pixel_counts = {}
for key in COLOR_SET.keys():
pixel_counts[key] = 0
for i in range(mapimg.shape[0]):
for j in range(mapimg.shape[1]):
bl, gr, re = mapimg[i, j, :]
#if [bl, gr, re] > [10, 10, 10]:
def get_nearest_obj(blue, green, red):
bl = blue
gr = green
re = red
dis_min = {"obj": None, "value": float("infinity")}
for obj, bgr in COLOR_SET.items():
s_bl = bgr[0]
s_gr = bgr[1]
s_re = bgr[2]
distance = math.sqrt( (s_bl - bl)**2 + (s_gr - gr)**2 + (s_re - re)**2 )
if distance < dis_min["value"]:
dis_min["obj"] = obj
dis_min["value"] = distance
return dis_min["obj"]
obj_of_this_pixel = get_nearest_obj(bl, gr, re)
pixel_counts[obj_of_this_pixel] += 1
densities = {}
for key, value in pixel_counts.items():
densities[key] = round(value / (mapimg.shape[0] * mapimg.shape[0]), 3)
# print(densities)
return densities
def get_road_features(mapimg):
for i in range(mapimg.shape[0]):
for j in range(mapimg.shape[1]):
bl, gr, re = mapimg[i, j, :]
distance = math.sqrt( (255 - bl)**2 + (255 - gr)**2 + (255 - re)**2 )
if distance <= 100:
mapimg[i, j, :] = [255, 255, 255]
else:
mapimg[i, j, :] = [0, 0, 0]
mapimg = cv2.dilate(mapimg, np.ones((5,5),np.uint8)) mapimg = cv2.erode(mapimg, np.ones((5,5),np.uint8))
g = extract_network.get_graph(mapimg, [255, 255, 255]) nodes = len(g.nodes()) edges = len(g.edges())
data_to_return = {
"ノード数(道路)": nodes,
"エッジ数(道路)": edges,
"密度(道路)": edges / (nodes * (nodes - 1) / 2) if (nodes * (nodes - 1) / 2) != 0 else 0,
"平均次数(道路)": (edges * 2) / nodes if nodes != 0 else 0
} # print(data_to_return)
return data_to_return
def get_mesh_data(meshcode):
meshcode = int(meshcode)
if meshcode in df_selected_mesh.index:
return df_selected_mesh.loc[meshcode].to_dict()
else:
temp = {}
for i in df_selected_mesh.columns.to_list():
temp.update({
i: 0
})
return temp
def get_block_data(blockcode):
blockcode = int(blockcode)
if blockcode in df_selected_block.index:
return df_selected_block.loc[blockcode].to_dict()
else:
temp = {}
for i in df_selected_block.columns.to_list():
temp.update({
i: 0
})
return temp
def mesh_code_to_latlon(mesh_code: str, apply_correction: bool = True): """ 日本の地域メッシュコードを緯度・経度に変換する。
標準地域メッシュ(4桁)から5次メッシュ(10桁)までをサポート。 無効なメッシュコードの場合は ValueError を発生させる。
Parameters
----------
mesh_code : str
変換する地域メッシュコード。ハイフン("-")が含まれていても自動で除去される。
apply_correction : bool, optional
メッシュの中心点を取得するかどうか(デフォルトは True)。
False にすると、メッシュの南西端(左下)の座標を返す。
Returns
-------
tuple[float, float]
(緯度, 経度) のタプルを返す。
Raises
------
ValueError
メッシュコードが無効(4桁未満、5桁・7桁、または10桁超過)である場合。
"""
# ハイフンを削除
mesh_code = mesh_code.replace("-", "")
# 無効なメッシュコードのチェック
if not mesh_code.isdigit() or len(mesh_code) not in {4, 6, 8, 9, 10}:
raise ValueError("Invalid mesh code.")
# メッシュコードの種類を特定
mesh_level = {4: 0, 6: 1, 8: 2, 9: 3, 10: 4}.get(len(mesh_code))
# 各メッシュレベルの増分(緯度・経度の単位距離)
increments = [
(40 / 60, 1), # 第1次メッシュ
(5 / 60, 7.5 / 60), # 第2次メッシュ
(30 / 3600, 45 / 3600), # 第3次メッシュ
(15 / 3600, 22.5 / 3600), # 第4次メッシュ(2分の1地域メッシュ, 1/2メッシュ)
(7.5 / 3600, 11.25 / 3600) # 第5次メッシュ(4分の1地域メッシュ, 1/4メッシュ)
]
# メッシュレベルごとに座標を加算
for i, (lat_inc, lon_inc) in enumerate(increments):
# メッシュコード内の該当位置を取得
if i == 0: # 第1次メッシュ(2桁)
# 基本の緯度・経度
latitude = int(mesh_code[:2]) / 1.5
longitude = int(mesh_code[2:4]) + 100
else:
if i < 3: # 第2・3次メッシュ(2桁)
index = 4 + (i - 1) * 2
else: # 第4・5次メッシュ(1桁)
index = 8 + (i - 3)
# メッシュレベルごとの加算処理
if i < 3:
# 2桁番号に応じた緯度経度の加算
latitude += int(mesh_code[index]) * lat_inc # 緯度方向の番号
longitude += int(mesh_code[index + 1]) * lon_inc # 経度方向の番号
else:
# 1桁番号で縦横2分割した緯度経度の加算
code_value = int(mesh_code[index]) # 1桁の番号
latitude += ((code_value - 1) // 2) * lat_inc # 緯度方向の移動
longitude += ((code_value - 1) % 2) * lon_inc # 経度方向の移動
# 最終レベルの場合、補正値を適用して返す
if i == mesh_level:
if apply_correction:
return latitude + lat_inc / 2, longitude + lon_inc / 2
else:
return latitude, longitude
def mesh_code_to_latlon_borders(mesh_code: str, apply_correction: bool = True): """ 日本の地域メッシュコードを用いて、メッシュの東西南北の座標を求める。
Parameters
----------
mesh_code : str
変換する地域メッシュコード。
apply_correction : bool, optional
メッシュの中心点を取得するかどうか(デフォルトは `True`)。
Returns
-------
dict
メッシュコードに対応する南北東西、中心点(center)の座標情報を辞書形式で返す。
"""
# メッシュコードの長さに基づくレベルの設定
mesh_lengths = {4: 0, 6: 1, 8: 2, 9: 3, 10: 4}
# メッシュコードの長さを確認
mesh_length = len(mesh_code.replace("-", "")) # ハイフンを削除して長さを取得
if mesh_length not in mesh_lengths:
raise ValueError(f"無効なメッシュコードです。長さが {mesh_length} では対応していません。")
# メッシュレベルを取得
mesh_level = mesh_lengths[mesh_length]
# メッシュの中心点の座標を取得
center_lat, center_lon = mesh_code_to_latlon(mesh_code, apply_correction)
# メッシュのレベルに応じた増分(緯度・経度の単位距離)
increments = [
(40 / 60, 1), # 第1次メッシュ
(5 / 60, 7.5 / 60), # 第2次メッシュ
(30 / 3600, 45 / 3600), # 第3次メッシュ
(15 / 3600, 22.5 / 3600), # 第4次メッシュ(2分の1地域メッシュ)
(7.5 / 3600, 11.25 / 3600) # 第5次メッシュ(4分の1地域メッシュ)
]
# メッシュの中心点からの増分(経度、緯度) lat_inc, lon_inc = increments[mesh_level] # メッシュレベルに基づいて増分を取得
# メッシュの南北東西の座標を計算 south = center_lat - lat_inc / 2 north = center_lat + lat_inc / 2 west = center_lon - lon_inc / 2 east = center_lon + lon_inc / 2
# 結果として辞書形式で南北東西、中心を返す
return {
'north': north,
'south': south,
'east': east,
'west': west,
'center': [center_lon, center_lat]
}
# メッシュコードごとに座標情報を計算し、格納
_meshes = {}
# meshes(メッシュコードのリスト)が与えられている前提で、それぞれのメッシュ コードに対する座標情報を計算 for meshcode in meshes: # 各メッシュコードの境界座標を取得 _meshes[meshcode] = mesh_code_to_latlon_borders(meshcode)
# 結果として_meshesには9桁のメッシュコードに対応する座標情報が格納される meshes = _meshes # 最終的にmeshesを更新
# 結果を確認 print(meshes)
# のちで使う関数
def get_data_in_this_mesh(meshcode, coords): blockcode = mesh2block[meshcode]
data_in_this_mesh = {}
data_in_this_mesh.update( get_mesh_data(meshcode) )
data_in_this_mesh.update( get_block_data(blockcode) )
for label in df_buildings.columns.to_list():
data_in_this_mesh.update( {
label: get_contains(coords["north"], coords["south"], coords["east"], coords["west"], df_buildings[label]),
label: get_min_distance(coords["center"][0], coords["center"][1],
df_buildings[label])
} )
data_in_this_mesh.update( get_features_from_map(coords["center"][0],
coords["center"][1], 16, use_local = True) )
return {"meshcode": meshcode, "data": data_in_this_mesh}
# ここでグリッドセルごとのデータを作成する # さっき作った統計データ:そのまま入れる # さっき作った施設データ:そのグリッドセルにある数と,そのグリッドセルまでの 最短距離を求め,入れる # 地図画像にもとづく特徴量: # MapboxのAPIを使って,地図画像を取得する. # 特定のRGB値(たとえば,建物だったら[255, 0, 0])のピクセル数を求め,画像 サイズの512 * 512で割って(面積比率),入れる. # さらに,道路だけを抜き出した2値画像をつくり,道路ネットワークを抽出.ノ ード数,エッジ数,密度,平均次数を求めて,入れる.
df_data_by_mesh = pd.DataFrame(columns = ["mesh_code"])
df_data_by_mesh = df_data_by_mesh.set_index("mesh_code")
_count = 1
for meshcode, coords in meshes.items():
outputs = get_data_in_this_mesh(meshcode, coords)
df_data_by_mesh = add_dict_to_df(outputs["data"], df_data_by_mesh,
outputs["meshcode"])
print(f"{_count} / {len(meshes)}")
_count += 1
df_data_by_mesh df_data_by_mesh.to_csv(HERE / "data_by_mesh.csv")
「緯度経度.py」を実行すると、地理情報(緯度・経度)から日本独自のメッシュコードを計算し、住宅データなどのCSVファイルにそのコードを追加し、「df_housing_toyamacityメッシュ入り.csv」を生成します。
import pandas as pd
def latlon_to_mesh(lat, lon):
"""
緯度経度から9桁のメッシュコード(2分の1地域メッシュ)を取得する。
:param lat: 緯度(度)
:param lon: 経度(度)
:return: 9桁のメッシュコード(文字列)
"""
# 1次メッシュ
mesh1_lat = int(lat * 1.5)
mesh1_lon = int(lon - 100)
# 2次メッシュ
mesh2_lat = int((lat * 1.5 - mesh1_lat) * 8)
mesh2_lon = int((lon - 100 - mesh1_lon) * 8)
# 3次メッシュ
mesh3_lat = int(((lat * 1.5 - mesh1_lat) * 8 - mesh2_lat) * 10)
mesh3_lon = int(((lon - 100 - mesh1_lon) * 8 - mesh2_lon) * 10)
# 4次メッシュ(2分の1地域メッシュ)
mesh4_lat = int((((lat * 1.5 - mesh1_lat) * 8 - mesh2_lat) * 10 - mesh3_lat) * 2)
mesh4_lon = int((((lon - 100 - mesh1_lon) * 8 - mesh2_lon) * 10 - mesh3_lon) * 2)
# メッシュコードの組み立て(9桁になるよう修正)
mesh_code = f"{mesh1_lat:01}{mesh1_lon:01}{mesh2_lat:01}{mesh2_lon:01}{mesh3_lat:01}{mesh3_lon:01}{mesh4_lat:01}{mesh4_lon:01}"
# 最後の2桁のみ取り出して、9桁にする
mesh_code = mesh_code[:9]
return mesh_code
# CSVファイルの読み込み
file_path = r"C:\Users\nt011\Desktop\研究\富山取引のほう富山市のみ
\df_housing_toyamacity.csv"
df = pd.read_csv(file_path, encoding='utf-8')
# メッシュコードの計算
df["mesh_code"] = df.apply(lambda row: latlon_to_mesh(row["緯度"], row["経度"]), axis=1)
cols = ["mesh_code"] + [col for col in df.columns if col != "mesh_code"]
df = df[cols]
# 結果のCSVを保存
output_path = r"C:\Users\nt011\Desktop\研究\富山取引のほう富山市のみ
\df_housing_toyamacityメッシュ入り.csv"
df.to_csv(output_path, index=False)
print(f"処理完了: {output_path}")
「hop_メッシュ.ipynb」を実行すると、メッシュコードごとにデータを結合し「取引_統計データ付き.csv」に生成します。緯度・経度をもとに、500mメッシュ単位で犯罪データを分類できます。
# メッシュGeoJsonを,分かりやすい形に直す
_meshes = {}
for mesh in meshes:
meshcode = mesh["properties"]["code"]
coords = mesh["geometry"]["coordinates"][0]
north = coords[1][1]
south = coords[0][1]
east = coords[2][0]
west = coords[0][0]
center = [(east + west) / 2, (north + south) / 2]
_meshes[meshcode] = {
"north": north,
"south": south,
"east": east,
"west": west,
"center": center
}
meshes = _meshes
# 犯罪データの座標から,どのグリッドセルに含まれるかを求め
# 「mesh_code」列に格納する
# 読み込んだメッシュGeoJsonに含まれない犯罪データは除外
df_temp = df_crimes[0:0].copy()
df_temp["mesh_code"] = None
for index, row in df_crimes.iterrows():
for meshcode, coords in meshes.items():
if ((coords["south"] <= row["latitude"] < coords["north"]) and
(coords["west"] <= row["longitude"] < coords["east"])):
if not pd.isnull(row["datetime"]):
df_temp.loc[index] = pd.concat([
row,
pd.Series({
"mesh_code": meshcode
})
])
break
print(f"{df_crimes.index.get_loc(index)} / {len(df_crimes)}")
df_crimes = df_temp.copy()
df_crimes["datetime"] = pd.to_datetime(df_crimes["datetime"], format = "%Y/%m/%d %H:%M")
# CSVファイルの読み込み df_crimes = pd.read_csv(HERE / "犯罪_メッシュ入り.csv") #メッシュごとに件数をカウント mesh_counts = df_crimes['mesh_code'].value_counts().reset_index() mesh_counts.columns = ['mesh_code', '件数'] # 結果を表示 print(mesh_counts) # 結果をCSVファイルに保存する場合 mesh_counts.to_csv(HERE / "犯罪_メッシュ件数.csv", index=False, encoding='utf-8-sig')
# 件数データ df_number = pd.read_csv( HERE / "犯罪_メッシュ件数.csv", index_col = "mesh_code")
# 取引データ df_torihiki = pd.read_csv( HERE / "df_housing_toyamacityメッシュ入り.csv", index_col = "mesh_code", encoding = 'cp932')
# メッシュコードをキーとして結合
df_merged = pd.merge(df_torihiki, df_number, on='mesh_code', how='left',
suffixes=('', '_number'))
print(df_merged.head()) # 結果をCSVファイルに保存する場合 df_merged.to_csv(HERE / "取引_件数付き.csv", encoding='utf-8-sig')
df_mesh = pd.read_csv( HERE / "data_by_mesh.csv", index_col = "mesh_code", encoding = "cp932")
df_torihiki2 = pd.read_csv( HERE / "取引_件数付き.csv", index_col = "mesh_code")
# メッシュコードでデータを結合 merged_df = pd.merge(df_torihiki2, df_mesh, on='mesh_code', how='left') print(merged_df.head()) merged_df.to_csv(HERE / "取引_統計データ付き.csv", index=False, encoding='utf-8-sig')
犯罪発生率(件/250000)はhop_メッシュ.ipynbを行った後にcsvファイル上の新しい列(犯罪発生率(件/250000)を追加して行う。 計算方法は=件数/250000で行う。
「distance.ipynb」を実行すると、住宅地データ (取引_統計データ付き.csv) と犯罪データ (犯罪_メッシュ入り.csv) を読み込み、各住宅地から最寄りの犯罪発生地点までの最短距離を計算するために、geopy.distanceモジュールのgeodesic関数を使用します。この関数は、2点間の地球上での距離(メートル単位)を計算します。契約された土地から最短の距離で犯罪が起きた場所までの最短距離を「取引_統計データ付き.csv」に追加した「住宅地_犯罪_最短距離.csv」を生成します。
import pandas as pd
from geopy.distance import geodesic
# ファイルを読み込む
df_housing = pd.read_csv("C:/Users/nt011/Desktop/統計正しいほう/データセット
の作成/取引_統計データ付き.csv")
df_crimes = pd.read_csv("C:/Users/nt011/Desktop/統計正しいほう/データセット
の作成/犯罪_メッシュ入り.csv")
print(df_housing.columns)
# 1. 住宅地データの緯度経度情報を抽出
df_housing['latitude'] = df_housing['緯度']
df_housing['longitude'] = df_housing['経度']
# 2. 犯罪データの緯度経度情報を抽出
df_crimes['latitude'] = df_crimes['latitude']
df_crimes['longitude'] = df_crimes['longitude']
# 3.最短距離を計算する関数
def calculate_min_distance(lat1, lon1, df_target, lat_col, lon_col):
distances = df_target.apply(lambda row: geodesic((lat1, lon1),
(row[lat_col], row[lon_col])).meters, axis=1)
return distances.min()
# 4. 各住宅地から犯罪発生場所までの最短距離を計算
df_housing['min_distance_to_crime'] = df_housing.apply(lambda row:
calculate_min_distance(row['latitude'], row['longitude'], df_crimes, 'latitude', 'longitude'), axis=1)
# 距離が0になる住宅地の行を表示
df_housing[df_housing['min_distance_to_crime'] == 0]
# 必要なカラムを選択して新しいDataFrameに保存
df_result = df_housing[['mesh_code', '住所', '経度', '緯度', '最寄駅:名称', '最寄駅:距離(分)', '取引価格(総額)', '面積(㎡)', '取引価格(㎡単価)',
'土地の形状', '間口', '延床面積(㎡)','建築年数', '建物の構造',
'用途', '前面道路:方位', '前面道路:種類', '前面道路:幅員(m)', '都市計画', '建ぺい率(%)',
'容積率(%)', '取引時点', '件数', '犯罪発生率(件/250000)', '人口', '18歳未満人口割合',
'65歳以上人口割合', '外国人人口割合', '世帯数', '単身世帯割合', '核家族世帯割合', '正規労働者割合',
'非正規労働者割合', '最終学歴が中学以下の人口割合', '最終学歴が高校の人口割合', '最終学歴が大学以上の人口割合',
'居住年数5年未満の人口割合', '居住年数が20年以上の人口割合', '一戸建て割合', 'アパート・低中層マンション割合',
'高層マンション割合', 'コンビニ', '駅', '駐車場', '駐輪場', '金融機関', '旅館', 'ホテル', 'スーパー', 'ショッピングモール', 'デパート', '警察署/交番', '小学校', '中学校', '高校', '大学',
'保育園/幼稚園', 'レジャー施設','道路の面積比率', '建物の面積比率', '水の面積比率', '空き地の面積比率',
'ノード数(道路)', 'エッジ数(道路)', '密度(道路)', '平均次数(道路)', 'min_distance_to_crime']]
# 結果の保存
df_result.to_csv('C:/Users/nt011/Desktop/統計正しいほう/データセットの作成/
住宅地_犯罪_最短距離.csv', index=False, encoding='utf-8-sig')
作成したデータセットを回帰分析がかけられる形に整える。
「前処理.ipynb」を実行し、「住宅地_犯罪_最短距離.csv」から「建築年数」と「最寄駅:距離(分)」の列に関する欠損値を含む行を削除した「住宅地_犯罪_最短距離_欠損値削除後.csv」を生成します。
import pandas as pd
# CSVファイルのパス
file_path = r"C:\Users\nt011\Desktop\統計正しいほう\前処理\住宅地_犯罪_最短距離.csv"
# CSVファイルを読み込む
df = pd.read_csv(file_path)
# 欠損値を含む行を削除
df = df.dropna()
# 「建築年数」列に `-1` または `"VALUE"` を含む行を削除
df = df[~df["建築年数"].astype(str).isin(["-1", "#VALUE!"])]
# 「最寄駅:距離(分)」の値を変換
replace_dict = {
"30分~60分": 45,
"1H~1H30": 75,
"1H30~2H": 105
}
df["最寄駅:距離(分)"] = df["最寄駅:距離(分)"].replace(replace_dict)
# クリーンなデータを新しいCSVファイルとして保存
output_path = r"C:\Users\nt011\Desktop\統計正しいほう\前処理\住宅地_犯罪_最短距離_欠損値削除後.csv"
df.to_csv(output_path, index=False, encoding='utf-8-sig')
print("データ処理完了: 欠損値と指定された異常値を削除し、駅距離の値を変換しました。")
print(f"新しいCSVファイルが {output_path} に保存されました。")
生成した「住宅地_犯罪_最短距離_欠損値削除後.csv」から連続変数の外れ値を検出するカラムリストを定義し、データ型を数値に変換したのち検出された列を削除したリスト「外れ値削除.csv」を生成します。ダミー変数の処理を行う前にcsvファイル内の建物の構造の英語の部分を変換する必要がある。
◆不動産取引価格
https://www.reinfolib.mlit.go.jp/realEstatePrices/
このurlの制度と用語のページを参考にして変換を行う。
import pandas as pd
# CSVファイルのパス
file_path = "C:/Users/nt011/Desktop/統計正しいほう/前処理/住宅地_犯罪_最短距
離_欠損値削除後.csv"
# CSVファイルを読み込む
df = pd.read_csv(file_path)
# 外れ値を検出する連続変数のカラムリスト
continuous_columns = [
"最寄駅:距離(分)", "取引価格(総額)", "面積(㎡)", "取引価格(㎡単価)",
"間口", "延床面積(㎡)", "建築年数", "前面道路:幅員(m)",
"建ぺい率(%)", "容積率(%)", "犯罪発生率(件/250000)", "人口",
"18歳未満人口割合", "65歳以上人口割合", "外国人人口割合", "世帯数",
"単身世帯割合", "核家族世帯割合", "正規労働者割合", "非正規労働者割合",
"最終学歴が中学以下の人口割合", "最終学歴が高校の人口割合", "最終学歴が大学以上の人口割合",
"居住年数5年未満の人口割合", "居住年数が20年以上の人口割合", "一戸建て割合",
"アパート・低中層マンション割合", "高層マンション割合", "道路の面積比率",
"建物の面積比率", "水の面積比率", "空き地の面積比率", "ノード数(道路)",
"エッジ数(道路)", "密度(道路)", "平均次数(道路)", "min_distance_to_crime"
]
# データ型を数値に変換(エラーが出たらNaNにする)
for column in continuous_columns:
if column in df.columns:
df[column] = pd.to_numeric(df[column], errors='coerce')
# 外れ値を持つ行のインデックスを格納するリスト
outlier_indices = set()
# 外れ値の検出と削除
for column in continuous_columns:
if column in df.columns:
# 欠損値を除外
df_clean = df[column].dropna()
# IQR(四分位範囲)の計算
Q1 = df_clean.quantile(0.25)
Q3 = df_clean.quantile(0.75)
IQR = Q3 - Q1
# 外れ値の閾値を設定
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# 外れ値のインデックスを取得
outliers = df[(df[column] < lower_bound) | (df[column] > upper_bound)]
outlier_indices.update(outliers.index)
# 外れ値の行を削除
df_cleaned = df.drop(index=outlier_indices)
# 結果を新しいCSVに保存
output_path = "C:/Users/nt011/Desktop/統計正しいほう/前処理/外れ値削除.csv"
df_cleaned.to_csv(output_path, encoding="utf-8-sig", index=False)
print(f"外れ値を削除したデータを {output_path} に保存しました。")
print(f"削除した行数: {len(outlier_indices)}")
print(f"最終データの行数: {df_cleaned.shape[0]}")
それぞれのカラムの要素を必要に応じてダミー変数化した後、データフレームを保存した「削除_ダミー後.csv」を生成します。
import pandas as pd
# CSVファイルを読み込み
df_data = pd.read_csv("C:/Users/nt011/Desktop/統計正しいほう/前処理/外れ値削除.csv")
# カラム名を取得
columns = df_data.columns.tolist()
# 建物の構造列に「RC」がある場合、「鉄筋コンクリート造」に変換
df_data['建物の構造'] = df_data['建物の構造'].replace('RC', '鉄筋コンクリート造')
# 最寄り駅名を二値化(列名を「最寄り駅名_富山」に変更)
index = columns.index("最寄駅:名称") # 変換前の位置を取得
df_data.insert(index + 1, '最寄り駅名_富山', df_data['最寄駅:名称'].apply(lambda x: 1 if x == '富山' else 0))
df_data = df_data.drop(columns=['最寄駅:名称']) # 元の列を削除
# 都市計画を二値化(列名を「都市計画_市街化調整区域」に変更)
index = columns.index("都市計画")
df_data.insert(index + 1, '都市計画_市街化調整区域', df_data['都市計画'].apply(lambda x: 1 if x == '市街化調整区域' else 0))
df_data = df_data.drop(columns=['都市計画'])
# 土地の形状を二値化(列名を「土地の形状_不整形」に変更)
index = columns.index("土地の形状")
df_data.insert(index + 1, '土地の形状_不整形', df_data['土地の形状'].apply(lambda x: 1 if x == '不整形' else 0))
df_data = df_data.drop(columns=['土地の形状'])
# 用途を二値化(列名を「用途_住宅」に変更)
index = columns.index("用途")
df_data.insert(index + 1, '用途_住宅', df_data['用途'].apply(lambda x: 1 if x == '住宅' else 0))
df_data = df_data.drop(columns=['用途'])
# 建物の構造を二値化(列名を「建物の構造_木造」に変更)
index = columns.index("建物の構造")
df_data.insert(index + 1, '建物の構造_木造', df_data['建物の構造'].apply(lambda x: 1 if x == '木造' else 0))
df_data = df_data.drop(columns=['建物の構造'])
# 「前面道路:方位」のダミー変数化(基準値「北西」を0に設定)
index = columns.index("前面道路:方位") # 変換前の位置を取得
dummies = pd.get_dummies(df_data['前面道路:方位'], prefix='前面道路:方位', dtype=int)
# 基準値は削除
baseline_value = '北西'
if f'前面道路:方位_{baseline_value}' in dummies.columns:
dummies = dummies.drop(columns=[f'前面道路:方位_{baseline_value}'])
# 挿入(前面道路:方位のダミー変数)
ordered_dummies = ['前面道路:方位_西', '前面道路:方位_東', '前面道路:方位_南西', '前面道路:方位_南東',
'前面道路:方位_南', '前面道路:方位_北東', '前面道路:方位_北']
# 順番に従って挿入
for col in ordered_dummies:
if col in dummies.columns:
df_data.insert(index + 1, col, dummies[col])
# 「前面道路:種類」のダミー変数化
road_types = ['市道', '私道', '県道']
# 順番を指定してダミー変数を挿入
for road in road_types:
df_data.insert(index + 1, f'前面道路:種類_{road}', (df_data['前面道路:種類'] == road).astype(int))
# 元の「前面道路:方位」および「前面道路:種類」列を削除
df_data = df_data.drop(columns=['前面道路:方位', '前面道路:種類'])
# 確認用にダミー変数化後のデータフレームを表示
print(df_data.head())
# ダミー変数化後のデータを保存
df_data.to_csv("C:/Users/nt011/Desktop/統計正しいほう/前処理/削除_ダミー後.csv",
index=False, encoding='utf-8-sig')
print("ダミー変数変換完了!データが保存されました。")
連続変数とダミー変数を手動で分類し、連続変数のみ二次項の作成をする。 交互作用項の作成したデータフレームを保存した「削除_変数作成後.csv」を生成します。
import pandas as pd
import numpy as np
from itertools import combinations
# CSVファイルのパスを指定
file_path = "C:/Users/nt011/Desktop/統計正しいほう/前処理/削除_ダミー後.csv"
# UTF-8で読み込む
df = pd.read_csv(file_path)
# データの先頭を表示
print(df.head())
# 連続変数とダミー変数を手動で分類
continuous_vars = [
'最寄駅:距離(分)', '面積(㎡)', '間口', '延床面積(㎡)', '建築年数', '前面道路:幅員(m)',
'建ぺい率(%)', '容積率(%)', '犯罪発生率(件/250000)', '人口', '18歳未満人口割合',
'65歳以上人口割合', '外国人人口割合', '世帯数', '単身世帯割合', '核家族世帯割合',
'正規労働者割合', '非正規労働者割合', '最終学歴が中学以下の人口割合', '最終学歴が高校の人口割合',
'最終学歴が大学以上の人口割合', '居住年数5年未満の人口割合', '居住年数が20年以上の人口割合',
'一戸建て割合', 'アパート・低中層マンション割合', 'コンビニ', '駅', '駐車場', '駐輪場',
'金融機関', '旅館', 'ホテル', 'スーパー', 'ショッピングモール',
'デパート', '警察署/交番', '小学校', '中学校', '高校', '大学', '保育園/幼稚園',
'レジャー施設','道路の面積比率', '建物の面積比率', '水の面積比率', '空き地の面積比率',
'ノード数(道路)', 'エッジ数(道路)', '密度(道路)', '平均次数(道路)', 'min_distance_to_crime'
]
# ダミー変数をリストとして定義
dummy_vars = [
'最寄り駅名_富山','土地の形状_不整形', '建物の構造_木造',
'用途_住宅', '前面道路:方位_北', '前面道路:方位_北東',
'前面道路:方位_南', '前面道路:方位_南東',
'前面道路:方位_南西', '前面道路:方位_東', '前面道路:方位_西',
'前面道路:種類_市道', '前面道路:種類_県道', '前面道路:種類_私道',
'都市計画_市街化調整区域'
]
# 二次項の作成(連続変数のみ)
for var in continuous_vars:
df[f'{var}^2'] = df[var] ** 2
# 交互作用項の作成
# 連続変数 × 連続変数
for var1, var2 in combinations(continuous_vars, 2):
df[f'{var1}×{var2}'] = df[var1] * df[var2]
# ダミー変数 × 連続変数
for dummy_var in dummy_vars:
for cont_var in continuous_vars:
df[f'{dummy_var}×{cont_var}'] = df[dummy_var] * df[cont_var]
# ダミー変数 × ダミー変数
for var1, var2 in combinations(dummy_vars, 2):
df[f'{var1}×{var2}'] = df[var1] * df[var2]
# CSVに保存
output_path = "C:/Users/nt011/Desktop/統計正しいほう/前処理/削除_変数作成後.csv"
df.to_csv(output_path, index=False, encoding='utf-8-sig')
print(f"処理が完了しました。生成されたデータは {output_path} に保存されました。")
Elastic Netは、L1 正則化(Lasso)と L2 正則化(Ridge)の組み合わせにより、多重共線性があるデータに対して安定した回帰分析を行う手法です。
Lasso(L1正則化):L1正則化は、回帰モデルの係数の一部をゼロにすることによって、特徴量選択を行います。これにより、重要な特徴量だけを残すことができますが、多重共線性が高い場合には不安定になることがあります。
Ridge(L2正則化):L2正則化は、回帰モデルの係数を小さくすることで、過学習を防ぎます。L2正則化は係数をゼロにすることはなく、むしろ係数の大きさを均等に抑える傾向がありますが、特徴量選択の機能はありません。
StandardScaler() を用いて X(説明変数)と y(目的変数)を標準化 しているため、変数のスケールによる影響を排除し、モデルの安定性を向上させています。
Adaptive Elastic Netを用いて土地の価格に影響する重要な特徴量の係数を取得し、影響の大きい特徴量を特定します。
「変数選択__削除.ipynb」を実行すると、重要な変数のみを抽出し、「選ばれた変数.csv」に保存します。
import numbers
import warnings
import cvxpy
import numpy as np
from asgl import ASGL
from sklearn.base import MultiOutputMixin
from sklearn.base import RegressorMixin
from sklearn.exceptions import ConvergenceWarning
from sklearn.linear_model import ElasticNet
from sklearn.linear_model._coordinate_descent import _alpha_grid
from sklearn.utils import check_X_y
from sklearn.utils.validation import check_is_fitted
class AdaptiveElasticNet(ASGL, ElasticNet, MultiOutputMixin, RegressorMixin):
"""
Objective function and parameters as described with modifications
to allow alpha1, alpha2, l1_ratio1, and l1_ratio2 customization.
"""
def __init__(
self,
alpha1=0.021544346900318846, # First-stage ElasticNet alpha
alpha2=0.0009443498043343188, # Second-stage ElasticNet alpha
*,
l1_ratio1=0.8889, # First-stage ElasticNet L1 ratio
l1_ratio2=0.778, # Second-stage ElasticNet L1 ratio
gamma=0.5, # Weight adjustment exponent
fit_intercept=True,
precompute=False,
max_iter=10000,
copy_X=True,
solver=None,
tol=None,
positive=False,
positive_tol=1e-3,
random_state=None,
eps_coef=1e-6,
verbose=True
):
params_asgl = dict(model="lm", penalization="asgl")
if solver is not None:
params_asgl["solver"] = solver
if tol is not None:
params_asgl["tol"] = tol
super().__init__(**params_asgl)
self.alpha1 = alpha1
self.alpha2 = alpha2
self.l1_ratio1 = l1_ratio1
self.l1_ratio2 = l1_ratio2
self.gamma = gamma
self.fit_intercept = fit_intercept
self.max_iter = max_iter
self.precompute = precompute
self.copy_X = copy_X
self.positive = positive
self.positive_tol = positive_tol
self.random_state = random_state
self.eps_coef = eps_coef
self.verbose = verbose
if not self.fit_intercept:
raise NotImplementedError
def fit(self, X, y, check_input=True):
if check_input:
X_copied = self.copy_X and self.fit_intercept
X, y = self._validate_data(
X,
y,
accept_sparse="csc",
order="F",
dtype=[np.float64, np.float32],
copy=X_copied,
multi_output=True,
y_numeric=True,
)
# 第一段階の ElasticNet 実行
enet_model_1 = self.elastic_net(self.l1_ratio1, alpha=self.alpha1)
enet_model_1.fit(X, y)
enet_coef = enet_model_1.coef_
# 重みの計算
weights = 1.0 / (np.maximum(np.abs(enet_coef), self.eps_coef) ** self.gamma)
# 第二段階の最適化
self.coef_, self.intercept_ = self._optimize_second_stage(X, y, weights)
# モデル属性を格納
self.enet_coef_ = enet_coef
self.weights_ = weights
return self
def predict(self, X):
check_is_fitted(self, ["coef_", "intercept_"])
return super(ElasticNet, self).predict(X)
def elastic_net(self, l1_ratio, **params):
"""
Create an ElasticNet model with the specified parameters.
"""
elastic_net = ElasticNet(l1_ratio=l1_ratio)
for k, v in self.get_params().items():
try:
elastic_net = elastic_net.set_params(**{k: v})
except ValueError:
pass # Ignore parameters not supported by ElasticNet
elastic_net = elastic_net.set_params(**params)
return elastic_net
def _optimize_second_stage(self, X, y, weights):
"""
Perform second-stage optimization with adaptive weights.
Returns
-------
coef : np.array, shape (n_features,)
intercept : float
"""
n_samples, n_features = X.shape
beta_variables = [cvxpy.Variable(n_features)]
model_prediction = 0.0
if self.fit_intercept:
beta_variables = [cvxpy.Variable(1)] + beta_variables
ones = cvxpy.Constant(np.ones((n_samples, 1)))
model_prediction += ones @ beta_variables[0]
# モデル予測
model_prediction += X @ beta_variables[1]
error = cvxpy.sum_squares(y - model_prediction) / (2 * n_samples)
l1_coefs = self.alpha2 * self.l1_ratio2
# 第二段階の正則化項
l1_penalty = cvxpy.Constant(l1_coefs * weights) @ cvxpy.abs(
beta_variables[1]
)
l2_penalty = (
cvxpy.Constant(self.alpha1 * (1 - self.l1_ratio1))
* cvxpy.sum_squares(beta_variables[1])
)
constraints = [b >= 0 for b in beta_variables] if self.positive else []
# 最適化問題の定義
problem = cvxpy.Problem(
cvxpy.Minimize(error + l1_penalty + l2_penalty),
constraints=constraints
)
problem.solve(solver="OSQP", max_iter=self.max_iter)
if problem.status != "optimal":
raise ConvergenceWarning(
f"Solver did not reach optimum (Status: {problem.status})"
)
beta_sol = np.concatenate([b.value for b in beta_variables], axis=0)
beta_sol[np.abs(beta_sol) < self.tol] = 0
intercept, coef = beta_sol[0], beta_sol[1:]
coef = np.maximum(coef, 0) if self.positive else coef
return coef, intercept
from sklearn.linear_model import ElasticNetCV
from sklearn.model_selection import GridSearchCV
from sklearn.datasets import make_regression
import numpy as np
from sklearn.model_selection import train_test_split
import pandas as pd
from sklearn.preprocessing import StandardScaler
# CSVファイルのパスを指定してください
file_path = "C:/Users/nt011/Desktop/統計正しいほう/変数選択/削除_変数作成後.csv"
# CSVファイルの読み込み
df = pd.read_csv(file_path) # 文字コードが異なる場合は、'utf-8' を他のエンコーディングに変更してください
# 特徴量とターゲットの分割
X = df.drop(columns=['取引価格(㎡単価)']) # 取引価格(㎡単価)をyとして分離
y = df['取引価格(㎡単価)'] # ターゲット変数
# 🔥 標準化の実施
scaler_X = StandardScaler()
scaler_y = StandardScaler()
X_scaled = scaler_X.fit_transform(X) # Xの標準化
y_scaled = scaler_y.fit_transform(y.values.reshape(-1, 1)).ravel() # yの標準化
# データ分割
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_scaled,
test_size=0.2, random_state=42)
# 訓練データの一部を使用
X_sample, _, y_sample, _ = train_test_split(X_train, y_train, test_size=0.8,
random_state=42)
# 第一段階: ElasticNetCVを使用した最適パラメータの導出
enet_cv = ElasticNetCV(
l1_ratio=np.linspace(0.0001, 1, 25), # l1_ratioの候補
alphas=np.logspace(-5, 0, 25), # alphaの候補
cv=5, # 交差検証の分割数
random_state=42,
n_jobs=-1,
verbose=1
)
enet_cv.fit(X_train, y_train)
# 第一段階の最適パラメータと係数を取得
alpha1_opt = enet_cv.alpha_
l1_ratio1_opt = enet_cv.l1_ratio_
enet_coef = enet_cv.coef_
print(f"第一段階の最適パラメータ: alpha1={alpha1_opt}, l1_ratio1=
{l1_ratio1_opt}")
# ベストパラメータをalpha2とl1_ratio2でそれぞれ格納 best_alpha2 = best_params['alpha2'] best_l1_ratio2 = best_params['l1_ratio2'] print(best_alpha2) print(best_l1_ratio2)
ASGLとElasticNetを組み合わせたAdaptiveElasticNetクラスを定義し、CSVから読み込んだデータを標準化後にモデルをフィッティングして、非ゼロの係数を持つ重要な特徴量を抽出し、その結果をCSVに保存する一連の処理を実行します。
import numbers
import warnings
import cvxpy
import numpy as np
from asgl import ASGL
from sklearn.base import MultiOutputMixin
from sklearn.base import RegressorMixin
from sklearn.exceptions import ConvergenceWarning
from sklearn.linear_model import ElasticNet
from sklearn.linear_model._coordinate_descent import _alpha_grid
from sklearn.utils import check_X_y
from sklearn.utils.validation import check_is_fitted
class AdaptiveElasticNet(ASGL, ElasticNet, MultiOutputMixin, RegressorMixin):
"""
Objective function and parameters as described with modifications
to allow alpha1, alpha2, l1_ratio1, and l1_ratio2 customization.
"""
def __init__(
self,
alpha1=0.01333521432163324, # 第一段階で得られたalpha1を指定
alpha2=0.034807005884284134, # Second-stage ElasticNet alpha
*,
l1_ratio1=1, # First-stage ElasticNet L1 ratio
l1_ratio2=0.1667499999999998, # Second-stage ElasticNet L1 ratio
gamma=0.5, # Weight adjustment exponent
fit_intercept=True,
precompute=False,
max_iter=10000,
copy_X=True,
solver=None,
tol=None,
positive=False,
positive_tol=1e-3,
random_state=None,
eps_coef=1e-6,
verbose=True
):
params_asgl = dict(model="lm", penalization="asgl")
if solver is not None:
params_asgl["solver"] = solver
if tol is not None:
params_asgl["tol"] = tol
super().__init__(**params_asgl)
self.alpha1 = alpha1
self.alpha2 = alpha2
self.l1_ratio1 = l1_ratio1
self.l1_ratio2 = l1_ratio2
self.gamma = gamma
self.fit_intercept = fit_intercept
self.max_iter = max_iter
self.precompute = precompute
self.copy_X = copy_X
self.positive = positive
self.positive_tol = positive_tol
self.random_state = random_state
self.eps_coef = eps_coef
self.verbose = verbose
if not self.fit_intercept:
raise NotImplementedError
def fit(self, X, y, check_input=True):
if check_input:
X_copied = self.copy_X and self.fit_intercept
X, y = self._validate_data(
X,
y,
accept_sparse="csc",
order="F",
dtype=[np.float64, np.float32],
copy=X_copied,
multi_output=True,
y_numeric=True,
)
# 第一段階の ElasticNet 実行
enet_model_1 = self.elastic_net(self.l1_ratio1, alpha=self.alpha1)
enet_model_1.fit(X, y)
enet_coef = enet_model_1.coef_
# 重みの計算
weights = 1.0 / (np.maximum(np.abs(enet_coef), self.eps_coef) ** self.gamma)
# 第二段階の最適化
self.coef_, self.intercept_ = self._optimize_second_stage(X, y, weights)
# モデル属性を格納
self.enet_coef_ = enet_coef
self.weights_ = weights
return self
def predict(self, X):
check_is_fitted(self, ["coef_", "intercept_"])
return super(ElasticNet, self).predict(X)
def elastic_net(self, l1_ratio, **params):
"""
Create an ElasticNet model with the specified parameters.
"""
elastic_net = ElasticNet(l1_ratio=l1_ratio)
for k, v in self.get_params().items():
try:
elastic_net = elastic_net.set_params(**{k: v})
except ValueError:
pass # Ignore parameters not supported by ElasticNet
elastic_net = elastic_net.set_params(**params)
return elastic_net
def _optimize_second_stage(self, X, y, weights):
"""
Perform second-stage optimization with adaptive weights.
Returns
-------
coef : np.array, shape (n_features,)
intercept : float
"""
n_samples, n_features = X.shape
beta_variables = [cvxpy.Variable(n_features)]
model_prediction = 0.0
if self.fit_intercept:
beta_variables = [cvxpy.Variable(1)] + beta_variables
ones = cvxpy.Constant(np.ones((n_samples, 1)))
model_prediction += ones @ beta_variables[0]
# モデル予測
model_prediction += X @ beta_variables[1]
error = cvxpy.sum_squares(y - model_prediction) / (2 * n_samples)
l1_coefs = self.alpha2 * self.l1_ratio2
# 第二段階の正則化項
l1_penalty = cvxpy.Constant(l1_coefs * weights) @ cvxpy.abs(
beta_variables[1]
)
l2_penalty = (
cvxpy.Constant(self.alpha1 * (1 - self.l1_ratio1))
* cvxpy.sum_squares(beta_variables[1])
)
constraints = [b >= 0 for b in beta_variables] if self.positive else []
# 最適化問題の定義
problem = cvxpy.Problem(
cvxpy.Minimize(error + l1_penalty + l2_penalty),
constraints=constraints
)
problem.solve(solver="OSQP", max_iter=self.max_iter)
if problem.status != "optimal":
raise ConvergenceWarning(
f"Solver did not reach optimum (Status: {problem.status})"
)
beta_sol = np.concatenate([b.value for b in beta_variables], axis=0)
beta_sol[np.abs(beta_sol) < self.tol] = 0
intercept, coef = beta_sol[0], beta_sol[1:]
coef = np.maximum(coef, 0) if self.positive else coef
return coef, intercept
import pandas as pd
from sklearn.preprocessing import StandardScaler
# CSVファイルのパスを指定してください
file_path = "C:/Users/monda/OneDrive/デスクトップ/統計正しいほう/変数選択/削除_変数作成後.csv"
# CSVファイルの読み込み
df = pd.read_csv(file_path) # 文字コードが異なる場合は、'utf-8' を他のエンコー ディングに変更してください
# 特徴量とターゲットの分割
X = df.drop(columns=['取引価格(㎡単価)']) # 取引価格(㎡単価)をyとして分離
y = df['取引価格(㎡単価)'] # ターゲット変数
# 🔥 標準化の実施
scaler_X = StandardScaler()
scaler_y = StandardScaler()
X_scaled = scaler_X.fit_transform(X) # Xの標準化
y_scaled = scaler_y.fit_transform(y.values.reshape(-1, 1)).ravel() # yの標準化
model = AdaptiveElasticNet()
model.fit(X_scaled, y_scaled)
# 係数の表示(重要な特徴量を確認)
coef = model.coef_
print("特徴量の係数:", coef)
# 係数がゼロでない特徴量を選定
selected_features = X.columns[coef != 0]
print("選定された特徴量:", selected_features)
# 係数と特徴量名をデータフレームに変換
selected_features_df = pd.DataFrame({
'Feature': X.columns,
'Coefficient': coef
})
# 係数がゼロでない特徴量のみをフィルタリング
selected_features_df = selected_features_df[selected_features_df['Coefficient'] != 0]
# CSVファイルとして保存
output_file_path = "選ばれた変数.csv" # 保存先を指定してください
selected_features_df.to_csv(output_file_path, index=False, encoding='utf-8-sig')
# 日本語対応のエンコーディング
print(f"選定された特徴量と係数を {output_file_path} に保存しました。")
#ref(変数選択.png,,50%)
AENの数式は以下のようなものを扱う。そして、今回行ったハイパーパラメータは以下のものである。
AENの推計は,二段階で行われる.まず,予備推計としてEN で係数を推定する.その
うえで,係数の絶対値が小さい変数に対してより強い罰則を与えるようL1 ノルムの正則化
項を変数ごとに調整したうえで,改めてEN を実施する.
選択された変数は以下のようになる
「hedo.ipynb」を実行すると、有意水準に基づいて有意性を判定する関数
import pandas as pd
# Excelファイルを読み込み
file_path = "c:/Users/nt011/Desktop/統計正しいほう/要因分析/位置情報_削除.csv"
df = pd.read_csv(file_path)
# 住所が含まれている列を特定して、住所ごとの出現回数を確認
address_column = '住所' # 住所列の名前がわからない場合はここを変更してください
if address_column in df.columns:
address_counts = df[address_column].dropna().value_counts() # 住所ごとの出現回数を集計
# 出現回数をデータフレームに変換
address_counts_df = address_counts.reset_index()
address_counts_df.columns = ['住所', '出現回数'] # 列名を設定
# 結果をCSVファイルに書き出し
output_file = "c:/Users/nt011/Desktop/統計正しいほう/要因分析/住所出現回数_削除.csv"
address_counts_df.to_csv(output_file, index=False, encoding='utf-8-sig')
# UTF-8エンコーディングで保存
print(f"住所ごとの出現回数がCSVファイルに保存されました: {output_file}")
else:
print("住所列が見つかりません")
回帰係数、t値、p値、有意性を格納したリストを「回帰分析結果_削除.csv」に生成します。
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
# ファイルパスの指定
selected_features_path = "選ばれた変数_削除.csv"
all_data_path = "c:/Users/nt011/Desktop/統計正しいほう/要因分析/削除_変数作成後.csv"
location_data_path = "c:/Users/nt011/Desktop/統計正しいほう/要因分析/位置情報_削除.csv"
output_data_path = "選択された特徴量とデータ_削除.csv"
# 選定された変数のCSVファイルを読み込み
selected_features_df = pd.read_csv(selected_features_path, encoding='utf-8-sig')
selected_features = selected_features_df['Feature'].tolist()
# 元データを再度読み込み
all_data_df = pd.read_csv(all_data_path)
# 選定された説明変数と目的変数を抽出
df = all_data_df[selected_features + ['取引価格(㎡単価)']]
# 位置情報データを読み込み
location_df = pd.read_csv(location_data_path, usecols=['住所'])
# 元データと位置情報を結合(共通のキーがある場合)
df = pd.concat([df, location_df], axis=1)
# 統合データをCSVファイルとして保存
df.to_csv(output_data_path, index=False, encoding='utf-8-sig')
print(f"選定された変数と位置情報を含むデータを {output_data_path} に保存しました。")
import pandas as pd
# ファイルパス
count_file = "c:/Users/nt011/Desktop/統計正しいほう/要因分析/住所出現回数_削除.csv"
data_file = "C:/Users/nt011/Desktop/統計正しいほう/要因分析/選択された特徴量とデータ_削除.csv"
output_file = "C:/Users/nt011/Desktop/統計正しいほう/要因分析/ソート済みデータ_削除.csv"
# CSVファイルの読み込み
count_df = pd.read_csv(count_file, encoding='utf-8-sig')
data_df = pd.read_csv(data_file, encoding='utf-8-sig')
# 住所列の名前
address_column = '住所'
# 緯度・経度を除外(該当カラムがある場合)
drop_columns = ['緯度', '経度']
data_df = data_df.drop(columns=[col for col in drop_columns if col in
data_df.columns])
# 住所の出現回数をデータに統合
merged_df = data_df.merge(count_df, on=address_column, how='left')
# 出現回数の多い順にソート
sorted_df = merged_df.sort_values(by='出現回数', ascending=False).drop(columns=['出現回数'])
# ソート済みデータをCSVに保存
sorted_df.to_csv(output_file, encoding='utf-8-sig', index=False)
print(f"ソート済みデータを {output_file} に保存しました。")
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
import statsmodels.api as sm
# データの読み込み
file_path = "C:/Users/nt011/Desktop/統計正しいほう/要因分析/ソート済みデータ_削除.csv"
data = pd.read_csv(file_path)
# 目的変数と説明変数の設定
y = data['取引価格(㎡単価)'] # 取引価格を目的変数に
X = data.drop(columns=['取引価格(㎡単価)', '住所']) # 取引価格と住所以外を説明変数に
# 🔥 標準化の実施
scaler_X = StandardScaler()
scaler_y = StandardScaler()
X_scaled = scaler_X.fit_transform(X) # Xの標準化
y_scaled = scaler_y.fit_transform(y.values.reshape(-1, 1)).ravel() # yの標準化
# 線形回帰モデルの作成
model = LinearRegression()
model.fit(X_scaled, y)
# 回帰分析の結果を表示
X_scaled_with_const = sm.add_constant(X_scaled) # 定数項を追加
ols_model = sm.OLS(y, X_scaled_with_const).fit()
# 必要な情報を抽出
r_squared = ols_model.rsquared # 決定係数
coefficients = ols_model.params # 回帰係数(定数項も含む)
t_values = ols_model.tvalues # t値(定数項も含む)
p_values = ols_model.pvalues # p値(有意水準を判定するため)
variable_names = ['定数項'] + list(X.columns)
# 有意水準に基づいて有意性を判定する関数
def significance(p_value):
if p_value < 0.01:
return "1% 有意"
elif p_value < 0.05:
return "5% 有意"
elif p_value < 0.10:
return "10% 有意"
else:
return "有意でない"
# 結果を格納するリスト
results = []
# 回帰係数、t値、p値、有意性をリストに格納
for var_name, coef, t_val, p_val in zip(variable_names, coefficients, t_values, p_values):
significance_level = significance(p_val)
results.append([var_name, coef, t_val, p_val, significance_level])
# 結果をDataFrameに変換
results_df = pd.DataFrame(results, columns=["変数", "回帰係数", "t値", "p値", "有意性"])
# 予測値の計算
data["標準化予測取引価格(㎡単価)"] = ols_model.predict(X_scaled_with_const)
data["標準化実測取引価格(㎡単価)"] = y_scaled
# **結果を表示**
print("\n==== 回帰分析結果 ====")
print(f"決定係数 (R^2): {r_squared:.4f}\n")
print(results_df)
# 予測結果を含めたデータを保存
predicted_output_file_path = "C:/Users/nt011/Desktop/統計正しいほう/要因分析/予測結果_削除.csv"
data.to_csv(predicted_output_file_path, index=False, encoding="utf-8-sig")
# CSVファイルとして保存
output_file_path = "C:/Users/nt011/Desktop/統計正しいほう/要因分析/回帰分析結果_削除.csv"
results_df.to_csv(output_file_path, index=False, encoding="utf-8-sig")
print(f"回帰分析結果が{output_file_path}に保存されました。")