관리 메뉴

ballqs 님의 블로그

[Spring] Spring Batch란? feat.DB 분리 작업 본문

코딩 공부/Spring

[Spring] Spring Batch란? feat.DB 분리 작업

ballqs 2024. 11. 10. 22:03

Spring Batch(Spring Batch)란?

대용량 데이터 처리를 위해 설계된 오픈 소스 프레임워크입니다. 대규모 데이터 처리를 효율적으로 수행하고, 반복적으로 작업을 자동화할 수 있는 강력한 도구를 제공합니다. 데이터베이스에서 데이터를 읽고 처리한 후 다른 데이터베이스나 파일에 기록하는 등의 작업을 안정적이고 일괄적으로 수행할 수 있도록 지원합니다.

 

주요 개념

Job: 스프링 배치에서 수행할 작업의 단위입니다.

Step: Job의 구성 요소로, 실제로 수행할 작업을 정의합니다.

ItemReader: 데이터 소스에서 데이터를 읽어오는 역할을 합니다.

ItemProcessor: 읽어온 데이터를 처리하는 역할을 합니다.

ItemWriter: 처리된 데이터를 저장하는 역할을 합니다.

 

메타 테이블

 

BATCH_JOB_INSTANCE: 실행된 Job의 인스턴스를 저장하는 테이블입니다. 동일한 Job이 여러 번 실행될 경우 각각의 실행을 구분하기 위해 사용됩니다. 이를 통해 스프링 배치는 동일한 Job의 여러 인스턴스들을 개별적으로 관리할 수 있습니다.

BATCH_JOB_SEQ: Job 인스턴스의 고유 식별자를 생성하기 위해 사용되는 시퀀스 테이블입니다. 각 Job 인스턴스에 대해 유일한 ID를 보장합니다.

BATCH_JOB_EXECUTION: Job의 실행 상태(시작 시간, 종료 시간, 상태 등)를 기록하는 테이블입니다. Job의 성공 여부나 실패 상태, 그리고 현재 진행 상태를 저장하여 Job의 이력을 관리합니다.

BATCH_JOB_EXECUTION_PARAMS: Job 실행 시 함께 전달된 매개변수(parameter)를 기록하는 테이블입니다. 이 정보를 통해 동일한 Job이라도 서로 다른 매개변수로 실행된 기록을 추적할 수 있습니다.

BATCH_JOB_EXECUTION_CONTEXT: Job의 실행 컨텍스트를 기록하는 테이블입니다. Job 실행 중 생성된 정보를 저장하며, Job이 재시작될 때 이전의 실행 상태를 참고할 수 있도록 합니다.

BATCH_JOB_EXECUTION_SEQ: Job 실행의 고유 식별자를 생성하기 위한 시퀀스 테이블입니다. 각 Job 실행에 대해 고유한 ID를 제공합니다.

BATCH_STEP_EXECUTION: 각 Step의 실행 상태(시작 시간, 종료 시간, 상태 등)를 기록하는 테이블입니다. 각 Step의 성공 여부와 오류 상태, 실행 중의 상태 등을 추적할 수 있어 Job의 세부 진행 상황을 파악할 수 있습니다.

BATCH_STEP_EXECUTION_CONTEXT: Step의 실행 중 생성된 컨텍스트 정보를 저장하는 테이블입니다. Step이 재시작될 때 해당 컨텍스트 정보를 활용해 이전 상태를 복원할 수 있습니다.

BATCH_STEP_EXECUTION_SEQ: Step 실행의 고유 식별자를 생성하기 위한 시퀀스 테이블입니다. 각 Step 실행에 대해 유일한 ID를 보장합니다.

 

이 메타테이블은 데이터베이스에 생성되며 스프링 배치가 자동으로 생성합니다.

 

스프링 설정

build.gradle 추가

implementation 'org.springframework.boot:spring-boot-starter-batch'

 

application.yml 추가

spring:
  batch:
    job:
      enabled: false
    jdbc:
      initialize-schema: always
      schema: classpath:org/springframework/batch/core/schema-mysql.sql

 

※ 트러블 슈팅

더보기

버전문제로 단순히 initialize-schema: always 이 값을 추가해준다고 동작되진 않습니다.

내가 사용하는 db에 맞는 스키마를 들고와서

schema: classpath:org/springframework/batch/core/schema-mysql.sql

이 값을 넣어줘야지 생성되는 것을 확인할 수 있었다.

 

그러면 내 db에 맞는 것을 어떻게 가져와야할까?...

 

여기에서 맞는 db를 가져와서 작성해두면 됩니다.

 

@EnableBatchProcessing 어노테이션을 사용하여 배치 활성화

@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {
    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }
}

 

@Configuration 설정

@Slf4j
@RequiredArgsConstructor
@Configuration
public class BatchConfig {

    private final JobRepository jobRepository;
    private final Step step;

    @Bean(name = "Job")
    public Job job() {
        log.info("job");
        return new JobBuilder("job", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(step.step())
                .next("다음 스텝...")
                .build();
    }
}

 

참조했던 다른 수많은 글에는 알기쉽게 하나의 Config 파일에 step -> reader -> processor -> writer 를 다 넣어두었지만

이 글에서는 분리해두겠습니다.

분리하는 이유는 step에는 꼭 reader -> processor -> writer 가 아닌 다른 것도 사용할 수 있습니다.

tasklet 이란 것도 있습니다.

 

reader -> processor -> writer 의 사용법

step.java 작성!

@Slf4j
@RequiredArgsConstructor
@Configuration
public class SettlementStep {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager platformTransactionManager;
    private final AService aService;

    @Bean
    public Step step() {
        log.info("step");
        return new StepBuilder("step", jobRepository)
                .<ADto, BDto> chunk(10, platformTransactionManager)
                .reader(itemReader())
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public ItemReader<ADto> itemReader() {
        log.info("reader");
        List<ADto> aDtoList = aService.aDtoList(); // 전체 데이터를 조회!
        return new ListItemReader<>(aDtoList);
    }

    @Bean
    public ItemProcessor<ADto, BDto> itemProcessor() {
        log.info("processor");
        return aDto -> {
			BDto bDto = new BDto(aDto); // aDto 값을 bDto로 변환
            return bDto;
        };
    }

    @Bean
    public ItemWriter<BDto> itemWriter() {
        log.info("writer");
        return items -> items.forEach(bDto -> {
            // bDto로 쓰기 작업...
        });
    }
}

 

ItemReader에서 모든 데이터를 읽어온다.

ItemProcessor에서 읽어온 데이터를 청크단위로 ADto -> BDto로 변환하는 작업을 한다.

ItemWriter에서 청크단위로 BDto를 쓰기 작업한다.

위 3가지를 하나의 Step으로 여기며 작성하게 되고

next에는 다른 Step를 작성하여 추가적으로 작성이 가능하다.

 

tasklet 의 사용법

step.java 작성

@Slf4j
@RequiredArgsConstructor
@Configuration
public class Step {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager platformTransactionManager;

    @Bean
    public Step step() {
        log.info("step");
        return new StepBuilder("step", jobRepository)
                .tasklet(tasklet() , platformTransactionManager)
                .build();
    }

    @Bean
    public Tasklet tasklet() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                // 정산 작업 수행
                log.info("Executing settlement tasklet...");
                return RepeatStatus.FINISHED; // 작업 완료 상태 반환
            }
        };
    }
}

reader -> processor -> writer 가 아니라 한군데에 다 묶어서 처리하는 방법으로 사용할 수 있습니다.

Tasklet은 단일 작업이나 데이터 흐름이 복잡하지 않은 경우 사용합니다. 예를 들어 데이터를 읽고 단순히 삽입하거나 하는 로직으로 사용됩니다.

 

DB 분리

스프링 배치를 사용하려면 실서버 DB와 메타테이블이 들어갈 DB를 분리해두는게 좋아서 진행해봤습니다.

 

application.yml 작성

spring:
  config:
    import: optional:file:.env[.properties]
  datasource-meta:
    jdbc-url: ${BATCH_DB_URL}
    username: ${BATCH_DB_USERNAME}
    password: ${BATCH_DB_PASSWORD}
    driver-class-name: com.mysql.cj.jdbc.Driver
  datasource-data:
    jdbc-url: ${DB_URL}
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    driver-class-name: com.mysql.cj.jdbc.Driver

 

※ 트러블 슈팅

더보기
spring:
  config:
    import: optional:file:.env[.properties]
  datasource-meta:
    url: ${BATCH_DB_URL}
    username: ${BATCH_DB_USERNAME}
    password: ${BATCH_DB_PASSWORD}
    driver-class-name: com.mysql.cj.jdbc.Driver
  datasource-data:
    url: ${DB_URL}
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    driver-class-name: com.mysql.cj.jdbc.Driver

 처음에는 이렇게 작성했으나 jdbc-url? 을 못찾는다는 에러가 발생했다.

 

url 부분을 -> jdbc-url로 변경하면 해결되는 것을 눈으로 확인했습니다.

 

DB별 Config 파일

MetaDBConfig.java

@Configuration
public class MetaDBConfig {
    @Primary
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource-meta")
    public DataSource metaDBSource() {
        return DataSourceBuilder.create().build();
    }

    @Primary
    @Bean
    public PlatformTransactionManager metaTransactionManager() {
        return new DataSourceTransactionManager(metaDBSource());
    }
}

 

DataDBConfig.java

@Configuration
@EnableJpaRepositories(
        basePackages = "org.sparta.batch.domain",       // Repository
        entityManagerFactoryRef = "dataEntityManager",
        transactionManagerRef = "dataTransactionManager"
)
public class DataDBConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource-data")
    public DataSource dataDBSource() {

        return DataSourceBuilder.create().build();
    }

    @Bean
    public LocalContainerEntityManagerFactoryBean dataEntityManager() {

        LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();

        em.setDataSource(dataDBSource());
        em.setPackagesToScan(new String[]{"org.sparta.batch.domain"});  // Entity
        HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
        vendorAdapter.setShowSql(true);
        vendorAdapter.setGenerateDdl(false);  // Schema 자동 생성 비활성화
        em.setJpaVendorAdapter(vendorAdapter);

        // DB 추가 설정같은건 아래와 같이 설정
        HashMap<String, Object> properties = new HashMap<>();
        properties.put("hibernate.hbm2ddl.auto", "update");
        properties.put("hibernate.auto_quote_keyword", "true");
        properties.put("hibernate.highlight_sql", "true");
        properties.put("hibernate.show_sql", "true");
        properties.put("hibernate.format_sql", "true");
        properties.put("hibernate.dialect", "org.hibernate.dialect.MySQLDialect");
        properties.put("hibernate.physical_naming_strategy", "org.hibernate.boot.model.naming.CamelCaseToUnderscoresNamingStrategy");
        properties.put("hibernate.implicit_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringImplicitNamingStrategy");
        em.setJpaPropertyMap(properties);

        return em;
    }

    @Bean
    public PlatformTransactionManager dataTransactionManager() {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(dataEntityManager().getObject());
        return transactionManager;
    }
}

 

※ 트러블 슈팅

더보기

카멜 문법 → 스네이크 문법 변환 에러가 났습니다.

단순히 JpaRepository에서 값을 가져와서 가공하고 DB에 저장시 파싱이 자동으로 안되서 추가된 소스

properties.put("hibernate.physical_naming_strategy", "org.hibernate.boot.model.naming.CamelCaseToUnderscoresNamingStrategy");
properties.put("hibernate.implicit_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringImplicitNamingStrategy");

 이 방법은 매우 안좋은 방법이라고 하는데 다른 것을 찾을 시간이 부족했습니다.

 

CamelCaseToUnderscoresNamingStrategy는 표준 Hibernate 네이밍 전략이 아니며, 공식적으로 지원이나 유지보수가 불확실해 향후 Hibernate 업데이트 시 호환성 문제가 발생할 수 있다고 합니다.

SpringImplicitNamingStrategy와 CamelCaseToUnderscoresNamingStrategy는 특정한 네이밍 규칙만 제공하기 때문에, 복잡한 비즈니스 요구 사항에 맞춰 네이밍을 커스터마이즈하기 어렵습니다.

 

그래서 시간이 나면 다른 방법을 찾아 수정해보겠습니다.

 

또 다른 트러블 슈팅으로는 JPA에 Insert 안되는 문제가 있었고 아래의 코드를 추가하여 해결했습니다.

em.setDataSource(dataDBSource());
em.setPackagesToScan(new String[]{"org.sparta.batch.domain"});  // Entity
HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
vendorAdapter.setShowSql(true);
vendorAdapter.setGenerateDdl(false);  // Schema 자동 생성 비활성화
em.setJpaVendorAdapter(vendorAdapter);

 

실제로 사용한 전체 코드(스프링 배치)

GitHub 주소 : https://github.com/final17/batch

 

GitHub - final17/batch

Contribute to final17/batch development by creating an account on GitHub.

github.com

 

BatchConfig.java 작성

더보기
@Slf4j
@RequiredArgsConstructor
@Configuration
public class SettlementBatchConfig {

    private final JobRepository jobRepository;
    private final SettlementStep settlementStep;
    private final SettlementSummaryStep settlementSummaryStep;

    @Bean(name = "settlementJob")
    public Job settlementJob() {
        log.info("settlement job");
        settlementSummaryStep.summaryType(SummaryType.DAY);
        return new JobBuilder("settlementJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(settlementStep.settleStep())
                .next(settlementSummaryStep.summaryStep())
                .build();
    }

    @Bean(name = "settlementSummaryWeek")
    public Job summaryWeekJob() {
        log.info("summaryWeek job");
        settlementSummaryStep.summaryType(SummaryType.WEEK);
        return new JobBuilder("summaryWeekJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(settlementSummaryStep.summaryStep())
                .build();
    }

    @Bean(name = "settlementSummaryMonth")
    public Job summaryMonthJob() {
        log.info("summaryMonth job");
        settlementSummaryStep.summaryType(SummaryType.MONTH);
        return new JobBuilder("summaryMonthJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(settlementSummaryStep.summaryStep())
                .build();
    }
}

SettlementStep.java 작성

더보기
@Slf4j
@RequiredArgsConstructor
@Configuration
public class SettlementStep {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager platformTransactionManager;

    private final PaymentService paymentService;
    private final SettlementRepository settlementRepository;
    private final SettlementFeesRepository settlementFeesRepository;

    private final Converter converter;

    @Bean
    public Step settleStep() {
        log.info("settle step");
        return new StepBuilder("settleStep", jobRepository)
                .<PaymentDto, SettlementDto> chunk(10, platformTransactionManager)
                .reader(settleReader())
                .processor(settleProcessor())
                .writer(settleWriter())
                .build();
    }

    @Bean
    public ItemReader<PaymentDto> settleReader() {
        log.info("settleReader");
        LocalDate today = LocalDate.now();
        String todayStr = today.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));

        // 페이지 번호 초기화
        final int[] pageNumber = {1}; // 배열을 사용하여 effectively final로 유지

        return new ItemReader<PaymentDto>() {
            private List<PaymentDto> currentBatch = new ArrayList<>();
            private int currentIndex = 0;

            @Override
            public PaymentDto read() {
                // 현재 배치가 비어있거나 인덱스가 범위를 초과하면 새로운 배치 읽기
                if (currentBatch.isEmpty() || currentIndex >= currentBatch.size()) {
                    currentBatch = paymentService.paymentDtoList(todayStr, pageNumber[0]++ , chunkSize);
                    currentIndex = 0;

                    // 데이터가 없으면 null 반환하여 종료
                    if (currentBatch.isEmpty()) {
                        return null;
                    }
                }
                return currentBatch.get(currentIndex++);
            }
        };
    }

    @Bean
    public ItemProcessor<PaymentDto, SettlementDto> settleProcessor() {
        log.info("settleProcessor");
        return paymentDto -> {
            // SettlementDto 생성
            SettlementDto settlementDto = new SettlementDto();
            // PaymentDto의 필드에서 SettlementDto의 필드로 데이터 매핑
            settlementDto.setMId(paymentDto.getMId());
            settlementDto.setPaymentKey(paymentDto.getPaymentKey());
            settlementDto.setOrderId(paymentDto.getOrderId());
            settlementDto.setCurrency(paymentDto.getCurrency());
            settlementDto.setMethod(paymentDto.getMethod());
            settlementDto.setAmount(paymentDto.getAmount());
            settlementDto.setApprovedAt(paymentDto.getApprovedAt());
            // 정산 매출일 , 정산 지급일 계산
            LocalDate today = LocalDate.now();
            settlementDto.setSoldDate(converter.calculateSoldDate(today));
            settlementDto.setPaidOutDate(converter.calculatePaidOutDate(today));
            settlementDto.setSettlementFeesDtos(converter.getSettlementFeesDtos(paymentDto.getAmount()));
            settlementDto.setStatus(paymentDto.getStatus());
            settlementDto.setUser(paymentDto.getUser());
            settlementDto.setStore(paymentDto.getStore());

            return settlementDto; // 변환된 SettlementDto 반환
        };
    }

    @Bean
    public ItemWriter<SettlementDto> settleWriter() {
        log.info("settleWriter");
        return items -> items.forEach(settlementDto -> {
            LocalDate date = LocalDate.parse(settlementDto.getApprovedAt(), DateTimeFormatter.ofPattern("yyyy-MM-dd"));
            OffsetDateTime offsetDateTime = date.atStartOfDay(ZoneOffset.UTC).toOffsetDateTime();

            Settlement settlement = Settlement.builder()
                    .mId(settlementDto.getMId())
                    .paymentKey(settlementDto.getPaymentKey())
                    .orderId(settlementDto.getOrderId())
                    .currency(settlementDto.getCurrency())
                    .method(settlementDto.getMethod())
                    .amount(settlementDto.getAmount())
                    .approvedAt(offsetDateTime)
                    .soldDate(settlementDto.getSoldDate())
                    .paidOutDate(settlementDto.getPaidOutDate())
                    .status(settlementDto.getStatus())
                    .user(settlementDto.getUser())
                    .store(settlementDto.getStore())
                    .build();

            Settlement saveSettlement = settlementRepository.save(settlement);
            settlementRepository.flush();

            List<SettlementFees> settlementFeesList = new ArrayList<>();
            for (SettlementFeesDto settlementFeesDto : settlementDto.getSettlementFeesDtos()) {
                SettlementFees settlementFees = new SettlementFees(saveSettlement , settlementFeesDto.getType() , settlementFeesDto.getSupplyAmount());
                settlementFeesList.add(settlementFees);
            }
            settlementFeesRepository.saveAll(settlementFeesList);
            settlementFeesRepository.flush();
        });
    }
}

SettlementSummaryStep.java 작성

더보기
@Slf4j
@RequiredArgsConstructor
@Configuration
public class SettlementSummaryStep {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager platformTransactionManager;

    private final SettlementService settlementService;
    private final SettlementSummaryRepository settlementSummaryRepository;
    private SummaryType summaryType;

    public void summaryType(SummaryType summaryType) {
        this.summaryType = summaryType;
    }

    @Bean
    public Step summaryStep() {
        log.info("second step");
        return new StepBuilder("secondStep", jobRepository)
                .tasklet(summaryTasklet() , platformTransactionManager)
                .build();
    }

    @Bean
    public Tasklet summaryTasklet() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                log.info("Executing settlement tasklet...");

                // JobParameters에서 요약 유형 가져오기
                String type = chunkContext.getStepContext().getStepExecution().getJobParameters().getString("type");
                ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();

                // 마지막 처리된 인덱스 가져오기
                int lastProcessedIndex = executionContext.containsKey("lastProcessedIndex") ?
                        executionContext.getInt("lastProcessedIndex") : 0;

                // 정산 데이터 가져오기
                List<SettlementSummaryDto> settlementSummaryDtos = settlementService.getSettlementSummary(SummaryType.of(type));
                if (lastProcessedIndex >= settlementSummaryDtos.size()) {
                    // 더 이상 처리할 데이터가 없는 경우 종료
                    return RepeatStatus.FINISHED;
                }

                // 남은 데이터 처리
                for (int i = lastProcessedIndex; i < settlementSummaryDtos.size(); i++) {
                    SettlementSummaryDto ssd = settlementSummaryDtos.get(i);
                    SettlementSummary settlementSummary = new SettlementSummary(
                            ssd.getSummaryDate(), ssd.getType(), ssd.getTotalAmount(),
                            ssd.getTotalFee(), ssd.getTotalTransactions(), ssd.getUserId(), ssd.getStoreId()
                    );
                    settlementSummaryRepository.save(settlementSummary);

                    // 마지막 처리된 인덱스 업데이트 및 저장
                    executionContext.putInt("lastProcessedIndex", i + 1);
                }

                return RepeatStatus.FINISHED; // 작업 완료 상태 반환
            }
        };
    }
}

SettlementScheduler.java 작성

더보기
@Slf4j
@Component
@EnableScheduling
public class SettlementScheduler {
    private final JobLauncher jobLauncher;
    private final Job job1;
    private final Job job2;
    private final Job job3;

    public SettlementScheduler(JobLauncher jobLauncher ,
                               @Qualifier("settlementJob") Job job1,
                               @Qualifier("settlementSummaryWeek") Job job2,
                               @Qualifier("settlementSummaryMonth") Job job3
    ) {
        this.jobLauncher = jobLauncher;
        this.job1 = job1;
        this.job2 = job2;
        this.job3 = job3;
    }

    @Scheduled(cron = "0 0 23 * * *")
    public void runBatchJob1() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("timeStamp", System.currentTimeMillis())
                .addString("type" , SummaryType.DAY.name())
                .toJobParameters();

        jobLauncher.run(job1, jobParameters);
    }

    @Scheduled(cron = "0 0 23 * * SUN")
    public void runBatchJob2() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("timeStamp", System.currentTimeMillis())
                .addString("type" , SummaryType.WEEK.name())
                .toJobParameters();

        jobLauncher.run(job2, jobParameters);
    }

    @Scheduled(cron = "0 0 23 L-1 * *")
    public void runBatchJob3() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("timeStamp", System.currentTimeMillis())
                .addString("type" , SummaryType.MONTH.name())
                .toJobParameters();

        jobLauncher.run(job3, jobParameters);
    }
}

스프링 배치 동작해서 DB에 저장된 화면