Golang Cafe #3 まとめ

2013/11/10に開催された「Golang Cafe #3」についてのまとめです。
今回のテーマは並列処理(Goroutineとchannel)で、下記書籍にて予習をしてから臨みました。

基礎からわかる Go言語

基礎からわかる Go言語


本日のお題はこちらになります。+TakashiYokoyama氏が作成してくれています。git cloneして、各章ごとにtagが付けてあるので切り替えて参照します。

Step1、Step2

goroutineの基本的な使い方になります。
関数を呼び出す際にgoを付けます。通常の関数でも無名関数でも構いません。

package main

import (
    "fmt"
)

func main() {
    go func() {
        fmt.Println("Goroutineの処理")
    }()

    go doSomething(1)
}

func doSomething(a int) int {
    fmt.Printf("%d", a)

    return a
}

関数に引数を渡すことは出来ますが、戻り値を取得or参照することは出来ません。

Step3、Step4

goroutineの終了待ちをします。Step1、Step2のサンプルは実行後何も表示されずに終了します。メインスレッドがgoroutineを呼び出した後そのまま終了してしまうためです。そこでgoroutineの終了待ちをするためにchan(チャンネル)を使用します。

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int)

    go doSomething(ch)

    <-ch
}

func doSomething(ch chan int) {
    fmt.Println("Goroutineの処理")
    ch <- 1
}

make関数で型を指定してチャンネルを作成します。そして作成したチャンネルをgoroutineで実行する関数に渡します。<-chはチャンネルから値を受信(取得)します。ch<-はチャンネルに値を送信(設定)します。
goroutine側の引数定義でchan<-としておくと関数内では送信専用のチャンネルとなり、関数内で受信しようとするとビルドの時点で怒られます。
チャンネルから値を受信しようとした時、チャンネル内に値がない場合は受信できるようになるまで処理がブロックされます(受信はしようとするが結果が戻らないので待ちます)。
goroutineで実行する関数の処理が終わったタイミングでチャンネルに値を送信することで、待っていた受信処理に結果が戻りメインスレッド側の処理が再開されます。

Step5

チャンネルにバッファサイズを指定します。Step3、Step4のサンプルではmake関数でチャンネルを作成する際、型のみを指定していましたが、型とバッファサイズを指定することが出来ます。
バッファサイズを指定すると、バッファが一杯になるまで値を送信することができ、バッファが一杯になると値を送信可能になるまで(バッファに空きができるまで)処理がブロックされます(送信処理が完了しません)。
また受信側はバッファが空になるまで値を受信することができ、バッファが空になると値を受信可能になるまで処理がブロックされます。

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int, 10)

    for i := 1; i < 11; i++ {
        go doSomething(i, ch)
    }

    for no := range ch {
        fmt.Printf("goroutine[%02d] Finished.\n", no)
    }
}

func doSomething(no int, ch chan int) {
    for i := 0; i < 10; i++ {
        fmt.Printf("Goroutineの処理[%02d]\n", no)
    }

    ch <- no

    if no == 10 {
        close(ch)
    }
}

このサンプルではバッファサイズ10のチャンネルを作成し11個のgoroutineを実行します。

for no := range ch {

と書くことでチャンネルに送信された値を順次受信することができます。バッファが空になった場合は待ち続けます。
このままではプログラムが永遠に終了しないので、11回目のスレッドで

close(ch)

としチャンネルをクローズします。チャンネルがクローズされると受信のブロックが解除され、またチャンネルからの受信も終了しforのループを抜けます。
チャンネルをクローズした後はチャンネルに対して送信できません。

Step6、Step7

Step5のサンプルでは1つのチャンネルを使いまわしましたが、今回は複数のgoroutineに対して別々のチャンネルを使用してメインスレッドで待ち受けます。

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    ch3 := make(chan int)

    go process1(ch1)
    go process2(ch2)
    go process3(ch3)

//L:
    for {
        select {
            case res1 := <-ch1:
                fmt.Printf("process1 Finished[%d]\n", res1)
            case res2 := <-ch2:
                fmt.Printf("process2 Finished[%d]\n", res2)
            case <-ch3:
                fmt.Println("Finish!")
                return
                //break L
            default:
                fmt.Println("該当しない場合はここを通る")
                time.Sleep(10000000)
        }
    }

}

func process1(ch chan int) {
    for i := 0; i < 10; i++ {
        fmt.Printf("process1: [%02d]\n", i)
    }

    ch <- 1
}

func process2(ch chan int) {
    for i := 0; i < 10; i++ {
        fmt.Printf("process2: [%02d]\n", i)
    }

    ch <- 2
}

func process3(ch chan int) {
    time.Sleep(5000000000)
    ch <- 3
}

selectを使いcase句にチャンネルの受信処理を書くことで、それぞれのチャンネルを受信した時の処理を書くことができます。
defaultを追加すると、チャンネルからの受信を待っている間定期的にdefaultに書いた処理が実行されます。
このサンプルではprocess3が実行された後5秒後にメインスレッド側でreturnが呼ばれ処理が終了します。またコメントアウトされていますが、breakでラベルをしてしてforループを抜けることもできます。
今回のサンプルでは明示的に使用したチャンネルをcloseしていませんが即終了しているので問題はないようです。

Step8

timeパッケージAfter()Tick()を使用したサンプルです。それぞれともにチャンネルを戻り値としていますが、Afterは指定時間後、Tickは指定間隔ごとにその時間をチャンネルに対して送信します。このチャンネルを待ち受けることで指定時間後、指定間隔後に処理を実行することができます。

for {
    select {
        case res1 := <-ch1:
            fmt.Printf("process1 Finished[%d]\n", res1)
        case res2 := <-ch2:
            fmt.Printf("process2 Finished[%d]\n", res2)
        case <- time.After(5 * time.Second):
            fmt.Println("Finish!")
            return
        //case <- time.Tick(3 * time.Second):
        //    fmt.Println("Tick!")
    }
}

ただし、selectのcase句でtime.After()またはtime.Tick()を使用する場合は注意が必要です。
上記のソースコードではtime.Tick()の部分をコメントアウトしていますが、time.After()とtime.Tick()を同時にcase句で直接使用した場合には期待した動作をしません。
これを回避するには以下のように修正します。

chAfter := time.After(5 * time.Second)
chTick := time.Tick(3 * time.Second)

for {
    select {
        case res1 := <-ch1:
            fmt.Printf("process1 Finished[%d]\n", res1)
        case res2 := <-ch2:
            fmt.Printf("process2 Finished[%d]\n", res2)
        case <- chAfter:
            fmt.Println("Finish!")
            return
        case <- chTick:
            fmt.Println("Tick!")
    }
}

time.After()とtime.Tick()をcase句で直接使用せず、selectのスコープの外で定義すると期待する動作をするようです。

Step9

今までのサンプルは実行すれば分かるように同時には1つのgoroutineしか実行されていません。Go言語ではデフォルトでは1スレッドしか実行できないようです。同時に実行するスレッド数を増やすには、環境変数にGOMAXPROCSを追加するか、プログラム中でruntime.GOMAXPROCS()を使用します。
例えば下記のように、

num := runtime.GOMAXPROCS(2)

のようにしてgoroutineを呼び出す前に設定します。ちなみに戻り値は変更前の値になります。

その他

チャンネルを用いればgoroutineの同期や制御を簡単に行えますが、より高度に制御するにはsyncパッケージを使用するとのことです。一度みてみたいと思います。