-
Spring Batch로 배치 처리하기 - Reader, Processor, Writer 커스텀하기Spring/Batch 2025. 7. 3. 01:27
Spring Batch를 사용하여 외부 API를 통해 데이터를 적재하는 배치 처리를 구현했습니다.
The Sports DB API로부터 데이터를 수집하고, 이를 도메인 모델에 맞게 가공한 뒤 데이터베이스에 저장하는 과정을 커스텀한 Step으로 구현한 과정을 기록합니다.
250724 메모: 리팩토링으로 해당 게시글 소폭 수정 예정
들어가기 전1. 사용한 기술 스택
Spring Boot 3.5.3, Gradle(빌드 도구), PostgreSQL
2. 메타데이터 테이블 분리 여부
실무에서는 메타데이터 테이블을 별도의 데이터베이스로 분리하여 운영하는 경우가 많습니다. 하지만 메타데이터 테이블의 부하 정도가 크지 않고, 소규모 프로젝트의 경우 메인 애플리케이션 DB와 함께 구성하는 경우가 많습니다. - by. 멘토님
위와 같은 부분들을 고려하여, 본 프로젝트에서는 메인 애플리케이션 DB와 함께 구성했습니다.
사담 : 이전 프로젝트에서, 멘토님의 조언을 듣기 전까지, DB의 분리가 당연하다고 생각하고 분리했었습니다. 당시 이 과정에서 많은 시간을 쏟았고, 여러가지 사유로 인해 최종적으로 Batch 자체는 도입하지 못하게 되었습니다. (ㅠ_ㅠ)
혹시나 메타데이터 DB를 분리하고자 하는 분들이 계시다면, 아래 영상을 참고하여 따라가면 좋을 것 같습니다.
> 스프링 배치 5 : 4. DB 연결 Config 클래스
Spring Batch를 사용하는 과정 요약
- 메타데이터 테이블을 별도의 DB로 분리하는 경우 : 분리된 DB 설정이 필요하고, 해당 과정이 추가되어야 합니다.
- 커스텀 Step을 사용하지 않는 경우 : ItemReader, ItemProcessor, ItemWriter를 직접 구현하는 단계가 생략됩니다.
의존성 추가 → 설정 파일 작성 → Reader/Processor/Writer 구현 → Step 정의 → Job 정의 → JobLauncher로 실행
1. 의존성 추가하기

build.gradle에 Spring Batch에 관한 의존성을 추가합니다.
2. application 설정 파일에 Batch 관련 설정하기
# application.yaml spring: batch: jdbc: initialize-schema: always #1 job: enabled: false #2#1 spring:batch:jdbc:initialize-schema: always
애플리케이션 실행 시 Spring Batch 메타데이터 테이블을 자동으로 생성합니다. 개발/테스트 단계에서 always로 사용하고, 일반적으로는 never로 설정한 후 DDL을 따로 관리합니다.
#2 spring:batch:job:enabled: false
기본값은 true로, 별도로 설정하지 않으면 Job이 자동 실행됩니다. 이 설정을 false로 하면 Job들을 JobLauncher를 통해 명시적으로 실행해야 합니다.*서버가 켜질 때마다 Job이 자동 실행되는 것 방지하기 위해 이용
주의점 : Spring Boot 3.0 버전 위로는 @EnableBatchProcessing를 함께 사용하지 않도록 합니다. 해당 어노테이션을 사용할시 spring:batch* 설정이 무효화됩니다.
3. Step을 구성할 Reader, Processor, Writer 커스터마이징
ItemReader(1개) → ItemProcessor(1개) → ItemWriter(chunk size 묶음)
Reader, Processor는 하나의 Item 단위로 데이터를 읽고 처리합니다. 이후 Step을 정의할 때 설정한 Chunk Size에 따라 여러 Item이 묶음(batch) 단위로 구성되고, 이 묶음 단위로 Writer가 데이터를 저장합니다.
<Reader 커스터마이징>
Reader의 경우 ItemStream, ItemReader를 상속하고 있는 ItemStreamReader를 이용해 커스텀했습니다.
ItemStreamReader에 대하여 ▽
더보기
ItemStreamReader 인터페이스 ItemReader를 구현할 때는 배치 작업 중 데이터를 읽어오는 read() 메서드를 필수로 구현해야 합니다.
ItemStream를 구현하는 경우, 아래와 같은 메서드를 통해 작업 상태를 저장하거나 복원할 수 있습니다.
- open(ExecutionContext executionContext): Step이 시작되거나 재시작될 때 호출되어 이전 실행 상태를 복원할 수 있습니다.
- update(ExecutionContext executionContext): Chunk 처리가 끝날 때*마다 호출되어 현재 상태를 저장합니다. *Spring Batch의 ItemStream 문서에는 해당 부분이 Commit 이후라고 작성되어 있습니다. Spring Batch는 Chunk 단위로 트랜잭션을 관리하기 때문에 동일한 말입니다.
- close(): Step이 완료되거나 종료될 때 호출됩니다.
여기서 ExecutionContext는 작업 중 추적해야 할 변수를 저장하는 일종의 상태 저장소입니다. put()으로 값을 저장하고 get()으로 값을 꺼낼 수 있습니다.
이 데이터는 내부적으로 Sprign Batch의 메타데이터 테이블인 BATCH_JOB_EXECUTION_CONTEXT와 BATCH_STEP_EXECUTION_CONTEXT에 저장합니다.


BATCH_JOB_EXECUTION_CONTEXT, BATCH_STEP_EXECUTION_CONTEXT에 직렬화되어 저장된 변수들 ItemStreamReader 구현 코드
더보기package team03.mopl.domain.content.batch.sports; import java.net.URI; import java.time.LocalDate; import java.util.ArrayList; import java.util.List; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemStreamException; import org.springframework.batch.item.ItemStreamReader; import org.springframework.batch.item.NonTransientResourceException; import org.springframework.batch.item.ParseException; import org.springframework.batch.item.UnexpectedInputException; import org.springframework.web.client.RestTemplate; import org.springframework.web.util.UriComponentsBuilder; @Slf4j public class SportsApiReader implements ItemStreamReader<SportsItemDto> { private final RestTemplate restTemplate; private final List<SportsApiRequestInfo> apiRequestInfos; private final String baseUrl; private int nextRequestIndex = 0; private List<SportsItemDto> sportsItemDtos; private int nextItemIndex = 0; public SportsApiReader(RestTemplate restTemplate, String baseUrl) { this.restTemplate = restTemplate; this.apiRequestInfos = buildApiRequestInfo(); this.baseUrl = baseUrl; this.sportsItemDtos = new ArrayList<>(); } @Override public SportsItemDto read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { // API 호출시 빈 리스트를 대비하여 while(true)를 통해 회피한다 while (true) { // 1. 읽을 Item이 있는지 확인 if (sportsItemDtos == null || nextItemIndex >= sportsItemDtos.size()) { // 2. 다음 API가 없다면 false 반환 if (!fetchSportsFromApi()) { // 2*. null을 반환한다. return null; } } if (!sportsItemDtos.isEmpty() && nextItemIndex < sportsItemDtos.size()) { // 3. Item 을 반환한다. return sportsItemDtos.get(nextItemIndex++); } } } /** * API 호출및 호출 여부 반환 */ private boolean fetchSportsFromApi() { // 1. info를 전부 순회했는지 확인 if (nextRequestIndex >= apiRequestInfos.size()) { return false; } // 2. API 정보 꺼낸다. SportsApiRequestInfo info = apiRequestInfos.get(nextRequestIndex); nextRequestIndex++; // 3. info 객체 정보로 API URL 생성한다. URI uri = UriComponentsBuilder .fromUriString(baseUrl) .path("/{apiKey}/eventsday.php") .queryParam("d", info.getDate()) .queryParam("l", info.getLeagueName()) .build("123"); log.info("uri={}", uri); // 4. RestTemplate으로 API를 호출하고 SportsApiResponse 객체로 받는다. SportsApiResponse response = restTemplate.getForObject(uri, SportsApiResponse.class); // 5. 받은 Response에서 List<SportsItemDto>를 꺼낸다. this.sportsItemDtos = (response != null && response.getEvents() != null) ? response.getEvents() : new ArrayList<>(); // 6. sportsItemDtos 초기화로 인한 nextEventIndex 초기화 this.nextItemIndex = 0; return true; } /** * API 호출을 위한 Info 객체 생성 * <p> * MLB, KBO | 어제 날짜 */ private List<SportsApiRequestInfo> buildApiRequestInfo() { List<SportsApiRequestInfo> requestInfos = new ArrayList<>(); String yesterday = LocalDate.now().minusDays(1).toString(); log.info("yesterday={}", yesterday); List<String> leagues = List.of("MLB", "Korean KBO League"); for (String league : leagues) { requestInfos.add(SportsApiRequestInfo.builder() .date(yesterday) .leagueName(league) .build()); } return requestInfos; } // ----------------------------------------------------------------------------------------------- /** * Step이 시작될 때, 또는 재시작할 때 호출되는 동작 */ @Override public void open(ExecutionContext executionContext) throws ItemStreamException { if (executionContext.containsKey("nextRequestIndex")) { this.nextRequestIndex = executionContext.getInt("nextRequestIndex"); log.info("Job 재시작: " + this.nextRequestIndex + "번째 요청부터 다시 시작합니다."); } else { this.nextRequestIndex = 0; log.info("Job 신규 시작"); } if (executionContext.containsKey("nextItemIndex")) { this.nextItemIndex = executionContext.getInt("nextItemIndex"); } else { this.nextItemIndex = 0; } } /** * Chunk 처리가 끝날 때마다 호출하여 상태를 저장한다. */ @Override public void update(ExecutionContext executionContext) throws ItemStreamException { executionContext.putInt("nextRequestIndex", this.nextRequestIndex); executionContext.putInt("nextItemIndex", this.nextItemIndex); } @Override public void close() throws ItemStreamException { log.info("InitialTmdbApiReader 종료"); } }ItemProcessor 구현 코드
더보기package team03.mopl.domain.content.batch.sports; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneId; import java.time.ZonedDateTime; import org.springframework.batch.item.ItemProcessor; import team03.mopl.domain.content.Content; import team03.mopl.domain.content.ContentType; public class SportsApiProcessor implements ItemProcessor<SportsItemDto, Content> { @Override public Content process(SportsItemDto item) throws Exception { // 1. description 생성 StringBuilder description = new StringBuilder(); if (item.getStrLeague() != null) { description.append("리그: ").append(item.getStrLeague()).append("\n"); } if (item.getStrVenue() != null) { description.append("장소: ").append(item.getStrVenue()).append("\n\n"); } if (item.getStrHomeTeam() != null && item.getStrAwayTeam() != null) { description.append(item.getStrHomeTeam()).append(" vs ").append(item.getStrAwayTeam()) .append("\n"); } if (item.getIntHomeScore() != null && item.getIntAwayScore() != null) { description.append(item.getIntHomeScore()).append(":").append(item.getIntAwayScore()); } // 2. dateTime 생성 String date = item.getDateEvent(); String time = item.getStrTime(); LocalDateTime dateTime = null; if (date != null || !date.isEmpty() || time != null || !time.isEmpty()) { LocalDate localDate = LocalDate.parse(date); LocalTime localTime = LocalTime.parse(time); ZonedDateTime utcDateTime = ZonedDateTime.of(localDate, localTime, ZoneId.of("UTC")); ZonedDateTime kstDateTime = utcDateTime.withZoneSameInstant(ZoneId.of("Asia/Seoul")); dateTime = kstDateTime.toLocalDateTime(); } // 2. content 객체 생성및 반환 Content content = Content.builder() .title(item.getStrFilename()) .description(description.toString()) .contentType(ContentType.SPORTS) .releaseDate(dateTime) .url(item.getStrVideo()) .build(); return content; } }ItemWriter 구현 코드
더보기package team03.mopl.domain.content.batch.common; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemWriter; import team03.mopl.domain.content.Content; import team03.mopl.domain.content.repository.ContentRepository; @Slf4j @RequiredArgsConstructor public class ApiWriter implements ItemWriter<Content> { private final ContentRepository contentRepository; @Override public void write(Chunk<? extends Content> chunk) throws Exception { contentRepository.saveAll(chunk.getItems()); log.info("Item {}개 저장", chunk.getItems().size()); } }4. Step 정의
Reader, Processor, Writer 구현이 끝났다면, 각각을 Bean으로 등록하고 이를 조합하여 Step을 정의해야 합니다.
이 Step 역시 Bean으로 등록해야 하며, 일반적으로 @Component와 같은 어노테이션을 사용하는 것 보다 @Configuration 클래스에서 @Bean 어노테이션을 통해 명시적으로 정의하는 방식이 가독성과 관리 측면에서 용이합니다.
더보기package team03.mopl.domain.content.batch.sports; import lombok.RequiredArgsConstructor; import org.springframework.batch.core.Step; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.client.RestTemplate; import team03.mopl.domain.content.Content; @Configuration @RequiredArgsConstructor public class SportsBatchConfig { private final RestTemplate restTemplate; private final PlatformTransactionManager transactionManager; private final ItemWriter<Content> itemWriter; private final JobRepository jobRepository; @Value("${sports.baseurl}") private String baseUrl; // 위에서 공개하지 않은 Reader 구현체가 쓰인 Step 입니다 @Bean public Step initialSportsStep(){ return new StepBuilder("initialSportsStep", jobRepository) .<SportsItemDto, Content>chunk(20, transactionManager) .reader(initialSportsReader()) .processor(sportsProcessor()) .writer(itemWriter) .build(); } // Step을 정의합니다 @Bean public Step sportsStep(){ return new StepBuilder("sportsStep", jobRepository) .<SportsItemDto, Content>chunk(5, transactionManager) .reader(sportsReader()) .processor(sportsProcessor()) .writer(itemWriter) .build(); } @Bean public ItemReader<SportsItemDto> initialSportsReader(){ return new InitialSportsApiReader(restTemplate, baseUrl); } @Bean public ItemReader<SportsItemDto> sportsReader(){ return new SportsApiReader(restTemplate, baseUrl); } @Bean public ItemProcessor<SportsItemDto, Content> sportsProcessor(){ return new SportsApiProcessor(); } }package team03.mopl.domain.content.batch.common; import lombok.RequiredArgsConstructor; import org.springframework.batch.item.ItemWriter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import team03.mopl.domain.content.Content; import team03.mopl.domain.content.repository.ContentRepository; @Configuration @RequiredArgsConstructor public class BatchConfig { private final ContentRepository contentRepository; // 공용 Writer로서 작성했습니다. @Bean public ItemWriter<Content> itemWriter(){ return new ApiWriter(contentRepository); } }5. Job 정의
여러 개의 Step Bean을 생성해두면 Spring에게 어떤 Step을 주입할지 알려줘야 합니다.
이처럼 같은 타입의 Bean이 여러 개일 경우 @Qualifier 어노테이션을 사용해 명확히 어떤 Bean을 사용할지 지정해줘야 합니다. @Qualifier에는 Step을 정의할 때 사용한 Bean 이름을 지정합니다.
더보기package team03.mopl.domain.content.batch.job; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class JobConfig { private final JobRepository jobRepository; @Qualifier("initialSportsStep") private final Step initialSportsStep; @Qualifier("sportsStep") private final Step sportsStep; @Qualifier("initialTmdbStep") private final Step initialTmdbStep; @Qualifier("tmdbStep") private final Step tmdbStep; public JobConfig( JobRepository jobRepository, @Qualifier("initialSportsStep") Step initialSportsStep, @Qualifier("sportsStep") Step sportsStep, @Qualifier("initialTmdbStep") Step initialTmdbStep, @Qualifier("tmdbStep") Step tmdbStep) { this.jobRepository = jobRepository; this.initialSportsStep = initialSportsStep; this.sportsStep = sportsStep; this.initialTmdbStep = initialTmdbStep; this.tmdbStep = tmdbStep; } @Bean public Job initialSportsJob(){ return new JobBuilder("initialSportsJob", jobRepository) .start(initialSportsStep) .build(); } @Bean public Job sportsJob(){ return new JobBuilder("sportsJob", jobRepository) .start(sportsStep) .build(); } @Bean public Job initialTmdbJob(){ return new JobBuilder("initialTmdbJob", jobRepository) .start(initialTmdbStep) .build(); } @Bean public Job TmdbJob(){ return new JobBuilder("TmdbJob", jobRepository) .start(tmdbStep) .build(); } }6. JobLauncher를 통해 실행
일반적으로 스케줄러와 연동하여 배치 작업을 실행하고 마무리 합니다.
JobLauncher로 Job을 실행하며, 이때 JobParameter라는 객체를 통해 추가적인 작업 정보를 전달 할 수 있습니다. 전달된 파라미터는 BATCH_JOB_EXECUTION_PARAMS 테이블에 저장됩니다.
더보기
JobParameter 객체로 넘긴 작업 정보의 기록 JobLauncher는 실행 결과로 JobExecution 객체를 반환하고, JobExecution을 통해 배치의 상태를 확인할 수 있습니다.
더보기package team03.mopl.domain.content.batch.launcher; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Component @Slf4j public class SportsJobScheduler { private final JobLauncher jobLauncher; private final Job sportsJob; public SportsJobScheduler( JobLauncher jobLauncher, @Qualifier("sportsJob") Job sportsJob) { this.jobLauncher = jobLauncher; this.sportsJob = sportsJob; } @Scheduled(cron = "0 0 12 * * *") public void runSportsJob() { JobParameters jobParameters = new JobParametersBuilder() .addLong("timestamp", System.currentTimeMillis()) .addString("mode", "scheduler") .toJobParameters(); try { log.info("Sports Job 스케줄 실행 시작"); JobExecution jobExecution = jobLauncher.run(sportsJob, jobParameters); log.info("배치 작업 완료 : {}", jobExecution.getStatus()); } catch (Exception e) { log.error("Sports Job 실행 중 오류 발생", e); } } }
위 배치 작업을 완료한 후 DB에 적재된 데이터

래퍼런스
Spring Boot 3 및 Spring Batch 5에서 배치 테이블 자동 생성 문제 해결하기
'Spring > Batch' 카테고리의 다른 글
Spring Batch (1) 2025.07.02