Notice
Recent Posts
Recent Comments
250x250
Creative Code
코인 가격 예측 프로그램(10초마다 예측) 본문
728x90
import requests
import pandas as pd
import numpy as np
import time
from datetime import datetime, timedelta
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from pytz import timezone
def fetch_bithumb_data(coin_symbol, save_path):
"""
특정 코인의 1분 간격 데이터를 최근 3일 간 수집하여 CSV로 저장
"""
base_url = "https://api.bithumb.com/public/candlestick/"
timeframe = "1m" # 1분 간격 데이터
url = f"{base_url}{coin_symbol}_KRW/{timeframe}"
now = datetime.now()
start_time = int((now - timedelta(days=3)).timestamp()) * 1000 # 밀리초 단위로 변환
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
if data['status'] != '0000':
print(f"Error fetching data: {data['message']}")
return
candlesticks = data['data']
filtered_data = [item for item in candlesticks if item[0] >= start_time]
columns = ["timestamp", "open", "close", "high", "low", "volume"]
df = pd.DataFrame(filtered_data, columns=columns)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)
# CSV에 마지막 1000개 데이터 저장
df = df.tail(1000)
df.to_csv(save_path)
print(f"Initial data saved to {save_path}")
except Exception as e:
print(f"An error occurred: {e}")
def fetch_current_price_as_candlestick(coin_symbol):
"""
현재 코인의 정보를 기존 CSV와 동일한 형식으로 반환
"""
url = f"https://api.bithumb.com/public/ticker/{coin_symbol}_KRW"
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
if data['status'] == '0000':
now = datetime.now(timezone('UTC')) # UTC 시간으로 변환
open_price = float(data['data']['opening_price'])
close_price = float(data['data']['closing_price'])
high_price = float(data['data']['max_price'])
low_price = float(data['data']['min_price'])
volume = float(data['data']['units_traded'])
return {
"timestamp": now.strftime('%Y-%m-%d %H:%M:%S'), # 밀리초와 UTC 제거한 형식으로 변환
"open": open_price,
"close": close_price,
"high": high_price,
"low": low_price,
"volume": volume,
}
else:
print(f"Error fetching current price: {data['message']}")
return None
except Exception as e:
print(f"An error occurred while fetching current price: {e}")
return None
def create_lstm_model(input_shape):
"""
LSTM 모델을 생성합니다.
"""
model = Sequential()
model.add(LSTM(50, return_sequences=False, input_shape=input_shape))
model.add(Dense(1)) # 출력층: 1개의 값 예측 (다음 시간의 가격)
model.compile(optimizer='adam', loss='mean_squared_error')
return model
def preprocess_data(df):
"""
데이터를 전처리하여 LSTM 모델에 맞는 형태로 변환
"""
scaler = MinMaxScaler(feature_range=(0, 1))
df_scaled = scaler.fit_transform(df[['close']].values)
X, y = [], []
for i in range(60, len(df_scaled)): # 60분 간격 데이터를 사용
X.append(df_scaled[i-60:i, 0]) # 이전 60분 데이터
y.append(df_scaled[i, 0]) # 현재 가격
X, y = np.array(X), np.array(y)
X = np.reshape(X, (X.shape[0], X.shape[1], 1)) # LSTM 입력 형상 맞추기
return X, y, scaler
def predict_next_80_prices(model, X_input, scaler):
"""
모델을 사용하여 향후 80개 가격 예측
"""
predicted_prices = []
for _ in range(80):
predicted_price = model.predict(X_input)
predicted_prices.append(predicted_price[0][0])
# 예측된 가격을 X_input에 추가하고 슬라이딩 윈도우로 업데이트
predicted_price_reshaped = predicted_price.reshape(1, 1, 1) # 차원을 맞추기
X_input = np.append(X_input[:, 1:, :], predicted_price_reshaped, axis=1) # 슬라이딩 윈도우 적용
predicted_prices = np.array(predicted_prices)
predicted_prices = scaler.inverse_transform(predicted_prices.reshape(-1, 1)) # 스케일 복원
return predicted_prices.flatten()
def monitor_price_with_prediction(coin_symbol, save_path):
"""
실시간 데이터 수집 및 머신러닝 기반 미래 가격 예측 (CSV에서 마지막 1000개의 데이터를 사용)
"""
print("Starting price monitoring with prediction. Press Ctrl+C to stop.")
try:
while True:
df = pd.read_csv(save_path, index_col='timestamp', parse_dates=True)
candlestick = fetch_current_price_as_candlestick(coin_symbol)
if candlestick:
# 최신 데이터를 CSV에 추가
current_price = candlestick["close"]
new_row = pd.DataFrame([candlestick]).set_index('timestamp')
df = pd.concat([df, new_row])
df.to_csv(save_path, mode='w', index=True) # CSV 업데이트
# 마지막 100개 데이터 로드
df = pd.read_csv(save_path, index_col='timestamp', parse_dates=True).iloc[-150:]
# LSTM 모델 훈련
X, y, scaler = preprocess_data(df)
model = create_lstm_model((X.shape[1], 1))
model.fit(X, y, epochs=12, batch_size=10)
# 향후 80개 가격 예측
last_input = X[-1] # 마지막 데이터로 예측 시작
predicted_prices = predict_next_80_prices(model, last_input.reshape(1, 60, 1), scaler)
# 예측된 가격의 평균, 최저치, 최고치, 등락률 계산
avg_price = np.mean(predicted_prices)
min_price = np.min(predicted_prices)
max_price = np.max(predicted_prices)
price_change = ((predicted_prices[-1] - predicted_prices[0]) / predicted_prices[0]) * 100 # 등락률
print(f"현재 가격: {current_price:.5f} KRW")
print(f"예측된 가격(80개)의 평균값: {avg_price:.5f} KRW")
print(f"예측된 가격(80개)의 최소값: {min_price:.5f} KRW")
print(f"예측된 가격(80개)의 최고값: {max_price:.5f} KRW")
print(f"예측된 등락률: {price_change:.2f}%")
time.sleep(10) # 10초 대기
except KeyboardInterrupt:
print("Monitoring stopped.")
def main():
"""
프로그램 진입점
"""
coin = input("Enter the coin symbol (e.g., BTC, ETH): ").strip().upper()
output_csv = f"bithumb_{coin}_data.csv"
# 초기 3일 데이터 수집 및 저장
fetch_bithumb_data(coin, output_csv)
# 현재 가격 모니터링 및 예측
monitor_price_with_prediction(coin, output_csv)
# 프로그램 실행
if __name__ == "__main__":
main()
728x90
'혼자 만든 Code' 카테고리의 다른 글
Bithumb 실제 자동 코인 거래 프로그램 (시장가 매수,매도) (0) | 2024.11.22 |
---|---|
코인가격 예측 프로그램(60초 간격 예측), 모의 투자 기능 (0) | 2024.11.20 |
[Python]명함 관리 프로그램 (0) | 2023.10.16 |
[Python]명함 관리 프로그램 (0) | 2023.10.16 |
[C++]소수로 만들 수 있는 가장 긴 합성수 수열 (0) | 2023.08.28 |