빛하루 2024. 11. 20. 16:37
import requests
import pandas as pd
import numpy as np
import time
from datetime import datetime, timedelta
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from pytz import timezone

def fetch_bithumb_data(coin_symbol, save_path):
    특정 코인의 1분 간격 데이터를 최근 3일 간 수집하여 CSV로 저장
    base_url = ""
    timeframe = "1m"  # 1분 간격 데이터
    url = f"{base_url}{coin_symbol}_KRW/{timeframe}"
    now =
    start_time = int((now - timedelta(days=3)).timestamp()) * 1000  # 밀리초 단위로 변환
        response = requests.get(url)
        data = response.json()
        if data['status'] != '0000':
            print(f"Error fetching data: {data['message']}")

        candlesticks = data['data']
        filtered_data = [item for item in candlesticks if item[0] >= start_time]

        columns = ["timestamp", "open", "close", "high", "low", "volume"]
        df = pd.DataFrame(filtered_data, columns=columns)
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)

        # CSV에 마지막 1000개 데이터 저장
        df = df.tail(1000)
        print(f"Initial data saved to {save_path}")
    except Exception as e:
        print(f"An error occurred: {e}")

def fetch_current_price_as_candlestick(coin_symbol):
    현재 코인의 정보를 기존 CSV와 동일한 형식으로 반환
    url = f"{coin_symbol}_KRW"
        response = requests.get(url)
        data = response.json()
        if data['status'] == '0000':
            now ='UTC'))  # UTC 시간으로 변환
            open_price = float(data['data']['opening_price'])
            close_price = float(data['data']['closing_price'])
            high_price = float(data['data']['max_price'])
            low_price = float(data['data']['min_price'])
            volume = float(data['data']['units_traded'])
            return {
                "timestamp": now.strftime('%Y-%m-%d %H:%M:%S'),  # 밀리초와 UTC 제거한 형식으로 변환
                "open": open_price,
                "close": close_price,
                "high": high_price,
                "low": low_price,
                "volume": volume,
            print(f"Error fetching current price: {data['message']}")
            return None
    except Exception as e:
        print(f"An error occurred while fetching current price: {e}")
        return None

def create_lstm_model(input_shape):
    LSTM 모델을 생성합니다.
    model = Sequential()
    model.add(LSTM(50, return_sequences=False, input_shape=input_shape))
    model.add(Dense(1))  # 출력층: 1개의 값 예측 (다음 시간의 가격)
    model.compile(optimizer='adam', loss='mean_squared_error')
    return model

def preprocess_data(df):
    데이터를 전처리하여 LSTM 모델에 맞는 형태로 변환
    scaler = MinMaxScaler(feature_range=(0, 1))
    df_scaled = scaler.fit_transform(df[['close']].values)
    X, y = [], []
    for i in range(60, len(df_scaled)):  # 60분 간격 데이터를 사용
        X.append(df_scaled[i-60:i, 0])  # 이전 60분 데이터
        y.append(df_scaled[i, 0])  # 현재 가격
    X, y = np.array(X), np.array(y)
    X = np.reshape(X, (X.shape[0], X.shape[1], 1))  # LSTM 입력 형상 맞추기
    return X, y, scaler

def predict_next_30_prices(model, X_input, scaler):
    모델을 사용하여 향후 30개 가격 예측
    predicted_prices = []
    for _ in range(30):
        predicted_price = model.predict(X_input)
        # 예측된 가격을 X_input에 추가하고 슬라이딩 윈도우로 업데이트
        predicted_price_reshaped = predicted_price.reshape(1, 1, 1)  # 차원을 맞추기
        X_input = np.append(X_input[:, 1:, :], predicted_price_reshaped, axis=1)  # 슬라이딩 윈도우 적용
    predicted_prices = np.array(predicted_prices)
    predicted_prices = scaler.inverse_transform(predicted_prices.reshape(-1, 1))  # 스케일 복원
    return predicted_prices.flatten()

def monitor_price_with_prediction(coin_symbol, save_path, initial_capital):
    실시간 데이터 수집 및 머신러닝 기반 미래 가격 예측 (CSV에서 마지막 1000개의 데이터를 사용)
    print("Starting price monitoring with prediction. Press Ctrl+C to stop.")
    current_cash = initial_capital  # 초기 자본
    current_holding = 0  # 보유 코인 수량
    buy_price = 0  # 마지막 매수 가격 (평균 매수 단가)
    total_predictions = 0  # 총 예측 횟수
    successful_predictions = 0  # 성공한 예측 횟수

        while True:
            df = pd.read_csv(save_path, index_col='timestamp', parse_dates=True)
            candlestick = fetch_current_price_as_candlestick(coin_symbol)
            if candlestick:
                # 최신 데이터를 CSV에 추가
                current_price = candlestick["close"]
                new_row = pd.DataFrame([candlestick]).set_index('timestamp')
                df = pd.concat([df, new_row])
                df.to_csv(save_path, mode='w', index=True)  # CSV 업데이트

                # 마지막 100개 데이터 로드
                df = pd.read_csv(save_path, index_col='timestamp', parse_dates=True).iloc[-70:]

                # LSTM 모델 훈련
                X, y, scaler = preprocess_data(df)
                model = create_lstm_model((X.shape[1], 1))
      , y, epochs=12, batch_size=3)

                # 향후 20개 가격 예측
                last_input = X[-1]  # 마지막 데이터로 예측 시작
                predicted_prices = predict_next_30_prices(model, last_input.reshape(1, 60, 1), scaler)

                # 예측된 가격의 평균, 최저치, 최고치, 등락률 계산
                avg_price = np.mean(predicted_prices)
                min_price = np.min(predicted_prices)
                max_price = np.max(predicted_prices)
                price_change = ((avg_price - current_price)/ current_price) * 100  # 등락률
                print(f"현재 가격: {current_price:.5f} KRW")
                print(f"예측된 가격(30개)의 평균값: {avg_price:.5f} KRW")
                print(f"예측된 가격(30개)의 최소값: {min_price:.5f} KRW")
                print(f"예측된 가격(30개)의 최고값: {max_price:.5f} KRW")
                print(f"예측된 등락률: {price_change:.2f}%")

                # 매수/매도 전략
                if price_change >= 0:  # 예측 상승
                    if current_cash > 0:
                        current_holding = current_cash / current_price  # 전량 매수
                        buy_price = current_price
                        current_cash = 0  # 현금 소진
                        print(f"전량 매수: {current_holding:.6f} {coin_symbol} @ {current_price:.5f} KRW")
                else:  # 예측 하락
                    if current_holding > 0:
                        current_cash = current_holding * current_price  # 전량 매도
                        profit_loss = current_cash - initial_capital  # 실현된 수익/손실
                        current_holding = 0
                        print(f"전량 매도: 현금 {current_cash:.5f} KRW (수익/손실: {profit_loss:.2f} KRW)")
                # 현재 투자 결과 출력
                total_assets = current_cash + (current_holding * current_price)
                total_profit_loss = total_assets - initial_capital
                print(f"총 자산: {total_assets:.5f} KRW (수익률: {total_profit_loss / initial_capital * 100:.2f}%)")

            time.sleep(60)  # 60초 대기
    except KeyboardInterrupt:
        print("Monitoring stopped.")
        total_assets = current_cash + (current_holding * current_price)
        total_profit_loss = total_assets - initial_capital
        prediction_accuracy = (successful_predictions / total_predictions) * 100
        print(f"최종 자산: {total_assets:.5f} KRW (최종 수익률: {total_profit_loss / initial_capital * 100:.2f}%)")
        print(f"최종 예측 정확도: {prediction_accuracy:.2f}%")

def main():
    프로그램 진입점
    coin = input("Enter the coin symbol (e.g., BTC, ETH): ").strip().upper()
    output_csv = f"bithumb_{coin}_data.csv"
    initial_capital = float(input("Enter your initial investment amount (KRW): ").strip())
    # 초기 3일 데이터 수집 및 저장
    fetch_bithumb_data(coin, output_csv)
    # 현재 가격 모니터링 및 예측
    monitor_price_with_prediction(coin, output_csv, initial_capital)

# 프로그램 실행
if __name__ == "__main__":