О каналах
2019-09-07
Как-то программировал я на Go. И понадобилось мне одну длинную операцию, которая включает в себя обращение к внешнему API и периодическую проверку готовности результатов, провести дважды. И подумал я, что эти две длинные операции нужно проводить параллельно, а потом собрать результаты. Параллельно в Go, это значит, нужно расчехлять горутины. Собирать результаты, значит, нужны каналы.
Получился такой код:
func (c *Client) createDatasets(request *Request) (map[string]*Dataset, error) {
result := make(map[string]*Dataset)
resultCh := make(chan *Dataset, len(layers))
errorCh := make(chan error, 1)
for _, layer := range layers {
go c.createDatasetAsync(request, layer, resultCh, errorCh)
}
for len(result) < len(layers) {
select {
case dataset := <-resultCh:
result[dataset.Layer] = dataset
case err := <-errorCh:
return nil, err
}
return result, nil
}
func (c *Client) createDatasetAsync(request *CreateMapRequest, layer string,
resultCh chan *Dataset, errorCh chan error) {
// долго и мучительно создаём датасет...
if err != nil {
errorCh <- err
return
}
resultCh <- dataset
return
}
А потом я подумал, что непорядок. Можно слишком долго ждать результатов, мало ли как там API подвисло. И можно не дождаться всех результатов, нужного их количества. Поэтому нужен таймаут. На все параллельные операции сразу.
Отдельные таймауты на сетевые взаимодействия там, конечно, тоже есть. Но они в данном случае не помогут, потому что там много сетевых взаимодействий (вызовов API) в цикле.
Получилось так:
func (c *Client) createDatasets(request *Request) (map[string]*Dataset, error) {
result := make(map[string]*Dataset)
resultCh := make(chan *Dataset, len(layers))
errorCh := make(chan error, 1)
timeoutCh := time.After(c.config.DatasetCreateTimeout)
for _, layer := range layers {
go c.createDatasetAsync(request, layer, resultCh, errorCh)
}
for len(result) < len(layers) {
select {
case dataset := <-resultCh:
result[dataset.Layer] = dataset
case err := <-errorCh:
return nil, err
case <-timeoutCh:
return nil, errors.New("timed out datasets creation")
}
}
return result, nil
}
Я был настолько поражён тем, как просто в Go получилось добавить этот самый таймаут, что решил срочно написать про каналы. Вся магия в них.
На самом деле,
это классический пример,
где наши так нынче горячо любимые
абстракции async
и await
протекают в промисы.
function createDatasets(request) {
var promises = layers.map(layer => {
createDatasetAsync(request, layer);
});
Promise.all(promises).then(datasets => {
return datasets;
}).catch(errors => {
//...
});
}
async function createDatasetAsync(request, layer) {
//...
}
Async функции возвращают Promise.
И нам остаётся либо сразу же разрешить этот промис,
используя ключевое слово await
...
Собственно,
для того эти ключевые слова и придумывали,
чтобы сделать асинхронный код похожим на синхронный,
чтобы просто писать одно действие за другим.
Либо работать с промисами явно,
что придётся сделать,
чтобы запустить их все параллельно,
и дождаться результата ото всех сразу,
задействуя явный
Promise.all()
.
Но полного аналога с Go не получается.
Сюда ещё надо добавить таймаут.
Не говорите, что через
setTimeout()
можно закодить тут
выход из функции.
И таки в JavaScript это всё будет выполняться в один поток.
А в Go — в несколько.
Находится
довольно
много
статей,
нужен ли async/await в Go.
И все они сводятся к тому,
что, вообще-то, не нужен.
Зачем?
В Go и так можно писать понятный последовательный блокирующийся код.
Все блокировки
(за редким исключением в виде системных вызовов)
на самом деле блокируют не поток выполнения целиком,
а лишь текущую горутину.
Нужна параллелльность?
go что-то-там
и вперёд.
Нужно обмениваться данными между горутинами?
Используйте каналы.
Каналы в Go, это чудесное воплощение паттерна канала обмена сообщениями, как он описан в книжках. Это канал обмена типизированными сообщениями. Точка-точка, из одной горутины в другую. Хотя, в принципе, кто канал заполучит, тот и может с ним работать, и более пары горутин.
Сначала канал нужно создать. Это вполне себе сущность языка. Отдельный тип переменной. Generic, он знает, какого типа сообщения через него посылаются. При создании канала задаётся размер буфера, что влияет на его использование.
resultCh := make(chan *Dataset, len(layers))
errorCh := make(chan error, 1)
Над каналом определены всего две операции.
Отправка сообщения в канал:
resultCh <- dataset
Отправка блокирует горутину,
если буфер канала полон.
По умолчанию (make(char Something)
) создаётся небуферизованный канал.
В этом случае отправитель будет заблокирован,
пока не появится получатель.
Это уже можно использовать как механизм синхронизации
(ничего не делаем, пока нас не прочитают).
Чтение сообщения из канала:
dataset := <-resultCh
Получатель блокируется, если в буфере канала пусто (или вообще буфера нет).
Канал ещё можно закрыть со стороны отправителя:
ch := make(chan int)
// ...
close(ch)
Тогда получатель сможет об этом явно узнать:
v, opened := <-ch
Или прервать (иначе бесконечный) цикл по читаемым сообщениям:
for i := range ch {
//...
}
Впрочем, закрывают каналы редко. Они и так неплохо прибираются сборщиком мусора, если перестают быть нужны.
Крайне интересными становятся каналы вместе с оператором
select
.
Запросто можно проверять (и блокироваться на проверке) сразу несколько каналов одновременно:
select {
case dataset := <-resultCh:
result[dataset.Layer] = dataset
case err := <-errorCh:
return nil, err
}
Сработает та ветка, в чьем канале первым появится сообщение.
А если добавить ветку default
,
то чтение из канала автомагически становится неблокирующим.
Ну если нам нужно только проверить,
есть там сообщение или нет,
и идти дальше,
если нет.
tick := time.Tick(100 * time.Millisecond)
boom := time.After(500 * time.Millisecond)
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return
default:
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
Как видите, иногда даже не важно, что за сообщение пришло через канал. Достаточно самого факта, что сообщение пришло. Хотя, если только для сигнализации, то есть альтернативы.
select
можно использовать и для отправки.
И ветка default
тоже делает отправку неблокирующей.
for _, ch := range subscribers {
select {
case ch <- message:
// log.Debugf("Sent message: %+v", message)
default:
// non blocking send
}
}
Таким образом можно сделать честный pub/sub. Пусть у каждого подписчика будет свой канал. Мы просто пробегаем по всем, и шлём сообщение. Не блокируясь. Конечно, если это канал без буфера, и подписчик в момент отправки сообщения не заблокирован на его получении, он его не получит.
Как вы заметили,
в стандартном пакете time
есть функции,
которые создают канал,
куда шлют сообщение.
Периодически или через таймаут.
Так таймауты и делаются.
Просто ещё один канал,
который селектим.
Собственно, использование каналов для сигнализации прекращения какого-либо действия в другой горутине — довольно часто встречается.
for {
select {
case m := <-ch:
//...
case <-quit:
fmt.Println("quit")
return
}
}
С горутинами и каналами можно мутить всякоразные асинхронные примитивы. Прекрасно. Ещё раз, это самый настоящий канал отправки сообщений. Пусть и работает в рамках одного процесса.