Creative Code

업비트 코인 자동거래 ver 3.0 본문

혼자 만든 Code/upbit 코인 자동거래

업비트 코인 자동거래 ver 3.0

빛하루 2025. 5. 7. 23:15
import requests
import pandas as pd
import talib as ts
import numpy as np
import json
import uuid
import jwt
import hashlib
import time
from urllib.parse import urlencode, unquote
from datetime import datetime
import ta

UPBIT_API_URL = "https://api.upbit.com/v1"
server_url = 'https://api.upbit.com'

def get_coin_data(coin, unit):
    url = f"https://api.upbit.com/v1/candles/minutes/{unit}"
    params = {'market': f'KRW-{coin}', 'count': 200}
    headers = {"accept": "application/json"}
    response = requests.get(url, params=params, headers=headers)

    if response.status_code != 200:
        raise Exception(f"API 요청 실패: {response.status_code}")

    data = response.json()
    df = pd.DataFrame(data)
    df = df.reindex(index=df.index[::-1]).reset_index(drop=True)
    return df

# JSON 파일에서 API 키 불러오기
def load_api_keys(config_file="upbit_config.json"):
    try:
        with open(config_file, "r") as file:
            keys = json.load(file)
            return keys["api_key"], keys["secret_key"]
    except Exception as e:
        print(f"Error loading API keys: {e}")
        return None, None
    
# 보유중인 코인 목록 가지고 오기기
def get_coin_holdings(api_key, secret_key):
    """
    업비트 API를 사용하여 보유 중인 코인의 종류와 수량을 가져옵니다.

    Returns:
        dict: 코인의 종류와 보유 수량을 포함한 딕셔너리
    """
    base_url = "https://api.upbit.com/v1/accounts"

    # JWT 토큰 생성
    payload = {
        'access_key': api_key,
        'nonce': str(uuid.uuid4()),
    }
    jwt_token = jwt.encode(payload, secret_key, algorithm="HS256")
    headers = {"Authorization": f"Bearer {jwt_token}"}

    try:
        # 계좌 정보 요청
        response = requests.get(base_url, headers=headers)
        response.raise_for_status()
        accounts = response.json()

        # 보유 코인 정보 필터링
        holdings = {account['currency']: float(account['balance']) for account in accounts if float(account['balance']) > 0}

        # 보유 코인이 없을 경우 처리
        if not holdings:
            print("보유 중인 코인이 없습니다.")
            return {}

        return holdings

    except Exception as e:
        print(f"Error fetching coin holdings: {e}")
        return {}
    
#미체결된 주문 조회 함수
def check_unfilled_orders(api_key, secret_key):
    """
    미체결 주문이 있는지 확인합니다.

    Returns:
        list: 미체결 주문 리스트 (없으면 빈 리스트)
    """
    base_url = "https://api.upbit.com/v1/orders"
    query = {
        'state': 'wait',  # 'wait' 상태는 미체결 주문을 의미
    }

    # Query Hash 계산
    query_string = '&'.join([f"{key}={value}" for key, value in query.items()])
    query_hash = hashlib.sha512(query_string.encode()).hexdigest()

    # JWT 토큰 생성
    payload = {
        'access_key': api_key,
        'nonce': str(uuid.uuid4()),
        'query_hash': query_hash,
        'query_hash_alg': 'SHA512',
    }
    jwt_token = jwt.encode(payload, secret_key, algorithm="HS256")
    headers = {"Authorization": f"Bearer {jwt_token}"}

    try:
        # 미체결 주문 요청
        response = requests.get(base_url, headers=headers, params=query)
        response.raise_for_status()
        orders = response.json()

        # 미체결 주문이 없는 경우 처리
        if not orders:
            print("미체결 주문이 없습니다.")
            return []

        print("미체결 주문이 있습니다.")
        return orders

    except Exception as e:
        print(f"Error fetching unfilled orders: {e}")
        return []
    
#미체결된 주문 취소하는 함수
def cancel_unfilled_order(api_key, secret_key, order_id):
    """
    특정 미체결 주문을 취소합니다.

    Args:
        api_key (str): 업비트 API 키
        secret_key (str): 업비트 Secret 키
        order_id (str): 취소할 주문의 UUID

    Returns:
        dict: 주문 취소 결과
    """
    base_url = "https://api.upbit.com/v1/order"
    query = {
        'uuid': order_id,
    }

    # JWT 토큰 생성
    query_string = '&'.join([f"{key}={value}" for key, value in query.items()])
    payload = {
        'access_key': api_key,
        'nonce': str(uuid.uuid4()),
        'query_hash': hashlib.sha512(query_string.encode()).hexdigest(),
        'query_hash_alg': 'SHA512',
    }
    jwt_token = jwt.encode(payload, secret_key, algorithm="HS256")
    headers = {"Authorization": f"Bearer {jwt_token}"}

    try:
        # 주문 취소 요청
        response = requests.delete(base_url, headers=headers, params=query)
        response.raise_for_status()
        cancel_result = response.json()

        print(f"주문이 성공적으로 취소되었습니다: {cancel_result}")
        return cancel_result

    except requests.exceptions.HTTPError as e:
        print(f"HTTP 에러 발생: {e.response.status_code}, {e.response.text}")
        return {}
    except Exception as e:
        print(f"Error canceling order: {e}")
        return {}

#매수 1호가와 매도1호가 리턴하는 함수    
def get_orderbook(api_key, market):
    """
    주어진 마켓의 매수, 매도 1호가를 가져옵니다.

    Args:
        api_key (str): 업비트 API 키
        market (str): 마켓 코드 (예: "KRW-BTC")

    Returns:
        dict: 매수 1호가와 매도 1호가를 포함한 딕셔너리
    """
    url = f"https://api.upbit.com/v1/orderbook?markets={market}&level=0"

    headers = {"accept": "application/json"}

    try:
        # 주문서 정보 요청
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        orderbook = response.json()

        if not orderbook:
            print(f"{market}의 주문서 정보가 없습니다.")
            return None

        # 매수 1호가 (가장 높은 매도 호가)
        bid_price = orderbook[0]['orderbook_units'][0]['bid_price']
        # 매도 1호가 (가장 낮은 매수 호가)
        ask_price = orderbook[0]['orderbook_units'][0]['ask_price']

        return {"bid_price": bid_price, "ask_price": ask_price}

    except Exception as e:
        print(f"Error fetching orderbook: {e}")
        return None
    
api_key, secret_key = load_api_keys("upbit_config.json")
    
#지정가 매수함수    
def place_limit_buy_order(api_key, secret_key, market, price, budget):
    max_quantity = str(budget / price)
    params = {
        'market': market,
        'side': 'bid',
        'ord_type': 'limit',
        'price': price,
        'volume': max_quantity
    }
    query_string = unquote(urlencode(params, doseq=True)).encode("utf-8")

    m = hashlib.sha512()
    m.update(query_string)
    query_hash = m.hexdigest()

    payload = {
        'access_key': api_key,
        'nonce': str(uuid.uuid4()),
        'query_hash': query_hash,
        'query_hash_alg': 'SHA512',
    }

    jwt_token = jwt.encode(payload, secret_key)
    authorization = 'Bearer {}'.format(jwt_token)
    headers = {
    'Authorization': authorization,
    }

    res = requests.post(server_url + '/v1/orders', json=params, headers=headers)
    order_result = res.json()
    # 성공적으로 주문을 제출했다면 주문 UUID 반환
    order_uuid = order_result.get('uuid')
    if order_uuid:
        print(f"매수 주문이 성공적으로 생성되었습니다. 주문 UUID: {order_uuid}")
        time.sleep(10)
        return order_uuid
    else:
        print("주문 생성 실패: UUID를 찾을 수 없습니다.")
        return None
    
# 지정가 매도함수
def place_limit_sell_order(api_key, secret_key, market, price, quantity):
    max_quantity = str(quantity)
    params = {
        'market': market,
        'side': 'ask',
        'ord_type': 'limit',
        'price': price,
        'volume': max_quantity
    }
    query_string = unquote(urlencode(params, doseq=True)).encode("utf-8")

    m = hashlib.sha512()
    m.update(query_string)
    query_hash = m.hexdigest()

    payload = {
        'access_key': api_key,
        'nonce': str(uuid.uuid4()),
        'query_hash': query_hash,
        'query_hash_alg': 'SHA512',
    }

    jwt_token = jwt.encode(payload, secret_key)
    authorization = 'Bearer {}'.format(jwt_token)
    headers = {
    'Authorization': authorization,
    }

    res = requests.post(server_url + '/v1/orders', json=params, headers=headers)
    order_result = res.json()
    # 성공적으로 주문을 제출했다면 주문 UUID 반환
    order_uuid = order_result.get('uuid')
    if order_uuid:
        print(f"매도 주문이 성공적으로 생성되었습니다. 주문 UUID: {order_uuid}")
        time.sleep(10)
        return order_uuid
    else:
        print("주문 생성 실패: UUID를 찾을 수 없습니다.")
        return None
    
# 시장가 매도 함수
def place_market_sell_order(api_key, secret_key, market, quantity):
    max_quantity = str(quantity)
    params = {
        'market': market,
        'side': 'ask',
        'ord_type': 'market',
        'volume': max_quantity
    }
    query_string = unquote(urlencode(params, doseq=True)).encode("utf-8")

    m = hashlib.sha512()
    m.update(query_string)
    query_hash = m.hexdigest()

    payload = {
        'access_key': api_key,
        'nonce': str(uuid.uuid4()),
        'query_hash': query_hash,
        'query_hash_alg': 'SHA512',
    }

    jwt_token = jwt.encode(payload, secret_key)
    authorization = f'Bearer {jwt_token}'
    headers = {
        'Authorization': authorization,
    }

    res = requests.post(server_url + '/v1/orders', json=params, headers=headers)
    order_result = res.json()

    order_uuid = order_result.get('uuid')
    if order_uuid:
        print(f"시장가 매도 주문이 성공적으로 생성되었습니다. 주문 UUID: {order_uuid}")
        time.sleep(10)
        return order_uuid
    else:
        print("시장가 주문 생성 실패: UUID를 찾을 수 없습니다.")
        return None
    
#받아온 코인 데이터를 저장할 csv파일을 만드는 함수
def save_to_csv(df):
    filename = 'coin_data.csv'
    df.to_csv(filename, index=False)

#거래기록을 저장할 csv 파일일
columns = ['시간', '코인명', '매수/매도', '매매 단가']
df_trade = pd.DataFrame(columns=columns)
df_trade.to_csv('trade_list.csv', index=False, encoding='utf-8-sig')

#거래기록을 추가하는 함수
def add_trade(trade_time, coin_name, trade_type, avg_price):
    # CSV 파일 읽기
    df = pd.read_csv('trade_list.csv', encoding='utf-8-sig')
    
    # 새로운 데이터 추가
    new_data = {
        '시간': trade_time,
        '코인명': coin_name,
        '매수/매도': trade_type,
        '평균 단가': avg_price,
    }
    df = df.append(new_data, ignore_index=True)
    
    # 업데이트된 데이터프레임을 CSV 파일로 저장
    df.to_csv('trade_list.csv', index=False, encoding='utf-8-sig')

#현재 보유중인 원화 잔고를 가져오는 함수
def get_balance():
    """
    업비트 API를 사용하여 원화 잔고를 조회하는 함수
    """
    url = server_url + '/v1/accounts'
    
    # 요청 헤더 준비
    payload = {
        'access_key': api_key,
        'nonce': str(uuid.uuid4())
    }
    
    # 서명 생성
    m = hashlib.sha512()
    m.update(payload['nonce'].encode('utf-8'))
    signature = jwt.encode(payload, secret_key, algorithm='HS256')
    
    headers = {
        'Authorization': f'Bearer {signature}'
    }
    
    # API 요청
    response = requests.get(url, headers=headers)
    
    if response.status_code == 200:
        accounts = response.json()
        for account in accounts:
            if account['currency'] == 'KRW':  # 원화 잔고 찾기
                return float(account['balance'])
    else:
        print(f"Error: {response.status_code}")
        return None
    
#코인의 현재가격을 불러오는 함수
def get_current_price(coin):
    url = f"https://api.upbit.com/v1/ticker"
    params = {
        'markets': f'KRW-{coin}'  # 코인 이름 입력 (예: 'BTC' -> 'KRW-BTC')
    }
    
    response = requests.get(url, params=params)
    data = response.json()
    
    if len(data) > 0:
        return data[0]['trade_price']  # 현재 거래 가격 반환
    else:
        return None
    
def get_coin_order_unit(coin,price):
    special_coin_list = ['ADA','ALGO','BLUR','CELO', 'ELF', 'EOS', 'GRS', 'GRT', 'ICX', 'MANA', 'MINA', 'POL', 'SAND', 'SEI', 'STG', 'TRX']
    coin_order_unit = 0
    if coin not in special_coin_list:
        if price >= 2000000:
            coin_order_unit = 1000
        elif price >=1000000 and price<2000000:
            coin_order_unit = 500
        elif price >=500000 and price <1000000:
            coin_order_unit = 100
        elif price >=100000 and price<500000:
            coin_order_unit = 50
        elif price >=10000 and price<100000:
            coin_order_unit = 10
        elif price >=1000 and price<10000:
            coin_order_unit = 1
        elif price >=100 and price<1000:
            coin_order_unit = 0.1
        elif price >=10 and price<100:
            coin_order_unit = 0.01
        elif price >= 1 and price<10:
            coin_order_unit = 0.001
        elif price>=0.1 and price<1:
            coin_order_unit = 0.0001
        elif price>=0.01 and price<0.1:
            coin_order_unit = 0.00001
        elif price>=0.001 and price<0.01:
            coin_order_unit = 0.000001
        elif price>=0.0001 and price<0.001 :
            coin_order_unit = 0.0000001
        else :
            coin_order_unit = 0.00000001
    else:
        coin_order_unit = 1
    return coin_order_unit

# 지표 계산함수
def calculate_indicator(df):
    
    df['CCI'] = ts.CCI(high=df['high_price'], low=df['low_price'], close=df['trade_price'], timeperiod=14)
    df['CCI_Signal_EMA'] = df['CCI'].ewm(span=9).mean()
    cci = df['CCI'].iloc[-1]
    cci_signal = df['CCI_Signal_EMA'].iloc[-1]

    # ADX and DI+
    adx_indicator = ta.trend.ADXIndicator(df["high_price"], df["low_price"], df["trade_price"], window=14)
    df['DIm'] = adx_indicator.adx_neg()
    df['DIp'] = adx_indicator.adx_pos()
    df['Adx'] = adx_indicator.adx()
    dip = df['DIp'].iloc[-1]
    dim = df['DIm'].iloc[-1]
    adx = df['Adx'].iloc[-1]

    return df

def detect_signal(df):
    df = calculate_indicator(df)
    if len(df) < 2:
        return 'none'
    
    current = df.iloc[-1]
    prev = df.iloc[-2]
    
    # 현재 및 이전 값 추출
    adx, dip, dim = current['Adx'], current['DIp'], current['DIm']
    prev_adx, prev_dip, prev_dim = prev['Adx'], prev['DIp'], prev['DIm']
    
    # 순위 계산 함수
    def get_rank(a, b, c):
        sorted_values = sorted([(a, 'ADX'), (b, '+DI'), (c, '-DI')], key=lambda x: -x[0])
        return [item[1] for item in sorted_values]
    
    current_rank = get_rank(adx, dip, dim)
    prev_rank = get_rank(prev_adx, prev_dip, prev_dim)
    
    # ADX 강도 (20 기준)
    adx_strong = adx > 20  # 기준값 20으로 변경
    adx_weak = adx < 20
    
    # 모든 가능한 돌파 쌍 검사
    signals = []
    
    # 1. ADX와 +DI 돌파 검사
    if (adx > dip and prev_adx <= prev_dip) or (adx < dip and prev_adx >= prev_dip):
        adx_dir = 'up' if adx > prev_adx else 'down'
        dip_dir = 'up' if dip > prev_dip else 'down'
        
        if adx < dip and adx_dir == 'up' and dip_dir == 'up' and adx_strong:
            signals.append('buy')
        elif adx > dip and adx_dir == 'down' and dip_dir == 'down' and adx_strong:
            signals.append('sell')
        elif adx > dip and adx_dir == 'up' and dip_dir == 'down' and adx_strong:
            signals.append('sell')
    
    # 2. ADX와 -DI 돌파 검사
    if (adx > dim and prev_adx <= prev_dim) or (adx < dim and prev_adx >= prev_dim):
        adx_dir = 'up' if adx > prev_adx else 'down'
        dim_dir = 'up' if dim > prev_dim else 'down'
        
        if adx > dim and adx_dir == 'up' and dim_dir == 'down' and adx_strong:
            signals.append('buy')
        elif adx < dim and adx_dir == 'down' and dim_dir == 'up' and adx_strong:
            signals.append('sell')
    
    # 3. +DI와 -DI 돌파 검사
    if (dip > dim and prev_dip <= prev_dim) or (dip < dim and prev_dip >= prev_dim):
        dip_dir = 'up' if dip > prev_dip else 'down'
        dim_dir = 'up' if dim > prev_dim else 'down'
        
        if dip > dim and dip_dir == 'up' and dim_dir == 'down' and adx > prev_adx:
            signals.append('buy')
        elif dip < dim and dip_dir == 'down' and dim_dir == 'up' and adx > prev_adx:
            signals.append('sell')
    
    # 4. 3개 지표 동시 돌파
    if current_rank != prev_rank:
        adx_dir = 'up' if adx > prev_adx else 'down'
        dip_dir = 'up' if dip > prev_dip else 'down'
        dim_dir = 'up' if dim > prev_dim else 'down'
        
        if adx_dir == 'up' and dip_dir == 'up' and dim_dir == 'up':
            if current_rank[0] in ['+DI', 'ADX'] and adx_strong:
                signals.append('buy')
        elif adx_dir == 'down' and dip_dir == 'down' and dim_dir == 'down':
            if current_rank[0] in ['-DI', 'ADX'] and adx_strong:
                signals.append('sell')
    
    # 최종 신호 결정 (CCI 조건: 매수만 적용)
    if 'sell' in signals and 'buy' in signals:
        return 'none'
    elif 'sell' in signals:
        return 'sell'
    elif 'buy' in signals and prev['CCI'] < current['CCI']:  # CCI 상승 조건
        return 'buy'
    else:
        return 'none'

def get_top_gainers_upbit(status, number):
    
    base_url = "https://api.upbit.com/v1"
    try:
        # Step 1: Get market data
        markets_response = requests.get(f"{base_url}/market/all")
        markets_response.raise_for_status()
        markets = markets_response.json()

        # Filter for KRW markets only and extract symbols
        krw_symbols = [market['market'].split('-')[1] for market in markets if market['market'].startswith('KRW-')]

        # Step 2: Get ticker data for all KRW markets
        krw_markets = [f"KRW-{symbol}" for symbol in krw_symbols]
        tickers_response = requests.get(f"{base_url}/ticker", params={"markets": ",".join(krw_markets)})
        tickers_response.raise_for_status()
        tickers = tickers_response.json()

        # Step 3: Calculate price change percentages and filter
        gainers = []
        except_coin = ['USDC','USDT','BTC','ETH','BCH','AAVE','SOL','BSV','AVAX','EGLD','STX','BTT','XEM']
        
        for ticker in tickers:
            market = ticker['market']
            symbol = market.split('-')[1]
            prev_trade_price = ticker['prev_closing_price']
            current_price = ticker['trade_price']

            if prev_trade_price and prev_trade_price > 0 and symbol not in except_coin:
                change_percent = ((current_price - prev_trade_price) / prev_trade_price) * 100

                if change_percent >= 7:  # ✅ 10% 이상 상승한 종목만 필터링
                    gainers.append({
                        'symbol': symbol,
                        'change_percent': change_percent,
                        'current_price': current_price
                    })

        # Step 4: Sort by price change percentage
        gainers = sorted(gainers, key=lambda x: x['change_percent'], reverse=(status == 'up'))

        # Step 5: Return top N gainers
        return [gainer['symbol'] for gainer in gainers[:number]]

    except requests.exceptions.RequestException as e:
        print(f"Error while fetching data from Upbit API: {e}")
        return []


def main():
    # input_str = input("예측할 코인 심볼을 쉼표(,)로 구분해서 입력하세요 (예: BTC,ETH,XRP): ")
    # coin_list = [symbol.strip().upper() for symbol in input_str.split(",")]
    coin_list = []
    coin_holdings = get_coin_holdings(api_key, secret_key)
    money = int(get_balance() * 0.95)
    coin_list_copy = coin_list.copy()
    coin_list_copy.extend(get_top_gainers_upbit('up',2))

    while True:
        money = int(get_balance() * 0.95)
        coin_list_copy = coin_list.copy()
        coin_list_copy.extend(get_top_gainers_upbit('up',4))
        coin_holdings = get_coin_holdings(api_key, secret_key)
        if len(coin_holdings)==1:
            for coin in coin_list_copy:
                print('')
                print('------------------------------------------------------')
                print(f'<{coin}>')
                market = f"KRW-{coin}"
                status_coin_price = get_current_price(coin)
                coin_order_unit = get_coin_order_unit(coin,status_coin_price)

                try:
                    df_1min = get_coin_data(coin, 1)
                    decision = detect_signal(df_1min)
                    now = datetime.now()
                    print(f'{now.strftime("%Y-%m-%d %H:%M:%S")} {coin}의 현재 가격 및 상황 : {status_coin_price}원  - {decision}')
                    if decision == 'buy':# and signal_btc == '매수 신호':
                    
                        orderbook = get_orderbook(api_key, market)
                        buy_price = orderbook['bid_price'] + coin_order_unit
                        buy_uuid = place_limit_buy_order(api_key,secret_key,market,buy_price,money)
                        unfilled_orders = check_unfilled_orders(api_key, secret_key)
                        if unfilled_orders:
                            print('지정가 매수 실패, 미체결 주문 취소 작업 중..')
                            for order in unfilled_orders:
                                cancel_result = cancel_unfilled_order(api_key, secret_key, buy_uuid)
                        else:
                            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                            add_trade(current_time, f'{coin}', 'buy', buy_price)
                            break
                    if len(get_coin_holdings(api_key,secret_key))>1:
                        break
                except Exception as e:
                    print(f"⚠️ [{coin}] 매매 판단 실패: {e}\n")
        else:
            hold_coin = list(coin_holdings.keys())[1]
            hold_coin_amount = float(coin_holdings[hold_coin])
            status_coin_price = get_current_price(hold_coin)
            coin_order_unit = get_coin_order_unit(hold_coin,status_coin_price)
            market = f'KRW-{hold_coin}'

            print('----------------------------------------------')
            
            try:
                df_1min = get_coin_data(hold_coin, 1)
                decision = detect_signal(df_1min)
                now = datetime.now()
                print(f'{now.strftime("%Y-%m-%d %H:%M:%S")} {coin}의 현재 가격 및 상황 : {status_coin_price}원  - {decision}')
                if decision == 'sell':
                    orderbook = get_orderbook(api_key, market)
                    market = f'KRW-{hold_coin}'
                    sell_price = orderbook['ask_price'] - coin_order_unit
                    sell_uuid = place_limit_sell_order(api_key,secret_key,market,sell_price,hold_coin_amount)
                    unfilled_orders = check_unfilled_orders(api_key, secret_key)
                    if unfilled_orders:
                        print('지정가 매도 실패, 미체결 주문 취소 작업 중..')
                        for order in unfilled_orders:
                            cancel_result = cancel_unfilled_order(api_key, secret_key, sell_uuid)
                        sell_uuid = place_market_sell_order(api_key,secret_key,market,hold_coin_amount)
                        
                    else:
                        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                        add_trade(current_time, f'{hold_coin}', 'sell', sell_price)
                        break
            except Exception as e:
                print(f"⚠️ [{hold_coin}] 매매 판단 실패: {e}\n")


if __name__ == "__main__":
    main()