[Spring Batch] Implement of Aggregated Item reader

글쓴이 Engineer Myoa 날짜

들어가기에 앞서

SQL 을 이용하다보면 GROUP BY 절을 자주 사용하게됩니다.

Batch 의 Item reader 에서도 GROUP BY 절을 사용하여 grouped item 을 가져오면 좋겠으나

집계되어 반환되는 record 의 수가 1이기 때문에 item reader 에 사용하기에는 애로사항이 있었습니다.

이를 Custom row mapper 와 Custom result set extractor 구현으로 해결했던 사례를 공유하고자 합니다.

Aggregated querying (customize)

아래 테이블이 집약을 하고 싶은 대상 테이블이라고 가정하겠습니다.

id user_id some_data some_date
1 A a1 1970-01-01
2 A a2 1970-01-02
3 B b1 1970-01-03
4 B b2 1970-01-04
5 B b3 1970-01-05
6 C c1 1970-01-06
7 D d1 1970-01-07
8 D d2 1970-01-08
9 D d3 1970-01-09
10 D d4 1970-01-10
public class SomeEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column
    private String userId;

    @Column
    private String someData;

    @Column
    private Date someDate;
}

그리고 매핑되는 Entity 클래스 입니다.

위 데이터를 아래같은 형식으로 만들고 싶지만 Database 관점에서는 적절하지 못합니다.

{
	A : [a1, a2],
	B : [b1, b2, b3],
	C : [c1],
	D : [d1, d2, d3, d4]
}

이 때, 객체 지향 관점에서 aggregated querying 을 적용하면 됩니다.

이제부터 아래 2개의 항목을 통해 설명하겠습니다.

  • Aggregated Jdbc Cursor Item Reader in Spring Batch
  • Aggregated querying through native jdbcTemplate with Result Set Extractor

공통영역

AggregatedRowMapper <– AbstractAggregatedRowMapper

각 row mapper 상속구현

<– AggregatedEntityRowMapper

Batch job configuration 에서 AggregateJdbcCursorItemReader 클래스에 AggregatedEntityRowMapper 를 넣음

JdbcTemplate 사용시 RowMapper 만 넣는 대신 ResultSetExtractor(RowMapper) 를 넣는 이유

rs.next() 최초 호출이 필요한데 jdbcTemplate 에서는 그게 없음 그래서 ResultSetExtractor 를 구현해서 hasNext(rs.next() 호출) 를 호출하는 것.

AggregatedRowMapperResultSetExtractor

rowMapper.hasNext 에서 rs.next() 를 호출

rowMapper.mapRow 에서 rs.next() 를 호출

while (rowMapper.hasNext(rs)) {
    results.add(rowMapper.mapRow(rs, rowNumber++));
}

Jdbc Cursor Item Reader

item reader 는 batch 개발에서 사용되는 용어입니다. 원천 data source 로 부터 어떤 데이터(무엇)를 어떻게 가져오겠다(어떻게)를 정의할 수 있습니다.

대부분의 batch 개발시 이미 구현돼있는 row mapper(어떻게) 를 이용해 (BeanPropertyRowMapper)  “어떤 데이터”(무엇을)를 가져오겠다고 정의합니다.

하지만 Entity 를 aggregating 하기 위해서는 이미 구현돼있는 row mapper 를 대신하여, “어떻게” 데이터를 가져올지 정의해주어야 합니다.

Item reader 를 정의하기 위한 단계는 다음과 같습니다.

  • RowMapper<T> 를 구현한 custom RowMapper
  • JdbcCursorItemReader<T> 를 상속하여 cursor 를 제어할 수 있는 custom ItemReader

가 필요합니다.

custom RowMapper

저는 이 Row Mapper 의 활용도가 많아보여 RowMapper 인터페이스를 확장하였고, 확장한 인터페이스를 다시 추상클래스로 구현하였습니다.

가장 간소화된 버전은 모든 단계를 생략하고 3번에서 바로 RowMapper 를 구현하시면 됩니다.

1. AggregatedRowMapper.java –> RowMapper 를 확장. hasNext 메서드 추가
public interface AggregatedRowMapper<T> extends RowMapper<T> {
    boolean hasNext(ResultSet rs) throws SQLException;
}
2. AbstractAggregatedRowMapper.java –> AggregatedRowMapper 를 구현한 추상클래스
public abstract class AbstractAggregateRowMapper<T> implements AggregateRowMapper<T> {

    private boolean isFirst = true;
    private boolean isNextResultLast;

    /**
     * Why RowMapper has {@link #hasNext} method?
     * > Because JdbcCursorItemReader can not recognize each record what have relation
     * > So mapRow will run until not found relation ( It mean {@link #isRelated} were return false )
     * > And it is why {@link #forward} is exist also.
     * @param rs
     * @return
     * @throws SQLException
     */
    @Override
    public boolean hasNext(ResultSet rs) throws SQLException {
        if (isFirst) {
            isNextResultLast = rs.next();
            isFirst = false;
        }

        return isNextResultLast;
    }

    @Override
    public T mapRow(ResultSet rs, int rowNum) throws SQLException {
        T result = mapRow(rs, null, rowNum);
        while (forward(rs) && isRelated(rs, result)) {
            result = mapRow(rs, result, ++rowNum);
        }

        return result;
    }

    protected boolean forward(ResultSet rs) throws SQLException {
        isNextResultLast = rs.next();
        return isNextResultLast;
    }

    protected abstract T mapRow(ResultSet rs, T partialResult, int rowNum) throws SQLException;

    protected abstract boolean isRelated(ResultSet rs, T partialResult) throws SQLException;
}

mapRow method 에서는 상속한 클래스에서 일단 구현한 mapRow 를 호출합니다. (fetch 및 VO 에 mapping)

– 그리고 isRelated() 를 통해 연관관계가 깨질 때까지 하나의 entity 덩어리로 판단하여 계속해서 mapRow() 를 호출합니다.

이는 3번의 isRelated() method 의 구현된 모습을 보면 이해가 빠릅니다.

  • hasNext() method 에서 isFirst 를 체크하는 이유는 아래 jdbcTemplate 파트를 설명할 때 참조하시면 됩니다.
3. AggregatedEntityRowMapper –> AbstractAggregatedRowMapper 를 상속한 클래스. SomeEntity 를 집약하여 AggregatedEntity 덩어리로 만들어 준다.
public class AggregatedEntityRowMapper extends AbstractAggregatedRowMapper<AggregatedEntity> {

    @Override
    protected AggregatedEntity mapRow(ResultSet rs, AggregatedEntity partialResult, int rowNum)
            throws SQLException {

        if (partialResult == null) {
            partialResult = AggregatedEntity.of(rs.getString("user_id"));

        }
            partialResult.add(rs.getString("some_data"));

        return partialResult;
    }

    @Override
    protected boolean isRelated(ResultSet rs, AggregatedEntity partialResult) throws SQLException {
        return partialResult.getUserId().equals(rs.getString("user_id"));
    }
}

이 row mapper 에서 핵심 method 는 isRelated() 입니다.

현재 cursor 의 record(row) 와 이전에 fetch 한 record 가 연관이 있는지, 연관이 있다면 aggregation 할 수 있도록 판단해주는 method 입니다.

4. AggregatedJdbcCursorItemReader.java –> 조금 전에 구현한 rowMapper 를 멤버로 가지고 있고, rowMapper 결과에 따라서 cursor 를 제어하는 역할
public class AggregatedJdbcCursorItemReader<T> extends JdbcCursorItemReader<T> {

    private AggregatedRowMapper<T> rowMapper;

    @Override
    public void setRowMapper(RowMapper<T> rowMapper) {
        this.rowMapper = (AggregatedRowMapper<T>) rowMapper;
        super.setRowMapper(rowMapper);
    }

    @Override
    protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
        T result = super.readCursor(rs, currentRow);
        setCurrentItemCount(rs.getRow());
        return result;
    }

    @Override
    protected T doRead() throws Exception {
        if (rs == null) {
            throw new ReaderNotOpenException("Reader must be open before it can be read.");
        }

        try {
            return rowMapper.hasNext(rs) ? readCursor(rs, getCurrentItemCount()) : null;
        } catch (SQLException e) {
            throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), e);
        }
    }
}

cursor item reader 는 Step executor 에 의해 doRead 를 호출 -> 다시 readCursor 를 호출합니다.

(우리가 필요한 것은 record 간에 연관관계를 만들어 집약하는것입니다. database 를 제어하는 것은 JdbcCursorItemReader 가 해주기 때문에 별도로 구현하는 것이 아니라, super 클래스(JdbcCursorItemReader)의 readCursor를 호출하게됩니다.)

잠깐 사족을 달자면 아래는 JdbcCursorItemReader 의 readCursor (위에서 말한 super 클래스의 readCursor) 입니다.

@Nullable
@Override
protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
   return rowMapper.mapRow(rs, currentRow);
}

AggregatedJdbcCursorItemReader 생성자에서 호출한 super.setRowMapper(rowMapper); 에 의해
이전에 만든 custom row mapper 를 JdbcCursorItemReader 의 row mapper 로 지정해주었고,
readCursor 에서는 해당 rowMapper 를 통해 mapRow 하도록 트리거합니다.

그래서 doRead() -> readCursor() -> super.readCursor() -> customRowMapper.mapRow() 가 되는 것이고,

mapRow 를 통해 나온 aggregated entity 가 doRead() 에서 return 되는 것입니다.

Batch Job, Step 구현

@Slf4j
@Configuration
public class SomeJobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final PlatformTransactionManager platformTransactionManager;
    private final DataSource dataSource;

    public SomeJobConfiguration(
            JobBuilderFactory jobBuilderFactory,
            StepBuilderFactory stepBuilderFactory,
            PlatformTransactionManager platformTransactionManager,
            DataSource dataSource) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.platformTransactionManager = platformTransactionManager;
        this.dataSource = dataSource;
    }

    @Bean("someJob")
    public Job someJob(
            Step someStep) throws Exception {
        return jobBuilderFactory.get("someJob")
                                .start(someStep)
                                .build();
    }

    @Bean("someStep")
    public Step someStep(
            AggregatedJdbcCursorItemReader<AggregatedEntity> itemReader,
            ItemWriter<AggregatedEntity> itemWriter
    ) {
        return stepBuilderFactory.get("someStep")
                .<AggregatedEntity, AggregatedEntity>chunk(2)
                .reader(itemReader)
                .writer(itemWriter)
                .transactionManager(platformTransactionManager)
                .build();
    }

    @Bean("itemReader")
    public AggregatedJdbcCursorItemReader<AggregatedEntity> itemReader() {
        AggregatedJdbcCursorItemReader<AggregatedEntity> itemReader = new AggregatedJdbcCursorItemReader<>();
        itemReader.setRowMapper(new AggregatedEntityRowMapper());
        itemReader.setDataSource(dataSource);
        itemReader.setSql(" select * from some_entity order by user_id asc ");

        return itemReader;
    }

    @Bean("itemWriter")
    public ItemWriter<AggregatedEntity> itemWriter() {
        return items -> {
            log.info("[AGGREGATE] result : {}", items);
        };
    }

}

someJob 을 적절히 구현하였습니다.

  • someJob 은 someStep 을 포함하여 수행합니다.
  • someStep 은 chunk 단위로 reader -> writer 를 수행합니다.
  • reader 는 우리가 구현한 AggregateJdbcCursorItemReader 로 지정해줍니다.
    • item reader 의 row mapper 에는 우리가 구현한 AggregatedEntityRowMapper 객체를 넣어줍니다.
  • writer 는 읽어들인 chunk 만큼을 (items) stdout 으로 출력만 합니다.

주의

여기서 chunk 는 cursor 에서 읽은 record 의 단위가 아니라, aggregate 된 record 의 단위입니다.

읽어들인 record 의 수가 몇개인지는 중요하지 않습니다. 실제 집약된 객체의 수가 chunk 의 기준입니다.

chunk 가 2 이기 때문에 집약된 2개의 객체가 나올때까지가 하나의 chunk 단위가 됩니다.

가장 처음에 제시한 테이블을 예로 들면 A,B 가 하나의 chunk, C,D 가 하나의 chunk 가 됩니다.

정리

첫 번째 chunk

{
	A : [a1, a2],
	B : [b1, b2, b3]
}

두 번째 chunk

{
	C : [c1],
        D : [d1, d2, d3, d4]
}

someStep 의 item reader 에서는 위와 같이 chunk 데이터를 넘겨주게 됩니다.

JdbcTemplate Result Set Extractor

Spring batch 에서는 item reader 가 있지만, 쌩짜 native jdbc template 을 사용할때는 chunk 라는 개념이 없습니다.

따라서 조금 더 제어포인트가 생깁니다.

Jdbc Cursor Item Reader 문단을 읽으셨다면 기억하실겁니다. 바로 hasNext() 에서 isFirst 를 체크하는 것이 그 중 하나입니다.

JdbcTemplate 에서는 각 집약된 객체의 처음을 판별해야합니다.

마치며

처음에 RowMapper 에서 cursor 를 제어하는게 맞을까? 라고 생각이 들었습니다.

클래스명을 보면 Row mapping 을 해야지, cursor 를 직접 제어하는 역할과 책임이 있는것이 아니었기 때문입니다.

그러나, Datasource 를 사용하는 대부분의 row mapper 구현에서 resultset 의 next() 를 이용해 cursor 를 제어하고 있었습니다.

line by line 으로 디버깅을 해보니 구조적으로 이렇게 될 수 밖에 없다는 것을 알았습니다.

덕분에 batch 구조에 대해 조금 더 이해할 수 있게되었네요.

카테고리: JAVA개발노트

0개의 댓글

답글 남기기

Avatar placeholder

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다