10 minute read

HTTP 요청을 처리할 스레드 풀 라이브러리를 구현한다.

싱글 스레드 기반 웹서버 만들기

TCP를 이용한 바이트통신과 HTTP를 이용한 요청과 응답 주고받기

use std::fs::File;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let mut file = File::open(filename).unwrap();
    let mut contents = String::new();
    file.read_to_string(&mut contents).unwrap();

    let response = format!("{}{}", status_line, contents);

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

TCP 연결 처리

표준 라이브러리 std::net 모듈을 사용한다.

TcpListener::bind로 TCP 연결을 수신한다.

TcpListenerincoming 메서드로 커넥션 스트림을 받는다.

요청 데이터 읽기

handle_connection 함수로 스트림을 인자로 받아 요청을 처리한다.

TcpStream은 내부에서 반환되는 데이터를 추적하므로 stream 매개변수를 가변으로 받는다.

std::io::prelude에서 스트림 읽고 쓰는 특성을 가져온다.

버퍼를 생성하고 stream.read에 전달해서 요청 바이트를 읽는다.

응답 작성하기

HTTP 응답은 다음과 같은 양식을 갖는다.

HTTP-Version Status-Code Reason-Phrase CRLF
headers CRLF
message-body

responseas_bytes를 호출해서 문자열 데이터를 바이트 배열(&[u8])로 변환한다.

이 바이트 배열을 streamwrite 메서드에 넣고 flush 메서드로 버퍼를 비워서 커넥션으로 전송한다.

요청을 확인하고 선택적으로 응답하기

b"GET / HTTP/1.1\r\n";/ 경로로 요청이 왔는지 확인한다.

버퍼에서 원시 바이트를 읽어들였으므로 비교도 바이트로 한다.

싱글 스레드 서버를 멀티 스레드 서버로 바꾸기

싱글 스레드 기반 서버는 한 요청이 끝나야 다음 요청을 처리할 수 있어서 많은 요청이 들어오면 처리가 늦어진다.

스레드 풀로 처리량 높이기

대기나 준비 상태에 있는 스레드들을 모아둔 게 스레드 풀이다.

요청은 큐로 저장되고, 스레드 하나당 큐에서 꺼낸 하나의 요청을 처리를 한다.

처리가 완료되면 스레드는 풀로 다시 돌아와 대기 상태로 돌아간다.

다음 코드는 모든 요청에 스레드 하나씩 생성한다.

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

스레드가 무한히 생성될 수 있으므로, 시스템 부하를 막을 방법이 필요하다.

스레드 수를 제어하기 위한 인터페이스

thread::spawn 대신 ThreadPool이라는 인터페이스를 만들려고 한다.

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

컴파일러 주도 개발로 ThreadPool 라이브러리 만들기

cargo check 명령에서 나오는 컴파일러 에러로 개발을 진행할 수 있다.

cargo checkThreadPool 모듈이 없다고 에러가 난다.

src/lib.rs 에 다음 코드를 작성한다.

pub struct ThreadPool;

main.rs 파일은 src/bin/main.rs로 옮기고 다음 가져오기 코드를 추가한다.

extern crate hello;
use hello::ThreadPool;

cargo checknew 함수가 없다고 에러가 난다.

new 함수를 정의하고, 스레드 개수는 항상 양수이므로 인자로 usize 타입을 받는다.

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

cargo checkexecute 함수가 없다고 에러가 난다.

표준 라이브러리의 thread::spawn 구현체를 참고한다.

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static

요청 처리는 한번만 하므로 FnOnce를 클로저의 트레잇으로 사용됐다.

스레드 간 클로저가 전달되므로 Send 트레잇을, 스레드의 제거 시점을 알 수 없으므로 'static 라이프사이클을 사용됐다.

execute는 다음과 같이 정의한다.

pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {

    }

Worker 구조체 사용

thread:::spawn는 클로저를 받아 생성되자마자 실행된다.

스레드 실행 타이밍을 조절하는 Worker 구조체를 만든다.

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

Worker 구조체는 다른 워커와 구별하기 위한 id와 빈 클로저를 가진 스레드를 담는 thread 필드로 구성되어 있다.

ThreadPool 구조체는 Worker 벡터를 가진다.

pub struct ThreadPool {
    workers: Vec<Worker>,
}

채널을 통해 스레드에 요청 보내기

  1. TheadPool은 채널을 생성하고 채널 송신자를 가진다.
  2. Worker는 채널의 수신자를 가진다.
  3. execute 메서드 호출 시 채널의 송신자로 작업을 전송한다.
  4. 스레드 안에서 Worker에서 받은 작업을 실행한다.

ThreadPool::new에서 새 채널을 만든다.

let (sender, receiver) = mpsc::channel();

senderThreadPool 구조체 필드 내에 저장한다.

receiverArc<Mutex<T>>로 감싸서 각 Worker 구조체에 보내고 여러 스레드가 공유할 수 있게 한다.

Arc 덕분에 여러 스레드가 하나의 수신자를 공유할 수 있고,

Mutex 덕분에 한번에 하나의 Workerreceiver에서 작업을 가져가도록 한다.

workers.push(Worker::new(id, Arc::clone(&receiver)));

execute 메소드 구현

trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

type Job = Box<FnBox + Send + 'static>;

pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }

클로저를 Box<T>에 담아서 보낸 뒤, 박스를 벗기고 호출해야 한다.

하지만, 러스트는 Box<T> 안의 값의 크기를 알 수 없으므로 박스 안의 값을 옮기는 것을 허용하지 않는다.

Box<T> 에 저장된 Self 값의 소유권을 다룰 수 있도록 self: Box<Self> 를 사용하는 FnBox 트레잇을 정의했다.

call_box 메서드는 (*self)()를 사용하여 클로저를Box<T> 밖으로 빼내고 호출한다.

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}

우아한 종료와 정리

스레드가 종료되기 전에 처리하던 요청을 마저 처리하도록 ThreadPoolDrop 트레잇을 구현한다.

또한, 스레드에 새로운 요청을 그만 받고 종료하라는 메시지를 받게 한다.

ThreadPoolDrop 트레잇 구현하기

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}


impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

Worker를 빌리기만 해서 thread의 소유권을 빼낼 수가 없기 때문에

Workerthread::JoinHandle<()>Option으로 감싼다.

Optiontake 메서드로 Some 내부 값은 빼내고, 옵션을 None 값으로 바꿀 수 있다.

스레드가 작업 리스닝을 중지하도록 신호하기

enum Message {
    NewJob(Job),
    Terminate,
}

Worker가 종료 신호도 받을 수 있도록 Job 대신 Message를 사용한다.

Worker::new의 일부분을 다음과 같이 수정한다.

let thread = thread::spawn(move ||{
    loop {
        let message = receiver.lock().unwrap().recv().unwrap();

        match message {
            Message::NewJob(job) => {
                println!("Worker {} got a job; executing.", id);

                job.call_box();
            },
            Message::Terminate => {
                println!("Worker {} was told to terminate.", id);

                break;
            },
        }
    }
});

execute 메서드로 Message를 보내면

WorkerMessage를 받아서 NewJob인 경우 작업을 실행하고,

Terminate인 경우 작업 신호에 대기하고 있던 루프에서 벗어나서 스레드를 종료한다.

Drop 트레잇 구현은 다음과 같이 수정한다.

impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        for _ in &mut self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

워커들을 두 번 순회해서 한번은 Terminate 메시지를 보내고, 한번은 스레드에 join을 호출한다.

메시지 전송과 join 호출을 한번에 하면, 일부 스레드가 일하느라 메시지를 못 받는 경우, 다른 스레드가 이 메시지를 채간다.

메시지를 못받은 스레드는 영원히 종료되지 않고 교착상태에 빠진다.

Categories:

Updated: