Skip to main content

Command Palette

Search for a command to run...

Jakarta Batch en Quarkus: Procesamiento por Lotes

Updated
5 min read

Introducción

Jakarta Batch es una especificación para procesar grandes volúmenes de datos en lotes. En Quarkus, Jakarta Batch está disponible a través de quarkus-jberet, que implementa la especificación usando JBeret.

¿Qué es Jakarta Batch?

Jakarta Batch permite procesar grandes cantidades de datos de forma eficiente dividiendo el trabajo en chunks (fragmentos) que se procesan de forma independiente. Es ideal para:

  • Importación masiva de datos

  • Procesamiento de archivos grandes

  • Generación de reportes

  • Transformación de datos

Conceptos Clave

Job

Un Job es una unidad de trabajo que se ejecuta de principio a fin. Se define en un archivo XML o usando la API programática.

Step

Un Step es una fase del Job. Un Job puede tener múltiples Steps que se ejecutan secuencialmente.

Chunk

Un Chunk procesa un número específico de items antes de hacer commit. Esto mejora el rendimiento y permite recuperación ante fallos.

Componentes de Batch

ItemReader

Lee datos del origen (archivo, base de datos, etc.):

@Named("HeroItemReader")
@Dependent
public class HeroItemReader implements ItemReader {

    @PersistenceContext
    EntityManager entityManager;

    private List<Hero> heroes;
    private int index = 0;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        heroes = entityManager.createQuery(
            "SELECT h FROM Hero h ORDER BY h.id", 
            Hero.class
        ).getResultList();

        if (checkpoint != null) {
            index = (Integer) checkpoint;
        }
    }

    @Override
    public Object readItem() throws Exception {
        if (index >= heroes.size()) {
            return null; // Fin de los datos
        }

        Hero hero = heroes.get(index);
        index++;
        return hero;
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return index;
    }
}

ItemProcessor

Procesa cada item leído:

@Named("HeroItemProcessor")
@Dependent
public class HeroItemProcessor implements ItemProcessor {

    @Override
    public Object processItem(Object item) throws Exception {
        Hero hero = (Hero) item;

        // Transformar o validar el item
        if (hero.getPowerLevel() < 50) {
            return null; // Filtrar items
        }

        // Calcular estadísticas
        hero.setDescription("Processed: " + hero.getName());

        return hero;
    }
}

ItemWriter

Escribe los items procesados:

@Named("HeroItemWriter")
@Dependent
public class HeroItemWriter implements ItemWriter {

    @PersistenceContext
    EntityManager entityManager;

    @Override
    public void writeItems(List<Object> items) throws Exception {
        for (Object item : items) {
            Hero hero = (Hero) item;
            entityManager.persist(hero);
        }
    }
}

Batchlet

Para tareas simples que no necesitan chunking:

@Named("SimpleBatchlet")
@Dependent
public class SimpleBatchlet implements Batchlet {

    @Override
    public String process() throws Exception {
        // Tarea simple que se ejecuta una vez
        logger.info("Processing batch job");
        return "COMPLETED";
    }

    @Override
    public void stop() throws Exception {
        // Detener el batchlet si es necesario
    }
}

Definir un Job

Usando XML (JSL - Job Specification Language)

<job id="import-heroes" xmlns="https://jakarta.ee/xml/ns/jakartaee">
    <step id="read-process-write">
        <chunk item-count="10">
            <reader ref="HeroItemReader"/>
            <processor ref="HeroItemProcessor"/>
            <writer ref="HeroItemWriter"/>
        </chunk>
    </step>
</job>

Usando API Programática

@ApplicationScoped
public class ImportHeroesJob {

    @Inject
    JobOperator jobOperator;

    public long startJob() {
        Properties jobParameters = new Properties();
        jobParameters.setProperty("inputFile", "heroes.csv");

        return jobOperator.start("import-heroes", jobParameters);
    }
}

Ejecutar y Monitorear Jobs

Iniciar un Job

@Path("/api/batch")
public class BatchResource {

    @Inject
    JobOperator jobOperator;

    @POST
    @Path("/jobs/{jobName}/start")
    public Response startJob(@PathParam("jobName") String jobName) {
        long executionId = jobOperator.start(jobName, new Properties());
        return Response.ok(Map.of("executionId", executionId)).build();
    }
}

Obtener Estado del Job

@GET
@Path("/jobs/{executionId}")
public Response getJobStatus(@PathParam("executionId") long executionId) {
    JobExecution jobExecution = jobOperator.getJobExecution(executionId);
    BatchStatus status = jobExecution.getBatchStatus();

    return Response.ok(Map.of(
        "executionId", executionId,
        "status", status.toString(),
        "startTime", jobExecution.getStartTime(),
        "endTime", jobExecution.getEndTime()
    )).build();
}

Detener un Job

@POST
@Path("/jobs/{executionId}/stop")
public Response stopJob(@PathParam("executionId") long executionId) {
    jobOperator.stop(executionId);
    return Response.ok(Map.of("message", "Job stopped")).build();
}

Checkpoint y Recovery

Los checkpoints permiten reanudar un job después de un fallo:

@Override
public void open(Serializable checkpoint) throws Exception {
    if (checkpoint != null) {
        // Reanudar desde el checkpoint
        index = (Integer) checkpoint;
    } else {
        // Empezar desde el principio
        index = 0;
    }
}

@Override
public Serializable checkpointInfo() throws Exception {
    // Guardar el estado actual
    return index;
}

Ejemplo Completo: Importar Héroes

// 1. Reader: Leer desde CSV o base de datos
@Named("HeroItemReader")
public class HeroItemReader implements ItemReader {
    // Lee héroes uno por uno
}

// 2. Processor: Validar y transformar
@Named("HeroItemProcessor")
public class HeroItemProcessor implements ItemProcessor {
    // Valida y transforma cada héroe
}

// 3. Writer: Escribir a la base de datos
@Named("HeroItemWriter")
public class HeroItemWriter implements ItemWriter {
    // Escribe héroes procesados en chunks
}

// 4. Job Definition (XML)
<job id="import-heroes">
    <step id="process-heroes">
        <chunk item-count="10">
            <reader ref="HeroItemReader"/>
            <processor ref="HeroItemProcessor"/>
            <writer ref="HeroItemWriter"/>
        </chunk>
    </step>
</job>

Ventajas de Batch Processing

  1. Eficiencia: Procesa grandes volúmenes de datos

  2. Recuperación: Puede reanudar después de fallos

  3. Escalabilidad: Puede procesar en paralelo

  4. Monitoreo: Estado y progreso del job

Configuración en Quarkus

Dependencias

<dependency>
    <groupId>io.quarkiverse.jberet</groupId>
    <artifactId>quarkus-jberet</artifactId>
    <version>2.6.0</version>
</dependency>

application.properties

# Configuración de Batch
quarkus.batch.enabled=true

Ejemplo Completo

Nuestro demo muestra:

  • ItemReader para leer héroes de la base de datos

  • ItemProcessor para procesar y validar

  • ItemWriter para escribir resultados

  • Batchlet para tareas simples

  • Job definitions en XML

  • Control de jobs vía REST API

Casos de Uso Comunes

  1. Importación Masiva: Importar datos desde archivos CSV/XML

  2. Generación de Reportes: Procesar datos y generar reportes

  3. Transformación de Datos: Convertir entre formatos

  4. Limpieza de Datos: Validar y limpiar datos

  5. Sincronización: Sincronizar datos entre sistemas

Conclusión

Jakarta Batch en Quarkus proporciona una forma poderosa de procesar grandes volúmenes de datos. La arquitectura basada en chunks permite procesamiento eficiente y recuperación ante fallos, mientras que la API estándar hace que sea fácil de usar.

Recursos

More from this blog

JoeDayz

53 posts

Community Guy | Java Champion | AWS Architect | Software Architect