【文法系】【go】基本19:並列処理(Goroutine)
本noteの概要
golang(以下、go)の基本的な文法と出力内容について確認する。
本noteの対象者
・goをインストール済みの方
※ 筆者は仮想環境上でgoを実行していますが、ローカル環境でも基本的に挙動は変わらないと思います、goがインストールされていれば問題ないかと。
▽ 仮想環境上でgoを動かしたい方は以下参考までに ▽
【手順系】【go】仮想環境上でのWebアプリケーション開発①:go環境構築 - This is My note
本noteの環境
PC環境(ホスト)
# OSのバージョン (base) $ sw_vers ProductName: Mac OS X ProductVersion: 10.14.6 BuildVersion: 18G2022 # Virtualboxのバージョン (base) $ VBoxManage -v 6.1.2r135662 # vagrantのバージョン (base) $ vagrant -v Vagrant 2.2.7
仮想環境(ゲスト)
# Linuxのバージョンが記載されているファイルを検索 vagrant@:~$ ls /etc/*-release /etc/lsb-release /etc/os-release # ゲストOSのバージョンを出力 vagrant@:~$ cat /etc/lsb-release DISTRIB_ID=Ubuntu DISTRIB_RELEASE=16.04 DISTRIB_CODENAME=xenial DISTRIB_DESCRIPTION="Ubuntu 16.04.6 LTS"
仮想環境上のディレクトリ構成
workspace - src - test - lesson.go
現在のディレクトリ
vagrant@vagrant-ubuntu-trusty-64:~/workspace/src/test$ pwd /home/vagrant/workspace/src/test
【注】
以降、特段の記述がない限り、コマンドの実行は現在のディレクトリ(test)で行われるものとし、文字数削減のため、表記を以下に省略して記述する。
省略前:vagrant@vagrant-ubuntu-trusty-64:~/workspace/src/test$
↓
省略後:$
Today's Thema:並列処理
1.並列処理(Goroutine)
⑴ 基本的な使い方
使用例
package main import ( "fmt" "time" ) func goroutine(s string){ for i := 0; i < 5; i++{ time.Sleep(100 * time.Millisecond) fmt.Println(s) } } func normal(s string){ for i := 0; i < 5; i++{ time.Sleep(100 * time.Millisecond) fmt.Println(s) } } func main() { go goroutine("world") normal("hello") }
▼ 実行
$ go run lesson.go world hello hello world world hello hello world world hello
《解説》
並列処理を使わなかった場合
package main import ( "fmt" "time" ) func goroutine(s string){ for i := 0; i < 5; i++{ time.Sleep(100 * time.Millisecond) fmt.Println(s) } } func normal(s string){ for i := 0; i < 5; i++{ time.Sleep(100 * time.Millisecond) fmt.Println(s) } } func main() { goroutine("world") normal("hello") }
▼ 実行
$ go run lesson.go world world world world world hello hello hello hello hello
《解説》
【補足】
以下のように、time.Sleep()をコメントアウトすると、goroutine()のスレッドは生成されるものの、goroutine()の処理が始まる前に、normal()の処理が完了し、プログラムが終了してしまうためgoroutine()の中身は出力されない。
package main import ( "fmt" // "time" ) func goroutine(s string){ for i := 0; i < 5; i++{ // time.Sleep(100 * time.Millisecond) fmt.Println(s) } } func normal(s string){ for i := 0; i < 5; i++{ // time.Sleep(100 * time.Millisecond) fmt.Println(s) } } func main() { go goroutine("world") normal("hello") }
▼ 出力
hello hello hello hello hello
そのため、main関数でtime.Sleep()を使用し、プログラムが終了するまでの時間を指定すると、goroutineも出力はされる。
package main import ( "fmt" "time" ) func goroutine(s string){ for i := 0; i < 5; i++{ // time.Sleep(100 * time.Millisecond) fmt.Println(s) } } func normal(s string){ for i := 0; i < 5; i++{ // time.Sleep(100 * time.Millisecond) fmt.Println(s) } } func main() { go goroutine("world") normal("hello") time.Sleep(2000 * time.Millisecond) }
▼ 出力
hello hello hello hello hello world world world world world
2.sync.WaitGroup
並列処理が実行完了するまでプログラムを終了しないようにする。
⑴ 基本的な使い方
使用例
package main import ( "fmt" "sync" "time" ) func goroutine(s string, wg *sync.WaitGroup){ for i := 0; i < 5; i++{ time.Sleep(100 * time.Millisecond) fmt.Println(s) } wg.Done() } func normal(s string){ for i := 0; i < 5; i++{ time.Sleep(100 * time.Millisecond) fmt.Println(s) } } func main() { var wg sync.WaitGroup wg.Add(1) go goroutine("world", &wg) normal("hello") wg.Wait() }
▼ 実行
$ go run lesson.go world hello world hello hello world world hello hello world
《解説》
並列処理が完了するまで、プログラムが終了しないため、1のときとは異なり、time.Sleep()をコメントアウトしても並列処理の内容は実行される。
package main import ( "fmt" "sync" // "time" ) func goroutine(s string, wg *sync.WaitGroup){ for i := 0; i < 5; i++{ // time.Sleep(100 * time.Millisecond) fmt.Println(s) } wg.Done() } func normal(s string){ for i := 0; i < 5; i++{ // time.Sleep(100 * time.Millisecond) fmt.Println(s) } } func main() { var wg sync.WaitGroup wg.Add(1) go goroutine("world", &wg) normal("hello") wg.Wait() }
▼ 出力
hello hello hello hello hello world world world world world
3.channel
並列処理を行う関数同士はそれぞれ独立しており、そのままではデータのやりとりができないため、channelを使用してデータのやりとりができるようにする。
⑴ main関数とgoroutine
使用例
package main import "fmt" func goroutine(s []int, c chan int){ // ④ sum := 0 for _, v := range s{ sum += v } c <- sum // c(channel)にsumを入れる } func main() { s := []int{1, 2, 3, 4, 5} // ① c := make(chan int) // ② go goroutine(s, c) // ③ x := <-c // ⑤ c(channel)に入ったsumを受け取りxに代入 fmt.Println(x) // ⑥ }
▼ 実行
$ go run lesson.go 15
《解説》
⑵ main関数とgoroutine1、goroutine2
使用例
package main import "fmt" func goroutine1(s []int, c chan int){ sum := 0 for _, v := range s{ sum += v } c <- sum } func goroutine2(s []int, c chan int){ sum := 5 for _, v := range s{ sum += v } c <- sum } func main() { s := []int{1, 2, 3, 4, 5} c := make(chan int) go goroutine1(s, c) go goroutine2(s, c) x := <-c fmt.Println(x) y := <-c fmt.Println(y) }
▼ 実行
$ go run lesson.go 20 15
《解説》
3.Buffered Channels
⑴
使用例
package main import "fmt" func main() { ch := make(chan int, 2) // make(chan データ型, バッファの数) ch <- 100 fmt.Println(len(ch)) ch <- 200 fmt.Println(len(ch)) x:= <-ch fmt.Println(x) fmt.Println(len(ch)) ch <- 300 fmt.Println(len(ch)) }
▼ 実行
$ go run lesson.go 1 2 100 1 2
《解説》
以下のように、x:= <-chでchannelを取り出さず、バッファー(今回は2)を超えるデータを入れようとするとエラーになる。
func main() { ch := make(chan int, 2) ch <- 100 fmt.Println(len(ch)) ch <- 200 fmt.Println(len(ch)) ch <- 300 fmt.Println(len(ch)) }
▼ 出力
1 2 fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]: main.main() /home/vagrant/workspace/src/myapp/lesson.go:12 +0x187 exit status 2
⑵ rangeとclose(ch)
channelとセットでrangeを使用する場合には、close(ch)でchannelの終了を明示する。
使用場面
package main import "fmt" func goroutine(s []int, c chan int){ sum := 0 for _, v := range s{ sum += v c <- sum } close(c) // channelの終了を明示 } func main() { s := []int{1, 2, 3, 4, 5} c := make(chan int, len(s)) // len(s)でchannelのバッファを指定 go goroutine(s, c) for i := range c{ fmt.Println(i) } }
▼ 実行
$ go run lesson.go 1 3 6 10 15
使い方
package main import "fmt" func main() { ch := make(chan int, 2) ch <- 100 fmt.Println(len(ch)) ch <- 200 fmt.Println(len(ch)) close(ch) // channelの終了を明示 for c := range ch{ fmt.Println(c) } }
▼ 実行
$ go run lesson.go 1 2 100 200
《解説》
close(ch)がないと、バッファで指定した数を超えるchannelを取りにいこうとしてしまうため、以下のようにエラーになる。
func main() { ch := make(chan int, 2) ch <- 100 fmt.Println(len(ch)) ch <- 200 fmt.Println(len(ch)) for c := range ch{ fmt.Println(c) } }
▼ 出力
1 2 100 200 fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]: main.main() /home/vagrant/workspace/src/myapp/lesson.go:12 +0x200 exit status 2
4.producerとconsumer
ログを集めて、解析をする場面などで使用
⑴
使用例
package main import ( "fmt" "sync" "time" ) func producer(ch chan int, i int) { ch <- i * 2 } func consumer(ch chan int, wg *sync.WaitGroup) { for i := range ch { func() { defer wg.Done() fmt.Println("process", i*1000) }() } fmt.Println("###################") } func main() { var wg sync.WaitGroup ch := make(chan int) // Producer for i := 0; i < 10; i++ { wg.Add(1) go producer(ch, i) } // Consumer go consumer(ch, &wg) wg.Wait() close(ch) time.Sleep(2 * time.Second) fmt.Println("Done") }
▼ 実行
$ go run lesson.go process 0 process 2000 process 4000 process 6000 process 8000 process 10000 process 12000 process 14000 process 16000 process 18000 ################### Done
《解説》
5.fan-out fan-in
⑴
使用例
package main import "fmt" func producer(first chan int) { defer close(first) for i := 0; i < 10; i++ { first <- i } } func multi2(first <-chan int, second chan<- int) { // 「first <-chan」は送信用のchannel、「second chan<-」は受信用のchannelであることを表す。「<-」はなくてもOK。 defer close(second) for i := range first { second <- i * 2 } } func multi4(second chan int, third chan int) { defer close(third) for i := range second { third <- i * 4 } } func main() { first := make(chan int) second := make(chan int) third := make(chan int) go producer(first) go multi2(first, second) go multi4(second, third) for result := range third { fmt.Println(result) } }
▼ 実行
$ go run lesson.go 0 8 16 24 32 40 48 56 64 72
《解説》
producerのfor文でfirst channelが0を受け取り、multi2のfor文でfirst channelが受け取った0×2を行う。さらに、multi2のfor文の結果を受け取ったsecond channelの0を用いて、multi4で0×4を行い、main関数のresultとして0を出力する。その後も同様の流れで、producerのfor文の条件を満たすまで出力を繰り返す。
6.selectを用いたchannelの受信
複数のchannelをお互いにブロッキングしないようにしながら実行する
⑴
使用例
package main import ( "fmt" "time" ) func goroutine1(ch chan string) { for { ch <- "packet from 1" time.Sleep(3 * time.Second) } } func goroutine2(ch chan string) { for { ch <- "packet from 2" time.Sleep(1 * time.Second) } } func main() { c1 := make(chan string) c2 := make(chan string) go goroutine1(c1) go goroutine2(c2) for { select { case msg1 := <-c1: fmt.Println(msg1) case msg2 := <-c2: fmt.Println(msg2) } } }
▼ 実行
$ go run lesson.go packet from 2 packet from 1 packet from 2 packet from 1 packet from 2 packet from 1 packet from 2 packet from 1 packet from 2 packet from 1 packet from 2 packet from 1 : ※ 強制的に終了させるまで繰り返す
《解説》
以下のようにデータ型が異なるものや出力までの待ち時間が異なる場合でもOK。
package main import ( "fmt" "time" ) func goroutine1(ch chan string) { for { ch <- "packet from 1" time.Sleep(3 * time.Second) } } func goroutine2(ch chan int) { for { ch <- 100 time.Sleep(1 * time.Second) } } func main() { c1 := make(chan string) c2 := make(chan int) go goroutine1(c1) go goroutine2(c2) for { select { case msg1 := <-c1: fmt.Println(msg1) case msg2 := <-c2: fmt.Println(msg2) } } }
▼ 出力
100 packet from 1 100 100 packet from 1 100 100 100 packet from 1 100 : ※ 強制的に終了させるまで繰り返す
7.Default Selection と for break
⑴
使用例
package main import ( "fmt" "time" ) func main() { tick := time.Tick(100 * time.Millisecond) // 設定した時間ごとに実行 boom := time.After(500 * time.Millisecond) // 設定した時間経過後に実行 for { select { case <-tick: fmt.Println("tick.") case <-boom: fmt.Println("BOOM!") return default: fmt.Println(" .") time.Sleep(50 * time.Millisecond) } } }
▼ 実行
$ go run lesson.go . . tick. . . tick. . . tick. . . tick. . . BOOM!
《解説》
package main import ( "fmt" "time" ) func main() { tick := time.Tick(100 * time.Millisecond) boom := time.After(500 * time.Millisecond) for { select { case t := <-tick: // tを設定した場合 fmt.Println("tick.", t) case <-boom: fmt.Println("BOOM!") return default: fmt.Println(" .") time.Sleep(50 * time.Millisecond) } } }
▼ 出力
. . tick. 2020-02-25 17:36:31.852435353 +0000 UTC m=+0.100577828 . . tick. 2020-02-25 17:36:31.953306907 +0000 UTC m=+0.201449365 . . tick. 2020-02-25 17:36:32.053527741 +0000 UTC m=+0.301670329 . . tick. 2020-02-25 17:36:32.153991047 +0000 UTC m=+0.402133577 . . tick. 2020-02-25 17:36:32.252393581 +0000 UTC m=+0.500536048 BOOM!
《解説》
⑵ for break
for文を抜ける
使用例
package main import ( "fmt" "time" ) func main() { tick := time.Tick(100 * time.Millisecond) boom := time.After(500 * time.Millisecond) OuterLoop: // OuterLoopの名称は任意のものでOK for { select { case <-tick: fmt.Println("tick.") case <-boom: fmt.Println("BOOM!") break OuterLoop default: fmt.Println(" .") time.Sleep(50 * time.Millisecond) } } fmt.Println("##############") }
▼ 実行
$ go run lesson.go . . tick. . . tick. . . tick. . . tick. . . tick. BOOM! ##############
8.sync.Mutex
⑴
使用例
package main import ( "fmt" "sync" "time" ) type Counter struct { v map[string]int mux sync.Mutex } func (c *Counter) Inc(key string) { c.mux.Lock() defer c.mux.Unlock() c.v[key]++ } func (c *Counter) Value(key string) int { c.mux.Lock() defer c.mux.Unlock() return c.v[key] } func main() { c := Counter{v: make(map[string]int)} go func() { for i := 0; i < 10; i++ { c.Inc("Key") } }() go func() { for i := 0; i < 10; i++ { c.Inc("Key") } }() time.Sleep(1 * time.Second) fmt.Println(c, c.Value("Key")) }
▼ 実行
$ go run lesson.go {map[Key:20] {0 0}} 20
《解説》