[Spring Batch] Implement of Aggregated Item reader
들어가기에 앞서
SQL 을 이용하다보면 GROUP BY 절을 자주 사용하게됩니다.
Batch 의 Item reader 에서도 GROUP BY 절을 사용하여 grouped item 을 가져오면 좋겠으나
집계되어 반환되는 record 의 수가 1이기 때문에 item reader 에 사용하기에는 애로사항이 있었습니다.
이를 Custom row mapper 와 Custom result set extractor 구현으로 해결했던 사례를 공유하고자 합니다.
Aggregated querying (customize)
아래 테이블이 집약을 하고 싶은 대상 테이블이라고 가정하겠습니다.
id | user_id | some_data | some_date |
1 | A | a1 | 1970-01-01 |
2 | A | a2 | 1970-01-02 |
3 | B | b1 | 1970-01-03 |
4 | B | b2 | 1970-01-04 |
5 | B | b3 | 1970-01-05 |
6 | C | c1 | 1970-01-06 |
7 | D | d1 | 1970-01-07 |
8 | D | d2 | 1970-01-08 |
9 | D | d3 | 1970-01-09 |
10 | D | d4 | 1970-01-10 |
public class SomeEntity { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; @Column private String userId; @Column private String someData; @Column private Date someDate; }
그리고 매핑되는 Entity 클래스 입니다.
위 데이터를 아래같은 형식으로 만들고 싶지만 Database 관점에서는 적절하지 못합니다.
{ A : [a1, a2], B : [b1, b2, b3], C : [c1], D : [d1, d2, d3, d4] }
이 때, 객체 지향 관점에서 aggregated querying 을 적용하면 됩니다.
이제부터 아래 2개의 항목을 통해 설명하겠습니다.
- Aggregated Jdbc Cursor Item Reader in Spring Batch
- Aggregated querying through native jdbcTemplate with Result Set Extractor
공통영역
AggregatedRowMapper <– AbstractAggregatedRowMapper
각 row mapper 상속구현
<– AggregatedEntityRowMapper
Batch job configuration 에서 AggregateJdbcCursorItemReader 클래스에 AggregatedEntityRowMapper 를 넣음
JdbcTemplate 사용시 RowMapper 만 넣는 대신 ResultSetExtractor(RowMapper) 를 넣는 이유
rs.next() 최초 호출이 필요한데 jdbcTemplate 에서는 그게 없음 그래서 ResultSetExtractor 를 구현해서 hasNext(rs.next() 호출) 를 호출하는 것.
AggregatedRowMapperResultSetExtractor
rowMapper.hasNext 에서 rs.next() 를 호출
rowMapper.mapRow 에서 rs.next() 를 호출
while (rowMapper.hasNext(rs)) { results.add(rowMapper.mapRow(rs, rowNumber++)); }
Jdbc Cursor Item Reader
item reader 는 batch 개발에서 사용되는 용어입니다. 원천 data source 로 부터 어떤 데이터(무엇)를 어떻게 가져오겠다(어떻게)를 정의할 수 있습니다.
대부분의 batch 개발시 이미 구현돼있는 row mapper(어떻게) 를 이용해 (BeanPropertyRowMapper) “어떤 데이터”(무엇을)를 가져오겠다고 정의합니다.
하지만 Entity 를 aggregating 하기 위해서는 이미 구현돼있는 row mapper 를 대신하여, “어떻게” 데이터를 가져올지 정의해주어야 합니다.
Item reader 를 정의하기 위한 단계는 다음과 같습니다.
- RowMapper<T> 를 구현한 custom RowMapper
- JdbcCursorItemReader<T> 를 상속하여 cursor 를 제어할 수 있는 custom ItemReader
가 필요합니다.
custom RowMapper
저는 이 Row Mapper 의 활용도가 많아보여 RowMapper 인터페이스를 확장하였고, 확장한 인터페이스를 다시 추상클래스로 구현하였습니다.
가장 간소화된 버전은 모든 단계를 생략하고 3번에서 바로 RowMapper 를 구현하시면 됩니다.
1. AggregatedRowMapper.java –> RowMapper 를 확장. hasNext 메서드 추가
public interface AggregatedRowMapper<T> extends RowMapper<T> { boolean hasNext(ResultSet rs) throws SQLException; }
2. AbstractAggregatedRowMapper.java –> AggregatedRowMapper 를 구현한 추상클래스
public abstract class AbstractAggregateRowMapper<T> implements AggregateRowMapper<T> { private boolean isFirst = true; private boolean isNextResultLast; /** * Why RowMapper has {@link #hasNext} method? * > Because JdbcCursorItemReader can not recognize each record what have relation * > So mapRow will run until not found relation ( It mean {@link #isRelated} were return false ) * > And it is why {@link #forward} is exist also. * @param rs * @return * @throws SQLException */ @Override public boolean hasNext(ResultSet rs) throws SQLException { if (isFirst) { isNextResultLast = rs.next(); isFirst = false; } return isNextResultLast; } @Override public T mapRow(ResultSet rs, int rowNum) throws SQLException { T result = mapRow(rs, null, rowNum); while (forward(rs) && isRelated(rs, result)) { result = mapRow(rs, result, ++rowNum); } return result; } protected boolean forward(ResultSet rs) throws SQLException { isNextResultLast = rs.next(); return isNextResultLast; } protected abstract T mapRow(ResultSet rs, T partialResult, int rowNum) throws SQLException; protected abstract boolean isRelated(ResultSet rs, T partialResult) throws SQLException; }
mapRow method 에서는 상속한 클래스에서 일단 구현한 mapRow 를 호출합니다. (fetch 및 VO 에 mapping)
– 그리고 isRelated() 를 통해 연관관계가 깨질 때까지 하나의 entity 덩어리로 판단하여 계속해서 mapRow() 를 호출합니다.
이는 3번의 isRelated() method 의 구현된 모습을 보면 이해가 빠릅니다.
- hasNext() method 에서 isFirst 를 체크하는 이유는 아래 jdbcTemplate 파트를 설명할 때 참조하시면 됩니다.
3. AggregatedEntityRowMapper –> AbstractAggregatedRowMapper 를 상속한 클래스. SomeEntity 를 집약하여 AggregatedEntity 덩어리로 만들어 준다.
public class AggregatedEntityRowMapper extends AbstractAggregatedRowMapper<AggregatedEntity> { @Override protected AggregatedEntity mapRow(ResultSet rs, AggregatedEntity partialResult, int rowNum) throws SQLException { if (partialResult == null) { partialResult = AggregatedEntity.of(rs.getString("user_id")); } partialResult.add(rs.getString("some_data")); return partialResult; } @Override protected boolean isRelated(ResultSet rs, AggregatedEntity partialResult) throws SQLException { return partialResult.getUserId().equals(rs.getString("user_id")); } }
이 row mapper 에서 핵심 method 는 isRelated() 입니다.
현재 cursor 의 record(row) 와 이전에 fetch 한 record 가 연관이 있는지, 연관이 있다면 aggregation 할 수 있도록 판단해주는 method 입니다.
4. AggregatedJdbcCursorItemReader.java –> 조금 전에 구현한 rowMapper 를 멤버로 가지고 있고, rowMapper 결과에 따라서 cursor 를 제어하는 역할
public class AggregatedJdbcCursorItemReader<T> extends JdbcCursorItemReader<T> { private AggregatedRowMapper<T> rowMapper; @Override public void setRowMapper(RowMapper<T> rowMapper) { this.rowMapper = (AggregatedRowMapper<T>) rowMapper; super.setRowMapper(rowMapper); } @Override protected T readCursor(ResultSet rs, int currentRow) throws SQLException { T result = super.readCursor(rs, currentRow); setCurrentItemCount(rs.getRow()); return result; } @Override protected T doRead() throws Exception { if (rs == null) { throw new ReaderNotOpenException("Reader must be open before it can be read."); } try { return rowMapper.hasNext(rs) ? readCursor(rs, getCurrentItemCount()) : null; } catch (SQLException e) { throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), e); } } }
cursor item reader 는 Step executor 에 의해 doRead 를 호출 -> 다시 readCursor 를 호출합니다.
(우리가 필요한 것은 record 간에 연관관계를 만들어 집약하는것입니다. database 를 제어하는 것은 JdbcCursorItemReader 가 해주기 때문에 별도로 구현하는 것이 아니라, super 클래스(JdbcCursorItemReader)의 readCursor를 호출하게됩니다.)
잠깐 사족을 달자면 아래는 JdbcCursorItemReader 의 readCursor (위에서 말한 super 클래스의 readCursor) 입니다.
@Nullable @Override protected T readCursor(ResultSet rs, int currentRow) throws SQLException { return rowMapper.mapRow(rs, currentRow); }
AggregatedJdbcCursorItemReader 생성자에서 호출한 super.setRowMapper(rowMapper); 에 의해
이전에 만든 custom row mapper 를 JdbcCursorItemReader 의 row mapper 로 지정해주었고,
readCursor 에서는 해당 rowMapper 를 통해 mapRow 하도록 트리거합니다.
그래서 doRead() -> readCursor() -> super.readCursor() -> customRowMapper.mapRow() 가 되는 것이고,
mapRow 를 통해 나온 aggregated entity 가 doRead() 에서 return 되는 것입니다.
Batch Job, Step 구현
@Slf4j @Configuration public class SomeJobConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final PlatformTransactionManager platformTransactionManager; private final DataSource dataSource; public SomeJobConfiguration( JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, PlatformTransactionManager platformTransactionManager, DataSource dataSource) { this.jobBuilderFactory = jobBuilderFactory; this.stepBuilderFactory = stepBuilderFactory; this.platformTransactionManager = platformTransactionManager; this.dataSource = dataSource; } @Bean("someJob") public Job someJob( Step someStep) throws Exception { return jobBuilderFactory.get("someJob") .start(someStep) .build(); } @Bean("someStep") public Step someStep( AggregatedJdbcCursorItemReader<AggregatedEntity> itemReader, ItemWriter<AggregatedEntity> itemWriter ) { return stepBuilderFactory.get("someStep") .<AggregatedEntity, AggregatedEntity>chunk(2) .reader(itemReader) .writer(itemWriter) .transactionManager(platformTransactionManager) .build(); } @Bean("itemReader") public AggregatedJdbcCursorItemReader<AggregatedEntity> itemReader() { AggregatedJdbcCursorItemReader<AggregatedEntity> itemReader = new AggregatedJdbcCursorItemReader<>(); itemReader.setRowMapper(new AggregatedEntityRowMapper()); itemReader.setDataSource(dataSource); itemReader.setSql(" select * from some_entity order by user_id asc "); return itemReader; } @Bean("itemWriter") public ItemWriter<AggregatedEntity> itemWriter() { return items -> { log.info("[AGGREGATE] result : {}", items); }; } }
someJob 을 적절히 구현하였습니다.
- someJob 은 someStep 을 포함하여 수행합니다.
- someStep 은 chunk 단위로 reader -> writer 를 수행합니다.
- reader 는 우리가 구현한 AggregateJdbcCursorItemReader 로 지정해줍니다.
- item reader 의 row mapper 에는 우리가 구현한 AggregatedEntityRowMapper 객체를 넣어줍니다.
- writer 는 읽어들인 chunk 만큼을 (items) stdout 으로 출력만 합니다.
주의
여기서 chunk 는 cursor 에서 읽은 record 의 단위가 아니라, aggregate 된 record 의 단위입니다.
읽어들인 record 의 수가 몇개인지는 중요하지 않습니다. 실제 집약된 객체의 수가 chunk 의 기준입니다.
chunk 가 2 이기 때문에 집약된 2개의 객체가 나올때까지가 하나의 chunk 단위가 됩니다.
가장 처음에 제시한 테이블을 예로 들면 A,B 가 하나의 chunk, C,D 가 하나의 chunk 가 됩니다.
정리
첫 번째 chunk
{ A : [a1, a2], B : [b1, b2, b3] }
두 번째 chunk
{ C : [c1], D : [d1, d2, d3, d4] }
someStep 의 item reader 에서는 위와 같이 chunk 데이터를 넘겨주게 됩니다.
JdbcTemplate Result Set Extractor
Spring batch 에서는 item reader 가 있지만, 쌩짜 native jdbc template 을 사용할때는 chunk 라는 개념이 없습니다.
따라서 조금 더 제어포인트가 생깁니다.
Jdbc Cursor Item Reader 문단을 읽으셨다면 기억하실겁니다. 바로 hasNext() 에서 isFirst 를 체크하는 것이 그 중 하나입니다.
JdbcTemplate 에서는 각 집약된 객체의 처음을 판별해야합니다.
마치며
처음에 RowMapper 에서 cursor 를 제어하는게 맞을까? 라고 생각이 들었습니다.
클래스명을 보면 Row mapping 을 해야지, cursor 를 직접 제어하는 역할과 책임이 있는것이 아니었기 때문입니다.
그러나, Datasource 를 사용하는 대부분의 row mapper 구현에서 resultset 의 next() 를 이용해 cursor 를 제어하고 있었습니다.
line by line 으로 디버깅을 해보니 구조적으로 이렇게 될 수 밖에 없다는 것을 알았습니다.
덕분에 batch 구조에 대해 조금 더 이해할 수 있게되었네요.
0개의 댓글