Software Development

Concurrency in Elixir

Erlang has been around for over 30 years and was built well before multi-core CPUs existed. Yet it’s a language that couldn’t be more relevant today! The underlying architecture of the language lends itself perfectly to the modern CPUs that are on every computer and mobile device.

The computer I’m writing this article on has a 2.2 GHz Intel Core i7 CPU, but more importantly it comes with eight cores. Simply put, it can perform eight tasks at once.

The ability to take advantage of these cores exists in many languages but often feels out of place or fraught with traps and challenges. If you’ve ever had to worry about a mutex, shared mutable state, and code being thread safe, you know that there are at least several pitfalls to be wary of.

In Erlang, and therefor Elixir which leverages the Erlang VM (BEAM), it makes writing and reasoning about concurrent code feel effortless. While Ruby has some great libraries for helping write concurrent code, with Elixir it’s built-in and a first-class citizen.

That isn’t to say that writing highly concurrent or distributed systems is easy. Far from it! But with Elixir, the language is on your side.

Processes, PIDs, and Mailboxes

Before we look at how to go about writing concurrent code in Elixir, it’s a good idea to understand the terms that we’ll be using and the model of concurrency that Elixir employs.

Actor Model

Concurrency in Elixir (and Erlang) is based upon the Actor Model. Actors are single threaded processes which can send and receive messages amongst themselves. The Erlang VM manages their creation, execution, and their communication. Their memory is completely isolated, which makes having to worry about “shared state” a non-issue.

  • Process: Similar to an OS level thread, but much more lightweight. This is essentially the unit of concurrency in Elixir. The processes are managed by BEAM (the Erlang runtime), which handles spreading the work out over all the cores of the CPU or even across other BEAM nodes on the network. A system can have millions of these processes at a time, and you shouldn’t be afraid to take liberal advantage of them.
  • Process ID (PID): This is a reference to a specific process. Much like an IP address on the internet, a PID is how you tell Elixir which process you want to send a message to.
  • Mailbox: For processes to communicate with each other, messages are sent back and forth. When a message is sent to a process, it arrives to that process’ mailbox. It is up to that process to receive the messages sitting in its mailbox.

So to bring it all together, a process in Elixir is the actor. It can communicate with another actor by sending a message to a specific PID. The recipient can receive a message by checking its mailbox for new messages.

Writing Concurrent Code

In this section, we’ll look at how the Actor Model for concurrency is actually used within Elixir.

Creating processes

Creating a new process is done with the spawn or spawn_link functions. This function accepts an anonymous function which will be invoked in a separate process. In response, we are given a process identifier, often referred to as a PID. This is important if we want to communicate with this process going forward or ask the kernel for information about the process.

pid = spawn(fn -> :timer.sleep 15000 end)
#PID<0.89.0>

Everything in Elixir runs within a process. You can find out the PID of your current process by calling the self() function. So even when you are in the iex shell, by calling self() you can see the PID for that iex session, something like #PID<0.80.0>.

We can use this PID to ask Elixir for information about the process. This is done using the Process.info(pid) function.

Process.info(pid)

[current_function: {:timer, :sleep, 1}, initial_call: {:erlang, :apply, 2},
 status: :waiting, message_queue_len: 0, messages: [], links: [],
 dictionary: [], trap_exit: false, error_handler: :error_handler,
 priority: :normal, group_leader: #PID<0.50.0>, total_heap_size: 233,
 heap_size: 233, stack_size: 2, reductions: 43,
 garbage_collection: [max_heap_size: %{error_logger: true, kill: true, size: 0},
  min_bin_vheap_size: 46422, min_heap_size: 233, fullsweep_after: 65535,
  minor_gcs: 0], suspending: []]

It’s interesting what you can find here! For example, in iex if you ask for the info about itself Process.info(self()), you’ll see the history of the commands you’ve typed:

iex(1)> 5 + 5
iex(2)> IO.puts "Hello!"
iex(3)> pid = spawn(fn -> :timer.sleep 15000 end)
iex(4)> Process.info(self())[:dictionary][:iex_history]

%IEx.History.State{queue: {[
  {3, 'pid = spawn(fn -> :timer.sleep 15000 end)\n', #PID<0.84.0>},
  {2, 'IO.puts "Hello!"\n', :ok}],
  [{1, '5 + 5\n', 10}]},
 size: 3, start: 1}

Sending messages

Messages can be sent to a process using the send function. You provide it with the PID of the process you wish to send a message to along with the data being sent. The message is sent to the receiving processes’ mailbox.

Sending is only half the battle though. If the recipient isn’t prepared to receive the message, it will fall on deaf ears. A process can receive messages by using the receive construct, which pattern matches on the messages being received.

In the example below, we spawn a new process which waits to receive a message. Once it has received a message in its mailbox, we’ll simply output it to the screen.

pid = spawn(fn ->
  IO.puts "Waiting for messages"
  receive do
    msg -> IO.puts "Received #{inspect msg}"
  end
end)

send(pid, "Hello Process!")

Keeping our process alive

A process exits when it no longer has any code to execute. In the example above, the process will stay alive until it has received its first message, then exit. So the question then arises: How do we get a long running process?

We can do this by utilizing a loop function that calls itself recursively. This loop will simply receive a message and then call itself to wait for the next one.

defmodule MyLogger do
  def start do
    IO.puts "#{__MODULE__} at your service"
    loop()
  end

  def loop do
    receive do
      msg -> IO.puts msg
    end
    loop()
  end
end

# This time we will spawn a new processes based on the MyLogger module's method `start`.
pid = spawn(MyLogger, :start, [])

send(pid, "First message")
send(pid, "Another message")

Maintaining state

Our current process doesn’t track any state. It simply executes its code without maintaining any extra state or information.

What if we wanted our logger to keep track of some stats, like the number of messages it has logged? Notice the call spawn(MyLogger, :start, []); the last parameter, which is an empty list, is actually a list of args that can be passed to the process. This acts as the “initial state” or what is passed to the entry point function. Our state will simply be a number that tracks the number of messages we’ve logged.

Now, when the init function is called, it will be passed the number 0. It’s up to us to keep track of this number as we do our work, always passing the updated state to the next loop of our process.

Another thing we’ve done is added an additional action our logger can perform. It can now log messages and also print out the stats. To do this, we’ll send our messages as a tuple where the first value is an atom that represents the command we want our process to perform. Pattern matching in the receive construct allows us to differ one message’s intent from another.

defmodule MyLogger do
  def start_link do
    # __MODULE__ refers to the current module
    spawn(__MODULE__, :init, [0])
  end

  def init(count) do
    # Here we could initialize other values if we wanted to
    loop(count)
  end

  def loop(count) do
    new_count = receive do
      {:log, msg} ->
        IO.puts msg
        count + 1
      {:stats} ->
        IO.puts "I've logged #{count} messages"
        count
    end
    loop(new_count)
  end
end

pid = MyLogger.start_link
send(pid, {:log, "First message"})
send(pid, {:log, "Another message"})
send(pid, {:stats})

Refactoring into a client and server

We can refactor our module a little bit to make it more user friendly. Instead of directly using the send function, we can hide the details behind a client module. Its job will be to send messages to the process running the server module and optionally wait for a response for synchronous calls.

defmodule MyLogger.Client do
  def start_link do
    spawn(MyLogger.Server, :init, [0])
  end

  def log(pid, msg) do
    send(pid, {:log, msg})
  end

  def print_stats(pid) do
    send(pid, {:print_stats})
  end

  def return_stats(pid) do
    send(pid, {:return_stats, self()})
    receive do
      {:stats, count} -> count
    end
  end
end

Our server module is quite simple. It consists of an init function which doesn’t do much in this case other than start the loop function looping. The loop function is in charge of receiving messages from the mailbox, performing the requested task and then looping again with the updated state.

defmodule MyLogger.Server do
  def init(count \\ 0) do
    loop(count)
  end

  def loop(count) do
    new_count = receive do
      {:log, msg} ->
        IO.puts msg
        count + 1
      {:print_stats} ->
        IO.puts "I've logged #{count} messages"
        count
      {:return_stats, caller} ->
        send(caller, {:stats, count})
        count
    end
    loop(new_count)
  end
end

If we are to use the code below, we don’t really need to know how the server is implemented. We interact directly with the client, and it in turn sends messages to the server. I’ve aliased the module just to avoid typing MyLogger.Client various times.

alias MyLogger.Client, as: Logger

pid = Logger.start_link
Logger.log(pid, "First message")
Logger.log(pid, "Another message")
Logger.print_stats(pid)
stats = Logger.return_stats(pid)

Refactoring the server

Notice that all of the messages being received by the server are being pattern matched in order to determine how to handle them? We can do better than having a single large function by creating a series of “handler” functions that pattern match on the data being received.

Not only does this clean up our code, it also makes it much easier to test. We can simply call the individual handle_receive functions with the correct arguments to test that they are working correctly.

defmodule MyLogger.Server do
  def init(count \\ 0) do
    loop(count)
  end

  def loop(count) do
    new_count = receive do
      message -> handle_receive(message, count)
    end
    loop(new_count)
  end

  def handle_receive({:log, msg}, count) do
    IO.puts msg
    count + 1
  end

  def handle_receive({:print_stats}, count) do
    IO.puts "I've logged #{count} messages"
    count
  end

  def handle_receive({:return_stats, caller}, count) do
    send(caller, {:stats, count})
    count
  end

  def handle_receive(other, count) do
    IO.puts "Unhandled message of #{inspect other} received by logger"
    count
  end
end

Parallel map

For a final example, let’s take a look at performing a parallel map.

What we’ll be doing is mapping a list of URLs to their returned HTTP status code. If we were to do this without any concurrency, our speed would be the sum of the speed of checking each URL. If we had five and each took oe second, it would take approximately five seconds to finish checking all the URLs. If we could check them in parallel though, the amount of time would be about one second, the time of the slowest URL since they are happening all at once.

Our test implementation looks like this:

defmodule StatusesTest do
  use ExUnit.Case

  test "parallel status map" do
    urls = [
      url1 = "http://www.fakeresponse.com/api/?sleep=2",
      url2 = "http://www.fakeresponse.com/api/?sleep=1",
      url3 = "http://www.fakeresponse.com/api/?status=500",
      url4 = "https://www.leighhalliday.com",
      url5 = "https://www.reddit.com"
    ]
    assert Statuses.map(urls) == [
      {url1, 200},
      {url2, 200},
      {url3, 500},
      {url4, 200},
      {url5, 200}
    ]
  end
end

Now for the implementation of the actual code. I’ve added comments to make it clear what each step is doing.

defmodule Statuses do
  def map(urls) do
    # Put self into variable to send to spawned process
    caller = self()
    urls
      # Map the URLs to a spawns process. Remember a `pid` is returned.
      |> Enum.map(&(spawn(fn -> process(&1, caller) end)))
      # Map the returned pids
      |> Enum.map(fn pid ->
          # Receive the response from this pid
          receive do
            {^pid, url, status} -> {url, status}
          end
        end)
  end

  def process(url, caller) do
    status =
      case HTTPoison.get(url) do
        {:ok, %HTTPoison.Response{status_code: status_code}} ->
          status_code
        {:error, %HTTPoison.Error{reason: reason}} ->
          {:error, reason}
      end
    # Send message back to caller with result
    send(caller, {self(), url, status})
  end
end

When we ran the code, it took 2.2 seconds. This makes sense because one of the URLs is a faker URL service that we told to delay the response by two seconds…so it took approximately the time of the slowest URL.

Where to go from here?

In this article, we covered the basics of spawning a new process, sending that process a message, maintaining state in the process via recursive looping, and receiving messages from other processes. This is a good start, but there is a lot more!

Elixir comes with some very cool modules to help us remove some of the boilerplate involved in what we did today. Agent is a module for maintaining state in a process. Task is a module for running code concurrently and optionally receiving its response. GenServer handles both state and concurrent tasks in a long standing process. I plan on covering these topics in a second article in this series.

Lastly there is the whole topic of linking, monitoring, and responding to errors which may occur in a process. Elixir comes with a Supervisor module for this and is all part of building a dependable fault-tolerant system.

Reference: Concurrency in Elixir from our JCG partner Leigh Halliday at the Codeship Blog blog.

Leigh Halliday

Leigh Halliday is a developer at Regalii. He writes about Ruby, Rails, and software development on his personal site.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button