CLOSE

Recommanded Free YOUTUBE Lecture: <% selectedImage %>

## 동시성

### Goroutines

개념적으로 고루틴(Gorutines)은 thread, coroutines, processes 등과 혼동될 수 있다. 용어에 따른 개념의 혼란을 피하기 위해서 "고루틴"이라는 새로운 용어를 만들었다고 한다. 실제 사용하다보면, 멀티 스레드, 멀티 프로세스 등과 비슷하면서도 해석과 사용에 미묘한 차이가 있음을 알 수 있다. 고루틴을 여러개 실행해 보면, 스레드 처럼 작동하지 않는다는 걸 알 수 있습니다. 수십개의 고루틴을 실행해도 단지 몇개의 스레드만 확인할 수 있다.

고루틴은 go 키워드를 이용해서 만들 수 있다. 아래의 코드를 보자.
```import (
"fmt"
)

func f(n int) {
for i := 0; i < 10; i++ {
fmt.Println(n, ":", i)
}
}
func main() {
go f(0)
var input string
fmt.Scanln(&input)
}```
이 프로그램은 두 개의 고루틴으로 구성된다. 첫번째 고루틴은 main 함수다. 두 번째 고루틴은 go f(0)을 호출 할 때 만들어진다. 일반적으로 우리가 함수를 호출하면, 호출한 함수에 있는 모든 코드를 실행하고 리턴 값을 받아야 다음 줄(코드)로 넘어간다. 고루틴은 리턴 값을 받지 않고 즉, 함수가 완료되는 걸 기다리지 않고 즉시 다음 코드로 넘어간다. 예제 코드에서 Scanln 함수를 호출한 이유다. Scanln 함수가 없다면, f() 함수가 끝나기 전에 main() 함수가 끝나버릴 거다. Thread와 유사함을 알 수 있다.

고루틴은 그게 몇 개든지 간단하게 만들 수 있다. 10개의 고루틴을 만들어보자.
```func main() {
for i := 0; i < 10; i++ {
go f(0)
}
var input string
fmt.Scanln(&input)
}```
그런데 f 함수가 너무 빨리 끝나버려서, 실제 고루틴이 제대로 작동하는지를 확인할수가 없다. time.Sleep함수와 rand.Intn을 이용해서, 테스트하기 쉬운 코드를 만들었다.
```package main

import (
"fmt"
"time"
"math/rand"
)

func f(n int) {
for i := 0; i < 10; i++ {
fmt.Println(n, ":", i)
amt := time.Duration(rand.Intn(250))
time.Sleep(time.Millisecond * amt)
}
}
func main() {
for i := 0; i < 10; i++ {
go f(i)
}
var input string
fmt.Scanln(&input)
}```
f 함수는 각각 0에서 10까지의 숫자를 출력하고, 0에서 250ms를 sleep 한다. 고루틴이 독립적으로 실행되는 걸 명확히 확인할 수 있을 것이다.

## Channels

고루틴은 기본적으로 독립적으로 작동하는 프로그램이다. 위 예제에서 f 고루틴들과 main고루틴은 서로에 대해서 완전히 무관심하다. 하지만 고루틴끼리 (마치 스레드끼리 통신을 해야 하는 것처럼)통신을 해야 하는 경우가 있다. 고루틴은 채널(channel)을 이용해서 정보를 주고 받을 수 있다.

아래는 간단한 채널 예제다.
```package main

import (
"fmt"
"time"
)

func pinger(c chan string) {
for i := 0; ; i++ {
c <- "ping"
}
}

func printer(c chan string) {
for {
msg := <-c
time.Sleep(time.Second * 1)
}
}

func main() {
var c chan string = make(chan string)
go pinger(c)
go printer(c)

var input string
fmt.Scanln(&input)
}```
이 프로그램은 키보드 입력을 받을 때까지, Ping을 무한대로 전송한다. Go 언어에서 채널은 고유 데이터 타입으로 chan키워드를 이용해서 만들 수 있다. 채널은 "메시지 를 주고 받기 위해서" 사용하기 때무에, 수신측과 송신측이 명시되기 마련이다. 채널로 부터 메시지를 주고 받기 위해서는 <-키워드를 사용한다. 키워드의 사용방법은 직관적이다. 채널로 메시지를 보내기 위해서는 c <- "ping", 채널로 부터 메시지를 받기 위해서는 msg := <-c 하면된다.

아래와 같이 채널로 부터 메시지를 직접 읽는 방법도 있다.
`fmt.Println(<-c)`

채널은 두 개의 고루틴을 동기화하는 역할도 수행한다. pinger는 printer가 메시지를 읽을 준비가 되야지 기다린다.(메시지가 준비될 때까지 blocking 된다). ponger 라는 다른 함수가 추가된다면 어떻게 될지 테스트 해보자.
```func ponger(c chan string) {
for i := 0; ; i++ {
c <- "pong"
}
}

func main() {
var c chan string = make(chan string)
go pinger(c)
go ponger(c)
go printer(c)

var input string
fmt.Scanln(&input)
}```
"ping"과 "pong"이 번갈아 출력되는 걸 확인할 수 있다. 멀티 스레드 프로그램의 경우 두 개이 상의 스레드가 하나의 데이터 영역을 제어하려고 하면 경쟁조건이 걸리는데, ping/pong 메시지가 서로 번갈아 가면서 예쁘게 출력되는 걸 확인할 수 있다. 접근 제어를 하는 것 같은데, 살펴봐야 하겠다.

### Channel Direction

채널 타입은 "<-" 키워드를 이용해서, 메시지를 수신할 것인지 아니면 송신할 것인지에 대한 방향을 명시적으로 지정할 수 있다. pinger 함수를 아래와 같이 바꿔보자.
`func pinger(c chan<- string)`
이제 c는 단시 전송 전용으로만 사용할 수 있다. 만약 함수내에서 c를 수신용으로 사용하려고 하면 컴파일 에러가 발생한다.

printer도 수신 전용으로 바꿔보자.
`func printer(c <-chan string)`

### Select

두 개 이상의 채널을 가진 복잡한 프로그램을 개발 할 경우, "select"를 이용해서 채널을 분류할 수 있다. 문법이 switch와 비슷해서 사용하기도 쉽다.
```package main

import (
"fmt"
"time"
)

func main() {
c1 := make(chan string)
c2 := make(chan string)

go func() {
for {
c1 <- "From 1"
time.Sleep(time.Second * 2)
}
}()
go func() {
for {
c2 <- "From 2"
time.Sleep(time.Second * 3)
}
}()

go func() {
for {
select {
case msg1 := <-c1:
fmt.Println(msg1)
case msg2 := <-c2:
fmt.Println(msg2)
}
}
}()

var input string
fmt.Scanln(&input)
}```
3 개의 anonymouse 함수가 있다. 하나는 2초 간격으로 "From 1"메시지를 c1 채널로 전송한다. 다른 하나는 3초 간력으로 "From 2"메시지를 c2 채널로 전송하고 있다. 마지막 anonymouse 함수는 select를 이용해서 c1, c2 어느 채널에서 발생한 메시지인지를 분류한 다음 처리한다.

select는 "타임아웃" 구현에 사용할 수도 있다.
```select {
case msg1 := <- c1:
fmt.Println("Message 1", msg1)
case msg2 := <- c2:
fmt.Println("Message 2", msg2)
case <- time.After(time.Second):
fmt.Println("timeout")
}```
time.After는 매개변수로 주어진 시간이 지나면, 채널을 통해서 Time 메시지를 전송한다.

### Buffered Channels

make의 두 번째 매개변수로 채널의 크기를 설정할 수 있다.
`c := make(chan int, 1)`
여기에서는 채널의 버퍼의 크기를 1로 했다. 일반적으로 채널은 동기적(synchronous)으로 작동한다. 즉 메시지를 받는 쪽은 메시지가 준비될 때까지 기다리므로서, 서로의 메시지 시간을 맞춘다. 반면 버퍼드 채널은 비동기적(asynchronous)하게 작동한다. 메시지를 보내고 받는 쪽은 채널이 메시지로 가득차기를 기다리지 않는다.

## Go Concurrency Patterns 들

Channel을 이용한 동시성 구현에 대해서 간단히 알아봤다. 한 발 더 나아가서 자주 사용되는 동시성 패턴들을 예제를 통해서 살펴보려한다.

### Pipeline

파이프라인은 동시성을 제어하는 프로그램에서 사용하는 여러 패턴 중 하나다. 보통은 pipelineprocessing design pattern이라고 부른다. Go언어 고유의 패턴은 아닌 셈이다. 파이프라인 패턴은 channel로 연결된 여러 개의 stage의 모음으로 구성이된다. 데이터를 받은 stage는 이 데이터를 처리한 다음, 그 결과를 channel을 통해서 다음 stage로 전달하는 방식으로 작동한다. 1. 가장 위에 있는 stage(stage 1)은 채널을 이용해서 데이터를 읽는다.
2. 이 데이터를 처리해서 결과 값을 만든다.
3. 이 결과 값을 아래에 있는(downstream) stage(stage 2)의 채널에 데이터를 쓴다.

#### 예제 : 제곱

파이프라인 패턴을 이용한 제곱연산 프로그램을 만들어보자.

첫번째 stage에는 gen 함수가 있다. 이 함수는 연산에 사용할 데이터(int형 숫자)를 만드는 일을 한다. gen 함수는 두번째 stage의 함수에게 데이터를 넘겨주기 위한 채널을 리턴한다.
```func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}```

두번째 stage에는 sq 함수가 자리잡고 있다. sq는 입력 채널에서 데이터를 읽은 다음, 제곱연산한 결과를 출력 체널로 보낸다. 물론 입력체널은 gen 함수에서 생성한 채널이다.
```func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}```

마지막 stage에는 main 함수가 있다. main 함수는 sq의 채널로 부터 메시지를 읽어서 화면에 출력하는 일을 한다. 아래는 3개의 stage를 포함한 완성된 프로그램이다.
```package main

import (
"fmt"
)

func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}

func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func main() {
c := gen(2, 3)
out := sq(c)

fmt.Println(<-out)
fmt.Println(<-out)
}```

main 함수를 깔끔하게 정리했다.
```func main() {
for n := range sq(gen(2, 3)) {
fmt.Println(n)
}
}```
sq의 sq 적용
```func main() {
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n)
}
}```

#### Fan-out, fan-in

하나의 채널에서 다수의(multiple) 함수가 (채널이 닫힐 때까지)메시지를 읽어서 처리하는 방식을 fan-out방식이라고 한다. 이 방식은 다수의 워커(worker)에서 Job을 분산처리하기 위한 목적으로 사용한다. 하나의 채널을 이용해서 다수의 고루틴으로 부터 데이터를 읽어서 처리하는 방식을 fan-in이라고 한다.

위의 예제 프로그램을 두 개의 sq함수가 하나의 채널을 이용해서 처리하는 fan-out방식으로 바꿔보자.
```import "sync"
......
......
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

// 입력 채널로 부터 데이터를 읽는 고루틴을 만든다.
// 코드의 마지막에 wg.Done()을 추가해서, 작업을 완료했음을 알린다.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}

// 채널의 갯수 2를 등록한다.
// 채널 갯수 만큼 output 고루틴을 실행한다.
for _, c := range cs {
go output(c)
}

// wg.Wait()를 이용해서 등록된 고루틴이 모두 끝나기를 기다린다.
// wg.Add로 2를 등록했으니, 카운트가 0이 되면 종료한다.
// wg.Add로 + 카운트 한걸, wg.Done()으로 - 카운트한다.
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
in := gen(2, 3)
c1 := sq(in)
c2 := sq(in)

for n := range merge(c1, c2) {
fmt.Println(n)
}
}```
채널은 반드시 고루틴들이 모두 끝난 다음에 닫아야 한다. 아직 고루틴이 실행 중인데 채널이 닫혀버리면, 연산실패가 발생할 거다. 이 문제는 고루틴들이 모두 끝난 걸 기다린 다음, 채널을 닫는 것으로 해결할 수 있다. sync.WaitGroup를 이용해서 고루틴들을 동기화 할 수 있다. sync.WaitGroup은 "1. 동기화 해야 하는 채널을 관리 그룹에 등록한다. 2. 채널이 끝날 때까지 기다린다."방식으로 작동한다.

#### Stopping short

파이프라인 함수의 패턴을 정리해 보자.
• 메시지를 보내는 작업이 모두 끝나면 출력 채널응 닫고 stage가 종료 된다.
• 입력 채널로 부터 데이터를 읽는 stage는 채널이 닫힐 때까지 작업을 수행한다.
이 방식은 입력과 출력이 모두 제대로 처리된다고 가정하면 아주 깔끔하게 작동할 것이다.

하지만 실제 파이프라인에서는 각 스테이지의 이상 작동이나 혹은 다른 필요로, 다음 스테이지로 값을 넘기지 않는(혹은 못하는) 경우가 생길 수 있다. 이 경우 도착하지 않는 값을 무한히 기다리는 함수가 생길 수 있는데, 메모리 누수로 이어지기 때문에 정리를 해줘야 한다. 고루틴에 대해서는 가비지콜렉터가 작동하지 않는다

아래의 코드를 보자.
```out := merge(c1, c2)
fmt.Println(<-out)
return
// 두번째 값은 받지 않았음.
// 따라서 첫번째 고루틴은 hung 상태가 됨.```

채널마다 고루틴이 만들어지는게 문제이니, 새로운 고루틴이 만들어지지 않도록 코드를 만들면 된다. Buffered channel을 이용하자.
```func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}```

merge한수는 채널의 크기를 1로 잡아도 된다.
```func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1)
// ......```
작동은 하겠지만 프로그램내에 "blocked groutine"이 생성되므로 좋은 코드라고 할 수 없다.

### Explicit cancellation

상위 stage의 고루틴을 "명시적(explicit)"으로 취소하는게 가장 확실한 방법이다.
```package main

import "fmt"
import "sync"

func sq(done <-chan int, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
func gen(done <-chan int, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-done:
return
}
}
//close(out)
}()
return out
}

func merge(done <-chan int, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
fmt.Println("cancel")
return
}
}
}

for _, c := range cs {
go output(c)
}

go func() {
wg.Wait()
close(out)
}()
return out
}

func main() {
done := make(chan int)
defer close(done)

in := gen(done, 2, 3)

c1 := sq(done, in)
c2 := sq(done, in)

out := merge(done, c1, c2)
fmt.Println(<-out)
}```

## 예제 : MD5 해쉬

여러개의 파일에서 MD5 해쉬 값을 추출하는 프로그램을 만들어보자.

### 단순한 예제

MD5는 메시지 축약 알고리즘으로 파일의 checksum 데이터등을 만들 때 사용한다. 리눅스에서는 md5sum 명령을 이용해서 파일에 대한 md5 메시지를 만들 수 있다.
```# md5sum *.go
c0fcdeb08426acc1644c165bd40b5aa7  ping.go
8a5567eaf2b1061017f9b2d4213f05ea  rsa.go
md5sum과 비슷한 일을 하는 프로그램을 만들려고 한다. 이 프로그램의 이름은 serial.go로 파일이름 대신 디렉토리 이름을 받아서, 디렉토리에 있는 모든 파일들에 대해서 md5 알고리즘을 적용한다. 적용한 결과는 정렬(sort)해서 출력한다.
```# go run serial.go .
8a5567eaf2b1061017f9b2d4213f05ea  rsa.go
c0fcdeb08426acc1644c165bd40b5aa7  ping.go

아래는 완전한 코드
```package main

import (
"crypto/md5"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
)

func main() {
// 디렉토리 밑에 있는 모든 파일들에 대해서 MD5 연산을 한다.
m, err := MD5All(os.Args)
if err != nil {
fmt.Println(err)
return
}

var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
}
}

// 디렉토리에 있는 파일들을 모두 읽어서 MD5 연산을 하고
// 그 결과를 map 자료형으로 반환한다.
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
if err != nil {
return err
}
m[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return m, nil
}```
테스트

### 복잡한 예제 : Parallel digestion

parallel.go는 MD5All을 두 개의 stage로 나누고 파이프라인으로 연결했다. 첫번째 state는 sumFile로 디렉토리에 있는 각각의 파일마다 고루틴을 만들어서 MD5 연산을 하고, 그 결과를 result struct 데이터 타입으로 채널로 전송한다.
```type result struct {
path string
sum [md5.size]byte
err error
}```

sumFile은 두 개의 채널을 반환한다. 첫번째는 result 이고 다른 하나는 에러를 반환하기 위해서 사용한다. 설명은 주석으로 대신한다.
```func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// 각각의 일반 파일들에 대해서 고루틴을 수행해서 MD5를 계산한 후 c 채널로 전송한다.
// 에러는 errc 채널로 전송한다.
c := make(chan result)
errc := make(chan error, 1)

// filepath.Walk를 이용해서 파일들을 MD5 연산을 한다.
// 여기에서 블럭되면 안되므로 고루틴으로 작성한다.
go func() {
var wg sync.WaitGroup
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
// MD5 연산을 한다.
// 고루틴을 이용 여러 파일에 대해서 동시에 MD5 연산을 한다.
go func() {
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
}
wg.Done()
}()
// Abort the walk if done is closed.
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// 고루틴들이 모두 끝나길 기다린다.
go func() {
wg.Wait()
close(c)
}()
errc <- err
}()
return c, errc
}```

MD5All은 c 채널로 부터 파일과 MD5 축약 값을 전송 받는다. 어떤 이유로 MD5All이 종료하면 defer를 이용해서, done을 닫는다.
```func MD5All(root string) (map[string][md5.Size]byte, error) {
// 채널 c와 errc로부터 메시지를 모두 받기전에 에러가 발생할 경우
// done 채널을 닫는다.
done := make(chan struct{})
defer close(done)

c, errc := sumFiles(done, root)

m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}```

### Bounded parallelism

parallel.go는 각 파일에 대해서 고루틴을 실행한다. 멀티 코어 CPU 자원을 모두 활용하기 때문에 빠르게 작동할 거다. 하지만 디렉토리에 많은 수의 파일이 있을 경우, 메모리를 지나치게 많이 소모하는 등 오히려 비효율적으로 작동할 수도 있다.

그래서 동시에 수행할 수 있는 고루틴의 갯수를 제한하기로 했다. 이 프로그램의 이름은 bounded.go로 고정된 숫자의 고루틴을 만든다. 이 프로그램은 1. walk the tree, 2. read and digest the files, 3. collect the digest 3개의 스테이지로 구성된다.
1. walkFiles : filepath.Walk 하면서, MD5 처리할 파일이름을 읽어서 채널로 전송한다.
2. digester : 채널로 부터 파일을 읽어서 MD5 축약한다.
3. MD5All : digester를 정해진 갯수만큼만 실행한다.
```package main

import (
"crypto/md5"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"sync"
)

type result struct {
path string
sum  [md5.Size]byte
err  error
}

// 디렉토리로 부터, 일반 파일을 읽어서 string channel로 보낸다.
// 에러는 에러 채널로 보낸다.
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() { // HL
// 고루틴이 끝난 후에는 채널을 닫는다.
defer close(paths) // HL
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path: // HL
case <-done: // HL
return errors.New("walk canceled")
}
return nil
})
}()
return paths, errc
}

// 채널로 부터 읽은 파일의 내용을 MD5 축약한다.
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths { // HLpaths
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}

func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)

paths, errc := walkFiles(done, root)

// Start a fixed number of goroutines to read and digest files.
c := make(chan result) // HLc
var wg sync.WaitGroup
const numDigesters = 20
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c) // HLc
wg.Done()
}()
}
go func() {
wg.Wait()
close(c) // HLc
}()
// End of pipeline. OMIT

m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// Check whether the Walk failed.
if err := <-errc; err != nil { // HLerrc
return nil, err
}
return m, nil
}

func main() {
// Calculate the MD5 sum of all files under the specified directory,
// then print the results sorted by path name.
m, err := MD5All(os.Args)
if err != nil {
fmt.Println(err)
return
}
var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x  %s\n", m[path], path)
}
}```