관리 메뉴

Value Creator의 IT(프로그래밍 / 전자제품)

Chapter 18 멀티 쓰레드 기반의 서버 구현 본문

1. 프로그래밍/4) Network

Chapter 18 멀티 쓰레드 기반의 서버 구현

valuecreatort 2019. 11. 11. 18:53
반응형

쓰레드란?

1. 경량화된 프로세스(컨텍스트 스위칭이 빠르다.)

2. 쓰레드끼리 메모리 공유가 가능하다.

3. 프로세스 내에서 프로그램 흐름을 추가한다?

 

 

프로세스는 각자의 독립된 메모리 영역을 가진다.

쓰레드는 프로세스에 속한 것으로서 쓰레드 끼리는 공유하는 메모리 영역이 있다.

 

 

운영체제 > 프로세스 > 쓰레드

thread1.c

#include <stdio.h>
#include <pthread.h>
void* thread_main(void *arg); //쓰레드에서 사용할 함수 

int main(int argc, char *argv[]) 
{
	pthread_t t_id;
	int thread_param=5; //
	
	if(pthread_create(&t_id, NULL, thread_main, (void*)&thread_param)!=0) //쓰레드 생성 
	{
		puts("pthread_create() error");
		return -1;
	}; 	
	sleep(10); // 프로세스가 종료되면 쓰레드도 종료되기 때문에 sleep을 걸어놓았다. 
    puts("end of main");
	return 0; 
}

void* thread_main(void *arg) //쓰레드 생성되면 곧바로 실행됨 
{
	int i;
	int cnt=*((int*)arg); //5회 실행됨 
	for(i=0; i<cnt; i++)
	{
		sleep(1);  puts("running thread");	 
	}
	return NULL;
}

 

실행 방법이 좀 다르다.

gcc thread1.c -o tr1 -lpthread

 

-lpthread는 쓰레드 라이브러리 링크를 별도로 지시한 것이다. 그래야  헤더파일 pthread.h에 선언된 함수를 호출할 수 있다.

 

 

 

 

 

 

 

다음은 pthread_join 함수를 통해서 쓰레드가 종료할 때까지 프로세스를 대기시킬 수 있다는 내용이다.

thread2.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
void* thread_main(void *arg);

int main(int argc, char *argv[]) 
{
	pthread_t t_id;
	int thread_param=5;
	void * thr_ret;
	
	if(pthread_create(&t_id, NULL, thread_main, (void*)&thread_param)!=0) //쓰레드 생성 
	{
		puts("pthread_create() error");
		return -1;
	}; 	

	if(pthread_join(t_id, &thr_ret)!=0) //쓰레드 종료를 대기한다.
	{
		puts("pthread_join() error");
		return -1;
	};

	printf("Thread return message: %s \n", (char*)thr_ret);
	free(thr_ret); //쓰레드 종료시 반환 메세지의 메모리 영역 해제?
	return 0;
}

void* thread_main(void *arg) 
{
	int i;
	int cnt=*((int*)arg);
	char * msg=(char *)malloc(sizeof(char)*50);
	strcpy(msg, "Hello, I'am thread~ \n");

	for(i=0; i<cnt; i++)
	{
		sleep(1);  puts("running thread");	 
	}
	return (void*)msg;
}

 

실행방법

gcc thread2.c -o tr2 -lpthread

 

 

 

 

 

워커 쓰레드(Worker thread) 모델

메인 함수가 쓰레드를 일꾼(worker) 처럼 관리.

thread3.c

#include <stdio.h>
#include <pthread.h>
void * thread_summation(void * arg); //sum 하는 함수 

int sum=0;

int main(int argc, char *argv[])
{
	pthread_t id_t1, id_t2;
	int range1[]={1, 5};
	int range2[]={6, 10};
	
	pthread_create(&id_t1, NULL, thread_summation, (void *)range1); //1번 쓰레드 생성 
	pthread_create(&id_t2, NULL, thread_summation, (void *)range2); //2번 쓰레드 생성 
	//쓰레드가 생성되면서 thread_summation함수가 실행된다. 
    //1번 쓰레드와 2번 쓰레드가 동시에 실행될 것 같다.
	pthread_join(id_t1, NULL); //1번 쓰레드 종료될 때까지 프로세스 종료하지 않음
	pthread_join(id_t2, NULL); //2번 쓰레드 종료될 때까지 프로세스 종료하지 않음
	printf("result: %d \n", sum); //쓰레드1과 2가 전역변수 sum에 접근하기 때문에 문제의 소지가 있다.
	return 0;
}

void * thread_summation(void * arg) 
{
	int start=((int*)arg)[0];
	int end=((int*)arg)[1];

	while(start<=end)
	{
		sum+=start;
		start++;
	}
	return NULL;
}

실행 방법이 좀 다르다.

gcc thread3.c -D_REENTRANT -o tr3 -lpthread

 

-D_REENTRANT를 통해 쓰레드에 안전하지 않은 함수를 쓰레드에 안전한 함수로 자동으로 바꿔준다.

 

 

 

 

 

임계영역?(Critical Section)

하나의 변수에 쓰레드가 순차적으로 접근하면 문제가 없지만, 동시적인 순간에 접근할 경우 문제가 발생할 수 있다.

임계영역은 둘 이상의 쓰레드가 동시에 실행되면 문제를 일으키는 영역이다.

 

임계영역과 관련해서 오류를 일으키는 예제이다.

thread4.c

#include <stdio.h>
//#include <windows.h>
#include <unistd.h>
#include <stdlib.h>
//#include <process.h>
#include <pthread.h>
#define NUM_THREAD	100

void * thread_inc(void * arg);
void * thread_des(void * arg);
long long num=0;

int main(int argc, char *argv[]) 
{
	//HANDLE thread_id[NUM_THREAD];
	pthread_t thread_id[NUM_THREAD]; //100개의 쓰레드 생성 
	int i;

	printf("sizeof long long: %d \n", sizeof(long long));
	for(i=0; i<NUM_THREAD; i++)
	{
		if(i%2)
			pthread_create(&(thread_id[i]), NULL, thread_inc, NULL); 
		else
			pthread_create(&(thread_id[i]), NULL, thread_des, NULL);	
	}	

	for(i=0; i<NUM_THREAD; i++)
		pthread_join(thread_id[i], NULL);

	printf("result: %lld \n", num); //1번 쓰레드와 2번 쓰레드가 동시에 실행되는 경우 때문에 원하는 값이 나오지 않는다.
	return 0;
}

void * thread_inc(void * arg) 
{
	int i;
	for(i=0; i<50000000; i++)
		num+=1;
	return NULL;
}
void * thread_des(void * arg)
{
	int i;
	for(i=0; i<50000000; i++)
		num-=1;
	return NULL;
}

-> 동기화를 통해서 동시 접근을 막을 수 있고, 접근 순서를 지정하는 것도 가능하다.

동기화 기법

1. 뮤텍스(Mutex) 기반

2. 세마포어(Semaphore) 기반

동기화는 운영체제가 제공하는 기능이기 때문에 운영체제에 따라 다르다.

 

 

mutex.c

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREAD	100

void * thread_inc(void * arg); //증가 함수
void * thread_des(void * arg); //감소 함수 

long long num=0;
pthread_mutex_t mutex; //뮤텍스 선언 

int main(int argc, char *argv[]) 
{
	pthread_t thread_id[NUM_THREAD]; //쓰레드 100개 생성 
	int i;
	
	pthread_mutex_init(&mutex, NULL); //뮤텍스 생성 

	for(i=0; i<NUM_THREAD; i++) //증가함수와 감소함수를 번갈아 가며 실행 
	{
		if(i%2)
			pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
		else
			pthread_create(&(thread_id[i]), NULL, thread_des, NULL);	
	}	

	for(i=0; i<NUM_THREAD; i++)
		pthread_join(thread_id[i], NULL); 

	printf("result: %lld \n", num);
	pthread_mutex_destroy(&mutex);
	return 0;
}

void * thread_inc(void * arg) 
{
	int i;
	pthread_mutex_lock(&mutex); //증가 함수가 실행되는 동안 다른 쓰레드가 실행되지 않도록 함?
	for(i=0; i<50000000; i++)
		num+=1;
	pthread_mutex_unlock(&mutex); //뮤텍스 해제. for문 앞뒤로 mutex로 감쌌다. lock, unlock이 1회씩만 실행됨
	return NULL;
}
void * thread_des(void * arg)
{
	int i;
	for(i=0; i<50000000; i++)
	{
		pthread_mutex_lock(&mutex); 
		num-=1;
		pthread_mutex_unlock(&mutex);
	} // Mutex lock, unlock이 for문 안에 들어가기 때문에 50000000회 실행된다.
	return NULL;
}

/*
swyoon@com:~/tcpip$ gcc mutex.c -D_REENTRANT -o mutex -lpthread
swyoon@com:~/tcpip$ ./mutex
result: 0 

*/

 

뮤텍스가 다른 쓰레드의 접근을 막고 블로킹 상태에서 빠져나가지 못하는 경우를 '데드락(Dead-lock)' 상태 라고 함.

데드락 상태에 빠지지 않도록 주의 해야함.

lock, unlock 횟수는 최소화 하는 것이 좋다. 실행 횟수가 많아지면 속도가 느려진다.

 

semaphore.c

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

void * read(void * arg);
void * accu(void * arg);
static sem_t sem_one; //세마포어 1의 주소값
static sem_t sem_two; //세마포어 2의 주소값 
static int num;

int main(int argc, char *argv[])
{
	pthread_t id_t1, id_t2; //쓰레드 1번, 2번 생성 
	sem_init(&sem_one, 0, 0); //세마포어 1번을 0으로 설정 -> 0이면 블로킹 상태
	sem_init(&sem_two, 0, 1); //세마포어 2번을 1로 설정 -> 1이면 블로킹 해제 상태

	pthread_create(&id_t1, NULL, read, NULL); //쓰레드 1번 실행
	pthread_create(&id_t2, NULL, accu, NULL); //쓰레드 2번 실행 

	pthread_join(id_t1, NULL); //쓰레드 1번 끝날 때까지 프로세스 대기 
	pthread_join(id_t2, NULL); //쓰레드 2번 끝날 때까지 프로세스 대기 

	sem_destroy(&sem_one); //세마포어 1번 종료 
	sem_destroy(&sem_two); //세마포어 2번 종료 
	return 0;
}

void * read(void * arg) //값을 읽어들이는 함수 
{
	int i;
	for(i=0; i<5; i++)
	{
		fputs("Input num: ", stdout); 

		sem_wait(&sem_two); //세마포어 2번의 값을 0으로 해서 블로킹. num을 쓰레드 2번의 임계영역에 넣는다. 
		scanf("%d", &num); //num값을 정상적으로 읽은 뒤에 세마포어 1번의 블로킹을 해제한다. 
		sem_post(&sem_one); //num값을 정상적으로 읽은 후에 세마포어 1번의 값을 1로 해서 블로킹 해제
	}
	return NULL;	
}
void * accu(void * arg)
{
	int sum=0, i;
	for(i=0; i<5; i++)
	{
		sem_wait(&sem_one); //세마포어 1번을 블로킹 해서 Num값이 정상적으로 넘어오는지 확인. num을 쓰레드 1번의 임계영역에 넣는다. 
		sum+=num;
		sem_post(&sem_two); //sum에 값이 정상적으로 담긴 후에 세마포어 2번의 블로킹 해제
	}
	printf("Result: %d \n", sum);
	return NULL;
}

 

-> 세마포어 값이 0과 1을 오가는 것을 바이너리 세마포어라고 한다.

두 개의 세마포어를 이용해서 접근 순서를 동기화하고 있다.

 

 

 

 

멀티스레드 기반 채팅 서버 클라이언트 구현

 

chat_serv.c

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <pthread.h>

#define BUF_SIZE 100
#define MAX_CLNT 256

void * handle_clnt(void * arg);
void send_msg(char * msg, int len);
void error_handling(char * msg);

int clnt_cnt=0;
int clnt_socks[MAX_CLNT];
pthread_mutex_t mutx;

int main(int argc, char *argv[])
{
	int serv_sock, clnt_sock;
	struct sockaddr_in serv_adr, clnt_adr;
	int clnt_adr_sz;
	pthread_t t_id;
	if(argc!=2) {
		printf("Usage : %s <port>\n", argv[0]);
		exit(1);
	}
  
	pthread_mutex_init(&mutx, NULL);
	serv_sock=socket(PF_INET, SOCK_STREAM, 0);

	memset(&serv_adr, 0, sizeof(serv_adr));
	serv_adr.sin_family=AF_INET; 
	serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);
	serv_adr.sin_port=htons(atoi(argv[1]));
	
	if(bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr))==-1)
		error_handling("bind() error");
	if(listen(serv_sock, 5)==-1)
		error_handling("listen() error");
	
	while(1)
	{
		clnt_adr_sz=sizeof(clnt_adr);
		clnt_sock=accept(serv_sock, (struct sockaddr*)&clnt_adr,&clnt_adr_sz);
		
		pthread_mutex_lock(&mutx);
		clnt_socks[clnt_cnt++]=clnt_sock;
		pthread_mutex_unlock(&mutx);
	
		pthread_create(&t_id, NULL, handle_clnt, (void*)&clnt_sock);
		pthread_detach(t_id);
		printf("Connected client IP: %s \n", inet_ntoa(clnt_adr.sin_addr));
	}
	close(serv_sock);
	return 0;
}
	
void * handle_clnt(void * arg)
{
	int clnt_sock=*((int*)arg);
	int str_len=0, i;
	char msg[BUF_SIZE];
	
	while((str_len=read(clnt_sock, msg, sizeof(msg)))!=0)
		send_msg(msg, str_len);
	
	pthread_mutex_lock(&mutx);
	for(i=0; i<clnt_cnt; i++)   // remove disconnected client
	{
		if(clnt_sock==clnt_socks[i])
		{
			while(i++<clnt_cnt-1)
				clnt_socks[i]=clnt_socks[i+1];
			break;
		}
	}
	clnt_cnt--;
	pthread_mutex_unlock(&mutx);
	close(clnt_sock);
	return NULL;
}
void send_msg(char * msg, int len)   // send to all
{
	int i;
	pthread_mutex_lock(&mutx);
	for(i=0; i<clnt_cnt; i++)
		write(clnt_socks[i], msg, len);
	pthread_mutex_unlock(&mutx);
}
void error_handling(char * msg)
{
	fputs(msg, stderr);
	fputc('\n', stderr);
	exit(1);
}

 

 

chat_clnt.c

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h> 
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>
	
#define BUF_SIZE 100
#define NAME_SIZE 20
	
void * send_msg(void * arg);
void * recv_msg(void * arg);
void error_handling(char * msg);
	
char name[NAME_SIZE]="[DEFAULT]";
char msg[BUF_SIZE];
	
int main(int argc, char *argv[])
{
	int sock;
	struct sockaddr_in serv_addr;
	pthread_t snd_thread, rcv_thread;
	void * thread_return;
	if(argc!=4) {
		printf("Usage : %s <IP> <port> <name>\n", argv[0]);
		exit(1);
	 }
	
	sprintf(name, "[%s]", argv[3]);
	sock=socket(PF_INET, SOCK_STREAM, 0);
	
	memset(&serv_addr, 0, sizeof(serv_addr));
	serv_addr.sin_family=AF_INET;
	serv_addr.sin_addr.s_addr=inet_addr(argv[1]);
	serv_addr.sin_port=htons(atoi(argv[2]));
	  
	if(connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr))==-1)
		error_handling("connect() error");
	
	pthread_create(&snd_thread, NULL, send_msg, (void*)&sock);
	pthread_create(&rcv_thread, NULL, recv_msg, (void*)&sock);
	pthread_join(snd_thread, &thread_return);
	pthread_join(rcv_thread, &thread_return);
	close(sock);  
	return 0;
}
	
void * send_msg(void * arg)   // send thread main
{
	int sock=*((int*)arg);
	char name_msg[NAME_SIZE+BUF_SIZE];
	while(1) 
	{
		fgets(msg, BUF_SIZE, stdin);
		if(!strcmp(msg,"q\n")||!strcmp(msg,"Q\n")) 
		{
			close(sock);
			exit(0);
		}
		sprintf(name_msg,"%s %s", name, msg);
		write(sock, name_msg, strlen(name_msg));
	}
	return NULL;
}
	
void * recv_msg(void * arg)   // read thread main
{
	int sock=*((int*)arg);
	char name_msg[NAME_SIZE+BUF_SIZE];
	int str_len;
	while(1)
	{
		str_len=read(sock, name_msg, NAME_SIZE+BUF_SIZE-1);
		if(str_len==-1) 
			return (void*)-1;
		name_msg[str_len]=0;
		fputs(name_msg, stdout);
	}
	return NULL;
}
	
void error_handling(char *msg)
{
	fputs(msg, stderr);
	fputc('\n', stderr);
	exit(1);
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

반응형
Comments