Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 | 31 |
Tags
- 커피
- 빅데이터 분석기사
- geopandas
- 네이버 웹툰
- 로맨스
- 웹툰
- 완결
- 완결 웹툰
- 가족
- 진심
- 액션
- 애니메이션
- 만화 영화
- 빅데이터 분석기사 필기
- 습관
- 아주 작은 습관의 힘
- 넷플릭스
- 서귀포
- 산책
- 제주도
- 이기적 출판사
- 네이버
- 제임스 클리어
- 사랑
- pandas
- 네이버 완결 웹툰
- QGIS
- 이범선
- 영화
- python
Archives
- Today
- Total
JuJuKwakKwak
LSTM 시계열 예측 + 300번 이상 반복 본문
In [ ]:
# 라이브러리 임포트
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import random
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from scipy.signal import savgol_filter
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense,Dropout, LSTM, TimeDistributed
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import logging
logging.getLogger("tensorflow").setLevel(logging.ERROR)
plt.rc('font', family='NanumBarunGothic')
In [ ]:
raw_data = pd.read_csv('data.csv', encoding='utf-8') # 데이터 불러오기
raw_data = raw_data.fillna(0) # 결측값 0으로 채우기
raw_data = raw_data.drop(columns='Unnamed: 0', axis=1) # 필요없는 열 삭제
행정동_list = list(set(raw_data['행정동'].values)) # 행정동만 모은 리스트 생성
업종_list = list(set(raw_data['업종'].values)) # 업종만 모은 리스트 생성
df_list = [] # 행정동별 업종별 데이터프레임 모은 리스트 생성
for adm in 행정동_list:
for kkkk in 업종_list:
sub_data = raw_data.loc[(raw_data['행정동'] == adm) & (raw_data['업종'] == kkkk)].reset_index(drop=True)
df_list.append(sub_data)
len_list = [] # 아예 비어있는 데이터프레임 개수 확인
for i in range(len(df_list)):
len_list.append(len(df_list[i]))
len_df = pd.DataFrame(len_list, columns=['length'])
len_df.value_counts()
In [ ]:
df_list_m0 = [ x for x in df_list if len(x) > 0 ] # 비어있는 거 제외한 데이터프레임 모으기 : 304개
In [ ]:
def make_dictionary(df): # 매출을 담는 딕셔너리 만드는 함수
sales = df['매출'].tolist() # 매출 데이터만 추출한 리스트
num_to_index = {} # 매출을 인덱스로 변환한 딕셔너리
for index, num in enumerate(sales):
if num == 0: # 0은 어쩔 수 없이 0으로 채우기
num_to_index[index] = num
else:
num_to_index[num] = index
index_to_num = {} # 인덱스를 다시 매출로 변환한 딕셔너리
for key, value in num_to_index.items():
if value == 0: # 0은 어쩔 수 없이 0으로 채우기
index_to_num[key] = value
else:
index_to_num[value] = key
return sales, num_to_index, index_to_num
def make_dataset(seq_length, sales): # LSTM에 들어갈 수 있는 형태로 만들기
n_samples = int(np.floor((len(sales) - 1) / seq_length))
train_x = []
train_y = []
for i in range(n_samples):
x_sample = sales[i * seq_length : (i + 1) * seq_length]
# 인덱스로 변환
x_encoded = []
for n in x_sample:
if n in num_to_index:
x_encoded.append(num_to_index[n])
else: # 0은 어쩔 수 없이 0으로 채우기
x_encoded.append(0)
train_x.append(x_encoded)
# 오른쪽으로 1칸 쉬프트
y_sample = sales[i * seq_length +1 : (i + 1) * seq_length + 1]
y_encoded = []
for n in x_sample:
if n in num_to_index:
y_encoded.append(num_to_index[n])
else: # 0은 어쩔 수 없이 0으로 채우기
y_encoded.append(0)
train_y.append(y_encoded)
train_x = np.array(train_x) # array 형태로 바꿔야 딥러닝 돌릴 수 있다
train_y = np.array(train_y)
train_x = train_x.reshape(train_x.shape[0], seq_length, 1) # 왜 1만 해야 에러가 안 나오는지 궁금
train_y = train_y.reshape(train_y.shape[0], seq_length, 1)
return n_samples, train_x, train_y
def make_model(hidden_units, train_x, train_y, epochs_num): # 딥러닝 모델 만들기
model = Sequential()
model.add(LSTM(hidden_units, return_sequences=True, input_shape=(None, train_x.shape[2]))) # LSTM 레이어
model.add(LSTM(hidden_units, return_sequences=False)) # LSTM 레이어
model.add(Dense(1, activation='linear')) # 예측값 1개
model.compile(loss='mean_squared_error', optimizer='adam') # 손실함수 지정, 최적화 지정
model.summary() # 모델 구성 확인
early_stop = EarlyStopping(monitor='loss', patience=5, verbose=1) # loss가 5번 똑같으면 학습 멈추기
model.fit(train_x, train_y, epochs=epochs_num, verbose=0, callbacks = [early_stop]) # 모델 학습
return model
def sales_generation(model, length): # 예측값 만들기
ix = [] # 가장 마지막 매출의 인덱스
if sales[-1] in num_to_index:
ix = [num_to_index[sales[-1]]]
else: # 0은 어쩔 수 없이 0으로 채우기
ix.append(0)
y_num = [] # 가장 마지막 매출 값
if ix[-1] in index_to_num:
y_num = [index_to_num[ix[-1]]]
else:
y_num.append(0)
y_num
print('6월 매출:', y_num, '로 예측 시작')
x = np.zeros((1, seq_length, 1)) # LSTM의 입력 시퀀스 생성
for i in range(length):
x[0][i][0] = y_num[-1] # 예측 매출을 다음 입력 시퀀스에 추가
ix = [int(model.predict(x[:, :i+1, :])[0][0])]
if ix[-1] in index_to_num:
y_num.append(index_to_num[ix[-1]])
else: # 0은 어쩔 수 없이 0으로 채우기
y_num.append(0)
print(y_num[1:])
return y_num
In [ ]:
bin_dict = {} # 전체 담을 그릇
for i, df_small in enumerate(df_list_m0):
sub_dict = {} # 3개월 예측 담을 그릇
sales, num_to_index, index_to_num = make_dictionary(df_small)
seq_length = 5
n_samples, train_x, train_y = make_dataset(seq_length, sales)
hidden_units = 128
epochs_num = 300
model = make_model(hidden_units, train_x, train_y, epochs_num)
length = 3
result = sales_generation(model, length)[1:]
print('\n\n')
date_pred = [202207, 202208, 202209]
for j in range(length):
three_dict = {} # 3개월에서 각각 하나씩 담을 그릇
three_dict['BS_YR_MON'] = date_pred[j]
three_dict['nm_ADM'] = df_small['nm_ADM'][0]
three_dict['cd_KSIC_L1'] = df_small['cd_KSIC_L1'][0]
three_dict['nm_KSIC_L1'] = df_small['nm_KSIC_L1'][0]
three_dict['amt_sales'] = result[j]
sub_dict[j] = three_dict
bin_dict[i] = sub_dict
In [ ]:
bin_dict
In [ ]:
temp_list = [] # 딕셔너리 안에 저장된 데이터프레임을 낱개로 저장할 그릇
for cnt in range(len(bin_dict)):
for dic in range(len(bin_dict[cnt])):
df_dic = pd.DataFrame.from_dict(bin_dict[cnt][dic], orient = 'index').transpose()
temp_list.append(df_dic)
temp_list
In [ ]:
temp_df = pd.concat(temp_list) # 하나의 데이터프레임으로 합치기
In [ ]:
temp_df.to_csv('pred.csv', encoding='utf-8') # csv로 저장하기
In [ ]:
raw_data.head()
In [ ]:
temp_df.head()
In [ ]:
data_total = pd.concat([raw_data, temp_df]) # 원래 데이터와 예측 데이터 합치기
print(data_total.shape)
In [ ]:
data_total = data_total.sort_values(by = ['행정동', '업종', '연월']).reset_index(drop=True) # 정렬
display(data_total[33:74])
In [ ]:
data_total.to_csv('lstm_매출_0000.csv', encoding='utf-8', index=False)