Cursor와 Paging 기반 ItemReader 구현체

1. Cursor 기반의 ItemReader 구현체

Cursor 방식은 DB와 커넥션을 맺은 후, Cursot를 한칸씩 옮기면서 지속적으로 데이터를 가져옵니다. DB와 어플리케이션 사이 통로를 하나 연결해서 하나씩 데이터를 가져온다고 생각하면 됩니다.

Cursor 기반의 ItemReader의 대표적인 구현체는 아래와 같습니다.

  • JdbcCursorItemReader
  • HibernateCursorItemReader
  • StoredProcedureItemReader

JdbcCursorItemReader 사용 예제코드

아래 예제코드는 jojoldu님의 블로그의 스프링 배치 글을 보고 따라서 작성해본 예제코드 입니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcCursorItemReaderJobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    private static final int chunkSize = 10;

    @Bean
    public Job jobCursorItemReaderJob() {
        return jobBuilderFactory.get("jdbcCursorItemReaderJob")
                .start(jdbcCursorItemReaderStep())
                .build();
    }

    // ItemReader에서 반환하는 타입은 Pay, Writer에서 저장하는 타입도 Pay입니다.
    @Bean
    public Step jdbcCursorItemReaderStep() {
        return stepBuilderFactory.get("jdbcCursorItemReaderStep")
                .<Pay, Pay> chunk(chunkSize)
                .reader(jdbcCursorItemReader())
                .writer(jdbcCursorItemWriter())
                .build();
    }

    @Bean
    public JdbcCursorItemReader<Pay> jdbcCursorItemReader() {
        return new JdbcCursorItemReaderBuilder<Pay>()
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Pay.class))
                .sql("SELECT id, amount, tx_name, tx_date_time FROM pay")
                .name("jdbcCursorItemReader")
                .build();
    }

    @Bean
    public ItemWriter<Pay> jdbcCursorItemWriter() {
        return list -> {
            for (Pay pay: list) {
                log.info("Current Pay= {}", pay);
            }
        };
    }
}

Pay.class

@ToString
@Setter
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Entity
public class Pay {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Long amount;
    private String txName;
    private LocalDateTime txDateTime;

    private Pay(Long amount, String txName, LocalDateTime txDateTime) {
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = txDateTime;
    }

    private Pay(Long id, Long amount, String txName, LocalDateTime txDateTime) {
        this.id = id;
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = txDateTime;
    }

    public static Pay of(Long amount, String txName, LocalDateTime txDateTime) {
        return new Pay(amount, txName, txDateTime);
    }
}

실제로 AbstractItemCountingItemStreamItemReader.read() 메서드를 호출하면 하위 추상 클래스인 AbstractCursorItemReader.doRead() 메서드가 호출 됩니다.

@Nullable
@Override
protected T doRead() throws Exception {
    if (rs == null) {
        throw new ReaderNotOpenException("Reader must be open before it can be read.");
    }

    try {
        if (!rs.next()) {
            return null;
        }
        int currentRow = getCurrentItemCount();
        T item = readCursor(rs, currentRow);
        verifyCursorPosition(currentRow);
        return item;
    }
    catch (SQLException se) {
        throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se);
    }
}

템플릿 메서드 패턴을 사용하기 때문에 readCursor() 메서드는 실제로 비어있고, 하위 클래스인 JdbcCursorItemReader에서 구현하고 있습니다. 하나씩 DB에서 ResultSet을 open해서 읽어오고 있습니다.

JdbcCursorItemReader.class

@Nullable
@Override
protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
    return rowMapper.mapRow(rs, currentRow);
}

JdbcCursorItemReaderBuilder를 통해서 JdbcCursorItemReader 구현체를 생성하기 위해 다양한 메서드를 호출하고 있는데 하나씩 살펴보겠습니다. 각 역할들을 살펴보겠습니다.

  • fetchSize: DB에서 한번에 가져올 데이터의 양을 나타냅니다. 분할처리 없이 내부적으로 가져오는 데이터는 FetchSize 만큼 가져와 read()를 통해 하나씩 가져옵니다.

ResultSet의 동작 과정을 알아야하는데 최초로 ResultSet.next() 메서드를 호출 시 한 꺼번에 fetchSize 만큼 DB에서 가져와 메모리에 저장합니다. 그 다음 read() 메서드로 메모리에서 하나씩 읽어서 처리합니다.

  • dataSource: Database에 접근하기 위해 사용할 Datasource 객체를 할당합니다.

  • rowMapper: 쿼리 결과를 Java 인스턴스로 매핑하기 위한 Mapper 입니다. 커스텀하기 보다는 매번 Mapper 클래스를 생성해야 하기 때문에 보편적으로 Spring에서 공식적으로 지원하는 BeanPropertyRowMapper를 많이 사용합니다.

  • name: ItemReader의 이름을 지정합니다. Bean의 이름이 아니고 Spring Batch의 ExecutionContext에 저장되어질 이름입니다.

ItemWriter는 Chunk 크기의 List 객체를 받아서 간단히 로그만 찍도록 처리하였습니다.

CursorItemReader 사용 시 주의사항

CursorItemReader를 사용할 때는 DB와 SocketTimeout을 충분히 큰 값으로 설정해야 합니다. 기본적으로 TCP 통신은 Socket으로 하기 때문에 타임아웃을 설정해줘야 합니다. Cursor는 하나의 Connection으로 Batch가 끝날때까지 사용되기 때문에 작업이 다 끝나기전에 Connection이 먼저 끊어질 수 있습니다.

Batch 수행 시간이 오래 걸리는 경우에 권장하는 방법은 PagingItemReader를 사용하는 것입니다. Paging의 경우 한 페이지를 읽을 때마다 Connection을 맺고 끊기 때문에 아무리 많은 데이터라도 타임아웃과 부하 없이 수행될 수 있습니다.

2. Paging 기반의 ItemReader 구현체

Paging 방식은 한번에 10개 혹은 개발자가 지정한 pageSize만큼 데이터를 가져옵니다.

Paging 기반의 ItemReader 구현체는 아래와 같습니다.

  • JdbcPagingItemReader
  • HibernatePagingItemReader
  • JpaPagingItemReader

JdbcPagingItemReader는 JdbcCursorItemReader와 같은 JdbcTemplate 인터페이스를 이용한 PagingItemReader 입니다.

JdbcPagingItemReader 사용 예제코드

JdbcPagingItemReader.class

@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcPagingItemReaderConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource; // DataSource DI

    private static final int chunkSize = 10;

    @Bean
    public Job jdbcPagingItemReaderJob() throws Exception {
        return jobBuilderFactory.get("jdbcPagingItemReaderJob")
                .start(jdbcPagingItemReaderStep())
                .build();
    }

    @Bean
    public Step jdbcPagingItemReaderStep() throws Exception {
        return stepBuilderFactory.get("jdbcPagingItemReaderStep")
                .<Pay, Pay>chunk(chunkSize)
                .reader(jdbcPagingItemReader())
                .writer(jdbcPagingItemWriter())
                .build();
    }

    @Bean
    public JdbcPagingItemReader<Pay> jdbcPagingItemReader() throws Exception {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("amount", 2000);

        return new JdbcPagingItemReaderBuilder<Pay>()
                .pageSize(chunkSize)
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Pay.class))
                .queryProvider(createQueryProvider())
                .parameterValues(parameterValues)
                .name("jdbcPagingItemReader")
                .build();
    }

    private ItemWriter<Pay> jdbcPagingItemWriter() {
        return list -> {
            for (Pay pay: list) {
                log.info("Current Pay={}", pay);
            }
        };
    }

    @Bean
    public PagingQueryProvider createQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource); // Database에 맞는 PagingQueryProvider를 선택하기 위해
        queryProvider.setSelectClause("id, amount, tx_name, tx_date_time");
        queryProvider.setFromClause("from pay");
        queryProvider.setWhereClause("where amount >= :amount");

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);

        return queryProvider.getObject();
    }
}

JdbcCursorItemReader 클래스와 크게 다른 점은 쿼리를 생성하는 부분입니다. JdbcPagingItemReader 클래스는 PagingQueryProvider를 통해 쿼리를 생성합니다.

이렇게까지 하는 이유는 각 DB에 Paging을 지원하는 자체적인 전략 때문입니다.

SqlPagingQueryProviderFactoryBean.class

{
    providers.put(DB2, new Db2PagingQueryProvider());
    providers.put(DB2VSE, new Db2PagingQueryProvider());
    providers.put(DB2ZOS, new Db2PagingQueryProvider());
    providers.put(DB2AS400, new Db2PagingQueryProvider());
    providers.put(DERBY,new DerbyPagingQueryProvider());
    providers.put(HSQL,new HsqlPagingQueryProvider());
    providers.put(H2,new H2PagingQueryProvider());
    providers.put(MYSQL,new MySqlPagingQueryProvider());
    providers.put(ORACLE,new OraclePagingQueryProvider());
    providers.put(POSTGRES,new PostgresPagingQueryProvider());
    providers.put(SQLITE, new SqlitePagingQueryProvider());
    providers.put(SQLSERVER,new SqlServerPagingQueryProvider());
    providers.put(SYBASE,new SybasePagingQueryProvider());
}

Spring Batch는 SqlPagingQueryProviderFactoryBean을 통해 DataSource 설정 값을 보고 위 코드에서 작성된 Provider 중 하나를 자동으로 선택하도록 합니다. 이렇게 하면 코드 변경 사항이 적어서 Spring Batch에서 공식 지원하는 방법입니다.

그럼 다시 위의 코드들이 무엇을 의미하는지 살펴보겠습니다.

  • parameterValues: 쿼리에 대한 매개 변수 값의 Map 타입을 지정합니다.
    아래 queryProvider.setWhereClause을 보면 어떻게 변수를 사용하는지 알 수 있습니다.
 queryProvider.setWhereClause("where amount >= :amount");

where 절에서 선언된 파라미터 변수 명과 parameterValues.put("amount", 2000) 코드에서 key에 해당하는 변수명이 일치해야 합니다.

JdbcPagingItemReader는 추상클래스인 AbstractPagingItemReader를 상속받고 있습니다. Paging 쿼리로 limit, offset 만큼 조회를 하면 AbstractPagingItemReader가 내부적으로 가지고 있는 List<T> results 변수에서 페이지 크기만큼의 row들을 가지고 있다가 read() 메서드 호출 시에 List 내부에서 하나씩 Item을 리턴합니다.

PagingItemReader 사용 시 주의사항

정렬(Order)가 무조건 포함되어 있어야 합니다.

참조 사이트: https://n1tjrgns.tistory.com/159

스프링 부트 휴먼회원 배치 설계

위에서 살펴본 스프링 부트 배치 컴포넌트들을 이용하여 커뮤니티 사이트에 가입한 회원 중 1년이 지나도록 상태 변화가 없는 회원을 휴먼회원으로 전환하는 배치 예제코드를 작성하였습니다.

기술스펙은 아래와 같습니다.

  • Java 8
  • Gradle 6.3
  • Spring Boot 2.3.0 RELEASE
  • IntelliJ IDEA 2020.02
  • H2

의존성 라이브러리들은 Spring Data JPA, H2, Lombok, Spring Batch starter 시리지들로 선택하였습니다.

전체 배치 프로세스

Untitled Diagram (2)

처리 절차

  1. H2 DB에 저장된 데이터 중 1년간 업데이트 되지 않은 사용자를 찾는 로직을 ItemReader로 구현합니다.
  2. 대상 사용자 데이터의 상태 값을 휴먼회원으로 전환하는 프로세스를 ItemProcessor에 구현합니다.
  3. 상태 값이 변한 휴먼회원을 실제로 DB에 저장하는 ItemWriter를 구현합니다.

1. build.gradle 의존성 설정

buildscript {
    ext {
        springBootVersion = '2.3.0.RELEASE'
        gradle_node_version='2.2.4'
    }

    repositories {
        mavenCentral()
        maven {
            url "https://plugins.gradle.org/m2/"
        }
    }

    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

subprojects {
    apply plugin: 'java'
    apply plugin: 'eclipse'
    apply plugin: 'org.springframework.boot'
    apply plugin: 'io.spring.dependency-management'

    group = 'com.junyoung'
    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = '1.8'

    repositories {
        mavenCentral()
    }

    configurations {
        compileOnly {
            extendsFrom annotationProcessor
        }
    }

    dependencies {
        compile('org.springframework.boot:spring-boot-starter-batch')
        runtime('com.h2database:h2')
        compileOnly 'org.projectlombok:lombok'
        annotationProcessor 'org.projectlombok:lombok'
        implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
        testImplementation('org.springframework.boot:spring-boot-starter-test') {
            exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
        }
        testCompile group: 'junit', name: 'junit', version: '4.12'
        testCompile('org.springframework.batch:spring-batch-test')
    }

    test {
        useJUnitPlatform()
    }
}

1. 도메인 작성

먼저 휴먼회원 배치 처리에 사용될 도메인을 작성합니다. 객체 명은 User이고, 휴먼 여부를 판별하는 UserStatus Enum을 추가하였습니다. ACTIVE는 활성회원, INACTIVE는 휴먼회원입니다.

UserStatus (회원 활성화 상태)

public enum UserStatus {
    ACTIVE, INACTIVE;
}

Grade (회원 등급)

public enum Grade {
    VIP, GOLD, FAMILY;
}

User 도메인 객체

@NoArgsConstructor
@Entity
@Table
public class User implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long idx;

    @Column
    private String name;

    @Column
    private String password;

    @Column
    private String email;

    @Column
    private String principle;

    @Column
    @Enumerated(EnumType.STRING)
    private SocialType socialType;

    @Column
    @Enumerated(EnumType.STRING)
    private UserStatus status;

    @Column
    @Enumerated(EnumType.STRING)
    private Grade grade;

    @Column
    private LocalDateTime createdDate;

    @Column
    private LocalDateTime updatedDate;

    @Builder
    public User(String name, String password, String email, String principle,
                SocialType socialType, UserStatus status, Grade grade, LocalDateTime createdDate,
                LocalDateTime updatedDate) {
        this.name = name;
        this.password = password;
        this.email = email;
        this.principle = principle;
        this.socialType = socialType;
        this.status = status;
        this.grade = grade;
        this.createdDate = createdDate;
        this.updatedDate = updatedDate;
    }

    public User setInactive() {
        status = UserStatus.INACTIVE;
        return this;
    }
}

UserRepository 리포지터리

public interface UserRepository extends JpaRepository<User, Long> {
    List<User> findByUpdatedDateBeforeAndStatusEquals(LocalDateTime localDateTime, UserStatus status);
}

findByUpdatedDateBeforeAndStatusEquals() 메서드는 인자 값으로 LocalDateTime, 즉 현재 기준 날짜 값보다 1년 전의 날짜 값을 받고 두 번째 인자값으로 UserStatus 타입을 받아 쿼리를 실행하는 메서드 입니다.

2. 휴먼회원 배치 Job 설정

먼저, 스프링 부트를 실행하는 Entry 포인트인 BatchApplication 파일에서 아래와 같이 @EnableBatchProcessing을 활성화 시켜야 합니다.

@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }
}

EnableBatchProcessing를 적용해야 배치 작업에 필요한 빈을 미리 등록하여 사용할 수 있습니다.

배치 정보는 아래 @Configuration 어노테이션을 사용하는 설정 클래스에서 빈으로 등록합니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    private final UserRepository userRepository;
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job inactiveUserJob() {
        // (1) JobBuilderFactory 주입
        return jobBuilderFactory.get("inactiveUserJob3")
                // (2) Job의 재실행 방지
                .preventRestart()
                .start(inactiveJobStep(null))
                .build();
    }

}
  1. (1)는 Job 생성을 직관적이고 편리하게 도와주는 빌더인 JobBuilderFactory를 주입하였습니다.
  2. (2)는 preventRestart()는 Job의 재실행을 막습니다.

위에 Job 설정을 완료하였고, 이제 Step을 설정하겠습니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    @Bean
    public Step inactiveJobStep(@Value("#{jobParameters[requestDate]}") final String requestDate) {
        log.info("requestDate: {}", requestDate);
        // (1)  StepBuilderFactory 주입
        return stepBuilderFactory.get("inactiveUserStep")
                // (2) chunk 사이즈 입력
                .<User, User>chunk(10)
                // (3) reader, processor, writer를 각각 설정
                .reader(inactiveUserReader())
                .processor(inactiveUserProcessor())
                .writer(inactiveUserWriter())
                .build();
    }
}
  1. (1)은 StepBuilderFactory의 get("inactiveUserStep")은 inactiveUserStep이라는 이름의 StepBuilder를 생성합니다.
  2. (2)는 제네릭을 사용해 chunk의 입력 타입과 출력 타입을 User 타입으로 설정하였습니다. 인자 값은 10으로 설정했는데 쓰기 시에 청크 단위로 묶어서 writer() 메서드를 실행시킬 단위를 지정하였습니다. 즉, 커밋의 다누이가 10개입니다.
  3. (3)은 reader, processor, writer를 각각 설정하였습니다.

3. Chunk 지향 처리

여기서... Chunk가 무엇일까요?

Chunk는 덩어리라는 뜻으로 Spring Batch에서 각 커밋 사이에 처리되는 row 수를 애기합니다. 만약에 Chunk로 처리하지 않을 경우에 DB에서 데이터가 1000개인 로우를 읽어와서 배치처리를 하는 경우를 생각할 수 있습니다.

배치처리를 하는 중에 1개의 데이터를 저장하는데 문제가 생기면 나머지 999개의 데이터도 rollback 처리를 해야됩니다. 이러한 문제를 방지하기 위해서 Chunk 지향 프로세스 방식으로 스프링 부트에서 배치 실행을 지원하고 있습니다.

즉, Chunk 단위로 트랜잭션을 수행하기 때문에 실패하는 경우 해당 Chunk 만큼 롤백이 되고, 이전에 커밋된 트랜잭션 범위까지는 반영이 된다는 것입니다.

Chunk는 스프링 부트 배치를 잘 사용하기 위해서 반드시 알아둬야 하는 개념이라고 생각됩니다.

아래 예시코드는 jojoldu님이 블로그에서 가져온 Chunk 지향 처리를 Java 코드로 표현하는 것입니다.

for (int i = 0; i < totalSize; i += chunkSize) {

    List<Item> items = new ArrayList<> ();

    for (int j = 0; j < chunkSize; j++) {
        Object item = itemReader.read();
        Object processedItem = item.Processor.process(item);
        items.add(processedItem);    
    }
    itemWriter.write(items);
}

Chunk 단위로 reader, process, writer로 처리하기 때문에 만약 chunkSize가 10일 경우에 process나 write에서 예외가 발생한다면 전부 rollback 되고, 그 다음 chunkSize만큼 배치가 처리 됩니다. 참조 블로그

4. ItemReader 구현

인터페이스인 ItemReader(데이터를 읽어오는 역할)를 설정한 부분입니다. 여기서는 ItemReader 인터페이스를 구현한 QueueItemReader 구현체를 리턴하고 있습니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    private final UserRepository userRepository;
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    @StepScope // (1) Step의 주기에 따라 새로운 빈 생성
    public QueueItemReader<User> inactiveUserReader() {
        List<User> oldUsers =
                userRepository.findByUpdatedDateBeforeAndStatusEquals(
                        LocalDateTime.now().minusYears(1), UserStatus.ACTIVE);
        return new QueueItemReader<>(oldUsers);
    }
}
  1. 기본 빈 생성은 싱글턴 방식이지만, (1)에서 @StepScope를 사용하면 해당 메서드는 Step 주기에 따라 새로운 빈을 생성합니다. 즉, 각 Step의 실행마다 새로운 빈을 만들기 때문에 지연 생성이 가능합니다. @StepScope는 기본 프록시 모드가 반환되는 클래스 타입을 참조하기 때문에 @StepScope를 사용하면 반드시 구현된 반환 타입을 명시해 반환해야 합니다. 위 예제에서는 QueueItemReader라고 명시했습니다.
  2. findByUpdatedDateBeforeAndStatusEquals() 메서드로 현재 날짜 기준 1년 전의 날짜값과 User의 상태값이 ACTIVE인 User 리스트를 조회하고, QueueItemReader 객체 생성 시 파라미터로 넣어서 Queue에 담도록 하고 있습니다.

QueueItemReader

public class QueueItemReader<T> implements ItemReader<T> {

    private Queue<T> queue;

    public QueueItemReader(List<T> data) {
        this.queue = new LinkedList<>(data);
    }

    @Override
    public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return this.queue.poll();
    }
}

위 코드를 보면 QueItemReader를 사용해 휴먼회원으로 지정될 타깃 데이터를 한번에 불러와 큐에 담아놓습니다.

그리고 read() 메서드를 호출할 때 큐의 poll() 메서드를 사용하여 큐에서 데이터를 하나씩 반환합니다.

5. ItemProcessor 구현

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    ...

    public ItemProcessor<User, User> inactiveUserProcessor() {
        return new ItemProcessor<User, User>() {
            @Override
            public User process(User user) throws Exception {
                return user.setInactive();
            }
        };
    }
}

ItemReader에서 읽은 User를 휴먼 상태로 전환하는 processor 메서드를 추가하는 예입니다.

6. ItemWriter 구현

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    ...

     public ItemWriter<User> inactiveUserWriter() {
        return ((List<? extends User> users) -> userRepository.saveAll(users));
    }
}

휴먼회원 전환 배치 처리 최종 코드

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    private final UserRepository userRepository;
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job inactiveUserJob() {
        return jobBuilderFactory.get("inactiveUserJob3")
                .preventRestart()
                .start(inactiveJobStep(null))
                .build();
    }

    @Bean
    public Step inactiveJobStep(@Value("#{jobParameters[requestDate]}") final String requestDate) {
        log.info("requestDate: {}", requestDate);
        return stepBuilderFactory.get("inactiveUserStep")
                .<User, User>chunk(10)
                .reader(inactiveUserReader())
                .processor(inactiveUserProcessor())
                .writer(inactiveUserWriter())
                .build();
    }

    @Bean
    @StepScope
    public QueueItemReader<User> inactiveUserReader() {
        List<User> oldUsers =
                userRepository.findByUpdatedDateBeforeAndStatusEquals(
                        LocalDateTime.now().minusYears(1), UserStatus.ACTIVE);
        return new QueueItemReader<>(oldUsers);
    }

    public ItemProcessor<User, User> inactiveUserProcessor() {
        return new org.springframework.batch.item.ItemProcessor<User, User>() {
            @Override
            public User process(User user) throws Exception {
                return user.setInactive();
            }
        };
    }

    public ItemWriter<User> inactiveUserWriter() {
        return ((List<? extends User> users) -> userRepository.saveAll(users));
    }
}

ItemWriter는 리스트 타입을 앞서 설정한 Chunk 단위로 받습니다. Chunk 단위를 10으로 설정했으므로 users에는 휴먼회원 10개 가 주어지며 saveAll() 메서드를 사용해서 한번에 DB에 저장합니다.

SQL문을 사용하여 테스트 데이터 주입

실제 배치를 실행하기 전에 테스트 할 데이터를 만들기 위해 SQL 쿼리 파일을 생성하여 실행하였습니다.

기본적으로 /resources 하위 경로에 import.sql 파일을 생성하면 스프링 부트(정확히는 하이버네이트)가 실행될 때 자동으로 해당 파일의 쿼리를 실행합니다.

insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1001, 'test@test.com', 'test1', 'test1', 'FACEBOOK', 'ACTIVE', 'VIP', '2016-03-01T00:00:00', '2018-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1002, 'test@test.com', 'test2', 'test2', 'FACEBOOK', 'ACTIVE', 'VIP', '2016-03-01T00:00:00', '2018-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1003, 'test@test.com', 'test3', 'test3', 'FACEBOOK', 'ACTIVE', 'VIP', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1004, 'test@test.com', 'test4', 'test4', 'FACEBOOK', 'ACTIVE', 'GOLD', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1005, 'test@test.com', 'test5', 'test5', 'FACEBOOK', 'ACTIVE', 'GOLD', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1006, 'test@test.com', 'test6', 'test6', 'FACEBOOK', 'ACTIVE', 'GOLD', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1007, 'test@test.com', 'test7', 'test7', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1008, 'test@test.com', 'test8', 'test8', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1009, 'test@test.com', 'test9', 'test9', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1010, 'test@test.com', 'test10', 'test10', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1011, 'test@test.com', 'test11', 'test11', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');

실행 결과

Before

image

After

image

휴먼회원 배치 실행 결과 UPDATE_DATE 컬럼 값이 현재 시점에서 1년전이고 상태값이 ACTIVE인 회원들의 상태 값이 INAVTIVE로 변경되는 것을 확인할 수 있었습니다.

참조: https://jojoldu.tistory.com/331?category=902551, https://github.com/young891221/Spring-Boot-Community-Batch

@JobScope와 @StepScope

@JobScope@StepScope는 스프링의 기본 Scope인 싱글톤 방식과는 대치되는 역할입니다.

Bean의 생성 시점이 스프링 애플리케이션이 실행되는 시점이 아닌 @JobScope, @StepScope가 명시된 메서드가 실행될 때까지 지연시키는 것을 의미합니다. 이러한 행위를 Late Binding이라고도 합니다.

Spring Batch에서 이렇게 Late Binding을 하면서 얻는 이점들은 아래와 같습니다.

  1. JobParameter를 특정 메서드가 실행하는 시점까지 지연시켜 할당시킬 수 있습니다.
    즉, 애플리케이션이 구동되는 시점이 아니라 비즈니스 로직이 구현되는 어디든 JobParameter를 할당함으로 유연한 설계를 가능하게 합니다.
  2. 병렬처리에 안전합니다.
    Step의 구성요소인 ItemReader, ItemProcessor, ItemWriter이 있고, ItemReader에서 데이터를 읽어 오는 메서드를 서로 다른 Step으로 부터 동시에 병렬 실행이 된다면 서로 상태를 간섭받게 될 수 있습니다. 하지만 @StepScope를 적용하면 각각의 Step에서 실행될 때 서로의 상태를 침범하지 않고 처리를 완료할 수 있습니다.

@JobScope는 Step 선언문에서만 사용이 가능하고, @StepScope는 Step을 구성하는 ItemReader, ItemProcessor, ItemWriter에서 사용 가능합니다.

1. JobParameters 사용 시 주의사항

JobParameters는 아래 예제코드처럼 @Value를 통해서 가능합니다. JobPameters는 Step이나 Tasklet, Reader 등 배치 컴포넌트 Bean의 생성 시점에 호출할 수 있습니다. 정확하게 말해서 Scope Bean을 생성할때만 가능합니다.

즉, @StepScope, @JobScope Bean을 생성할 때만 JobParameters가 생성되기 때문에 사용할 수 있습니다.

 @Bean
 @JobScope
 public Step inactiveJobStep(@Value("#{jobParameters[requestDate]}") final String requestDate) {
       log.info("requestDate: {}", requestDate);
       return stepBuilderFactory.get("inactiveUserStep")
               .<User, User>chunk(10)
               .reader(inactiveUserReader())
               .processor(inactiveUserProcessor())
               .writer(inactiveUserWriter())
               .build();
 }

스프링 부트 배치 입문을 위한 용어정리

스프링 부트 배치는 대용량 데이터를 처리하는 기술로만 알고 있어서, 이번 기회에 한번 개념만 살펴보았습니다.

스프링 부트 배치를 왜 사용하는지 장점부터 살펴보았습니다.

  • 대용량 데이터 처리에 최적화되어 고성능을 발휘합니다.
  • 효과적인 로깅, 통계 처리, 트랜잭션 관리 등 재사용 가능한 필수 기능을 지원합니다.
  • 수동으로 처리하지 않도록 자동화 되었습니다.
  • 예외 사항과 비정상 동작에 대한 방어 기능이 있습니다.
  • 스프링 부트 배치의 반복되는 작업 프로세스를 이해하면 비즈니스 로직에 집중할 수 있습니다.

일반적으로 스프링 부트 배치의 절차는 읽기 -> 처리 -> 쓰기를 따릅니다.

  1. 읽기(read): 데이터 저장소(일반적으로 데이터 베이스)에서 특정 데이터 레코드를 읽습니다.
  2. 처리(processing): 원하는 방식으로 데이터를 가공/처리합니다.
  3. 쓰기(write): 수정된 데이터를 다시 저장소(데이터베이스)에 저장합니다. 혹은 외부 API를 통해 내보내기도 합니다.

아래 그림은 배치 처리와 관련된 객체의 관계입니다.

Untitled Diagram (1)

Job과 Step은 1:M, Step과 ItemReader, ItemProcessor, ItemWriter는 1:1의 관계를 가집니다. 즉, Job이라는 하나의 큰 일감(Job)에 여러 단계(Step)을 두고, 각 단계를 배치의 기본 흐름대로 구현합니다.

스프링 부트 배치 용어들에 대해서 간단하게 개념정리를 해봤습니다.

1. Job

Job은 배치 처리 과정을 하나의 단위로 만들어 표현한 객체입니다. 전체 배치 처리에 있어 항상 최상단 계층에 있습니다. 스프링 배치에서 Job 객체는 여러 Step 인스턴스를 포함하는 컨테이너 입니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class SimpleJobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job simpleJob() {
        // simpleJob이라는 이름을 가진 Job을 생성할 수 있는 JobBuilder 인스턴스 반환
        return jobBuilderFactory.get("simpleJob")
                // simpleJobBuilder 인스턴스 반환
                .start(simpleStep1())
                // simpleJob이라는 이름을 가진 Job 인스턴스 반환 
                .build(); 
    }
}

보통 배치 Job 객체를 만드는 빌더는 여러 개 있습니다. 여러 빌더를 통합 처리하는 공장인 JobBuiderFactory 객체로 원하는 Job을 손쉽게 만들 수 있습니다. JobBuilderFactory의 get() 메서드를 호출하여 JobBuilder를 생성하고 이를 이용합니다.

JobBuilder

JobBuildr의 메서드들을 까보면 모든 반환 타입이 빌더입니다. 아무래도 비즈니스 환경에 따라서 Job 생성 방법이 모두 다르기 때문에 별도의 구체적인 빌더를 구현하고 이를 통해 Job 생성이 이루어지게 하려는 의도가 아닌가 싶습니다.

JobInstance

JobInstance는 배치에서 Job이 실행될 때 하나의 Job 실행 단위입니다. 만약 하루에 한 번씩 배치의 Job이 실행된다면 어제와 오늘 실행한 각각의 Job을 JobInstance라고 부를 수 있습니다.

만약 JobInstance가 Job 실행이 실패하면 JobInstance가 끝난 것이 아닙니다. 이 경우 JobExecution이 실패 정보를 가지고 있고, 성공하면 JobInstance는 끝난 것으로 간주합니다. 그리고 성공한 JobExecution을 가져서 총 두개를 가지게 됩니다. 여기서 JobInstance와 JobExecution은 부모와 자식관계로 생각하면 됩니다.

JobParameters

JobParameters는 Job이 실행될 때 필요한 파라미터들을 Map 타입으로 저장하는 객체입니다.
JobParameters는 JobInstance를 구분하는 기준이 되기도 합니다. 예를 들어서 Job 하나를 생성할 때 시작 시간 등의 정보를 파리미터로 해서 하나의 JobInstance를 생성합니다. 즉 JobInstance와 JobParameters는 1:1 관계입니다. 파라미터 타입으로는 String, Long, Date, Double를 사용할 수 있습니다.

StepExecution

Job에 JobExecution이라는 Job 실행 정보가 있따면 Step에는 StepExecution이라는 Step 실행 정보를 담는 객체가 있습니다. 각각의 Step이 실행될 때마다 StepExecution이 생성됩니다.

JobRepository

JobRepository는 배치 처리 정보를 담고 있는 메커니즘입니다. 어떤 Job이 실행되었으며 몇 번 실행되었고 언제 끝났는지 등 배치 처리에 대한 메타 데이터를 저장합니다.

예를 들어 Job 하나가 실행되면 JobRepository에서는 배치 실행에 관련된 정보를 담고 있는 도메인인 JobExecution을 생성합니다.

JobRepository는 Step의 실행 정보를 담고 있는 StepExecution도 저장소에 저장하며 전체 메타데이터를 저장/관리하는 역할을 수행합니다.

JobLauncher

JobLauncher는 Job, JobParameters와 함께 배치를 실행하는 인터페이스입니다. 인터페이스의 메서드는 run() 메서드 하나입니다.

public interface JobLauncher {
    public JobExecution run(Job job, JobParameters jobParameters) throw ...
}

run() 메서드는 매개변수로 Job과 JobParameters를 받아 JobExecution을 반환합니다. 만약 매개변수가 이전과 동일하면서 이전에 JobExecution이 중단된 적이 있다면 동일한 JobExecution을 반환합니다.

ItemReader

ItemReader는 Step의 대상이 되는 배치 데이터를 읽어오는 인터페이스입니다. FILE, XML, DB 등 여러 타입의 데이터를 읽어올 수 있습니다.

public interface ItemReader<T> {
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

ItemReader에서 read() 메서드의 반환 타입을 제네릭으로 구현했기 때문에 직접 타입을 지정할 수 있습니다.

ItemProcessor

ItemProcessor는 ItemReader로 읽어온 배치 데이터를 변환하는 역할을 수행합니다.

ItemProcessor가 존재하는 이유는 비즈니스 로직을 분리하기 위해서입니다. ItemWriter는 정말 저장만 수행하고, ItemProcessor 로직 처리만 수행해 역할을 명확하게 분리하여 유지보수성을 용이하게 합니다. 또 다른 이유는 읽어온 배치 데이터와 쓰여질 데이터 타입이 다를 경우 대응하기 위해서입니다. 명확한 인풋과 아웃풋을 ItemProcessor로 구현해놓는다면 더 직관적인 코드가 됩니다.

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

ItemWriter

ItemWriter는 배치 데이터를 저장합니다. 일반적으로 DB나 파일에 저장합니다.

public interface ItemWriter<T> {
    public write<(List<? extends T> items) throws Exception;
}

ItemWriter와 ItemReader와 비슷한 방식으로 구현하면 됩니다. 제네릭으로 원하는 타입을 받습니다. write() 메서드는 List 자료구조를 사용해 지정한 타입이 리스트를 매개변수로 받습니다. 리스트 데이터 수는 설정한 청크 단위로 불러옵니다. write() 메서드의 반환 값은 따로 없고 매개 변수로 받은 데이터를 저장하는 로직을 구현하면 됩니다.

이제 다음편에는 실제로 어떻게 스프링 부트 배치를 사용하는지 예제코드를 소개하겠습니다.

+ Recent posts