Fan-Out / Fan-In en Go: Paralelización y Agregación de Resultados
En los posts anteriores exploramos las goroutines, channels, select, pipelines y worker pools. Hoy vamos a profundizar en uno de los patrones más poderosos para paralelizar procesamiento: Fan-Out / Fan-In. Si vienes de Java, este patrón es similar a dividir trabajo entre múltiples threads y luego combinar resultados, pero mucho más elegante en Go.
¿Qué es Fan-Out / Fan-In?
Fan-Out y Fan-In son dos patrones complementarios que trabajan juntos:
Fan-Out: Distribuir trabajo desde un channel a múltiples workers (1 → N)
Fan-In: Combinar resultados de múltiples channels en uno (N → 1)
Visualización del Patrón
Fan-Out:
Input → [Worker 1]
→ [Worker 2]
→ [Worker 3]
→ [Worker N]
Fan-In:
[Worker 1] ┐
[Worker 2] ├→ Output
[Worker 3] │
[Worker N] ┘
Comparación: Java vs Go
| Aspecto | Java | Go |
| Distribución | ExecutorService.submit() múltiples veces | Fan-out con channels |
| Combinación | Future.get() en loop | Fan-in con select/WaitGroup |
| Sincronización | Compleja (CompletableFuture, etc.) | Simple (channels) |
| Cancelación | Compleja | Simple con context |
| Expresividad | Verbosa | Simple y clara |
¿Por Qué Usar Fan-Out / Fan-In?
Problema sin paralelización:
// ❌ Procesamiento secuencial
for item := range input {
result := processExpensive(item) // Tarda mucho
output <- result
}
Problemas:
❌ Subutiliza recursos (solo usa 1 CPU core)
❌ Lento para trabajos costosos
❌ No escala con más CPUs
Solución con Fan-Out / Fan-In:
// ✅ Procesamiento paralelo
// Fan-out: Distribuir a múltiples workers
// Fan-in: Combinar resultados
Ventajas:
✅ Utiliza múltiples CPUs
✅ Mejora el throughput significativamente
✅ Escala con más workers
✅ Código claro y mantenible
Fan-Out: Distribuir Trabajo
Fan-Out distribuye trabajo desde un channel a múltiples workers. Cada worker procesa elementos del mismo channel.
Implementación Básica de Fan-Out
func fanOut(input <-chan int, numWorkers int) []<-chan int {
outputs := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
output := make(chan int)
outputs[i] = output
go func(workerID int) {
defer close(output)
for item := range input {
// Procesar trabajo
result := processItem(item)
fmt.Printf("Worker %d processed %d -> %d\n", workerID, item, result)
output <- result
}
}(i)
}
return outputs
}
Características:
✅ Cada worker lee del mismo channel
input✅ Múltiples workers procesan en paralelo
✅ Cada worker tiene su propio channel de salida
✅ Los workers terminan cuando
inputse cierra
Ejemplo Completo de Fan-Out
func fanOutExample() {
// Input channel
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 10; i++ {
input <- i
}
}()
// Fan-out: Crear múltiples workers
numWorkers := 3
workerOutputs := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
output := make(chan int)
workerOutputs[i] = output
go func(workerID int) {
defer close(output)
for n := range input {
// Simular trabajo costoso
time.Sleep(100 * time.Millisecond)
result := n * n
fmt.Printf("Worker %d processed %d -> %d\n", workerID, n, result)
output <- result
}
}(i)
}
// Los outputs están listos para fan-in
// ...
}
Comportamiento:
Los workers compiten por elementos del channel
inputEl primer worker disponible toma el siguiente elemento
Esto distribuye el trabajo de forma balanceada
Fan-In: Combinar Resultados
Fan-In combina resultados de múltiples channels en uno solo. Hay dos formas principales de implementarlo:
Método 1: Fan-In con WaitGroup
Ideal cuando tienes un número dinámico de channels:
func fanInWaitGroup(inputs []<-chan int) <-chan int {
output := make(chan int)
var wg sync.WaitGroup
// Para cada input channel, lanzar una goroutine que lee y envía a output
for _, input := range inputs {
wg.Add(1)
go func(ch <-chan int) {
defer wg.Done()
for item := range ch {
output <- item
}
}(input)
}
// Cerrar output cuando todos los inputs terminen
go func() {
wg.Wait()
close(output)
}()
return output
}
Ventajas:
✅ Funciona con cualquier número de channels
✅ Simple y claro
✅ Maneja cierre automáticamente
Uso completo:
func fanOutFanInWithWaitGroup() {
// Input
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 10; i++ {
input <- i
}
}()
// Fan-out: Múltiples workers procesan
numWorkers := 3
workerOutputs := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
output := make(chan int)
workerOutputs[i] = output
go func(workerID int) {
defer close(output)
for n := range input {
time.Sleep(100 * time.Millisecond)
result := n * n
fmt.Printf("Worker %d processed %d -> %d\n", workerID, n, result)
output <- result
}
}(i)
}
// Fan-in: Combinar resultados usando WaitGroup
results := fanInWaitGroup(workerOutputs)
// Recibir resultados combinados
for result := range results {
fmt.Printf("Final result: %d\n", result)
}
}
Método 2: Fan-In con Select
Ideal cuando tienes un número fijo y pequeño de channels:
func fanInSelect(ch1, ch2 <-chan int) <-chan int {
output := make(chan int)
go func() {
defer close(output)
ch1Active := ch1
ch2Active := ch2
for ch1Active != nil || ch2Active != nil {
select {
case val, ok := <-ch1Active:
if !ok {
ch1Active = nil // Marcar como cerrado
} else {
output <- val
}
case val, ok := <-ch2Active:
if !ok {
ch2Active = nil // Marcar como cerrado
} else {
output <- val
}
}
}
}()
return output
}
Ventajas:
✅ Más control sobre qué channel leer primero
✅ Puede priorizar channels específicos
✅ Eficiente para número pequeño de channels
Uso:
func fanOutFanInWithSelect() {
// Channel de entrada
input := make(chan int)
// Fan-out: múltiples workers procesan
worker1 := make(chan int)
worker2 := make(chan int)
go func() {
defer close(worker1)
for val := range input {
worker1 <- val * 2
}
}()
go func() {
defer close(worker2)
for val := range input {
worker2 <- val * 3
}
}()
// Fan-in: combinar resultados usando select
output := fanInSelect(worker1, worker2)
// Enviar datos
go func() {
defer close(input)
for i := 1; i <= 5; i++ {
input <- i
}
}()
// Recibir resultados combinados
for result := range output {
fmt.Printf("Result: %d\n", result)
}
}
Comparación: WaitGroup vs Select
| Aspecto | WaitGroup | Select |
| Número de channels | Dinámico (cualquier número) | Fijo (pequeño número) |
| Priorización | No | Sí (orden de cases) |
| Complejidad | Baja | Media |
| Uso de memoria | Más goroutines | Menos goroutines |
| Cuándo usar | Número variable de workers | 2-5 channels fijos |
Ejemplo Práctico: Procesar Archivos en Paralelo
Un caso de uso real: procesar múltiples archivos en paralelo y combinar resultados:
type FileResult struct {
Filename string
Lines int
Error error
}
func processFilesInParallel(filenames []string, numWorkers int) <-chan FileResult {
// Fan-out: Distribuir archivos a workers
files := make(chan string)
go func() {
defer close(files)
for _, filename := range filenames {
files <- filename
}
}()
// Crear workers
workerOutputs := make([]<-chan FileResult, numWorkers)
for i := 0; i < numWorkers; i++ {
output := make(chan FileResult)
workerOutputs[i] = output
go func(workerID int) {
defer close(output)
for filename := range files {
lines, err := countLines(filename)
output <- FileResult{
Filename: filename,
Lines: lines,
Error: err,
}
}
}(i)
}
// Fan-in: Combinar resultados
results := make(chan FileResult)
var wg sync.WaitGroup
for _, workerOutput := range workerOutputs {
wg.Add(1)
go func(ch <-chan FileResult) {
defer wg.Done()
for result := range ch {
results <- result
}
}(workerOutput)
}
go func() {
wg.Wait()
close(results)
}()
return results
}
Fan-Out / Fan-In con Context (Cancelación)
Agregar cancelación hace el patrón más robusto:
func fanOutFanInWithContext(ctx context.Context, input <-chan int, numWorkers int) <-chan int {
// Fan-out con cancelación
workerOutputs := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
output := make(chan int)
workerOutputs[i] = output
go func(workerID int) {
defer close(output)
for {
select {
case <-ctx.Done():
return
case item, ok := <-input:
if !ok {
return
}
// Procesar con cancelación
result := processItem(ctx, item)
select {
case output <- result:
case <-ctx.Done():
return
}
}
}
}(i)
}
// Fan-in con cancelación
results := make(chan int)
var wg sync.WaitGroup
for _, workerOutput := range workerOutputs {
wg.Add(1)
go func(ch <-chan int) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case result, ok := <-ch:
if !ok {
return
}
select {
case results <- result:
case <-ctx.Done():
return
}
}
}
}(workerOutput)
}
go func() {
wg.Wait()
close(results)
}()
return results
}
Fan-Out con Rate Limiting
Limitar el rate de distribución puede ser útil:
func fanOutWithRateLimit(input <-chan int, numWorkers int, rate time.Duration) []<-chan int {
outputs := make([]<-chan int, numWorkers)
// Rate limiter
limiter := make(chan struct{}, numWorkers)
go func() {
ticker := time.NewTicker(rate)
defer ticker.Stop()
for range ticker.C {
select {
case limiter <- struct{}{}:
default:
}
}
}()
for i := 0; i < numWorkers; i++ {
output := make(chan int)
outputs[i] = output
go func(workerID int) {
defer close(output)
for item := range input {
<-limiter // Esperar token
result := processItem(item)
output <- result
}
}(i)
}
return outputs
}
Fan-In con Prioridades
Cuando necesitas priorizar ciertos channels:
func fanInWithPriority(highPriority, lowPriority <-chan int) <-chan int {
output := make(chan int)
go func() {
defer close(output)
highActive := highPriority
lowActive := lowPriority
for highActive != nil || lowActive != nil {
select {
// Priorizar highPriority
case val, ok := <-highActive:
if !ok {
highActive = nil
} else {
output <- val
}
case val, ok := <-lowActive:
if !ok {
lowActive = nil
} else {
output <- val
}
}
}
}()
return output
}
Ejemplo Avanzado: Pipeline con Fan-Out/Fan-In
Combinando pipelines con fan-out/fan-in:
func advancedPipeline() {
// Stage 1: Generar datos
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 100; i++ {
input <- i
}
}()
// Stage 2: Fan-out a múltiples transformadores
numTransformers := 5
transformedOutputs := make([]<-chan int, numTransformers)
for i := 0; i < numTransformers; i++ {
output := make(chan int)
transformedOutputs[i] = output
go func() {
defer close(output)
for n := range input {
output <- transform(n)
}
}()
}
// Stage 3: Fan-in de transformadores
transformed := fanInWaitGroup(transformedOutputs)
// Stage 4: Filtrar
filtered := make(chan int)
go func() {
defer close(filtered)
for n := range transformed {
if n > 50 {
filtered <- n
}
}
}()
// Stage 5: Consumir
for result := range filtered {
fmt.Printf("Result: %d\n", result)
}
}
Mejores Prácticas
1. Elegir el Número Correcto de Workers
// ✅ BIEN: Basado en CPU cores
numWorkers := runtime.NumCPU()
// ✅ BIEN: Para I/O-bound (más workers)
numWorkers := runtime.NumCPU() * 2
// ✅ BIEN: Configurable
numWorkers := getConfig().WorkerCount
// ❌ MAL: Número arbitrario
numWorkers := 1000 // ¿Por qué 1000?
2. Usar WaitGroup para Fan-In Dinámico
// ✅ BIEN: WaitGroup para número variable de channels
func fanInWaitGroup(inputs []<-chan int) <-chan int {
output := make(chan int)
var wg sync.WaitGroup
for _, input := range inputs {
wg.Add(1)
go func(ch <-chan int) {
defer wg.Done()
for item := range ch {
output <- item
}
}(input)
}
go func() {
wg.Wait()
close(output)
}()
return output
}
3. Usar Select para Fan-In de Pocos Channels
// ✅ BIEN: Select para 2-5 channels fijos
func fanInSelect(ch1, ch2 <-chan int) <-chan int {
// ...
}
// ❌ MAL: Select con muchos cases (difícil de mantener)
func fanInSelectMany(ch1, ch2, ch3, ch4, ch5, ch6, ch7, ch8 <-chan int) <-chan int {
// Demasiado complejo
}
4. Siempre Cerrar Channels Correctamente
// ✅ BIEN: Cerrar en defer
go func() {
defer close(output)
for item := range input {
output <- process(item)
}
}()
// ❌ MAL: Olvidar cerrar causa deadlock
go func() {
for item := range input {
output <- process(item)
}
// Olvidamos close(output)
}()
5. Manejar Errores en Fan-Out
// ✅ BIEN: Incluir errores en resultados
type Result struct {
Value int
Error error
}
func fanOutWithErrors(input <-chan int) []<-chan Result {
outputs := make([]<-chan Result, numWorkers)
for i := 0; i < numWorkers; i++ {
output := make(chan Result)
outputs[i] = output
go func() {
defer close(output)
for item := range input {
result, err := processWithError(item)
output <- Result{Value: result, Error: err}
}
}()
}
return outputs
}
6. Usar Buffers Apropiados
// ✅ BIEN: Buffer basado en throughput
output := make(chan int, numWorkers*2)
// ✅ BIEN: Sin buffer para sincronización
output := make(chan int)
// ❌ MAL: Buffer muy grande
output := make(chan int, 1000000) // Desperdicia memoria
Errores Comunes
❌ Error 1: Leak de Goroutines en Fan-In
// ❌ MAL: Goroutines nunca terminan si channels no se cierran
func badFanIn(inputs []<-chan int) <-chan int {
output := make(chan int)
for _, input := range inputs {
go func(ch <-chan int) {
for item := range ch {
output <- item
}
// Goroutine termina pero output nunca se cierra
}(input)
}
return output // output nunca se cierra
}
// ✅ BIEN: Cerrar output cuando todos terminen
func goodFanIn(inputs []<-chan int) <-chan int {
output := make(chan int)
var wg sync.WaitGroup
for _, input := range inputs {
wg.Add(1)
go func(ch <-chan int) {
defer wg.Done()
for item := range ch {
output <- item
}
}(input)
}
go func() {
wg.Wait()
close(output)
}()
return output
}
❌ Error 2: No Verificar Cierre en Select
// ❌ MAL: Puede recibir zero values
select {
case val := <-ch1:
process(val)
case val := <-ch2:
process(val)
}
// ✅ BIEN: Verificar ok y marcar como nil
ch1Active := ch1
ch2Active := ch2
for ch1Active != nil || ch2Active != nil {
select {
case val, ok := <-ch1Active:
if !ok {
ch1Active = nil
} else {
process(val)
}
case val, ok := <-ch2Active:
if !ok {
ch2Active = nil
} else {
process(val)
}
}
}
❌ Error 3: Race Condition en Fan-Out
// ❌ MAL: Todos los workers ven el mismo valor de i
for i := 0; i < numWorkers; i++ {
go func() {
fmt.Printf("Worker %d\n", i) // i puede ser cualquier valor
}()
}
// ✅ BIEN: Pasar i como parámetro
for i := 0; i < numWorkers; i++ {
go func(workerID int) {
fmt.Printf("Worker %d\n", workerID)
}(i)
}
❌ Error 4: Demasiados o Muy Pocos Workers
// ❌ MAL: Demasiados workers (overhead)
numWorkers := 10000 // Para 100 items
// ❌ MAL: Muy pocos workers (subutilización)
numWorkers := 1 // Para 1000 items
// ✅ BIEN: Número apropiado
numWorkers := runtime.NumCPU() // Para CPU-bound
❌ Error 5: No Manejar Backpressure
// ❌ MAL: Puede bloquear si output está lleno
for item := range input {
output <- process(item) // Bloquea si output está lleno
}
// ✅ BIEN: Manejar backpressure con select
for item := range input {
result := process(item)
select {
case output <- result:
case <-ctx.Done():
return
default:
// Output lleno, manejar (loggear, dropear, etc.)
log.Printf("Output full, dropping result")
}
}
Comparación: Java vs Go
Java: ExecutorService y CompletableFuture
// Java
ExecutorService executor = Executors.newFixedThreadPool(10);
List<CompletableFuture<String>> futures = new ArrayList<>();
for (String file : files) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return processFile(file);
}, executor);
futures.add(future);
}
// Combinar resultados
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
List<String> results = allFutures.thenApply(v -> {
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}).join();
Problemas:
Sintaxis verbosa
Manejo de errores complejo
No hay cancelación fácil
Overhead de threads del OS
Go: Fan-Out / Fan-In
// Go
input := make(chan string)
go func() {
defer close(input)
for _, file := range files {
input <- file
}
}()
// Fan-out
numWorkers := 10
workerOutputs := make([]<-chan string, numWorkers)
for i := 0; i < numWorkers; i++ {
output := make(chan string)
workerOutputs[i] = output
go func() {
defer close(output)
for file := range input {
output <- processFile(file)
}
}()
}
// Fan-in
results := fanInWaitGroup(workerOutputs)
for result := range results {
fmt.Println(result)
}
Ventajas:
Sintaxis simple y clara
Manejo de errores natural
Cancelación con context
Overhead mínimo (goroutines)
Conclusiones
Fan-Out / Fan-In es un patrón esencial en Go:
✅ Paralelización: Distribuye trabajo eficientemente a múltiples workers
✅ Agregación: Combina resultados de forma elegante
✅ Escalabilidad: Mejora el throughput significativamente
✅ Flexibilidad: Dos métodos (WaitGroup y Select) para diferentes necesidades
✅ Cancelación: Fácil agregar cancelación con context
✅ Simplicidad: Código claro y mantenible
Si vienes de Java, verás que Fan-Out / Fan-In en Go es mucho más simple y expresivo que usar ExecutorService y CompletableFuture. La combinación de goroutines y channels hace que paralelizar trabajo sea natural y seguro.
Próximos Pasos
En el siguiente post exploraremos otros patrones avanzados de concurrencia en Go, incluyendo circuit breakers, retry/backoff, y otros patrones que complementan los que hemos aprendido.
¿Has usado Fan-Out / Fan-In en tus proyectos de Go? ¿Prefieres WaitGroup o Select? Comparte tus experiencias y casos de uso en los comentarios. Y si quieres ver el código completo de estos ejemplos, puedes encontrarlo en mi repositorio go-mastery-lab.




