Flux를 활용한 SSE 구현기
다양한 EventSreaming 구현방식에 대해 알아봅니다. 또한 Cloudy의 채팅 에 왜 SSE를 선택했고 어떻게 개발했는지 사례를 공유합니다. 기술적 내용에 대해 자세히 알고 싶다면 하단 참고 자료에 관련 링크들을 참조해주세요
아래와 같은 키워드가 등장합니다. 본문에서 개념에 대해 설명하며 진행할 예정이나, 관련 배경지식이 있다면 더 쉽게 이해할 수 있습니다.
Cloudy 챗봇들은 질문이 들어오면 다음과 같이 작동합니다.
1. AOP가 적용되어 있어 불건전 질문을 자동으로 필터링합니다.
2. 질문에 대해서 관련 AWS 서비스, it 키워드로 추출합니다.
3. 추출해낸 키워드를 바탕으로 Vector DB[pinecone]에서 관련 데이터를 찾습니다.
4. 찾아낸 데이터를 바탕으로 최종 답변을 생성해냅니다.
REST API로 처리할 경우, 1~4 과정(평균 6~8초) 동안 사용자는 로딩 스피너만 바라보는 상황이 발생합니다. 특히 ChatGPT API를 활용해서 답변을 생성해 내는 시간이 가장 오래 걸립니다. Chat GPT Streaming API와 SSE를 조합해 답변을 생성 중 일지라도, 데이터를 실시간으로 받아볼 수 있게끔 개선하였습니다.
먼저 SSE 뿐만 아니라, Polling 등 여러 관련된 방식을 포함하는 Event Streaming을 구현할 수 있는 여러 방법에 대해 알아봅시다.
클라이언트와 서버가 연결된 상태에서 지속적으로 데이터를 얻는 방식입니다.
주로 비디오 스트리밍, 음악 스트리밍 등 대용량의 연속적인 데이터 전송에 사용됩니다.
Long Polling과 다르게, 하나의 http 응답을 여러개의 http응답으로 나눠서 보내는 것이 아닌, http 응답을 잘게 짤라서(=chnuk 단위)보냅니다.
HTTP/1.1또는 HTTP/2를 사용할 수 있습니다
HTTP/2의 경우 멀티플렉싱을 이용할 수 있습니다.
클라이언트가 서버에 요청을 보냅니다.
서버가 전달할 이벤트, 데이터 등이 있다면 응답의 일부분을 전달합니다.
요청이나 연결을 닫지 않고 이벤트, 데이터를 전달할 때까지 대기합니다.
동적으로 content를 생성하는 경우 정확한
Content-Length
를 미리 제공할 수 없기 때문에 아래의 방식으로 HTTP Streaming을 구현합니다.
Transfer-Encoding:chunked
를 헤더에 추가합니다. Connection: close
를 헤더에 추가합니다. 이벤트 스트리밍을 단방향으로 언제든지 가능하게 합니다.
텍스트 기반의 실시간 업데이트에 적합합니다.
EventSource
객체를 사용해 연결을 엽니다. text/event-stream
MIME 타입을 사용해 이벤트를 전송합니다. HTTP/1.1
을 사용합니다. 관련된 기술로서 가장 먼저 생각나는 것은 웹소켓입니다. Websocket이 아닌, SSE를 선택한 이유는 크게 두가지였습니다.
특징을 정리해봅시다. SSE는 서버에서 데이터가 생성될 때마다 stream하는 단방향통신이고, websocket은 핸드 셰이크를 통해 커넥션을 수립하기 때문에 , 클라이언트와 서버 둘다 양방향 통신이 가능합니다.
Cloudy에서 제공하는 챗봇을 사용할 때 유저 플로우를 살펴봅시다.
1. 유저가 질문을 입력한다.
2. 답변이 나올 때까지 기다린다.
3. 답변을 받고나서 질문을 입력한다.
기존 채팅의 유저 플로우를 살펴봅시다.
1. 유저가 질문을 입력한다.
2. 답변이 나올때까지 기다린다. or 답변이 오기전 다른 대화 주제로 틀어버릴 수 있다.
2번 과정을 비교해보겠습니다. 일반 사용자끼리의 채팅처럼 일상적인 대화의 경우, 상대방의 대화를 듣기 전 대화 주제가 변할 수 있습니다. 즉, 응답을 받고 있는 중에도 채팅을 보낼 수 있어야합니다. 하지만 Cloudy 처럼 QNA와 관련된 챗봇의 특성상 사용자가 질문을 하자마자 주제가 바뀔 우려는 거의 없습니다. 왜냐하면 사용자는 질문에 대해 답변을 받고 그 답변을 바탕으로 다른 질문을 생성해내기 때문입니다.
즉, cloudy의 서비스 특성상, 사용자 입장에서는 답변이 생성되고 있는 중에, 다른 질문을 보내는 것보다는 완성된 답변을 읽고 답변의 내용을 바탕으로 다른 질문을 보낼 확률이 더 큽니다. 사실상 단방향 통신인 셈입니다. 때문에 서비스의 특성상 one-way communication을 지원하는 SSE로 챗봇을 구현하더라도 크게 상관 없겠다는 판단이 들었습니다.
웹소켓을 구현할 경우 고려하고 관리해야하는 범위가 늘어납니다.
하지만 SSE를 활용하여 구현할 경우 Websocket보다는 고려하고 관리해야하는 범위가 좁습니다.
<field>:<value>
형태로 구성됩니다. 필드명 | 설명 |
---|---|
id | - 이벤트의 Id를 가리킨다. - Last-Event-ID 헤더에 첨부하여 가장 마지막으로 받은 이벤트가 무엇인지 전달합니다. - 이를 이용해서 서버는 lastEventID보다 큰 이벤트만 전달할 수 있습니다. |
Event | 이벤트 타입을 표현합니다. |
data | 이벤트의 데이터를 표현합니다. - 여러 줄의 data 필드를 이용하면 multi line data를 표현할 수 있습니다. |
retry | reconnection을 위한 대기 시간을 클라이언트에게 전달합니다. |
comment | - 기능을 한다기 보다는 정보를 남기기 위한 역할입니다. |
EventSource
객체를 사용해 연결을 엽니다. text/event-stream
MIME 타입을 사용해 이벤트를 전송합니다.
응답을 Flux로 반환하기 때문에, Flux와 FluxSink에 대해서 알아보고 가겠습니다.
0~N개의 데이터 항목을 비동기적으로 스트리밍하는 Publisher를 나타냅니다.
데이터 스트림을 처리하고 변환하는 다양한 연산자를 제공합니다.
Flux.create와 함께 사용하며, Flux의 데이터를 push 방식으로 제공할 수 있게끔합니다. 또한 Flux 스트림 내에서 데이터를 동적으로 생성하고 내보낼 수 있습니다.
두 객체를 활용해 스트림을 동적으로 생성하고 데이터를 push할 수 있습니다.
Flux.create 메서드는 FluxSink를 인자로 받아 데이터를 스트림에 공급할 수 있는 Flux를 생성합니다.
클라이언트로 부터 유저 질의를 받아 실시간으로 생성되는 챗봇의 응답을 스트리밍하는 컨트롤러 입니다.
@PostMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> generateChat(@RequestBody ChatReq question,
@AuthenticationPrincipal Member member) {
return chatBotService.question(question, member.getId());
}
produces = MediaType.TEXT_EVENT_STREAM_VALUE
이 API의 반환 값이 text/event-stream
MIME 타입임을 명시합니다.
Flux
스트리밍 방식으로 여러개의 문자를 전송하기 위해서 Project Reactor의 Flux 타입을 사용합니다.
Open AI의 Streaming Chat Model을 사용하여 실시간으로 데이터를 받아들이고, 이를 Reactive Streams의 Flux로 변환하여 반환합니다.
@Override
public Flux<String> generateStreamingChat(String template,
Map<String, Object> variables,
String userId, Chatbot chatbot) {
Prompt prompt = getPrompt(template, variables);
if (openAiStreamingChatModel == null) {
openAiStreamingChatModel = OpenAiStreamingChatModel.builder()
.apiKey(openAiKey)
.modelName(GPT_3_5_TURBO)
.build();
}
return Flux.create(emitter ->
openAiStreamingChatModel.generate(prompt.text(),
new StreamingResponseHandler<>() {
@Override
public void onNext(String token) {
emitter.next(token);
}
@Override
public void onComplete(Response<AiMessage> response) {
chatQueryService.saveChat(userId,
. chatbot,
. response.content().text(),
false
);
emitter.complete();
}
@Override
public void onError(Throwable error) {
log.error("[OpenAiChatService generateStreamingChat] 에러 발생 ::{}", error);
emitter.next("에러가 발생했습니다. 관리자에게 문의하세요.");
emitter.complete();
}));
}
Flux.create
Flux 스트림을 생성합니다. emitter는 FluxSink 객체로, 데이터 스트림을 내보낼 수 있습니다. Flux.create는 데이터를 생성하고 Flux.sink를 통해 비즈니스 로직을 제공받아 가공된 Flux를 생성합니다.
emitter
FluxSink 인터페이스의 인스턴스입니다. 데이터를 Flux 스트림으로 내보냅니다. next, complete, error 등의 메서드를 오버라이딩하여 스트림을 제어할 수 있습니다.
StreamingResponseHandler
Open AI의 스트리밍 응답을 처리하기 위해 해당 StreamingResponseHandler를 사용합니다. 이 핸들러를 사용하기 위해서는 onNext
, onComplete
, onError
3가지 메서드를 구현해야합니다.
onNext 메서드
실시간으로 생성되는 데이터를 수신했을 때 onNext 메서드를 호출합니다.
emitter.next(token)
을 호출해 받은 데이터를 Flux 스트림으로 전달합니다.
@Override
public void onNext(String token) {
emitter.next(token);
}
onComplete 메서드
스트리밍이 완료되었을 때 호출합니다.
@Override
public void onComplete(Response<AiMessage> response) {
chatQueryService.saveChat(userId,
chatbot,
response.content().text(),
false
);
emitter.complete();
}
emitter.complete()
를 호출하여 Flux 스트림을 활용합니다. onError 메서드
스트리밍 중 에러가 발생했을 때 호출합니다.
@Override
public void onError(Throwable error) {
log.error("[OpenAiChatService generateStreamingChat] 에러 발생 ::{}", error);
emitter.next("에러가 발생했습니다. 관리자에게 문의하세요.");
emitter.complete();
}));
원래는 emitter.error()
를 활용해 Publisher에서 에러를 발생시켜야합니다. 하지만, 특정 에러 응답을 반환하기 보다는, 서버에서 에러 로깅 후 클라이언트 대화창에서 바로 에러메시지를 출력하기로 구현 스펙을 결정했기 때문에 다음과 같은 비즈니스 로직으로 구현하였습니다.
Nginx의 디폴트 값
Nginx는 기본적으로 업스트림 요청을 보낼 때, HTTP/1.0버전을 사용합니다. 하지만 SSE는 HTTP/1.1버전 부터 사용할 수 있습니다.
또한 Connection:close
헤더를 사용합니다. SSE는 지속 연결이 되어 있어야하는데, Nginx에서 바로 지속연결을 닫아버리기 때문에 문제가 발생합니다.
변경된 설정값
proxy_set_header Connection '';
proxy_http_version 1.1;
Proxy Buffering이란
클라이언트와 서버 중간에 위치한 Nginx는 트래픽 최적화를 위해, 요청 및 응답을 일시적으로 저장하고 처리합니다.
SSE와 Proxy Buffering의 관계
SSE의 특성상 실시간으로 데이터를 스트리밍합니다. 이 스트리밍된 데이터는 바로바로 유저에게 전달되어야합니다. Proxy Buffering이 켜져있을 경우 Nginx가 서버의 응답을 일부 버퍼에 저장하고 버퍼가 차거나 응답 데이터를 모두 전송했을 경우 한번에 클라이언트로 전송합니다. 즉 원래 기능 명세대로, 한글자씩 반환하는 것이 아닌 몇 줄에 한번씩 클라이언트는 답변을 확인할 수 있어 실시간성이 떨어지게 됩니다.
X-Accel 활용하기
@PostMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> generateChat(@RequestBody ChatReq question,
@AuthenticationPrincipal Member member,
ServerHttpResponse response) {
log.info("{}", question);
headers = response.getHeaders();
headers.add("X-Accel-Buffering", "no");
return chatBotService.question(question, member.getId()); }
}
응답 헤더에 X-accel로 시작하는 헤더가 있으면 Nginx는 버퍼링을 수행하지 않습니다. SSE 응답을 반환하는 API의 헤더에 X-Accel-Buffering: no
를 붙여줘 SSE 응답만 버퍼링을 하지 않도록 설정하였습니다.
확인해야할 사항
SSE 통신을 하는 동안에는 HTTP Connection이 계속 열려있습니다. 챗봇은 기본적으로 Dynamo DB에 채팅 로그를 저장합니다. HTTP 연결이 지속되는 동안에 DynamoDB 커넥션이 열려있는지 확인하는 과정이 필요합니다.
만약 커넥션이 열려있다면?
DynamoDB의 제약조건 (파티션)
Dynamo DB의 장점은
Dynamo DB의 세벌복제 시스템