미국 주식시장 뉴스API를 통한 Daily 증시 시황정리 프로그램

AI주식자동매매|2025. 6. 3. 18:20
반응형

make_summary.py 데이터 플로우 프로세스 메뉴얼

📋 프로그램 개요

목적: 미국 뉴스 API에서 분야별 Top-뉴스를 수집하고, OpenAI를 통해 섹션별 요약 및 추천 업종·종목을 생성하여 JSON 파일로 저장

주요 기능:

  • NewsAPI.org를 통한 미국 비즈니스 뉴스 수집
    Developer price 0$ 사용중: 100 requests over a 24 hour period (50 requests available every 12 hours)
  • OpenAI(o4-mini) 모델을 이용한 뉴스 요약 및 시장 분석
  • 섹션별 정리된 JSON 결과물 생성

🔄 전체 데이터 플로우

 
[시작] → [날짜 계산] → [뉴스 수집] → [데이터 통합] → [AI 분석] → [JSON 저장] → [종료]

📅 STEP 1: 날짜 계산 및 설정

🎯 목적

뉴욕 시간 기준으로 분석 대상 날짜 범위를 설정

📊 데이터 플로우

 
현재 시점(뉴욕 시간)
    ↓
target_date = 현재 - 5일 (분석 기준일)
    ↓
start_date = target_date - 5일 (검색 시작일)
    ↓
target_datetime = 타임스탬프 생성

🔧 함수: get_target_and_start_dates()

입력: 없음
출력:

  • target_date_str: "2025-05-29" (파일명용)
  • start_datetime_str: "2025-05-24 10:30:00" (정렬용)
  • start_date_str: "2025-05-24" (API 검색용)
  • now_ny: 뉴욕 현재 시간 객체

📝 실행 로그 예시

 
▶ Target date (NY 기준): 2025-05-29
▶ Target datetime (NY 기준): 2025-05-29 10:30:00
▶ Start date (5일 전): 2025-05-24

📰 STEP 2: 뉴스 데이터 수집

🎯 목적

NewsAPI를 통해 비즈니스 뉴스와 주요 키워드별 뉴스를 수집

📊 데이터 플로우

2-1. 비즈니스 헤드라인 수집

 
NewsAPI (top-headlines)
    ↓ category="business", country="us"
biz_raw (최대 20개 기사)
    ↓ publishedAt 기준 내림차순 정렬
biz_articles (상위 3개 선택)
    ↓
market_headlines 리스트 생성

2-2. 키워드별 뉴스 수집

 
6개 키워드 검색:
├── economics: "economic indicators"
├── ai_software: "AI software"  
├── ai_semiconductor: "AI semiconductor"
├── quantum: "quantum computing OR quantum networking"
├── ai_robotics: "AI robotics"
└── aerospace: "Aerospace & Defense"
    ↓
각 키워드별 NewsAPI (everything) 호출
    ↓ start_date ~ target_date 범위
raw_articles (최대 30개씩)
    ↓ publishedAt 기준 내림차순 정렬  
topics_articles[키워드] (상위 3개씩)

🔧 주요 함수들

fetch_headlines_with_image(category, country, page_size)

  • 입력: 카테고리, 국가, 페이지 크기
  • 출력: 정제된 기사 리스트
  • API: /v2/top-headlines

fetch_everything(query, from_date, to_date, page_size)

  • 입력: 검색어, 시작일, 종료일, 페이지 크기
  • 출력: 원본 기사 리스트
  • API: /v2/everything

📝 실행 로그 예시

 
▶ fetch_headlines_with_image 반환 기사 개수: 17
▶ biz_articles (상위 3개) 개수: 3
▶ economics 키워드, 상위 3개 뉴스 개수: 3
▶ ai_software 키워드, 상위 3개 뉴스 개수: 3

🔍 STEP 3: 콘텐츠 보강 (크롤링)

🎯 목적

NewsAPI에서 제공하는 짧은 스니펫을 실제 기사 본문으로 보강

📊 데이터 플로우

 
기사 스니펫 검사
    ↓ (길이 < 100자 OR 끝이 "…")
해당 기사 URL 크롤링
    ↓ BeautifulSoup으로 <p> 태그 추출
본문 텍스트 (최대 5,000자)
    ↓
기존 snippet 대체

🔧 함수: fetch_full_text(url, max_chars=5000)

  • 입력: 기사 URL, 최대 문자 수
  • 출력: 추출된 본문 텍스트
  • 오류 처리: 크롤링 실패 시 빈 문자열 반환

🗂️ STEP 4: 데이터 통합 및 구조화

🎯 목적

수집된 모든 기사를 섹션별로 분류하고 통합 데이터 구조 생성

📊 데이터 플로우

4-1. 전체 기사 리스트 생성

 
market_headlines (비즈니스 3개)
    ↓ category="미국 주식시장 동향" 추가
topics_articles (키워드별 3개씩)
    ↓ 한글 섹션명 매핑
        ├── economics → "주요 경제지표"
        ├── ai_software → "AI 소프트웨어"
        ├── ai_semiconductor → "AI 반도체"
        ├── quantum → "양자 컴퓨팅·네트워킹"
        ├── ai_robotics → "AI 로보틱스"
        └── aerospace → "항공우주 방산"
    ↓
all_articles (전체 기사 통합, 총 21개)

4-2. 섹션별 그룹핑

 
sections_map 초기화 (6개 섹션)
    ↓
all_articles 순회
    ↓ category별 분류
sections_map[섹션명] = [기사1, 기사2, 기사3]
    ↓
JSON 직렬화 (간결한 형태)

📋 데이터 구조

 
python
# 각 기사 객체 구조
{
    "category": "섹션명(한글)",
    "headline": "기사 제목",
    "source": "출처명",
    "url": "기사 URL",
    "urlToImage": "이미지 URL",
    "snippet": "기사 내용",
    "date": "발행일시"
}

🤖 STEP 5: AI 분석 및 요약

🎯 목적

OpenAI를 통해 섹션별 기사 요약 및 시장 분석 리포트 생성

📊 데이터 플로우

 
sections_map (JSON 직렬화)
    ↓
OpenAI 프롬프트 구성
    ├── System Message: 역할 및 출력 형식 지정
    └── User Message: 실제 뉴스 데이터 전달
    ↓
OpenAI API 호출 (o4-mini 모델)
    ↓
JSON 응답 파싱
    ├── 성공: 요약 섹션 + 추천 종목
    └── 실패: 원본 기사 + 빈 추천 리스트

🔧 AI 프롬프트 구조

System Message 핵심 내용

  • 역할: "미국 금융 시장 리포터"
  • 작업: 6개 섹션 기사 요약 + 추천 업종·종목 3선
  • 형식: 순수 JSON 객체만 출력
  • 키 구조: date, sections, recommendations

User Message 구성

  • 현재 날짜 명시: target_date
  • 실제 뉴스 데이터: sections_map JSON
  • 출력 요구사항 재강조

📝 응답 파싱 및 오류 처리

 
python
try:
    parsed = json.loads(content)
    found_sections = parsed.get("summary_sections", [])
    found_recommendations = parsed.get("recommendations", [])
except json.JSONDecodeError:
    # 파싱 실패 시 원본 데이터 사용
    found_sections = 원본_sections_map
    found_recommendations = []

💾 STEP 6: 최종 JSON 생성 및 저장

🎯 목적

분석 결과를 구조화된 JSON 파일로 저장

📊 데이터 플로우

 
AI 분석 결과
    ↓
summary_obj 객체 생성
    ├── date: target_datetime
    ├── sections: 요약된 섹션 리스트
    └── recommendations: 추천 업종·종목
    ↓
summaries 디렉터리 생성
    ↓
JSON 파일 저장: us_market_summary_{target_date}.json

📋 최종 JSON 구조

 
json
{
  "date": "2025-05-29 10:30:00",
  "sections": [
    {
      "title": "미국 주식시장 동향",
      "items": [
        {
          "headline": "기사 제목",
          "source": "출처명",
          "url": "기사 URL",
          "urlToImage": "이미지 URL",
          "snippet": "기사 내용",
          "date": "2025-05-29T08:00:00Z"
        }
      ]
    }
  ],
  "recommendations": [
    {
      "sector": "AI 소프트웨어",
      "stocks": ["AAPL", "MSFT", "GOOGL"]
    }
  ]
}

📝 파일 저장 규칙

  • 디렉터리: summaries/
  • 파일명: us_market_summary_{target_date}.json
  • 인코딩: UTF-8
  • 형식: 들여쓰기 2칸으로 가독성 향상

⚠️ 예외 처리 및 오류 대응

🔧 주요 오류 처리 포인트

1. API 호출 실패

 
NewsAPI 응답 오류
    ↓
빈 리스트 반환
    ↓
다음 단계 정상 진행

2. 크롤링 실패

 
URL 접근 오류
    ↓
빈 문자열 반환
    ↓
원본 snippet 유지

3. OpenAI 파싱 실패

 
JSON 파싱 오류
    ↓
경고 메시지 출력
    ↓
원본 sections_map 사용

4. AttributeError 방지

  • 원인: art.get("source", {}).get("name", "")에서 source가 문자열일 때
  • 해결: 섹션별로 데이터 구조 분리 처리

🚀 실행 방법

📋 사전 준비

  1. 설정 파일: efriend_config.yaml에 API 키 설정
     
    yaml
    OPENAI_SEC_KEY: "your-openai-api-key"
    NEWS_API_KEY: "your-news-api-key"
  2. 패키지 설치:
     
    bash
    pip install openai requests beautifulsoup4 PyYAML pandas python-dateutil pytz

🎯 실행 명령

 
bash
python make_summary.py

📊 실행 결과

  • 성공: summaries/us_market_summary_YYYY-MM-DD.json 파일 생성
  • 로그: 각 단계별 진행 상황 출력
  • 오류: 상세한 에러 메시지와 함께 traceback 출력

📈 확장 및 개선 아이디어

🔧 설정 변경 가능 항목

  • 키워드 추가: topic_queries에 새로운 검색어 추가
  • 기사 개수 조정: 상위 3개 → 5개 또는 다른 개수로 변경
  • AI 모델 설정: 토큰 수, 온도 등 파라미터 조절
  • 크롤링 범위: 최대 문자 수 조정

🚀 자동화 방안

  • 스케줄링: Cron Job, AWS Lambda, Kubernetes CronJob
  • 알림 기능: 오류 발생 시 Email/SMS 알림
  • 모니터링: 실행 상태 및 결과 품질 모니터링

📞 문제 해결 가이드

❓ 자주 발생하는 문제

  1. API 키 오류: efriend_config.yaml 파일 확인
  2. 네트워크 오류: 인터넷 연결 및 방화벽 설정 확인
  3. JSON 파싱 실패: OpenAI 응답 로그 확인 후 프롬프트 조정
  4. 파일 저장 실패: summaries 디렉터리 권한 확인

🔍 디버깅 팁

  • 각 단계별 로그 메시지 확인
  • serialized_sections 출력으로 데이터 구조 검증
  • OpenAI 응답 내용 직접 확인
  • 예외 발생 시 traceback 전체 내용 분석

이 메뉴얼은 make_summary.py 프로그램의 전체 데이터 플로우를 단계별로 설명하여, 프로그램의 동작 원리를 쉽게 이해할 수 있도록 구성되었습니다.

위처럼 생성된 json파일은 하기의 웹앱으로 보내집니다. 참조하시기 바랍니다.

https://howardkim.pythonanywhere.com/

 

반응형

댓글()

강화학습 PPO 모델을 이용한 자동매매 2

AI주식자동매매|2024. 8. 9. 07:28
반응형

import numpy as np
import pandas as pd
import gym
from gym import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
from sklearn.model_selection import train_test_split
from datetime import datetime
from pytz import timezone
from sklearn.model_selection import ParameterGrid
import warnings

# 특정 경고 메시지 무시
warnings.filterwarnings("ignore", category=UserWarning)

class TradingEnv(gym.Env):
    def __init__(self, df):
        super(TradingEnv, self).__init__()
        self.df = df
        self.current_step = 0
        self.df_numeric = self._convert_to_numeric(self.df)
        self.action_space = spaces.Discrete(2)  # Buy or Hold

        self.observation_space = spaces.Box(low=0, high=1, shape=(self.df_numeric.shape[1],), dtype=np.float32)

        self.initial_balance = 10000
        self.balance = self.initial_balance
        self.net_worth = self.initial_balance
        self.max_net_worth = self.initial_balance
        self.shares_held = 0
        self.current_price = self.df.iloc[self.current_step]['close']  # 현재 스텝의 가격으로 초기화
        self.price_history = self.df['close'].tolist()  # 전체 가격 데이터를 리스트로 저장

    def _convert_to_numeric(self, df):
        df_numeric = df.copy()
        df_numeric = df_numeric.filter(regex='^m_')
        df_numeric.reset_index(drop=True, inplace=True)  # 인덱스를 드롭하고 리셋
        for column in df_numeric.columns:
            df_numeric[column] = pd.to_numeric(df_numeric[column], errors='coerce')
        df_numeric.fillna(0, inplace=True)
        return df_numeric

    def reset(self):
        self.balance = self.initial_balance
        self.net_worth = self.initial_balance
        self.max_net_worth = self.initial_balance
        self.shares_held = 0
        self.current_step = 0
        self.current_price = self.df.iloc[self.current_step]['close']  # 리셋 시 현재 가격 초기화
        return self._next_observation()

    def _next_observation(self):
        obs = self.df_numeric.iloc[self.current_step].values
        obs_max = obs.max() if obs.max() != 0 else 1  # Prevent division by zero
        obs = obs / obs_max
        return obs

    def step(self, action):
        self.current_step += 1
        self.current_price = self.df.iloc[self.current_step]['close']  # 매 스텝마다 현재 가격 업데이트
        self.low_price = self.df.iloc[self.current_step]['low']  # 매 스텝마다 현재 가격 업데이트
        self.current_time = self.df.index[self.current_step]  # 매 스텝마다 현재 가격 업데이트

        if action == 1:  # Buy
            self.shares_held += self.balance / self.current_price
            self.balance = 0
        elif action == 0:  # Hold
            pass

        self.net_worth = self.balance + self.shares_held * self.current_price
        self.max_net_worth = max(self.max_net_worth, self.net_worth)

        # 1시간 후 가격 변동을 확인하여 보상을 계산
        reward = self.calculate_reward(action)
        done = self.current_step >= len(self.df) - 1

        obs = self._next_observation()
        return obs, reward, done, {}

    def calculate_reward(self, action):
        '''
        현재 가격에서 시작하여 다음 12 스텝 동안의 가격을 모두 체크하며, 그 중 하나라도 5% 이상 상승한 경우 보상으로 1을 반환합니다. 1시간 동안 5% 이상 상승한 적이 없다면 보상으로 0을 반환합니다.
        즉, buy 의견을 제시한것이 잘했는지를 평가할때, reward 보상으로 학습을 시킨다.
        '''
        end_step = min(self.current_step + 12, len(self.df) - 1)  # 1시간 = 12 steps (assuming 5-minute intervals)
        reward = 0

        if action == 1:  # Buy 액션일 경우에만 보상 계산
            for step in range(1, end_step - self.current_step + 1):
                future_price = self.price_history[self.current_step + step]
                price_increase = (future_price - self.current_price) / self.current_price
                if (step - self.current_step) <= 5:
                    if future_price < self.low_price:  # 5봉 이내(30분이내) 현재가보다 하락하고 있으면, reward 없음.
                        break
                if price_increase >= 0.05:  # 5% 이상 상승
                    print("%s self.current_step:%s" % (self.current_time, self.current_step))
                    print("for range step:%s" % (step))
                    print("future_price:%s" % (self.price_history[self.current_step + step]))
                    print("price_increase:%s" % ((future_price - self.current_price) / self.current_price))
                    print("reward = 1")
                    reward = 1
                    break
        return reward  # 1시간 동안 5% 이상 상승하지 않음

def optimize_ppo(data, param_grid, model_path="ppo_trading_model"):
    env = TradingEnv(data)
    best_model = None
    best_reward = -float('inf')
    
    for params in ParameterGrid(param_grid):
        model = PPO('MlpPolicy', env, verbose=1, **params)
        model.learn(total_timesteps=10000)
        total_rewards = evaluate_model(model, data)
        if total_rewards > best_reward:
            best_reward = total_rewards
            best_model = model
            best_model.save(model_path)
    
    return best_model

def train_model(data, model_path="ppo_trading_model"):
    env = TradingEnv(data)
    try:
        model = PPO.load(model_path)
        print("Model loaded successfully. Continuing training...")
    except:
        model = PPO('MlpPolicy', env, verbose=1)
        print("New model initialized.")

    model.set_env(env)
    param_grid = {
        'n_steps': [128, 256, 512],
        'learning_rate': [1e-3, 1e-4, 1e-5],
        'batch_size': [128, 256],  # 변경된 부분: 128의 배수로 설정
    }
    best_model = optimize_ppo(data, param_grid, model_path)
    best_model.learn(total_timesteps=10000)
    best_model.save(model_path)
    return best_model


def load_model(model_path="ppo_trading_model"):
    return PPO.load(model_path)


def evaluate_model(model, data):
    env = TradingEnv(data)
    obs = env.reset()
    total_rewards = 0
    done = False
    while not done:
        action, _states = model.predict(obs)
        obs, reward, done, _ = env.step(action)
        total_rewards += reward
    return total_rewards


def main():
    ticker = 'XEM'
    chart_intervals = 'minute5'
    current_time = pd.to_datetime(datetime.now(timezone('Asia/Seoul'))).strftime("%Y-%m-%d %H:%M:%S")
    chart_data = save_db_market_infos(ticker=ticker, chart_intervals=chart_intervals, current_time=current_time)
    chart_data.set_index('time', inplace=True)
    strategy_data = get_strategy_mst_data()

    if chart_data is not None:
        strategy_chart_data = calculate_indicators(chart_data, ticker)
        strategy_chart_data_df = pd.DataFrame([strategy_chart_data])

        train_data, test_data = train_test_split(chart_data, test_size=0.2, shuffle=False)

        model = train_model(train_data)

        total_rewards = evaluate_model(model, test_data)
        print(f"Total Rewards: {total_rewards}")
        if isinstance(strategy_chart_data_df, pd.DataFrame):
            obs = strategy_chart_data_df.values.flatten().astype(np.float32)
            obs = np.expand_dims(obs, axis=0)
            action, _states = model.predict(obs)
            print("Buy Signal:", "Yes" if action == 1 else "No")
        else:
            print("Error: strategy_chart_data is not a DataFrame")
    else:
        print("Error: chart_data is None")

if __name__ == "__main__":
    main()

반응형

댓글()

미국주식 자동매매 시작하기-3

AI주식자동매매|2021. 12. 25. 07:20
반응형

오늘은 미국주식 자동매매 실제 구동되는 모습을 영상으로 담아서 유튜브에 올렸습니다.

 

12월 21일 미국시간 09:30분부터 약 30분간 구동되는 모습입니다.

 

아직 수정하고 보완해야 할 부분은 많지만, 

매수, 매도는 되는 버젼이라 올려봤습니다.

보시고, 많은 개선의견 부탁드리겠습니다~^^;;

 

https://youtu.be/pgR1zqPqU0g

 

반응형

댓글()