Spring Boot/Batch

[Batch] ItemReader (JDBC,JPA)

수수한개발자 2022. 8. 8.
728x90

Spring 프레임워크의 강점 중 하나는 개발자가 비즈니스 로직에만 집중할 수 있도록 JDBC와 같은 문제점을 추상화한 것입니다.

이를 보고 서비스 추상화라고 합니다.

그래서 Spring Batch 개발자들은 Spring 프레임워크의 JDBC 기능을 확장했습니다.

일반적으로 배치 작업은 많은 양의 데이터를 처리해야 합니다.

보통 실시간 처리가 어려운 대용량 데이터나 대규모 데이터일 경우에 배치 애플리케이션을 작업합니다.

수백만 개의 데이터를 조회하는 쿼리가 있는 경우에 해당 데이터를 모두 한 번에 메모리에 불러오길 원하는 개발자는 없을 것입니다.
그러나 Spring의 JdbcTemplate은 분할 처리를 지원하지 않기 때문에 (쿼리 결과를 그대로 반환하니) 개발자가 직접 limit, offset을 사용하는 등의 작업이 필요합니다.
Spring Batch는 이런 문제점을 해결하기 위해 2개의 Reader 타입을 지원합니다.
Cursor는 실제로 JDBC ResultSet의 기본 기능입니다.
ResultSet이 open 될 때마다 next() 메소드가 호출되어 Database의 데이터가 반환됩니다.
이를 통해 필요에 따라 Database에서 데이터를 Streaming 할 수 있습니다.

반면 Paging은 좀 더 많은 작업을 필요로 합니다.
Paging 개념은 페이지라는 Chunk로 Database에서 데이터를 검색한다는 것입니다.
즉, 페이지 단위로 한 번에 데이터를 조회해오는 방식입니다.

Cursor와 Paging을 그림으로 비교하면 다음과 같습니다.

[Batch] ItemReader (JDBC,JPA)

Paging에서 10 Row는 PageSize를 얘기합니다.
10 외에 다른 값도 가능하며 여기선 예시로 10개로 두었습니다.

Cursor 방식은 Database와 커넥션을 맺은 후, Cursor를 한 칸씩 옮기면서 지속적으로 데이터를 빨아옵니다.
반면 Paging 방식에서는 한 번에 10개 (혹은 개발자가 지정한 PageSize)만큼 데이터를 가져옵니다.

2개 방식의 구현체는 다음과 같습니다.

  • Cursor 기반 ItemReader 구현체
    • JdbcCursorItemReader
    • HibernateCursorItemReader
    • StoredProcedureItemReader
  • Paging 기반 ItemReader 구현체
    • JdbcPagingItemReader
    • HibernatePagingItemReader
    • JpaPagingItemReader

IbatisReader는 Spring Batch 공식 지원에서 삭제되었습니다.
현재 MyBatis 프로젝트에서 MyBatisReader를 만들어서 진행하고 있으니 참고해보세요.

모든 ItemReader의 예제를 다루기에는 양이 많으니 여기서는 각 Reader의 대표 격인 JdbcCursorItemReader와 JdbcPagingItemReader, JpaPagingItemReader를 예제와 함께 소개드리겠습니다.

여기서 다루지 않은 예제는 공식 문서에서 아주 상세하게 예제 코드가 나와있으니 참고해보세요.

yml 설정

spring:
  batch:
    job:
      names: ${job.name:NONE}
    jdbc:
      initialize-schema:

  datasource:
    driver-class-name: org.h2.Driver
  sql:
    init:
      mode: always
      data-locations: classpath:person.sql

logging:
  level:
    org.springframework.batch: debug

person.sql

 create table person (
     id bigint primary key auto_increment,
     name varchar(255),
     age varchar(255),
     address varchar(255)
 );

insert into person(name, age, address) VALUES('김지수1','10','안산');
insert into person(name, age, address) VALUES('홍길동1','20','서울');
insert into person(name, age, address) VALUES('강감찬1','30','인천');
insert into person(name, age, address) VALUES('김지수2','40','안산');
insert into person(name, age, address) VALUES('홍길동2','50','서울');
insert into person(name, age, address) VALUES('강감찬2','60','인천');
insert into person(name, age, address) VALUES('김지수3','70','안산');
insert into person(name, age, address) VALUES('홍길동3','80','서울');
insert into person(name, age, address) VALUES('강감찬3','90','인천');
insert into person(name, age, address) VALUES('김지수4','100','안산');
insert into person(name, age, address) VALUES('홍길동4','110','서울');
insert into person(name, age, address) VALUES('강감찬4','120','인천');
insert into person(name, age, address) VALUES('김지수5','130','안산');
insert into person(name, age, address) VALUES('홍길동5','140','서울');
insert into person(name, age, address) VALUES('강감찬5','150','인천');

CursorItemReader

위에서 언급한 대로 CursorItemReader는 Paging과 다르게 Streaming으로 데이터를 처리합니다.
쉽게 생각하시면 Database와 애플리케이션 사이에 통로를 하나 연결하고 하나씩 빨아들인다고 생각하시면 됩니다.
JSP나 Servlet으로 게시판을 작성해보신 분들은 ResultSet을 사용해서 next()로 하나씩 데이터를 가져왔던 것을 기억하시면 됩니다.

이런 Cursor 방식의 대표 격인 JdbcCursorItemReader를 소개합니다.

JdbcCursorItemReader

JdbcCursorItemReader는 Cursor 기반의 JDBC Reader 구현체입니다.
아래 샘플 코드를 바로 보겠습니다.

@Getter
@Entity
@NoArgsConstructor
public class Person {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private int id;
    private String name;
    private String age;
    private String address;

    public Person(String name, String age, String address) {
        this.name = name;
        this.age = age;
        this.address = address;
    }

    public Person(int id, String name, String age, String address) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.address = address;
    }
}

@Configuration
@Slf4j
@RequiredArgsConstructor
public class ItemReaderConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;
    private final EntityManagerFactory entityManagerFactory;
    private static final int chunkSize = 10;

    @Bean
    public Job itemReaderJob() throws Exception {
        return this.jobBuilderFactory.get("itemReaderJob")
                .incrementer(new RunIdIncrementer())
                .start(this.customItemReaderStep())
                .next(this.jdbcCursorItemReaderStep())
                .build();
    }

@Bean
    public Step jdbcCursorItemReaderStep() throws Exception {
        return stepBuilderFactory.get("jdbcCursorStep")
                .<Person, Person>chunk(10)
                .reader(jdbcCursorItemReader())
                .writer(itemWriter())
                .build();
    }

private JdbcCursorItemReader<Person> jdbcCursorItemReader() throws Exception {
        JdbcCursorItemReader<Person> itemReader = new JdbcCursorItemReaderBuilder<Person>()
                .name("jdbcCursorItemReader")
                .dataSource(dataSource)
                .sql("select id, name, age, address from person")
                .rowMapper((rs, rowNum) -> new Person(
                        rs.getInt(1), rs.getString(2), rs.getString(3), rs.getString(4)))
                .build();
        itemReader.afterPropertiesSet();
        return itemReader;
    }
private ItemWriter<? super Person> itemWriter() {
        return items -> log.info(items.stream().map(Person::getName).collect(Collectors.joining(", ")));
    }


    public List<Person> getItems() {
        List<Person> items = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            items.add(new Person(i + 1, "test name" + i, "test age", "test address"));
        }

        return items;
    }
}

JdbcCursorItemReader의 설정값들은 다음과 같은 역할을 합니다

  • chunk
    • <Pay, Pay>에서첫 번째 Pay는 Reader에서 반환할 타입이며, 두 번째 Pay는 Writer에 파라미터로 넘어올 타입을 얘기합니다.
    • chunkSize로 인자 값을 넣은 경우는 Reader & Writer가 묶일 Chunk 트랜잭션 범위입니다.
  • fetchSize
    • Database에서 한 번에 가져올 데이터 양을 나타냅니다.
    • Paging과는 다른 것이, Paging은 실제 쿼리를 limit, offset을 이용해서 분할 처리하는 반면, Cursor는 쿼리는 분할 처리 없이 실행되나 내부적으로 가져오는 데이터는 FetchSize만큼 가져와 read()를 통해서 하나씩 가져옵니다.
  • dataSource
    • Database에 접근하기 위해 사용할 Datasource 객체를 할당합니다
  • rowMapper
    • 쿼리 결과를 Java 인스턴스로 매핑하기 위한 Mapper입니다.
    • 커스텀하게 생성해서 사용할 수 도 있지만, 이렇게 될 경우 매번 Mapper 클래스를 생성해야 돼서 보편적으로는 Spring에서 공식적으로 지원하는 BeanPropertyRowMapper.class를 많이 사용합니다
  • sql
    • Reader로 사용할 쿼리문을 사용하시면 됩니다.
  • name
    • reader의 이름을 지정합니다.
    • Bean의 이름이 아니며 Spring Batch의 ExecutionContext에서 저장될 이름입니다.

 

PagingItemReader

Database Cursor를 사용하는 대신 여러 쿼리를 실행하여 각 쿼리가 결과의 일부를 가져오는 방법도 있습니다.
이런 처리 방법을 Paging이라고 합니다.
게시판의 페이징을 구현해보신 분들은 아시겠지만 페이징을 한다는 것은 각 쿼리에 시작 행 번호 (offset)와 페이지에서 반환할 행 수 (limit)를 지정해야 함을 의미합니다.
Spring Batch에서는 offset과 limit을 PageSize에 맞게 자동으로 생성해 줍니다.
다만 각 쿼리는 개별적으로 실행한다는 점을 유의해야 합니다.
각 페이지마다 새로운 쿼리를 실행하므로 페이징 시 결과를 정렬하는 것이 중요합니다.
데이터 결과의 순서가 보장될 수 있도록 order by가 권장됩니다.
(이건 아래에서 자세하게 소개드리겠습니다)

가장 먼저 JdbcPagingItemReader를 알아보겠습니다.

 

JdbcPagingItemReader

@Configuration
@Slf4j
@RequiredArgsConstructor
public class ItemReaderConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;
    private final EntityManagerFactory entityManagerFactory;
    private static final int chunkSize = 10;

  @Bean
    public Job itemReaderJob() throws Exception {
        return this.jobBuilderFactory.get("itemReaderJob")
                .incrementer(new RunIdIncrementer())
                .start(this.customItemReaderStep())
                .next(this.jdbcPagingItemReaderStep())
                .build();
    }
@Bean
    public Step jdbcPagingItemReaderStep() throws Exception {
        return stepBuilderFactory.get("jdbcPagingStep")
                .<Person, Person>chunk(10)
                .reader(jdbcPagingItemReader())
                .writer(itemWriter())
                .build();
    }

@Bean
    public JdbcPagingItemReader<Person> jdbcPagingItemReader() throws Exception {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("age", 20);

        return new JdbcPagingItemReaderBuilder<Person>()
                .pageSize(chunkSize)
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper((rs, rowNum) -> new Person(
                        rs.getInt(1), rs.getString(2), rs.getString(3), rs.getString(4)))
                .queryProvider(createQueryProvider())
                .parameterValues(parameterValues)
                .name("jdbcPagingItemReader")
                .build();
    }

@Bean
    public PagingQueryProvider createQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource); // Database에 맞는 PagingQueryProvider를 선택하기 위해
        queryProvider.setSelectClause("id, name, age, address");
        queryProvider.setFromClause("from person");
        queryProvider.setWhereClause("where age >= :age");

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);

        return queryProvider.getObject();
    }
}

코드를 보시면 JdbcCursorItemReader와 설정이 크게 다른 것이 하나 있는데요.
바로 쿼리 (createQueryProvider())입니다.
JdbcCursorItemReader를 사용할 때는 단순히 String 타입으로 쿼리를 생성했지만, PagingItemReader에서는 PagingQueryProvider를 통해 쿼리를 생성합니다.
이렇게 하는 데는 큰 이유가 있습니다.

각 Database에는 Paging을 지원하는 자체적인 전략들이 있습니다.
때문에 Spring Batch에는 각 Database의 Paging 전략에 맞춰 구현되어야만 합니다.
그래서 아래와 같이 각 Database에 맞는 Provider들이 존재하는데요.

 

[Batch] ItemReader (JDBC,JPA) - PagingItemReader - JdbcPagingItemReader

(각 Database의 Paging 전략에 맞춘 Provider)

하지만 이렇게 되면 Database마다 Provider 코드를 바꿔야 하니 불편함이 많습니다.
(로컬은 H2로 사용하면서 개발/운영은 MySQL을 사용하면 Provider를 하나로 고정시킬 수가 없겠죠?)

그래서 Spring Batch에서는 SqlPagingQueryProviderFactoryBean을 통해 Datasource 설정값을 보고 위 이미지에서 작성된 Provider 중 하나를 자동으로 선택하도록 합니다.

이렇게 하면 코드 변경 사항이 적어서 Spring Batch에서 공식 지원하는 방법입니다.

이외 다른 설정들의 값은 JdbcCursorItemReader와 크게 다르지 않습니다.

  • parameterValues
    • 쿼리에 대한 매개 변수 값의 Map을 지정합니다.
    • queryProvider.setWhereClause을 보시면 어떻게 변수를 사용하는지 자세히 알 수 있습니다.
    • where 절에서 선언된 파라미터 변수명과 parameterValues에서 선언된 파라미터 변수명이 일치해야만 합니다.

예전이었다면?로 파라미터 위치를 지정하고 1부터 시작하여 각 파라미터 값을 할당시키는 방식으로 진행했는데, 그에 비해서 굉장히 명시적이고 실수할 여지가 줄어들었습니다.

자 이렇게 설정 후 Batch를 한번 실행해보겠습니다..

[Batch] ItemReader (JDBC,JPA) - PagingItemReader - JdbcPagingItemReader

쿼리 로그를 보시면 LIMIT 10이 들어간 것을 알 수 있습니다.
작성한 코드에서 Limit 선언은 없는데, 사용된 쿼리에선 추가되었습니다.
이는 위에서 언급했듯이 JdbcPagingItemReader에서 선언된 pageSize (Cursor에서는 fetchSize)에 맞게 자동으로 쿼리에 추가해줬기 때문입니다.
만약 조회할 데이터가 10개 이상이었다면offset으로 적절하게 다음 fetchSize만큼을 가져올 수 있습니다.

 

JpaCursorItemReader

JpaCursorItemReader는 원래 지원이 안되다가 스프링 배치 4.3이 릴리즈 되면서부터 지원이 되기 시작했습니다. 자세한 내용은 여기서 확인해주세요

JPA 2.2 스펙 도입이 예전에 되었지만, 스프링 배치에서는 최근에서야 이 부분을 적용하게 되어 드디어 스프링 배치 4.3부터 Jpa에도 CursorItemReader가 도입되게 되었습니다.

기본적인 작동원리는 기존의 다른 CursorItemReader (Jdbc/Hibernate)와 다르지 않습니다.

 

JpaCursorItemReader
@Configuration
@Slf4j
@RequiredArgsConstructor
public class ItemReaderConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;
    private final EntityManagerFactory entityManagerFactory;
    private static final int chunkSize = 10;

    @Bean
    public Job itemReaderJob() throws Exception {
        return this.jobBuilderFactory.get("itemReaderJob")
                .incrementer(new RunIdIncrementer())
                .start(this.jpaCursorItemReaderStep())
                .build();
    }

@Bean
    public Step jpaStep() throws Exception {
        return stepBuilderFactory.get("jpaStep")
                .<Person, Person>chunk(10)
                .reader(this.jpaCursorItemReader())
                .writer(itemWriter())
                .build();
    }
    private JpaCursorItemReader<Person> jpaCursorItemReader() throws Exception {
        JpaCursorItemReader<Person> itemReader = new JpaCursorItemReaderBuilder<Person>()
                .name("jpaCursorItemReader")
                .entityManagerFactory(entityManagerFactory)
                .queryString("select p from Person p")
                .build();
        itemReader.afterPropertiesSet();
        return itemReader;
    }
 private ItemWriter<? super Person> itemWriter() {
        return items -> log.info(items.stream().map(Person::getName).collect(Collectors.joining(", ")));
    }


    public List<Person> getItems() {
        List<Person> items = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            items.add(new Person(i + 1, "test name" + i, "test age", "test address"));
        }

        return items;
    }
}

각 설정들이 하는 역할은 다음과 같습니다.

 

속성                                                            소개 기본값
name 실행 컨텍스트 (ExecutionContext) 내에서 구분하기 위한 Key.
saveState  true 로 설정된 경우 필수
 
entityManagerFactory JPA를 사용하기 위한 EntityManagerFactory  
queryString 사용할 JPQL 쿼리문  
maxItemCount 조회할 최대 item 수 Integer.MAX_VALUE
currentItemCount 조회 Item의 시작지점 0
saveState 동일 Job 재실행시 실행 컨텍스트 내에서 ItemStream Support의 상태를 유지할지 여부 true

JpaPagingItemReader와 달리 JpaCursorItemReader에는 pageSize 설정이 없고, maxItemCount, currentItemCount 이 추가되었습니다.
Cursor 방식이 스트리밍이기 때문에 한 번에 몇 개의 데이터를 읽어올지를 결정하는 pageSize 는 Cursor에서는 필요가 없습니다.
그리고 maxItemCount, currentItemCount 의 경우에는 다음과 같은 역할을 하는데요.

maxItemCount(5)를 추가해서 수행하게 되면 다음과 같이 5개만 최대 조회됩니다.
즉,. maxItemCount 이란 최대로 조회할 데이터 개수를 설정하는 것입니다.

 

이 외에. currentItemCount(2)를 추가하게 되면 다음과 같이. currentItemCount지정 값 다음부터 데이터를 조회하게 됩니다.

  • . maxItemCount(5)
  • . currentItemCount(2)
  • . maxItemCount(5)를 통해 최대 5개를 조회하도록 제한 뒤,
  • . currentItemCount(2)를 통해 총 읽어야 할 데이터 중 시작 지점을 어디로 할지

 

JpaPagingItemReader

 

@Configuration
@Slf4j
@RequiredArgsConstructor
public class ItemReaderConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;
    private final EntityManagerFactory entityManagerFactory;
    private static final int chunkSize = 10;

    @Bean
    public Job itemReaderJob() throws Exception {
        return this.jobBuilderFactory.get("itemReaderJob")
                .incrementer(new RunIdIncrementer())
                .start(jpaPagingItemReaderStep())
                .build();
    }
 @Bean
    public Step jpaPagingItemReaderStep() {
        return stepBuilderFactory.get("jpaPagingItemReaderStep")
                .<Person, Person>chunk(chunkSize)
                .reader(jpaPagingItemReader())
                .writer(itemWriter())
                .build();
    }

@Bean
    public JpaPagingItemReader<Person> jpaPagingItemReader() {
        return new JpaPagingItemReaderBuilder<Person>()
                .name("jpaPagingItemReader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("select p from Person p WHERE age >= 20")
                .build();
    }
}

EntityManagerFactory를 지정하는 것 외에 JdbcPagingItemReader와 크게 다른 점은 없습니다.
자 그럼 이 코드를 한번 실행해보시면!

[Batch] ItemReader (JDBC,JPA) - JpaPagingItemReader

정상적으로 배치가 수행된 것을 확인할 수 있습니다.

 

마무리

ItemReader는 Spring Batch를 구현하는 데 있어 정말 중요한 구현체입니다.
어디서 데이터를 읽어오고, 어떤 방식으로 읽느냐에 따라 Batch의 성능을 크게 좌지우지합니다.
감사합니다.

 

reference

728x90

'Spring Boot > Batch' 카테고리의 다른 글

[Batch] ItemWriter 데이터 쓰기  (0) 2022.08.08
[Batch] Execution 데이터 공유  (0) 2022.07.28
[Batch] 메타데이터, 메타 테이블  (0) 2022.07.28
[Batch] 스프링 배치 시작하기  (0) 2022.07.28

댓글