Notice
Recent Posts
Recent Comments
250x250
Creative Code
Lunar-(15)전체코드 중간정리2(함수 부분) 본문
728x90
※state/auth.py 전체 코드
"""The authentication state."""
import reflex as rx
from sqlmodel import select,and_,or_
from .base import State, User
import re
class AuthState(State):
    """The authentication state for sign up and login page."""
    # User ID typed on the find-password screen.
    user_find_password_id:str
    # Email address typed on the find-password screen.
    user_find_password_email_address:str
    # Birth year chosen on the find-password screen.
    user_find_password_year:str
    # Birth month chosen on the find-password screen.
    user_find_password_month:str
    # Birth day chosen on the find-password screen.
    user_find_password_day:str
    # Real name typed on the find-ID screen.
    user_find_id_name:str
    # Email address typed on the find-ID screen.
    user_find_id_email:str
    # Birth year chosen on the find-ID screen.
    user_find_id_year:str
    # Birth month chosen on the find-ID screen.
    user_find_id_month:str
    # Birth day chosen on the find-ID screen.
    user_find_id_day:str
    # Selectable birth years (hard-coded 1960 through 2023).
    year : list[str] = ['1960','1961','1962','1963','1964','1965','1966','1967','1968','1969','1970',
        '1971','1972','1973','1974','1975','1976','1977','1978','1979','1980',
        '1981','1982','1983','1984','1985','1986','1987','1988','1989','1990',
        '1991','1992','1993','1994','1995','1996','1997','1998','1999','2000',
        '2001','2002','2003','2004','2005','2006','2007','2008','2009','2010',
        '2011','2012','2013','2014','2015','2016','2017','2018','2019','2020',
        '2021','2022','2023']
    # Selectable birth months.
    month : list[str] = ['1','2','3','4','5','6','7','8','9','10','11','12']
    # Selectable birth days.
    day : list[str] = ['1','2','3','4','5','6','7','8','9','10','11','12','13','14','15','16','17','18','19','20',
        '21','22','23','24','25','26','27','28','29','30','31']
    # The user's real name.
    user_realname : str
    # The user's login ID.
    username: str
    # The user's password.
    password: str
    # The user's birth year.
    user_birthday_year : str
    # The user's birth month.
    user_birthday_month : str
    # The user's birth day.
    user_birthday_day : str
    # Password confirmation value.
    confirm_password: str
    # The user's email address.
    user_email_address:str
    # Validity flags for the sign-up inputs; written by the time_valid_* computed
    # vars and read by signup().
    user_password_valid:bool=False
    user_realname_valid:bool=False
    user_confirm_password_valid:bool=False
    user_email_address_valid:bool=False
    user_birthday_year_valid:bool = False
    user_birthday_month_valid:bool = False
    user_birthday_day_valid:bool = False
# 설정한 회원가입정보 입력값이 유효한지 실시간으로 확인하는 함수
@rx.var
def time_valid_user_password(self)->bool:
    """Expose the stored password-validity flag as a computed var."""
    return self.user_password_valid
@rx.var
def time_valid_user_realname(self)->bool:
    """Expose the stored real-name-validity flag as a computed var."""
    return self.user_realname_valid
@rx.var
def time_valid_confirm_password(self)->bool:
    """Report whether the password confirmation is INVALID.

    Returns True when the confirmation is empty or differs from ``password``.
    As a side effect, ``user_confirm_password_valid`` is set to the opposite.
    """
    rejected = self.confirm_password == '' or self.confirm_password != self.password
    self.user_confirm_password_valid = not rejected
    return rejected
@rx.var
def time_valid_email_address(self)->bool:
    """Report whether the email address is INVALID (contains no '@').

    As a side effect, ``user_email_address_valid`` is set to the opposite.
    """
    contains_at = '@' in self.user_email_address
    self.user_email_address_valid = contains_at
    return not contains_at
@rx.var
def time_valid_user_birthday_year(self)->bool:
    """Report whether the birth year is still unset (empty string).

    As a side effect, ``user_birthday_year_valid`` is set to the opposite.
    """
    unset = self.user_birthday_year == ''
    self.user_birthday_year_valid = not unset
    return unset
@rx.var
def time_valid_user_birthday_month(self)->bool:
    """Report whether the birth month is still unset (empty string).

    As a side effect, ``user_birthday_month_valid`` is set to the opposite.
    """
    unset = self.user_birthday_month == ''
    self.user_birthday_month_valid = not unset
    return unset
@rx.var
def time_valid_user_birthday_day(self)->bool:
    """Report whether the birth day is still unset (empty string).

    As a side effect, ``user_birthday_day_valid`` is set to the opposite.
    """
    unset = self.user_birthday_day == ''
    self.user_birthday_day_valid = not unset
    return unset
# 유저의 실제 이름 입력값이 유효한지 실시간으로 확인하는 함수
@rx.var
def time_valid_username(self)->bool:
    """Report whether the real name is INVALID (length outside 2..20).

    Despite the method name, this checks ``user_realname``, not ``username``.
    As a side effect, ``user_realname_valid`` is set to the opposite.
    """
    length_ok = 2 <= len(self.user_realname) <= 20
    self.user_realname_valid = length_ok
    return not length_ok
# 유저가 입력한 비밀번호가 유효한지 실시간으로 확인하는 함수
@rx.var
def time_valid_password(self)->bool:
    """Report whether the password is INVALID.

    A valid password is 8 to 16 ASCII letters or digits. As a side effect,
    ``user_password_valid`` is set to the opposite of the returned value.
    """
    accepted = re.match(r'^[a-zA-Z0-9]{8,16}$', self.password) is not None
    self.user_password_valid = accepted
    return not accepted
# 설정한 아이디가 유효한 값인지 확인하는 함수
def is_valid_string(self,text):
    """Return True when *text* starts with a lowercase letter followed only by
    lowercase letters or digits."""
    return re.match("^[a-z][a-z0-9]*$", text) is not None
# 아이디 중복체크를 하는 함수
def id_check(self):
    """Validate the chosen ID's format and report whether it is already taken.

    Returns a window alert describing the first rule the ID breaks, or whether
    the ID is available.
    """
    candidate = self.username
    if len(candidate) < 6:
        return rx.window_alert('The Nickname must contain at least 6 characters.')
    if not candidate.islower():
        return rx.window_alert('The Nickname must be composed of lowercase letters or a combination of lowercase letters and numbers.')
    if not candidate.isalnum():
        return rx.window_alert('Special characters and spaces cannot be included.')
    if not self.is_valid_string(candidate):
        return rx.window_alert('Characters other than alphabets and numbers cannot be entered.')
    # NOTE(review): user_id_valid is not declared among the state vars visible here.
    self.user_id_valid = False
    with rx.session() as session:
        already_taken = session.exec(select(User).where(User.username == candidate)).first()
        if already_taken:
            return rx.window_alert('User nickname already exists.')
        return rx.window_alert('Username is available.')
# 회원가입을 하는 버튼
def signup(self):
    """Create a new account from the sign-up form values.

    Returns:
        A window alert when any validity flag is unset, when the birthday is
        not a real calendar date, or when the ID already exists; otherwise a
        redirect to "/" after the new ``User`` row has been committed and
        stored in ``self.user``.
    """
    # Local import: this module does not import datetime at file level.
    from datetime import date

    inputs_valid = (
        self.user_password_valid
        and self.user_confirm_password_valid
        and self.user_realname_valid
        and self.user_email_address_valid
        and self.user_birthday_year_valid
        and self.user_birthday_month_valid
        and self.user_birthday_day_valid
    )
    if not inputs_valid:
        return rx.window_alert('Please enter the information accurately')

    # Let the calendar decide: date() raises ValueError for impossible dates
    # such as Feb 30, Apr 31, or Feb 29 in a non-leap year. The previous
    # hand-written check rejected Feb 29 in leap years and accepted it otherwise.
    try:
        date(
            int(self.user_birthday_year),
            int(self.user_birthday_month),
            int(self.user_birthday_day),
        )
    except ValueError:
        return rx.window_alert('invalid birthday')

    with rx.session() as session:
        # Refuse to create a second account with the same ID.
        if session.exec(select(User).where(User.username == self.username)).first():
            return rx.window_alert('The ID that already exists. Please check ID duplicates first.')
        # NOTE(review): the password is stored exactly as entered (no hashing).
        self.user = User(
            username=self.username,
            password=self.password,
            user_realname=self.user_realname,
            user_email = self.user_email_address,
            user_birthday_year = self.user_birthday_year,
            user_birthday_month = self.user_birthday_month,
            user_birthday_day= self.user_birthday_day
        )
        session.add(self.user)
        # Keep self.user's attributes loaded after the commit.
        session.expire_on_commit = False
        session.commit()
        return rx.redirect("/")
# 유저 아이디를 찾는 함수
def find_user_id(self):
    """Look up a user ID by real name, email and birthday.

    Returns a window alert showing the matching ID, or a "no match" alert.
    """
    with rx.session() as session:
        user_query = select(User).where(and_(
            User.user_realname == self.user_find_id_name,
            User.user_email == self.user_find_id_email,
            User.user_birthday_year == self.user_find_id_year,
            User.user_birthday_month == self.user_find_id_month,
            User.user_birthday_day == self.user_find_id_day,
        ))
        # first() instead of one_or_none(): nothing prevents two accounts from
        # sharing name, email and birthday, and one_or_none() raises on that.
        found_user = session.exec(user_query).first()
        if found_user:
            return rx.window_alert(f'your id : {found_user.username}')
        return rx.window_alert('There is no matching user information.')
# 유저 비밀번호를 찾는 함수
def find_user_password(self):
    """Show a password hint for the user matching ID, email and birthday.

    The hint keeps the first three characters of the stored password and
    replaces every later character with '*'.
    """
    # NOTE(review): this reveals part of the stored password to anyone who
    # knows the ID, email and birthday.
    with rx.session() as session:
        criteria = and_(
            User.username == self.user_find_password_id,
            User.user_email == self.user_find_password_email_address,
            User.user_birthday_year == self.user_find_password_year,
            User.user_birthday_month == self.user_find_password_month,
            User.user_birthday_day == self.user_find_password_day,
        )
        found_user = session.exec(select(User).where(criteria)).one_or_none()
        if not found_user:
            return rx.window_alert('There is no matching user information.')
        stored = found_user.password
        password_hint = stored[:3] + '*' * (len(stored) - 3)
        return rx.window_alert(f'Password Hint: {password_hint}')
def login(self):
    """Log in a user.

    Stores the matching user in ``self.user`` and redirects to "/" when the
    ID exists and the password matches; otherwise returns an alert.
    """
    with rx.session() as session:
        lookup = select(User).where(User.username == self.username)
        user = session.exec(lookup).first()
        if not (user and user.password == self.password):
            return rx.window_alert("Invalid username or password.")
        self.user = user
        return rx.redirect("/")
※state/base.py 전체 코드
"""Base state for Twitter example. Schema is inspired by https://drawsql.app/templates/twitter."""
from typing import Optional,List,Union
from sqlmodel import Field
import reflex as rx
class Music_Playlist(rx.Model, table = True):
    """A table of songs saved to users' music playlists."""
    # Username of the playlist owner (HomeState writes self.user.username here).
    user_id : str = Field()
    # Song title.
    music_title : str = Field()
    # Album name.
    music_album : str = Field()
    # Artist name.
    music_artist : str = Field()
class Video_Playlist(rx.Model,table = True):
    """A table of videos saved to users' video playlists."""
    # Username of the playlist owner (HomeState writes self.user.username here).
    user_id : str = Field()
    # URL of the saved video.
    video_url : str = Field()
    # Title of the saved video.
    video_title : str = Field()
class Hotplace(rx.Model,table=True):
    """A log of map searches, one row per search."""
    # Search time; HomeState.map_search writes it as "%Y-%m-%d %H:%M:%S".
    search_at:str = Field()
    # The raw search text entered by the user.
    search_place:str=Field()
class Follows(rx.Model, table=True):
    """A table of Follows. This is a many-to-many join table.
    See https://sqlmodel.tiangolo.com/tutorial/many-to-many/ for more information.
    """
    # Both columns are marked primary_key, so the pair forms the row's key.
    followed_username: str = Field(primary_key=True)
    follower_username: str = Field(primary_key=True)
class User(rx.Model, table=True):
    """A table of Users."""
    # Real name of the user.
    user_realname : str = Field()
    # Email address.
    user_email : str = Field()
    # Birthday, stored as three separate strings.
    user_birthday_year : str = Field()
    user_birthday_month : str = Field()
    user_birthday_day : str = Field()
    # Login ID.
    username: str = Field()
    # NOTE(review): AuthState.signup stores the password exactly as entered.
    password: str = Field()
class Crater(rx.Model, table=True):
    """A table of craters (the posts created by HomeState.post_crater)."""
    # Text of the post.
    content: str = Field()
    # Creation time; post_crater writes it as "%Y-%m-%d %H:%M:%S".
    created_at: str = Field()
    # Username of the author.
    author: str = Field()
    # Attached image file names, joined with ", ".
    image_content: str = Field()
class State(rx.State):
    """The base state for the app."""
    # The logged-in user; None means nobody is logged in (see logged_in).
    user: Optional[User] = None
    def logout(self):
        """Log out a user by resetting the state, then redirect to "/"."""
        self.reset()
        return rx.redirect("/")
    def check_login(self):
        """Redirect to "/login" when no user is logged in; otherwise return None."""
        if not self.logged_in:
            return rx.redirect("/login")
    @rx.var
    def logged_in(self):
        """Return True when ``self.user`` is set."""
        return self.user is not None
※state/home.py 전체 코드
"""The state for the home page."""
from datetime import datetime,timedelta
import tkinter as tk
import reflex as rx
from sqlmodel import select
import os
from .base import Follows, State, Crater, User,Hotplace,Video_Playlist,Music_Playlist
from tkinter import filedialog
import folium
from folium.plugins import MiniMap
import requests
import pandas as pd
import numpy as np
import json
import asyncio
from sqlalchemy.sql import func,desc
from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
import datetime
from datetime import datetime
from melon import *
import time
from PIL import Image
from bs4 import BeautifulSoup as bs
class HomeState(State):
    """The state for the home page."""
    # Text of the crater being written, and the craters loaded from the database.
    crater: str
    craters: list[Crater] = []
    # Search inputs for friends and for craters.
    friend: str
    search: str
    # File-selection state.
    img: list[str]
    files: list[str] = []
    imgshow:bool=False
    # Map keyword search.
    map_count:int=1
    map_search_input:str=''
    map_html:str='/map.html'
    map_iframe:str
    df : pd.DataFrame
    start_location_x:str
    start_location_y:str
    end_location_x:str
    end_location_y:str
    KAKAO_REST_API_KEY:str
    taxi_fee:str
    toll_fee:str
    distance:str
    path_time:str
    map_hotplaces : list[Hotplace] = []
    # Video search.
    search_video:str
    youtube_results: list[str]=[]
    saved_video_results : list[Video_Playlist]
    show : bool = False
    popup_video_url:str
    popup_video_title:str
    # Music search.
    search_singer:str
    search_singer_result:list[dict]
    music_chart_info : list[dict]
    saved_music_results : list[Music_Playlist]
    # Scraped disaster-alert messages.
    weather_message_result : list[dict]
    # Weather lookup results.
    weather_search_show : bool = False
    weather_search:str
    status_climate:str
    status_climate_icon_url:str
    status_temperature:str
    status_feel_temperature:str
    today_min_temperature:str
    today_max_temperature:str
    status_humidity:str
    status_atmospheric_pressure:str
    status_wind_direction:str
    status_wind_speed:str
    image:str
@rx.var
def time_map_iframe(self)->str:
    """Build the iframe markup for the current ``map_html`` page.

    The markup is also stored in ``map_iframe`` before being returned.
    """
    markup = f'<iframe src="{self.map_html}" width="100%" height="500px"></iframe>'
    self.map_iframe = markup
    return self.map_iframe
def handle_file_selection(self):
    """Let the user pick files in a native dialog and copy them to ``.web/public``.

    Each chosen file's base name is appended to ``self.img`` and ``self.files``.
    If ``self.img`` is non-empty afterwards, ``change()`` toggles ``imgshow``.
    """
    root = tk.Tk()
    try:
        root.withdraw()  # keep the empty root window hidden
        root.attributes('-topmost', True)
        file_paths = filedialog.askopenfilenames(master=root)
    finally:
        # Release the hidden Tk root; it was previously left alive on every call.
        root.destroy()

    for file_path in file_paths:
        file_name = os.path.basename(file_path)
        # Context managers close both handles; the source handle used to leak.
        with open(file_path, "rb") as source:
            upload_data = source.read()
        with open(f".web/public/{file_name}", "wb") as file_object:
            file_object.write(upload_data)
        self.img.append(file_name)
        self.files.append(file_name)

    if len(self.img)>0:
        self.change()
# 파일 업로드 함수
async def handle_upload(
    self, files: list[rx.UploadFile]
):
    """Save uploaded files into ``.web/public`` and record their names in ``img``.

    Args:
        files: The uploaded files to store.
    """
    for file in files:
        upload_data = await file.read()
        # Use only the base name so a client-supplied name cannot escape the
        # target directory, and write where handle_file_selection writes
        # (the old target was the filesystem root, "/<name>").
        file_name = os.path.basename(file.filename)
        outfile = f".web/public/{file_name}"
        with open(outfile, "wb") as file_object:
            file_object.write(upload_data)
        self.img.append(file_name)
# 파일선택창 화면띄우는 함수
def change(self):
    """Toggle ``imgshow``."""
    self.imgshow = not (self.imgshow)
# Crater 파일 선택 취소 함수
async def file_select_cancel(self):
    """Discard the current file selection.

    Clears ``img`` and ``files``. If files were selected, ``change()`` toggles
    ``imgshow``, mirroring the toggle done in ``handle_file_selection``.
    """
    # Decide before clearing: the old check ran after img was emptied, so the
    # toggle could never happen.
    had_selection = len(self.img) > 0
    self.img=[]
    self.files=[]
    if had_selection:
        self.change()
# Crater 업로드 함수
async def post_crater(self):
    """Validate and store a new crater, then reload the crater list.

    Returns a window alert when nobody is logged in or when the text is empty
    or longer than 70 characters; otherwise the result of ``get_craters()``.
    """
    if not self.logged_in:
        return rx.window_alert("Please log in to post a crater.") # warn when nobody is logged in
    if len(self.crater)==0:
        return rx.window_alert('Please write at least one character!') # require at least one character
    if len(self.crater)>70:
        return rx.window_alert('Please enter within 70 characters!') # limit is 70 characters
    # NOTE(review): handle_upload iterates its argument and awaits file.read();
    # confirm that rx.upload_files() returns something usable that way.
    await self.handle_upload(rx.upload_files()) # attach images
    with rx.session() as session: # store the new crater row
        # NOTE(review): heart_list, comment_list, heart_num and comment_num are
        # not fields of the Crater model shown in base.py; confirm the schema.
        crater = Crater(
            author=self.user.username, # author: the user's ID
            content=self.crater, # content: the text the user typed
            created_at=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # created_at: time of posting
            image_content=", ".join(self.files), # image_content: attached image file names
            heart_list='',
            comment_list='',
            heart_num=0,
            comment_num=0,
        )
        session.add(crater)
        session.commit()
        self.crater = "" # clear the draft once it is saved
        self.img=[]
        self.files=[]
    return self.get_craters()
# Crater 내용 불러오는 함수
def get_craters(self):
    """Load craters from the database into ``self.craters``, newest first.

    When ``self.search`` is set, only craters whose content contains it are kept.
    """
    with rx.session() as session:
        crater_query = session.query(Crater)
        if self.search:
            # Keep only posts containing the search text.
            crater_query = crater_query.filter(Crater.content.contains(self.search))
        self.craters = crater_query.all()[::-1]
def set_search(self, search):
    """Set the search query, then reload the crater list with it applied."""
    self.search = search
    return self.get_craters()
def follow_user(self, username):
    """Follow a user by storing a Follows row from the current user to *username*."""
    relation = Follows(
        follower_username=self.user.username,
        followed_username=username,
    )
    with rx.session() as session:
        session.add(relation)
        session.commit()
@rx.var
def following(self) -> list[Follows]:
    """Get the Follows rows in which the current user is the follower."""
    if not self.logged_in:
        return []
    with rx.session() as session:
        mine = select(Follows).where(
            Follows.follower_username == self.user.username
        )
        return session.exec(mine).all()
@rx.var
def followers(self) -> list[Follows]:
    """Get the Follows rows in which the current user is the one being followed."""
    if not self.logged_in:
        return []
    with rx.session() as session:
        of_me = select(Follows).where(
            Follows.followed_username == self.user.username
        )
        return session.exec(of_me).all()
@rx.var
def search_users(self) -> list[User]:
    """Get users whose username contains ``self.friend``, excluding the current user."""
    if self.friend == "":
        return []
    with rx.session() as session:
        current_username = "" if self.user is None else self.user.username
        matches = select(User).where(
            User.username.contains(self.friend),
            User.username != current_username,
        )
        return session.exec(matches).all()
# KaKao Rest API Key를 받아오는 함수
def kakao_api(self):
    """Read the Kakao REST API key from ``kakaoapikey.json`` into ``KAKAO_REST_API_KEY``."""
    with open('kakaoapikey.json','r') as key_file:
        credentials = json.load(key_file)
    self.KAKAO_REST_API_KEY = credentials['key']
# kakao api 검색으로 장소 목록을 받는 함수
def elec_location(self,region,page_num):
    """Query the Kakao keyword-search API and return the ``documents`` list.

    Args:
        region: Search keyword.
        page_num: Result page to request.
    """
    self.kakao_api()
    response = requests.get(
        'https://dapi.kakao.com/v2/local/search/keyword.json',
        params={'query': region,'page': page_num},
        headers={"Authorization": f'KakaoAK {self.KAKAO_REST_API_KEY}'},
    )
    return response.json()['documents']
# 장소목록의 정보를 가져오는 함수
def elec_info(self,places):
    """Convert Kakao place dicts into a DataFrame.

    Columns are ID, stores, X (longitude), Y (latitude), road_address and
    place_url. Mixing floats and strings in one NumPy array makes every cell
    a string.
    """
    longitudes = [float(place['x']) for place in places]
    latitudes = [float(place['y']) for place in places]
    names = [place['place_name'] for place in places]
    addresses = [place['road_address_name'] for place in places]
    urls = [place['place_url'] for place in places]
    ids = [place['id'] for place in places]
    # One row per field, transposed so that each place becomes a row.
    table = np.array([ids, names, longitudes, latitudes, addresses, urls]).T
    return pd.DataFrame(table, columns = ['ID','stores', 'X', 'Y','road_address','place_url'])
#사용자가 입력한 키워드로 정보를 받아와 데이터 프레임 생성
def keywords(self):
    """Fetch pages 1-3 of search results for every entry in ``self.locations``
    and return them combined in one DataFrame (None when there are no entries)."""
    combined = None
    for keyword in self.locations:
        for page_num in (1, 2, 3):
            page_df = self.elec_info(self.elec_location(keyword, page_num))
            if combined is None:
                combined = page_df
            elif page_df is not None:
                combined = pd.concat([combined, page_df],join='outer', ignore_index = True)
    return combined
# 데이터 프레임을 기준으로 지도를 생성하는 함수
def make_map(self,dfs):
    """Build a folium map with a minimap and one marker per row of *dfs*.

    Reads the Y, X, stores and place_url columns by positional label 0..len-1.
    """
    place_map = folium.Map(location=[37.5518911,126.9917937], zoom_start=7)
    place_map.add_child(MiniMap())
    latitudes, longitudes = dfs['Y'], dfs['X']
    names, links = dfs['stores'], dfs['place_url']
    for row in range(len(dfs)):
        marker = folium.Marker(
            [latitudes[row], longitudes[row]],
            tooltip=names[row],
            popup=links[row],
        )
        marker.add_to(place_map)
    return place_map
# 지도 기본설정
def standard_map(self):
    """Write the default map (zoom 12, no markers) to ``assets/map.html``."""
    m = folium.Map(location=[37.5518911,126.9917931],zoom_start=12)
    m.save('assets/map.html')
# 키워드로 지도검색하는 함수
async def map_search(self):
    """Search the comma-separated keywords, render the results to a map and log the search.

    Returns a window alert when the search box is empty.
    """
    if self.map_search_input == "":
        return rx.window_alert('Please enter your search term!')
    self.map_count+=1
    # NOTE(review): locations is not declared among the state vars visible here.
    self.locations = self.map_search_input.split(',')
    self.df = self.keywords()
    m = self.make_map(self.df)
    # Alternate between map2.html and map3.html so map_html changes on every search.
    if self.map_html == '/map2.html':
        m.save('assets/map3.html')
        self.map_html = '/map3.html'
    else :
        m.save('assets/map2.html')
        self.map_html = '/map2.html'
    await asyncio.sleep(1)
    self.map_iframe = self.time_map_iframe
    # Drop duplicate places and rename the place_url column to "place url".
    self.df = self.df.drop_duplicates(['ID'])
    self.df['place url'] = self.df['place_url']
    self.df = self.df.drop('place_url', axis=1)
    self.df = self.df.reset_index()
    # Record the search; hotplaces() reads these rows.
    with rx.session() as session:
        hotplace = Hotplace(
            search_at=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            search_place = self.map_search_input,
        )
        session.add(hotplace)
        session.commit()
# 지도 초기화 함수
def map_clear(self):
    """Point ``map_html`` back at the default map page."""
    self.map_html = '/map.html'
# kakaoapi 길찾기함수
async def get_directions(self):
    """Request a route from the Kakao directions API, draw it, and store the summary.

    Draws one marker per guide point of the first section plus a connecting
    line, saves the map to the alternate HTML file, and fills ``taxi_fee``,
    ``toll_fee``, ``distance`` and ``path_time`` from the route summary.
    """
    self.kakao_api()
    response = requests.get(
        "https://apis-navi.kakaomobility.com/v1/directions",
        headers={"Authorization": f"KakaoAK {self.KAKAO_REST_API_KEY}"},
        params={
            "origin": self.start_location_x+','+self.start_location_y,
            "destination": self.end_location_x+','+self.end_location_y,
        },
    )
    result = response.json()

    route_map = folium.Map(location=[37.5518911,126.9917931],zoom_start=7)
    guides = result['routes'][0]['sections'][0]['guides']
    path_points = []
    for guide in guides:
        path_points.append([guide['y'], guide['x']])
        folium.Marker(
            [guide['y'], guide['x']],
            tooltip=f"{guide['name']}",
            popup=f"{guide['name']}",
            icon = folium.Icon(color='orange',icon='info-sign'),
        ).add_to(route_map)
    folium.PolyLine(locations=path_points, color = 'black').add_to(route_map)

    # Alternate the output file so map_html changes on every request.
    if self.map_html == '/map2.html':
        target_file, target_url = 'assets/map3.html', '/map3.html'
    else:
        target_file, target_url = 'assets/map2.html', '/map2.html'
    route_map.save(target_file)
    self.map_html = target_url
    await asyncio.sleep(1)
    self.map_iframe = self.time_map_iframe

    summary = result['routes'][0]['summary']
    self.taxi_fee = f"택시비용 : {summary['fare']['taxi']}원"
    self.toll_fee = f"톨게이트비용 : {summary['fare']['toll']}원"
    distance = summary['distance']
    self.distance = f'총 이동거리 : {float(distance)/float(1000)}km'
    path_time = summary['duration']
    hours, remainder = divmod(path_time, 3600)
    minutes = remainder // 60
    seconds = path_time % 60
    self.path_time = f'소요시간 : {hours}시간 {minutes}분 {seconds}초'
# 최근 하루 동안 핫플레이스 검색하는 함수
def hotplaces(self):
    """Load the searches of the last 24 hours into ``map_hotplaces``.

    Rows are grouped by search text and ordered by how often the text was
    searched, then by search time, both descending.
    """
    # map_search stores search_at with local datetime.now(); the cutoff must
    # use the same clock. utcnow() shifted the window by the UTC offset.
    cutoff = (datetime.now() - timedelta(hours=24)).strftime("%Y-%m-%d %H:%M:%S")
    with rx.session() as session:
        self.map_hotplaces = (
            session.query(Hotplace)
            .filter(func.datetime(Hotplace.search_at) >= cutoff)
            .order_by(
                desc(func.count(Hotplace.search_place)),
                desc(func.datetime(Hotplace.search_at))
            )
            .group_by(Hotplace.search_place)
            .all()
        )
# youtube링크를 가져오는 함수
def get_youtube_links(self):
    """Scrape YouTube search results for ``search_video`` into ``youtube_results``.

    Each entry is the string "<title>,<url>" for one result on the page.
    """
    # Local import so this block does not depend on a new file-level import.
    from urllib.parse import quote_plus

    # Encode the query so spaces, '&', '#' and non-ASCII text survive in the URL.
    url = f'https://www.youtube.com/results?search_query={quote_plus(self.search_video)}'
    service = Service(ChromeDriverManager().install())
    options = webdriver.ChromeOptions()
    options.add_argument('headless')
    driver = webdriver.Chrome(service=service,options=options)
    try:
        driver.get(url)
        driver.execute_script("window.scrollTo(0,document.documentElement.scrollHeight);")
        titles = driver.find_elements(By.CSS_SELECTOR,"#dismissible.style-scope.ytd-video-renderer")
        self.youtube_results = []
        for title in titles:
            link = title.find_element(By.CSS_SELECTOR,"#video-title")
            main_title = link.get_property("title")
            tube_url = link.get_property("href")
            self.youtube_results.append(main_title + ',' + tube_url)
    finally:
        # Always shut the browser down; previously a Chrome process leaked per call.
        driver.quit()
# 나중에 볼 동영상에 추가
def add_video_playlist(self, video_url,video_title):
    """Save a video to the logged-in user's playlist unless it is already there.

    Returns a window alert when nobody is logged in or the video is a
    duplicate; otherwise the result of ``get_saved_video()``.
    """
    if not self.logged_in:
        return rx.window_alert("Please log in to save video.")
    owner = self.user.username
    with rx.session() as session:
        duplicate = (
            session.query(Video_Playlist)
            .filter_by(user_id=owner, video_url=video_url, video_title=video_title)
            .first()
        )
        if duplicate:
            return rx.window_alert("This video is already in your playlist.")
        session.add(
            Video_Playlist(
                user_id = owner,
                video_url = video_url,
                video_title = video_title,
            )
        )
        session.commit()
    return self.get_saved_video()
# 나중에 볼 영상에서 제거
def remove_video_playlist(self, video_url):
    """Remove the video with *video_url* from the logged-in user's playlist.

    Returns a window alert when nobody is logged in; otherwise the result of
    ``get_saved_video()``.
    """
    if not self.logged_in:
        return rx.window_alert("Please log in to remove video.")
    with rx.session() as session:
        saved_entry = (
            session.query(Video_Playlist)
            .filter_by(user_id=self.user.username, video_url=video_url)
            .first()
        )
        if saved_entry:
            # Delete the matching row and persist the removal.
            session.delete(saved_entry)
            session.commit()
    return self.get_saved_video()
# 동영상 저장 리스트를 불러오는 함수
def get_saved_video(self):
    """Load the logged-in user's saved videos into ``saved_video_results``, newest first."""
    if not self.logged_in:
        return rx.window_alert("Please log in to see saved video.")
    with rx.session() as session:
        owned = session.query(Video_Playlist).filter(
            Video_Playlist.user_id == self.user.username
        )
        self.saved_video_results = owned.all()[::-1]
# 동영상 팝업을 띄우는 함수
def popup_video(self,video_url,video_title):
    """Toggle the video popup flag and remember which video it shows."""
    self.show = not (self.show)
    self.popup_video_url = video_url
    self.popup_video_title = video_title
# 실시간 멜론차트 Top100을 불러오는 함수
def music_chart(self):
    """Load the current chart entries into ``music_chart_info`` as dicts
    with the keys artist, title, lastPos, rank and isNew."""
    self.music_chart_info = []
    for item in ChartData(imageSize=500).entries:
        self.music_chart_info.append(
            {
                'artist' : item.artist,
                'title': item.title,
                'lastPos':item.lastPos,
                'rank': item.rank,
                'isNew' : item.isNew,
            }
        )
# 멜론 가수 크롤링
def melon_singer_crawling(self):
    """Search Melon for ``search_singer`` and store the songs in ``search_singer_result``.

    The endpoint answers with JSONP; the callback wrapper is stripped before
    the JSON is parsed.
    """
    callback_name = "jQuery19105357803934720518_1603168193882"
    query = {
        'jscallback' : callback_name,
        'query' : self.search_singer
    }
    request_headers = {
        'Referer' : "http://www.melon.com/index.htm",
        "User-Agent" : (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            "(KHTML, like Gecko) Chrome/86.0.4240.75 Safari/537.36"
        ),
    }
    body = requests.get(
        "https://www.melon.com/search/keyword/index.json",
        headers = request_headers,
        params = query,
    ).text
    # Strip "callback(" and ");" to leave plain JSON.
    parsed = json.loads(body.replace(callback_name + '(', '').replace(');', ''))
    self.search_singer_result = []
    for song in parsed['SONGCONTENTS']:
        self.search_singer_result.append(
            {'ARTIST NAME' : song['ARTISTNAME'], 'ALBUM NAME' : song['ALBUMNAME'],'SONG NAME' : song['SONGNAME']}
        )
# 음악 플레이리스트 추가
def add_music_playlist(self, music_title,music_album,music_artist):
    """Save a song to the logged-in user's music playlist unless it is already there.

    Returns a window alert when nobody is logged in or the song is a
    duplicate; otherwise the result of ``get_saved_music()``.
    """
    if not self.logged_in:
        # The alert used to say "video", copied from the video playlist code.
        return rx.window_alert("Please log in to save music.")
    with rx.session() as session:
        existing_playlist = session.query(Music_Playlist).filter_by(
            user_id=self.user.username,
            music_title = music_title,
            music_album = music_album,
            music_artist = music_artist,
        ).first()
        if existing_playlist:
            # The same title, album and artist are already saved for this user.
            return rx.window_alert("This Music is already in your playlist.")
        new_playlist = Music_Playlist(
            user_id = self.user.username,
            music_title = music_title,
            music_album = music_album,
            music_artist = music_artist,
        )
        session.add(new_playlist)
        session.commit()
    return self.get_saved_music()
# 음악 플레이리스트를 불러오는 함수
def get_saved_music(self):
    """Load the logged-in user's saved songs into ``saved_music_results``, newest first.

    Returns a window alert when nobody is logged in.
    """
    if not self.logged_in:
        # The alert used to say "video", copied from the video playlist code.
        return rx.window_alert("Please log in to see saved music.")
    with rx.session() as session:
        self.saved_music_results = session.query(Music_Playlist).filter(Music_Playlist.user_id == self.user.username).all()[::-1]
# 음악 플레이리스트에서 제거
def remove_music_playlist(self, music_title, music_album, music_artist):
    """Remove a song from the logged-in user's music playlist.

    Returns a window alert when nobody is logged in; otherwise the result of
    ``get_saved_music()``.
    """
    if not self.logged_in:
        # The alert used to say "video", copied from the video playlist code.
        return rx.window_alert("Please log in to remove music.")
    with rx.session() as session:
        music_playlist_entry = session.query(Music_Playlist).filter_by(
            user_id=self.user.username,
            music_title = music_title,
            music_album = music_album,
            music_artist = music_artist,
        ).first()
        if music_playlist_entry:
            # Delete the matching song and persist the removal.
            session.delete(music_playlist_entry)
            session.commit()
    return self.get_saved_music()
# 재난문자 웹 크롤링
def climate_message(self):
    """Scrape the Naver disaster-alert search page into ``weather_message_result``.

    Each entry is a dict with the keys area, date and text, taken from the
    page's .area, .date and .disaster_text elements in lockstep.
    """
    page = requests.get(
        "https://search.naver.com/search.naver?where=nexearch&sm=top_hty&fbm=0&ie=utf8&query=%EC%9E%AC%EB%82%9C%EB%AC%B8%EC%9E%90"
    )
    soup = bs(page.text, 'html.parser')
    area_tags = soup.select(".area")
    text_tags = soup.select(".disaster_text")
    date_tags = soup.select(".date")
    self.weather_message_result = []
    for area_tag, text_tag, date_tag in zip(area_tags, text_tags, date_tags):
        self.weather_message_result.append(
            {'area' : area_tag.get_text(strip=True), 'date' : date_tag.get_text(strip=True), 'text':text_tag.get_text(strip = True) }
        )
# 날씨 웹 크롤링
def climate_websearch(self):
    """Fetch the current weather for ``weather_search`` from OpenWeatherMap.

    Reads the API key from ``weatherapikey.json``, stores the fields of the
    response in the status_* state vars, and sets ``weather_search_show``.
    """
    key=''
    with open('weatherapikey.json','r')as f:
        key = json.load(f)
    weather_api_key = key['key']
    # The search text is interpolated into the URL as typed.
    url = f"http://api.openweathermap.org/data/2.5/weather?q={self.weather_search}&appid={weather_api_key}&lang=kr&units=metric"
    result = requests.get(url)
    data = json.loads(result.text)
    # NOTE(review): the vars below are declared as str, but the values are
    # taken from the JSON unconverted; confirm they are not numbers.
    self.status_climate = data['weather'][0]['description']
    self.status_temperature = data['main']['temp']
    self.status_feel_temperature = data['main']['feels_like']
    self.today_min_temperature = data['main']['temp_min']
    self.today_max_temperature = data['main']['temp_max']
    self.status_humidity = data['main']['humidity']
    self.status_atmospheric_pressure = data['main']['pressure']
    self.status_wind_direction = data['wind']['deg']
    self.status_wind_speed = data['wind']['speed']
    self.status_climate_icon_url= f"https://openweathermap.org/img/wn/{data['weather'][0]['icon']}@2x.png"
    # NOTE(review): image is declared as str but receives the result of Image.open().
    self.image = Image.open((requests.get(self.status_climate_icon_url, stream=True).raw))
    self.weather_search_show=True
728x90
'Projects' 카테고리의 다른 글
Lunar-(17)ai-chat 날짜별 대화기록 조회 및 삭제 기능 (0) | 2024.01.26 |
---|---|
Lunar-(16)ai chat(구글 Gemini api 키 이용) (1) | 2024.01.21 |
Lunar-(14)전체코드 중간 정리1(프론트엔드 부분) (3) | 2024.01.08 |
Lunar-(13)날씨(실시간 재난문자, 전세계 지역 현재 날씨 검색, 기상관련 사이트 링크) (1) | 2024.01.08 |
Lunar-(12)멜론 차트 top100 가져오기, 가수 크롤링 검색결과 (1) | 2024.01.03 |