Recommanded Free YOUTUBE Lecture: <% selectedImage[1] %>

Thread Pool

멀티 쓰레드 프로그램의 성능을 높이기 위해서 미리 쓰레드를 준비해서, 쓰레드를 할당하는, 쓰레드 풀 방식이 널리 사용됩니다. 비교적 구현이 간단하고, 다양한 응용이 가능하기 때문입니다.

예전에 리얼 타임 시그널을 이용한 Thread Pool구현을 다루었는데, 이번에는 조건변수(:12)를 이용한 구현을 알아보도록 하겠습니다. 쓰레드 풀과 조건변수에 대한 일반적인 설명은 링크들을 참고하시기 바랍니다.

아이디어

저는 지금 다중 접속 네트워크 프로그램을 만들려고 하고 있습니다. 이 네트워크 프로그램은 웹 서버처럼 연결과 종료가 빈번한 프로그램입니다. 처음엔 클라이언트 연결 당 하나의 쓰레드를 생성하는 방식을 사용했습니다. 그런데, 초당 연결 클라이언트의 수가 늘어나자 성능이 눈에 띄게 떨어지기 시작했습니다.

아무래도 쓰레드 생성에 너무 많은 비용이 소모되는 것 같군요. 그래서 스레드 풀 방식으로 바꾸기로 했습니다.

지정된 수 만큼 미리 자식 쓰레드를 만들어 놓고, 클라이언트 연결이 들어오면 노는 자식 쓰레드를 깨워서 작업을 할당하는 방식입니다. 쓰레드 생성을 할 필요가 없으니, 성능 향상을 꾀할 수 있겠죠.!?

빨간색 쓰레드는 작업중인 쓰레드이고, 노란색 쓰레드는 노는 쓰레드 입니다. accept() 함수로 클라이언트 연결을 가져오면, 노는 쓰레드 중 하나를 깨웁니다. 깨울 때, 클라이언트 연결 소켓과 작업에 필요한 정보를 함께 넘겨야 하겠죠!

구현

이제 구현을 해볼 차례 군요. 저는 C++을 이용해서 구현할 계획입니다. 부모 쓰레드와 자식 쓰레드간 메서드와 속성을 교환하면 좀 더 쉽게 작업 제어가 될 것 같아서 말입니다.

조건변수로 기다리기

쓰레드 풀 구현에서 가장 중요한 요소 중 하나는 "작업 지시가 있을 때까지 봉쇄되도록"구현하는 것입니다. 여기에서는 조건변수를 이용하기로 했습니다. 자식 쓰레드는 pthread_cond_wait(:3)함수로 부모 쓰레드가 깨울 때까지 기다립니다. 부모 쓰레드가 깨우는 시점은 accept(:2)함수가 반환하는 시점으로, pthread_cond_signal(:3)함수를 이용해서 자식 스레드를 깨웁니다.

pthread_cond_wait 함수는 시그널이 있을 때까지, 봉쇄되므로 "작업지시가 있을 때까지 봉쇄돌 것"이라는 조건은 일단 만족을 시켰습니다.

또 하나 만족시켜야 할 조건으로 "봉쇄가 풀려서 작업중일 때는 이 자식 쓰레드를 호출하지 않도록 할 것"입니다. 이것도 간단하게 해결할 수 있습니다. 조건변수는 뮤텍스(:12)와 함께 사용되기 때문이죠. pthread_cond_wait 함수는 매개변수로 조건변수와 함께 뮤텍스를 사용합니다. _cond_wait함수를 호출하면 매개변수로 주어진 뮤텍스의 잠금을 자동으로 돌려줍니다. 그러다가 시그널을 받고 반환하면, 뮤텍스 잠금을 얻게 됩니다. 즉 작업영역을 임계영역으로 하는 거죠.

그러므로 부모 프로세스는 단지 자식 쓰레드의 뮤텍스 잠금을 조사하는 것만으로도 사용할 수 있는 뮤텍스인지 아닌지 확인할 수 있습니다. pthread_mutex_trylock(:3)함수로 바로 잠금을 얻고 진입이 가능하면, 놀고 있는 자식 쓰레드임을 의미하는게 되니까요.

조건변수 하나로 "작업 지시가 있을 때까지 봉쇄", "작업 중 진입금지"까지 한번에 끈나버리는 군요.

자료 구조

이 자료구조는 부모 쓰레드와 자식 쓰레드가 공유합니다. 뮤텍스와 조건변수, 소켓정보등이 들어갑니다. 자식 쓰레드마다 각각의 뮤텍스와 조건변수, 소켓정보를 가져야 하므로 자식 쓰레드 갯수만큼 만들어서 관리해야 합니다.

자료구조는 대략 다음과 같을 겁니다.
class ThreadInfo
{
	private:
		pthread_cont_t mcond; 
		pthread_mutex_t mmutex;
		int msocket;
	public:
		ThreadInfo(pthread_cond_t, pthread_mutex_t);
		int TryLock();
        int Job();
		int UnLock();
};
  1. mcond : 스레드가 사용할 조건변수
  2. mmutex : 스레드가 사용할 뮤텍스
  3. TryLock() : 조건변수와 함께 사용할 뮤텍스 잠금을 얻기위한 메서드
  4. Job() : 자식 쓰레드가 수행할 코드를 가진 메서드
  5. UnLock() : 자식 쓰레드가 작업을 마치고 잠금을 되돌려주기 위해서 사용한다.
이 자료구조는 자식 쓰레드 갯수 만큼 생성해야 하므로 배열로 관리해야 합니다. STL(:12) vector로 관리하기로 했습니다.
vector<ThreadInfo> ThreadList;

쓰레드 풀 프로세스

쓰레드 풀 프로세스는 다음과 같습니다. 네트워크 서버
  1. 쓰레드 풀을 만든다.
  2. 지정된 갯수 만큼의 쓰레드 풀을 만든다. 메인쓰레드와 자식쓰레드는 ThreadInfo 클래스로 조건변수와 뮤텍스를 공유한다. 자식 쓰레드는 조건변수와 뮤텍스를 초기화 하고, Job메서드를 실행한다. Job 메서드는 대략 다음과 같을 것이다.
    Job()
    {
    	GetLock(&mutex);                         // 먼저 잠금을 얻는다.
    	while(1)
    	{
        	// 조건 변수로 기다린다. 조건 변수를 기다리면서 mutex 잠금을 내놓으면,
            // 부모 쓰레드는 mutex 잠금을 얻을 수 있는 상태가 되는데,
            // 이는 선택한 자식 쓰레드가 작업을 할 수 있는 상태를 의미한다. 
            pthread_cond_wait(&cond, &mutex);       
            // 부모 쓰레드가 cont_signal을 전송하면, _cond_wait는 반환하고
            // 잠금을 얻는다.  
        
            /*
             * 작업을 한다.
             */
       
            // 작업을 마치면, while 문 처음으로 가서 pthread_cond_wait를 호출한다. 
        }
    }
  3. 작업을 하는 동안은 자식 쓰레드(:12)가 뮤텍스(:12) 잠금을 얻은 상태이므로 부모는 TryLock 메서드를 호출하는 것으로 이 쓰레드가 작업 중임을 알 수 있다.
  4. 작업을 마치면 다시 조건변수에서 기다린다.
  5. 반복...
이 방식은 자식 쓰레드가 잠금 권한을 가지고 실행하기 때문에, 동기화 문제에서 자유롭다는 장점을 가진다. 자식 쓰레드는 단지 pthread_cond_wait를 호출 할 때만 잠금을 내놓는다.

셈플 코드

테스트를 위한 코드. 잘된다. 설명은 주석으로
#include <pthread.h>
#include <iostream>
#include <vector>

#include <errno.h>

#define THREAD_POOL_SIZE 5

using namespace std;

class Thread
{
	private:
		pthread_mutex_t mmutex;
		pthread_cond_t mcond;
	public:
		int id;
		Thread();
		int GetLock();
		int TryLock();
		int UnLock();
		int Job();
		int Signal();
};

// 뮤텍스와 조건변수 초기화 
Thread::Thread()
{
	pthread_mutex_init(&mmutex, NULL);
	pthread_cond_init(&mcond, NULL);
}

// 뮤텍스 자금 얻기
int Thread::GetLock()
{
	return pthread_mutex_lock(&mmutex);
}

// 모텍스 잠금 얻기 시도
int Thread::TryLock()
{
	return pthread_mutex_trylock(&mmutex);
}

// 조건변수에 시그널을 전송한다.
// 시그널을 전송한 후에는 뮤텍스 잠금을 되돌려준다.
int Thread::Signal()
{
	pthread_cond_signal(&mcond);
	UnLock();
}

// 뮤텍스 잠금 되돌려준다.
int Thread::UnLock()
{
	pthread_mutex_unlock(&mmutex);
}

// 자식 쓰레드에서 실행할 작업 메서드
int Thread::Job()
{
	GetLock();
	while(1)
	{
		pthread_cond_wait(&mcond, &mmutex);
		// 임계 영역 -----------------------
        // 작업영역으로 자식쓰레드가 이 영역에 머물러 있을 때에는
        // 부모 쓰레드가 TryLock로 잠금을 얻지 못한다.
		cout << "Job Start " << id << endl;
		sleep(3);
		cout << "Job End " << id << endl;
        // ---------------------------------
	}
}

// 쓰레드 함수
void *thread_func(void *arg)
{
	Thread *lThread = (Thread *)arg;
	lThread->Job();
}

int main(int argc, char **argv)
{
	int i = 0;
	pthread_t p_thread;
	vector<Thread *> ThreadList;
	Thread *lThread;

	// 쓰레드 풀을 만든다.
	for(i = 0; i < THREAD_POOL_SIZE; i++)
	{
		lThread = new Thread();
		lThread->id = i+1;
		ThreadList.push_back(lThread);
		pthread_create(&p_thread, NULL, thread_func, (void *)lThread);
		usleep(100);
	}

	int mstat;
	while(1)
	{
		// 작업 가능한 쓰레드를 찾아서
		// 조건변수 시그널을 전송한다.
		for (i = 0; i < THREAD_POOL_SIZE; i++)
		{
			if((mstat = ThreadList[i]->TryLock()) == 0)
			{
				ThreadList[i]->Signal();
				break;
			}
		}
		sleep(1);
	}
}

무.. 문제.

위 프로그램은 잘 돌아갈 것 같지만 몇 가지 문제점이 있다.
  1. pthread mutex의 잠금은 잠금을 얻은 스레드에서 잠금을 되돌려줘야한다. 잠금을 다른 스레드에서 해제할 경우의 상황은 정의되어 있지 않다. 어떤 일이 발생할지 모른다.
    • 일단 잠금을 얻은 스레드에서 잠금을 해제 하도록 수정해야 한다.
  2. 조건변수를 제대로 사용하기 위해서는, 반드시 wait 함수로 먼저 기다려야 한다. 그러지 않을 경우 신호를 읽어 버린다. 그러므로 반드시 wait 함수가 먼저 호출 되도록, 즉 자식 스레드가 반드시 먼저 뮤텍스를 얻도록 장치를 마련해야 한다. flag 값을 검사하는 정도로 될 것 같다.
다음은 이러한 문제를 보강한 코드다. 테스트를 위한 코드도 추가했다.
#include <pthread.h>
#include <iostream>
#include <vector>
#include <stdio.h>
#include <string.h>

#include <errno.h>

#define THREAD_POOL_SIZE 5

using namespace std;

class Thread
{
	private:
		pthread_mutex_t mmutex;
		pthread_cond_t mcond;
		int value;
	public:
		int id;
		int LockFlag;
		Thread();
		int GetLock();
		int TryLock();
		int UnLock();
		int Job();
		int Signal();
		void SetValue(int a)
		{
			value = a;
		}
};

// 뮤텍스와 조건변수 초기화 
Thread::Thread()
{
	LockFlag = 1;
	value = 0;
	pthread_mutex_init(&mmutex, NULL);
	pthread_cond_init(&mcond, NULL);
}


// 모텍스 잠금 얻기 시도
int Thread::TryLock()
{
	int rtv;
	if(LockFlag != 1)
	{
		return -1;
	}
   	LockFlag = 0;
	rtv = pthread_mutex_lock(&mmutex);
	return 0;
}

// 조건변수에 시그널을 전송한다.
// 시그널을 전송한 후에는 뮤텍스 잠금을 되돌려준다.
int Thread::Signal()
{
	pthread_cond_signal(&mcond);
	UnLock();
}

// 자식 쓰레드에서 실행할 작업 메서드
int Thread::Job()
{
	char fname[30];
	FILE *fp;
	sprintf(fname, "Walk_%d.txt", id);
	fp = fopen(fname,"w");
	while(1)
	{
		pthread_mutex_lock(&mmutex);

		LockFlag = 1;		
		pthread_cond_wait(&mcond, &mmutex);
		LockFlag = 0;
		
		cout << "Walk " << id << endl;
		fputs("Job\n", fp);
		usleep(1000);
		pthread_mutex_unlock(&mmutex);
	}
}

// 뮤텍스 잠금 되돌려준다.
int Thread::UnLock()
{
	pthread_mutex_unlock(&mmutex);
}


// 쓰레드 함수
void *thread_func(void *arg)
{
	Thread *lThread = (Thread *)arg;
	lThread->Job();
}

int main(int argc, char **argv)
{
	int i = 0;
	int mstat;
	pthread_t p_thread;
	vector<Thread *> ThreadList;
	Thread *lThread;
	pthread_mutex_t mutex_lock;
	pthread_mutexattr_t attr;
	int kind;



	// 쓰레드 풀을 만든다.
	for(i = 0; i < THREAD_POOL_SIZE; i++)
	{
		lThread = new Thread();
		lThread->id = i+1;
		ThreadList.push_back(lThread);
		pthread_create(&p_thread, NULL, thread_func, (void *)lThread);
		usleep(100);
	}

	FILE *fp;
	fp = fopen("Job.txt","w");
	int j = 0;
	while(1)
	{
		// 작업 가능한 쓰레드를 찾아서
		// 조건변수 시그널을 전송한다.
		for (i = 0; i < THREAD_POOL_SIZE; i++)
		{
			if((mstat = ThreadList[i]->TryLock()) == 0)
			{
				cout << "Job Signal " << i + 1<< endl;
				fputs("Job Signal\n", fp);
				ThreadList[i]->SetValue(i+1);
				ThreadList[i]->Signal();
				break;
			}
		}
		j ++;
		if(j == 10000) break;
		usleep(10);
	}
}