[Spring Batch] Implement of Aggregated Item reader
들어가기에 앞서
SQL 을 이용하다보면 GROUP BY 절을 자주 사용하게됩니다.
Batch 의 Item reader 에서도 GROUP BY 절을 사용하여 grouped item 을 가져오면 좋겠으나
집계되어 반환되는 record 의 수가 1이기 때문에 item reader 에 사용하기에는 애로사항이 있었습니다.
이를 Custom row mapper 와 Custom result set extractor 구현으로 해결했던 사례를 공유하고자 합니다.
Aggregated querying (customize)
아래 테이블이 집약을 하고 싶은 대상 테이블이라고 가정하겠습니다.
| id | user_id | some_data | some_date |
| 1 | A | a1 | 1970-01-01 |
| 2 | A | a2 | 1970-01-02 |
| 3 | B | b1 | 1970-01-03 |
| 4 | B | b2 | 1970-01-04 |
| 5 | B | b3 | 1970-01-05 |
| 6 | C | c1 | 1970-01-06 |
| 7 | D | d1 | 1970-01-07 |
| 8 | D | d2 | 1970-01-08 |
| 9 | D | d3 | 1970-01-09 |
| 10 | D | d4 | 1970-01-10 |
public class SomeEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column
private String userId;
@Column
private String someData;
@Column
private Date someDate;
}
그리고 매핑되는 Entity 클래스 입니다.
위 데이터를 아래같은 형식으로 만들고 싶지만 Database 관점에서는 적절하지 못합니다.
{
A : [a1, a2],
B : [b1, b2, b3],
C : [c1],
D : [d1, d2, d3, d4]
}
이 때, 객체 지향 관점에서 aggregated querying 을 적용하면 됩니다.
이제부터 아래 2개의 항목을 통해 설명하겠습니다.
- Aggregated Jdbc Cursor Item Reader in Spring Batch
- Aggregated querying through native jdbcTemplate with Result Set Extractor
공통영역
AggregatedRowMapper <– AbstractAggregatedRowMapper
각 row mapper 상속구현
<– AggregatedEntityRowMapper
Batch job configuration 에서 AggregateJdbcCursorItemReader 클래스에 AggregatedEntityRowMapper 를 넣음
JdbcTemplate 사용시 RowMapper 만 넣는 대신 ResultSetExtractor(RowMapper) 를 넣는 이유
rs.next() 최초 호출이 필요한데 jdbcTemplate 에서는 그게 없음 그래서 ResultSetExtractor 를 구현해서 hasNext(rs.next() 호출) 를 호출하는 것.
AggregatedRowMapperResultSetExtractor
rowMapper.hasNext 에서 rs.next() 를 호출
rowMapper.mapRow 에서 rs.next() 를 호출
while (rowMapper.hasNext(rs)) { results.add(rowMapper.mapRow(rs, rowNumber++)); }
Jdbc Cursor Item Reader
item reader 는 batch 개발에서 사용되는 용어입니다. 원천 data source 로 부터 어떤 데이터(무엇)를 어떻게 가져오겠다(어떻게)를 정의할 수 있습니다.
대부분의 batch 개발시 이미 구현돼있는 row mapper(어떻게) 를 이용해 (BeanPropertyRowMapper) “어떤 데이터”(무엇을)를 가져오겠다고 정의합니다.
하지만 Entity 를 aggregating 하기 위해서는 이미 구현돼있는 row mapper 를 대신하여, “어떻게” 데이터를 가져올지 정의해주어야 합니다.
Item reader 를 정의하기 위한 단계는 다음과 같습니다.
- RowMapper<T> 를 구현한 custom RowMapper
- JdbcCursorItemReader<T> 를 상속하여 cursor 를 제어할 수 있는 custom ItemReader
가 필요합니다.
custom RowMapper
저는 이 Row Mapper 의 활용도가 많아보여 RowMapper 인터페이스를 확장하였고, 확장한 인터페이스를 다시 추상클래스로 구현하였습니다.
가장 간소화된 버전은 모든 단계를 생략하고 3번에서 바로 RowMapper 를 구현하시면 됩니다.
1. AggregatedRowMapper.java –> RowMapper 를 확장. hasNext 메서드 추가
public interface AggregatedRowMapper<T> extends RowMapper<T> {
boolean hasNext(ResultSet rs) throws SQLException;
}
2. AbstractAggregatedRowMapper.java –> AggregatedRowMapper 를 구현한 추상클래스
public abstract class AbstractAggregateRowMapper<T> implements AggregateRowMapper<T> {
private boolean isFirst = true;
private boolean isNextResultLast;
/**
* Why RowMapper has {@link #hasNext} method?
* > Because JdbcCursorItemReader can not recognize each record what have relation
* > So mapRow will run until not found relation ( It mean {@link #isRelated} were return false )
* > And it is why {@link #forward} is exist also.
* @param rs
* @return
* @throws SQLException
*/
@Override
public boolean hasNext(ResultSet rs) throws SQLException {
if (isFirst) {
isNextResultLast = rs.next();
isFirst = false;
}
return isNextResultLast;
}
@Override
public T mapRow(ResultSet rs, int rowNum) throws SQLException {
T result = mapRow(rs, null, rowNum);
while (forward(rs) && isRelated(rs, result)) {
result = mapRow(rs, result, ++rowNum);
}
return result;
}
protected boolean forward(ResultSet rs) throws SQLException {
isNextResultLast = rs.next();
return isNextResultLast;
}
protected abstract T mapRow(ResultSet rs, T partialResult, int rowNum) throws SQLException;
protected abstract boolean isRelated(ResultSet rs, T partialResult) throws SQLException;
}
mapRow method 에서는 상속한 클래스에서 일단 구현한 mapRow 를 호출합니다. (fetch 및 VO 에 mapping)
– 그리고 isRelated() 를 통해 연관관계가 깨질 때까지 하나의 entity 덩어리로 판단하여 계속해서 mapRow() 를 호출합니다.
이는 3번의 isRelated() method 의 구현된 모습을 보면 이해가 빠릅니다.
- hasNext() method 에서 isFirst 를 체크하는 이유는 아래 jdbcTemplate 파트를 설명할 때 참조하시면 됩니다.
3. AggregatedEntityRowMapper –> AbstractAggregatedRowMapper 를 상속한 클래스. SomeEntity 를 집약하여 AggregatedEntity 덩어리로 만들어 준다.
public class AggregatedEntityRowMapper extends AbstractAggregatedRowMapper<AggregatedEntity> {
@Override
protected AggregatedEntity mapRow(ResultSet rs, AggregatedEntity partialResult, int rowNum)
throws SQLException {
if (partialResult == null) {
partialResult = AggregatedEntity.of(rs.getString("user_id"));
}
partialResult.add(rs.getString("some_data"));
return partialResult;
}
@Override
protected boolean isRelated(ResultSet rs, AggregatedEntity partialResult) throws SQLException {
return partialResult.getUserId().equals(rs.getString("user_id"));
}
}
이 row mapper 에서 핵심 method 는 isRelated() 입니다.
현재 cursor 의 record(row) 와 이전에 fetch 한 record 가 연관이 있는지, 연관이 있다면 aggregation 할 수 있도록 판단해주는 method 입니다.
4. AggregatedJdbcCursorItemReader.java –> 조금 전에 구현한 rowMapper 를 멤버로 가지고 있고, rowMapper 결과에 따라서 cursor 를 제어하는 역할
public class AggregatedJdbcCursorItemReader<T> extends JdbcCursorItemReader<T> {
private AggregatedRowMapper<T> rowMapper;
@Override
public void setRowMapper(RowMapper<T> rowMapper) {
this.rowMapper = (AggregatedRowMapper<T>) rowMapper;
super.setRowMapper(rowMapper);
}
@Override
protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
T result = super.readCursor(rs, currentRow);
setCurrentItemCount(rs.getRow());
return result;
}
@Override
protected T doRead() throws Exception {
if (rs == null) {
throw new ReaderNotOpenException("Reader must be open before it can be read.");
}
try {
return rowMapper.hasNext(rs) ? readCursor(rs, getCurrentItemCount()) : null;
} catch (SQLException e) {
throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), e);
}
}
}
cursor item reader 는 Step executor 에 의해 doRead 를 호출 -> 다시 readCursor 를 호출합니다.
(우리가 필요한 것은 record 간에 연관관계를 만들어 집약하는것입니다. database 를 제어하는 것은 JdbcCursorItemReader 가 해주기 때문에 별도로 구현하는 것이 아니라, super 클래스(JdbcCursorItemReader)의 readCursor를 호출하게됩니다.)
잠깐 사족을 달자면 아래는 JdbcCursorItemReader 의 readCursor (위에서 말한 super 클래스의 readCursor) 입니다.
@Nullable @Override protected T readCursor(ResultSet rs, int currentRow) throws SQLException { return rowMapper.mapRow(rs, currentRow); }
AggregatedJdbcCursorItemReader 생성자에서 호출한 super.setRowMapper(rowMapper); 에 의해
이전에 만든 custom row mapper 를 JdbcCursorItemReader 의 row mapper 로 지정해주었고,
readCursor 에서는 해당 rowMapper 를 통해 mapRow 하도록 트리거합니다.
그래서 doRead() -> readCursor() -> super.readCursor() -> customRowMapper.mapRow() 가 되는 것이고,
mapRow 를 통해 나온 aggregated entity 가 doRead() 에서 return 되는 것입니다.
Batch Job, Step 구현
@Slf4j
@Configuration
public class SomeJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final PlatformTransactionManager platformTransactionManager;
private final DataSource dataSource;
public SomeJobConfiguration(
JobBuilderFactory jobBuilderFactory,
StepBuilderFactory stepBuilderFactory,
PlatformTransactionManager platformTransactionManager,
DataSource dataSource) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.platformTransactionManager = platformTransactionManager;
this.dataSource = dataSource;
}
@Bean("someJob")
public Job someJob(
Step someStep) throws Exception {
return jobBuilderFactory.get("someJob")
.start(someStep)
.build();
}
@Bean("someStep")
public Step someStep(
AggregatedJdbcCursorItemReader<AggregatedEntity> itemReader,
ItemWriter<AggregatedEntity> itemWriter
) {
return stepBuilderFactory.get("someStep")
.<AggregatedEntity, AggregatedEntity>chunk(2)
.reader(itemReader)
.writer(itemWriter)
.transactionManager(platformTransactionManager)
.build();
}
@Bean("itemReader")
public AggregatedJdbcCursorItemReader<AggregatedEntity> itemReader() {
AggregatedJdbcCursorItemReader<AggregatedEntity> itemReader = new AggregatedJdbcCursorItemReader<>();
itemReader.setRowMapper(new AggregatedEntityRowMapper());
itemReader.setDataSource(dataSource);
itemReader.setSql(" select * from some_entity order by user_id asc ");
return itemReader;
}
@Bean("itemWriter")
public ItemWriter<AggregatedEntity> itemWriter() {
return items -> {
log.info("[AGGREGATE] result : {}", items);
};
}
}
someJob 을 적절히 구현하였습니다.
- someJob 은 someStep 을 포함하여 수행합니다.
- someStep 은 chunk 단위로 reader -> writer 를 수행합니다.
- reader 는 우리가 구현한 AggregateJdbcCursorItemReader 로 지정해줍니다.
- item reader 의 row mapper 에는 우리가 구현한 AggregatedEntityRowMapper 객체를 넣어줍니다.
- writer 는 읽어들인 chunk 만큼을 (items) stdout 으로 출력만 합니다.
주의
여기서 chunk 는 cursor 에서 읽은 record 의 단위가 아니라, aggregate 된 record 의 단위입니다.
읽어들인 record 의 수가 몇개인지는 중요하지 않습니다. 실제 집약된 객체의 수가 chunk 의 기준입니다.
chunk 가 2 이기 때문에 집약된 2개의 객체가 나올때까지가 하나의 chunk 단위가 됩니다.
가장 처음에 제시한 테이블을 예로 들면 A,B 가 하나의 chunk, C,D 가 하나의 chunk 가 됩니다.
정리
첫 번째 chunk
{
A : [a1, a2],
B : [b1, b2, b3]
}
두 번째 chunk
{
C : [c1],
D : [d1, d2, d3, d4]
}
someStep 의 item reader 에서는 위와 같이 chunk 데이터를 넘겨주게 됩니다.
JdbcTemplate Result Set Extractor
Spring batch 에서는 item reader 가 있지만, 쌩짜 native jdbc template 을 사용할때는 chunk 라는 개념이 없습니다.
따라서 조금 더 제어포인트가 생깁니다.
Jdbc Cursor Item Reader 문단을 읽으셨다면 기억하실겁니다. 바로 hasNext() 에서 isFirst 를 체크하는 것이 그 중 하나입니다.
JdbcTemplate 에서는 각 집약된 객체의 처음을 판별해야합니다.
마치며
처음에 RowMapper 에서 cursor 를 제어하는게 맞을까? 라고 생각이 들었습니다.
클래스명을 보면 Row mapping 을 해야지, cursor 를 직접 제어하는 역할과 책임이 있는것이 아니었기 때문입니다.
그러나, Datasource 를 사용하는 대부분의 row mapper 구현에서 resultset 의 next() 를 이용해 cursor 를 제어하고 있었습니다.
line by line 으로 디버깅을 해보니 구조적으로 이렇게 될 수 밖에 없다는 것을 알았습니다.
덕분에 batch 구조에 대해 조금 더 이해할 수 있게되었네요.
0개의 댓글