Flux를 활용한 SSE 구현기

누가 읽으면 좋을까?

다양한 EventSreaming 구현방식에 대해 알아봅니다. 또한 Cloudy의 채팅 에 왜 SSE를 선택했고 어떻게 개발했는지 사례를 공유합니다. 기술적 내용에 대해 자세히 알고 싶다면 하단 참고 자료에 관련 링크들을 참조해주세요

아래와 같은 키워드가 등장합니다. 본문에서 개념에 대해 설명하며 진행할 예정이나, 관련 배경지식이 있다면 더 쉽게 이해할 수 있습니다.

  • Event Streaming
  • SSE
  • Webflux의 Flux
  • Nginx

문제점 : 너무 느린 속도

Cloudy 챗봇들은 질문이 들어오면 다음과 같이 작동합니다.

1. AOP가 적용되어 있어 불건전 질문을 자동으로 필터링합니다. 
2. 질문에 대해서 관련 AWS 서비스, it 키워드로 추출합니다. 
3. 추출해낸 키워드를 바탕으로 Vector DB[pinecone]에서 관련 데이터를 찾습니다. 
4. 찾아낸 데이터를 바탕으로 최종 답변을 생성해냅니다. 

REST API로 처리할 경우, 1~4 과정(평균 6~8초) 동안 사용자는 로딩 스피너만 바라보는 상황이 발생합니다. 특히 ChatGPT API를 활용해서 답변을 생성해 내는 시간이 가장 오래 걸립니다. Chat GPT Streaming API와 SSE를 조합해 답변을 생성 중 일지라도, 데이터를 실시간으로 받아볼 수 있게끔 개선하였습니다.

chatbotOudy.gif

Event Streaming

먼저 SSE 뿐만 아니라, Polling 등 여러 관련된 방식을 포함하는 Event Streaming을 구현할 수 있는 여러 방법에 대해 알아봅시다.

Polling

  • 주기적으로 클라이언트가 서버에 요청을 보냅니다.
  • 서버는 데이터나 이벤트가 없으면 빈 값을, 있으면 값을 보내줍니다.

한계

  • 클라이언트에서 대기하는 시간이 길다면 실시간성이 떨어지고, 대기하는 시간이 짧다면 서버에 부담이 간다.

Long Polling

  • 주기적으로 클라이언트가 서버에 요청을 보냅니ㅣ다.
  • 서버는 바로 응답하는 것이 아닌, 데이터가 발생하거나, 타임아웃이 발생하면 클라이언트에 응답을 전달합니다.
  • 클라이언트는 응답을 받은 후 대기를 하지 않고 바로 long poll 요청을 전달합니다.
  • 쉽게 구현할 수 있습니다.
  • 이벤트, 데이터가 생길 때 마다 응답을 돌려주기 때문에 실시간성이 높습니다.

한계

  • 요청과 응답 모두 독립적이기 때문에 header를 모두 포함합니다.
    --> 원래는 하나의 http 응답입니다. 공통되는 요소를 반복해서 보내야하기 때문에 오버헤드가 발생합니다.
  • 클라이언트와 서버 모두 TCP/IP 연결을 연상태로 대기합니다.
    -> 한정된 커넥션 풀과 관련된 리소스를 신경써야합니다.
  • 클라이언트에게 제공할 이벤트가 큐에 쌓이면 각각의 이벤트를 단건으로 여러개의 long poll 요청에 나눠서 전달해야합니다.
  • 브라우저, gateway 등 다른 구성요소의 timeout을 고려하여 대기 시간을 설정해야합니다.

HTTP Streaming

클라이언트와 서버가 연결된 상태에서 지속적으로 데이터를 얻는 방식입니다.
주로 비디오 스트리밍, 음악 스트리밍 등 대용량의 연속적인 데이터 전송에 사용됩니다.

Long Polling과 다르게, 하나의 http 응답을 여러개의 http응답으로 나눠서 보내는 것이 아닌, http 응답을 잘게 짤라서(=chnuk 단위)보냅니다.

프로토콜 및 표준

  • HTTP/1.1또는 HTTP/2를 사용할 수 있습니다

  • HTTP/2의 경우 멀티플렉싱을 이용할 수 있습니다.

  • 클라이언트가 서버에 요청을 보냅니다.

  • 서버가 전달할 이벤트, 데이터 등이 있다면 응답의 일부분을 전달합니다.

  • 요청이나 연결을 닫지 않고 이벤트, 데이터를 전달할 때까지 대기합니다.

구현 방식

동적으로 content를 생성하는 경우 정확한 Content-Length 를 미리 제공할 수 없기 때문에 아래의 방식으로 HTTP Streaming을 구현합니다.

Transfer-Encoding 헤더

  • Transfer-Encoding:chunked 를 헤더에 추가합니다.
  • 텅 빈 chunk를 전달하기 전까지 값을 읽습니다.
  • Http/1.1 이상에서만 사용할 수 있습니다.

EOF

  • Connection: close 를 헤더에 추가합니다.
  • 서버가 연결을 종료할때까지 들어오는 값을 읽습니다.

Server Sent Event

이벤트 스트리밍을 단방향으로 언제든지 가능하게 합니다.
텍스트 기반의 실시간 업데이트에 적합합니다.

  • 이벤트 : 정의한 포멧에 따라 UTF-8f로 인코딩된 텍스트 데이터의 스트림

작동방식

  • 클라이언트가 서버에 EventSource 객체를 사용해 연결을 엽니다.
  • 서버는 text/event-stream MIME 타입을 사용해 이벤트를 전송합니다.
  • 연결은 클라이언트가 끊을 때까지 지속합니다.

프로토콜

  • HTTP/1.1 을 사용합니다.

SSE를 선택한 이유

VS Websocket

관련된 기술로서 가장 먼저 생각나는 것은 웹소켓입니다. Websocket이 아닌, SSE를 선택한 이유는 크게 두가지였습니다.

1. Websocket은 양방향, SSE는 단방향 통신을 지원합니다.

Websocket의 동작 방식

SSE의 동작방식

특징을 정리해봅시다. SSE는 서버에서 데이터가 생성될 때마다 stream하는 단방향통신이고, websocket은 핸드 셰이크를 통해 커넥션을 수립하기 때문에 , 클라이언트와 서버 둘다 양방향 통신이 가능합니다.

Cloudy에서 제공하는 챗봇을 사용할 때 유저 플로우를 살펴봅시다.

1. 유저가 질문을 입력한다. 
2. 답변이 나올 때까지 기다린다. 
3. 답변을 받고나서 질문을 입력한다. 

기존 채팅의 유저 플로우를 살펴봅시다.

1. 유저가 질문을 입력한다. 
2. 답변이 나올때까지 기다린다. or 답변이 오기전 다른 대화 주제로 틀어버릴 수 있다. 

2번 과정을 비교해보겠습니다. 일반 사용자끼리의 채팅처럼 일상적인 대화의 경우, 상대방의 대화를 듣기 전 대화 주제가 변할 수 있습니다. 즉, 응답을 받고 있는 중에도 채팅을 보낼 수 있어야합니다. 하지만 Cloudy 처럼 QNA와 관련된 챗봇의 특성상 사용자가 질문을 하자마자 주제가 바뀔 우려는 거의 없습니다. 왜냐하면 사용자는 질문에 대해 답변을 받고 그 답변을 바탕으로 다른 질문을 생성해내기 때문입니다.
즉, cloudy의 서비스 특성상, 사용자 입장에서는 답변이 생성되고 있는 중에, 다른 질문을 보내는 것보다는 완성된 답변을 읽고 답변의 내용을 바탕으로 다른 질문을 보낼 확률이 더 큽니다. 사실상 단방향 통신인 셈입니다. 때문에 서비스의 특성상 one-way communication을 지원하는 SSE로 챗봇을 구현하더라도 크게 상관 없겠다는 판단이 들었습니다.

2. 유지보수성

웹소켓을 구현할 경우 고려하고 관리해야하는 범위가 늘어납니다.

  1. 웹소켓 핸드 셰이크를 위한 config 클래스
  2. stomp 환경에서 작동할 수 있는 메시징 브로커 그 자체
    1. 메시징 브로커와 관련된 Config 클래스들
  3. 채팅 publish와 로그 저장, 채팅 로그 조회 관련된 클래스

하지만 SSE를 활용하여 구현할 경우 Websocket보다는 고려하고 관리해야하는 범위가 좁습니다.

  1. 핸드 셰이크가 필요없이 단일한 REST API 하나를 이용해서 응답을 주고 받습니다.
  2. 메시징 브로커를 쓰지 않습니다.
  3. 채팅 publish를 할 필요가 없으며 로그 저장 및 채팅 로그 조회 관련 클래스만 구현하면 됩니다,

구성요소

  • Chunked Transfer-Encoding 기반입니다.
  • chunk 단위로 여러 줄로 구성된 문자열을 전달합니다.
  • new line으로 이벤트를 구분합니다.
  • 각각의 문자열은 일반적으로 <field>:<value> 형태로 구성됩니다.

각 필드

필드명설명
id- 이벤트의 Id를 가리킨다.
- Last-Event-ID 헤더에 첨부하여 가장 마지막으로 받은 이벤트가 무엇인지 전달합니다.
- 이를 이용해서 서버는 lastEventID보다 큰 이벤트만 전달할 수 있습니다.
Event이벤트 타입을 표현합니다.
data이벤트의 데이터를 표현합니다.
- 여러 줄의 data 필드를 이용하면 multi line data를 표현할 수 있습니다.
retryreconnection을 위한 대기 시간을 클라이언트에게 전달합니다.
comment- 기능을 한다기 보다는 정보를 남기기 위한 역할입니다.

Flux로 구현해보기

작동방식

  • 클라이언트가 서버에 EventSource 객체를 사용해 연결을 엽니다.
  • 서버는 text/event-stream MIME 타입을 사용해 이벤트를 전송합니다.
    • 서버는 유저 질의를 OpenAI의 Streaming ChatModel을 사용하여 실시간으로 데이터를 받아들이고 이를 Reacive Stream 의 Fluxfh 반환합니다.
  • 연결은 클라이언트가 끊을 때까지 지속합니다.

Flux와 FluxSink

응답을 Flux로 반환하기 때문에, Flux와 FluxSink에 대해서 알아보고 가겠습니다.

Flux

0~N개의 데이터 항목을 비동기적으로 스트리밍하는 Publisher를 나타냅니다.
데이터 스트림을 처리하고 변환하는 다양한 연산자를 제공합니다.

FluxSink

Flux.create와 함께 사용하며, Flux의 데이터를 push 방식으로 제공할 수 있게끔합니다. 또한 Flux 스트림 내에서 데이터를 동적으로 생성하고 내보낼 수 있습니다.

  • next, complete, error 메서드를 오버라이딩하여 데이터를 전송하거나 스트림의 완료, 에러 시 처리를 커스텀할 수 있습니다.

Flux와 FluxSink의 관계

두 객체를 활용해 스트림을 동적으로 생성하고 데이터를 push할 수 있습니다.
Flux.create 메서드는 FluxSink를 인자로 받아 데이터를 스트림에 공급할 수 있는 Flux를 생성합니다.

Controller

클라이언트로 부터 유저 질의를 받아 실시간으로 생성되는 챗봇의 응답을 스트리밍하는 컨트롤러 입니다.

  

@PostMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> generateChat(@RequestBody ChatReq question,   
								 @AuthenticationPrincipal Member member) {

	
	return chatBotService.question(question, member.getId());

}

produces = MediaType.TEXT_EVENT_STREAM_VALUE
이 API의 반환 값이 text/event-stream MIME 타입임을 명시합니다.

Flux
스트리밍 방식으로 여러개의 문자를 전송하기 위해서 Project Reactor의 Flux 타입을 사용합니다.

GPT Stream API와 통신하여 stream 데이터를 생성하는 서비스

Open AI의 Streaming Chat Model을 사용하여 실시간으로 데이터를 받아들이고, 이를 Reactive Streams의 Flux로 변환하여 반환합니다.



  

@Override
public Flux<String> generateStreamingChat(String template, 
										  Map<String, Object> variables, 
										  String userId, Chatbot chatbot) {

  

	Prompt prompt = getPrompt(template, variables);
	
	
	if (openAiStreamingChatModel == null) {
	openAiStreamingChatModel = OpenAiStreamingChatModel.builder()
								.apiKey(openAiKey)
								.modelName(GPT_3_5_TURBO)
								.build();
	}

  
  
  

	return Flux.create(emitter ->
			openAiStreamingChatModel.generate(prompt.text(), 
			new StreamingResponseHandler<>() {
			
			  
			
			@Override
			public void onNext(String token) {
				emitter.next(token);
			}
			
			  
			
			@Override
			public void onComplete(Response<AiMessage> response) {
				chatQueryService.saveChat(userId, 
										 . chatbot, 
										   . response.content().text(), 
											false
										);
			
				emitter.complete();
			}
			
			  
			  
			
			@Override
			public void onError(Throwable error) {	
				log.error("[OpenAiChatService generateStreamingChat] 에러 발생 ::{}", error);
				emitter.next("에러가 발생했습니다. 관리자에게 문의하세요.");
				emitter.complete();
	
			}));

}

Flux.create
Flux 스트림을 생성합니다. emitter는 FluxSink 객체로, 데이터 스트림을 내보낼 수 있습니다. Flux.create는 데이터를 생성하고 Flux.sink를 통해 비즈니스 로직을 제공받아 가공된 Flux를 생성합니다.

emitter
FluxSink 인터페이스의 인스턴스입니다. 데이터를 Flux 스트림으로 내보냅니다. next, complete, error 등의 메서드를 오버라이딩하여 스트림을 제어할 수 있습니다.

StreamingResponseHandler
Open AI의 스트리밍 응답을 처리하기 위해 해당 StreamingResponseHandler를 사용합니다. 이 핸들러를 사용하기 위해서는 onNext, onComplete, onError 3가지 메서드를 구현해야합니다.

onNext 메서드

실시간으로 생성되는 데이터를 수신했을 때 onNext 메서드를 호출합니다.
emitter.next(token) 을 호출해 받은 데이터를 Flux 스트림으로 전달합니다.

@Override
public void onNext(String token) {
	emitter.next(token);
}

onComplete 메서드

스트리밍이 완료되었을 때 호출합니다.

@Override
public void onComplete(Response<AiMessage> response) {
    chatQueryService.saveChat(userId, 
							  chatbot, 
                              response.content().text(), 
                              false
                             );

    emitter.complete();
}

  1. 다음 접속시에 채팅 내역을 제공해야하기 때문에 Dynamo DB를 활용해 채팅 내용을 저장합니다.
  2. emitter.complete() 를 호출하여 Flux 스트림을 활용합니다.

onError 메서드

스트리밍 중 에러가 발생했을 때 호출합니다.


@Override
public void onError(Throwable error) {	
	log.error("[OpenAiChatService generateStreamingChat] 에러 발생 ::{}", error);
	emitter.next("에러가 발생했습니다. 관리자에게 문의하세요.");
	emitter.complete();
}));

원래는 emitter.error() 를 활용해 Publisher에서 에러를 발생시켜야합니다. 하지만, 특정 에러 응답을 반환하기 보다는, 서버에서 에러 로깅 후 클라이언트 대화창에서 바로 에러메시지를 출력하기로 구현 스펙을 결정했기 때문에 다음과 같은 비즈니스 로직으로 구현하였습니다.

  1. 에러를 로깅한다.
  2. 대화 로그에 에러 발생 메시지를 포함시킨다.
  3. 스트리밍을 종료한다.

Nginx 설정법

HTTP 1.1을 사용하도록 변경

Nginx의 디폴트 값
Nginx는 기본적으로 업스트림 요청을 보낼 때, HTTP/1.0버전을 사용합니다. 하지만 SSE는 HTTP/1.1버전 부터 사용할 수 있습니다.
또한 Connection:close 헤더를 사용합니다. SSE는 지속 연결이 되어 있어야하는데, Nginx에서 바로 지속연결을 닫아버리기 때문에 문제가 발생합니다.

변경된 설정값

proxy_set_header Connection '';
proxy_http_version 1.1;

Proxy Buffering 해제

Proxy Buffering이란
클라이언트와 서버 중간에 위치한 Nginx는 트래픽 최적화를 위해, 요청 및 응답을 일시적으로 저장하고 처리합니다.

SSE와 Proxy Buffering의 관계
SSE의 특성상 실시간으로 데이터를 스트리밍합니다. 이 스트리밍된 데이터는 바로바로 유저에게 전달되어야합니다. Proxy Buffering이 켜져있을 경우 Nginx가 서버의 응답을 일부 버퍼에 저장하고 버퍼가 차거나 응답 데이터를 모두 전송했을 경우 한번에 클라이언트로 전송합니다. 즉 원래 기능 명세대로, 한글자씩 반환하는 것이 아닌 몇 줄에 한번씩 클라이언트는 답변을 확인할 수 있어 실시간성이 떨어지게 됩니다.

X-Accel 활용하기

@PostMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE) 
public Flux<String> generateChat(@RequestBody ChatReq question, 
								 @AuthenticationPrincipal Member member, 
								 ServerHttpResponse response) { 
	log.info("{}", question); 
	headers = response.getHeaders(); 
	headers.add("X-Accel-Buffering", "no"); 
	return chatBotService.question(question, member.getId()); }
}

응답 헤더에 X-accel로 시작하는 헤더가 있으면 Nginx는 버퍼링을 수행하지 않습니다.  SSE 응답을 반환하는 API의 헤더에 X-Accel-Buffering: no를 붙여줘 SSE 응답만 버퍼링을 하지 않도록 설정하였습니다.

추후 개선 point

DB Connection 고갈 문제

확인해야할 사항
SSE 통신을 하는 동안에는 HTTP Connection이 계속 열려있습니다. 챗봇은 기본적으로 Dynamo DB에 채팅 로그를 저장합니다. HTTP 연결이 지속되는 동안에 DynamoDB 커넥션이 열려있는지 확인하는 과정이 필요합니다.

만약 커넥션이 열려있다면?

DynamoDB의 제약조건 (파티션)

  1. 초당 1k WCU(4kb/s or req/s)제공
  2. 초당 3k RCU(1kb/s or req/s) 제공
  3. RCU와 WCU는 독립적으로 동작
  4. 10GB

Dynamo DB의 장점은

Dynamo DB의 세벌복제 시스템

  • 데이터는 항상 3개의 가용영역에 복제됩니다.
  • 서비스는 3개의 가용 영역에서 실행됩니다.