Notice
Recent Posts
Recent Comments
250x250
Creative Code
Bithumb 실제 코인 자동거래 베타테스트(지정가 매수/매도,시장가 매수/매도 혼합) 본문
728x90
import pandas as pd
import time
import numpy as np
from pybithumb import Bithumb
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM
from sklearn.preprocessing import MinMaxScaler
import jwt
import uuid
import hashlib
import requests
import json
from urllib.parse import urlencode
conkey = 'your bithumb api key' # 이곳에 bithumb api key 발급받아 입력
seckey = 'your bithumb secret key' # 이곳에 bithumb secret key 발급받아 입력
apiUrl = 'https://api.bithumb.com'
# Bithumb 객체 생성
bithumb = Bithumb(conkey, seckey)
#현재 자산을 조회하는 함수
def see_my_bag():
payload = {
'access_key': conkey,
'nonce': str(uuid.uuid4()),
'timestamp': round(time.time() * 1000)
}
jwt_token = jwt.encode(payload, seckey)
authorization_token = 'Bearer {}'.format(jwt_token)
headers = {
'Authorization': authorization_token
}
try:
# Call API
response = requests.get(apiUrl + '/v1/accounts', headers=headers)
# handle to success or fail
return (response.json())
except Exception as err:
# handle exception
print(err)
#코인의 현재 가격을 조회하는 함수
def view_status_coin_price(coin):
price = bithumb.get_current_price(coin)
return price
#코인의 현재 데이터를 수집하는 함수
def collect_data(coin):
try:
orderbook = bithumb.get_orderbook(coin)
buy_orders = orderbook['bids'][:5] # 매수 호가 5개
sell_orders = orderbook['asks'][:5] # 매도 호가 5개
price = bithumb.get_current_price(coin)
buy_orders_str = ", ".join([f"{order['price']} ({order['quantity']})" for order in buy_orders])
sell_orders_str = ", ".join([f"{order['price']} ({order['quantity']})" for order in sell_orders])
data = {
"Coin": [coin],
"Price": [price],
"Buy Orders": [buy_orders_str],
"Sell Orders": [sell_orders_str]
}
return data, price
except Exception as e:
print(f"Error occurred: {e}")
return {}, None
# CSV 초기화
csv_filename = "coin_data.csv"
pd.DataFrame(columns=["Coin", "Price", "Buy Orders", "Sell Orders"]).to_csv(csv_filename, index=False)
# 미체결된 주문 확인
def check_trading_order():
param = dict( market='KRW-XRP', limit=100, page=1, order_by='desc' )
uuids = [
'C0106000032400700021', 'C0106000043000097801'
]
query = urlencode(param)
uuid_query = '&'.join([f'uuids[]={uuid}' for uuid in uuids])
query = query + "&" + uuid_query
# Generate access token
hash = hashlib.sha512()
hash.update(query.encode())
query_hash = hash.hexdigest()
payload = {
'access_key': conkey,
'nonce': str(uuid.uuid4()),
'timestamp': round(time.time() * 1000),
'query_hash': query_hash,
'query_hash_alg': 'SHA512',
}
jwt_token = jwt.encode(payload, seckey)
authorization_token = 'Bearer {}'.format(jwt_token)
headers = {
'Authorization': authorization_token
}
try:
# Call API
response = requests.get(apiUrl + '/v1/orders?' + query, headers=headers)
# handle to success or fail
# print(response.json())
return (response.json())
except Exception as err:
# handle exception
print(err)
#시장가 매수/매도 함수
def execute_coin_buy_sell_market(action,coin,price):
try:
current_coin_price = view_status_coin_price() # 실시간 시장 가격
check_having_coin = len(see_my_bag())
check_status_trader = check_trading_order()
if current_coin_price is None:
print("[알림] 가격 조회 실패. 거래를 종료합니다.")
return
if action == "buy":
if current_coin_price < price:
print(f"[알림] 잔액 부족으로 {price}원만큼 매수할 수 없습니다.")
return
requestBody = {
'market': f'KRW-{coin}',
'side': 'bid',
'price': price,
'ord_type': 'price'
}
query = urlencode(requestBody).encode()
hash = hashlib.sha512()
hash.update(query)
query_hash = hash.hexdigest()
payload = {
'access_key': conkey,
'nonce': str(uuid.uuid4()),
'timestamp': round(time.time() * 1000),
'query_hash': query_hash,
'query_hash_alg': 'SHA512',
}
jwt_token = jwt.encode(payload, seckey)
authorization_token = f'Bearer {jwt_token}'
headers = {
'Authorization': authorization_token,
'Content-Type': 'application/json'
}
if check_having_coin == 2:
response = requests.post(f'{apiUrl}/v1/orders', data=json.dumps(requestBody), headers=headers)
print(f"[매수 주문 요청] 응답 코드: {response.status_code}")
print('매수 체결 완료 되었습니다.')
coins_held = float(see_my_bag()[2]['balance'])
else :
print("[알림] 이미 보유한 코인이 있습니다")
return
elif action == "sell":
if check_having_coin == 2:
print("[알림] 보유한 코인이 없어 매도할 수 없습니다.")
return
# 매도할 코인 수량은 현재 보유 수량 전부
requestBody = {
'market': f'KRW-{coin}',
'side': 'ask',
'volume': coins_held,
'ord_type': 'market' # 시장가 매도
}
query = urlencode(requestBody).encode()
hash = hashlib.sha512()
hash.update(query)
query_hash = hash.hexdigest()
payload = {
'access_key': conkey,
'nonce': str(uuid.uuid4()),
'timestamp': round(time.time() * 1000),
'query_hash': query_hash,
'query_hash_alg': 'SHA512',
}
jwt_token = jwt.encode(payload, seckey)
authorization_token = f'Bearer {jwt_token}'
headers = {
'Authorization': authorization_token,
'Content-Type': 'application/json'
}
response = requests.post(f'{apiUrl}/v1/orders', data=json.dumps(requestBody), headers=headers)
if check_having_coin>2:
print('매도 체결이 완료되었습니다.')
check_having_coin=2
except Exception as e:
print(f"[거래 오류] {action} 실행 중 오류 발생: {e}")
# 미체결 주문 조회 함수
def order_check(coin,my_uuid):
param = dict( market=f'KRW-{coin}', limit=100, page=1, order_by='desc')
uuids = [
my_uuid
]
query = urlencode(param)
uuid_query = '&'.join([f'uuids[]={uuid}' for uuid in uuids])
query = query + "&" + uuid_query
# Generate access token
hash = hashlib.sha512()
hash.update(query.encode())
query_hash = hash.hexdigest()
payload = {
'access_key': conkey,
'nonce': str(uuid.uuid4()),
'timestamp': round(time.time() * 1000),
'query_hash': query_hash,
'query_hash_alg': 'SHA512',
}
jwt_token = jwt.encode(payload, seckey)
authorization_token = 'Bearer {}'.format(jwt_token)
headers = {
'Authorization': authorization_token
}
try:
# Call API
response = requests.get(apiUrl + '/v1/orders?' + query, headers=headers)
# handle to success or fail
#print(response.status_code)
if len(response.json())>0 and 'uuid' in response.json()[0]:
return response.json()[0]['uuid']
else :
return 'fail'
except Exception as err:
# handle exception
print(err)
# 주문 취소 함수
def order_cancel(order_uuid):
param = dict( uuid=order_uuid)
# Generate access token
query = urlencode(param).encode()
hash = hashlib.sha512()
hash.update(query)
query_hash = hash.hexdigest()
payload = {
'access_key': conkey,
'nonce': str(uuid.uuid4()),
'timestamp': round(time.time() * 1000),
'query_hash': query_hash,
'query_hash_alg': 'SHA512',
}
jwt_token = jwt.encode(payload, seckey)
authorization_token = 'Bearer {}'.format(jwt_token)
headers = {
'Authorization': authorization_token
}
try:
# Call API
response = requests.delete(apiUrl + '/v1/order', params=param, headers=headers)
# handle to success or fail
#print(response.status_code)
#print(response.json())
print('미체결 주문 거래 실패 - 주문 취소 완료!')
print('')
except Exception as err:
# handle exception
print(err)
#지정가 매수/매도 함수
def execute_coin_buy_sell_limit(action,coin,price,trade_price):
try:
current_coin_price = view_status_coin_price(coin) # 실시간 시장 가격
check_having_coin = len(see_my_bag())
having_coin_amount = 0
if check_having_coin>2 :
having_coin_amount = float(see_my_bag()[2]['balance'])
if current_coin_price is None:
print("[알림] 가격 조회 실패. 거래를 종료합니다.")
return
if action == "buy":
volume = np.floor((price/trade_price))
# Set API parameters for limit buy
requestBody = {
'market': f'KRW-{coin}', # 예: 'KRW-BTC'
'side': 'bid', # 'bid'는 매수
'volume': volume,
'price': trade_price, # 가격은 0으로 설정하여 시장가 매수
'ord_type': 'limit' # 시장가 주문
}
# Generate access token
query = urlencode(requestBody).encode()
hash = hashlib.sha512()
hash.update(query)
query_hash = hash.hexdigest()
payload = {
'access_key': conkey,
'nonce': str(uuid.uuid4()),
'timestamp': round(time.time() * 1000),
'query_hash': query_hash,
'query_hash_alg': 'SHA512',
}
jwt_token = jwt.encode(payload, seckey)
authorization_token = 'Bearer {}'.format(jwt_token)
headers = {
'Authorization': authorization_token,
'Content-Type': 'application/json'
}
try:
if check_having_coin==2 and see_my_bag()[1]['locked'] == '0':
# Call API to place the market buy order
response = requests.post(apiUrl + '/v1/orders', data=json.dumps(requestBody), headers=headers)
# Print response status and JSON data
print('매수 주문 요청 성공')
time.sleep(7)
order_response1 = response.json()['uuid']
order_result = order_check(coin,order_response1)
if order_result != 'fail':
order_cancel(order_result)
print('지정가 매수실패- 매수주문을 취소합니다.')
#print(response.json())
else :
print('지정가 매수성공')
else :
print('이미 보유 또는 매수중인 코인이 있습니다.')
return
except Exception as err:
print(f"Error: {err}")
elif action == "sell":
if check_having_coin == 2:
print("[알림] 보유한 코인이 없어 매도할 수 없습니다.")
return
avg_buy_coin_price = float(see_my_bag()[2]['avg_buy_price'])
if avg_buy_coin_price/trade_price <=1.04:
print('수수료 손실이 발생할 수 있어 매도를 취소합니다.')
return
# 매도할 코인 수량은 현재 보유 수량 전부
requestBody = {
'market': f'KRW-{coin}',
'side': 'ask',
'volume': having_coin_amount,
'price': trade_price,
'ord_type': 'limit' # 지정가 매도
}
query = urlencode(requestBody).encode()
hash = hashlib.sha512()
hash.update(query)
query_hash = hash.hexdigest()
payload = {
'access_key': conkey,
'nonce': str(uuid.uuid4()),
'timestamp': round(time.time() * 1000),
'query_hash': query_hash,
'query_hash_alg': 'SHA512',
}
jwt_token = jwt.encode(payload, seckey)
authorization_token = f'Bearer {jwt_token}'
headers = {
'Authorization': authorization_token,
'Content-Type': 'application/json'
}
response = requests.post(f'{apiUrl}/v1/orders', data=json.dumps(requestBody), headers=headers)
time.sleep(7)
order_response2 = response.json()['uuid']
order_result2 = order_check(coin,order_response2)
if order_result2 != 'fail':
print('지정가 매도에 실패해 매도 주문을 취소합니다.')
order_cancel(order_result2)
else:
print('매도 체결이 완료되었습니다.')
check_having_coin=2
except Exception as e:
print(f"[거래 오류] {action} 실행 중 오류 발생: {e}")
first_my_krw = float(see_my_bag()[1]['balance']) # 거래시작전 최초 현금 보유량
data_list = []
print('--------------------------------------------------------------------------')
print('다음 중 실행할 프로그램을 선택 해주세요.')
print('')
print('1.현재 자산 조회')
print('2.코인 데이터 분석')
print('3.실제 매수/매도 진행')
print('')
select = int(input('실행할 프로그램의 숫자를 눌러주세요 : '))
print('--------------------------------------------------------------------------')
if (select < 1 or select>3):
print('입력오류')
elif (select == 1) :
print('')
print('다음은 현재 자산 정보 입니다.')
print('')
krw_in_my_bag = float(see_my_bag()[1]['balance'])
if len(see_my_bag())==2:
print(f'현재 {krw_in_my_bag}원 보유하고 있으며 보유중인 코인은 없습니다.')
print('')
else :
coin_in_my_bag = see_my_bag()[2]['currency']
coin_in_my_bag_locked = see_my_bag()[2]['locked']
coin_in_my_bag_amount = float(see_my_bag()[2]['balance'])
coin_in_my_bag_avg_buy_price = float(see_my_bag()[2]['avg_buy_price'])
status_coin_price = view_status_coin_price(coin_in_my_bag)
status_profit = (coin_in_my_bag_avg_buy_price - status_coin_price)*coin_in_my_bag_amount
print(f'현재 보유하고 있는 원화는 {krw_in_my_bag}원이고')
print(f'보유하고 있는 코인은 {coin_in_my_bag_amount} {coin_in_my_bag} 이며')
print(f'평균 구매 가격은 {coin_in_my_bag_avg_buy_price}원이며')
print(f'현재 코인의 가격은 {status_coin_price}원이고')
print(f'현재 수익은 {status_profit}원 입니다.')
print('')
print(f'총 자산은 {krw_in_my_bag + coin_in_my_bag_amount * status_coin_price + coin_in_my_bag_locked * status_coin_price}원 입니다.')
print('')
elif select == 2:
print('')
coin = input('데이터를 받아올 코인의 심볼을 입력해주세요.(예시-BTC) : ')
while True:
try:
data, price = collect_data(coin)
df = pd.DataFrame(data)
df.to_csv(csv_filename, mode='a', header=False, index=False)
data_list.append(price)
if len(data_list) > 30:
print("Training model...")
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(np.array(data_list).reshape(-1, 1))
X = []
y = []
for i in range(30, len(scaled_data)):
X.append(scaled_data[i-30:i, 0])
y.append(scaled_data[i, 0])
X = np.array(X)
y = np.array(y)
X = np.reshape(X, (X.shape[0], X.shape[1], 1))
model = Sequential()
model.add(LSTM(units=30, return_sequences=True, input_shape=(X.shape[1], 1)))
model.add(LSTM(units=30))
model.add(Dense(units=10))
model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(X, y, epochs=12, batch_size=3, verbose=0)
predicted_price = model.predict(X[-1].reshape(1, X.shape[1], 1))
predicted_price = scaler.inverse_transform(predicted_price)[0, 0]
predicted_min_price = np.min(predicted_price)
predicted_max_price = np.max(predicted_price)
expected_change_rate = ((predicted_price - price) / price) * 100
print('----------------------------------------------')
print(f"현재 코인의 가격 : {price}")
print('')
print(f'다음 예측된 코인의 최소가격 : {predicted_min_price}')
print(f'다음 예측된 코인의 최대가격 : {predicted_max_price}')
print('')
print(f"다음 예측된 코인의 평균가격 : {predicted_price}")
print('')
print(f"예상 등락률: {expected_change_rate:.3f}%")
print('')
time.sleep(3)
except Exception as e:
print(f"[오류 발생]: {e}")
elif select == 3:
print('')
coin = input('거래를 진행할 코인의 심볼을 입력해주세요.(예시 - BTC) : ')
input_price = float(input('거래를 진행할 금액을 입력해주세요.(단위 : 원) : '))
print('')
while True:
try:
data, price = collect_data(coin)
df = pd.DataFrame(data)
df.to_csv(csv_filename, mode='a', header=False, index=False)
data_list.append(price)
if len(data_list) > 30:
print("Training model...")
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(np.array(data_list).reshape(-1, 1))
X = []
y = []
for i in range(30, len(scaled_data)):
X.append(scaled_data[i-30:i, 0])
y.append(scaled_data[i, 0])
X = np.array(X)
y = np.array(y)
X = np.reshape(X, (X.shape[0], X.shape[1], 1))
model = Sequential()
model.add(LSTM(units=30, return_sequences=True, input_shape=(X.shape[1], 1)))
model.add(LSTM(units=30))
model.add(Dense(units=10))
model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(X, y, epochs=12, batch_size=3, verbose=0)
predicted_price = model.predict(X[-1].reshape(1, X.shape[1], 1))
predicted_price = scaler.inverse_transform(predicted_price)[0, 0]
expected_change_rate = ((predicted_price - price) / price) * 100
print('----------------------------------------------')
print(f"현재 코인 가격 : {price:.8f}원")
print('')
print(f"다음 예측된 코인의 평균가격: {predicted_price:.8f}원")
print(f"예상 등락률: {expected_change_rate:.3f}%")
print('')
my_krw = float(see_my_bag()[1]['balance'])
if len(see_my_bag())>2:
purchase_price = float(see_my_bag()[2]['balance']) * float(see_my_bag()[2]['avg_buy_price'])
my_coin = float(see_my_bag()[2]['balance'])
my_coin_buy_price = float(see_my_bag()[2]['avg_buy_price'])
print(f'보유중인 원화 : {my_krw}원')
print(f'보유중인 코인 : {my_coin} {coin}')
print(f'평균 매수가 : {my_coin_buy_price}원')
print(f'보유중인 총 자산 : {my_krw + price * my_coin:.3f}원')
status_my_krw = my_krw+price*my_coin
print(f'현재수익률 : {(status_my_krw - first_my_krw)/first_my_krw * 100:.3f}%')
current_price_rate = (status_my_krw - first_my_krw)/first_my_krw * 100
if current_price_rate > 7 :
execute_coin_buy_sell_market("sell",coin,input_price,price)
print('보유 총 자산 대비 수익률 7% 달성 완료! 거래를 종료합니다.')
break
elif current_price_rate < -5:
execute_coin_buy_sell_market('sell',coin,input_price,price)
print('보유 총 자산 대비 손실률이 5%를 넘었습니다.. 거래를 종료합니다.')
break
else :
print(f'보유중인 총 자산 : {my_krw}원')
print(f'현재수익률 : {(my_krw- first_my_krw)/first_my_krw * 100}%')
if expected_change_rate > 0.3 and expected_change_rate<2.5:
buy_price_1st = float(data['Buy Orders'][0].split(', ')[0].split(' ')[0])
execute_coin_buy_sell_limit("buy",coin, input_price,buy_price_1st)
elif expected_change_rate < -0.3 and expected_change_rate>-2.5:
sell_price_1st = float(data['Sell Orders'][0].split(', ')[0].split(' ')[0])
execute_coin_buy_sell_limit("sell",coin, input_price,sell_price_1st)
elif expected_change_rate>=2.5:
execute_coin_buy_sell_market('buy',coin,input_price)
elif expected_change_rate<=-2.5:
execute_coin_buy_sell_market('sell',coin,input_price)
print('----------------------------------------------')
time.sleep(3)
except Exception as e:
print(f"[오류 발생]: {e}")
728x90
'혼자 만든 Code' 카테고리의 다른 글
bithumb 특정 코인의 오늘 종가,최고가, 최저가 예측 프로그램 (0) | 2024.12.01 |
---|---|
Bithumb 실제 자동 코인 거래 프로그램 (시장가 매수,매도) (0) | 2024.11.22 |
코인가격 예측 프로그램(60초 간격 예측), 모의 투자 기능 (0) | 2024.11.20 |
코인 가격 예측 프로그램(10초마다 예측) (1) | 2024.11.20 |
[Python]명함 관리 프로그램 (0) | 2023.10.16 |