Skip to main content

Command Palette

Search for a command to run...

Fan-Out / Fan-In en Go: Paralelización y Agregación de Resultados

Updated
13 min read

En los posts anteriores exploramos las goroutines, channels, select, pipelines y worker pools. Hoy vamos a profundizar en uno de los patrones más poderosos para paralelizar procesamiento: Fan-Out / Fan-In. Si vienes de Java, este patrón es similar a dividir trabajo entre múltiples threads y luego combinar resultados, pero mucho más elegante en Go.

¿Qué es Fan-Out / Fan-In?

Fan-Out y Fan-In son dos patrones complementarios que trabajan juntos:

  • Fan-Out: Distribuir trabajo desde un channel a múltiples workers (1 → N)

  • Fan-In: Combinar resultados de múltiples channels en uno (N → 1)

Visualización del Patrón

Fan-Out:
Input → [Worker 1]
      → [Worker 2]
      → [Worker 3]
      → [Worker N]

Fan-In:
[Worker 1] ┐
[Worker 2] ├→ Output
[Worker 3] │
[Worker N] ┘

Comparación: Java vs Go

AspectoJavaGo
DistribuciónExecutorService.submit() múltiples vecesFan-out con channels
CombinaciónFuture.get() en loopFan-in con select/WaitGroup
SincronizaciónCompleja (CompletableFuture, etc.)Simple (channels)
CancelaciónComplejaSimple con context
ExpresividadVerbosaSimple y clara

¿Por Qué Usar Fan-Out / Fan-In?

Problema sin paralelización:

// ❌ Procesamiento secuencial
for item := range input {
    result := processExpensive(item) // Tarda mucho
    output <- result
}

Problemas:

  • ❌ Subutiliza recursos (solo usa 1 CPU core)

  • ❌ Lento para trabajos costosos

  • ❌ No escala con más CPUs

Solución con Fan-Out / Fan-In:

// ✅ Procesamiento paralelo
// Fan-out: Distribuir a múltiples workers
// Fan-in: Combinar resultados

Ventajas:

  • ✅ Utiliza múltiples CPUs

  • ✅ Mejora el throughput significativamente

  • ✅ Escala con más workers

  • ✅ Código claro y mantenible

Fan-Out: Distribuir Trabajo

Fan-Out distribuye trabajo desde un channel a múltiples workers. Cada worker procesa elementos del mismo channel.

Implementación Básica de Fan-Out

func fanOut(input <-chan int, numWorkers int) []<-chan int {
    outputs := make([]<-chan int, numWorkers)

    for i := 0; i < numWorkers; i++ {
        output := make(chan int)
        outputs[i] = output

        go func(workerID int) {
            defer close(output)
            for item := range input {
                // Procesar trabajo
                result := processItem(item)
                fmt.Printf("Worker %d processed %d -> %d\n", workerID, item, result)
                output <- result
            }
        }(i)
    }

    return outputs
}

Características:

  • ✅ Cada worker lee del mismo channel input

  • ✅ Múltiples workers procesan en paralelo

  • ✅ Cada worker tiene su propio channel de salida

  • ✅ Los workers terminan cuando input se cierra

Ejemplo Completo de Fan-Out

func fanOutExample() {
    // Input channel
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 10; i++ {
            input <- i
        }
    }()

    // Fan-out: Crear múltiples workers
    numWorkers := 3
    workerOutputs := make([]<-chan int, numWorkers)

    for i := 0; i < numWorkers; i++ {
        output := make(chan int)
        workerOutputs[i] = output
        go func(workerID int) {
            defer close(output)
            for n := range input {
                // Simular trabajo costoso
                time.Sleep(100 * time.Millisecond)
                result := n * n
                fmt.Printf("Worker %d processed %d -> %d\n", workerID, n, result)
                output <- result
            }
        }(i)
    }

    // Los outputs están listos para fan-in
    // ...
}

Comportamiento:

  • Los workers compiten por elementos del channel input

  • El primer worker disponible toma el siguiente elemento

  • Esto distribuye el trabajo de forma balanceada

Fan-In: Combinar Resultados

Fan-In combina resultados de múltiples channels en uno solo. Hay dos formas principales de implementarlo:

Método 1: Fan-In con WaitGroup

Ideal cuando tienes un número dinámico de channels:

func fanInWaitGroup(inputs []<-chan int) <-chan int {
    output := make(chan int)
    var wg sync.WaitGroup

    // Para cada input channel, lanzar una goroutine que lee y envía a output
    for _, input := range inputs {
        wg.Add(1)
        go func(ch <-chan int) {
            defer wg.Done()
            for item := range ch {
                output <- item
            }
        }(input)
    }

    // Cerrar output cuando todos los inputs terminen
    go func() {
        wg.Wait()
        close(output)
    }()

    return output
}

Ventajas:

  • ✅ Funciona con cualquier número de channels

  • ✅ Simple y claro

  • ✅ Maneja cierre automáticamente

Uso completo:

func fanOutFanInWithWaitGroup() {
    // Input
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 10; i++ {
            input <- i
        }
    }()

    // Fan-out: Múltiples workers procesan
    numWorkers := 3
    workerOutputs := make([]<-chan int, numWorkers)

    for i := 0; i < numWorkers; i++ {
        output := make(chan int)
        workerOutputs[i] = output
        go func(workerID int) {
            defer close(output)
            for n := range input {
                time.Sleep(100 * time.Millisecond)
                result := n * n
                fmt.Printf("Worker %d processed %d -> %d\n", workerID, n, result)
                output <- result
            }
        }(i)
    }

    // Fan-in: Combinar resultados usando WaitGroup
    results := fanInWaitGroup(workerOutputs)

    // Recibir resultados combinados
    for result := range results {
        fmt.Printf("Final result: %d\n", result)
    }
}

Método 2: Fan-In con Select

Ideal cuando tienes un número fijo y pequeño de channels:

func fanInSelect(ch1, ch2 <-chan int) <-chan int {
    output := make(chan int)

    go func() {
        defer close(output)
        ch1Active := ch1
        ch2Active := ch2

        for ch1Active != nil || ch2Active != nil {
            select {
            case val, ok := <-ch1Active:
                if !ok {
                    ch1Active = nil // Marcar como cerrado
                } else {
                    output <- val
                }
            case val, ok := <-ch2Active:
                if !ok {
                    ch2Active = nil // Marcar como cerrado
                } else {
                    output <- val
                }
            }
        }
    }()

    return output
}

Ventajas:

  • ✅ Más control sobre qué channel leer primero

  • ✅ Puede priorizar channels específicos

  • ✅ Eficiente para número pequeño de channels

Uso:

func fanOutFanInWithSelect() {
    // Channel de entrada
    input := make(chan int)

    // Fan-out: múltiples workers procesan
    worker1 := make(chan int)
    worker2 := make(chan int)

    go func() {
        defer close(worker1)
        for val := range input {
            worker1 <- val * 2
        }
    }()

    go func() {
        defer close(worker2)
        for val := range input {
            worker2 <- val * 3
        }
    }()

    // Fan-in: combinar resultados usando select
    output := fanInSelect(worker1, worker2)

    // Enviar datos
    go func() {
        defer close(input)
        for i := 1; i <= 5; i++ {
            input <- i
        }
    }()

    // Recibir resultados combinados
    for result := range output {
        fmt.Printf("Result: %d\n", result)
    }
}

Comparación: WaitGroup vs Select

AspectoWaitGroupSelect
Número de channelsDinámico (cualquier número)Fijo (pequeño número)
PriorizaciónNoSí (orden de cases)
ComplejidadBajaMedia
Uso de memoriaMás goroutinesMenos goroutines
Cuándo usarNúmero variable de workers2-5 channels fijos

Ejemplo Práctico: Procesar Archivos en Paralelo

Un caso de uso real: procesar múltiples archivos en paralelo y combinar resultados:

type FileResult struct {
    Filename string
    Lines    int
    Error    error
}

func processFilesInParallel(filenames []string, numWorkers int) <-chan FileResult {
    // Fan-out: Distribuir archivos a workers
    files := make(chan string)
    go func() {
        defer close(files)
        for _, filename := range filenames {
            files <- filename
        }
    }()

    // Crear workers
    workerOutputs := make([]<-chan FileResult, numWorkers)
    for i := 0; i < numWorkers; i++ {
        output := make(chan FileResult)
        workerOutputs[i] = output

        go func(workerID int) {
            defer close(output)
            for filename := range files {
                lines, err := countLines(filename)
                output <- FileResult{
                    Filename: filename,
                    Lines:    lines,
                    Error:    err,
                }
            }
        }(i)
    }

    // Fan-in: Combinar resultados
    results := make(chan FileResult)
    var wg sync.WaitGroup
    for _, workerOutput := range workerOutputs {
        wg.Add(1)
        go func(ch <-chan FileResult) {
            defer wg.Done()
            for result := range ch {
                results <- result
            }
        }(workerOutput)
    }

    go func() {
        wg.Wait()
        close(results)
    }()

    return results
}

Fan-Out / Fan-In con Context (Cancelación)

Agregar cancelación hace el patrón más robusto:

func fanOutFanInWithContext(ctx context.Context, input <-chan int, numWorkers int) <-chan int {
    // Fan-out con cancelación
    workerOutputs := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        output := make(chan int)
        workerOutputs[i] = output

        go func(workerID int) {
            defer close(output)
            for {
                select {
                case <-ctx.Done():
                    return
                case item, ok := <-input:
                    if !ok {
                        return
                    }
                    // Procesar con cancelación
                    result := processItem(ctx, item)
                    select {
                    case output <- result:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }(i)
    }

    // Fan-in con cancelación
    results := make(chan int)
    var wg sync.WaitGroup
    for _, workerOutput := range workerOutputs {
        wg.Add(1)
        go func(ch <-chan int) {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case result, ok := <-ch:
                    if !ok {
                        return
                    }
                    select {
                    case results <- result:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }(workerOutput)
    }

    go func() {
        wg.Wait()
        close(results)
    }()

    return results
}

Fan-Out con Rate Limiting

Limitar el rate de distribución puede ser útil:

func fanOutWithRateLimit(input <-chan int, numWorkers int, rate time.Duration) []<-chan int {
    outputs := make([]<-chan int, numWorkers)

    // Rate limiter
    limiter := make(chan struct{}, numWorkers)
    go func() {
        ticker := time.NewTicker(rate)
        defer ticker.Stop()
        for range ticker.C {
            select {
            case limiter <- struct{}{}:
            default:
            }
        }
    }()

    for i := 0; i < numWorkers; i++ {
        output := make(chan int)
        outputs[i] = output

        go func(workerID int) {
            defer close(output)
            for item := range input {
                <-limiter // Esperar token
                result := processItem(item)
                output <- result
            }
        }(i)
    }

    return outputs
}

Fan-In con Prioridades

Cuando necesitas priorizar ciertos channels:

func fanInWithPriority(highPriority, lowPriority <-chan int) <-chan int {
    output := make(chan int)

    go func() {
        defer close(output)
        highActive := highPriority
        lowActive := lowPriority

        for highActive != nil || lowActive != nil {
            select {
            // Priorizar highPriority
            case val, ok := <-highActive:
                if !ok {
                    highActive = nil
                } else {
                    output <- val
                }
            case val, ok := <-lowActive:
                if !ok {
                    lowActive = nil
                } else {
                    output <- val
                }
            }
        }
    }()

    return output
}

Ejemplo Avanzado: Pipeline con Fan-Out/Fan-In

Combinando pipelines con fan-out/fan-in:

func advancedPipeline() {
    // Stage 1: Generar datos
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 100; i++ {
            input <- i
        }
    }()

    // Stage 2: Fan-out a múltiples transformadores
    numTransformers := 5
    transformedOutputs := make([]<-chan int, numTransformers)

    for i := 0; i < numTransformers; i++ {
        output := make(chan int)
        transformedOutputs[i] = output
        go func() {
            defer close(output)
            for n := range input {
                output <- transform(n)
            }
        }()
    }

    // Stage 3: Fan-in de transformadores
    transformed := fanInWaitGroup(transformedOutputs)

    // Stage 4: Filtrar
    filtered := make(chan int)
    go func() {
        defer close(filtered)
        for n := range transformed {
            if n > 50 {
                filtered <- n
            }
        }
    }()

    // Stage 5: Consumir
    for result := range filtered {
        fmt.Printf("Result: %d\n", result)
    }
}

Mejores Prácticas

1. Elegir el Número Correcto de Workers

// ✅ BIEN: Basado en CPU cores
numWorkers := runtime.NumCPU()

// ✅ BIEN: Para I/O-bound (más workers)
numWorkers := runtime.NumCPU() * 2

// ✅ BIEN: Configurable
numWorkers := getConfig().WorkerCount

// ❌ MAL: Número arbitrario
numWorkers := 1000 // ¿Por qué 1000?

2. Usar WaitGroup para Fan-In Dinámico

// ✅ BIEN: WaitGroup para número variable de channels
func fanInWaitGroup(inputs []<-chan int) <-chan int {
    output := make(chan int)
    var wg sync.WaitGroup
    for _, input := range inputs {
        wg.Add(1)
        go func(ch <-chan int) {
            defer wg.Done()
            for item := range ch {
                output <- item
            }
        }(input)
    }
    go func() {
        wg.Wait()
        close(output)
    }()
    return output
}

3. Usar Select para Fan-In de Pocos Channels

// ✅ BIEN: Select para 2-5 channels fijos
func fanInSelect(ch1, ch2 <-chan int) <-chan int {
    // ...
}

// ❌ MAL: Select con muchos cases (difícil de mantener)
func fanInSelectMany(ch1, ch2, ch3, ch4, ch5, ch6, ch7, ch8 <-chan int) <-chan int {
    // Demasiado complejo
}

4. Siempre Cerrar Channels Correctamente

// ✅ BIEN: Cerrar en defer
go func() {
    defer close(output)
    for item := range input {
        output <- process(item)
    }
}()

// ❌ MAL: Olvidar cerrar causa deadlock
go func() {
    for item := range input {
        output <- process(item)
    }
    // Olvidamos close(output)
}()

5. Manejar Errores en Fan-Out

// ✅ BIEN: Incluir errores en resultados
type Result struct {
    Value int
    Error error
}

func fanOutWithErrors(input <-chan int) []<-chan Result {
    outputs := make([]<-chan Result, numWorkers)
    for i := 0; i < numWorkers; i++ {
        output := make(chan Result)
        outputs[i] = output
        go func() {
            defer close(output)
            for item := range input {
                result, err := processWithError(item)
                output <- Result{Value: result, Error: err}
            }
        }()
    }
    return outputs
}

6. Usar Buffers Apropiados

// ✅ BIEN: Buffer basado en throughput
output := make(chan int, numWorkers*2)

// ✅ BIEN: Sin buffer para sincronización
output := make(chan int)

// ❌ MAL: Buffer muy grande
output := make(chan int, 1000000) // Desperdicia memoria

Errores Comunes

❌ Error 1: Leak de Goroutines en Fan-In

// ❌ MAL: Goroutines nunca terminan si channels no se cierran
func badFanIn(inputs []<-chan int) <-chan int {
    output := make(chan int)
    for _, input := range inputs {
        go func(ch <-chan int) {
            for item := range ch {
                output <- item
            }
            // Goroutine termina pero output nunca se cierra
        }(input)
    }
    return output // output nunca se cierra
}

// ✅ BIEN: Cerrar output cuando todos terminen
func goodFanIn(inputs []<-chan int) <-chan int {
    output := make(chan int)
    var wg sync.WaitGroup
    for _, input := range inputs {
        wg.Add(1)
        go func(ch <-chan int) {
            defer wg.Done()
            for item := range ch {
                output <- item
            }
        }(input)
    }
    go func() {
        wg.Wait()
        close(output)
    }()
    return output
}

❌ Error 2: No Verificar Cierre en Select

// ❌ MAL: Puede recibir zero values
select {
case val := <-ch1:
    process(val)
case val := <-ch2:
    process(val)
}

// ✅ BIEN: Verificar ok y marcar como nil
ch1Active := ch1
ch2Active := ch2
for ch1Active != nil || ch2Active != nil {
    select {
    case val, ok := <-ch1Active:
        if !ok {
            ch1Active = nil
        } else {
            process(val)
        }
    case val, ok := <-ch2Active:
        if !ok {
            ch2Active = nil
        } else {
            process(val)
        }
    }
}

❌ Error 3: Race Condition en Fan-Out

// ❌ MAL: Todos los workers ven el mismo valor de i
for i := 0; i < numWorkers; i++ {
    go func() {
        fmt.Printf("Worker %d\n", i) // i puede ser cualquier valor
    }()
}

// ✅ BIEN: Pasar i como parámetro
for i := 0; i < numWorkers; i++ {
    go func(workerID int) {
        fmt.Printf("Worker %d\n", workerID)
    }(i)
}

❌ Error 4: Demasiados o Muy Pocos Workers

// ❌ MAL: Demasiados workers (overhead)
numWorkers := 10000 // Para 100 items

// ❌ MAL: Muy pocos workers (subutilización)
numWorkers := 1 // Para 1000 items

// ✅ BIEN: Número apropiado
numWorkers := runtime.NumCPU() // Para CPU-bound

❌ Error 5: No Manejar Backpressure

// ❌ MAL: Puede bloquear si output está lleno
for item := range input {
    output <- process(item) // Bloquea si output está lleno
}

// ✅ BIEN: Manejar backpressure con select
for item := range input {
    result := process(item)
    select {
    case output <- result:
    case <-ctx.Done():
        return
    default:
        // Output lleno, manejar (loggear, dropear, etc.)
        log.Printf("Output full, dropping result")
    }
}

Comparación: Java vs Go

Java: ExecutorService y CompletableFuture

// Java
ExecutorService executor = Executors.newFixedThreadPool(10);
List<CompletableFuture<String>> futures = new ArrayList<>();

for (String file : files) {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        return processFile(file);
    }, executor);
    futures.add(future);
}

// Combinar resultados
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
    futures.toArray(new CompletableFuture[0])
);

List<String> results = allFutures.thenApply(v -> {
    return futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
}).join();

Problemas:

  • Sintaxis verbosa

  • Manejo de errores complejo

  • No hay cancelación fácil

  • Overhead de threads del OS

Go: Fan-Out / Fan-In

// Go
input := make(chan string)
go func() {
    defer close(input)
    for _, file := range files {
        input <- file
    }
}()

// Fan-out
numWorkers := 10
workerOutputs := make([]<-chan string, numWorkers)
for i := 0; i < numWorkers; i++ {
    output := make(chan string)
    workerOutputs[i] = output
    go func() {
        defer close(output)
        for file := range input {
            output <- processFile(file)
        }
    }()
}

// Fan-in
results := fanInWaitGroup(workerOutputs)
for result := range results {
    fmt.Println(result)
}

Ventajas:

  • Sintaxis simple y clara

  • Manejo de errores natural

  • Cancelación con context

  • Overhead mínimo (goroutines)

Conclusiones

Fan-Out / Fan-In es un patrón esencial en Go:

Paralelización: Distribuye trabajo eficientemente a múltiples workers

Agregación: Combina resultados de forma elegante

Escalabilidad: Mejora el throughput significativamente

Flexibilidad: Dos métodos (WaitGroup y Select) para diferentes necesidades

Cancelación: Fácil agregar cancelación con context

Simplicidad: Código claro y mantenible

Si vienes de Java, verás que Fan-Out / Fan-In en Go es mucho más simple y expresivo que usar ExecutorService y CompletableFuture. La combinación de goroutines y channels hace que paralelizar trabajo sea natural y seguro.

Próximos Pasos

En el siguiente post exploraremos otros patrones avanzados de concurrencia en Go, incluyendo circuit breakers, retry/backoff, y otros patrones que complementan los que hemos aprendido.


¿Has usado Fan-Out / Fan-In en tus proyectos de Go? ¿Prefieres WaitGroup o Select? Comparte tus experiencias y casos de uso en los comentarios. Y si quieres ver el código completo de estos ejemplos, puedes encontrarlo en mi repositorio go-mastery-lab.

More from this blog

JoeDayz

54 posts

Community Guy | Java Champion | AWS Architect | Software Architect