자바 네트워크 소녀 네티( 정경석 저 / 이희승 감수, 한빛미디어 ) 를 읽고 정리합니다.
📘2부 5장. 네티 상세 - 이벤트 모델
🔹이벤트 루프
🔸이벤트 기반 애플리케이션이 이벤트를 처리하는 방법
- 이벤트 리스너와 이벤트 처리 스레드를 사용하는 방법 : 이벤트를 처리하는 로직을 가진 이벤트 메서드를 대상 객체의 이벤트 리스너에 등록하고, 객체에 이벤트가 발생했을 때 이벤트 처리 스레드에서 등록된 메서드를 수행한다. (이때 메서드를 수행하는 스레드는 대부분 단일 스레드로 구현한다.)
- 이벤트 큐에 이벤트를 등록하고 이벤트 루프가 이벤트 큐에 접근하여 처리하는 방법:
이벤트 루프가 다중 스레드일 때, 이벤트 큐는 여러개의 스레드에서 공유되며 가장 먼저 이벤트 큐에 접근한 스레드가 첫 번째 이벤트를 가져와서 이벤트를 수행한다. ⇒ 이때 이벤트 큐에 처리된 이벤트를 처리하고자 이벤트 루프 스레드를 사용한다.
객체에서 발생한 이벤트는 이벤트 큐에 입력되고, 이벤트 루프는 이벤트 큐에 입력된 이벤트가 있을 때, 해당 이벤트를 꺼내서 이벤트를 실행한다.
이벤트 루프가 지원하는 스레드 종류에 따라 단일 스레드 이벤트 루프, 다중 스레드 이벤트 루프로 나뉜다.
또한 처리한 이벤트 결과를 돌려주는 방식에 따라서 콜백 패턴과 퓨쳐 패턴으로 나뉜다.
🔸단일 스레드와 다중 스레드 이벤트 루프
단일 스레드 이벤트 루프
- 이베트를 처리하는 스레드가 하나이므로 이벤트 루프의 구현이 단순하고 예측 가능한 동작을 보장한다.
- 하나의 스레드가 이벤트 큐에 입력된 이벤트를 처리하므로 이벤트가 발생한 순서대로 처리 가능하다.
- 하지만 다중코어 CPU를 효율적으로 사용하지 못하며, 이벤트 메서드에 처리 시간이 오래 걸리는 작업이 섞여 있을 때 나중에 들어온 이벤트는 처리까지 오래 걸린다.
다중 스레드 이벤트 루프
- 구현이 상대적으로 복잡하지만 이벤트 메서드를 병렬적으로 수행하므로 다중 코어 CPU를 효율적으로 사용한다.
-> 시간이 많이 걸리는 작업을 여러 스레드로 분할 처리해 전체 처리 시간을 단축시킬수 있다. - 이벤트의 발생순서와 실행순서가 일치한다는 보장이 없다.
- 여러 스레드가 하나의 이벤트 큐에 접근하므로 스레드 경합이 발생하기도 하며 컨텍스트 스위칭 비용이 발생한다.
- 스레드 개수가 증가에 따라 성능이 증가하다가 최대 성능을 넘기게 되면 처리량이 줄어들게된다.
⇒ 스레드 개수가 지속적으로 증가하면 다중 처리에 의해 성능 향상보다 스레드 경합에 의한 성능 저하가 발생하게 된다. 따라서 적절하게 스레드 개수를 설정하는 것이 중요하다
+) 컨텍스트 스위칭 : 운영체제는 현재 상태가 대기 , 슬립, 지연인 스레드 중의 하나를 선택해 실행 상태로 바꾸는데, 이때 스레드가 가진 스택 정보를 현재 코어의 레지스터로 복사하는 작업
🔸네티의 이벤트 루프
네티는 단일/다중 스레드 이벤트 루프를 모두 사용할 수 있다.
다중 스레드는 이벤트의 발생순서와 실행순서가 일치하지 않다고 했으나 네티는 이벤트의 발생순서와 실행순서 일치를 보장한다.
일반적인 다중 스레드 이벤트 루프를 사용하는 프레임워크는 다음과 같은 구조로 이벤트를 처리한다.
하나의 객체가 E1~E4의 이벤트를 발생시켰고 두개의 이벤트 루프 스레드가 E1, E2를 처리한다.
이때 이벤트 루프 스레드 1이 수행하고 있는 이벤트 E2가 가장 먼저 완료되면 다음에 수행되는 이벤트는 E3가 된다.
만약 E1이 수행이 완료되기 전에 이벤트 E3이 먼저 완료된다면 이벤트의 처리 순서는 E2,E3,E1,E4가 되어 이벤트 발생순서와 실행 순서가 일치하지 않는다.
이러한 상황은 다음과 같은 상황에 문제가 될 수 있다.
파일의 데이터를 읽어서 연결된 소켓 채널로 데이터를 전송하고 소켓 채널을 닫는 애플리케이션을 작성한다고 가정하자.
이 애플리케이션에서 처리해야하는 순서는 아래와 같다.
- 읽어들일 파일을 열어서 데이터를 버퍼로 읽어들이고 파일 닫기 이벤트를 발생시킨다. - 읽어들인 데이터를 채널에 기록하고 기록 완료 이벤트를 발생시킨다. - 데이터를 기록한 소켓 채널을 닫는 이벤트를 발생시킨다. |
만약 소켓 채널을 닫는 이벤트가 먼저 발생한다면 애플리케이션은 완전히 기록되지 않게 되는 것이다.
🔹네티가 이벤트 발생 순서와 실행 순서를 일치시킬 수 있는 이유
네티에서는 다중 스레드 이벤트 루프를 사용함에도 불구하고 이벤트 발생순서와 실행 순서를 일치시킬 수 있는 이유는 아래의 세가지 특지에 기반한다.
- 네티의 이벤트는 채널에서 발생한다.
- 이벤트 루프 객체는 이벤트 큐를 가지고 있다.
- 네티의 채널은 하나의 이벤트 루프에 등록된다.
채널은 각 이벤트 루프 스레드에 등록된다.
채널에서 발생한 이벤트는 항상 동일한 이벤트 루프 스레드에서 처리하여 이벤트 발생 순서와 처리 순서가 일치하게 된다.
이벤트의 수행 순서가 일치하지 않는 근본적인 이유는 이벤트 루프들이 이벤트 큐를 공유하기 때문에 발생하는데 네티는 이벤트큐를 이벤트 루프 스레드 내부에 둠으로써 수행 순서 불일치의 원인을 제거했다.
하나의 이벤트 루프 스레드는 여러 채널을 등록할 수 있다. 여러 채널이 이벤트 루프에 등록되었을때도 이벤트 처리는 항상 발생순서와 일치한다.
🔹SingleThreadEventExecutor
네티는 이벤트를 처리하기 위해 SingleThreadEventExecutor를 사용한다. 주요코드를 살펴보자
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
//1
private final Queue<Runnable> taskQueue;
...
//2
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
...
//3
protected Runnable pollTask() {
assert inEventLoop();
return pollTaskFrom(taskQueue);
}
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) {
Runnable task = taskQueue.poll();
if (task != WAKEUP_TASK) {
return task;
}
}
}
...
//4
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = getCurrentTimeNanos();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;
}
}
}
//AbstractEventExecutor
protected static void safeExecute(Runnable task) {
try {
runTask(task);
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
protected static void runTask(@Execute Runnable task) {
task.run();
}
...
}
- 이벤트 큐를 선언한다.
- 이벤트 큐를 저장하기 위해 LinkedBlockingQueue 객체를 사용하고
- 이벤트 루프 스레드의 생성자에서 newTaskQueue를 호출하여 Runnable객체 형태의 이벤트를 저장하는 큐 객체를 생성한다.
- taskQueue 에서 큐 하나를 가져온다. -> Queue 인터페이스의 poll 메서드는 큐에 입력된 가장 첫 번째 객체를 가져오고 큐에서 해당 객체를 삭제한다. 만약 큐가 비어있다면 null 객체를 돌려준다.
- 이벤트 큐에 입력된 모든 이벤트를 수행한다.
- 이벤트 큐에 입력된 이벤트 중에서 가장 먼저 발생한 이벤트를 하나 가져와 task 를 수행한다.
- 이벤트는 Runnable 인터페이스의 run을 통해 수행된다.
🔹네티의 비동기 I/O 처리
네티는 비동기 호출을 위한 2가지 패턴을 제공한다.
- 리액터 패턴의 구현체인 이벤트 핸들러
- 퓨쳐 패턴
퓨쳐 패턴은 미래에 완료될 작업을 등록하고 처리 결과를 확인하는 객체를 통해서 작업의 완료를 확인하는 패턴이다.
퓨쳐패턴은 메서드를 호출하는 즉시 퓨쳐 객체를 돌려준다. 메서드의 처리 결과는 나중의 퓨쳐 객체를 통해 확인가능하다.
public class EchoServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
앞서 살펴본 EchoServer에서 이미 퓨쳐패턴을 사용하고 있었다.
퓨쳐패턴의 구현체인 ChannelFuture 객체를 사용한다.
ChannelFuture는 채널 I/O 비동기 호출을 지원하고자 제공된다.
-> 메서드 호출의 결과로 ChannelFuture 객체를 돌려받게 되고 이 객체를 통해서 작업의 완료 유무를 확인할 수 있다.
⇒ 이때 while루프를 작성하여 작업의 완료 유무를 확인하는 방식은 코드의 복잡성이 증가하기 때문에 좋은 코드 패턴은 아니고,
네티에서는 이와 같은 경우에 ChannelFuture 객체에 작업이 완료되었을 때 수행할 채널 리스너를 설정할 수 있다.
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ChannelFuture channelFuture = ctx.writeAndFlush(msg);
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
연결된 클라이언트로부터 데이터를 수신하면 데이터를 돌려주고 데이터 전송이 완료되면 연결된 소켓 채널을 닫는다.
- 수신된 데이터를 클라이언트 소켓 버퍼에 기록하고 데이터를 채널로 전송하는 비동기 메서드인 writeAndFlush를 호출하고 ChannelFuture 객체를 돌려받는다.
- ChannelFuture 객체에 채널을 종료하는 리스너를 등록한다. ChannelFutureListener.CLOSE 리스너는 네티가 제공하는 기본 리스너로 ChannelFuture 객체가 완료 이벤트를 수신할 때 수행된다.
+)
CLOSE : ChannelFuture 객체가 작업 완료 이벤트를 수신했을 때, ChannelFuture 객체에 포함된 채널을 닫는다. 작업 성공 여부와 상관없이 수행된다.
CLOSE_ON_FAILURE: ChannelFuture 객체가 완료 이벤트를 수신하고 결과가 실패일 때 ChannelFuture 객체에 포함된 채널을 닫는다.
FIRE_EXCEPTION_ON_FAILURE: ChannelFuture 객체가 완료 이벤트를 수신하고 결과가 실패일 때 채널 예외 이벤트를 발생시킨다.
ChannelFutureListener 인터페이스를 구현한 클래스를 작성해 ChannelFuture 객체에 등록하면 네티가 제공하는 채널 리스너보다 더 복잡한 작업을 처리할 수 있다.
public class EchoServerHandlerWithFuture extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ChannelFuture channelFuture = ctx.writeAndFlush(msg);
final int writeMessageSize = ((ByteBuf)msg).readableBytes();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("전송한 Bytes: "+ writeMessageSize);
future.channel().close();
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
위 클래스는 channelRead 이벤트에서 수신한 데이터를 클라이언트로 돌려주고 전송한 데이터 크기를 출력한 다음 클라이언트 채널을 닫는다.
- msg 의 크기를 writeMessageSize에 저장하고, 사용자 정의 채널 리스너를 생성해 channelFuture 객체에 할당했다.
- operationComplete 메서드는 ChannelFuture 객체에서 발생하는 작업 완료 이벤트 메서드로서 사용자 정의 채널 리스터 구현에 포함되어있다.
- 그 후 전송한 데이터 크기를 출력하고 ChannelFuture 객체에 포함된 채널을 가져와 채널 닫기 이벤트를 발생시켰다.
'Netty' 카테고리의 다른 글
[Netty] 네티 바이트 버퍼 (0) | 2025.03.27 |
---|---|
[Netty] 코덱을 이용한 HTTP 서버 구현과 사용자 정의 코덱 작성 (0) | 2025.03.25 |
[Netty] 네티의 이벤트 실행, 채널 파이프라인, 이벤트 핸들러 (0) | 2025.03.23 |
[Netty] 부트스트랩이란, 부트스트랩 구조 및 설정(ServerBootStrap, BootStrap) (0) | 2025.03.22 |
[Netty] 네티의 주요특징 - 이벤트 기반 프로그래밍 (1) | 2025.03.15 |