Fan-in / Fan-out в Go: как раздавать задачи и собирать результаты
tasks channel → worker 1 → results channel
              → worker 2 → results channel
              → worker 3 → results channel
              → worker 4 → results channel
              → worker 5 → results channel
package main
import "fmt"
// Task is a single unit of work that is sent over the tasks channel to a worker.
type Task struct {
// ID is the task's number; process prints it.
ID int
}
// process handles one task. In this minimal example the "work" is just
// printing the task's ID to standard output.
func process(task Task) {
	id := task.ID
	fmt.Println("process task", id)
}
// main demonstrates fan-out: a fixed number of worker goroutines read from one
// shared tasks channel, so every task is received by exactly one worker.
//
// The tasks channel is unbuffered, which means a send completes as soon as a
// worker has received the task, not when the worker has finished processing
// it. main therefore waits for every worker to report completion before it
// returns; without that wait the program could exit while the last tasks are
// still being printed.
func main() {
	const (
		workerCount = 5
		taskCount   = 10
	)

	tasks := make(chan Task)
	// done receives one value per worker once that worker's task loop has ended.
	done := make(chan struct{})

	for i := 0; i < workerCount; i++ {
		go func(workerID int) {
			// Signal completion even if the loop ends without receiving a task.
			defer func() { done <- struct{}{} }()
			for task := range tasks {
				fmt.Println("worker", workerID)
				process(task)
			}
		}(i)
	}

	for i := 1; i <= taskCount; i++ {
		tasks <- Task{ID: i}
	}
	// Closing tasks ends each worker's range loop once the channel is drained.
	close(tasks)

	// Wait until every worker has finished its in-flight task.
	for i := 0; i < workerCount; i++ {
		<-done
	}
} workers := runtime.NumCPU() for _, task := range tasks {
go process(task)
} workerCount := 20 package main
import (
"fmt"
"sync"
)
// Task is a single unit of work that is sent over the tasks channel to a worker.
type Task struct {
// ID is the task's number; process copies it into the Result.
ID int
}
// Result is what a worker sends to the results channel after processing a Task.
type Result struct {
// TaskID is the ID of the task this result was produced from.
TaskID int
// Value is the text produced by process for that task.
Value string
}
// process converts a task into its Result: the task's ID together with a text
// value derived from that ID.
func process(task Task) Result {
	var res Result
	res.TaskID = task.ID
	res.Value = fmt.Sprintf("result for task %d", task.ID)
	return res
}
// main runs a complete fan-out / fan-in pipeline: five workers read tasks from
// one channel, send each Result to a shared results channel, and main prints
// results until that channel is closed.
func main() {
tasks := make(chan Task)
results := make(chan Result)
var wg sync.WaitGroup
// Fan-out: start five workers that all read from the same tasks channel.
for i := 0; i < 5; i++ {
// Add runs in the launching goroutine, before the worker starts.
wg.Add(1)
go func(workerID int) {
defer wg.Done()
// The loop ends when tasks is closed and drained.
for task := range tasks {
results <- process(task)
}
}(i)
}
// Producer: send ten tasks, then close tasks so the workers' loops can end.
// It runs in its own goroutine so main is free to read results below.
go func() {
for i := 1; i <= 10; i++ {
tasks <- Task{ID: i}
}
close(tasks)
}()
// Closer: results is closed only after every worker has called Done, so no
// worker can still be sending when the channel is closed.
go func() {
wg.Wait()
close(results)
}()
// Fan-in: this loop ends when results is closed.
for res := range results {
fmt.Println(res)
}
} send on closed channel var wg sync.WaitGroup
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for task := range tasks {
results <- process(task)
}
}()
}
go func() {
wg.Wait()
close(results)
}() go func() {
wg.Add(1)
defer wg.Done()
// work
}() wg.Add(1)
go func() {
defer wg.Done()
// work
}() package main
import (
"fmt"
"sync"
"time"
)
// Task is a single unit of work that is sent over the tasks channel to a worker.
type Task struct {
// ID is the task's number; it is logged by worker and copied into the Result.
ID int
}
// Result is what a worker sends to the results channel after processing a Task.
type Result struct {
// TaskID is the ID of the task this result was produced from.
TaskID int
// Text is the message produced by process for that task.
Text string
}
// process simulates slow work by sleeping for 300 milliseconds and then
// reports the task as processed.
func process(task Task) Result {
	time.Sleep(300 * time.Millisecond)

	text := fmt.Sprintf("task %d processed", task.ID)
	return Result{TaskID: task.ID, Text: text}
}
// worker is the body of one pool goroutine. It receives tasks until the tasks
// channel is closed, logs which task it picked up, and sends the processed
// Result to results. wg.Done is called when the worker returns.
func worker(id int, tasks <-chan Task, results chan<- Result, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		task, open := <-tasks
		if !open {
			// tasks was closed and fully drained: nothing left to do.
			return
		}

		fmt.Println("worker", id, "started task", task.ID)
		results <- process(task)
	}
}
// main wires the whole pool together: it starts workerCount workers, feeds
// them taskCount tasks, and prints every result until the results channel is
// closed.
func main() {
const workerCount = 5
const taskCount = 20
tasks := make(chan Task)
results := make(chan Result)
var wg sync.WaitGroup
// Fan-out: every worker reads from the same tasks channel. Add runs before
// each worker goroutine is started.
for i := 1; i <= workerCount; i++ {
wg.Add(1)
go worker(i, tasks, results, &wg)
}
// Producer: send all tasks, then close tasks so the workers' loops can end.
// It runs in its own goroutine so main is free to read results below.
go func() {
for i := 1; i <= taskCount; i++ {
tasks <- Task{ID: i}
}
close(tasks)
}()
// Closer: results is closed only after every worker has called Done.
go func() {
wg.Wait()
close(results)
}()
// Fan-in: this loop ends when results is closed.
for result := range results {
fmt.Println("result:", result.Text)
}
} results := make(chan Result) results := make(chan Result, 100) for result := range results {
fmt.Println(result)
} go func() {
wg.Wait()
close(results)
}() close(results) results <- result panic: send on closed channel package main
import "sync"
// merge fans in any number of receive-only channels into a single output
// channel. One goroutine per input forwards that input's values to out, and
// out is closed once all of those goroutines have finished, that is, after
// every input channel has been closed and drained.
func merge[T any](channels ...<-chan T) <-chan T {
// out is unbuffered, so each forwarding goroutine blocks until the caller
// receives the value.
out := make(chan T)
var wg sync.WaitGroup
// Register all forwarders up front, before any of them is started.
wg.Add(len(channels))
for _, ch := range channels {
// The channel is passed as an argument so each goroutine works on its own copy.
go func(c <-chan T) {
defer wg.Done()
for value := range c {
out <- value
}
}(ch)
}
// Close out from a separate goroutine so merge can return immediately.
go func() {
wg.Wait()
close(out)
}()
return out
} out := merge(ch1, ch2, ch3)
for value := range out {
fmt.Println(value)
} func startWorkers(workerCount int, tasks <-chan Task, results chan<- Result) *sync.WaitGroup {
// startWorkers launches workerCount goroutines; each reads tasks until the
// tasks channel is closed and sends process(task) to results. It returns the
// WaitGroup that tracks those goroutines, so the caller can call Wait on it.
var wg sync.WaitGroup
for i := 0; i < workerCount; i++ {
// Add runs before the goroutine is started.
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for task := range tasks {
results <- process(task)
}
}(i)
}
// Returning the address of the local wg is valid in Go.
return &wg
} source → tasks → workers → results → save func worker(
ctx context.Context,
id int,
tasks <-chan Task,
results chan<- Result,
wg *sync.WaitGroup,
) {
// worker is a cancellable task loop: it returns when ctx is done or when the
// tasks channel is closed. wg.Done is called on return.
defer wg.Done()
for {
select {
// Stop waiting for new tasks once the context is done.
case <-ctx.Done():
return
case task, ok := <-tasks:
// ok is false when tasks has been closed and drained.
if !ok {
return
}
result := process(task)
// The send is also guarded by ctx, so the worker does not stay blocked on
// results after the context is done.
select {
case <-ctx.Done():
return
case results <- result:
}
}
}
} type Result struct {
// TaskID identifies the task this result belongs to.
TaskID int
// Value is the output of the work.
Value string
// Err is the error reported for the task; presumably nil on success (the
// consumer snippet checks Err != nil before saving the result).
Err error
} value, err := doWork(task)
results <- Result{
TaskID: task.ID,
Value: value,
Err: err,
} for result := range results {
if result.Err != nil {
fmt.Println("error:", result.Err)
continue
}
save(result)
} for _, task := range tasks {
go process(task)
} const workerCount = 20 for task := range tasks {
process(task)
} close(tasks) go func() {
wg.Wait()
close(results)
}() go func() {
wg.Add(1)
defer wg.Done()
}() wg.Add(1)
go func() {
defer wg.Done()
}() results := make(chan Result, 1000000) создали tasks → запустили N воркеров → воркеры пишут в results → WaitGroup закрыл results → читаем range results