- 2 Programming a Guessing Game
- 12 An I/O Project: Building a Command Line Program
- 20 Final Project: Building a Multithreaded Web Server
- 20-1 Building a Single-Threaded Web Server
- 20-2 Turning Our Single-Threaded Server into a Multithreaded Server
- 20-2-1 Simulating a Slow Request in the Current Server Implementation
- 20-2-2 Improving Throughput with a Thread Pool
- 20-2-2-1 Code Structure If We Could Spawn a Thread for Each Request
- 20-2-2-2 Creating a Similar Interface for a Finite Number of Threads
- 20-2-2-3 Building the `ThreadPool` Struct Using Compiler Driven Development
- 20-2-2-4 Validating the Number of Threads in `new`
- 20-2-2-5 Creating Space to Store the Threads
- 20-2-2-6 A `Worker` Struct Responsible for Sending Code from the `ThreadPool` to a Thread
- 20-2-2-7 Sending Requests to Threads via Channels
- 20-2-2-8 Implementing the `execute` Method
- 20-3 Graceful Shutdown and Cleanup
2 Programming a Guessing Game
```rust
let foo = 5; // immutable
```

```console
$ cargo doc --open   # build and open documentation for your dependencies
```
```rust
let mut guess = String::new();
```

This line creates a mutable variable that is currently bound to a new, empty instance of a `String`.
```rust
io::stdin()
    .read_line(&mut guess)
    .expect("Failed to read line");
```

`&mut guess` is a mutable reference, similar in spirit to a C pointer. The `read_line` method returns a `Result` type. `Result` types are enumerations, often referred to as enums, used to handle errors much like a C enum of error codes. `expect` crashes the program with the given message if the `Result` value is an `Err`.
Remember that a crate is a collection of Rust source code files. The project we’ve been building is a binary crate, which is an executable. The `rand` crate is a library crate, which contains code intended to be used in other programs.
A `match` expression is made up of arms. An arm consists of a pattern and the code that should be run if the value given at the beginning of the `match` expression fits that arm’s pattern.
Rust allows us to shadow the previous value of `guess` with a new one. This feature is often used in situations in which you want to convert a value from one type to another type.
We bind `guess` to the expression `guess.trim().parse()`. The `guess` in the expression refers to the original `guess` that was a `String` with the input in it. The `trim` method on a `String` instance will eliminate any whitespace at the beginning and end. Although `u32` can contain only numerical characters, the user must press enter to satisfy `read_line`. When the user presses enter, a newline character is added to the string. For example, if the user types 5 and presses enter, `guess` looks like this: `5\n`. The `\n` represents “newline,” the result of pressing enter. The `trim` method eliminates `\n`, resulting in just `5`.
The colon (`:`) after `guess` tells Rust we’ll annotate the variable’s type.
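Putting the pieces above together, a small runnable sketch (the input string is simulated here rather than read from stdin):

```rust
fn main() {
    // Simulate what read_line leaves in guess: digits plus a newline.
    let guess = String::from("5\n");

    // Shadow the String with a u32: trim strips the "\n", and the
    // `: u32` annotation tells parse which numeric type to produce.
    let guess: u32 = guess.trim().parse().expect("Please type a number!");

    println!("You guessed: {}", guess); // prints "You guessed: 5"
}
```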
Switching from an `expect` call to a `match` expression is how you generally move from crashing on an error to handling the error. Remember that `parse` returns a `Result` type and `Result` is an enum that has the variants `Ok` or `Err`. We’re using a `match` expression here, as we did with the `Ordering` result of the `cmp` method.
- That `Ok` value will match the first arm’s pattern, and the `match` expression will just return the `num` value that `parse` produced and put inside the `Ok` value.
- If `parse` is not able to turn the string into a number, it will return an `Err` value that contains more information about the error. The `Err` value does not match the `Ok(num)` pattern in the first `match` arm, but it does match the `Err(_)` pattern in the second arm.
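A sketch of the two arms in action (the book’s game loop uses `continue` in the `Err` arm; a fallback value keeps this example self-contained):

```rust
fn main() {
    let input = String::from("abc\n");

    let guess: u32 = match input.trim().parse() {
        // Ok(num) matches a successful parse and yields the number.
        Ok(num) => num,
        // Err(_) matches any parse error; _ ignores the error details.
        Err(_) => {
            println!("Please type a number!");
            0 // fallback for this sketch; the book's loop continues instead
        }
    };

    println!("guess = {}", guess);
}
```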
12 An I/O Project: Building a Command Line Program
```rust
fn main() {
    let args: Vec<String> = std::env::args().collect();
    println!("{:?}", args);
}
```
- `std::env::args` returns an iterator that produces `String` values
- `std::env::args_os` returns an iterator that produces `OsString` values
```rust
// Extracting the Argument Parser
// ... (full listing in Coding-Collections/Rust/minigrep)
```
It’s okay to use `clone` to copy a few strings to continue making progress.
```rust
// Grouping Configuration Values
// ... (full listing in Coding-Collections/Rust/minigrep)
```
Since the purpose of the `parse_config` function is to create a `Config` instance, we can change `parse_config` from a plain function to a function named `new` that is associated with the `Config` struct.
```rust
// Creating a Constructor for Config
// ... (full listing in Coding-Collections/Rust/minigrep)
```

```rust
// Improving the Error Message
// ... (full listing in Coding-Collections/Rust/minigrep)
```
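A sketch of what the constructor might look like, based on the notes above (the field names follow the book’s chapter 12; the exact error text is an assumption):

```rust
struct Config {
    query: String,
    filename: String,
}

impl Config {
    // Returning Result lets main print a friendly error instead of panicking.
    fn new(args: &[String]) -> Result<Config, &'static str> {
        if args.len() < 3 {
            return Err("not enough arguments");
        }

        // clone copies the strings; fine for a few small values.
        let query = args[1].clone();
        let filename = args[2].clone();

        Ok(Config { query, filename })
    }
}

fn main() {
    let args = vec![
        String::from("minigrep"),
        String::from("needle"),
        String::from("haystack.txt"),
    ];
    let config = Config::new(&args).unwrap();
    println!("searching for {} in {}", config.query, config.filename);
}
```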
```rust
// Extracting Logic from main
// ... (full listing in Coding-Collections/Rust/minigrep)
```
- Test-Driven Development:
  1. Write a test that fails and run it to make sure it fails for the reason you expect.
  2. Write or modify just enough code to make the new test pass.
  3. Refactor the code you just added or changed and make sure the tests continue to pass.
  4. Repeat from step 1!
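Applied to minigrep, the loop above might start with a failing test for a `search` function like this sketch (the passing implementation is shown; the signature follows the book’s chapter 12):

```rust
pub fn search<'a>(query: &str, contents: &'a str) -> Vec<&'a str> {
    // Keep each line that contains the query string.
    contents
        .lines()
        .filter(|line| line.contains(query))
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn one_result() {
        let query = "duct";
        let contents = "\
Rust:
safe, fast, productive.
Pick three.";
        assert_eq!(vec!["safe, fast, productive."], search(query, contents));
    }
}
```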
The whole code can be found here: Coding-Collections/Rust/minigrep.
20 Final Project: Building a Multithreaded Web Server
20-1 Building a Single-Threaded Web Server
Because Rust is a systems programming language, we can choose the level of abstraction we want to work with and can go to a lower level than is possible or practical in other languages.
20-1-1 Listening to the TCP Connection
```rust
use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        println!("Connection established!");
    }
}
```
The `bind` function in this scenario works like the `new` function in that it will return a new `TcpListener` instance. The `bind` function returns a `Result`, which indicates that binding might fail.
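Because `bind` returns a `Result`, the failure case is part of the type. A small sketch (port 0 asks the OS for any free port, so this bind normally succeeds):

```rust
use std::net::TcpListener;

fn main() {
    // Binding can fail (e.g. insufficient permissions on ports below
    // 1024, or the port already in use), so bind returns a Result.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    println!("listening on {}", listener.local_addr().unwrap());
}
```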
The `incoming` method on `TcpListener` returns an iterator that gives us a sequence of streams (more specifically, streams of type `TcpStream`). A single stream represents an open connection between the client and the server. A connection is the name for the full request and response process in which a client connects to the server, the server generates a response, and the server closes the connection. As such, `TcpStream` will read from itself to see what the client sent and then allow us to write our response to the stream. Overall, this `for` loop will process each connection in turn and produce a series of streams for us to handle.
`unwrap` terminates our program if the stream has any errors. The reason we might receive errors from the `incoming` method when a client connects to the server is that we’re not actually iterating over connections. Instead, we’re iterating over connection attempts. The connection might not be successful for a number of reasons, many of them operating system specific.
20-1-2 Reading the Request
```rust
use std::io::prelude::*;
// ... in handle_connection: read the stream into a buffer and print it
// with String::from_utf8_lossy
```
In the `handle_connection` function, we’ve made the `stream` parameter mutable. The reason is that the `TcpStream` instance keeps track of what data it returns to us internally. It might read more data than we asked for and save that data for the next time we ask for data.
`stream.read` will read bytes from the `TcpStream` and put them in the buffer. The `String::from_utf8_lossy` function takes a `&[u8]` and produces a `String` from it. The “lossy” part of the name indicates the behavior of this function when it sees an invalid UTF-8 sequence: it will replace the invalid sequence with `�`, the `U+FFFD REPLACEMENT CHARACTER`.
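The lossy replacement is easy to see in isolation:

```rust
fn main() {
    // 0xFF is never valid in UTF-8, so it becomes U+FFFD (�).
    let bytes = [b'H', b'e', 0xFF, b'l', b'l', b'o'];
    let text = String::from_utf8_lossy(&bytes);
    assert_eq!(text, "He\u{FFFD}llo");
    println!("{}", text);
}
```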
20-1-3 Writing a Response
```rust
fn handle_connection(mut stream: TcpStream) {
    // ... write "HTTP/1.1 200 OK\r\n\r\n" to the stream, then flush
}
```
The `write` method on `stream` takes a `&[u8]` and sends those bytes directly down the connection. Because the `write` operation could fail, we use `unwrap` on any error result as before. Again, in a real application you would add error handling here.
`flush` will wait and prevent the program from continuing until all the bytes are written to the connection; `TcpStream` contains an internal buffer to minimize calls to the underlying operating system.
20-1-4 Returning Real HTML
```rust
use std::fs;
// ... read hello.html with fs::read_to_string and append it to the response
```
20-1-5 Validating the Request and Selectively Responding
```rust
fn handle_connection(mut stream: TcpStream) {
    // ... if the request line is a GET for /, serve hello.html;
    // otherwise respond with a 404 status and an error page
}
```
20-1-6 A Touch of Refactoring
```rust
fn handle_connection(mut stream: TcpStream) {
    // ... refactored: one if/else chooses (status_line, filename),
    // then a single code path builds and writes the response
}
```
The code can be found here.
20-2 Turning Our Single-Threaded Server into a Multithreaded Server
20-2-1 Simulating a Slow Request in the Current Server Implementation
There are multiple ways we could change how our web server works to avoid having more requests back up behind a slow request; the one we’ll implement is a thread pool.
```rust
use std::thread;
use std::time::Duration;
// ... for a request to /sleep, call thread::sleep(Duration::from_secs(5))
// before responding
```
20-2-2 Improving Throughput with a Thread Pool
A thread pool is a group of spawned threads that are waiting and ready to handle a task. As requests come in, they’ll be sent to the pool for processing. The pool will maintain a queue of incoming requests. Each of the threads in the pool will pop off a request from this queue, handle the request, and then ask the queue for another request.
When you’re trying to design code, writing the client interface first can help guide your design. Write the API of the code so it’s structured in the way you want to call it; then implement the functionality within that structure rather than implementing the functionality and then designing the public API.
We’ll use compiler-driven development here. We’ll write the code that calls the functions we want, and then we’ll look at errors from the compiler to determine what we should change next to get the code to work.
20-2-2-1 Code Structure If We Could Spawn a Thread for Each Request
```rust
fn main() {
    // ... for each incoming stream, thread::spawn a new thread that
    // calls handle_connection(stream)
}
```
`thread::spawn` will create a new thread and then run the code in the closure in the new thread.
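In isolation, `thread::spawn` hands the closure to a new thread and returns a `JoinHandle` we can wait on:

```rust
use std::thread;

fn main() {
    // The closure runs on the new thread; join waits for it to finish
    // and returns the closure's result.
    let handle = thread::spawn(|| 2 + 2);
    let result = handle.join().unwrap();
    assert_eq!(result, 4);
}
```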
20-2-2-2 Creating a Similar Interface for a Finite Number of Threads
```rust
// src/bin/main.rs
// ... let pool = ThreadPool::new(4);
// ... pool.execute(|| handle_connection(stream));
```
We need to implement `pool.execute` so it takes the closure and gives it to a thread in the pool to run. This code won’t yet compile, but we’ll try so the compiler can guide us in how to fix it.
20-2-2-3 Building the `ThreadPool` Struct Using Compiler Driven Development
Run `cargo check` after each step and let the compiler errors tell us what to build next:

```rust
// src/lib.rs
pub struct ThreadPool;
// ... add new and execute as the compiler asks for them
```
20-2-2-4 Validating the Number of Threads in `new`
```rust
// src/lib.rs
// ... in ThreadPool::new: assert!(size > 0); panics if size is zero
```
or

```rust
pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {
    // ... return Err(PoolCreationError) instead of panicking
}
```
20-2-2-5 Creating Space to Store the Threads
```rust
// src/lib.rs
// ... ThreadPool holds threads: Vec<thread::JoinHandle<()>>,
// created with Vec::with_capacity(size)
```
We’ve brought `std::thread` into scope in the library crate, because we’re using `thread::JoinHandle` as the type of the items in the vector in `ThreadPool`.
Once a valid size is received, our `ThreadPool` creates a new vector that can hold `size` items. `with_capacity` performs the same task as `Vec::new` but with an important difference: it preallocates space in the vector. Because we know we need to store `size` elements in the vector, doing this allocation up front is slightly more efficient than using `Vec::new`, which resizes itself as elements are inserted.
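The difference is observable through `capacity`:

```rust
fn main() {
    // with_capacity preallocates; the vector is still empty.
    let v: Vec<u32> = Vec::with_capacity(4);
    assert_eq!(v.len(), 0);
    assert!(v.capacity() >= 4);

    // Vec::new allocates nothing until elements are pushed.
    let w: Vec<u32> = Vec::new();
    assert_eq!(w.capacity(), 0);
}
```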
20-2-2-6 A `Worker` Struct Responsible for Sending Code from the `ThreadPool` to a Thread
The standard library provides `thread::spawn` as a way to create threads, and `thread::spawn` expects to get some code the thread should run as soon as the thread is created. However, in our case, we want to create the threads and have them wait for code that we’ll send later.
Instead of storing a vector of `JoinHandle<()>` instances in the thread pool, we’ll store instances of the `Worker` struct. Each `Worker` will store a single `JoinHandle<()>` instance. Then we’ll implement a method on `Worker` that will take a closure of code to run and send it to the already running thread for execution. We’ll also give each worker an `id` so we can distinguish between the different workers in the pool when logging or debugging.
When we create a `ThreadPool`, we’ll implement the code that sends the closure to the thread after we have `Worker` set up in this way:

- Define a `Worker` struct that holds an `id` and a `JoinHandle<()>`.
- Change `ThreadPool` to hold a vector of `Worker` instances.
- Define a `Worker::new` function that takes an `id` number and returns a `Worker` instance that holds the `id` and a thread spawned with an empty closure.
- In `ThreadPool::new`, use the `for` loop counter to generate an `id`, create a new `Worker` with that `id`, and store the worker in the vector.
```rust
// src/lib.rs
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// ... ThreadPool::new pushes Worker::new(id) for each id in 0..size
```
20-2-2-7 Sending Requests to Threads via Channels
Currently, we get the closure we want to execute in the `execute` method. But we need to give `thread::spawn` a closure to run when we create each `Worker` during the creation of the `ThreadPool`.
We want the `Worker` structs that we just created to fetch code to run from a queue held in the `ThreadPool` and send that code to their threads to run.
We’ll use a channel to function as the queue of jobs, and `execute` will send a job from the `ThreadPool` to the `Worker` instances, which will pass the job on to their threads. Here is the plan:
- The `ThreadPool` will create a channel and hold on to the sending side of the channel.
- Each `Worker` will hold on to the receiving side of the channel.
- We’ll create a new `Job` struct that will hold the closures we want to send down the channel.
- The `execute` method will send the job it wants to execute down the sending side of the channel.
- In its thread, the `Worker` will loop over its receiving side of the channel and execute the closures of any jobs it receives.
```rust
// src/lib.rs
// ... in ThreadPool::new: let (sender, receiver) = mpsc::channel();
// the pool stores sender; a Job struct will hold the closure
```
In `ThreadPool::new`, we create our new channel and have the pool hold the sending end.
```rust
impl ThreadPool {
    // ... pass the receiver into each Worker::new(id, receiver)
    // (this won't compile: receiver is moved in the first iteration)
}
```
We pass the receiving end of the channel into `Worker::new`, and then we use it inside the closure.
The code is trying to pass `receiver` to multiple `Worker` instances. This won’t work, as you’ll recall from Chapter 16: the channel implementation that Rust provides is multiple producer, single consumer. This means we can’t just clone the consuming end of the channel to fix this code. Even if we could, that is not the technique we would want to use; instead, we want to distribute the jobs across threads by sharing the single `receiver` among all the workers.
Additionally, taking a job off the channel queue involves mutating the `receiver`, so the threads need a safe way to share and modify `receiver`; otherwise, we might get race conditions (as covered in Chapter 16).
To share ownership across multiple threads and allow the threads to mutate the value, we need to use `Arc<Mutex<T>>`. The `Arc` type will let multiple workers own the receiver, and `Mutex` will ensure that only one worker gets a job from the receiver at a time.
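A self-contained sketch of the same pattern outside the server: two threads share one receiver through `Arc<Mutex<T>>` and split the messages between them:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();
    // Arc lets both threads own the receiver; Mutex lets only one
    // thread take a message at a time.
    let receiver = Arc::new(Mutex::new(receiver));

    let mut handles = Vec::new();
    for _ in 0..2 {
        let receiver = Arc::clone(&receiver);
        handles.push(thread::spawn(move || {
            let mut sum = 0;
            loop {
                // The temporary guard is dropped at the end of this
                // statement, so the lock is not held while we process n.
                let msg = receiver.lock().unwrap().recv();
                match msg {
                    Ok(n) => sum += n,
                    Err(_) => break, // channel closed: sender was dropped
                }
            }
            sum
        }));
    }

    for n in 1..=10 {
        sender.send(n).unwrap();
    }
    drop(sender); // close the channel so both threads exit their loops

    let total: i32 = handles.into_iter().map(|h| h.join().unwrap()).sum();
    assert_eq!(total, 55);
}
```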
```rust
// src/lib.rs
// ... let receiver = Arc::new(Mutex::new(receiver));
// ... Worker::new(id, Arc::clone(&receiver))
```
We put the receiving end of the channel in an `Arc` and a `Mutex`. For each new worker, we clone the `Arc` to bump the reference count so the workers can share ownership of the receiving end.
20-2-2-8 Implementing the `execute` Method
We’ll change `Job` from a struct to a type alias for a trait object that holds the type of closure that `execute` receives.
```rust
// src/lib.rs
// ... type Job = Box<dyn FnOnce() + Send + 'static>;
// ... in execute: let job = Box::new(f); self.sender.send(job).unwrap();
```
After creating a new `Job` instance using the closure we get in `execute`, we send that job down the sending end of the channel. We’re calling `unwrap` on `send` for the case that sending fails. The reason we use `unwrap` is that we know the failure case won’t happen, but the compiler doesn’t know that.
Now we need the closure to loop forever, asking the receiving end of the channel for a job and running the job when it gets one.
```rust
impl Worker {
    // ... spawn a thread that loops: lock the receiver, recv a Job,
    // and run it
}
```
Here, we first call `lock` on the `receiver` to acquire the mutex, and then we call `unwrap` to panic on any errors. Acquiring a lock might fail if the mutex is in a poisoned state, which can happen if some other thread panicked while holding the lock rather than releasing the lock. In this situation, calling `unwrap` to have this thread panic is the correct action to take.
If we get the lock on the mutex, we call `recv` to receive a `Job` from the channel. A final `unwrap` moves past any errors here as well, which might occur if the thread holding the sending side of the channel has shut down, similar to how the `send` method returns `Err` if the receiving side shuts down.
The call to `recv` blocks, so if there is no job yet, the current thread will wait until a job becomes available. The `Mutex` ensures that only one `Worker` thread at a time is trying to request a job.
```text
error[E0161]: cannot move a value of type std::ops::FnOnce() +
std::marker::Send: the size of std::ops::FnOnce() + std::marker::Send
cannot be statically determined
```
This error is fairly cryptic because the problem is fairly cryptic. To call a `FnOnce` closure that is stored in a `Box` (which is what our `Job` type alias is), the closure needs to move itself out of the `Box` because the closure takes ownership of `self` when we call it. In general, Rust doesn’t allow us to move a value out of a `Box` because Rust doesn’t know how big the value inside the `Box` will be: recall in Chapter 15 that we used `Box` precisely because we had something of an unknown size that we wanted to store in a `Box` to get a value of a known size.
We can write methods that use the syntax `self: Box<Self>`, which allows the method to take ownership of a `Self` value stored in a `Box`. That’s exactly what we want to do here, but unfortunately Rust won’t let us: the part of Rust that implements behavior when a closure is called isn’t implemented using `self: Box<Self>`. So Rust doesn’t yet understand that it could use `self: Box<Self>` in this situation to take ownership of the closure and move the closure out of the `Box`.
We can tell Rust explicitly that in this case we can take ownership of the value inside the `Box` using `self: Box<Self>`; then, once we have ownership of the closure, we can call it. This involves defining a new trait `FnBox` with the method `call_box` that will use `self: Box<Self>` in its signature, defining `FnBox` for any type that implements `FnOnce()`, changing our type alias to use the new trait, and changing `Worker` to use the `call_box` method.
```rust
trait FnBox {
    fn call_box(self: Box<Self>);
}
// ... impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } }
// ... type Job = Box<dyn FnBox + Send + 'static>;
```
First, we create a new trait named `FnBox`. This trait has the one method `call_box`, which is similar to the `call` methods on the other `Fn*` traits except that it takes `self: Box<Self>` to take ownership of `self` and move the value out of the `Box`.
Next, we implement the `FnBox` trait for any type `F` that implements the `FnOnce()` trait. Effectively, this means that any `FnOnce()` closures can use our `call_box` method. The implementation of `call_box` uses `(*self)()` to move the closure out of the `Box` and call the closure.
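A self-contained sketch of the trick (hedged: on current Rust a `Box<dyn FnOnce()>` is callable directly and this workaround is no longer needed, but the trait still compiles):

```rust
trait FnBox {
    fn call_box(self: Box<Self>);
}

// Any FnOnce() closure gets call_box for free.
impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        // Move the closure out of the Box, then call it.
        (*self)()
    }
}

fn main() {
    use std::sync::atomic::{AtomicBool, Ordering};
    static RAN: AtomicBool = AtomicBool::new(false);

    let job: Box<dyn FnBox + Send> = Box::new(|| RAN.store(true, Ordering::SeqCst));
    job.call_box();

    assert!(RAN.load(Ordering::SeqCst));
}
```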
We now need our `Job` type alias to be a `Box` of anything that implements our new trait `FnBox`. This will allow us to use `call_box` in `Worker` when we get a `Job` value instead of invoking the closure directly. Implementing the `FnBox` trait for any `FnOnce()` closure means we don’t have to change anything about the actual values we’re sending down the channel. Now Rust is able to recognize that what we want to do is fine.
Note: if you open /sleep in multiple browser windows simultaneously, they might load one at a time at 5-second intervals. Some web browsers execute multiple instances of the same request sequentially for caching reasons. This limitation is not caused by our web server.
```rust
impl Worker {
    // ... while let Ok(job) = receiver.lock().unwrap().recv() {
    //         job.call_box();
    //     }
}
```
This code compiles and runs but doesn’t result in the desired threading behavior: a slow request will still cause other requests to wait to be processed.
The reason is somewhat subtle: the `Mutex` struct has no public `unlock` method because the ownership of the lock is based on the lifetime of the `MutexGuard<T>` within the `LockResult<MutexGuard<T>>` that the `lock` method returns. At compile time, the borrow checker can then enforce the rule that a resource guarded by a `Mutex` cannot be accessed unless we hold the lock. But this implementation can also result in the lock being held longer than intended if we don’t think carefully about the lifetime of the `MutexGuard<T>`. Because the values in the `while` expression remain in scope for the duration of the block, the lock remains held for the duration of the call to `job.call_box()`, meaning other workers cannot receive jobs.
By using `loop` instead and acquiring the lock and a job within the block rather than outside it, the `MutexGuard` returned from the `lock` method is dropped as soon as the `let job` statement ends. This ensures that the lock is held during the call to `recv`, but it is released before the call to `job.call_box()`, allowing multiple requests to be serviced concurrently.
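The guard-lifetime difference can be observed directly with `try_lock`, which fails if the lock is currently held:

```rust
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    // The temporary MutexGuard is dropped when this statement ends...
    let value = *m.lock().unwrap();

    // ...so the lock is already free here.
    assert!(m.try_lock().is_ok());
    assert_eq!(value, 5);

    // In contrast, a named guard holds the lock until it goes out of scope.
    let guard = m.lock().unwrap();
    assert!(m.try_lock().is_err());
    drop(guard);
}
```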
20-3 Graceful Shutdown and Cleanup
Now we’ll implement the `Drop` trait to call `join` on each of the threads in the pool so they can finish the requests they’re working on before closing. Then we’ll implement a way to tell the threads they should stop accepting new requests and shut down.
20-3-1 Implementing the `Drop` Trait on `ThreadPool`
When the pool is dropped, our threads should all join to make sure they finish their work.
```rust
// src/lib.rs
// ... impl Drop for ThreadPool: for worker in &mut self.workers,
// call worker.thread.join() -- this does not compile yet
```
The error tells us we can’t call `join` because we only have a mutable borrow of each `worker` and `join` takes ownership of its argument. To solve this issue, we need to move the thread out of the `Worker` instance that owns `thread` so `join` can consume the thread.
A `Worker` that is running will have a `Some` variant in `thread`, and when we want to clean up a `Worker`, we’ll replace `Some` with `None` so the `Worker` doesn’t have a thread to run.
```rust
struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}
```
We need to wrap the `thread` value in `Some` when we create a new `Worker`. We mentioned earlier that we intended to call `take` on the `Option` value to move `thread` out of `worker`.
```rust
impl Worker {
    // ... build the worker with `thread: Some(thread)`, and in Drop
    // use worker.thread.take() before joining
}
```
The `take` method on `Option` takes the `Some` variant out and leaves `None` in its place. We’re using `if let` to destructure the `Some` and get the thread; then we call `join` on the thread. If a worker’s thread is already `None`, we know that worker has already had its thread cleaned up, so nothing happens in that case.
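`Option::take` in isolation:

```rust
fn main() {
    let mut thread: Option<String> = Some(String::from("handle"));

    // take moves the value out and leaves None behind, which is how
    // drop can move a JoinHandle out of a &mut Worker.
    let taken = thread.take();

    assert_eq!(taken.as_deref(), Some("handle"));
    assert_eq!(thread, None);

    // A second take finds None and does nothing.
    assert_eq!(thread.take(), None);
}
```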
20-3-2 Signaling to the Threads to Stop Listening for Jobs
Now, this code doesn’t function the way we want it to yet. The key is the logic in the closures run by the threads of the `Worker` instances: at the moment, we call `join`, but that won’t shut down the threads because they `loop` forever looking for jobs. If we try to drop our `ThreadPool` with our current implementation of `drop`, the main thread will block forever waiting for the first thread to finish.
To fix this problem, we’ll modify the threads so they listen for either a `Job` to run or a signal that they should stop listening and exit the infinite loop.
```rust
enum Message {
    NewJob(Job),
    Terminate,
}
```
To incorporate the `Message` enum, we need to change `Job` to `Message` in two places: the definition of `ThreadPool` and the signature of `Worker::new`.
The `execute` method of `ThreadPool` needs to send jobs wrapped in the `Message::NewJob` variant. Then, in `Worker::new` where a `Message` is received from the channel, the job will be processed if the `NewJob` variant is received, and the thread will break out of the loop if the `Terminate` variant is received.
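A self-contained sketch of that message handling (single worker, no mutex; on modern Rust a boxed `FnOnce` closure can be called directly, so `call_box` is omitted here):

```rust
use std::sync::mpsc;
use std::thread;

enum Message {
    NewJob(Box<dyn FnOnce() + Send>),
    Terminate,
}

fn main() {
    let (sender, receiver) = mpsc::channel();

    let handle = thread::spawn(move || loop {
        match receiver.recv().unwrap() {
            Message::NewJob(job) => job(), // run the job, keep looping
            Message::Terminate => break,   // exit so join can finish
        }
    });

    sender
        .send(Message::NewJob(Box::new(|| println!("job ran"))))
        .unwrap();
    sender.send(Message::Terminate).unwrap();

    handle.join().unwrap();
}
```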
Then we send a `Terminate` message for each worker before joining their threads:
```rust
impl Drop for ThreadPool {
    // ... first loop: send Message::Terminate once per worker;
    // second loop: take and join each worker's thread
}
```
We’re now iterating over the workers twice: once to send one `Terminate` message for each worker and once to call `join` on each worker’s thread. If we tried to send a message and `join` immediately in the same loop, we couldn’t guarantee that the worker in the current iteration would be the one to get the message from the channel.
To prevent deadlock, we first put all of our `Terminate` messages on the channel in one loop; then we join on all the threads in another loop. Each worker will stop receiving requests on the channel once it gets a terminate message. So, we can be sure that if we send the same number of terminate messages as there are workers, each worker will receive a terminate message before `join` is called on its thread.
```rust
fn main() {
    // ... listener.incoming().take(2) accepts only two connections,
    // then the ThreadPool is dropped at the end of main
}
```
The `take` method is defined in the `Iterator` trait and limits the iteration to the first two items at most. The `ThreadPool` will go out of scope at the end of `main`, and the `drop` implementation will run.
```console
$ cargo run
# ... workers 0 and 3 pick up the first two requests; after "Shutting
# down." each worker prints that it was told to terminate and is joined
```
We can see how this code works from the messages: workers 0 and 3 got the first two requests, and then on the third request, the server stopped accepting connections. When the ThreadPool
goes out of scope at the end of main
, its Drop
implementation kicks in, and the pool tells all workers to terminate. The workers each print a message when they see the terminate message, and then the thread pool calls join
to shut down each worker thread.
Notice one interesting aspect of this particular execution: the ThreadPool
sent the terminate messages down the channel, and before any worker received the messages, we tried to join worker 0. Worker 0 had not yet received the terminate message, so the main thread blocked waiting for worker 0 to finish. In the meantime, each of the workers received the termination messages. When worker 0 finished, the main thread waited for the rest of the workers to finish. At that point, they had all received the termination message and were able to shut down.
The code can be found here. Future work:
- Add more documentation to `ThreadPool` and its public methods.
- Add tests of the library’s functionality.
- Change calls to `unwrap` to more robust error handling.
- Use `ThreadPool` to perform some task other than serving web requests.
- Find a thread pool crate on crates.io and implement a similar web server using the crate instead. Then compare its API and robustness to the thread pool we implemented.