Java NIO 패키지

이전에 올렸던 java.io 패키지에 대해서 공부하고 포스팅했지만, Java 4부터 등장한 java.nio에 대해서도 궁금하여 포스팅하였습니다.

1. IO와 NIO의 차이

NIO는 의미만 봤을 때 Non-blocking IO의 줄임말이라고 생각했지만, 사실 New IO의 줄임말이였습니다.

java.io 패키지랑 무슨 차이가 있는지 javadoc에서 찾아보니 파일에 데이터를 읽고 쓰는 통로 역할을 하는 채널은 버퍼라는 곳에 항상 데이터를 read하거나 write 하도록 되어있다고 나와있습니다.

IO 같은 경우에는 Stream을 통해서 파일로부터 데이터를 읽거나 쓰도록 되어 있고, NIO는 Channel을 통해서 무조건 버퍼에 데이터를 읽거나 씁니다.

Stream

  • 파일을 읽기 위한 InputStream, 파일을 쓰기 위한 OutputStream 객체가 별도로 존재하고, 단방향으로만 데이터가 흐릅니다.

Channel

  • 양방향으로 데이터가 흐를 수 있고, ByteChannel, FileChannel을 만들어서 읽고 쓰는게 가능합니다.

  • io와 다르게 Non-Blocking 방식도 가능합니다. 하지만 언제나 Non-blocking 방식으로 동작하는 것이 아니라는것을 명심해야 합니다.

2. NIO Channel

Channel을 살펴보기전에 기본적으로 Buffer에 대한 개념을 알아야 합니다. 채널을 통한 파일 입출력은 무조건 버퍼를 사용해야 합니다. 기본적으로 nio 패키지에서 정적(static) 메서드를 이용하여 생성할 수 있습니다.

Channel을 통한 파일 읽기 예제

public class ChannelReadExam {

    public static void main(String[] args) {

        Path path = Paths.get("/Users/limjun-young/workspace/privacy/dev/test/video/video/temp.txt");
        // 채널 객체를 파일 읽기 모드로 생성합니다.
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ)) {
            // 1024 바이트 크기를 가진 Buffer 객체 생성
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            ch.read(buffer);

            buffer.flip();
            Charset charset = Charset.defaultCharset();
            String inputData = charset.decode(buffer).toString();
            System.out.println("inputData: " + inputData);

            buffer.clear();

        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("파일 작업 실패");
        }
    }
}

Channel을 통한 파일 wirte 예제 코드

public class ChannelWriteExam {

   public static void main(String[] args) {

        Path path = Paths.get("/Users/limjun-young/workspace/privacy/dev/test/video/video/output.txt");

        try (FileChannel ch = FileChannel.open(path, 
        StandardOpenOption.WRITE, 
        StandardOpenOption.CREATE)) {

            String data = "NIO Channel을 이용해서 파일에 데이터를 써보겠습니다.";
            Charset charset = Charset.defaultCharset();
            ByteBuffer buffer = charset.encode(data);
            ch.write(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

실행 결과

image

Channel 객체 생성

  • 채널(Channel) 생성 옵션을 가진 기본 라이브러리 Enum 클래스
  • open() 메서드를 이용한 채널 인스턴스 생성 시 옵션은 여러 개 중복으로 넣어줄 수 있습니다.
옵션 설명
READ 읽기용으로 파일을 엽니다.
WRITE 쓰기용으로 파일을 엽니다.
CREATE 파일이 없으면 새 파일을 생성합니다.
CREATE_NEW 새 파일 생성합니다. (기존에 존재하면 예외 발생)
APPEND 추가 모드로 파일을 엽니다.(EOF 위치부터 시작, WRITE / CREATE와 같이 사용)
DELETE_ON_CLOSE 채널이 닫힐 때 파일을 삭제합니다.
TRUNCATE_EXISTING 파일을 열 때 파일 내용을 모두 삭제 합니다.(0 바이트로 만들고, WRITE와 같이 사용합니다.)

java.nio.Path / Files 클래스를 이용해 미리 파일 상태를 확인해서 Path 객체를 생성한 뒤 적절한 옵션을 사용하면 됩니다.

ByteBuffer 객체 생성

파일 I/O를 자주하면 allocate()를 크게 하나 만들어두고 계속 사용합니다.

ByteBuffer buffer = ByteBuffer.allocate(10);

기본적으로 아래와 같이 메모리에 버퍼가 생성되고, 파일의 데이터를 가르키는 파일 포인터처럼 버퍼도 버퍼 포인터가 존재합니다.

  • Capacity: 버퍼의 전체 크기
  • Position: 현재 버퍼를 쓰거나 읽을 위치, 파일 포인터의 개념과 같은 버퍼 포인터라고 보면 됩니다.
  • Limit: 전체 크기 중에 실제 읽고 쓸 수 있는 위치를 따로 지정한 것으로 기존에 Capacity와 동일하게 생성됩니다.

image

Charset 객체 생성

위의 과정을 통해 파일과 채널을 생성하고 읽고 쓸 수 있는 버퍼 생성까지 완료했습니다. 이제 파일을 I/O 준비가 되었습니다.

외부의 문자 데이터를 주고 받을 때는 서로 같은 인코딩 타입을 사용하지 않을 수 있습니다. Window 환경에서는 메모장은 ANSI 코드를 사용하고, Java는 charset으로 유니코드를 사용합니다. 따라서 한글처럼 2byte 이상으로 이루어진 문자를 유니코드를 출력해도 메모장에서는 해당 문자가 깨지게 됩니다.

이러한 문제를 해결하기 위해 인코딩 타입 간 변환을 위해 일단 Charset 클래스의 인스턴스를 하나 생성해야 합니다.

아래와 같이 2가지 방법으로 Charset 인스턴스 생성이 가능합니다.

Charset charset = Charset.defaultCharset();
Charset charset = Charset.forName("UTF-8");
  • defaultCharset(): OS의 인코딩 타입 간 변환을 해주는 객체 생성합니다.
  • forName("타입"): 직접 입력한 타입 간 변환을 해주는 객체를 생성합니다.

위 예제 코드에서는 파일 읽기 용으로 Channel을 생성하였기 때문에, Buffer에 파일 데이터를 읽어와서 Charset을 통해서 해당 인코딩 타입으로 다시 디코드하여 문자열을 출력하고 있습니다.

Charset 인코딩

encode() 데이터를 UTF-8로 인코딩 후 버퍼에 저장하고 있습니다.

String data = "NIO Channel을 이용해서 파일에 데이터를 써보겠습니다.";
Charset charset = Charset.defaultCharset();
ByteBuffer buffer = charset.encode(data);

Charset 디코딩

decode() 메서드는 버퍼에 저장된 바이너리 값을 UTF-8로 디코딩 후 문자열로 리턴하고 있습니다.

Charset charset = Charset.defaultCharset();
String inputData = charset.decode(buffer).toString();
System.out.println("inputData: " + inputData);

3. NIO에 대한 오해

NIO가 의미만 봤을 때 Non-Blocking 방식으로 동작할 것 같지만 생각만큼 Non-Blocking 하지 않다고 합니다.

예를 들어서 아래 java.nio.Files는 NIO 중에서 File I/O를 담당합니다. 파일을 읽는데 사용되는 Files.newBufferedReader(), Files.newInputStream()등은 모두 blocking 입니다. 마찬가지로 Files.newBufferedWriter(), Files.newOutputStream 등도 모두 blocking 입니다.

그렇다면 왜 사용할까 찾아보니 성능적인면에서 FILE I/O에 사용되는 Channel이 blocking 모드로 동작하지만 데이터를 Buffer를 통해 이동시키므로써 기존의 java.io 패키지에서 사용하는 Stream I/O에서 병목을 유발하는 몇가지 레이어를 건너뛸 수 있어서 성능상의 이점을 누릴 수 있다고 합니다.

4. Non-Blocking 방식으로 I/O 처리

Java 7부터 도입되어 NIO2라고 불리는 NIO에는 AsynchronousFileChannel이 Non-Blocking 모드로 동작합니다.

AsynchronousFileChannel 예제 코드

아래는 AsynchronousFileChannel 객체를 이용하여 Non-Blocking 처리하는 예제 코드를 작성해봤습니다. 여기서 try-with-resources를 사용할 경우 파일을 버퍼에 저장 후 CompletionHandler의 콜백 함수가 실행이 되는데 try-with-resources에서 자동으로 닫히기 때문에 비동기 방식으로 파일을 읽으려고 하는 순간 예외가 발생합니다. 그래서 try-with-resources 구문을 사용하지 않고 콜백 함수에서 close 하도록 처리하였습니다.

public class AsynchronousFileChannelExam {

    public static void main(String[] args) {

        Path path = Paths.get("/Users/limjun-young/workspace/privacy/dev/test/video/video/output.txt");

        try {

            AsynchronousFileChannel ch = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            long position = 0;

            ch.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    // 읽은 바이트 수를 리턴합니다.
                    System.out.println("result = " + result);

                    attachment.flip();
                    byte[] data = new byte[attachment.limit()];
                    attachment.get(data);
                    System.out.println(new String(data));
                    attachment.clear();

                    // AsynchronousFileChannel close 처리
                    if (ch != null || ch.isOpen()) {
                        try {
                            ch.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                            throw new RuntimeException(e);
                        }
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    System.out.println("파일 읽기 실패");
                    exc.printStackTrace();
                }
            });
            System.out.println("Non-Blocking 중이니?");

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

참조 사이트: https://codevang.tistory.com/160

'SpringFramework > JAVA' 카테고리의 다른 글

비동기 프로그래밍  (0) 2020.04.11
프록시 패턴 예제  (0) 2020.02.15
객체지향 프로그래밍  (0) 2020.01.01
멀티 스레드의 개념  (0) 2019.12.30
스트림 메소드 2편  (0) 2019.12.05

Cursor와 Paging 기반 ItemReader 구현체

1. Cursor 기반의 ItemReader 구현체

Cursor 방식은 DB와 커넥션을 맺은 후, Cursot를 한칸씩 옮기면서 지속적으로 데이터를 가져옵니다. DB와 어플리케이션 사이 통로를 하나 연결해서 하나씩 데이터를 가져온다고 생각하면 됩니다.

Cursor 기반의 ItemReader의 대표적인 구현체는 아래와 같습니다.

  • JdbcCursorItemReader
  • HibernateCursorItemReader
  • StoredProcedureItemReader

JdbcCursorItemReader 사용 예제코드

아래 예제코드는 jojoldu님의 블로그의 스프링 배치 글을 보고 따라서 작성해본 예제코드 입니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcCursorItemReaderJobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    private static final int chunkSize = 10;

    @Bean
    public Job jobCursorItemReaderJob() {
        return jobBuilderFactory.get("jdbcCursorItemReaderJob")
                .start(jdbcCursorItemReaderStep())
                .build();
    }

    // ItemReader에서 반환하는 타입은 Pay, Writer에서 저장하는 타입도 Pay입니다.
    @Bean
    public Step jdbcCursorItemReaderStep() {
        return stepBuilderFactory.get("jdbcCursorItemReaderStep")
                .<Pay, Pay> chunk(chunkSize)
                .reader(jdbcCursorItemReader())
                .writer(jdbcCursorItemWriter())
                .build();
    }

    @Bean
    public JdbcCursorItemReader<Pay> jdbcCursorItemReader() {
        return new JdbcCursorItemReaderBuilder<Pay>()
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Pay.class))
                .sql("SELECT id, amount, tx_name, tx_date_time FROM pay")
                .name("jdbcCursorItemReader")
                .build();
    }

    @Bean
    public ItemWriter<Pay> jdbcCursorItemWriter() {
        return list -> {
            for (Pay pay: list) {
                log.info("Current Pay= {}", pay);
            }
        };
    }
}

Pay.class

@ToString
@Setter
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Entity
public class Pay {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Long amount;
    private String txName;
    private LocalDateTime txDateTime;

    private Pay(Long amount, String txName, LocalDateTime txDateTime) {
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = txDateTime;
    }

    private Pay(Long id, Long amount, String txName, LocalDateTime txDateTime) {
        this.id = id;
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = txDateTime;
    }

    public static Pay of(Long amount, String txName, LocalDateTime txDateTime) {
        return new Pay(amount, txName, txDateTime);
    }
}

실제로 AbstractItemCountingItemStreamItemReader.read() 메서드를 호출하면 하위 추상 클래스인 AbstractCursorItemReader.doRead() 메서드가 호출 됩니다.

@Nullable
@Override
protected T doRead() throws Exception {
    if (rs == null) {
        throw new ReaderNotOpenException("Reader must be open before it can be read.");
    }

    try {
        if (!rs.next()) {
            return null;
        }
        int currentRow = getCurrentItemCount();
        T item = readCursor(rs, currentRow);
        verifyCursorPosition(currentRow);
        return item;
    }
    catch (SQLException se) {
        throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se);
    }
}

템플릿 메서드 패턴을 사용하기 때문에 readCursor() 메서드는 실제로 비어있고, 하위 클래스인 JdbcCursorItemReader에서 구현하고 있습니다. 하나씩 DB에서 ResultSet을 open해서 읽어오고 있습니다.

JdbcCursorItemReader.class

@Nullable
@Override
protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
    return rowMapper.mapRow(rs, currentRow);
}

JdbcCursorItemReaderBuilder를 통해서 JdbcCursorItemReader 구현체를 생성하기 위해 다양한 메서드를 호출하고 있는데 하나씩 살펴보겠습니다. 각 역할들을 살펴보겠습니다.

  • fetchSize: DB에서 한번에 가져올 데이터의 양을 나타냅니다. 분할처리 없이 내부적으로 가져오는 데이터는 FetchSize 만큼 가져와 read()를 통해 하나씩 가져옵니다.

ResultSet의 동작 과정을 알아야하는데 최초로 ResultSet.next() 메서드를 호출 시 한 꺼번에 fetchSize 만큼 DB에서 가져와 메모리에 저장합니다. 그 다음 read() 메서드로 메모리에서 하나씩 읽어서 처리합니다.

  • dataSource: Database에 접근하기 위해 사용할 Datasource 객체를 할당합니다.

  • rowMapper: 쿼리 결과를 Java 인스턴스로 매핑하기 위한 Mapper 입니다. 커스텀하기 보다는 매번 Mapper 클래스를 생성해야 하기 때문에 보편적으로 Spring에서 공식적으로 지원하는 BeanPropertyRowMapper를 많이 사용합니다.

  • name: ItemReader의 이름을 지정합니다. Bean의 이름이 아니고 Spring Batch의 ExecutionContext에 저장되어질 이름입니다.

ItemWriter는 Chunk 크기의 List 객체를 받아서 간단히 로그만 찍도록 처리하였습니다.

CursorItemReader 사용 시 주의사항

CursorItemReader를 사용할 때는 DB와 SocketTimeout을 충분히 큰 값으로 설정해야 합니다. 기본적으로 TCP 통신은 Socket으로 하기 때문에 타임아웃을 설정해줘야 합니다. Cursor는 하나의 Connection으로 Batch가 끝날때까지 사용되기 때문에 작업이 다 끝나기전에 Connection이 먼저 끊어질 수 있습니다.

Batch 수행 시간이 오래 걸리는 경우에 권장하는 방법은 PagingItemReader를 사용하는 것입니다. Paging의 경우 한 페이지를 읽을 때마다 Connection을 맺고 끊기 때문에 아무리 많은 데이터라도 타임아웃과 부하 없이 수행될 수 있습니다.

2. Paging 기반의 ItemReader 구현체

Paging 방식은 한번에 10개 혹은 개발자가 지정한 pageSize만큼 데이터를 가져옵니다.

Paging 기반의 ItemReader 구현체는 아래와 같습니다.

  • JdbcPagingItemReader
  • HibernatePagingItemReader
  • JpaPagingItemReader

JdbcPagingItemReader는 JdbcCursorItemReader와 같은 JdbcTemplate 인터페이스를 이용한 PagingItemReader 입니다.

JdbcPagingItemReader 사용 예제코드

JdbcPagingItemReader.class

@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcPagingItemReaderConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource; // DataSource DI

    private static final int chunkSize = 10;

    @Bean
    public Job jdbcPagingItemReaderJob() throws Exception {
        return jobBuilderFactory.get("jdbcPagingItemReaderJob")
                .start(jdbcPagingItemReaderStep())
                .build();
    }

    @Bean
    public Step jdbcPagingItemReaderStep() throws Exception {
        return stepBuilderFactory.get("jdbcPagingItemReaderStep")
                .<Pay, Pay>chunk(chunkSize)
                .reader(jdbcPagingItemReader())
                .writer(jdbcPagingItemWriter())
                .build();
    }

    @Bean
    public JdbcPagingItemReader<Pay> jdbcPagingItemReader() throws Exception {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("amount", 2000);

        return new JdbcPagingItemReaderBuilder<Pay>()
                .pageSize(chunkSize)
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Pay.class))
                .queryProvider(createQueryProvider())
                .parameterValues(parameterValues)
                .name("jdbcPagingItemReader")
                .build();
    }

    private ItemWriter<Pay> jdbcPagingItemWriter() {
        return list -> {
            for (Pay pay: list) {
                log.info("Current Pay={}", pay);
            }
        };
    }

    @Bean
    public PagingQueryProvider createQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource); // Database에 맞는 PagingQueryProvider를 선택하기 위해
        queryProvider.setSelectClause("id, amount, tx_name, tx_date_time");
        queryProvider.setFromClause("from pay");
        queryProvider.setWhereClause("where amount >= :amount");

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);

        return queryProvider.getObject();
    }
}

JdbcCursorItemReader 클래스와 크게 다른 점은 쿼리를 생성하는 부분입니다. JdbcPagingItemReader 클래스는 PagingQueryProvider를 통해 쿼리를 생성합니다.

이렇게까지 하는 이유는 각 DB에 Paging을 지원하는 자체적인 전략 때문입니다.

SqlPagingQueryProviderFactoryBean.class

{
    providers.put(DB2, new Db2PagingQueryProvider());
    providers.put(DB2VSE, new Db2PagingQueryProvider());
    providers.put(DB2ZOS, new Db2PagingQueryProvider());
    providers.put(DB2AS400, new Db2PagingQueryProvider());
    providers.put(DERBY,new DerbyPagingQueryProvider());
    providers.put(HSQL,new HsqlPagingQueryProvider());
    providers.put(H2,new H2PagingQueryProvider());
    providers.put(MYSQL,new MySqlPagingQueryProvider());
    providers.put(ORACLE,new OraclePagingQueryProvider());
    providers.put(POSTGRES,new PostgresPagingQueryProvider());
    providers.put(SQLITE, new SqlitePagingQueryProvider());
    providers.put(SQLSERVER,new SqlServerPagingQueryProvider());
    providers.put(SYBASE,new SybasePagingQueryProvider());
}

Spring Batch는 SqlPagingQueryProviderFactoryBean을 통해 DataSource 설정 값을 보고 위 코드에서 작성된 Provider 중 하나를 자동으로 선택하도록 합니다. 이렇게 하면 코드 변경 사항이 적어서 Spring Batch에서 공식 지원하는 방법입니다.

그럼 다시 위의 코드들이 무엇을 의미하는지 살펴보겠습니다.

  • parameterValues: 쿼리에 대한 매개 변수 값의 Map 타입을 지정합니다.
    아래 queryProvider.setWhereClause을 보면 어떻게 변수를 사용하는지 알 수 있습니다.
 queryProvider.setWhereClause("where amount >= :amount");

where 절에서 선언된 파라미터 변수 명과 parameterValues.put("amount", 2000) 코드에서 key에 해당하는 변수명이 일치해야 합니다.

JdbcPagingItemReader는 추상클래스인 AbstractPagingItemReader를 상속받고 있습니다. Paging 쿼리로 limit, offset 만큼 조회를 하면 AbstractPagingItemReader가 내부적으로 가지고 있는 List<T> results 변수에서 페이지 크기만큼의 row들을 가지고 있다가 read() 메서드 호출 시에 List 내부에서 하나씩 Item을 리턴합니다.

PagingItemReader 사용 시 주의사항

정렬(Order)가 무조건 포함되어 있어야 합니다.

참조 사이트: https://n1tjrgns.tistory.com/159

스프링 부트 휴먼회원 배치 설계

위에서 살펴본 스프링 부트 배치 컴포넌트들을 이용하여 커뮤니티 사이트에 가입한 회원 중 1년이 지나도록 상태 변화가 없는 회원을 휴먼회원으로 전환하는 배치 예제코드를 작성하였습니다.

기술스펙은 아래와 같습니다.

  • Java 8
  • Gradle 6.3
  • Spring Boot 2.3.0 RELEASE
  • IntelliJ IDEA 2020.02
  • H2

의존성 라이브러리들은 Spring Data JPA, H2, Lombok, Spring Batch starter 시리지들로 선택하였습니다.

전체 배치 프로세스

Untitled Diagram (2)

처리 절차

  1. H2 DB에 저장된 데이터 중 1년간 업데이트 되지 않은 사용자를 찾는 로직을 ItemReader로 구현합니다.
  2. 대상 사용자 데이터의 상태 값을 휴먼회원으로 전환하는 프로세스를 ItemProcessor에 구현합니다.
  3. 상태 값이 변한 휴먼회원을 실제로 DB에 저장하는 ItemWriter를 구현합니다.

1. build.gradle 의존성 설정

buildscript {
    ext {
        springBootVersion = '2.3.0.RELEASE'
        gradle_node_version='2.2.4'
    }

    repositories {
        mavenCentral()
        maven {
            url "https://plugins.gradle.org/m2/"
        }
    }

    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

subprojects {
    apply plugin: 'java'
    apply plugin: 'eclipse'
    apply plugin: 'org.springframework.boot'
    apply plugin: 'io.spring.dependency-management'

    group = 'com.junyoung'
    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = '1.8'

    repositories {
        mavenCentral()
    }

    configurations {
        compileOnly {
            extendsFrom annotationProcessor
        }
    }

    dependencies {
        compile('org.springframework.boot:spring-boot-starter-batch')
        runtime('com.h2database:h2')
        compileOnly 'org.projectlombok:lombok'
        annotationProcessor 'org.projectlombok:lombok'
        implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
        testImplementation('org.springframework.boot:spring-boot-starter-test') {
            exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
        }
        testCompile group: 'junit', name: 'junit', version: '4.12'
        testCompile('org.springframework.batch:spring-batch-test')
    }

    test {
        useJUnitPlatform()
    }
}

1. 도메인 작성

먼저 휴먼회원 배치 처리에 사용될 도메인을 작성합니다. 객체 명은 User이고, 휴먼 여부를 판별하는 UserStatus Enum을 추가하였습니다. ACTIVE는 활성회원, INACTIVE는 휴먼회원입니다.

UserStatus (회원 활성화 상태)

public enum UserStatus {
    ACTIVE, INACTIVE;
}

Grade (회원 등급)

public enum Grade {
    VIP, GOLD, FAMILY;
}

User 도메인 객체

@NoArgsConstructor
@Entity
@Table
public class User implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long idx;

    @Column
    private String name;

    @Column
    private String password;

    @Column
    private String email;

    @Column
    private String principle;

    @Column
    @Enumerated(EnumType.STRING)
    private SocialType socialType;

    @Column
    @Enumerated(EnumType.STRING)
    private UserStatus status;

    @Column
    @Enumerated(EnumType.STRING)
    private Grade grade;

    @Column
    private LocalDateTime createdDate;

    @Column
    private LocalDateTime updatedDate;

    @Builder
    public User(String name, String password, String email, String principle,
                SocialType socialType, UserStatus status, Grade grade, LocalDateTime createdDate,
                LocalDateTime updatedDate) {
        this.name = name;
        this.password = password;
        this.email = email;
        this.principle = principle;
        this.socialType = socialType;
        this.status = status;
        this.grade = grade;
        this.createdDate = createdDate;
        this.updatedDate = updatedDate;
    }

    public User setInactive() {
        status = UserStatus.INACTIVE;
        return this;
    }
}

UserRepository 리포지터리

public interface UserRepository extends JpaRepository<User, Long> {
    List<User> findByUpdatedDateBeforeAndStatusEquals(LocalDateTime localDateTime, UserStatus status);
}

findByUpdatedDateBeforeAndStatusEquals() 메서드는 인자 값으로 LocalDateTime, 즉 현재 기준 날짜 값보다 1년 전의 날짜 값을 받고 두 번째 인자값으로 UserStatus 타입을 받아 쿼리를 실행하는 메서드 입니다.

2. 휴먼회원 배치 Job 설정

먼저, 스프링 부트를 실행하는 Entry 포인트인 BatchApplication 파일에서 아래와 같이 @EnableBatchProcessing을 활성화 시켜야 합니다.

@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }
}

EnableBatchProcessing를 적용해야 배치 작업에 필요한 빈을 미리 등록하여 사용할 수 있습니다.

배치 정보는 아래 @Configuration 어노테이션을 사용하는 설정 클래스에서 빈으로 등록합니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    private final UserRepository userRepository;
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job inactiveUserJob() {
        // (1) JobBuilderFactory 주입
        return jobBuilderFactory.get("inactiveUserJob3")
                // (2) Job의 재실행 방지
                .preventRestart()
                .start(inactiveJobStep(null))
                .build();
    }

}
  1. (1)는 Job 생성을 직관적이고 편리하게 도와주는 빌더인 JobBuilderFactory를 주입하였습니다.
  2. (2)는 preventRestart()는 Job의 재실행을 막습니다.

위에 Job 설정을 완료하였고, 이제 Step을 설정하겠습니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    @Bean
    public Step inactiveJobStep(@Value("#{jobParameters[requestDate]}") final String requestDate) {
        log.info("requestDate: {}", requestDate);
        // (1)  StepBuilderFactory 주입
        return stepBuilderFactory.get("inactiveUserStep")
                // (2) chunk 사이즈 입력
                .<User, User>chunk(10)
                // (3) reader, processor, writer를 각각 설정
                .reader(inactiveUserReader())
                .processor(inactiveUserProcessor())
                .writer(inactiveUserWriter())
                .build();
    }
}
  1. (1)은 StepBuilderFactory의 get("inactiveUserStep")은 inactiveUserStep이라는 이름의 StepBuilder를 생성합니다.
  2. (2)는 제네릭을 사용해 chunk의 입력 타입과 출력 타입을 User 타입으로 설정하였습니다. 인자 값은 10으로 설정했는데 쓰기 시에 청크 단위로 묶어서 writer() 메서드를 실행시킬 단위를 지정하였습니다. 즉, 커밋의 다누이가 10개입니다.
  3. (3)은 reader, processor, writer를 각각 설정하였습니다.

3. Chunk 지향 처리

여기서... Chunk가 무엇일까요?

Chunk는 덩어리라는 뜻으로 Spring Batch에서 각 커밋 사이에 처리되는 row 수를 애기합니다. 만약에 Chunk로 처리하지 않을 경우에 DB에서 데이터가 1000개인 로우를 읽어와서 배치처리를 하는 경우를 생각할 수 있습니다.

배치처리를 하는 중에 1개의 데이터를 저장하는데 문제가 생기면 나머지 999개의 데이터도 rollback 처리를 해야됩니다. 이러한 문제를 방지하기 위해서 Chunk 지향 프로세스 방식으로 스프링 부트에서 배치 실행을 지원하고 있습니다.

즉, Chunk 단위로 트랜잭션을 수행하기 때문에 실패하는 경우 해당 Chunk 만큼 롤백이 되고, 이전에 커밋된 트랜잭션 범위까지는 반영이 된다는 것입니다.

Chunk는 스프링 부트 배치를 잘 사용하기 위해서 반드시 알아둬야 하는 개념이라고 생각됩니다.

아래 예시코드는 jojoldu님이 블로그에서 가져온 Chunk 지향 처리를 Java 코드로 표현하는 것입니다.

for (int i = 0; i < totalSize; i += chunkSize) {

    List<Item> items = new ArrayList<> ();

    for (int j = 0; j < chunkSize; j++) {
        Object item = itemReader.read();
        Object processedItem = item.Processor.process(item);
        items.add(processedItem);    
    }
    itemWriter.write(items);
}

Chunk 단위로 reader, process, writer로 처리하기 때문에 만약 chunkSize가 10일 경우에 process나 write에서 예외가 발생한다면 전부 rollback 되고, 그 다음 chunkSize만큼 배치가 처리 됩니다. 참조 블로그

4. ItemReader 구현

인터페이스인 ItemReader(데이터를 읽어오는 역할)를 설정한 부분입니다. 여기서는 ItemReader 인터페이스를 구현한 QueueItemReader 구현체를 리턴하고 있습니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    private final UserRepository userRepository;
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    @StepScope // (1) Step의 주기에 따라 새로운 빈 생성
    public QueueItemReader<User> inactiveUserReader() {
        List<User> oldUsers =
                userRepository.findByUpdatedDateBeforeAndStatusEquals(
                        LocalDateTime.now().minusYears(1), UserStatus.ACTIVE);
        return new QueueItemReader<>(oldUsers);
    }
}
  1. 기본 빈 생성은 싱글턴 방식이지만, (1)에서 @StepScope를 사용하면 해당 메서드는 Step 주기에 따라 새로운 빈을 생성합니다. 즉, 각 Step의 실행마다 새로운 빈을 만들기 때문에 지연 생성이 가능합니다. @StepScope는 기본 프록시 모드가 반환되는 클래스 타입을 참조하기 때문에 @StepScope를 사용하면 반드시 구현된 반환 타입을 명시해 반환해야 합니다. 위 예제에서는 QueueItemReader라고 명시했습니다.
  2. findByUpdatedDateBeforeAndStatusEquals() 메서드로 현재 날짜 기준 1년 전의 날짜값과 User의 상태값이 ACTIVE인 User 리스트를 조회하고, QueueItemReader 객체 생성 시 파라미터로 넣어서 Queue에 담도록 하고 있습니다.

QueueItemReader

public class QueueItemReader<T> implements ItemReader<T> {

    private Queue<T> queue;

    public QueueItemReader(List<T> data) {
        this.queue = new LinkedList<>(data);
    }

    @Override
    public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return this.queue.poll();
    }
}

위 코드를 보면 QueItemReader를 사용해 휴먼회원으로 지정될 타깃 데이터를 한번에 불러와 큐에 담아놓습니다.

그리고 read() 메서드를 호출할 때 큐의 poll() 메서드를 사용하여 큐에서 데이터를 하나씩 반환합니다.

5. ItemProcessor 구현

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    ...

    public ItemProcessor<User, User> inactiveUserProcessor() {
        return new ItemProcessor<User, User>() {
            @Override
            public User process(User user) throws Exception {
                return user.setInactive();
            }
        };
    }
}

ItemReader에서 읽은 User를 휴먼 상태로 전환하는 processor 메서드를 추가하는 예입니다.

6. ItemWriter 구현

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    ...

     public ItemWriter<User> inactiveUserWriter() {
        return ((List<? extends User> users) -> userRepository.saveAll(users));
    }
}

휴먼회원 전환 배치 처리 최종 코드

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    private final UserRepository userRepository;
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job inactiveUserJob() {
        return jobBuilderFactory.get("inactiveUserJob3")
                .preventRestart()
                .start(inactiveJobStep(null))
                .build();
    }

    @Bean
    public Step inactiveJobStep(@Value("#{jobParameters[requestDate]}") final String requestDate) {
        log.info("requestDate: {}", requestDate);
        return stepBuilderFactory.get("inactiveUserStep")
                .<User, User>chunk(10)
                .reader(inactiveUserReader())
                .processor(inactiveUserProcessor())
                .writer(inactiveUserWriter())
                .build();
    }

    @Bean
    @StepScope
    public QueueItemReader<User> inactiveUserReader() {
        List<User> oldUsers =
                userRepository.findByUpdatedDateBeforeAndStatusEquals(
                        LocalDateTime.now().minusYears(1), UserStatus.ACTIVE);
        return new QueueItemReader<>(oldUsers);
    }

    public ItemProcessor<User, User> inactiveUserProcessor() {
        return new org.springframework.batch.item.ItemProcessor<User, User>() {
            @Override
            public User process(User user) throws Exception {
                return user.setInactive();
            }
        };
    }

    public ItemWriter<User> inactiveUserWriter() {
        return ((List<? extends User> users) -> userRepository.saveAll(users));
    }
}

ItemWriter는 리스트 타입을 앞서 설정한 Chunk 단위로 받습니다. Chunk 단위를 10으로 설정했으므로 users에는 휴먼회원 10개 가 주어지며 saveAll() 메서드를 사용해서 한번에 DB에 저장합니다.

SQL문을 사용하여 테스트 데이터 주입

실제 배치를 실행하기 전에 테스트 할 데이터를 만들기 위해 SQL 쿼리 파일을 생성하여 실행하였습니다.

기본적으로 /resources 하위 경로에 import.sql 파일을 생성하면 스프링 부트(정확히는 하이버네이트)가 실행될 때 자동으로 해당 파일의 쿼리를 실행합니다.

insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1001, 'test@test.com', 'test1', 'test1', 'FACEBOOK', 'ACTIVE', 'VIP', '2016-03-01T00:00:00', '2018-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1002, 'test@test.com', 'test2', 'test2', 'FACEBOOK', 'ACTIVE', 'VIP', '2016-03-01T00:00:00', '2018-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1003, 'test@test.com', 'test3', 'test3', 'FACEBOOK', 'ACTIVE', 'VIP', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1004, 'test@test.com', 'test4', 'test4', 'FACEBOOK', 'ACTIVE', 'GOLD', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1005, 'test@test.com', 'test5', 'test5', 'FACEBOOK', 'ACTIVE', 'GOLD', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1006, 'test@test.com', 'test6', 'test6', 'FACEBOOK', 'ACTIVE', 'GOLD', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1007, 'test@test.com', 'test7', 'test7', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1008, 'test@test.com', 'test8', 'test8', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1009, 'test@test.com', 'test9', 'test9', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1010, 'test@test.com', 'test10', 'test10', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1011, 'test@test.com', 'test11', 'test11', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');

실행 결과

Before

image

After

image

휴먼회원 배치 실행 결과 UPDATE_DATE 컬럼 값이 현재 시점에서 1년전이고 상태값이 ACTIVE인 회원들의 상태 값이 INAVTIVE로 변경되는 것을 확인할 수 있었습니다.

참조: https://jojoldu.tistory.com/331?category=902551, https://github.com/young891221/Spring-Boot-Community-Batch

@JobScope와 @StepScope

@JobScope@StepScope는 스프링의 기본 Scope인 싱글톤 방식과는 대치되는 역할입니다.

Bean의 생성 시점이 스프링 애플리케이션이 실행되는 시점이 아닌 @JobScope, @StepScope가 명시된 메서드가 실행될 때까지 지연시키는 것을 의미합니다. 이러한 행위를 Late Binding이라고도 합니다.

Spring Batch에서 이렇게 Late Binding을 하면서 얻는 이점들은 아래와 같습니다.

  1. JobParameter를 특정 메서드가 실행하는 시점까지 지연시켜 할당시킬 수 있습니다.
    즉, 애플리케이션이 구동되는 시점이 아니라 비즈니스 로직이 구현되는 어디든 JobParameter를 할당함으로 유연한 설계를 가능하게 합니다.
  2. 병렬처리에 안전합니다.
    Step의 구성요소인 ItemReader, ItemProcessor, ItemWriter이 있고, ItemReader에서 데이터를 읽어 오는 메서드를 서로 다른 Step으로 부터 동시에 병렬 실행이 된다면 서로 상태를 간섭받게 될 수 있습니다. 하지만 @StepScope를 적용하면 각각의 Step에서 실행될 때 서로의 상태를 침범하지 않고 처리를 완료할 수 있습니다.

@JobScope는 Step 선언문에서만 사용이 가능하고, @StepScope는 Step을 구성하는 ItemReader, ItemProcessor, ItemWriter에서 사용 가능합니다.

1. JobParameters 사용 시 주의사항

JobParameters는 아래 예제코드처럼 @Value를 통해서 가능합니다. JobPameters는 Step이나 Tasklet, Reader 등 배치 컴포넌트 Bean의 생성 시점에 호출할 수 있습니다. 정확하게 말해서 Scope Bean을 생성할때만 가능합니다.

즉, @StepScope, @JobScope Bean을 생성할 때만 JobParameters가 생성되기 때문에 사용할 수 있습니다.

 @Bean
 @JobScope
 public Step inactiveJobStep(@Value("#{jobParameters[requestDate]}") final String requestDate) {
       log.info("requestDate: {}", requestDate);
       return stepBuilderFactory.get("inactiveUserStep")
               .<User, User>chunk(10)
               .reader(inactiveUserReader())
               .processor(inactiveUserProcessor())
               .writer(inactiveUserWriter())
               .build();
 }

스프링 부트 배치 입문을 위한 용어정리

스프링 부트 배치는 대용량 데이터를 처리하는 기술로만 알고 있어서, 이번 기회에 한번 개념만 살펴보았습니다.

스프링 부트 배치를 왜 사용하는지 장점부터 살펴보았습니다.

  • 대용량 데이터 처리에 최적화되어 고성능을 발휘합니다.
  • 효과적인 로깅, 통계 처리, 트랜잭션 관리 등 재사용 가능한 필수 기능을 지원합니다.
  • 수동으로 처리하지 않도록 자동화 되었습니다.
  • 예외 사항과 비정상 동작에 대한 방어 기능이 있습니다.
  • 스프링 부트 배치의 반복되는 작업 프로세스를 이해하면 비즈니스 로직에 집중할 수 있습니다.

일반적으로 스프링 부트 배치의 절차는 읽기 -> 처리 -> 쓰기를 따릅니다.

  1. 읽기(read): 데이터 저장소(일반적으로 데이터 베이스)에서 특정 데이터 레코드를 읽습니다.
  2. 처리(processing): 원하는 방식으로 데이터를 가공/처리합니다.
  3. 쓰기(write): 수정된 데이터를 다시 저장소(데이터베이스)에 저장합니다. 혹은 외부 API를 통해 내보내기도 합니다.

아래 그림은 배치 처리와 관련된 객체의 관계입니다.

Untitled Diagram (1)

Job과 Step은 1:M, Step과 ItemReader, ItemProcessor, ItemWriter는 1:1의 관계를 가집니다. 즉, Job이라는 하나의 큰 일감(Job)에 여러 단계(Step)을 두고, 각 단계를 배치의 기본 흐름대로 구현합니다.

스프링 부트 배치 용어들에 대해서 간단하게 개념정리를 해봤습니다.

1. Job

Job은 배치 처리 과정을 하나의 단위로 만들어 표현한 객체입니다. 전체 배치 처리에 있어 항상 최상단 계층에 있습니다. 스프링 배치에서 Job 객체는 여러 Step 인스턴스를 포함하는 컨테이너 입니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class SimpleJobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job simpleJob() {
        // simpleJob이라는 이름을 가진 Job을 생성할 수 있는 JobBuilder 인스턴스 반환
        return jobBuilderFactory.get("simpleJob")
                // simpleJobBuilder 인스턴스 반환
                .start(simpleStep1())
                // simpleJob이라는 이름을 가진 Job 인스턴스 반환 
                .build(); 
    }
}

보통 배치 Job 객체를 만드는 빌더는 여러 개 있습니다. 여러 빌더를 통합 처리하는 공장인 JobBuiderFactory 객체로 원하는 Job을 손쉽게 만들 수 있습니다. JobBuilderFactory의 get() 메서드를 호출하여 JobBuilder를 생성하고 이를 이용합니다.

JobBuilder

JobBuildr의 메서드들을 까보면 모든 반환 타입이 빌더입니다. 아무래도 비즈니스 환경에 따라서 Job 생성 방법이 모두 다르기 때문에 별도의 구체적인 빌더를 구현하고 이를 통해 Job 생성이 이루어지게 하려는 의도가 아닌가 싶습니다.

JobInstance

JobInstance는 배치에서 Job이 실행될 때 하나의 Job 실행 단위입니다. 만약 하루에 한 번씩 배치의 Job이 실행된다면 어제와 오늘 실행한 각각의 Job을 JobInstance라고 부를 수 있습니다.

만약 JobInstance가 Job 실행이 실패하면 JobInstance가 끝난 것이 아닙니다. 이 경우 JobExecution이 실패 정보를 가지고 있고, 성공하면 JobInstance는 끝난 것으로 간주합니다. 그리고 성공한 JobExecution을 가져서 총 두개를 가지게 됩니다. 여기서 JobInstance와 JobExecution은 부모와 자식관계로 생각하면 됩니다.

JobParameters

JobParameters는 Job이 실행될 때 필요한 파라미터들을 Map 타입으로 저장하는 객체입니다.
JobParameters는 JobInstance를 구분하는 기준이 되기도 합니다. 예를 들어서 Job 하나를 생성할 때 시작 시간 등의 정보를 파리미터로 해서 하나의 JobInstance를 생성합니다. 즉 JobInstance와 JobParameters는 1:1 관계입니다. 파라미터 타입으로는 String, Long, Date, Double를 사용할 수 있습니다.

StepExecution

Job에 JobExecution이라는 Job 실행 정보가 있따면 Step에는 StepExecution이라는 Step 실행 정보를 담는 객체가 있습니다. 각각의 Step이 실행될 때마다 StepExecution이 생성됩니다.

JobRepository

JobRepository는 배치 처리 정보를 담고 있는 메커니즘입니다. 어떤 Job이 실행되었으며 몇 번 실행되었고 언제 끝났는지 등 배치 처리에 대한 메타 데이터를 저장합니다.

예를 들어 Job 하나가 실행되면 JobRepository에서는 배치 실행에 관련된 정보를 담고 있는 도메인인 JobExecution을 생성합니다.

JobRepository는 Step의 실행 정보를 담고 있는 StepExecution도 저장소에 저장하며 전체 메타데이터를 저장/관리하는 역할을 수행합니다.

JobLauncher

JobLauncher는 Job, JobParameters와 함께 배치를 실행하는 인터페이스입니다. 인터페이스의 메서드는 run() 메서드 하나입니다.

public interface JobLauncher {
    public JobExecution run(Job job, JobParameters jobParameters) throw ...
}

run() 메서드는 매개변수로 Job과 JobParameters를 받아 JobExecution을 반환합니다. 만약 매개변수가 이전과 동일하면서 이전에 JobExecution이 중단된 적이 있다면 동일한 JobExecution을 반환합니다.

ItemReader

ItemReader는 Step의 대상이 되는 배치 데이터를 읽어오는 인터페이스입니다. FILE, XML, DB 등 여러 타입의 데이터를 읽어올 수 있습니다.

public interface ItemReader<T> {
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

ItemReader에서 read() 메서드의 반환 타입을 제네릭으로 구현했기 때문에 직접 타입을 지정할 수 있습니다.

ItemProcessor

ItemProcessor는 ItemReader로 읽어온 배치 데이터를 변환하는 역할을 수행합니다.

ItemProcessor가 존재하는 이유는 비즈니스 로직을 분리하기 위해서입니다. ItemWriter는 정말 저장만 수행하고, ItemProcessor 로직 처리만 수행해 역할을 명확하게 분리하여 유지보수성을 용이하게 합니다. 또 다른 이유는 읽어온 배치 데이터와 쓰여질 데이터 타입이 다를 경우 대응하기 위해서입니다. 명확한 인풋과 아웃풋을 ItemProcessor로 구현해놓는다면 더 직관적인 코드가 됩니다.

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

ItemWriter

ItemWriter는 배치 데이터를 저장합니다. 일반적으로 DB나 파일에 저장합니다.

public interface ItemWriter<T> {
    public write<(List<? extends T> items) throws Exception;
}

ItemWriter와 ItemReader와 비슷한 방식으로 구현하면 됩니다. 제네릭으로 원하는 타입을 받습니다. write() 메서드는 List 자료구조를 사용해 지정한 타입이 리스트를 매개변수로 받습니다. 리스트 데이터 수는 설정한 청크 단위로 불러옵니다. write() 메서드의 반환 값은 따로 없고 매개 변수로 받은 데이터를 저장하는 로직을 구현하면 됩니다.

이제 다음편에는 실제로 어떻게 스프링 부트 배치를 사용하는지 예제코드를 소개하겠습니다.

Kotlin 정보은닉 캡슐화

각 클래스나 메서드, 프로퍼티의 접근 범위를 가시성(Visibility)이라고 합니다. 클래스에서 민감하거나 불필요한 부분은 숨기고 사용하기 위해 필요한 부분만 공개하듯이 각 클래스나 메서드, 프로퍼티에 가시성 지시자에 의해 공개할 부분과 숨길 부분을 정할 수 있습니다.

가시성 지시자

  • private: 이 요소는 외부에서 접근할 수 없습니다.
  • public: 이요소는 어디서든 접근이 가능합니다.(기본값)
  • protected: 외부에서 접근할 수 없으나 하위 상속 요소에서는 가능합니다.
  • internal: 같은 정의의 모듈 내부에서는 접근이 가능합니다.

1. private 지시자

private은 접근 범위가 선언된 요소에 한정하는 가시성 지시자입니다. 만약 클래스를 private와 함께 선언하면 그 클래스 안의 맴버만 접근할 수 있습니다.

private class PrivateClass {
    private var i = 1
    private fun privateFunc() {
        i += 1
    }

    fun access() {
        privateFunc()
    }
}

class OtherClass {
    private val opc = PrivateClass() // private를 생략하면 접근 불가합니다. 
    fun test() {
       val pc = PrivateClass()
    }
}

fun main() {
    val pc = PrivateClass() // 생성 가능
    pc.i // 접근 불가
    pc.privateFunc() // 접근 불가
}

fun TopFunction() {
    val tpc = PrivateClass() // 객체 생성 가능
}

위 예제에서는 PrivateClass 클래스는 private으로 선언되어 있으므로 다른 파일에서 접근할 수 없습니다. 만일 다른 클래스에서 프로퍼티로서 PrivateClass에 접근하려하면 똑같이 private으로 선언해야 합니다.

2. protected 지시자

protected 지시자는 최상위에 선언된 요소에는 지정할 수 없고 클래스나 인터페이스와 같은 요소의 멤버에만 지정할 수 있습니다. 맴버가 클래스인 경우에는 protected로 선언할 수 있습니다.

open class Base {
    protected var i = 1
    protected fun protectedFunc() {
        i += 1
    }

    fun access() {
        protectedFunc()
    }
    protected class Nested // 내부 클래스에는 지시자 허용
}

class Derived: Base() {
    fun test(base: Base): Int {
        protectedFunc()
        return i
    }
}

fun main() {
    val base = Base()
    // base.i // 접근 불가
    // base.protectedFunc() // 접근 불가
    base.access()
}

3. internal

코틀린의 internal은 자바와 다르게 새롭게 정의된 이름입니다. internal 키워드를 사용합니다. 이 지시자는 모듈 내부에서만 접근이 가능하고, 모듈이 달라지면 접근이 불가능합니다.

Java의 package로 지정된 경우 접근 요소가 패키지 내부에 있다면 접근할 수 있습니다. 하지만 프로젝트 단위 묶음의 .jar 파일이 달라져도 패키지 이름이 동일하면 다른 .jar에서도 접근할 수 있었기 때문에 보안 문제가 발생 할 수 있습니다. 코틀린에서는 이것을 막고자 package를 버리고 internal로 프로젝트의 같은 모듈이 아니면 외부에서 접근할 수 없게 했습니다.

internal class InternalClass {
    internal var i = 1

    internal fun icFunc() {
        i += 1
    }

    fun access() {
        icFunc()
    }
}

class Other {
    internal val ic = InternalClass()
    fun test() {
        ic.i // 접근허용
        ic.icFunc() // 접근허용
    }
}

fun main() {
    val mic = InternalClass() // 생성 가능
    mic.i // 접근 허용
    mic.icFunc() // 접근 허용
}

이제 같은 프로젝트 모듈에만 있으면 어디서든 접근이 가능합니다.

최종적으로 자동차와 도둑의 예제를 통해서 가시성에 대해서 더 확실하게 이해할 수 있었습니다.

4. 예제 코드

open class Car protected constructor(_year: Int, model: String, _power: String, _wheel: String) {
    private var year: Int = _year
    public var model: String = model
    protected open var power: String = _power
    internal var wheel: String = _wheel

    protected fun start(key: Boolean) {
        if (key) println("Start the Engine!")
    }

    class Driver(_name: String, _license: String) {
        private var name: String = _name
        var license: String = _license
        internal fun driving() = println("[Driver] Driving() - $name")
    }
}

class Tico(_year: Int, model: String, _power: String, _wheel: String, var name: String, private var key: Boolean): Car(_year, model, _power, _wheel) {
    override var power: String = "50hp"
    val driver = Driver(name, "first Class")

    constructor(_name: String, _key: Boolean): this(2014, "basic", "100hp", "normal", _name, _key) {
        name = _name
        key = _key
    }

    fun access(password: String) {
        if (password == "gotico") {
            println("------[Tico] access()--------")
            println("super.model = ${super.model}")
            println("super.power = ${super.power}")
            println("super.wheel = ${super.wheel}")
            super.start(key)

            println("Driver().license = ${driver.license}")
            driver.driving()
        } else{
            println("You're a burglar")
        }
    }
}


class Burglar() {

    fun steal(anycar: Any) {
        if (anycar is Tico) {
            println("--------[Burglar] steal()----------")
            println("anycar.name = ${anycar.name}")
            println("anycar.wheel = ${anycar.wheel}")
            println("anycar.model = ${anycar.model}")

            println(anycar.driver.license)
            anycar.driver.driving()
            anycar.access("dontknow")
        } else {
            println("Nothing to steal")
        }
    }
}

fun main() {
    // val car = Car()
    val tico = Tico("kildong", true)
    tico.access("gotico")

    val burglar = Burglar()
    burglar.steal(tico)
}

Car 클래스는 상속이 가능하고, 이 클래스를 상속하는 Tico 클래스를 정의하였습니다. 그리고 도둑을 의미하는 Burglar로 정의하였습니다.

여기서 주의깊게 볼점은 아래와 같습니다.

  1. Car 클래스의 주 생성자는 protected 지시자가 있기 때문에 constructor 키워드를 생략할 수 없으며 Car 클래스를 상속한 클래스만이 Car 클래스의 객체를 생성할 수 있습니다.

  2. Driver 클래스는 Car 클래스 안에 있고, Car 클래스를 상속받는 Tico 클래스에서는 access() 메서드에서 super 키워드를 사용해 상위 클래스에 접근을 시도합니다. 이때 private 지시자가 적용된 year에는 접근이 불가합니다.

  1. Burglar 클래스를 살펴보면 steal() 메서드 하나만 정의하고 있습니다. Any 자료형의 매개변수인 anycar를 받아서 검사하고 있습니다. 이때 자료형 검사 키워드인 is를 사용해 Tico 의 객체인 경우 이 Tico 객체인 anycar를 통해 접근을 시도합니다. 특히 internal의 경우 파일이 달라져도 같은 모듈에 있으면 접근이 가능합니다.

참조 문헌: Do it 코틀린 프로그래밍

'Kotlin' 카테고리의 다른 글

코틀린 기본문법  (0) 2021.05.11

+ Recent posts