Привет! Меня зовут Нина, и я программист Go.

В этой статье я хочу рассказать о трех шаблонах параллелизма в Go, которые могут быть очень полезными:

  • for-select-done
  • группа ошибок
  • рабочий пул

Шаблон 1: For-Select-Done

Основная идея шаблона for-select-done заключается в использовании бесконечного цикла for для обработки событий из различных каналов с помощью оператора select.

Оператор select позволяет выбрать первую операцию, готовую к выполнению из нескольких каналов. Это может быть сигнал к выполнению какой-то полезной задачи или к выходу из бесконечного цикла.

В этом шаблоне бесконечный цикл for обычно вызывается в отдельной горутине, чтобы избежать блокировки основного потока.

Пример кода

Нам нужно вызывать функцию someTask() один раз в секунду. В определенный момент мы также должны иметь возможность выйти из бесконечного цикла в зависимости от контекста и завершить горутину.

Давайте посмотрим пример кода:

package main

import (
  "context"
  "fmt"
  "math/rand"
  "os"
  "os/signal"
  "syscall"
  "time"
)

// someTask function that we call periodically.
func someTask() {
  fmt.Println(rand.Int() * rand.Int())
}

// PeriodicTask runs someTask every 1 second.
// If canceled goroutine should be stopped.
func PeriodicTask(ctx context.Context) {
  // Create a new ticker with a period of 1 second.
  ticker := time.NewTicker(time.Second)
  for {
    select {
    case <-ticker.C:
      someTask()
    case <-ctx.Done():
      fmt.Println("stopping PeriodicTask")
      ticker.Stop()
      return
    }
  }
}

func main() {
  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  defer cancel()

  go PeriodicTask(ctx)

  // Create a channel to receive signals from the operating system.
  sigCh := make(chan os.Signal, 1)
  signal.Notify(sigCh, syscall.SIGTERM)

  // The code blocks until a signal is received (e.g. Ctrl+C).
  <-sigCh
}

Для периодического вызова someTask() создадим новый тикер с периодом 1 секунда. Каждую секунду в канал ticker.C отправляется сообщение, которое считывается в соответствующем операторе case и запускает выполнение функции someTask().

Если контекст отменен, сообщение будет отправлено на канал <-ctx.Done(), и будет запущен соответствующий кейс, который приведет к выходу из цикла for и горутины.

В основной функции создаем контекст ctx с таймаутом 5 секунд. Это означает, что если операция, связанная с этим контекстом, не будет завершена в течение указанного времени, контекст будет отменен, а все операции, связанные с ним, будут прерваны.

В бесконечном цикле горутины PeriodicTask тикер будет срабатывать несколько раз, и функция someTask() будет выполняться несколько раз. Через 5 секунд сработает тикер контекста, в операторе select будет запущен случай <-ctx.Done(), и бесконечный цикл будет прерван.

Результат выполнения кода:

-777992493516638130
-3179832721048378793
-4070697154687973288
2884823370254822744
stopping PeriodicTask

Когда использовать этот шаблон

Этот шаблон полезен, когда вам нужно выполнить задачу в бесконечном цикле на основе некоторого события или таймера, а затем остановить ее выполнение на основе определенного условия.

Например, его можно использовать для выполнения отложенных вычислений с использованием данных, сохраненных в базе данных, или для асинхронного обогащения записей в базе данных данными из других служб. При этом у нас всегда есть возможность безопасно завершить горутину при отмене контекста или возникновении какого-то другого внешнего события.

Паттерн 2: группа ошибок

Основная идея шаблона errgroup состоит в том, чтобы запустить группу горутин, подождать, пока они закончат свою работу, и обработать любые ошибки, которые могут возникнуть во время выполнения.

Пример кода

Вот пример кода с использованием пакета golang.org/x/sync/errgroup, который реализует шаблон errgroup.

package main

import (
  "errors"
  "fmt"

  "golang.org/x/sync/errgroup"
)

// errFailure some custom error.
var errFailure = errors.New("some error")

func main() {
  // Create errgroup.
  group := errgroup.Group{}

  // Run first task.
  group.Go(func() error {
     time.Sleep(5 * time.Second)
     fmt.Println("doing some work 1")
     return nil
  })

  // Run second task.
  group.Go(func() error {
     fmt.Println("doing some work 2")
     return nil
  })

  // Run third task.
  group.Go(func() error {
     fmt.Println("doing some work 3")
     return errFailure
  })
  
  // Wait for all goroutines to complete.
  if err := group.Wait(); err != nil {
     fmt.Printf("errgroup tasks ended up with an error: %v\n", err)
  } else {
     fmt.Println("all works done successfully")
  }
}

В этом примере мы вызываем группу задач errgroup.Group{}, которая выполняется параллельно с использованием метода group.Go().

Мы используем group.Wait() для ожидания завершения всех задач в группе. Если какая-либо из задач завершится с ошибкой, метод group.Wait() вернет первую полученную ошибку. Если все задачи завершатся успешно, метод group.Wait() вернет nil.

В этом примере третья задача завершается с ошибкой, поэтому group.Wait() возвращает ошибку, которую мы обрабатываем.

Результат выполнения кода:

doing some work 3
doing some work 2
doing some work 1
errgroup tasks ended up with an error: some error

Давайте рассмотрим другой вариант использования этого шаблона.

Функция errgroup.WithContext() создает новую группу горутин типа errgroup.Group и новую context.Context, которая может передаваться между горутинами и при необходимости позволит отменить выполнение группы задач.

package main

import (
  "context"
  "errors"
  "fmt"
  "time"

  "golang.org/x/sync/errgroup"
)

// errFailure some custom error.
var errFailure = errors.New("some error")

func main() {
  // Create errgroup with context.
  group, qctx := errgroup.WithContext(context.Background())

  // Run first periodic task.
  group.Go(func() error {
     firstTask(qctx)
     return nil
  })

  // Run second task.
  group.Go(func() error {
     if err := secondTask(); err != nil {
        return err
     }
     return nil
  })

  // Wait for all tasks to complete or the error to appear.
  if err := group.Wait(); err != nil {
     fmt.Printf("errgroup tasks ended up with an error: %v", err)
  }
}

func firstTask(ctx context.Context) {
  var counter int
  for {
     select {
     case <-ctx.Done():
        return
     case <-time.After(500 * time.Millisecond):
        fmt.Println("some task")
        if counter > 10 {
            return
   }
        counter++
     }
  }
}

func secondTask() error {
  time.Sleep(3 * time.Second)
  return errFailure
}

Здесь функция firstTask() представляет периодическую задачу, которая должна выполнить какое-то действие 10 раз. Задача secondTask() — это функция, которая выполняет какую-то задачу, но возвращает ошибку.

В этом случае наша периодическая задача firstTask() успевает завершиться несколько раз, прежде чем задача secondTask() завершается с ошибкой. Эта ошибка приводит к отмене контекста qctx для группы горутин, и они прекращают свое выполнение.

Результат выполнения кода:

some task
some task
some task
some task
some task
errgroup tasks ended up with an error: some error

Когда использовать этот шаблон

Я использую этот шаблон там, где важно, чтобы все горутины в группе успешно завершились без ошибок.

Например, если мне нужно выполнить расчет, используя объединенные данные. Я не могу выполнить расчет, если данные из определенной таблицы отсутствуют (например, она еще не сохранена). В этом случае я возвращаю пользовательскую ошибку об отсутствующих данных, а выполнение всех остальных запросов к базе данных прерывается до запуска следующей группы задач.

Шаблон 3: рабочий пул

Шаблон рабочего пула — это шаблон, который позволяет распараллеливать задачи, ограничивая количество одновременно выполняемых горутин.

В этом шаблоне мы создаем фиксированное количество рабочих процессов, которые ждут задач из очереди. При появлении задачи она добавляется в очередь. Если рабочий процесс свободен, он берет задачу из очереди и выполняет ее. Результат выполнения задачи может быть возвращен в основной поток, где он может быть обработан.

Пример кода

Рассмотрим одну из вариаций рабочего пула. Здесь мы не будем возвращать результат выполнения задачи в основной поток.

package main

import (
  "fmt"
  "sync"
  "time"
)

// Data to be proccessed.
var taskCount = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

const (
  // Number of concurrent workers.
  numberOfWorkers = 3
)

func main() {
  // Create buffered channel.
  jobs := make(chan struct{}, numberOfWorkers)
  wg := sync.WaitGroup{}

  // Add workers.
  for id := range taskCount {
    wg.Add(1)
    jobs <- struct{}{}

    go func(id int) {
      worker(id)
      <-jobs
      defer wg.Done()
    }(id)
  }
  
  // Wait for all workers to complete.
  wg.Wait()
}

func worker(id int) {
  fmt.Println(id)
  time.Sleep(2 * time.Second)
}

Срез taskCount содержит данные, которые необходимо обработать (всего 10 элементов). В константе numberOfWorkers мы задаем количество одновременных воркеров — размер нашей очереди.

Далее мы создаем буферизованный канал jobs типа struct{} с размером буфера numberOfWorkers.

Чтобы дождаться завершения всех задач, мы создаем файл WaitGroup. Используя цикл for, мы перебираем все данные в срезе taskCount, увеличивая счетчик WaitGroup на 1 и добавляя задачу в очередь канала jobs для обработки.

Затем через горутину вызывается наш рабочий обработчик, который обрабатывает данные, а затем удаляет задачу из очереди с помощью wg.Done(), уменьшая счетчик WaitGroup на 1.

Используя wg.Wait(), мы ждем завершения всех горутин, пока счетчик группы не станет равным нулю. У нас 3 воркера в numberOfWorkers, поэтому первые три горутины будут выполняться одновременно, а на четвертом задании строка jobs <-struct{}{} будет заблокирована до тех пор, пока один из воркеров не закончит свою работу и не возьмет следующий элемент из очереди <-jobs.

Когда использовать этот шаблон

Шаблон worker pool полезен, когда требуется обработать большое количество задач, но мы хотим ограничить количество одновременно выполняемых горутин, что положительно скажется на производительности кода и позволит избежать перегрузки системы. Этот шаблон также позволяет легко масштабировать систему за счет увеличения числа допустимых одновременных рабочих процессов.

Рабочий пул можно использовать для обработки клиентских запросов на стороне сервера или для выполнения фоновых задач, таких как создание отчетов или обработка данных.

Я часто использую этот шаблон, когда есть необходимость обработать и сохранить большое количество строк данных в базе данных. Кроме того, этот паттерн очень удобен для обработки событий из распределенной очереди, такой как Kafka.

Заключение

Существует много шаблонов параллелизма, но здесь я представил три продвинутых шаблона параллелизма, которые можно использовать в своей работе. шаблон канала моста.

И я надеюсь, что моя статья была вам полезна!