PLAID AI 에이전트 Part. 1 : 어제의 회고와 오늘의 이슈로 받는 브리핑
“오늘 뭐부터 해야하지?”
업무를 하다 보면 종종 일이 한번에 몰리는 순간이 찾아옵니다. 그럴때에는 당장 뭐부터 시작해야 하는지 또 내가 놓친 업무는 없는지 고민을 하게 됩니다. Plaid는 이러한 불편함을 개선하고 팀의 효율을 높이기 위해 매일 아침 오늘 할 일과 나에게 배정된 업무들을 브리핑해주는 Plaid Agent를 만들기로 했습니다.
Plaid Agent의 역할
퇴근길(EOD): Slack에 남긴 오늘 회고를 분석해 피드백을 남기고 저장합니다.
출근길(Briefing): EOD와 Jira 이슈를 결합해 매일 아침 8시에 오늘의 업무 브리핑을 해줍니다.
Plaid는 Slack과 Jira를 활용하는 여러 프로세스를 가지고 있습니다.
그중에서도 Plaid Agent가 주목한 데이터는 팀원들이 매일 작성하는 EOD(End of Day Daily Log)와 프로젝트의 진행 상황을 담은 Jira Issue입니다.
이 두 가지 데이터를 핵심 재료로 활용했습니다. Slack의 EOD를 통해 개인의 업무 맥락을 파악하고, Jira Issue를 통해 할 일을 추적해 매일 아침 팀원들에게 자동으로 해야할 업무를 브리핑하는 시스템을 구축했습니다.
Plaid Agent의 시스템 아키텍처와 파이프라인
Slack EOD의 실시간 처리와 Jira의 가변성이 큰 Issue 데이터의 안정적인 처리를 위해 Plaid는 이벤트 기반의 비동기 아키텍처를 설계했습니다
Plaid Agent의 시스템 아키텍처의 주요 특징은 다음과 같습니다.
FastAPI와 Celery를 활용한 비동기 이벤트 처리
Slack Events API는 3초 이내에 응답하지 않을 시 요청을 실패로 간주하고 재전송합니다. 하지만 AI가 EOD를 분석하고 피드백을 생성하는데에는 그보다 더 긴 시간이 소요됩니다.
이를 해결하기 위해 FastAPI는 이벤트를 수신받으면 인증 절차만 거친 뒤 데이터 파싱, AI 호출, DB 저장같은 무거운 작업을 Celery 워커로 위임하고 즉시 200 OK를 반환하는 Fire-and-Forget 패턴을 적용했습니다.
이 외에도 Celery 워커가 백그라운드에서 하는 일은 다음과 같습니다.
EOD 파싱 후 AI 입력 및 답변 Slack 스레드 댓글 전송
Jira Issue 상태 변경 시 업서트 및 삭제
Jira Project 상태 변경 시 업서트 및 삭제
매일 오전 2시 Jira Issue 및 Project 동기화 작업
LangSmith 기반의 LLM 파이프라인
저희는 수집된 EOD 데이터와 Jira Issue들은 파싱을 통해 정제한 후 LangSmith로 불러온 chain을 통해 EOD 피드백과 데일리 브리핑을 생성했습니다.
# client 생성
from langsmith import Client
from langchain_core.runnables import Runnable
class LangSmithClient:
_instance = None
_client: Client = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(LangSmithClient, cls).__new__(cls)
try:
cls._client = Client()
except Exception as e:
print(f"❌ LangSmith 클라이언트 초기화 실패: {e}")
cls._client = None
return cls._instance
def pull_runnable(self, prompt_name: str) -> Runnable:
"""LangSmith Hub에서 Runnable(Chain)을 가져옵니다."""
if self._client is None:
raise RuntimeError("LangSmith Client가 초기화되지 않았습니다.")
if not prompt_name:
raise ValueError(f"유효하지 않은 프롬프트 이름입니다: {prompt_name}")
try:
return self._client.pull_prompt(prompt_name, include_model=True)
except Exception as e:
print(f"❌ LangSmith에서 프롬프트를 가져오는 데 실패했습니다: {prompt_name}")
raise e
langsmith_client = LangSmithClient()#사용 예시
from langchain_core.output_parsers import StrOutputParser
from app.clients.langsmith import langsmith_client
from app.core.config import settings
def generate_example(info: str) -> str:
"""
주어진 정보를 바탕으로 AI 답변을 생성합니다.
"""
try:
prompt_name = settings.LANGSMITH_PROMPT_NAME
if not prompt_name:
raise ValueError("사용할 프롬프트 이름이 .env에 설정되지 않았습니다.")
runable = langsmith_client.pull_runnable(prompt_name)
briefing_chain = runable | StrOutputParser()
result = briefing_chain.invoke({"info": info})
if hasattr(result, 'content'):
return result.content
return str(result)
except Exception as e:
print(f"🔥 AI 체인 실행 중 에러 발생: {e}")
raiseLangSmith를 사용한 이유는 다음과 같습니다.
프롬프트와 비즈니스 로직의 분리 (Prompt Management)
코드 안에 하드코딩된 프롬프트를 제거하고, AI의 역할이나 버전별 변경 사항을 클라우드에서 독립적으로 관리합니다.
실행 추적(Tracing)과 디버깅의 용이성 (Observability)
LLM의 입출력 과정을 시각화하여 추적하고, 문제가 발생하거나 다양한 상황을 대처하고 싶을 때 Playground에서 테스트할 수 있습니다.
코드는 견고하게 프롬프트는 유연하게
초기 개발 단계에서는 프롬프트를 코드 내 변수로 관리할 수 있습니다. 하지만 이러한 방법은 프롬프트 문구를 수정할 때마다 전체 서버를 다시 배포해야하는 비효율이 발생합니다.
Plaid Agent는 상황에 따라 각기 다른 프롬프트를 호출해야 합니다. Plaid는 LangSmith Hub를 통해 프롬프트들을 버전별로 관리하고 코드에서는 단순히 프롬프트 이름만 호출하는 방식을 택했습니다. 이로 인해 기획이나 피드백 로직이 바뀌어도 코드를 건드리지 않고 프롬프트만 수정하여 즉시 반영할 수 있는 유연한 구조를 갖출 수 있었습니다.
“AI 답변이 이상한데?”에 대한 해답
빈번히 일어나는 백그라운드 작업은 디버깅하기 불편한 점이 많습니다. LLM의 특성상 같은 입력에도 매번 다른 출력을 보이기 때문에 로그만으로는 알 수 있는 정보의 한계가 있습니다
Plaid는 LangSmith의 Tracing 기능을 통해 LLM 답변의 품질을 개선할 수 있었습니다. Tracing 기능을 통해 chain의 입력값, 토큰 사용량, 수행 시간, 출력값을 타임라인의 형태로 한눈에 볼 수 있습니다. 부적절한 답변을 남겼다면 해당 트레이스를 열어 Playground를 통해 수정하고 실행해보며 빠르게 퀄리티를 높일 수 있었습니다.
역할에 따른 스케줄링 전략 분리(Celery Beat / APScheduler)
안정적인 데이터 관리와 정시성 알림을 위해 Plaid는 성격이 다른 두 가지 스케줄링 도구를 적용했습니다.
Plaid Agent는 다음과 같은 상황에 스케줄링을 사용합니다.
매일 오전 2시 Jira Issue 및 Space 동기화
평일 오전 8시 팀원들에게 Slack DM으로 데일리 브리핑 전송
사용자 알림 및 브리핑 (APScheduler)
데일리 브리핑 기능은 정해진 시각에 정확히 실행되어야 하는 비즈니스 로직입니다. 애플리케이션 레벨에서 가볍게 스케줄을 트리거하고 실제 브리핑 생성 요청은 asyncio.to_thread를 통해 메인 스레드 차단 없이 AI를 호출했습니다.
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from app.tasks.daily_briefing_task import run_daily_briefing scheduler = AsyncIOScheduler(timezone="Asia/Seoul") def setup_scheduler(): """스케줄러에 작업을 등록합니다.""" scheduler.add_job( run_daily_briefing, trigger=CronTrigger(hour=8, minute=0, day_of_week='mon-fri', timezone="Asia/Seoul"), id="daily_briefing_job", name="매일 아침 데일리 브리핑 전송", replace_existing=True ) print("⏰ 스케줄러가 설정되었습니다. (월-금, 오전 8시)") def start_scheduler(): scheduler.start() print("✅ 스케줄러가 시작되었습니다.") def shutdown_scheduler(): if scheduler.running: scheduler.shutdown() print("🔒 스케줄러가 종료되었습니다.")데이터 동기화 및 시스템 작업 (Celery Beat)
Jira 이슈 동기화는 백그라운드 유지보수 작업입니다. 새벽 2시에 전체 데이터를 가져와 DB에 최신화하는 작업은 리소스 소모가 매우 큰 작업입니다. 따라서 분산 환경에서도 작업 큐가 밀리지 않고 안정적으로 데이터 무결성을 유지할 수 있도록 Celery Beat를 사용했습니다.
from celery import Celery from celery.schedules import crontab from app.core.config import settings REDIS_URL = settings.REDIS_URL celery = Celery( "pm_agent", broker=REDIS_URL, backend=REDIS_URL, include=["app.tasks.eod_task", "app.tasks.jira_issue_task", "app.tasks.jira_project_task"] ) celery.conf.beat_schedule = { "sync-jira-issues-every-2am": { "task": "app.tasks.jira_issue_task.sync_all_jira_issues", "schedule": crontab(hour=2, minute=0) }, "sync-jira-projects-daily": { "task": "app.tasks.jira_project_task.sync_all_jira_projects", "schedule": crontab(hour=3, minute=0) } } celery.conf.update( task_serializer="json", accept_content=["json"], result_serializer="json", timezone="Asia/Seoul", enable_utc=True, )
Slack과 Jira를 연결하는 데이터 파이프라인
저희는 성격이 다른 두 플랫폼에서 데이터를 수집하고 이를 하나의 맥락으로 엮어 브리핑을 생성해야 했습니다. 이 과정에서 Slack과 Jira에 각각의 API 활용 전략을 사용하여 하나의 맥락의 브리핑을 생성할 수 있었습니다.
Slack에서의 Filtering and Parsing 전략
Slack은 팀원들간의 여러 대화가 오가는 곳입니다. 여기서 유의미한 EOD 데이터를 받기 위해 필터링과 파싱 전략을 사용했습니다.
필터링 전략
EOD를 성공적으로 수집하기 위해서는 몇가지 필터링하는 과정이 필요했습니다.
Step 1. 신뢰할 수 있는 요청인가?
가장 먼저 Plaid의 Slack 앱에서 들어온 요청인지 검증하는 과정이 필요합니다. Slack은 요청 헤더에 X-Slack-Signature 값을 담아 보내는데 이를 서버가 갖고있는 SLACK_SIGNING_SECRET과 대조합니다.
여기에 더해 재전송을 방지하기 위해 타임스탬프 검증도 추가했습니다.
아래는 코드 예시입니다.
async def verify_slack_request(request: Request) -> bool: """ Slack 요청의 서명을 검증하여 실제 Slack에서 보낸 요청인지 확인합니다. - Replay Attack 방지를 위해 타임스탬프 검사 포함. """ slack_signature = request.headers.get("X-Slack-Signature") slack_timestamp = request.headers.get("X-Slack-Request-Timestamp") signing_secret = settings.SLACK_SIGNING_SECRET if not all([slack_signature, slack_timestamp, signing_secret]): print("⚠️ 보안 헤더 누락. 요청을 거부합니다.") return False if abs(time.time() - int(slack_timestamp)) > 60 * 5: print("⚠️ 타임스탬프 만료 (Replay Attack 가능성). 요청을 거부합니다.") return False body_bytes = await request.body() sig_basestring = f"v0:{slack_timestamp}:{body_bytes.decode('utf-8')}" my_signature = "v0=" + hmac.new( signing_secret.encode('utf-8'), sig_basestring.encode('utf-8'), hashlib.sha256 ).hexdigest() if not hmac.compare_digest(my_signature, slack_signature): print("❌ 서명 불일치. 요청을 거부합니다.") return False return TrueStep 2. 의미있는 데이터인가?
Slack의 Webhook은 기본적으로 허용된 권한에 해당하는 모든 이벤트를 트리거로 전송하게됩니다. 따라서 필요한 데이터인 EOD 처리에 방해가 되는 이벤트를 걸러내는 작업이 필요했습니다. Slack Webhook은 이벤트 감지 시 다음과 같이 이벤트를 전송합니다.
// 메세지 게시 { "token": "...", "team_id": "T1234TEAM", "api_app_id": "A1234APP", "event": { "type": "message", "channel": "C0123CHANNEL", "user": "U12345ABC", "text": "오늘 EoD입니다.", "ts": "1733800000.000100", "event_ts": "1733800000.000100", "channel_type": "channel" }, "type": "event_callback", "event_id": "Ev0123EVENTID", "event_time": 1733800000 } // 봇 멘션 { "token": "...", "team_id": "T1234TEAM", "api_app_id": "A1234APP", "event": { "type": "app_mention", "user": "U12345ABC", "text": "<@U987BOTID> PAP-101 티켓 정보 보여줘", "ts": "1733800100.000200", "channel": "C0123CHANNEL", "event_ts": "1733800100.000200" }, "type": "event_callback", "event_id": "Ev0123EVENTID2", "event_time": 1733800100 } // 리액션 추가 { "token": "...", "team_id": "T1234TEAM", "api_app_id": "A1234APP", "event": { "type": "reaction_added", "user": "U56789XYZ", "reaction": "eyes", "item_user": "U12345ABC", "item": { "type": "message", "channel": "C0123CHANNEL", "ts": "1733800000.000100" }, "event_ts": "1733800200.000300" }, "type": "event_callback", "event_id": "Ev0123EVENTID3", "event_time": 1733800200 } // 등등실제로 서버로 들어오는 Payload를 봤을때 event 객체 내부의 데이터 구조가 상황마다 달랐습니다. 특히 type 필드를 통해 이벤트가 어떤 종류인지 파악할 수 있었습니다. 이 외에도 댓글 활동, 메세지 삭제 및 수정 활동, bot이 스스로 작성한 글을 필터링하는 로직으로 필요한 EOD 정보만 가져올 수 있었습니다.
JQL을 이용한 Jira 동기화
Jira Issue는 특성 상 하루에도 수십번씩 Status가 바뀌고 담당자가 변경되거나 내용이 수정됩니다. 이런 상황에서 AI가 과거의 상태를 보고 브리핑을 해준다면 원치 않는 브리핑을 받게 됩니다.
Plaid Agent는 이러한 데이터 변동성에 대응하기 위해 속도를 위한 Webhook과 정확도를 위한 JQL Polling을 결합한 이중 동기화 전략을 도입했습니다.
Issue 변경 시 Upsert하여 최신성 유지
Issue가 업데이트 될때마다 Jira Webhook을 이용해 실시간 변경 사항을 반영합니다. 하지만 여기서 다음과 같은 상황에 문제가 생길 수 있다고 판단했습니다.
네트워크 일시 장애로 인한 이벤트 유실 가능성
대량의 일괄 수정 발생 시 이벤트 순서 꼬임
이러한 데이터 문제를 방지하기 위해 매일 새벽 JQL을 이용한 전체 동기화를 수행하였습니다.
누락되거나 꼬인 데이터 강제 동기화
JQL 쿼리는 모든 데이터를 조건 없이 불러 오는걸 지원하지 않을 뿐더러 전체 조회가 가능하더라도 이는 너무 비효율적입니다. 또한 한번에 100개의 데이터만 전송되므로 더 큰 데이터를 불러오는데에 제약사항이 생깁니다.
이러한 문제를 해결하기 위해서 최근 1년간의 이슈만 불러왔으며 payload에 nextPageToken을 추가해 커서 기반에 페이지네이션 방식으로 데이터를 불러올 수 있었습니다.
아래는 코드 예시입니다.
def get_all_issues() -> list[dict]: """모든 Jira 이슈 가져오기""" endpoint = "/rest/api/3/search/jql" next_page_token = None max_results = 100 all_issues = [] year_ago = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d") jql_query = f"created >= '{year_ago}' order by created DESC" while True: payload = { "jql": jql_query, "maxResults": max_results, "fields": REQUIRED_FIELDS } if next_page_token: payload["nextPageToken"] = next_page_token print(f"Jira API (POST to {endpoint}) 호출 중... (Token: {next_page_token})") try: response = sync_atlassian_client.post(endpoint, json_data=payload) data = response.json() page_issues = data.get("issues", []) if not page_issues: print(" -> 더 이상 가져올 이슈가 없습니다. 종료합니다.") break all_issues.extend(page_issues) next_page_token = data.get("nextPageToken") if not next_page_token: print(f" -> 마지막 페이지를 가져왔습니다. 총 {len(all_issues)}개.") break time.sleep(0.5) except Exception as e: print(f"Jira 모든 이슈 조회 중 에러 발생: {e}") if hasattr(e, 'response'): print(f"Jira 서버 응답: {e.response.text}") break print(f"✅ Jira API 호출 완료. 총 {len(all_issues)}개의 이슈를 가져왔습니다.") return all_issues
마치며
“오늘 뭐부터 해야하지?”라는 사소한 불편함에서 시작된 프로젝트가 이제는 팀의 아침을 여는 루틴으로 이어졌습니다. Plaid Agent가 도입된 후 Plaid는 기억을 찾는 시간을 아껴 효율 및 생산성을 높일 수 있었습니다.
Plaid Agent는 축적된 EOD와 Issue 데이터를 토대로 주간 브리핑과 인사 평가, 더 나아가 Issue에 대한 해결책을 제시해주는 RAG 기반의 에이전트로 고도화 시킬 예정입니다.
이 글이 사내 업무 자동화나 AI Agent 구축을 고민하는 분들에게 작은 도움이 되었기를 바랍니다.
긴 글을 읽어주셔서 감사합니다.