Java NIO 패키지

이전에 올렸던 java.io 패키지에 대해서 공부하고 포스팅했지만, Java 4부터 등장한 java.nio에 대해서도 궁금하여 포스팅하였습니다.

1. IO와 NIO의 차이

NIO는 의미만 봤을 때 Non-blocking IO의 줄임말이라고 생각했지만, 사실 New IO의 줄임말이였습니다.

java.io 패키지랑 무슨 차이가 있는지 javadoc에서 찾아보니 파일에 데이터를 읽고 쓰는 통로 역할을 하는 채널은 버퍼라는 곳에 항상 데이터를 read하거나 write 하도록 되어있다고 나와있습니다.

IO 같은 경우에는 Stream을 통해서 파일로부터 데이터를 읽거나 쓰도록 되어 있고, NIO는 Channel을 통해서 무조건 버퍼에 데이터를 읽거나 씁니다.

Stream

  • 파일을 읽기 위한 InputStream, 파일을 쓰기 위한 OutputStream 객체가 별도로 존재하고, 단방향으로만 데이터가 흐릅니다.

Channel

  • 양방향으로 데이터가 흐를 수 있고, ByteChannel, FileChannel을 만들어서 읽고 쓰는게 가능합니다.

  • io와 다르게 Non-Blocking 방식도 가능합니다. 하지만 언제나 Non-blocking 방식으로 동작하는 것이 아니라는것을 명심해야 합니다.

2. NIO Channel

Channel을 살펴보기전에 기본적으로 Buffer에 대한 개념을 알아야 합니다. 채널을 통한 파일 입출력은 무조건 버퍼를 사용해야 합니다. 기본적으로 nio 패키지에서 정적(static) 메서드를 이용하여 생성할 수 있습니다.

Channel을 통한 파일 읽기 예제

public class ChannelReadExam {

    public static void main(String[] args) {

        Path path = Paths.get("/Users/limjun-young/workspace/privacy/dev/test/video/video/temp.txt");
        // 채널 객체를 파일 읽기 모드로 생성합니다.
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ)) {
            // 1024 바이트 크기를 가진 Buffer 객체 생성
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            ch.read(buffer);

            buffer.flip();
            Charset charset = Charset.defaultCharset();
            String inputData = charset.decode(buffer).toString();
            System.out.println("inputData: " + inputData);

            buffer.clear();

        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("파일 작업 실패");
        }
    }
}

Channel을 통한 파일 wirte 예제 코드

public class ChannelWriteExam {

   public static void main(String[] args) {

        Path path = Paths.get("/Users/limjun-young/workspace/privacy/dev/test/video/video/output.txt");

        try (FileChannel ch = FileChannel.open(path, 
        StandardOpenOption.WRITE, 
        StandardOpenOption.CREATE)) {

            String data = "NIO Channel을 이용해서 파일에 데이터를 써보겠습니다.";
            Charset charset = Charset.defaultCharset();
            ByteBuffer buffer = charset.encode(data);
            ch.write(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

실행 결과

image

Channel 객체 생성

  • 채널(Channel) 생성 옵션을 가진 기본 라이브러리 Enum 클래스
  • open() 메서드를 이용한 채널 인스턴스 생성 시 옵션은 여러 개 중복으로 넣어줄 수 있습니다.
옵션 설명
READ 읽기용으로 파일을 엽니다.
WRITE 쓰기용으로 파일을 엽니다.
CREATE 파일이 없으면 새 파일을 생성합니다.
CREATE_NEW 새 파일 생성합니다. (기존에 존재하면 예외 발생)
APPEND 추가 모드로 파일을 엽니다.(EOF 위치부터 시작, WRITE / CREATE와 같이 사용)
DELETE_ON_CLOSE 채널이 닫힐 때 파일을 삭제합니다.
TRUNCATE_EXISTING 파일을 열 때 파일 내용을 모두 삭제 합니다.(0 바이트로 만들고, WRITE와 같이 사용합니다.)

java.nio.Path / Files 클래스를 이용해 미리 파일 상태를 확인해서 Path 객체를 생성한 뒤 적절한 옵션을 사용하면 됩니다.

ByteBuffer 객체 생성

파일 I/O를 자주하면 allocate()를 크게 하나 만들어두고 계속 사용합니다.

ByteBuffer buffer = ByteBuffer.allocate(10);

기본적으로 아래와 같이 메모리에 버퍼가 생성되고, 파일의 데이터를 가르키는 파일 포인터처럼 버퍼도 버퍼 포인터가 존재합니다.

  • Capacity: 버퍼의 전체 크기
  • Position: 현재 버퍼를 쓰거나 읽을 위치, 파일 포인터의 개념과 같은 버퍼 포인터라고 보면 됩니다.
  • Limit: 전체 크기 중에 실제 읽고 쓸 수 있는 위치를 따로 지정한 것으로 기존에 Capacity와 동일하게 생성됩니다.

image

Charset 객체 생성

위의 과정을 통해 파일과 채널을 생성하고 읽고 쓸 수 있는 버퍼 생성까지 완료했습니다. 이제 파일을 I/O 준비가 되었습니다.

외부의 문자 데이터를 주고 받을 때는 서로 같은 인코딩 타입을 사용하지 않을 수 있습니다. Window 환경에서는 메모장은 ANSI 코드를 사용하고, Java는 charset으로 유니코드를 사용합니다. 따라서 한글처럼 2byte 이상으로 이루어진 문자를 유니코드를 출력해도 메모장에서는 해당 문자가 깨지게 됩니다.

이러한 문제를 해결하기 위해 인코딩 타입 간 변환을 위해 일단 Charset 클래스의 인스턴스를 하나 생성해야 합니다.

아래와 같이 2가지 방법으로 Charset 인스턴스 생성이 가능합니다.

Charset charset = Charset.defaultCharset();
Charset charset = Charset.forName("UTF-8");
  • defaultCharset(): OS의 인코딩 타입 간 변환을 해주는 객체 생성합니다.
  • forName("타입"): 직접 입력한 타입 간 변환을 해주는 객체를 생성합니다.

위 예제 코드에서는 파일 읽기 용으로 Channel을 생성하였기 때문에, Buffer에 파일 데이터를 읽어와서 Charset을 통해서 해당 인코딩 타입으로 다시 디코드하여 문자열을 출력하고 있습니다.

Charset 인코딩

encode() 데이터를 UTF-8로 인코딩 후 버퍼에 저장하고 있습니다.

String data = "NIO Channel을 이용해서 파일에 데이터를 써보겠습니다.";
Charset charset = Charset.defaultCharset();
ByteBuffer buffer = charset.encode(data);

Charset 디코딩

decode() 메서드는 버퍼에 저장된 바이너리 값을 UTF-8로 디코딩 후 문자열로 리턴하고 있습니다.

Charset charset = Charset.defaultCharset();
String inputData = charset.decode(buffer).toString();
System.out.println("inputData: " + inputData);

3. NIO에 대한 오해

NIO가 의미만 봤을 때 Non-Blocking 방식으로 동작할 것 같지만 생각만큼 Non-Blocking 하지 않다고 합니다.

예를 들어서 아래 java.nio.Files는 NIO 중에서 File I/O를 담당합니다. 파일을 읽는데 사용되는 Files.newBufferedReader(), Files.newInputStream()등은 모두 blocking 입니다. 마찬가지로 Files.newBufferedWriter(), Files.newOutputStream 등도 모두 blocking 입니다.

그렇다면 왜 사용할까 찾아보니 성능적인면에서 FILE I/O에 사용되는 Channel이 blocking 모드로 동작하지만 데이터를 Buffer를 통해 이동시키므로써 기존의 java.io 패키지에서 사용하는 Stream I/O에서 병목을 유발하는 몇가지 레이어를 건너뛸 수 있어서 성능상의 이점을 누릴 수 있다고 합니다.

4. Non-Blocking 방식으로 I/O 처리

Java 7부터 도입되어 NIO2라고 불리는 NIO에는 AsynchronousFileChannel이 Non-Blocking 모드로 동작합니다.

AsynchronousFileChannel 예제 코드

아래는 AsynchronousFileChannel 객체를 이용하여 Non-Blocking 처리하는 예제 코드를 작성해봤습니다. 여기서 try-with-resources를 사용할 경우 파일을 버퍼에 저장 후 CompletionHandler의 콜백 함수가 실행이 되는데 try-with-resources에서 자동으로 닫히기 때문에 비동기 방식으로 파일을 읽으려고 하는 순간 예외가 발생합니다. 그래서 try-with-resources 구문을 사용하지 않고 콜백 함수에서 close 하도록 처리하였습니다.

public class AsynchronousFileChannelExam {

    public static void main(String[] args) {

        Path path = Paths.get("/Users/limjun-young/workspace/privacy/dev/test/video/video/output.txt");

        try {

            AsynchronousFileChannel ch = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            long position = 0;

            ch.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    // 읽은 바이트 수를 리턴합니다.
                    System.out.println("result = " + result);

                    attachment.flip();
                    byte[] data = new byte[attachment.limit()];
                    attachment.get(data);
                    System.out.println(new String(data));
                    attachment.clear();

                    // AsynchronousFileChannel close 처리
                    if (ch != null || ch.isOpen()) {
                        try {
                            ch.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                            throw new RuntimeException(e);
                        }
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    System.out.println("파일 읽기 실패");
                    exc.printStackTrace();
                }
            });
            System.out.println("Non-Blocking 중이니?");

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

참조 사이트: https://codevang.tistory.com/160

'SpringFramework > JAVA' 카테고리의 다른 글

비동기 프로그래밍  (0) 2020.04.11
프록시 패턴 예제  (0) 2020.02.15
객체지향 프로그래밍  (0) 2020.01.01
멀티 스레드의 개념  (0) 2019.12.30
스트림 메소드 2편  (0) 2019.12.05

Cursor와 Paging 기반 ItemReader 구현체

1. Cursor 기반의 ItemReader 구현체

Cursor 방식은 DB와 커넥션을 맺은 후, Cursot를 한칸씩 옮기면서 지속적으로 데이터를 가져옵니다. DB와 어플리케이션 사이 통로를 하나 연결해서 하나씩 데이터를 가져온다고 생각하면 됩니다.

Cursor 기반의 ItemReader의 대표적인 구현체는 아래와 같습니다.

  • JdbcCursorItemReader
  • HibernateCursorItemReader
  • StoredProcedureItemReader

JdbcCursorItemReader 사용 예제코드

아래 예제코드는 jojoldu님의 블로그의 스프링 배치 글을 보고 따라서 작성해본 예제코드 입니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcCursorItemReaderJobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    private static final int chunkSize = 10;

    @Bean
    public Job jobCursorItemReaderJob() {
        return jobBuilderFactory.get("jdbcCursorItemReaderJob")
                .start(jdbcCursorItemReaderStep())
                .build();
    }

    // ItemReader에서 반환하는 타입은 Pay, Writer에서 저장하는 타입도 Pay입니다.
    @Bean
    public Step jdbcCursorItemReaderStep() {
        return stepBuilderFactory.get("jdbcCursorItemReaderStep")
                .<Pay, Pay> chunk(chunkSize)
                .reader(jdbcCursorItemReader())
                .writer(jdbcCursorItemWriter())
                .build();
    }

    @Bean
    public JdbcCursorItemReader<Pay> jdbcCursorItemReader() {
        return new JdbcCursorItemReaderBuilder<Pay>()
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Pay.class))
                .sql("SELECT id, amount, tx_name, tx_date_time FROM pay")
                .name("jdbcCursorItemReader")
                .build();
    }

    @Bean
    public ItemWriter<Pay> jdbcCursorItemWriter() {
        return list -> {
            for (Pay pay: list) {
                log.info("Current Pay= {}", pay);
            }
        };
    }
}

Pay.class

@ToString
@Setter
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Entity
public class Pay {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Long amount;
    private String txName;
    private LocalDateTime txDateTime;

    private Pay(Long amount, String txName, LocalDateTime txDateTime) {
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = txDateTime;
    }

    private Pay(Long id, Long amount, String txName, LocalDateTime txDateTime) {
        this.id = id;
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = txDateTime;
    }

    public static Pay of(Long amount, String txName, LocalDateTime txDateTime) {
        return new Pay(amount, txName, txDateTime);
    }
}

실제로 AbstractItemCountingItemStreamItemReader.read() 메서드를 호출하면 하위 추상 클래스인 AbstractCursorItemReader.doRead() 메서드가 호출 됩니다.

@Nullable
@Override
protected T doRead() throws Exception {
    if (rs == null) {
        throw new ReaderNotOpenException("Reader must be open before it can be read.");
    }

    try {
        if (!rs.next()) {
            return null;
        }
        int currentRow = getCurrentItemCount();
        T item = readCursor(rs, currentRow);
        verifyCursorPosition(currentRow);
        return item;
    }
    catch (SQLException se) {
        throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se);
    }
}

템플릿 메서드 패턴을 사용하기 때문에 readCursor() 메서드는 실제로 비어있고, 하위 클래스인 JdbcCursorItemReader에서 구현하고 있습니다. 하나씩 DB에서 ResultSet을 open해서 읽어오고 있습니다.

JdbcCursorItemReader.class

@Nullable
@Override
protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
    return rowMapper.mapRow(rs, currentRow);
}

JdbcCursorItemReaderBuilder를 통해서 JdbcCursorItemReader 구현체를 생성하기 위해 다양한 메서드를 호출하고 있는데 하나씩 살펴보겠습니다. 각 역할들을 살펴보겠습니다.

  • fetchSize: DB에서 한번에 가져올 데이터의 양을 나타냅니다. 분할처리 없이 내부적으로 가져오는 데이터는 FetchSize 만큼 가져와 read()를 통해 하나씩 가져옵니다.

ResultSet의 동작 과정을 알아야하는데 최초로 ResultSet.next() 메서드를 호출 시 한 꺼번에 fetchSize 만큼 DB에서 가져와 메모리에 저장합니다. 그 다음 read() 메서드로 메모리에서 하나씩 읽어서 처리합니다.

  • dataSource: Database에 접근하기 위해 사용할 Datasource 객체를 할당합니다.

  • rowMapper: 쿼리 결과를 Java 인스턴스로 매핑하기 위한 Mapper 입니다. 커스텀하기 보다는 매번 Mapper 클래스를 생성해야 하기 때문에 보편적으로 Spring에서 공식적으로 지원하는 BeanPropertyRowMapper를 많이 사용합니다.

  • name: ItemReader의 이름을 지정합니다. Bean의 이름이 아니고 Spring Batch의 ExecutionContext에 저장되어질 이름입니다.

ItemWriter는 Chunk 크기의 List 객체를 받아서 간단히 로그만 찍도록 처리하였습니다.

CursorItemReader 사용 시 주의사항

CursorItemReader를 사용할 때는 DB와 SocketTimeout을 충분히 큰 값으로 설정해야 합니다. 기본적으로 TCP 통신은 Socket으로 하기 때문에 타임아웃을 설정해줘야 합니다. Cursor는 하나의 Connection으로 Batch가 끝날때까지 사용되기 때문에 작업이 다 끝나기전에 Connection이 먼저 끊어질 수 있습니다.

Batch 수행 시간이 오래 걸리는 경우에 권장하는 방법은 PagingItemReader를 사용하는 것입니다. Paging의 경우 한 페이지를 읽을 때마다 Connection을 맺고 끊기 때문에 아무리 많은 데이터라도 타임아웃과 부하 없이 수행될 수 있습니다.

2. Paging 기반의 ItemReader 구현체

Paging 방식은 한번에 10개 혹은 개발자가 지정한 pageSize만큼 데이터를 가져옵니다.

Paging 기반의 ItemReader 구현체는 아래와 같습니다.

  • JdbcPagingItemReader
  • HibernatePagingItemReader
  • JpaPagingItemReader

JdbcPagingItemReader는 JdbcCursorItemReader와 같은 JdbcTemplate 인터페이스를 이용한 PagingItemReader 입니다.

JdbcPagingItemReader 사용 예제코드

JdbcPagingItemReader.class

@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcPagingItemReaderConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource; // DataSource DI

    private static final int chunkSize = 10;

    @Bean
    public Job jdbcPagingItemReaderJob() throws Exception {
        return jobBuilderFactory.get("jdbcPagingItemReaderJob")
                .start(jdbcPagingItemReaderStep())
                .build();
    }

    @Bean
    public Step jdbcPagingItemReaderStep() throws Exception {
        return stepBuilderFactory.get("jdbcPagingItemReaderStep")
                .<Pay, Pay>chunk(chunkSize)
                .reader(jdbcPagingItemReader())
                .writer(jdbcPagingItemWriter())
                .build();
    }

    @Bean
    public JdbcPagingItemReader<Pay> jdbcPagingItemReader() throws Exception {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("amount", 2000);

        return new JdbcPagingItemReaderBuilder<Pay>()
                .pageSize(chunkSize)
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Pay.class))
                .queryProvider(createQueryProvider())
                .parameterValues(parameterValues)
                .name("jdbcPagingItemReader")
                .build();
    }

    private ItemWriter<Pay> jdbcPagingItemWriter() {
        return list -> {
            for (Pay pay: list) {
                log.info("Current Pay={}", pay);
            }
        };
    }

    @Bean
    public PagingQueryProvider createQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource); // Database에 맞는 PagingQueryProvider를 선택하기 위해
        queryProvider.setSelectClause("id, amount, tx_name, tx_date_time");
        queryProvider.setFromClause("from pay");
        queryProvider.setWhereClause("where amount >= :amount");

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);

        return queryProvider.getObject();
    }
}

JdbcCursorItemReader 클래스와 크게 다른 점은 쿼리를 생성하는 부분입니다. JdbcPagingItemReader 클래스는 PagingQueryProvider를 통해 쿼리를 생성합니다.

이렇게까지 하는 이유는 각 DB에 Paging을 지원하는 자체적인 전략 때문입니다.

SqlPagingQueryProviderFactoryBean.class

{
    providers.put(DB2, new Db2PagingQueryProvider());
    providers.put(DB2VSE, new Db2PagingQueryProvider());
    providers.put(DB2ZOS, new Db2PagingQueryProvider());
    providers.put(DB2AS400, new Db2PagingQueryProvider());
    providers.put(DERBY,new DerbyPagingQueryProvider());
    providers.put(HSQL,new HsqlPagingQueryProvider());
    providers.put(H2,new H2PagingQueryProvider());
    providers.put(MYSQL,new MySqlPagingQueryProvider());
    providers.put(ORACLE,new OraclePagingQueryProvider());
    providers.put(POSTGRES,new PostgresPagingQueryProvider());
    providers.put(SQLITE, new SqlitePagingQueryProvider());
    providers.put(SQLSERVER,new SqlServerPagingQueryProvider());
    providers.put(SYBASE,new SybasePagingQueryProvider());
}

Spring Batch는 SqlPagingQueryProviderFactoryBean을 통해 DataSource 설정 값을 보고 위 코드에서 작성된 Provider 중 하나를 자동으로 선택하도록 합니다. 이렇게 하면 코드 변경 사항이 적어서 Spring Batch에서 공식 지원하는 방법입니다.

그럼 다시 위의 코드들이 무엇을 의미하는지 살펴보겠습니다.

  • parameterValues: 쿼리에 대한 매개 변수 값의 Map 타입을 지정합니다.
    아래 queryProvider.setWhereClause을 보면 어떻게 변수를 사용하는지 알 수 있습니다.
 queryProvider.setWhereClause("where amount >= :amount");

where 절에서 선언된 파라미터 변수 명과 parameterValues.put("amount", 2000) 코드에서 key에 해당하는 변수명이 일치해야 합니다.

JdbcPagingItemReader는 추상클래스인 AbstractPagingItemReader를 상속받고 있습니다. Paging 쿼리로 limit, offset 만큼 조회를 하면 AbstractPagingItemReader가 내부적으로 가지고 있는 List<T> results 변수에서 페이지 크기만큼의 row들을 가지고 있다가 read() 메서드 호출 시에 List 내부에서 하나씩 Item을 리턴합니다.

PagingItemReader 사용 시 주의사항

정렬(Order)가 무조건 포함되어 있어야 합니다.

참조 사이트: https://n1tjrgns.tistory.com/159

스프링 부트 휴먼회원 배치 설계

위에서 살펴본 스프링 부트 배치 컴포넌트들을 이용하여 커뮤니티 사이트에 가입한 회원 중 1년이 지나도록 상태 변화가 없는 회원을 휴먼회원으로 전환하는 배치 예제코드를 작성하였습니다.

기술스펙은 아래와 같습니다.

  • Java 8
  • Gradle 6.3
  • Spring Boot 2.3.0 RELEASE
  • IntelliJ IDEA 2020.02
  • H2

의존성 라이브러리들은 Spring Data JPA, H2, Lombok, Spring Batch starter 시리지들로 선택하였습니다.

전체 배치 프로세스

Untitled Diagram (2)

처리 절차

  1. H2 DB에 저장된 데이터 중 1년간 업데이트 되지 않은 사용자를 찾는 로직을 ItemReader로 구현합니다.
  2. 대상 사용자 데이터의 상태 값을 휴먼회원으로 전환하는 프로세스를 ItemProcessor에 구현합니다.
  3. 상태 값이 변한 휴먼회원을 실제로 DB에 저장하는 ItemWriter를 구현합니다.

1. build.gradle 의존성 설정

buildscript {
    ext {
        springBootVersion = '2.3.0.RELEASE'
        gradle_node_version='2.2.4'
    }

    repositories {
        mavenCentral()
        maven {
            url "https://plugins.gradle.org/m2/"
        }
    }

    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

subprojects {
    apply plugin: 'java'
    apply plugin: 'eclipse'
    apply plugin: 'org.springframework.boot'
    apply plugin: 'io.spring.dependency-management'

    group = 'com.junyoung'
    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = '1.8'

    repositories {
        mavenCentral()
    }

    configurations {
        compileOnly {
            extendsFrom annotationProcessor
        }
    }

    dependencies {
        compile('org.springframework.boot:spring-boot-starter-batch')
        runtime('com.h2database:h2')
        compileOnly 'org.projectlombok:lombok'
        annotationProcessor 'org.projectlombok:lombok'
        implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
        testImplementation('org.springframework.boot:spring-boot-starter-test') {
            exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
        }
        testCompile group: 'junit', name: 'junit', version: '4.12'
        testCompile('org.springframework.batch:spring-batch-test')
    }

    test {
        useJUnitPlatform()
    }
}

1. 도메인 작성

먼저 휴먼회원 배치 처리에 사용될 도메인을 작성합니다. 객체 명은 User이고, 휴먼 여부를 판별하는 UserStatus Enum을 추가하였습니다. ACTIVE는 활성회원, INACTIVE는 휴먼회원입니다.

UserStatus (회원 활성화 상태)

public enum UserStatus {
    ACTIVE, INACTIVE;
}

Grade (회원 등급)

public enum Grade {
    VIP, GOLD, FAMILY;
}

User 도메인 객체

@NoArgsConstructor
@Entity
@Table
public class User implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long idx;

    @Column
    private String name;

    @Column
    private String password;

    @Column
    private String email;

    @Column
    private String principle;

    @Column
    @Enumerated(EnumType.STRING)
    private SocialType socialType;

    @Column
    @Enumerated(EnumType.STRING)
    private UserStatus status;

    @Column
    @Enumerated(EnumType.STRING)
    private Grade grade;

    @Column
    private LocalDateTime createdDate;

    @Column
    private LocalDateTime updatedDate;

    @Builder
    public User(String name, String password, String email, String principle,
                SocialType socialType, UserStatus status, Grade grade, LocalDateTime createdDate,
                LocalDateTime updatedDate) {
        this.name = name;
        this.password = password;
        this.email = email;
        this.principle = principle;
        this.socialType = socialType;
        this.status = status;
        this.grade = grade;
        this.createdDate = createdDate;
        this.updatedDate = updatedDate;
    }

    public User setInactive() {
        status = UserStatus.INACTIVE;
        return this;
    }
}

UserRepository 리포지터리

public interface UserRepository extends JpaRepository<User, Long> {
    List<User> findByUpdatedDateBeforeAndStatusEquals(LocalDateTime localDateTime, UserStatus status);
}

findByUpdatedDateBeforeAndStatusEquals() 메서드는 인자 값으로 LocalDateTime, 즉 현재 기준 날짜 값보다 1년 전의 날짜 값을 받고 두 번째 인자값으로 UserStatus 타입을 받아 쿼리를 실행하는 메서드 입니다.

2. 휴먼회원 배치 Job 설정

먼저, 스프링 부트를 실행하는 Entry 포인트인 BatchApplication 파일에서 아래와 같이 @EnableBatchProcessing을 활성화 시켜야 합니다.

@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }
}

EnableBatchProcessing를 적용해야 배치 작업에 필요한 빈을 미리 등록하여 사용할 수 있습니다.

배치 정보는 아래 @Configuration 어노테이션을 사용하는 설정 클래스에서 빈으로 등록합니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    private final UserRepository userRepository;
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job inactiveUserJob() {
        // (1) JobBuilderFactory 주입
        return jobBuilderFactory.get("inactiveUserJob3")
                // (2) Job의 재실행 방지
                .preventRestart()
                .start(inactiveJobStep(null))
                .build();
    }

}
  1. (1)는 Job 생성을 직관적이고 편리하게 도와주는 빌더인 JobBuilderFactory를 주입하였습니다.
  2. (2)는 preventRestart()는 Job의 재실행을 막습니다.

위에 Job 설정을 완료하였고, 이제 Step을 설정하겠습니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    @Bean
    public Step inactiveJobStep(@Value("#{jobParameters[requestDate]}") final String requestDate) {
        log.info("requestDate: {}", requestDate);
        // (1)  StepBuilderFactory 주입
        return stepBuilderFactory.get("inactiveUserStep")
                // (2) chunk 사이즈 입력
                .<User, User>chunk(10)
                // (3) reader, processor, writer를 각각 설정
                .reader(inactiveUserReader())
                .processor(inactiveUserProcessor())
                .writer(inactiveUserWriter())
                .build();
    }
}
  1. (1)은 StepBuilderFactory의 get("inactiveUserStep")은 inactiveUserStep이라는 이름의 StepBuilder를 생성합니다.
  2. (2)는 제네릭을 사용해 chunk의 입력 타입과 출력 타입을 User 타입으로 설정하였습니다. 인자 값은 10으로 설정했는데 쓰기 시에 청크 단위로 묶어서 writer() 메서드를 실행시킬 단위를 지정하였습니다. 즉, 커밋의 다누이가 10개입니다.
  3. (3)은 reader, processor, writer를 각각 설정하였습니다.

3. Chunk 지향 처리

여기서... Chunk가 무엇일까요?

Chunk는 덩어리라는 뜻으로 Spring Batch에서 각 커밋 사이에 처리되는 row 수를 애기합니다. 만약에 Chunk로 처리하지 않을 경우에 DB에서 데이터가 1000개인 로우를 읽어와서 배치처리를 하는 경우를 생각할 수 있습니다.

배치처리를 하는 중에 1개의 데이터를 저장하는데 문제가 생기면 나머지 999개의 데이터도 rollback 처리를 해야됩니다. 이러한 문제를 방지하기 위해서 Chunk 지향 프로세스 방식으로 스프링 부트에서 배치 실행을 지원하고 있습니다.

즉, Chunk 단위로 트랜잭션을 수행하기 때문에 실패하는 경우 해당 Chunk 만큼 롤백이 되고, 이전에 커밋된 트랜잭션 범위까지는 반영이 된다는 것입니다.

Chunk는 스프링 부트 배치를 잘 사용하기 위해서 반드시 알아둬야 하는 개념이라고 생각됩니다.

아래 예시코드는 jojoldu님이 블로그에서 가져온 Chunk 지향 처리를 Java 코드로 표현하는 것입니다.

for (int i = 0; i < totalSize; i += chunkSize) {

    List<Item> items = new ArrayList<> ();

    for (int j = 0; j < chunkSize; j++) {
        Object item = itemReader.read();
        Object processedItem = item.Processor.process(item);
        items.add(processedItem);    
    }
    itemWriter.write(items);
}

Chunk 단위로 reader, process, writer로 처리하기 때문에 만약 chunkSize가 10일 경우에 process나 write에서 예외가 발생한다면 전부 rollback 되고, 그 다음 chunkSize만큼 배치가 처리 됩니다. 참조 블로그

4. ItemReader 구현

인터페이스인 ItemReader(데이터를 읽어오는 역할)를 설정한 부분입니다. 여기서는 ItemReader 인터페이스를 구현한 QueueItemReader 구현체를 리턴하고 있습니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    private final UserRepository userRepository;
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    @StepScope // (1) Step의 주기에 따라 새로운 빈 생성
    public QueueItemReader<User> inactiveUserReader() {
        List<User> oldUsers =
                userRepository.findByUpdatedDateBeforeAndStatusEquals(
                        LocalDateTime.now().minusYears(1), UserStatus.ACTIVE);
        return new QueueItemReader<>(oldUsers);
    }
}
  1. 기본 빈 생성은 싱글턴 방식이지만, (1)에서 @StepScope를 사용하면 해당 메서드는 Step 주기에 따라 새로운 빈을 생성합니다. 즉, 각 Step의 실행마다 새로운 빈을 만들기 때문에 지연 생성이 가능합니다. @StepScope는 기본 프록시 모드가 반환되는 클래스 타입을 참조하기 때문에 @StepScope를 사용하면 반드시 구현된 반환 타입을 명시해 반환해야 합니다. 위 예제에서는 QueueItemReader라고 명시했습니다.
  2. findByUpdatedDateBeforeAndStatusEquals() 메서드로 현재 날짜 기준 1년 전의 날짜값과 User의 상태값이 ACTIVE인 User 리스트를 조회하고, QueueItemReader 객체 생성 시 파라미터로 넣어서 Queue에 담도록 하고 있습니다.

QueueItemReader

public class QueueItemReader<T> implements ItemReader<T> {

    private Queue<T> queue;

    public QueueItemReader(List<T> data) {
        this.queue = new LinkedList<>(data);
    }

    @Override
    public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return this.queue.poll();
    }
}

위 코드를 보면 QueItemReader를 사용해 휴먼회원으로 지정될 타깃 데이터를 한번에 불러와 큐에 담아놓습니다.

그리고 read() 메서드를 호출할 때 큐의 poll() 메서드를 사용하여 큐에서 데이터를 하나씩 반환합니다.

5. ItemProcessor 구현

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    ...

    public ItemProcessor<User, User> inactiveUserProcessor() {
        return new ItemProcessor<User, User>() {
            @Override
            public User process(User user) throws Exception {
                return user.setInactive();
            }
        };
    }
}

ItemReader에서 읽은 User를 휴먼 상태로 전환하는 processor 메서드를 추가하는 예입니다.

6. ItemWriter 구현

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    ...

     public ItemWriter<User> inactiveUserWriter() {
        return ((List<? extends User> users) -> userRepository.saveAll(users));
    }
}

휴먼회원 전환 배치 처리 최종 코드

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    private final UserRepository userRepository;
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job inactiveUserJob() {
        return jobBuilderFactory.get("inactiveUserJob3")
                .preventRestart()
                .start(inactiveJobStep(null))
                .build();
    }

    @Bean
    public Step inactiveJobStep(@Value("#{jobParameters[requestDate]}") final String requestDate) {
        log.info("requestDate: {}", requestDate);
        return stepBuilderFactory.get("inactiveUserStep")
                .<User, User>chunk(10)
                .reader(inactiveUserReader())
                .processor(inactiveUserProcessor())
                .writer(inactiveUserWriter())
                .build();
    }

    @Bean
    @StepScope
    public QueueItemReader<User> inactiveUserReader() {
        List<User> oldUsers =
                userRepository.findByUpdatedDateBeforeAndStatusEquals(
                        LocalDateTime.now().minusYears(1), UserStatus.ACTIVE);
        return new QueueItemReader<>(oldUsers);
    }

    public ItemProcessor<User, User> inactiveUserProcessor() {
        return new org.springframework.batch.item.ItemProcessor<User, User>() {
            @Override
            public User process(User user) throws Exception {
                return user.setInactive();
            }
        };
    }

    public ItemWriter<User> inactiveUserWriter() {
        return ((List<? extends User> users) -> userRepository.saveAll(users));
    }
}

ItemWriter는 리스트 타입을 앞서 설정한 Chunk 단위로 받습니다. Chunk 단위를 10으로 설정했으므로 users에는 휴먼회원 10개 가 주어지며 saveAll() 메서드를 사용해서 한번에 DB에 저장합니다.

SQL문을 사용하여 테스트 데이터 주입

실제 배치를 실행하기 전에 테스트 할 데이터를 만들기 위해 SQL 쿼리 파일을 생성하여 실행하였습니다.

기본적으로 /resources 하위 경로에 import.sql 파일을 생성하면 스프링 부트(정확히는 하이버네이트)가 실행될 때 자동으로 해당 파일의 쿼리를 실행합니다.

insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1001, 'test@test.com', 'test1', 'test1', 'FACEBOOK', 'ACTIVE', 'VIP', '2016-03-01T00:00:00', '2018-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1002, 'test@test.com', 'test2', 'test2', 'FACEBOOK', 'ACTIVE', 'VIP', '2016-03-01T00:00:00', '2018-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1003, 'test@test.com', 'test3', 'test3', 'FACEBOOK', 'ACTIVE', 'VIP', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1004, 'test@test.com', 'test4', 'test4', 'FACEBOOK', 'ACTIVE', 'GOLD', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1005, 'test@test.com', 'test5', 'test5', 'FACEBOOK', 'ACTIVE', 'GOLD', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1006, 'test@test.com', 'test6', 'test6', 'FACEBOOK', 'ACTIVE', 'GOLD', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1007, 'test@test.com', 'test7', 'test7', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1008, 'test@test.com', 'test8', 'test8', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1009, 'test@test.com', 'test9', 'test9', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1010, 'test@test.com', 'test10', 'test10', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1011, 'test@test.com', 'test11', 'test11', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');

실행 결과

Before

image

After

image

휴먼회원 배치 실행 결과 UPDATE_DATE 컬럼 값이 현재 시점에서 1년전이고 상태값이 ACTIVE인 회원들의 상태 값이 INAVTIVE로 변경되는 것을 확인할 수 있었습니다.

참조: https://jojoldu.tistory.com/331?category=902551, https://github.com/young891221/Spring-Boot-Community-Batch

@JobScope와 @StepScope

@JobScope@StepScope는 스프링의 기본 Scope인 싱글톤 방식과는 대치되는 역할입니다.

Bean의 생성 시점이 스프링 애플리케이션이 실행되는 시점이 아닌 @JobScope, @StepScope가 명시된 메서드가 실행될 때까지 지연시키는 것을 의미합니다. 이러한 행위를 Late Binding이라고도 합니다.

Spring Batch에서 이렇게 Late Binding을 하면서 얻는 이점들은 아래와 같습니다.

  1. JobParameter를 특정 메서드가 실행하는 시점까지 지연시켜 할당시킬 수 있습니다.
    즉, 애플리케이션이 구동되는 시점이 아니라 비즈니스 로직이 구현되는 어디든 JobParameter를 할당함으로 유연한 설계를 가능하게 합니다.
  2. 병렬처리에 안전합니다.
    Step의 구성요소인 ItemReader, ItemProcessor, ItemWriter이 있고, ItemReader에서 데이터를 읽어 오는 메서드를 서로 다른 Step으로 부터 동시에 병렬 실행이 된다면 서로 상태를 간섭받게 될 수 있습니다. 하지만 @StepScope를 적용하면 각각의 Step에서 실행될 때 서로의 상태를 침범하지 않고 처리를 완료할 수 있습니다.

@JobScope는 Step 선언문에서만 사용이 가능하고, @StepScope는 Step을 구성하는 ItemReader, ItemProcessor, ItemWriter에서 사용 가능합니다.

1. JobParameters 사용 시 주의사항

JobParameters는 아래 예제코드처럼 @Value를 통해서 가능합니다. JobPameters는 Step이나 Tasklet, Reader 등 배치 컴포넌트 Bean의 생성 시점에 호출할 수 있습니다. 정확하게 말해서 Scope Bean을 생성할때만 가능합니다.

즉, @StepScope, @JobScope Bean을 생성할 때만 JobParameters가 생성되기 때문에 사용할 수 있습니다.

 @Bean
 @JobScope
 public Step inactiveJobStep(@Value("#{jobParameters[requestDate]}") final String requestDate) {
       log.info("requestDate: {}", requestDate);
       return stepBuilderFactory.get("inactiveUserStep")
               .<User, User>chunk(10)
               .reader(inactiveUserReader())
               .processor(inactiveUserProcessor())
               .writer(inactiveUserWriter())
               .build();
 }

스프링 부트 배치 입문을 위한 용어정리

스프링 부트 배치는 대용량 데이터를 처리하는 기술로만 알고 있어서, 이번 기회에 한번 개념만 살펴보았습니다.

스프링 부트 배치를 왜 사용하는지 장점부터 살펴보았습니다.

  • 대용량 데이터 처리에 최적화되어 고성능을 발휘합니다.
  • 효과적인 로깅, 통계 처리, 트랜잭션 관리 등 재사용 가능한 필수 기능을 지원합니다.
  • 수동으로 처리하지 않도록 자동화 되었습니다.
  • 예외 사항과 비정상 동작에 대한 방어 기능이 있습니다.
  • 스프링 부트 배치의 반복되는 작업 프로세스를 이해하면 비즈니스 로직에 집중할 수 있습니다.

일반적으로 스프링 부트 배치의 절차는 읽기 -> 처리 -> 쓰기를 따릅니다.

  1. 읽기(read): 데이터 저장소(일반적으로 데이터 베이스)에서 특정 데이터 레코드를 읽습니다.
  2. 처리(processing): 원하는 방식으로 데이터를 가공/처리합니다.
  3. 쓰기(write): 수정된 데이터를 다시 저장소(데이터베이스)에 저장합니다. 혹은 외부 API를 통해 내보내기도 합니다.

아래 그림은 배치 처리와 관련된 객체의 관계입니다.

Untitled Diagram (1)

Job과 Step은 1:M, Step과 ItemReader, ItemProcessor, ItemWriter는 1:1의 관계를 가집니다. 즉, Job이라는 하나의 큰 일감(Job)에 여러 단계(Step)을 두고, 각 단계를 배치의 기본 흐름대로 구현합니다.

스프링 부트 배치 용어들에 대해서 간단하게 개념정리를 해봤습니다.

1. Job

Job은 배치 처리 과정을 하나의 단위로 만들어 표현한 객체입니다. 전체 배치 처리에 있어 항상 최상단 계층에 있습니다. 스프링 배치에서 Job 객체는 여러 Step 인스턴스를 포함하는 컨테이너 입니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class SimpleJobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job simpleJob() {
        // simpleJob이라는 이름을 가진 Job을 생성할 수 있는 JobBuilder 인스턴스 반환
        return jobBuilderFactory.get("simpleJob")
                // simpleJobBuilder 인스턴스 반환
                .start(simpleStep1())
                // simpleJob이라는 이름을 가진 Job 인스턴스 반환 
                .build(); 
    }
}

보통 배치 Job 객체를 만드는 빌더는 여러 개 있습니다. 여러 빌더를 통합 처리하는 공장인 JobBuiderFactory 객체로 원하는 Job을 손쉽게 만들 수 있습니다. JobBuilderFactory의 get() 메서드를 호출하여 JobBuilder를 생성하고 이를 이용합니다.

JobBuilder

JobBuildr의 메서드들을 까보면 모든 반환 타입이 빌더입니다. 아무래도 비즈니스 환경에 따라서 Job 생성 방법이 모두 다르기 때문에 별도의 구체적인 빌더를 구현하고 이를 통해 Job 생성이 이루어지게 하려는 의도가 아닌가 싶습니다.

JobInstance

JobInstance는 배치에서 Job이 실행될 때 하나의 Job 실행 단위입니다. 만약 하루에 한 번씩 배치의 Job이 실행된다면 어제와 오늘 실행한 각각의 Job을 JobInstance라고 부를 수 있습니다.

만약 JobInstance가 Job 실행이 실패하면 JobInstance가 끝난 것이 아닙니다. 이 경우 JobExecution이 실패 정보를 가지고 있고, 성공하면 JobInstance는 끝난 것으로 간주합니다. 그리고 성공한 JobExecution을 가져서 총 두개를 가지게 됩니다. 여기서 JobInstance와 JobExecution은 부모와 자식관계로 생각하면 됩니다.

JobParameters

JobParameters는 Job이 실행될 때 필요한 파라미터들을 Map 타입으로 저장하는 객체입니다.
JobParameters는 JobInstance를 구분하는 기준이 되기도 합니다. 예를 들어서 Job 하나를 생성할 때 시작 시간 등의 정보를 파리미터로 해서 하나의 JobInstance를 생성합니다. 즉 JobInstance와 JobParameters는 1:1 관계입니다. 파라미터 타입으로는 String, Long, Date, Double를 사용할 수 있습니다.

StepExecution

Job에 JobExecution이라는 Job 실행 정보가 있따면 Step에는 StepExecution이라는 Step 실행 정보를 담는 객체가 있습니다. 각각의 Step이 실행될 때마다 StepExecution이 생성됩니다.

JobRepository

JobRepository는 배치 처리 정보를 담고 있는 메커니즘입니다. 어떤 Job이 실행되었으며 몇 번 실행되었고 언제 끝났는지 등 배치 처리에 대한 메타 데이터를 저장합니다.

예를 들어 Job 하나가 실행되면 JobRepository에서는 배치 실행에 관련된 정보를 담고 있는 도메인인 JobExecution을 생성합니다.

JobRepository는 Step의 실행 정보를 담고 있는 StepExecution도 저장소에 저장하며 전체 메타데이터를 저장/관리하는 역할을 수행합니다.

JobLauncher

JobLauncher는 Job, JobParameters와 함께 배치를 실행하는 인터페이스입니다. 인터페이스의 메서드는 run() 메서드 하나입니다.

public interface JobLauncher {
    public JobExecution run(Job job, JobParameters jobParameters) throw ...
}

run() 메서드는 매개변수로 Job과 JobParameters를 받아 JobExecution을 반환합니다. 만약 매개변수가 이전과 동일하면서 이전에 JobExecution이 중단된 적이 있다면 동일한 JobExecution을 반환합니다.

ItemReader

ItemReader는 Step의 대상이 되는 배치 데이터를 읽어오는 인터페이스입니다. FILE, XML, DB 등 여러 타입의 데이터를 읽어올 수 있습니다.

public interface ItemReader<T> {
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

ItemReader에서 read() 메서드의 반환 타입을 제네릭으로 구현했기 때문에 직접 타입을 지정할 수 있습니다.

ItemProcessor

ItemProcessor는 ItemReader로 읽어온 배치 데이터를 변환하는 역할을 수행합니다.

ItemProcessor가 존재하는 이유는 비즈니스 로직을 분리하기 위해서입니다. ItemWriter는 정말 저장만 수행하고, ItemProcessor 로직 처리만 수행해 역할을 명확하게 분리하여 유지보수성을 용이하게 합니다. 또 다른 이유는 읽어온 배치 데이터와 쓰여질 데이터 타입이 다를 경우 대응하기 위해서입니다. 명확한 인풋과 아웃풋을 ItemProcessor로 구현해놓는다면 더 직관적인 코드가 됩니다.

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

ItemWriter

ItemWriter는 배치 데이터를 저장합니다. 일반적으로 DB나 파일에 저장합니다.

public interface ItemWriter<T> {
    public write<(List<? extends T> items) throws Exception;
}

ItemWriter와 ItemReader와 비슷한 방식으로 구현하면 됩니다. 제네릭으로 원하는 타입을 받습니다. write() 메서드는 List 자료구조를 사용해 지정한 타입이 리스트를 매개변수로 받습니다. 리스트 데이터 수는 설정한 청크 단위로 불러옵니다. write() 메서드의 반환 값은 따로 없고 매개 변수로 받은 데이터를 저장하는 로직을 구현하면 됩니다.

이제 다음편에는 실제로 어떻게 스프링 부트 배치를 사용하는지 예제코드를 소개하겠습니다.

비동기 프로그래밍

프로그래밍을 하면서 비동기(Async), 동기(Sync), 블로킹(Blocking), 논 블로킹(Non-Blocking) 방식의 프로그래밍이라는 언어를 많이 들어보게 됩니다. 저도 회사에서 비동기 방식으로 코드를 작성한적이 있었는데 아무래도 기능을 빠르게 만들다보니 제대로 공부를 하지 않고 작성한 경우가 대부분이라서 오늘은 비동기, 동기, 블로킹, 논 블로킹이 무엇인지 살펴보고 간단하게 예제 코드를 작성하였습니다.

제가 생각하는 동기, 비동기는 메서드를 제공해주는 객체 입장에서 보는 관점이라고 생각하고, 블로킹, 논 블로킹은 메서드를 호출하는 곳 즉 클라이언트 관점이라고 생각합니다. 물론 여러 블로그 포스팅을 보면 아직까지 정확한 정의가 개발자분들마다 다르다고 생각합니다.

블로킹(Blocking), 논 블로킹(Non-Blocking)

동기,비동기는 메서드를 제공하는 곳의 입장이라면, 블로킹, 논 블로킹은 메서드를 호출하는 곳, 즉 클라이언트에서의 입장입니다.

만약 데이터를 조회하는 메서드를 제공하는 객체를 클라이언트에서 호출한다고 하면 블로킹 방식으로 프로그래밍을 구현하면 데이터 조회 메서드를 호출하는 순간 클라이언트의 코드 흐름에 대한 제어권이 데이터를 조회하는 부분으로 넘어가게 됩니다. 그러면 클라이언트는 데이터 조회 결과를 받기전 까지 아무것도 할 수 없는 블로킹 상태에 빠지게 됩니다.

반대로, 논 블로킹 방식은 데이터 조회 메서드를 호출 후 제어권을 넘기지 않고 다른 작업을 수행할 수 있는 프로그래밍 방식이라고 생각하면 됩니다.

이제 동기, 비동기, 블로킹, 논 블로킹을 좀 더 이해하기 위해 간단한 예제 코드로 설명하겠습니다.

커피 이름으로 커피 가격을 조회하는 Repository 객체를 생성하였습니다. H2 DB를 이용해서 조회를 하려고 했지만 아무래도 가볍게 설명하기 위해서는 Map 객체를 사용하는게 좋다고 판단이 되었습니다.

커피 도메인 클래스를 만들어서 해당 인스턴스를 생성하여 Map 객체에서 관리하도록 하기 위해서 만들었습니다.

public class Coffee {

    private String name;
    private int price;

    @Builder
    public Coffee(String name, int price) {
        this.name = name;
        this.price = price;
    }

    public static Coffee makeCoffee(String name, int price) {
        return Coffee.builder()
                .name(name)
                .price(price)
                .build();
    }

}
@Repository
public class CoffeeRepository {

    private Map<String, Coffee> coffeeMap = new HashMap<>();

    @PostConstruct
    public void init() {
        coffeeMap.put("latte", Coffee.makeCoffee("latte", 3500));
        coffeeMap.put("mocha", Coffee.makeCoffee("mocha", 4000));
        coffeeMap.put("americano", Coffee.makeCoffee("americano", 2000));
    }

    public int getPriceByName(String name) {

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return coffeeMap.get(name).getPrice();
    }
}

위의 코드를 살펴보면, 스프링 부트 환경에서 작성하였습니다. 데이터를 제공해주는 Repository 클래스의 getPriceByName() 메서드를 정의하여 커피의 이름을 파라미터로 받아서 커피의 가격을 리턴해주는 메서드입니다.
단, 해당 메서드는 1초의 지연 시간을 주기로 하였습니다. 클라이언트는 커피의 가격을 조회하기 위해서 최소 1초가 걸릴 것입니다.

이제 이 Repository 객체를 의존하는 서비스 인터페이스를 아래와 같이 정의하였습니다.

public interface CoffeeUseService {
    int getPrice(String name); // 동기
    CompletableFuture<Integer> getPriceAsync(String name); // 비동기
    CompletableFuture<Integer> getDiscountPriceAsync(Integer price); // 비동기
}

getPrice()는 동기 메서드이고, Async가 붙은 나머지 메서드 두개는 비동기 방식의 메서드입니다. 기능을 제공하는 곳에서 동기, 비동기에 대한 개념을 포함하고 있습니다. 블로킹으로 할지, 논 블로킹으로 할지 선택은 기능을 제공하는 클래스에서 결정되는게 아니라, 해당 메서드를 호출 하는곳, 즉 클라이언트에서 선택할 것입니다.

동기(Sync) 방식

CoffeeUseService 인터페이스의 구현체를 작성하였습니다. 첫번째 메서드인 getPrice()는 동기 방식으로 데이터를 제공합니다. 즉, 클라이언트에서 제어권을 받아서 데이터 처리 연산이 완료되어야 반환하는 방식입니다.

@Slf4j
@RequiredArgsConstructor
@Service
public class CoffeeUseServiceImpl implements CoffeeUseService {

    private final CoffeeRepository coffeeRepository;
    Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    @Override
    public int getPrice(String name) {
        log.info("동기 호출 방식으로 가격 조회 시작");

        return coffeeRepository.getPriceByName(name);
    }
}

테스트 코드를 작성하여 검증하였습니다. CoffeeTest라는 테스트 클래스를 생성하고, @SpringBootTest 어노테이션을 사용해서 테스트에 필요한 빈들을 주입하여 사용하도록 했습니다. 참고로 @SpringBootTest 어노테이션을 사용할 경우 스프링 컨테이너에 생성된 모든 Bean들을 가져오기 때문에 간단한 테스트를 할 경우에는 필요한 Bean들만 가져와서 사용하는 것을 권고드립니다. 저는 귀찮아서 @SpringBootTest 어노테이션을 사용했습니다.

@SpringBootTest
public class CoffeeTest {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    CoffeeUseService coffeeUseService;

    @Test
    public void  가격_조회_동기_블로킹_호출_테스트() throws Exception {

        //given
        int expectedPrice = 2000;

        //when
        int resultPrice = coffeeUseService.getPrice("americano");
        logger.info("최종 가격 전달: [{}]", resultPrice);

        //then
        assertThat(resultPrice).isEqualTo(expectedPrice);

     }
}

실행 결과

image

테스트가 성공적으로 수행되었습니다. 1초라는 지연시간이 걸렸습니다. 만약 두 번 수행하면 동기호출이기 때문에 2초가 넘게 걸릴 것입니다.

비동기(Async) 메서드, 논 블로킹 + 블로킹 혼합

이번에는 Async(비동기) 메서드를 구현하겠습니다. 여기서는 CompleteFuture를 사용합니다. 이 객체는 비동기 방식의 메서드를 호출하여 리턴값을 받고자 할때 자주 사용하는 객체라고 생각하면 됩니다. 사실 저도 ComplteFuture 클래스를 최근 들어서 써봤기 때문에 Doc은 아래 블로그 포스팅에 잘나와있습니다.

참조: https://gunju-ko.github.io/java/2018/07/05/Future.html

비동기 메서드에서는 새로운 쓰레드를 생성해서 Repository를 통해서 데이터를 조회합니다. 최종 데이터 연산이 끝나지 않아도 일단 return future를 실행해서 먼저 껍데기만 반환하게 됩니다.

@Override
public CompletableFuture<Integer> getPriceAsync(String name) {

    log.info("비동기 호출 방식으로 가격 조회 시작");

    CompletableFuture<Integer> future = new CompletableFuture<>();

    new Thread(() -> {
        log.info("새로운 쓰레드로 작업 시작");
        Integer price = coffeeRepository.getPriceByName(name);
        future.complete(price);
    }).start();

    return future;
}

getPriceByName() 메서드에서 1초의 지연시간을 임의로 주었지만, 해당 데이터는 무작정 기다리지 않고, 다른 작업을 병행할 수 있습니다. 아래와 같이 테스트 코드를 통해 검증하였습니다.

@Test
public void 가격_조회_비동기_블록킹_호출_테스트() throws Exception {

    //given
    int expectedPrice = 3500;

    //when
    // 비동기 메소드 호출 후 껍데기 반환
    CompletableFuture<Integer> future = coffeeUseService.getPriceAsync("latte");
    logger.info("아직 최종 데이터를 전달 받지는 않았지만, 다른 작업 수행 가능");

    int resultPrice = future.join(); // 블로킹
    logger.info("최종 가격 전달 받음: [{}]", resultPrice);


    //then
    assertThat(resultPrice).isEqualTo(expectedPrice);
}

CompletableFuture로 리턴을 받았지만, 최종 데이터를 조회하기 전까지 다른 작업을 병핼할 수 있습니다. 즉, 제어권을 넘겨주지 않고 다른 작업을 할 수 있습니다.

실행 결과

image

하지만, 최종 데이터를 조회하기 위해서는 CompletableFuture의 join 또는 get 메서드를 사용해야 합니다. 일단 get과 join은 예외처리를 하는 방식이 조금 다릅니다. 이 정도 차이만 있고, join이나 get을 수행하는 시점에서는 데이터를 조회할 때까지 블로킹 됩니다. 데이터가 계산이 안되었다면 될때까지 기다렸다가 결과를 전달받습니다.

결국 동기든 비동기는 결과 값을 받기 위해서는 로직이 다 돌아야되기 때문이죠.

메서드를 제공하는 곳에서는 CompletableFuture를 반환하고, 메서드를 사용하는 곳, 즉 클라이언트에서는 논 블로킹과 블로킹이 혼합되어 있는 상황입니다. 어떻게 보면 완전한 논블로킹 프로그래밍은 아닙니다.

비동기(Async)를 더 깔끔하게 수정하기

getPriceAsync 메서드를 좀 더 깔끔하게 수정해보겠습니다. 참고로, 테스트 코드는 수정이 되지 않습니다. 즉 메서드를 리팩토링한 이후에도 테스트 코드는 수정없이 정상적으로 통과해야 합니다. CompletableFuture에서는 몇개의 유용한 팩토리 메서드를 제공하는데, 그 중에서 supplyAsync와 runAsync 메서드를 살펴보겠습니다. supplyhAsync 메서드는 supplier라는 함수적 인터페이스를 파라미터로 받습니다. 람다에 대해서 알고 있다면 쉽게 이해할 수 있는 내용입니다. 반면에 runAsync 메서드는 Runnable 함수적 인터페이스를 파라미터로 받고 있습니다.

전자인 supplyAsync는 파라미터는 없지만 리턴 값이 존재하고, Runnable은 파라미터, 리턴 모두 없는 함수적 인터페이스입니다. 먼저 supplyAsync 팩토리 메서드를 사용해서 아래와 같이 코드를 수정하였습니다.

@Override
public CompletableFuture<Integer> getPriceAsync(String name) {

    log.info("비동기 호출 방식으로 가격 조회 시작");

    return CompletableFuture.supplyAsync(() -> {
         log.info("Thread CurrentName: [{}]", Thread.currentThread().getName());
        log.info("supplyAsync");
        return coffeeRepository.getPriceByName(name);
    });
}

image

Thread CurrentName: [ForkJoinPool.commonPool-worker-9]

위와 같이 supplyAsync 메서드로 수행하는 로직은 ForkJoinPool의 commonPool을 사용하는 것을 확인할 수 있습니다. 사실, 일반적으로 commonPool을 사용하는 방법은 바람직하지 않습니다. 그래서, 좀 더 수정을 하였습니다. supplyAsync를 실행할 때 Executor를 파라미터로 추가하면, Common Pool에서 동작하지 않고 별도의 쓰레드 풀에서 동작할 것입니다. 함수를 제공하는 코드를 아래와 같이 수정하였습니다.

Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

@Override
public CompletableFuture<Integer> getPriceAsync(String name) {

    log.info("비동기 호출 방식으로 가격 조회 시작");

    return CompletableFuture.supplyAsync(() -> {
        log.info("Thread CurrentName: [{}]", Thread.currentThread().getName());
        log.info("supplyAsync");
        return coffeeRepository.getPriceByName(name);
    }, executor);
}

테스트 코드를 수행하면, commonPool을 사용하지 않고, 별도로 정의한 쓰레드 풀을 사용합니다.

Thread CurrentName: [pool-1-thread-1]

콜백함수를 사용하여 Non-Blocking 프로그래밍 구현 에제를 살펴보겠습니다.

이제, 논 블로킹(Non-Blocking)을 위해서 코드를 더 수정하겠습니다. 비동기 메서드는 수정하지 않고, 클라이언트 코드를 수정해야 합니다.

블로킹, 논 블로킹은 메서드를 사용하는 곳, 즉 클라이언트에서의 입장입니다.

Non-Blocking 구현: thenAccept, thenApply

위 코드는, CompletableFuture의 get, join 메서드를 사용하는데, 해당 메서드를 호출하는 순간 블로킹 현상이 발생합니다. 논 블로킹으로 개선하기 위해서는 콜백 함수를 구현해야 하는데, CompletableFuture는 thenAccept()와 thenApply() 메서드를 제공합니다. thenAccept() 메서드는 CompletableFuture를 반환합니다. 즉 결과를 반환하지 않습니다. 하지만 thenApply() 메서드는 CompletableFuture 즉, 데이터를 포함하는 Future를 반환합니다.

 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

먼저, thenAccept() 메서드를 사용해서 테스트 코드를 작성해보겠습니다. 일단 getPriceAsync()는 CompletableFuture를 반환하는데, 이때 thenAccept() 메서드를 정의하면 콜백함수를 선언할 수 있습니다. CompletableFuture가 complete가 되면, 즉 커피의 가격 조회가 완료되면 thenAccept를 수행하게 될 것 입니다.

@Test
public void 가격_조회_비동기_호출_콜백_테스트() throws Exception {

    //given
    int expectedPrice = 4000;

    //when
    CompletableFuture<Void> future = coffeeUseService
        .getPriceAsync("mocha")
        .thenAccept(p -> {
        logger.info("콜백, 가격은: [{}]", p + "원, 하지만 데이터를 반환하지 않습니다." );
        assertThat(p).isEqualTo(expectedPrice);
                  });
    //then
    logger.info("아직 최종 데이터를 전달 받지는 않았지만, 다른 작업 수행 가능, 논 블로킹");

    assertThat(future.join()).isEqualTo(expectedPrice);
}

위에서 작성했던 테스트 코드처럼 get이나 join 메서드를 사용해서 최종 연산이 된 데이터를 조회할 필요가 없습니다. CompletableFuture 객체에서 알아서 최종 연산이 되면 콜백 함수를 실행해주기 때문입니다. 단, 해당 코드는 테스트 코드이기 때문에 제일 하단에 future.join() 메서드를 실행해서 블로킹 코드를 추가하였습니다. 실제 서비스 코드에서 해당 코드는 필요없지만, 테스트 코드이기 때문에 추가하였는데, 해당 코드가 없다면 thenAccept() 메소드가 수행하기 전에 테스트는 통과해버릴 것입니다. 그 이유는 테스트 코드는 Main 쓰레드에서 동작하게 되고, thenAccept 콜백 메서드가 수행하기도 전에 Main 쓰레드는 종료되기 때문입니다. Non-Blocking 코드이기 때문에 결과가 오는 것을 기다리지 않고 게속 코드가 동작이 되는데, 테스트 코드 특성상 Main 쓰레드가 종료되기 때문에, Main 쓰레드를 종료시키지 않기 위해서 임의로 작성한 코드입니다.

thenAccept는 CompletableFuture를 반환합니다. 즉, 연산된 데이터를 반환하지 않기 때문에 해당 로직이 끝나면 데이터를 조회할 수 없습니다. 만약, 데이터를 반환하기 위해서는 어떻게 구현하면 될까요? 이때는 thenApply 메소드를 사용해야 합니다. 커피의 가격을 조회한 다음에 100원을 추가하고 싶으면 아래와 같이 코드를 작성합니다.

@Test
public void 가격_조회_비동기_호출_콜백_테스트() throws Exception {

    //given
    int expectedPrice = 4000;

    //when
    CompletableFuture<Void> future = coffeeUseService
        .getPriceAsync("mocha")
        .thenApply(p -> {
            logger.info("같은 쓰레드로 동작");
            return p + 100;
        })
        .thenAccept(p -> {
        logger.info("콜백, 가격은: [{}]", p + "원, 하지만 데이터를 반환하지 않습니다." );
        assertThat(p).isEqualTo(expectedPrice);
                  });
    //then
    logger.info("아직 최종 데이터를 전달 받지는 않았지만, 다른 작업 수행 가능, 논 블로킹");

    assertThat(future.join()).isEqualTo(expectedPrice);
}

실행 결과

image

참고로, thenApply와 thenAccept 메서드를 별도의 쓰레드로 동작하고 싶다면, thenApplyAsync와 thenAcceptAsync 메서드를 사용하면 됩니다.

thenCombine 메서드

thenCombine() 메서드는 CompletableFuture를 2개 실행해서 결과를 조합할 때 사용합니다.thenCombine()는 병렬 실행을 해서 조합하는데, 순차적으로 실행하지 않습니다. 커피의 가격을 조회하는 기능은 1초의 지연시간이 있습니다. 만약 순차적으로 조회하면 1+1이 되기 때문에 총 2초가 걸릴 것입니다. 그래서 동시에 두가지 조회를 같이 수행한 다음에 결과를 조합할 것이고, 그러면 2개를 조회하는데 1초가 걸리도록 프로그램을 작성할 것입니다. 이것이 바로 병렬 프로그래밍입니다.

@Test
public void thenCombine_테스트() throws Exception {

    //given
    Integer expectedPrice = 7500;

    //when
    CompletableFuture<Integer> futureA = coffeeUseService.getPriceAsync("latte");
    CompletableFuture<Integer> futureB = coffeeUseService.getPriceAsync("mocha");

    Integer resultPrice = futureA.thenCombine(futureB,Integer::sum).join();

    //then
    assertThat(resultPrice).isEqualTo(expectedPrice);
}

실행 결과

image

커피 이름 중, 라떼와 모카를 조회하는데 총 1초 정도 걸렸습니다. 즉 2초가 걸리지 않았습니다. 두 작업은 별도의 쓰레드 풀에서 동작하고, thenCombine 메서드에 의해서 조합이 됩니다. 여기서 쓰레드 풀의 쓰레드 개수를 1로 주게되면 쓰레드가 한개이기 때문에 병렬로 수행하지 못하고 하나의 쓰레드를 사용합니다. 따라서 1초가 아니고 2초가 걸리게 됩니다.

thenCompose 메소드

thenCompose() 메서드는 바로 위에서 설명한 thenCombine와는 다르게 CompletableFuture를 순차적으로 실행합니다. 가격을 조회하는 기능이 있고, 조회된 가격에서 할인을 하는 기능을 별도로 조회하는 기능을 구현해보겠습니다.
https://github.com/HomoEfficio/dev-tips/blob/master/Java-Spring%20Thread%20Programming%20%EA%B0%84%EB%8B%A8%20%EC%A0%95%EB%A6%AC.md

@Override
public CompletableFuture<Integer> getDiscountPriceAsync(Integer price) {
    return CompletableFuture.supplyAsync(() -> {
            log.info("supplyAsync");
            return (int)(price * 0.9);
    }, threadPoolTaskExecutor);
}
  1. 가격 조회
  2. 조회된 가격에 할인율 적용이라는 기능을 순차적으로 수행해야합니다.
  @Test
public void thenCompose_테스트() throws Exception {

    //given
    Integer expectedPrice = (int)(3500 * 0.9);

    //when
    CompletableFuture<Integer> futureA = coffeeUseService.getPriceAsync("latte");
    Integer resultPrice = futureA.thenCompose(result ->
        coffeeUseService.getDiscountPriceAsync(result)).join();

    //then
    assertThat(resultPrice).isEqualTo(expectedPrice);
}

실행 결과

image

참조: https://brunch.co.kr/@springboot/267

'SpringFramework > JAVA' 카테고리의 다른 글

Java NIO 채널(Channel)  (0) 2021.05.11
프록시 패턴 예제  (0) 2020.02.15
객체지향 프로그래밍  (0) 2020.01.01
멀티 스레드의 개념  (0) 2019.12.30
스트림 메소드 2편  (0) 2019.12.05

+ Recent posts