python
(개발일지) 주식 크롤링 프로그램, 23-11-28 갱신
CBH_0417
2023. 11. 28. 10:57
11-28일 수정사항
-기존의 단순 출력문 형태는 정보 전달의 불편함이 크게 있으므로 메세지박스 UI 형태로 수정함
-환율 크롤링 기능 추가(사이트:네이버)
-검색한 종목에 한하여 그래프 생성기능 추가
-DB를 활용한검색기록 조회 기능 추가
-DB를 활용한검색기록 삭제 기능 추가
-환율 데이터베이스 추가
import tkinter as tk
from tkinter import ttk
from tkinter import messagebox
#UI를 위한 모듈 설치
from datetime import datetime
#시간
from operator import itemgetter
import sqlite3
import requests
from bs4 import BeautifulSoup
#DB와 크롤링 데이터 저장을 위한 모듈 설치
import matplotlib.pyplot as plt
#변동가격 그래프 출력을 위한 모듈 설치
def create_connection(db_file): #DB파일 연결 채킹 함수
try:
conn = sqlite3.connect(db_file)
return conn
except sqlite3.Error as e:
print(e)
return None
def create_stock_table(conn): #주식 DB 테이블 생성 함수
try:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS stock_prices (
id INTEGER PRIMARY KEY AUTOINCREMENT,
company_code TEXT,
company_name TEXT,
price REAL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
except sqlite3.Error as e:
print(e)
def insert_record_stock(conn, company_code, company_name, price, timestamp): #크롤링된 데이터를 DB에 삽입하는 함수
try:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO stock_prices (company_code, company_name, price,timestamp)
VALUES (?, ?, ?, ?)
""", (company_code, company_name, price,timestamp))
conn.commit()
except sqlite3.Error as e:
print(e)
def create_exchange_talbe(conn): #환율 DB 생성 함수
try:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS exchange_rate (
id INTEGER PRIMARY KEY AUTOINCREMENT,
country_name TEXT,
country_code TEXT,
exchange_rate REAL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
except sqlite3.Error as e:
print(e)
def insert_record_exchange(conn, contry_name, contry_code, exchange_rate,timestamp ): #크롤링된 환율을 DB에 삽입하는 함수
try:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO exchange_rate (country_name, country_code, exchange_rate,timestamp)
VALUES (?, ?, ?, ?)
""", (contry_name, contry_code, exchange_rate,timestamp ))
conn.commit()
except sqlite3.Error as e:
print(e)
def Fluctuating_graph(conn, company_name): #크롤링된 데이터를 기반으로 DB에서 데이터를 가져와 그래프를 그리는 함수
cursor = conn.cursor()
cursor.execute("SELECT timestamp, price FROM stock_prices WHERE company_name = ? ORDER BY timestamp",
(company_name,))
data = cursor.fetchall()
print("Fetched data:", data)
data = [(timestamp, price) for timestamp, price in data if price != 0]
if data:
timestamps, prices = zip(*data)
plt.figure(figsize=(15, 10))
plt.plot(timestamps, prices, marker='o', linestyle='-', color='b')
plt.xlabel('Timestamp')
plt.ylabel('Price (KRW)')
plt.title(f'stock price change graph for {company_name}')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
else:
print(f"데이터베이스에 '{company_name}' 회사의 가격 정보가 없습니다.")
def show_graph(conn, company_info): #별도로 그래프를 불러와 직관성을 높임
company_name, _ = company_info.split()
Fluctuating_graph(conn, company_name)
def get_bs_obj_stock(company_code): #주식 관련 정보 크롤링
url = "https://finance.naver.com/item/main.nhn?code=" + company_code
result = requests.get(url)
bs_obj = BeautifulSoup(result.content, "html.parser")
return bs_obj
def get_price(company_code): #주식 크롤링 데이터 HTML 코드 규격 맞추기
bs_obj = get_bs_obj_stock(company_code)
no_today = bs_obj.find("p", {"class": "no_today"})
blind_now = no_today.find("span", {"class": "blind"})
return blind_now.text
def get_exchange(country_code):#환율 크롤링 데이터 HTML 코드 규격 맞추기
url= "https://finance.naver.com/marketindex/"
result = requests.get(url)
ex_obj = BeautifulSoup(result.content, "html.parser")
country_code = country_code.lower()
exchage_now = ex_obj.select_one(f"a.head.{country_code}>div.head_info>span.value")
return exchage_now.text
def main():
db_file = "economic_indicators_searching.db"
conn = create_connection(db_file)
if conn is not None:
create_stock_table(conn)
create_exchange_talbe(conn)
root = tk.Tk()
root.title("주요 경제 종목 검색")
# 레이아웃을 조직하기 위해 프레임을 만들고 설정합니다
frame = tk.Frame(root)
frame.pack(padx=100, pady=100)
label = tk.Label(frame, text="회사명과 종목코드를 입력하세요 (띄어쓰기로 구분)")
label.grid(row=0, column=0, columnspan=2, pady=5)
entry = tk.Text(frame, width=30, height=3, font=("Helvetica", 15)) # 필요에 따라 width와 height, font 값을 조절하세요
entry.grid(row=1, column=0, columnspan=2, pady=5)
search_button = tk.Button(frame, text="검색", command=lambda: search_stock(conn, entry.get("1.0", tk.END)))
search_button.grid(row=2, column=0, pady=5, sticky="W")
graph_button = tk.Button(frame, text="그래프", command=lambda: show_graph(conn, entry.get("1.0", tk.END)))
graph_button.grid(row=2, column=1, pady=5, sticky="W")
show_history_button = tk.Button(frame, text="주식 종목검색 기록", command=lambda: show_search_history(conn))
show_history_button.grid(row=3, column=0, pady=5, sticky="W")
search_reset_button = tk.Button(frame, text="초기화", command=lambda: reset_database(conn))
search_reset_button.grid(row=3, column=1, pady=5, sticky="W")
exchange_button = tk.Button(frame, text="국가별 환율 조회", command=lambda:search_exchange(conn, entry.get("1.0", tk.END)))
exchange_button.grid(row=4, column=0, pady=5, sticky="W")
exit_button = tk.Button(frame, text="종료", command=root.destroy)
exit_button.grid(row=4, column=1, columnspan=1, pady=5, sticky="W")
root.mainloop()
conn.close()
def reset_database(conn): #검색기록 초기화 함수
answer = messagebox.askquestion("초기화", "검색 기록을 삭제하시겠습니까?")
if answer == "yes":
cursor = conn.cursor()
cursor.execute("DELETE FROM stock_prices")
conn.commit()
messagebox.showinfo("초기화 완료", "검색 기록이 삭제되었습니다.")
else:
messagebox.showinfo("초기화 취소", "초기화가 취소되었습니다.")
def show_search_history(conn): #검색기록 조회 함수
cursor = conn.cursor()
cursor.execute("SELECT company_name, company_code, price, timestamp FROM stock_prices ORDER BY timestamp")
data = cursor.fetchall()
if data:
data_sorted = sorted(data, key=itemgetter(3)) # Sort by timestamp
history_text = "검색기록\n\n"
for record in data_sorted:
history_text += f"{record[0]}, KRW: {record[2]}, 검색 시간: {record[3]}\n"
history_window = tk.Toplevel()
history_window.title("Search History")
history_label = tk.Label(history_window, text=history_text)
history_label.pack()
else:
messagebox.showinfo("검색 기록", "검색 기록이 없습니다.")
def search_stock(conn, entry_text): #주식 검색 함수
global root
now = datetime.now()
timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
try:
# 입력받은 텍스트를 회사명과 종목코드로 분리
company_name, company_code = entry_text.split()
# 입력된 종목코드가 숫자로 이루어진지 확인
if not company_code.isdigit():
messagebox.showerror("오류", "종목코드 입력이 잘못되었습니다.")
return
# 주식 가격을 가져오기
price = get_price(company_code)
# 가져온 가격이 비어있으면 종목코드가 유효하지 않음을 의미
if price == "":
messagebox.showerror("오류", "종목코드 입력이 잘못되었습니다.")
else:
# 유효한 경우 주식 정보를 보여주고 그래프 및 레코드 삽입
messagebox.showinfo("검색 종목 정보", f"회사: {company_name}\n코드: {company_code}\n가격: {price}\n검색시간: {timestamp}" )
insert_record_stock(conn, company_code, company_name, price, timestamp)
except ValueError:
messagebox.showerror("오류", "잘못된 입력입니다. 회사명과 코드를 모두 입력하세요.")
except Exception as e:
messagebox.showerror("오류", f"입력이 잘못되었습니다. 오류 메시지: {str(e)}")
def search_exchange(conn, entry_text): #환율 검색 함수
global root
now = datetime.now()
timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
try:
# 입력받은 텍스트를 국가명과 국가코드로 분리
country_name, coutry_code = entry_text.split()
# 입력된 종목코드가 문자로 이루어진지 확인
if country_name.isdigit():
messagebox.showerror("오류", "국가 코드 입력이 잘못되었습니다.")
return
# 환율 가져오기
exchange = get_exchange(coutry_code)
# 가져온 가격이 비어있으면 국가 코드가 유효하지 않음을 의미
if exchange == "":
messagebox.showerror("오류", "국가 코드 입력이 잘못되었습니다.")
else:
# 유효한 경우 입력한 국가의 환율 정보를 보여주고 그래프 및 레코드 삽입
messagebox.showinfo("검색 종목 정보", f"국가: {country_name} { coutry_code}\n환율: {exchange}\n검색시간: {timestamp}" )
insert_record_exchange(conn, country_name, coutry_code, exchange, timestamp)
except ValueError:
messagebox.showerror("오류", "잘못된 입력입니다. 국가명과 코드를 모두 입력하세요.")
except Exception as e:
messagebox.showerror("오류", f"입력이 잘못되었습니다. 오류 메시지: {str(e)}")
if __name__ == "__main__":
main()