Spring Boot/Batch

[Batch] ItemWriter 데이터 쓰기

수수한개발자 2022. 8. 8.
728x90

 ItemWriter 소개

ItemWriter는 Spring Batch에서 사용하는 출력 기능입니다.
Spring Batch가 처음 나왔을 때, ItemWriter는 ItemReader와 마찬가지로 item을 하나씩 다루었습니다.
그러나 Spring Batch2와 청크 (Chunk) 기반 처리의 도입으로 인해 ItemWriter에도 큰 변화가 있었습니다.

이 업데이트 이후 부터 ItemWriter는 item 하나를 작성하지 않고 Chunk 단위로 묶인 item List를 다룹니다.
이 때문에 ItemWriter 인터페이스는 ItemReader 인터페이스와 약간 다릅니다.

Reader의 read()는 Item 하나를 반환하는 반면, Writer의 write()는 인자로 Item List를 받습니다.

이를 그림으로 표현하면 아래와 같습니다.

즉, Reader와 Processor를 거쳐 처리된 Item을 Chunk 단위 만큼 쌓은 뒤 이를 Writer에 전달하는 것입니다.

Spring Batch는 다양한 Output 타입을 처리 할 수 있도록 많은 Writer를 제공합니다.
csv 파일과 Database관련된 내용을 다뤄 보겠습니다.

 

CSVItemWriter

@Configuration
@Slf4j
@RequiredArgsConstructor
public class ItemWriterConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job itemWriterJob() throws Exception {
        return this.jobBuilderFactory.get("itemWriterJob")
                .incrementer(new RunIdIncrementer())
                .start(this.csvItemWriterStep())
                .build();
    }

    @Bean
    public Step csvItemWriterStep() throws Exception {
        return stepBuilderFactory.get("csvItemWriterStep")
                .<Person, Person>chunk(10)
                .reader(itemReader())
                .writer(csvFileItemWriter())
                .build();
    }

    private ItemWriter<Person> csvFileItemWriter() throws Exception {
        BeanWrapperFieldExtractor<Person> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[] {"id", "name", "age", "address"});

        DelimitedLineAggregator<Person> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setDelimiter(",");
        lineAggregator.setFieldExtractor(fieldExtractor);

        FlatFileItemWriter<Person> itemWriter = new FlatFileItemWriterBuilder<Person>()
                .name("csvFileItemWriter")
                .encoding("UTF-8")
                .resource(new FileSystemResource("output/test-output.csv"))
                .lineAggregator(lineAggregator)
                .headerCallback(writer -> writer.write("id,이름,나이,거주지"))
                .footerCallback(writer -> writer.write("-----------------\n"))
                .append(true)
                .build();
        itemWriter.afterPropertiesSet();

        return itemWriter;
    }

    private ItemReader<Person> itemReader() {
        return new CustomItemReader<>(getItems());
    }

    private List<Person> getItems() {
        List<Person> items = new ArrayList<>();

        for (int i = 0; i < 100; i++) {
            items.add(new Person(i + 1, "test name" + i, "test age", "test address"));
        }
        return items;
    }
}

이렇게 코드를 작성후 배치를 실행하게 되면

이렇게 루트 프로젝트 위치에 output 폴더 밑에 csv 파일이 생기게 됩니다. 

id,이름,나이,거주지
1,test name0,test age,test address
....
100,test name99,test age,test address
-----------------

처음 한번 실행하게 되면 위와 같이 csv파일이 생기게됩니다. (...은 반복문이기 때문에 생략) 이제 코드를 보겠습니다.

 

FlatFileItemWriter<Person> itemWriter = new FlatFileItemWriterBuilder<Person>()
                .name("csvFileItemWriter")
                .encoding("UTF-8")
                .resource(new FileSystemResource("output/test-output.csv"))
                .lineAggregator(lineAggregator)
                .headerCallback(writer -> writer.write("id,이름,나이,거주지"))
                .footerCallback(writer -> writer.write("-----------------\n"))
                .append(true)
                .build();
        itemWriter.afterPropertiesSet();

여기서 headerCallback을 통해 id,이름,나이,거주기가 들어가고 

footerCollback으로 인해 ---------가 들어가게됩니다.

여기서 \n을 해줘서 줄바꿈을 해줍니다 .

그리고 append를 생략하였을때는 이 잡을 실행할때마다 새로운 csv파일이 생기게 되어서 ( 덮어써진다,) 전에 있던 내용이 사라지게 됩니다. 그래서 append를 하게되면 footer밑에 이어서 써지게 됩니다. 그래서 한번 더 실행하게 되었을 경우 

id,이름,나이,거주지
1,test name0,test age,test address
....
100,test name99,test age,test address
-----------------
1,test name0,test age,test address
...
100,test name99,test age,test address
-----------------

다음과 같이 생기게 됩니다. 

 

JdbcBatchItemWriter

ORM을 사용하지 않는 경우 Writer는 대부분 JdbcBatchItemWriter를 사용합니다.
이 JdbcBatchItemWriter는 아래 그림과 같이 JDBC의 Batch 기능을 사용하여 한번에 Database로 전달하여 Database 내부에서 쿼리들이 실행되도록 합니다.

 

JdbcBatchItemWriter는 jdbc를 사용해 DB에 writer합니다.

JdbcBatchItemWriter는 bulk insert/update/delete 처리를 합니다.

insert into person(name, age, address) values (1,2,3),(4,5,6),(7,8,9);

단건 처리가 아니기 때문에 비교적 높은 성능을 냅니다.

 

이렇게 처리하는 이유는 어플리케이션과 데이터베이스 간에 데이터를 주고 받는 회수를 최소화 하여 성능 향상을 꾀하기 위함입니다.

JdbcTemplate.batchUpdate의 공식 문서 내용을 참고하시면 같은 내용을 알 수 있습니다.

  • 업데이트를 일괄 처리로 그룹화하면 데이터베이스와 어플리케이션간 왕복 횟수가 줄어들어 성능이 향상 됩니다.

실제로 JdbcBatchItemWriter의 write()를 확인해보시면 일괄처리 하는 것을 확인할 수 있습니다.

 

코드를 작성해 보겠습니다.

 

@Configuration
@Slf4j
@RequiredArgsConstructor
public class ItemWriterConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    @Bean
    public Job itemWriterJob() throws Exception {
        return this.jobBuilderFactory.get("itemWriterJob")
                .incrementer(new RunIdIncrementer())
                .start(this.jdbcBatchItemWriterStep())
                .build();
    } 
    @Bean
    public Step jdbcBatchItemWriterStep() {
        return stepBuilderFactory.get("jdbcBatchItemWriterStep")
                .<Person, Person>chunk(10)
                .reader(itemReader())
                .writer(jdbcBatchItemWriter())
                .build();
    }

    private ItemWriter<Person> jdbcBatchItemWriter() {
        JdbcBatchItemWriter<Person> itemWriter = new JdbcBatchItemWriterBuilder<Person>()
                .dataSource(dataSource)
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("insert into person(name, age, address) values(:name, :age, :address)")
                .build();
        itemWriter.afterPropertiesSet();
        return itemWriter;
    }
    private ItemReader<Person> itemReader() {
        return new CustomItemReader<>(getItems());
    }

    private List<Person> getItems() {
        List<Person> items = new ArrayList<>();

        for (int i = 0; i < 100; i++) {
            items.add(new Person(i + 1, "test name" + i, "test age", "test address"));
        }
        return items;
    }
}

reader로 테스트 데이터를 1~100까지 생성후에 읽어봐서 jdbcBatchItemWriter로 인서트 하는 코드입니다.

이런식으로 DB에 잘 저장된 모습입니다.

여기서 afterPropertiesSet은 다음과 같습니다.

afterPropertiesSet()

afterPropertiesSet() 은 InintializingBean 인터페이스의 메소드로 BeanFactory에 의해 모든 property 가 설정되고 난 뒤 실행되는 메소드입니다. 주로 실행시점의 custom 초기화 로직이 필요하거나 주입받은 property 를 확인하는 용도로 사용됩니다.

조금 더 자세한 실행 시점은 BeanFactory의 공식문서 에서 확인할 수 있습니다.

 

JpaItemWriter

두번째로 알아볼 Writer는 ORM을 사용할 수 있는 JpaItemWriter입니다.
Writer에 전달하는 데이터가 Entity 클래스라면 JpaItemWriter를 사용하시면 됩니다.

Entity를 하나씩 EntityManager.persist 또는 EntityManager.merge로 insert 해줍니다.

 

@Configuration
@Slf4j
@RequiredArgsConstructor
public class ItemWriterConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;
    private final EntityManagerFactory entityManagerFactory;

    @Bean
    public Step jpaItemWriterStep() throws Exception {
        return stepBuilderFactory.get("jpaItemWriterStep")
                .<Person, Person>chunk(10)
                .reader(itemReader())
                .writer(jpaItemWriter())
                .build();
    }
    private ItemWriter<Person> jpaItemWriter() throws Exception {
        JpaItemWriter<Person> itemWriter = new JpaItemWriterBuilder<Person>()
                .entityManagerFactory(entityManagerFactory)
                .build();
        itemWriter.afterPropertiesSet();

        return itemWriter;
    }
    private ItemReader<Person> itemReader() {
            return new CustomItemReader<>(getItems());
    }

    private List<Person> getItems() {
        List<Person> items = new ArrayList<>();

        for (int i = 0; i < 100; i++) {
            items.add(new Person(i + 1, "test name" + i, "test age", "test address"));
        }
        return items;
    }
}

이렇게 실행하게되면 DB에 잘들어 갑니다. 로그를 한번 살펴보겠습니다.

이런식으로 select 한 후에 insert를 하고 있습니다.

JdbcBatchItemWriter는 위에서 성능 이점을 가져가려고 쿼리를 모아서 insert하는데 반면 jpa는 위와같이 하면 성능적으로 떨어지게 됩니다. 왜그런지 알아보니 jpaItemWriter는 기본값으로 jpaEntityManager merge메소드로 실행하게 됩니다.

merge메소드는 저장대상 엔티티를 저장하거나 인서트,업데이트 하는 메소드입니다. 그래서 셀렉트해와서 업데이트 대상인지 인서트대상인지 확인을 하기 위해서 두번의 쿼리가 나가고 있었습니다. 그래서 jpaItemWriter를 수정해줍시다.

 

private ItemWriter<Person> jpaItemWriter() throws Exception {
    JpaItemWriter<Person> itemWriter = new JpaItemWriterBuilder<Person>()
            .entityManagerFactory(entityManagerFactory)
            .usePersist(true)
            .build();
    itemWriter.afterPropertiesSet();

    return itemWriter;
}

위와 같이 userPersist(true)로 설정을 해줍니다. 그러고 실행하면 

org.hibernate.PersistentObjectException: detached entity passed to persist: study.spring.batch.springbatchstudy.part3.Person 익셉션이 터지는데 

private ItemReader<Person> itemReader() {
    return new CustomItemReader<>(getItems());
}

private List<Person> getItems() {
    List<Person> items = new ArrayList<>();

    for (int i = 0; i < 100; i++) {
        items.add(new Person(i + 1, "test name" + i, "test age", "test address"));
    }
    return items;
}

여기 코드에서 계속 Person의 id를 줘서 select를 하는것이였다.

그래서 Person의 id가 없는 생성자를 만들어서 

private ItemReader<Person> itemReader() {
    return new CustomItemReader<>(getItems());
}

private List<Person> getItems() {
    List<Person> items = new ArrayList<>();

    for (int i = 0; i < 100; i++) {
        items.add(new Person("test name" + i, "test age", "test address"));
    }
    return items;
}

 다음과 같이 해주었더니 잘 해결 되었다.

728x90

'Spring Boot > Batch' 카테고리의 다른 글

[Batch] ItemReader (JDBC,JPA)  (0) 2022.08.08
[Batch] Execution 데이터 공유  (0) 2022.07.28
[Batch] 메타데이터, 메타 테이블  (0) 2022.07.28
[Batch] 스프링 배치 시작하기  (0) 2022.07.28

댓글