SpringFramework/Spring Batch

스프링 부트 휴먼회원 전환을 위한 배치 설계

준준영 2021. 5. 11. 16:32

스프링 부트 휴먼회원 배치 설계

위에서 살펴본 스프링 부트 배치 컴포넌트들을 이용하여 커뮤니티 사이트에 가입한 회원 중 1년이 지나도록 상태 변화가 없는 회원을 휴먼회원으로 전환하는 배치 예제코드를 작성하였습니다.

기술스펙은 아래와 같습니다.

  • Java 8
  • Gradle 6.3
  • Spring Boot 2.3.0 RELEASE
  • IntelliJ IDEA 2020.02
  • H2

의존성 라이브러리들은 Spring Data JPA, H2, Lombok, Spring Batch starter 시리지들로 선택하였습니다.

전체 배치 프로세스

Untitled Diagram (2)

처리 절차

  1. H2 DB에 저장된 데이터 중 1년간 업데이트 되지 않은 사용자를 찾는 로직을 ItemReader로 구현합니다.
  2. 대상 사용자 데이터의 상태 값을 휴먼회원으로 전환하는 프로세스를 ItemProcessor에 구현합니다.
  3. 상태 값이 변한 휴먼회원을 실제로 DB에 저장하는 ItemWriter를 구현합니다.

1. build.gradle 의존성 설정

buildscript {
    ext {
        springBootVersion = '2.3.0.RELEASE'
        gradle_node_version='2.2.4'
    }

    repositories {
        mavenCentral()
        maven {
            url "https://plugins.gradle.org/m2/"
        }
    }

    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

subprojects {
    apply plugin: 'java'
    apply plugin: 'eclipse'
    apply plugin: 'org.springframework.boot'
    apply plugin: 'io.spring.dependency-management'

    group = 'com.junyoung'
    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = '1.8'

    repositories {
        mavenCentral()
    }

    configurations {
        compileOnly {
            extendsFrom annotationProcessor
        }
    }

    dependencies {
        compile('org.springframework.boot:spring-boot-starter-batch')
        runtime('com.h2database:h2')
        compileOnly 'org.projectlombok:lombok'
        annotationProcessor 'org.projectlombok:lombok'
        implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
        testImplementation('org.springframework.boot:spring-boot-starter-test') {
            exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
        }
        testCompile group: 'junit', name: 'junit', version: '4.12'
        testCompile('org.springframework.batch:spring-batch-test')
    }

    test {
        useJUnitPlatform()
    }
}

1. 도메인 작성

먼저 휴먼회원 배치 처리에 사용될 도메인을 작성합니다. 객체 명은 User이고, 휴먼 여부를 판별하는 UserStatus Enum을 추가하였습니다. ACTIVE는 활성회원, INACTIVE는 휴먼회원입니다.

UserStatus (회원 활성화 상태)

public enum UserStatus {
    ACTIVE, INACTIVE;
}

Grade (회원 등급)

public enum Grade {
    VIP, GOLD, FAMILY;
}

User 도메인 객체

@NoArgsConstructor
@Entity
@Table
public class User implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long idx;

    @Column
    private String name;

    @Column
    private String password;

    @Column
    private String email;

    @Column
    private String principle;

    @Column
    @Enumerated(EnumType.STRING)
    private SocialType socialType;

    @Column
    @Enumerated(EnumType.STRING)
    private UserStatus status;

    @Column
    @Enumerated(EnumType.STRING)
    private Grade grade;

    @Column
    private LocalDateTime createdDate;

    @Column
    private LocalDateTime updatedDate;

    @Builder
    public User(String name, String password, String email, String principle,
                SocialType socialType, UserStatus status, Grade grade, LocalDateTime createdDate,
                LocalDateTime updatedDate) {
        this.name = name;
        this.password = password;
        this.email = email;
        this.principle = principle;
        this.socialType = socialType;
        this.status = status;
        this.grade = grade;
        this.createdDate = createdDate;
        this.updatedDate = updatedDate;
    }

    public User setInactive() {
        status = UserStatus.INACTIVE;
        return this;
    }
}

UserRepository 리포지터리

public interface UserRepository extends JpaRepository<User, Long> {
    List<User> findByUpdatedDateBeforeAndStatusEquals(LocalDateTime localDateTime, UserStatus status);
}

findByUpdatedDateBeforeAndStatusEquals() 메서드는 인자 값으로 LocalDateTime, 즉 현재 기준 날짜 값보다 1년 전의 날짜 값을 받고 두 번째 인자값으로 UserStatus 타입을 받아 쿼리를 실행하는 메서드 입니다.

2. 휴먼회원 배치 Job 설정

먼저, 스프링 부트를 실행하는 Entry 포인트인 BatchApplication 파일에서 아래와 같이 @EnableBatchProcessing을 활성화 시켜야 합니다.

@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }
}

EnableBatchProcessing를 적용해야 배치 작업에 필요한 빈을 미리 등록하여 사용할 수 있습니다.

배치 정보는 아래 @Configuration 어노테이션을 사용하는 설정 클래스에서 빈으로 등록합니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    private final UserRepository userRepository;
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job inactiveUserJob() {
        // (1) JobBuilderFactory 주입
        return jobBuilderFactory.get("inactiveUserJob3")
                // (2) Job의 재실행 방지
                .preventRestart()
                .start(inactiveJobStep(null))
                .build();
    }

}
  1. (1)는 Job 생성을 직관적이고 편리하게 도와주는 빌더인 JobBuilderFactory를 주입하였습니다.
  2. (2)는 preventRestart()는 Job의 재실행을 막습니다.

위에 Job 설정을 완료하였고, 이제 Step을 설정하겠습니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    @Bean
    public Step inactiveJobStep(@Value("#{jobParameters[requestDate]}") final String requestDate) {
        log.info("requestDate: {}", requestDate);
        // (1)  StepBuilderFactory 주입
        return stepBuilderFactory.get("inactiveUserStep")
                // (2) chunk 사이즈 입력
                .<User, User>chunk(10)
                // (3) reader, processor, writer를 각각 설정
                .reader(inactiveUserReader())
                .processor(inactiveUserProcessor())
                .writer(inactiveUserWriter())
                .build();
    }
}
  1. (1)은 StepBuilderFactory의 get("inactiveUserStep")은 inactiveUserStep이라는 이름의 StepBuilder를 생성합니다.
  2. (2)는 제네릭을 사용해 chunk의 입력 타입과 출력 타입을 User 타입으로 설정하였습니다. 인자 값은 10으로 설정했는데 쓰기 시에 청크 단위로 묶어서 writer() 메서드를 실행시킬 단위를 지정하였습니다. 즉, 커밋의 다누이가 10개입니다.
  3. (3)은 reader, processor, writer를 각각 설정하였습니다.

3. Chunk 지향 처리

여기서... Chunk가 무엇일까요?

Chunk는 덩어리라는 뜻으로 Spring Batch에서 각 커밋 사이에 처리되는 row 수를 애기합니다. 만약에 Chunk로 처리하지 않을 경우에 DB에서 데이터가 1000개인 로우를 읽어와서 배치처리를 하는 경우를 생각할 수 있습니다.

배치처리를 하는 중에 1개의 데이터를 저장하는데 문제가 생기면 나머지 999개의 데이터도 rollback 처리를 해야됩니다. 이러한 문제를 방지하기 위해서 Chunk 지향 프로세스 방식으로 스프링 부트에서 배치 실행을 지원하고 있습니다.

즉, Chunk 단위로 트랜잭션을 수행하기 때문에 실패하는 경우 해당 Chunk 만큼 롤백이 되고, 이전에 커밋된 트랜잭션 범위까지는 반영이 된다는 것입니다.

Chunk는 스프링 부트 배치를 잘 사용하기 위해서 반드시 알아둬야 하는 개념이라고 생각됩니다.

아래 예시코드는 jojoldu님이 블로그에서 가져온 Chunk 지향 처리를 Java 코드로 표현하는 것입니다.

for (int i = 0; i < totalSize; i += chunkSize) {

    List<Item> items = new ArrayList<> ();

    for (int j = 0; j < chunkSize; j++) {
        Object item = itemReader.read();
        Object processedItem = item.Processor.process(item);
        items.add(processedItem);    
    }
    itemWriter.write(items);
}

Chunk 단위로 reader, process, writer로 처리하기 때문에 만약 chunkSize가 10일 경우에 process나 write에서 예외가 발생한다면 전부 rollback 되고, 그 다음 chunkSize만큼 배치가 처리 됩니다. 참조 블로그

4. ItemReader 구현

인터페이스인 ItemReader(데이터를 읽어오는 역할)를 설정한 부분입니다. 여기서는 ItemReader 인터페이스를 구현한 QueueItemReader 구현체를 리턴하고 있습니다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    private final UserRepository userRepository;
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    @StepScope // (1) Step의 주기에 따라 새로운 빈 생성
    public QueueItemReader<User> inactiveUserReader() {
        List<User> oldUsers =
                userRepository.findByUpdatedDateBeforeAndStatusEquals(
                        LocalDateTime.now().minusYears(1), UserStatus.ACTIVE);
        return new QueueItemReader<>(oldUsers);
    }
}
  1. 기본 빈 생성은 싱글턴 방식이지만, (1)에서 @StepScope를 사용하면 해당 메서드는 Step 주기에 따라 새로운 빈을 생성합니다. 즉, 각 Step의 실행마다 새로운 빈을 만들기 때문에 지연 생성이 가능합니다. @StepScope는 기본 프록시 모드가 반환되는 클래스 타입을 참조하기 때문에 @StepScope를 사용하면 반드시 구현된 반환 타입을 명시해 반환해야 합니다. 위 예제에서는 QueueItemReader라고 명시했습니다.
  2. findByUpdatedDateBeforeAndStatusEquals() 메서드로 현재 날짜 기준 1년 전의 날짜값과 User의 상태값이 ACTIVE인 User 리스트를 조회하고, QueueItemReader 객체 생성 시 파라미터로 넣어서 Queue에 담도록 하고 있습니다.

QueueItemReader

public class QueueItemReader<T> implements ItemReader<T> {

    private Queue<T> queue;

    public QueueItemReader(List<T> data) {
        this.queue = new LinkedList<>(data);
    }

    @Override
    public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return this.queue.poll();
    }
}

위 코드를 보면 QueItemReader를 사용해 휴먼회원으로 지정될 타깃 데이터를 한번에 불러와 큐에 담아놓습니다.

그리고 read() 메서드를 호출할 때 큐의 poll() 메서드를 사용하여 큐에서 데이터를 하나씩 반환합니다.

5. ItemProcessor 구현

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    ...

    public ItemProcessor<User, User> inactiveUserProcessor() {
        return new ItemProcessor<User, User>() {
            @Override
            public User process(User user) throws Exception {
                return user.setInactive();
            }
        };
    }
}

ItemReader에서 읽은 User를 휴먼 상태로 전환하는 processor 메서드를 추가하는 예입니다.

6. ItemWriter 구현

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    ...

     public ItemWriter<User> inactiveUserWriter() {
        return ((List<? extends User> users) -> userRepository.saveAll(users));
    }
}

휴먼회원 전환 배치 처리 최종 코드

@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {

    private final UserRepository userRepository;
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job inactiveUserJob() {
        return jobBuilderFactory.get("inactiveUserJob3")
                .preventRestart()
                .start(inactiveJobStep(null))
                .build();
    }

    @Bean
    public Step inactiveJobStep(@Value("#{jobParameters[requestDate]}") final String requestDate) {
        log.info("requestDate: {}", requestDate);
        return stepBuilderFactory.get("inactiveUserStep")
                .<User, User>chunk(10)
                .reader(inactiveUserReader())
                .processor(inactiveUserProcessor())
                .writer(inactiveUserWriter())
                .build();
    }

    @Bean
    @StepScope
    public QueueItemReader<User> inactiveUserReader() {
        List<User> oldUsers =
                userRepository.findByUpdatedDateBeforeAndStatusEquals(
                        LocalDateTime.now().minusYears(1), UserStatus.ACTIVE);
        return new QueueItemReader<>(oldUsers);
    }

    public ItemProcessor<User, User> inactiveUserProcessor() {
        return new org.springframework.batch.item.ItemProcessor<User, User>() {
            @Override
            public User process(User user) throws Exception {
                return user.setInactive();
            }
        };
    }

    public ItemWriter<User> inactiveUserWriter() {
        return ((List<? extends User> users) -> userRepository.saveAll(users));
    }
}

ItemWriter는 리스트 타입을 앞서 설정한 Chunk 단위로 받습니다. Chunk 단위를 10으로 설정했으므로 users에는 휴먼회원 10개 가 주어지며 saveAll() 메서드를 사용해서 한번에 DB에 저장합니다.

SQL문을 사용하여 테스트 데이터 주입

실제 배치를 실행하기 전에 테스트 할 데이터를 만들기 위해 SQL 쿼리 파일을 생성하여 실행하였습니다.

기본적으로 /resources 하위 경로에 import.sql 파일을 생성하면 스프링 부트(정확히는 하이버네이트)가 실행될 때 자동으로 해당 파일의 쿼리를 실행합니다.

insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1001, 'test@test.com', 'test1', 'test1', 'FACEBOOK', 'ACTIVE', 'VIP', '2016-03-01T00:00:00', '2018-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1002, 'test@test.com', 'test2', 'test2', 'FACEBOOK', 'ACTIVE', 'VIP', '2016-03-01T00:00:00', '2018-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1003, 'test@test.com', 'test3', 'test3', 'FACEBOOK', 'ACTIVE', 'VIP', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1004, 'test@test.com', 'test4', 'test4', 'FACEBOOK', 'ACTIVE', 'GOLD', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1005, 'test@test.com', 'test5', 'test5', 'FACEBOOK', 'ACTIVE', 'GOLD', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1006, 'test@test.com', 'test6', 'test6', 'FACEBOOK', 'ACTIVE', 'GOLD', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1007, 'test@test.com', 'test7', 'test7', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1008, 'test@test.com', 'test8', 'test8', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1009, 'test@test.com', 'test9', 'test9', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1010, 'test@test.com', 'test10', 'test10', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1011, 'test@test.com', 'test11', 'test11', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');

실행 결과

Before

image

After

image

휴먼회원 배치 실행 결과 UPDATE_DATE 컬럼 값이 현재 시점에서 1년전이고 상태값이 ACTIVE인 회원들의 상태 값이 INAVTIVE로 변경되는 것을 확인할 수 있었습니다.

참조: https://jojoldu.tistory.com/331?category=902551, https://github.com/young891221/Spring-Boot-Community-Batch