Jakarta Batch en Quarkus: Procesamiento por Lotes
Introducción
Jakarta Batch es una especificación para procesar grandes volúmenes de datos en lotes. En Quarkus, Jakarta Batch está disponible a través de quarkus-jberet, que implementa la especificación usando JBeret.
¿Qué es Jakarta Batch?
Jakarta Batch permite procesar grandes cantidades de datos de forma eficiente dividiendo el trabajo en chunks (fragmentos) que se procesan de forma independiente. Es ideal para:
Importación masiva de datos
Procesamiento de archivos grandes
Generación de reportes
Transformación de datos
Conceptos Clave
Job
Un Job es una unidad de trabajo que se ejecuta de principio a fin. Se define en un archivo XML o usando la API programática.
Step
Un Step es una fase del Job. Un Job puede tener múltiples Steps que se ejecutan secuencialmente.
Chunk
Un Chunk procesa un número específico de items antes de hacer commit. Esto mejora el rendimiento y permite recuperación ante fallos.
Componentes de Batch
ItemReader
Lee datos del origen (archivo, base de datos, etc.):
@Named("HeroItemReader")
@Dependent
public class HeroItemReader implements ItemReader {
@PersistenceContext
EntityManager entityManager;
private List<Hero> heroes;
private int index = 0;
@Override
public void open(Serializable checkpoint) throws Exception {
heroes = entityManager.createQuery(
"SELECT h FROM Hero h ORDER BY h.id",
Hero.class
).getResultList();
if (checkpoint != null) {
index = (Integer) checkpoint;
}
}
@Override
public Object readItem() throws Exception {
if (index >= heroes.size()) {
return null; // Fin de los datos
}
Hero hero = heroes.get(index);
index++;
return hero;
}
@Override
public Serializable checkpointInfo() throws Exception {
return index;
}
}
ItemProcessor
Procesa cada item leído:
@Named("HeroItemProcessor")
@Dependent
public class HeroItemProcessor implements ItemProcessor {
@Override
public Object processItem(Object item) throws Exception {
Hero hero = (Hero) item;
// Transformar o validar el item
if (hero.getPowerLevel() < 50) {
return null; // Filtrar items
}
// Calcular estadísticas
hero.setDescription("Processed: " + hero.getName());
return hero;
}
}
ItemWriter
Escribe los items procesados:
@Named("HeroItemWriter")
@Dependent
public class HeroItemWriter implements ItemWriter {
@PersistenceContext
EntityManager entityManager;
@Override
public void writeItems(List<Object> items) throws Exception {
for (Object item : items) {
Hero hero = (Hero) item;
entityManager.persist(hero);
}
}
}
Batchlet
Para tareas simples que no necesitan chunking:
@Named("SimpleBatchlet")
@Dependent
public class SimpleBatchlet implements Batchlet {
@Override
public String process() throws Exception {
// Tarea simple que se ejecuta una vez
logger.info("Processing batch job");
return "COMPLETED";
}
@Override
public void stop() throws Exception {
// Detener el batchlet si es necesario
}
}
Definir un Job
Usando XML (JSL - Job Specification Language)
<job id="import-heroes" xmlns="https://jakarta.ee/xml/ns/jakartaee">
<step id="read-process-write">
<chunk item-count="10">
<reader ref="HeroItemReader"/>
<processor ref="HeroItemProcessor"/>
<writer ref="HeroItemWriter"/>
</chunk>
</step>
</job>
Usando API Programática
@ApplicationScoped
public class ImportHeroesJob {
@Inject
JobOperator jobOperator;
public long startJob() {
Properties jobParameters = new Properties();
jobParameters.setProperty("inputFile", "heroes.csv");
return jobOperator.start("import-heroes", jobParameters);
}
}
Ejecutar y Monitorear Jobs
Iniciar un Job
@Path("/api/batch")
public class BatchResource {
@Inject
JobOperator jobOperator;
@POST
@Path("/jobs/{jobName}/start")
public Response startJob(@PathParam("jobName") String jobName) {
long executionId = jobOperator.start(jobName, new Properties());
return Response.ok(Map.of("executionId", executionId)).build();
}
}
Obtener Estado del Job
@GET
@Path("/jobs/{executionId}")
public Response getJobStatus(@PathParam("executionId") long executionId) {
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
BatchStatus status = jobExecution.getBatchStatus();
return Response.ok(Map.of(
"executionId", executionId,
"status", status.toString(),
"startTime", jobExecution.getStartTime(),
"endTime", jobExecution.getEndTime()
)).build();
}
Detener un Job
@POST
@Path("/jobs/{executionId}/stop")
public Response stopJob(@PathParam("executionId") long executionId) {
jobOperator.stop(executionId);
return Response.ok(Map.of("message", "Job stopped")).build();
}
Checkpoint y Recovery
Los checkpoints permiten reanudar un job después de un fallo:
@Override
public void open(Serializable checkpoint) throws Exception {
if (checkpoint != null) {
// Reanudar desde el checkpoint
index = (Integer) checkpoint;
} else {
// Empezar desde el principio
index = 0;
}
}
@Override
public Serializable checkpointInfo() throws Exception {
// Guardar el estado actual
return index;
}
Ejemplo Completo: Importar Héroes
// 1. Reader: Leer desde CSV o base de datos
@Named("HeroItemReader")
public class HeroItemReader implements ItemReader {
// Lee héroes uno por uno
}
// 2. Processor: Validar y transformar
@Named("HeroItemProcessor")
public class HeroItemProcessor implements ItemProcessor {
// Valida y transforma cada héroe
}
// 3. Writer: Escribir a la base de datos
@Named("HeroItemWriter")
public class HeroItemWriter implements ItemWriter {
// Escribe héroes procesados en chunks
}
// 4. Job Definition (XML)
<job id="import-heroes">
<step id="process-heroes">
<chunk item-count="10">
<reader ref="HeroItemReader"/>
<processor ref="HeroItemProcessor"/>
<writer ref="HeroItemWriter"/>
</chunk>
</step>
</job>
Ventajas de Batch Processing
Eficiencia: Procesa grandes volúmenes de datos
Recuperación: Puede reanudar después de fallos
Escalabilidad: Puede procesar en paralelo
Monitoreo: Estado y progreso del job
Configuración en Quarkus
Dependencias
<dependency>
<groupId>io.quarkiverse.jberet</groupId>
<artifactId>quarkus-jberet</artifactId>
<version>2.6.0</version>
</dependency>
application.properties
# Configuración de Batch
quarkus.batch.enabled=true
Ejemplo Completo
Nuestro demo muestra:
ItemReader para leer héroes de la base de datos
ItemProcessor para procesar y validar
ItemWriter para escribir resultados
Batchlet para tareas simples
Job definitions en XML
Control de jobs vía REST API
Casos de Uso Comunes
Importación Masiva: Importar datos desde archivos CSV/XML
Generación de Reportes: Procesar datos y generar reportes
Transformación de Datos: Convertir entre formatos
Limpieza de Datos: Validar y limpiar datos
Sincronización: Sincronizar datos entre sistemas
Conclusión
Jakarta Batch en Quarkus proporciona una forma poderosa de procesar grandes volúmenes de datos. La arquitectura basada en chunks permite procesamiento eficiente y recuperación ante fallos, mientras que la API estándar hace que sea fácil de usar.




