python

(개발일지) 주식 크롤링 프로그램, 23-11-28 갱신

CBH_0417 2023. 11. 28. 10:57

11-28일 수정사항

 

-기존의 단순 출력문 형태는 정보 전달의 불편함이 크게 있으므로 메세지박스 UI 형태로 수정함

-환율 크롤링 기능 추가(사이트:네이버)

-검색한 종목에 한하여 그래프 생성기능 추가

-DB를 활용한검색기록 조회 기능 추가

-DB를 활용한검색기록 삭제 기능 추가

-환율 데이터베이스 추가

 

import tkinter as tk
from tkinter import ttk
from tkinter import messagebox
 #UI를 위한 모듈 설치
from datetime import datetime
#시간
from operator import itemgetter
import sqlite3
import requests
from bs4 import BeautifulSoup
#DB와 크롤링 데이터 저장을 위한 모듈 설치
import matplotlib.pyplot as plt
#변동가격 그래프 출력을 위한 모듈 설치

def create_connection(db_file): #DB파일 연결 채킹 함수
    try:
        conn = sqlite3.connect(db_file)
        return conn
    except sqlite3.Error as e:
        print(e)
    return None

def create_stock_table(conn): #주식 DB 테이블 생성 함수
    try:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS stock_prices (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                company_code TEXT,
                company_name TEXT,
                price REAL,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
    except sqlite3.Error as e:
        print(e)

def insert_record_stock(conn, company_code, company_name, price, timestamp): #크롤링된 데이터를 DB에 삽입하는 함수
    try:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO stock_prices (company_code, company_name, price,timestamp)
            VALUES (?, ?, ?, ?)
        """, (company_code, company_name, price,timestamp))
        conn.commit()
    except sqlite3.Error as e:
        print(e)

def create_exchange_talbe(conn): #환율 DB 생성 함수
    try:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS exchange_rate (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                country_name TEXT,
                country_code TEXT,
                exchange_rate REAL,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
    except sqlite3.Error as e:
        print(e)
def insert_record_exchange(conn, contry_name, contry_code, exchange_rate,timestamp ): #크롤링된 환율을 DB에 삽입하는 함수
    try:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO exchange_rate (country_name, country_code, exchange_rate,timestamp)
            VALUES (?, ?, ?, ?)
        """, (contry_name, contry_code, exchange_rate,timestamp ))
        conn.commit()
    except sqlite3.Error as e:
        print(e)

def Fluctuating_graph(conn, company_name): #크롤링된 데이터를 기반으로 DB에서 데이터를 가져와 그래프를 그리는 함수
    cursor = conn.cursor()
    cursor.execute("SELECT timestamp, price FROM stock_prices WHERE company_name = ? ORDER BY timestamp",
                   (company_name,))
    data = cursor.fetchall()

    print("Fetched data:", data)
    data = [(timestamp, price) for timestamp, price in data if price != 0]

    if data:
        timestamps, prices = zip(*data)
        plt.figure(figsize=(15, 10))
        plt.plot(timestamps, prices, marker='o', linestyle='-', color='b')
        plt.xlabel('Timestamp')
        plt.ylabel('Price (KRW)')
        plt.title(f'stock price change graph for {company_name}')
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()
    else:
        print(f"데이터베이스에 '{company_name}' 회사의 가격 정보가 없습니다.")

def show_graph(conn, company_info): #별도로 그래프를 불러와 직관성을 높임
    company_name, _ = company_info.split()
    Fluctuating_graph(conn, company_name)

def get_bs_obj_stock(company_code): #주식 관련 정보 크롤링
    url = "https://finance.naver.com/item/main.nhn?code=" + company_code
    result = requests.get(url)
    bs_obj = BeautifulSoup(result.content, "html.parser")
    return bs_obj

def get_price(company_code): #주식 크롤링 데이터 HTML 코드 규격 맞추기
    bs_obj = get_bs_obj_stock(company_code)
    no_today = bs_obj.find("p", {"class": "no_today"})
    blind_now = no_today.find("span", {"class": "blind"})
    return blind_now.text

def get_exchange(country_code):#환율 크롤링 데이터 HTML 코드 규격 맞추기
    url= "https://finance.naver.com/marketindex/"
    result = requests.get(url)
    ex_obj = BeautifulSoup(result.content, "html.parser")
    country_code = country_code.lower()
    exchage_now = ex_obj.select_one(f"a.head.{country_code}>div.head_info>span.value")
    return exchage_now.text

def main():
    db_file = "economic_indicators_searching.db"
    conn = create_connection(db_file)
    if conn is not None:
        create_stock_table(conn)
        create_exchange_talbe(conn)
        root = tk.Tk()
        root.title("주요 경제 종목 검색")

        # 레이아웃을 조직하기 위해 프레임을 만들고 설정합니다
        frame = tk.Frame(root)
        frame.pack(padx=100, pady=100)

        label = tk.Label(frame, text="회사명과 종목코드를 입력하세요 (띄어쓰기로 구분)")
        label.grid(row=0, column=0, columnspan=2, pady=5)

        entry = tk.Text(frame, width=30, height=3, font=("Helvetica", 15))  # 필요에 따라 width와 height, font 값을 조절하세요
        entry.grid(row=1, column=0, columnspan=2, pady=5)

        search_button = tk.Button(frame, text="검색", command=lambda: search_stock(conn, entry.get("1.0", tk.END)))
        search_button.grid(row=2, column=0, pady=5, sticky="W")

        graph_button = tk.Button(frame, text="그래프", command=lambda: show_graph(conn, entry.get("1.0", tk.END)))
        graph_button.grid(row=2, column=1, pady=5, sticky="W")

        show_history_button = tk.Button(frame, text="주식 종목검색 기록", command=lambda: show_search_history(conn))
        show_history_button.grid(row=3, column=0, pady=5, sticky="W")

        search_reset_button = tk.Button(frame, text="초기화", command=lambda: reset_database(conn))
        search_reset_button.grid(row=3, column=1, pady=5, sticky="W")

        exchange_button = tk.Button(frame, text="국가별 환율 조회", command=lambda:search_exchange(conn, entry.get("1.0", tk.END)))
        exchange_button.grid(row=4, column=0, pady=5, sticky="W")

        exit_button = tk.Button(frame, text="종료", command=root.destroy)
        exit_button.grid(row=4, column=1, columnspan=1, pady=5, sticky="W")

        root.mainloop()

        conn.close()


def reset_database(conn): #검색기록 초기화 함수
    answer = messagebox.askquestion("초기화", "검색 기록을 삭제하시겠습니까?")
    if answer == "yes":
        cursor = conn.cursor()
        cursor.execute("DELETE FROM stock_prices")
        conn.commit()
        messagebox.showinfo("초기화 완료", "검색 기록이 삭제되었습니다.")
    else:
        messagebox.showinfo("초기화 취소", "초기화가 취소되었습니다.")

def show_search_history(conn): #검색기록 조회 함수
    cursor = conn.cursor()
    cursor.execute("SELECT company_name, company_code, price, timestamp FROM stock_prices ORDER BY timestamp")
    data = cursor.fetchall()

    if data:
        data_sorted = sorted(data, key=itemgetter(3))  # Sort by timestamp
        history_text = "검색기록\n\n"
        for record in data_sorted:
            history_text += f"{record[0]}, KRW: {record[2]}, 검색 시간: {record[3]}\n"

        history_window = tk.Toplevel()
        history_window.title("Search History")

        history_label = tk.Label(history_window, text=history_text)
        history_label.pack()

    else:
        messagebox.showinfo("검색 기록", "검색 기록이 없습니다.")

def search_stock(conn, entry_text): #주식 검색 함수
    global root
    now = datetime.now()
    timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
    try:
        # 입력받은 텍스트를 회사명과 종목코드로 분리
        company_name, company_code = entry_text.split()

        # 입력된 종목코드가 숫자로 이루어진지 확인
        if not company_code.isdigit():
            messagebox.showerror("오류", "종목코드 입력이 잘못되었습니다.")
            return

        # 주식 가격을 가져오기
        price = get_price(company_code)

        # 가져온 가격이 비어있으면 종목코드가 유효하지 않음을 의미
        if price == "":
            messagebox.showerror("오류", "종목코드 입력이 잘못되었습니다.")
        else:
            # 유효한 경우 주식 정보를 보여주고 그래프 및 레코드 삽입
            messagebox.showinfo("검색 종목 정보", f"회사: {company_name}\n코드: {company_code}\n가격: {price}\n검색시간: {timestamp}" )
            insert_record_stock(conn, company_code, company_name, price, timestamp)

    except ValueError:
        messagebox.showerror("오류", "잘못된 입력입니다. 회사명과 코드를 모두 입력하세요.")
    except Exception as e:
        messagebox.showerror("오류", f"입력이 잘못되었습니다. 오류 메시지: {str(e)}")

def search_exchange(conn, entry_text): #환율 검색 함수
    global root
    now = datetime.now()
    timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
    try:
        # 입력받은 텍스트를 국가명과 국가코드로 분리
        country_name, coutry_code = entry_text.split()

        # 입력된 종목코드가 문자로 이루어진지 확인
        if country_name.isdigit():
            messagebox.showerror("오류", "국가 코드 입력이 잘못되었습니다.")
            return

        # 환율 가져오기
        exchange = get_exchange(coutry_code)

        # 가져온 가격이 비어있으면 국가 코드가 유효하지 않음을 의미
        if exchange == "":
            messagebox.showerror("오류", "국가 코드 입력이 잘못되었습니다.")
        else:
            # 유효한 경우 입력한 국가의 환율 정보를 보여주고 그래프 및 레코드 삽입
            messagebox.showinfo("검색 종목 정보", f"국가: {country_name} { coutry_code}\n환율: {exchange}\n검색시간: {timestamp}" )
            insert_record_exchange(conn, country_name, coutry_code, exchange, timestamp)

    except ValueError:
        messagebox.showerror("오류", "잘못된 입력입니다. 국가명과 코드를 모두 입력하세요.")
    except Exception as e:
        messagebox.showerror("오류", f"입력이 잘못되었습니다. 오류 메시지: {str(e)}")

if __name__ == "__main__":
    main()