Notice
Recent Posts
Recent Comments
Creative Code
업비트 코인 자동거래 ver 3.0 본문
import requests
import pandas as pd
import talib as ts
import numpy as np
import json
import uuid
import jwt
import hashlib
import time
from urllib.parse import urlencode, unquote
from datetime import datetime
import ta
UPBIT_API_URL = "https://api.upbit.com/v1"
server_url = 'https://api.upbit.com'
def get_coin_data(coin, unit):
url = f"https://api.upbit.com/v1/candles/minutes/{unit}"
params = {'market': f'KRW-{coin}', 'count': 200}
headers = {"accept": "application/json"}
response = requests.get(url, params=params, headers=headers)
if response.status_code != 200:
raise Exception(f"API 요청 실패: {response.status_code}")
data = response.json()
df = pd.DataFrame(data)
df = df.reindex(index=df.index[::-1]).reset_index(drop=True)
return df
# JSON 파일에서 API 키 불러오기
def load_api_keys(config_file="upbit_config.json"):
try:
with open(config_file, "r") as file:
keys = json.load(file)
return keys["api_key"], keys["secret_key"]
except Exception as e:
print(f"Error loading API keys: {e}")
return None, None
# 보유중인 코인 목록 가지고 오기기
def get_coin_holdings(api_key, secret_key):
"""
업비트 API를 사용하여 보유 중인 코인의 종류와 수량을 가져옵니다.
Returns:
dict: 코인의 종류와 보유 수량을 포함한 딕셔너리
"""
base_url = "https://api.upbit.com/v1/accounts"
# JWT 토큰 생성
payload = {
'access_key': api_key,
'nonce': str(uuid.uuid4()),
}
jwt_token = jwt.encode(payload, secret_key, algorithm="HS256")
headers = {"Authorization": f"Bearer {jwt_token}"}
try:
# 계좌 정보 요청
response = requests.get(base_url, headers=headers)
response.raise_for_status()
accounts = response.json()
# 보유 코인 정보 필터링
holdings = {account['currency']: float(account['balance']) for account in accounts if float(account['balance']) > 0}
# 보유 코인이 없을 경우 처리
if not holdings:
print("보유 중인 코인이 없습니다.")
return {}
return holdings
except Exception as e:
print(f"Error fetching coin holdings: {e}")
return {}
#미체결된 주문 조회 함수
def check_unfilled_orders(api_key, secret_key):
"""
미체결 주문이 있는지 확인합니다.
Returns:
list: 미체결 주문 리스트 (없으면 빈 리스트)
"""
base_url = "https://api.upbit.com/v1/orders"
query = {
'state': 'wait', # 'wait' 상태는 미체결 주문을 의미
}
# Query Hash 계산
query_string = '&'.join([f"{key}={value}" for key, value in query.items()])
query_hash = hashlib.sha512(query_string.encode()).hexdigest()
# JWT 토큰 생성
payload = {
'access_key': api_key,
'nonce': str(uuid.uuid4()),
'query_hash': query_hash,
'query_hash_alg': 'SHA512',
}
jwt_token = jwt.encode(payload, secret_key, algorithm="HS256")
headers = {"Authorization": f"Bearer {jwt_token}"}
try:
# 미체결 주문 요청
response = requests.get(base_url, headers=headers, params=query)
response.raise_for_status()
orders = response.json()
# 미체결 주문이 없는 경우 처리
if not orders:
print("미체결 주문이 없습니다.")
return []
print("미체결 주문이 있습니다.")
return orders
except Exception as e:
print(f"Error fetching unfilled orders: {e}")
return []
#미체결된 주문 취소하는 함수
def cancel_unfilled_order(api_key, secret_key, order_id):
"""
특정 미체결 주문을 취소합니다.
Args:
api_key (str): 업비트 API 키
secret_key (str): 업비트 Secret 키
order_id (str): 취소할 주문의 UUID
Returns:
dict: 주문 취소 결과
"""
base_url = "https://api.upbit.com/v1/order"
query = {
'uuid': order_id,
}
# JWT 토큰 생성
query_string = '&'.join([f"{key}={value}" for key, value in query.items()])
payload = {
'access_key': api_key,
'nonce': str(uuid.uuid4()),
'query_hash': hashlib.sha512(query_string.encode()).hexdigest(),
'query_hash_alg': 'SHA512',
}
jwt_token = jwt.encode(payload, secret_key, algorithm="HS256")
headers = {"Authorization": f"Bearer {jwt_token}"}
try:
# 주문 취소 요청
response = requests.delete(base_url, headers=headers, params=query)
response.raise_for_status()
cancel_result = response.json()
print(f"주문이 성공적으로 취소되었습니다: {cancel_result}")
return cancel_result
except requests.exceptions.HTTPError as e:
print(f"HTTP 에러 발생: {e.response.status_code}, {e.response.text}")
return {}
except Exception as e:
print(f"Error canceling order: {e}")
return {}
#매수 1호가와 매도1호가 리턴하는 함수
def get_orderbook(api_key, market):
"""
주어진 마켓의 매수, 매도 1호가를 가져옵니다.
Args:
api_key (str): 업비트 API 키
market (str): 마켓 코드 (예: "KRW-BTC")
Returns:
dict: 매수 1호가와 매도 1호가를 포함한 딕셔너리
"""
url = f"https://api.upbit.com/v1/orderbook?markets={market}&level=0"
headers = {"accept": "application/json"}
try:
# 주문서 정보 요청
response = requests.get(url, headers=headers)
response.raise_for_status()
orderbook = response.json()
if not orderbook:
print(f"{market}의 주문서 정보가 없습니다.")
return None
# 매수 1호가 (가장 높은 매도 호가)
bid_price = orderbook[0]['orderbook_units'][0]['bid_price']
# 매도 1호가 (가장 낮은 매수 호가)
ask_price = orderbook[0]['orderbook_units'][0]['ask_price']
return {"bid_price": bid_price, "ask_price": ask_price}
except Exception as e:
print(f"Error fetching orderbook: {e}")
return None
api_key, secret_key = load_api_keys("upbit_config.json")
#지정가 매수함수
def place_limit_buy_order(api_key, secret_key, market, price, budget):
max_quantity = str(budget / price)
params = {
'market': market,
'side': 'bid',
'ord_type': 'limit',
'price': price,
'volume': max_quantity
}
query_string = unquote(urlencode(params, doseq=True)).encode("utf-8")
m = hashlib.sha512()
m.update(query_string)
query_hash = m.hexdigest()
payload = {
'access_key': api_key,
'nonce': str(uuid.uuid4()),
'query_hash': query_hash,
'query_hash_alg': 'SHA512',
}
jwt_token = jwt.encode(payload, secret_key)
authorization = 'Bearer {}'.format(jwt_token)
headers = {
'Authorization': authorization,
}
res = requests.post(server_url + '/v1/orders', json=params, headers=headers)
order_result = res.json()
# 성공적으로 주문을 제출했다면 주문 UUID 반환
order_uuid = order_result.get('uuid')
if order_uuid:
print(f"매수 주문이 성공적으로 생성되었습니다. 주문 UUID: {order_uuid}")
time.sleep(10)
return order_uuid
else:
print("주문 생성 실패: UUID를 찾을 수 없습니다.")
return None
# 지정가 매도함수
def place_limit_sell_order(api_key, secret_key, market, price, quantity):
max_quantity = str(quantity)
params = {
'market': market,
'side': 'ask',
'ord_type': 'limit',
'price': price,
'volume': max_quantity
}
query_string = unquote(urlencode(params, doseq=True)).encode("utf-8")
m = hashlib.sha512()
m.update(query_string)
query_hash = m.hexdigest()
payload = {
'access_key': api_key,
'nonce': str(uuid.uuid4()),
'query_hash': query_hash,
'query_hash_alg': 'SHA512',
}
jwt_token = jwt.encode(payload, secret_key)
authorization = 'Bearer {}'.format(jwt_token)
headers = {
'Authorization': authorization,
}
res = requests.post(server_url + '/v1/orders', json=params, headers=headers)
order_result = res.json()
# 성공적으로 주문을 제출했다면 주문 UUID 반환
order_uuid = order_result.get('uuid')
if order_uuid:
print(f"매도 주문이 성공적으로 생성되었습니다. 주문 UUID: {order_uuid}")
time.sleep(10)
return order_uuid
else:
print("주문 생성 실패: UUID를 찾을 수 없습니다.")
return None
# 시장가 매도 함수
def place_market_sell_order(api_key, secret_key, market, quantity):
max_quantity = str(quantity)
params = {
'market': market,
'side': 'ask',
'ord_type': 'market',
'volume': max_quantity
}
query_string = unquote(urlencode(params, doseq=True)).encode("utf-8")
m = hashlib.sha512()
m.update(query_string)
query_hash = m.hexdigest()
payload = {
'access_key': api_key,
'nonce': str(uuid.uuid4()),
'query_hash': query_hash,
'query_hash_alg': 'SHA512',
}
jwt_token = jwt.encode(payload, secret_key)
authorization = f'Bearer {jwt_token}'
headers = {
'Authorization': authorization,
}
res = requests.post(server_url + '/v1/orders', json=params, headers=headers)
order_result = res.json()
order_uuid = order_result.get('uuid')
if order_uuid:
print(f"시장가 매도 주문이 성공적으로 생성되었습니다. 주문 UUID: {order_uuid}")
time.sleep(10)
return order_uuid
else:
print("시장가 주문 생성 실패: UUID를 찾을 수 없습니다.")
return None
#받아온 코인 데이터를 저장할 csv파일을 만드는 함수
def save_to_csv(df):
filename = 'coin_data.csv'
df.to_csv(filename, index=False)
#거래기록을 저장할 csv 파일일
columns = ['시간', '코인명', '매수/매도', '매매 단가']
df_trade = pd.DataFrame(columns=columns)
df_trade.to_csv('trade_list.csv', index=False, encoding='utf-8-sig')
#거래기록을 추가하는 함수
def add_trade(trade_time, coin_name, trade_type, avg_price):
# CSV 파일 읽기
df = pd.read_csv('trade_list.csv', encoding='utf-8-sig')
# 새로운 데이터 추가
new_data = {
'시간': trade_time,
'코인명': coin_name,
'매수/매도': trade_type,
'평균 단가': avg_price,
}
df = df.append(new_data, ignore_index=True)
# 업데이트된 데이터프레임을 CSV 파일로 저장
df.to_csv('trade_list.csv', index=False, encoding='utf-8-sig')
#현재 보유중인 원화 잔고를 가져오는 함수
def get_balance():
"""
업비트 API를 사용하여 원화 잔고를 조회하는 함수
"""
url = server_url + '/v1/accounts'
# 요청 헤더 준비
payload = {
'access_key': api_key,
'nonce': str(uuid.uuid4())
}
# 서명 생성
m = hashlib.sha512()
m.update(payload['nonce'].encode('utf-8'))
signature = jwt.encode(payload, secret_key, algorithm='HS256')
headers = {
'Authorization': f'Bearer {signature}'
}
# API 요청
response = requests.get(url, headers=headers)
if response.status_code == 200:
accounts = response.json()
for account in accounts:
if account['currency'] == 'KRW': # 원화 잔고 찾기
return float(account['balance'])
else:
print(f"Error: {response.status_code}")
return None
#코인의 현재가격을 불러오는 함수
def get_current_price(coin):
url = f"https://api.upbit.com/v1/ticker"
params = {
'markets': f'KRW-{coin}' # 코인 이름 입력 (예: 'BTC' -> 'KRW-BTC')
}
response = requests.get(url, params=params)
data = response.json()
if len(data) > 0:
return data[0]['trade_price'] # 현재 거래 가격 반환
else:
return None
def get_coin_order_unit(coin,price):
special_coin_list = ['ADA','ALGO','BLUR','CELO', 'ELF', 'EOS', 'GRS', 'GRT', 'ICX', 'MANA', 'MINA', 'POL', 'SAND', 'SEI', 'STG', 'TRX']
coin_order_unit = 0
if coin not in special_coin_list:
if price >= 2000000:
coin_order_unit = 1000
elif price >=1000000 and price<2000000:
coin_order_unit = 500
elif price >=500000 and price <1000000:
coin_order_unit = 100
elif price >=100000 and price<500000:
coin_order_unit = 50
elif price >=10000 and price<100000:
coin_order_unit = 10
elif price >=1000 and price<10000:
coin_order_unit = 1
elif price >=100 and price<1000:
coin_order_unit = 0.1
elif price >=10 and price<100:
coin_order_unit = 0.01
elif price >= 1 and price<10:
coin_order_unit = 0.001
elif price>=0.1 and price<1:
coin_order_unit = 0.0001
elif price>=0.01 and price<0.1:
coin_order_unit = 0.00001
elif price>=0.001 and price<0.01:
coin_order_unit = 0.000001
elif price>=0.0001 and price<0.001 :
coin_order_unit = 0.0000001
else :
coin_order_unit = 0.00000001
else:
coin_order_unit = 1
return coin_order_unit
# 지표 계산함수
def calculate_indicator(df):
df['CCI'] = ts.CCI(high=df['high_price'], low=df['low_price'], close=df['trade_price'], timeperiod=14)
df['CCI_Signal_EMA'] = df['CCI'].ewm(span=9).mean()
cci = df['CCI'].iloc[-1]
cci_signal = df['CCI_Signal_EMA'].iloc[-1]
# ADX and DI+
adx_indicator = ta.trend.ADXIndicator(df["high_price"], df["low_price"], df["trade_price"], window=14)
df['DIm'] = adx_indicator.adx_neg()
df['DIp'] = adx_indicator.adx_pos()
df['Adx'] = adx_indicator.adx()
dip = df['DIp'].iloc[-1]
dim = df['DIm'].iloc[-1]
adx = df['Adx'].iloc[-1]
return df
def detect_signal(df):
df = calculate_indicator(df)
if len(df) < 2:
return 'none'
current = df.iloc[-1]
prev = df.iloc[-2]
# 현재 및 이전 값 추출
adx, dip, dim = current['Adx'], current['DIp'], current['DIm']
prev_adx, prev_dip, prev_dim = prev['Adx'], prev['DIp'], prev['DIm']
# 순위 계산 함수
def get_rank(a, b, c):
sorted_values = sorted([(a, 'ADX'), (b, '+DI'), (c, '-DI')], key=lambda x: -x[0])
return [item[1] for item in sorted_values]
current_rank = get_rank(adx, dip, dim)
prev_rank = get_rank(prev_adx, prev_dip, prev_dim)
# ADX 강도 (20 기준)
adx_strong = adx > 20 # 기준값 20으로 변경
adx_weak = adx < 20
# 모든 가능한 돌파 쌍 검사
signals = []
# 1. ADX와 +DI 돌파 검사
if (adx > dip and prev_adx <= prev_dip) or (adx < dip and prev_adx >= prev_dip):
adx_dir = 'up' if adx > prev_adx else 'down'
dip_dir = 'up' if dip > prev_dip else 'down'
if adx < dip and adx_dir == 'up' and dip_dir == 'up' and adx_strong:
signals.append('buy')
elif adx > dip and adx_dir == 'down' and dip_dir == 'down' and adx_strong:
signals.append('sell')
elif adx > dip and adx_dir == 'up' and dip_dir == 'down' and adx_strong:
signals.append('sell')
# 2. ADX와 -DI 돌파 검사
if (adx > dim and prev_adx <= prev_dim) or (adx < dim and prev_adx >= prev_dim):
adx_dir = 'up' if adx > prev_adx else 'down'
dim_dir = 'up' if dim > prev_dim else 'down'
if adx > dim and adx_dir == 'up' and dim_dir == 'down' and adx_strong:
signals.append('buy')
elif adx < dim and adx_dir == 'down' and dim_dir == 'up' and adx_strong:
signals.append('sell')
# 3. +DI와 -DI 돌파 검사
if (dip > dim and prev_dip <= prev_dim) or (dip < dim and prev_dip >= prev_dim):
dip_dir = 'up' if dip > prev_dip else 'down'
dim_dir = 'up' if dim > prev_dim else 'down'
if dip > dim and dip_dir == 'up' and dim_dir == 'down' and adx > prev_adx:
signals.append('buy')
elif dip < dim and dip_dir == 'down' and dim_dir == 'up' and adx > prev_adx:
signals.append('sell')
# 4. 3개 지표 동시 돌파
if current_rank != prev_rank:
adx_dir = 'up' if adx > prev_adx else 'down'
dip_dir = 'up' if dip > prev_dip else 'down'
dim_dir = 'up' if dim > prev_dim else 'down'
if adx_dir == 'up' and dip_dir == 'up' and dim_dir == 'up':
if current_rank[0] in ['+DI', 'ADX'] and adx_strong:
signals.append('buy')
elif adx_dir == 'down' and dip_dir == 'down' and dim_dir == 'down':
if current_rank[0] in ['-DI', 'ADX'] and adx_strong:
signals.append('sell')
# 최종 신호 결정 (CCI 조건: 매수만 적용)
if 'sell' in signals and 'buy' in signals:
return 'none'
elif 'sell' in signals:
return 'sell'
elif 'buy' in signals and prev['CCI'] < current['CCI']: # CCI 상승 조건
return 'buy'
else:
return 'none'
def get_top_gainers_upbit(status, number):
base_url = "https://api.upbit.com/v1"
try:
# Step 1: Get market data
markets_response = requests.get(f"{base_url}/market/all")
markets_response.raise_for_status()
markets = markets_response.json()
# Filter for KRW markets only and extract symbols
krw_symbols = [market['market'].split('-')[1] for market in markets if market['market'].startswith('KRW-')]
# Step 2: Get ticker data for all KRW markets
krw_markets = [f"KRW-{symbol}" for symbol in krw_symbols]
tickers_response = requests.get(f"{base_url}/ticker", params={"markets": ",".join(krw_markets)})
tickers_response.raise_for_status()
tickers = tickers_response.json()
# Step 3: Calculate price change percentages and filter
gainers = []
except_coin = ['USDC','USDT','BTC','ETH','BCH','AAVE','SOL','BSV','AVAX','EGLD','STX','BTT','XEM']
for ticker in tickers:
market = ticker['market']
symbol = market.split('-')[1]
prev_trade_price = ticker['prev_closing_price']
current_price = ticker['trade_price']
if prev_trade_price and prev_trade_price > 0 and symbol not in except_coin:
change_percent = ((current_price - prev_trade_price) / prev_trade_price) * 100
if change_percent >= 7: # ✅ 10% 이상 상승한 종목만 필터링
gainers.append({
'symbol': symbol,
'change_percent': change_percent,
'current_price': current_price
})
# Step 4: Sort by price change percentage
gainers = sorted(gainers, key=lambda x: x['change_percent'], reverse=(status == 'up'))
# Step 5: Return top N gainers
return [gainer['symbol'] for gainer in gainers[:number]]
except requests.exceptions.RequestException as e:
print(f"Error while fetching data from Upbit API: {e}")
return []
def main():
# input_str = input("예측할 코인 심볼을 쉼표(,)로 구분해서 입력하세요 (예: BTC,ETH,XRP): ")
# coin_list = [symbol.strip().upper() for symbol in input_str.split(",")]
coin_list = []
coin_holdings = get_coin_holdings(api_key, secret_key)
money = int(get_balance() * 0.95)
coin_list_copy = coin_list.copy()
coin_list_copy.extend(get_top_gainers_upbit('up',2))
while True:
money = int(get_balance() * 0.95)
coin_list_copy = coin_list.copy()
coin_list_copy.extend(get_top_gainers_upbit('up',4))
coin_holdings = get_coin_holdings(api_key, secret_key)
if len(coin_holdings)==1:
for coin in coin_list_copy:
print('')
print('------------------------------------------------------')
print(f'<{coin}>')
market = f"KRW-{coin}"
status_coin_price = get_current_price(coin)
coin_order_unit = get_coin_order_unit(coin,status_coin_price)
try:
df_1min = get_coin_data(coin, 1)
decision = detect_signal(df_1min)
now = datetime.now()
print(f'{now.strftime("%Y-%m-%d %H:%M:%S")} {coin}의 현재 가격 및 상황 : {status_coin_price}원 - {decision}')
if decision == 'buy':# and signal_btc == '매수 신호':
orderbook = get_orderbook(api_key, market)
buy_price = orderbook['bid_price'] + coin_order_unit
buy_uuid = place_limit_buy_order(api_key,secret_key,market,buy_price,money)
unfilled_orders = check_unfilled_orders(api_key, secret_key)
if unfilled_orders:
print('지정가 매수 실패, 미체결 주문 취소 작업 중..')
for order in unfilled_orders:
cancel_result = cancel_unfilled_order(api_key, secret_key, buy_uuid)
else:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
add_trade(current_time, f'{coin}', 'buy', buy_price)
break
if len(get_coin_holdings(api_key,secret_key))>1:
break
except Exception as e:
print(f"⚠️ [{coin}] 매매 판단 실패: {e}\n")
else:
hold_coin = list(coin_holdings.keys())[1]
hold_coin_amount = float(coin_holdings[hold_coin])
status_coin_price = get_current_price(hold_coin)
coin_order_unit = get_coin_order_unit(hold_coin,status_coin_price)
market = f'KRW-{hold_coin}'
print('----------------------------------------------')
try:
df_1min = get_coin_data(hold_coin, 1)
decision = detect_signal(df_1min)
now = datetime.now()
print(f'{now.strftime("%Y-%m-%d %H:%M:%S")} {coin}의 현재 가격 및 상황 : {status_coin_price}원 - {decision}')
if decision == 'sell':
orderbook = get_orderbook(api_key, market)
market = f'KRW-{hold_coin}'
sell_price = orderbook['ask_price'] - coin_order_unit
sell_uuid = place_limit_sell_order(api_key,secret_key,market,sell_price,hold_coin_amount)
unfilled_orders = check_unfilled_orders(api_key, secret_key)
if unfilled_orders:
print('지정가 매도 실패, 미체결 주문 취소 작업 중..')
for order in unfilled_orders:
cancel_result = cancel_unfilled_order(api_key, secret_key, sell_uuid)
sell_uuid = place_market_sell_order(api_key,secret_key,market,hold_coin_amount)
else:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
add_trade(current_time, f'{hold_coin}', 'sell', sell_price)
break
except Exception as e:
print(f"⚠️ [{hold_coin}] 매매 판단 실패: {e}\n")
if __name__ == "__main__":
main()
'혼자 만든 Code > upbit 코인 자동거래' 카테고리의 다른 글
업비트 코인 자동거래 ver 2.1 (1) | 2025.04.25 |
---|---|
업비트 코인 자동 거래 ver 2.0 (0) | 2025.02.04 |
업비트 코인 자동거래 ver 1.41 (매수/매도 기준 변경) (0) | 2025.01.20 |
업비트 코인 자동거래 ver 1.4(가격 상승확률 계산 방식 변경) (0) | 2025.01.19 |
업비트 코인 자동거래 v1.31(매매 기준 수정) (0) | 2025.01.18 |