【文法系】【go】基本19:並列処理(Goroutine)

本noteの概要

golang(以下、go)の基本的な文法と出力内容について確認する。

本noteの対象者

・goをインストール済みの方
※ 筆者は仮想環境上でgoを実行していますが、ローカル環境でも基本的に挙動は変わらないと思います、goがインストールされていれば問題ないかと。

▽ 仮想環境上でgoを動かしたい方は以下参考までに ▽
【手順系】【go】仮想環境上でのWebアプリケーション開発①:go環境構築 - This is My note

本noteの環境

PC環境(ホスト)

# OSのバージョン
(base) $ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G2022

# Virtualboxのバージョン
(base) $ VBoxManage -v
6.1.2r135662

# vagrantのバージョン
(base) $ vagrant -v
Vagrant 2.2.7

仮想環境(ゲスト)

# Linuxのバージョンが記載されているファイルを検索
vagrant@:~$ ls /etc/*-release
/etc/lsb-release  /etc/os-release

# ゲストOSのバージョンを出力
vagrant@:~$ cat /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=16.04
DISTRIB_CODENAME=xenial
DISTRIB_DESCRIPTION="Ubuntu 16.04.6 LTS"

仮想環境上のディレクトリ構成

workspace
  - src
    - test
      - lesson.go

現在のディレクト

vagrant@vagrant-ubuntu-trusty-64:~/workspace/src/test$ pwd
/home/vagrant/workspace/src/test

【注】
 以降、特段の記述がない限り、コマンドの実行は現在のディレクトリ(test)で行われるものとし、文字数削減のため、表記を以下に省略して記述する。

 省略前:vagrant@vagrant-ubuntu-trusty-64:~/workspace/src/test$
  ↓
 省略後:$

Today's Thema:並列処理

1.並列処理(Goroutine)

⑴ 基本的な使い方
使用例
package main

import (
    "fmt"
    "time"
)

func goroutine(s string){
    for i := 0; i < 5; i++{
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func normal(s string){
    for i := 0; i < 5; i++{
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    go goroutine("world")
    normal("hello")
}
▼ 実行
$ go run lesson.go

world
hello
hello
world
world
hello
hello
world
world
hello

《解説》

並列処理を使わなかった場合
package main

import (
    "fmt"
    "time"
)

func goroutine(s string){
    for i := 0; i < 5; i++{
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func normal(s string){
    for i := 0; i < 5; i++{
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    goroutine("world")
    normal("hello")
}
▼ 実行
$ go run lesson.go

world
world
world
world
world
hello
hello
hello
hello
hello

《解説》

【補足】
以下のように、time.Sleep()をコメントアウトすると、goroutine()のスレッドは生成されるものの、goroutine()の処理が始まる前に、normal()の処理が完了し、プログラムが終了してしまうためgoroutine()の中身は出力されない。

package main

import (
    "fmt"
    // "time"
)

func goroutine(s string){
    for i := 0; i < 5; i++{
        // time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func normal(s string){
    for i := 0; i < 5; i++{
        // time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    go goroutine("world")
    normal("hello")

}
▼ 出力
hello
hello
hello
hello
hello

そのため、main関数でtime.Sleep()を使用し、プログラムが終了するまでの時間を指定すると、goroutineも出力はされる。

package main

import (
    "fmt"
    "time"
)

func goroutine(s string){
    for i := 0; i < 5; i++{
        // time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func normal(s string){
    for i := 0; i < 5; i++{
        // time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    go goroutine("world")
    normal("hello")
    time.Sleep(2000 * time.Millisecond)
}
▼ 出力
hello
hello
hello
hello
hello
world
world
world
world
world

2.sync.WaitGroup

並列処理が実行完了するまでプログラムを終了しないようにする。

⑴ 基本的な使い方
使用例
package main

import (
    "fmt"
    "sync"
    "time"
)

func goroutine(s string, wg *sync.WaitGroup){
    for i := 0; i < 5; i++{
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
    wg.Done()
}

func normal(s string){
    for i := 0; i < 5; i++{
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go goroutine("world", &wg)
    normal("hello")
    wg.Wait()
}
▼ 実行
$ go run lesson.go

world
hello
world
hello
hello
world
world
hello
hello
world

《解説》
並列処理が完了するまで、プログラムが終了しないため、1のときとは異なり、time.Sleep()をコメントアウトしても並列処理の内容は実行される。

package main

import (
    "fmt"
    "sync"
    // "time"
)

func goroutine(s string, wg *sync.WaitGroup){
    for i := 0; i < 5; i++{
        // time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
    wg.Done()
}

func normal(s string){
    for i := 0; i < 5; i++{
        // time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go goroutine("world", &wg)
    normal("hello")
    wg.Wait()
}
▼ 出力
hello
hello
hello
hello
hello
world
world
world
world
world

3.channel

並列処理を行う関数同士はそれぞれ独立しており、そのままではデータのやりとりができないため、channelを使用してデータのやりとりができるようにする。

⑴ main関数とgoroutine
使用例
package main

import "fmt"

func goroutine(s []int, c chan int){ // ④
    sum := 0
    for _, v := range s{
        sum += v
    }
    c <- sum // c(channel)にsumを入れる
}

func main() {
    s := []int{1, 2, 3, 4, 5} // ①
    c := make(chan int) // ②
    go goroutine(s, c) // ③
    x := <-c // ⑤ c(channel)に入ったsumを受け取りxに代入
    fmt.Println(x) // ⑥
}
▼ 実行
$ go run lesson.go

15

《解説》
f:id:otsuba1:20200225224520p:plain

⑵ main関数とgoroutine1、goroutine2
使用例
package main

import "fmt"

func goroutine1(s []int, c chan int){
    sum := 0
    for _, v := range s{
        sum += v
    }
    c <- sum
}

func goroutine2(s []int, c chan int){
    sum := 5
    for _, v := range s{
        sum += v
    }
    c <- sum
}

func main() {
    s := []int{1, 2, 3, 4, 5}
    c := make(chan int)
    go goroutine1(s, c)
    go goroutine2(s, c)
    x := <-c
    fmt.Println(x)
    y := <-c
    fmt.Println(y)
}
▼ 実行
$ go run lesson.go

20
15

《解説》
f:id:otsuba1:20200225224609p:plain

3.Buffered Channels

使用例
package main

import "fmt"

func main() {
    ch := make(chan int, 2) // make(chan データ型, バッファの数)
    ch <- 100
    fmt.Println(len(ch))
    ch <- 200
    fmt.Println(len(ch))

    x:= <-ch
    fmt.Println(x)

    fmt.Println(len(ch))

    ch <- 300
    fmt.Println(len(ch))
}
▼ 実行
$ go run lesson.go

1
2
100
1
2

《解説》
以下のように、x:= <-chでchannelを取り出さず、バッファー(今回は2)を超えるデータを入れようとするとエラーになる。

func main() {
    ch := make(chan int, 2)
    ch <- 100
    fmt.Println(len(ch))
    ch <- 200
    fmt.Println(len(ch))
    
    ch <- 300
    fmt.Println(len(ch))
}
▼ 出力
1
2
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
    /home/vagrant/workspace/src/myapp/lesson.go:12 +0x187
exit status 2
⑵ rangeとclose(ch)

channelとセットでrangeを使用する場合には、close(ch)でchannelの終了を明示する。

使用場面
package main

import "fmt"

func goroutine(s []int, c chan int){
    sum := 0
    for _, v := range s{
        sum += v
        c <- sum
    }
    close(c) // channelの終了を明示
}

func main() {
    s := []int{1, 2, 3, 4, 5}
    c := make(chan int, len(s)) // len(s)でchannelのバッファを指定
    go goroutine(s, c)
    for i := range c{
        fmt.Println(i)
    }
}
▼ 実行
$ go run lesson.go

1
3
6
10
15
使い方
package main

import "fmt"

func main() {
    ch := make(chan int, 2)
    ch <- 100
    fmt.Println(len(ch))
    ch <- 200
    fmt.Println(len(ch))
    close(ch) // channelの終了を明示

    for c := range ch{
        fmt.Println(c)
    }
}
▼ 実行
$ go run lesson.go

1
2
100
200

《解説》
close(ch)がないと、バッファで指定した数を超えるchannelを取りにいこうとしてしまうため、以下のようにエラーになる。

func main() {
    ch := make(chan int, 2)
    ch <- 100
    fmt.Println(len(ch))
    ch <- 200
    fmt.Println(len(ch))

    for c := range ch{
        fmt.Println(c)
    }
}
▼ 出力
1
2
100
200
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
    /home/vagrant/workspace/src/myapp/lesson.go:12 +0x200
exit status 2

4.producerとconsumer

ログを集めて、解析をする場面などで使用

使用例
package main

import (
    "fmt"
    "sync"
    "time"
)

func producer(ch chan int, i int) {
    ch <- i * 2
}

func consumer(ch chan int, wg *sync.WaitGroup) {
    for i := range ch {
        func() {
            defer wg.Done()
            fmt.Println("process", i*1000)
        }()
    }
    fmt.Println("###################")
}

func main() {
    var wg sync.WaitGroup
    ch := make(chan int)

    // Producer
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go producer(ch, i)
    }

    // Consumer
    go consumer(ch, &wg)
    wg.Wait()
    close(ch)
    time.Sleep(2 * time.Second)
    fmt.Println("Done")
}
▼ 実行
$ go run lesson.go

process 0
process 2000
process 4000
process 6000
process 8000
process 10000
process 12000
process 14000
process 16000
process 18000
###################
Done

《解説》
f:id:otsuba1:20200225234836p:plain

5.fan-out fan-in

使用例
package main

import "fmt"

func producer(first chan int) {
    defer close(first)
    for i := 0; i < 10; i++ {
        first <- i
    }
}

func multi2(first <-chan int, second chan<- int) { // 「first <-chan」は送信用のchannel、「second chan<-」は受信用のchannelであることを表す。「<-」はなくてもOK。
    defer close(second)
    for i := range first {
        second <- i * 2
    }
}

func multi4(second chan int, third chan int) {
    defer close(third)
    for i := range second {
        third <- i * 4
    }
}

func main() {
    first := make(chan int)
    second := make(chan int)
    third := make(chan int)

    go producer(first)
    go multi2(first, second)
    go multi4(second, third)
    for result := range third {
        fmt.Println(result)
    }
}
▼ 実行
$ go run lesson.go

0
8
16
24
32
40
48
56
64
72

《解説》
producerのfor文でfirst channelが0を受け取り、multi2のfor文でfirst channelが受け取った0×2を行う。さらに、multi2のfor文の結果を受け取ったsecond channelの0を用いて、multi4で0×4を行い、main関数のresultとして0を出力する。その後も同様の流れで、producerのfor文の条件を満たすまで出力を繰り返す。

f:id:otsuba1:20200226020414p:plain

6.selectを用いたchannelの受信

複数のchannelをお互いにブロッキングしないようにしながら実行する

使用例
package main

import (
    "fmt"
    "time"
)

func goroutine1(ch chan string) {
    for {
        ch <- "packet from 1"
        time.Sleep(3 * time.Second)
    }
}

func goroutine2(ch chan string) {
    for {
        ch <- "packet from 2"
        time.Sleep(1 * time.Second)
    }
}

func main() {
    c1 := make(chan string)
    c2 := make(chan string)
    go goroutine1(c1)
    go goroutine2(c2)

    for {
        select {
        case msg1 := <-c1:
            fmt.Println(msg1)
        case msg2 := <-c2:
            fmt.Println(msg2)
        }
    }
}
▼ 実行
$ go run lesson.go

packet from 2
packet from 1
packet from 2
packet from 1
packet from 2
packet from 1
packet from 2
packet from 1
packet from 2
packet from 1
packet from 2
packet from 1
:

※ 強制的に終了させるまで繰り返す

《解説》
f:id:otsuba1:20200226023027p:plain 以下のようにデータ型が異なるものや出力までの待ち時間が異なる場合でもOK。

package main

import (
    "fmt"
    "time"
)

func goroutine1(ch chan string) {
    for {
        ch <- "packet from 1"
        time.Sleep(3 * time.Second)
    }
}

func goroutine2(ch chan int) {
    for {
        ch <- 100
        time.Sleep(1 * time.Second)
    }
}

func main() {
    c1 := make(chan string)
    c2 := make(chan int)
    go goroutine1(c1)
    go goroutine2(c2)

    for {
        select {
        case msg1 := <-c1:
            fmt.Println(msg1)
        case msg2 := <-c2:
            fmt.Println(msg2)
        }
    }
}
▼ 出力
100
packet from 1
100
100
packet from 1
100
100
100
packet from 1
100
:

※ 強制的に終了させるまで繰り返す

7.Default Selection と for break

使用例
package main

import (
    "fmt"
    "time"
)

func main() {
    tick := time.Tick(100 * time.Millisecond) // 設定した時間ごとに実行
    boom := time.After(500 * time.Millisecond) // 設定した時間経過後に実行
    for {
        select {
        case <-tick:
            fmt.Println("tick.")
        case <-boom:
            fmt.Println("BOOM!")
            return
        default:
            fmt.Println("    .")
            time.Sleep(50 * time.Millisecond)
        }
    }
}
▼ 実行
$ go run lesson.go

    .
    .
tick.
    .
    .
tick.
    .
    .
tick.
    .
    .
tick.
    .
    .
BOOM!

《解説》

package main

import (
    "fmt"
    "time"
)

func main() {
    tick := time.Tick(100 * time.Millisecond)
    boom := time.After(500 * time.Millisecond)

    for {
        select {
        case t := <-tick: // tを設定した場合
            fmt.Println("tick.", t)
        case <-boom:
            fmt.Println("BOOM!")
            return
        default:
            fmt.Println("    .")
            time.Sleep(50 * time.Millisecond)
        }
    }
}
▼ 出力
    .
    .
tick. 2020-02-25 17:36:31.852435353 +0000 UTC m=+0.100577828
    .
    .
tick. 2020-02-25 17:36:31.953306907 +0000 UTC m=+0.201449365
    .
    .
tick. 2020-02-25 17:36:32.053527741 +0000 UTC m=+0.301670329
    .
    .
tick. 2020-02-25 17:36:32.153991047 +0000 UTC m=+0.402133577
    .
    .
tick. 2020-02-25 17:36:32.252393581 +0000 UTC m=+0.500536048
BOOM!

《解説》

⑵ for break

for文を抜ける

使用例
package main

import (
    "fmt"
    "time"
)

func main() {
    tick := time.Tick(100 * time.Millisecond)
    boom := time.After(500 * time.Millisecond)
    OuterLoop: // OuterLoopの名称は任意のものでOK
        for {
            select {
            case <-tick:
                fmt.Println("tick.")
            case <-boom:
                fmt.Println("BOOM!")
                break OuterLoop
            default:
                fmt.Println("    .")
                time.Sleep(50 * time.Millisecond)
            }
        }
    fmt.Println("##############")
}

▼ 実行
$ go run lesson.go

    .
    .
tick.
    .
    .
tick.
    .
    .
tick.
    .
    .
tick.
    .
    .
tick.
BOOM!
##############

8.sync.Mutex

使用例
package main

import (
    "fmt"
    "sync"
    "time"
)

type Counter struct {
    v   map[string]int
    mux sync.Mutex
}

func (c *Counter) Inc(key string) {
    c.mux.Lock()
    defer c.mux.Unlock()
    c.v[key]++
}

func (c *Counter) Value(key string) int {
    c.mux.Lock()
    defer c.mux.Unlock()
    return c.v[key]
}

func main() {
    c := Counter{v: make(map[string]int)}
    go func() {
        for i := 0; i < 10; i++ {
            c.Inc("Key")
        }
    }()
    go func() {
        for i := 0; i < 10; i++ {
            c.Inc("Key")
        }
    }()
    time.Sleep(1 * time.Second)
    fmt.Println(c, c.Value("Key"))
}
▼ 実行
$ go run lesson.go

{map[Key:20] {0 0}} 20

《解説》