small medium large xlarge

Dave_gnome_head_isolated_pragsmall
15 Jul 2013, 03:42
Dave Thomas (390 posts)
  • Use the same server code, but instead run a function that finds the number of times the word “cat” appears in each file in a given directory. Run one server process per file. The function File.ls! returns the names of files in a directory, and File.read! reads the contents of a file as a binary.

    Run your code on a directory with a reasonable number of files (maybe 100 or so) so you can experiment with the effects of concurrency.

Generic-user-small
10 Jan 2015, 16:56
Pierre Sugar (57 posts)
defmodule WordCounter do
  @moduledoc """
  Worker that counts how often a pattern occurs in a file and reports the
  count back to a scheduler process.
  """

  @doc """
  Counts the matches of `word` in `file` and sends
  `{worker_pid, {file, count}}` to `scheduler`.

  Returns the message that was sent.
  """
  def count(scheduler, file, word) do
    # `self()` needs its parentheses: a bare `self` is parsed as a variable.
    send(scheduler, {self(), {file, count_words(word, file)}})
  end

  @doc """
  Returns the number of matches of `word` in the contents of `file`.

  `word` is interpolated into a string and compiled as a regular expression,
  so regex metacharacters keep their special meaning. Returns `-1` when the
  pattern does not compile. Raises if `file` cannot be read.
  """
  def count_words(word, file),
    do: do_count_words(Regex.compile("#{word}"), file)

  # The file is only read once the pattern is known to be valid.
  defp do_count_words({:ok, regex}, file),
    do: regex |> Regex.scan(File.read!(file)) |> length()

  defp do_count_words({:error, _reason}, _file), do: -1
end

defmodule Scheduler do
  @moduledoc """
  Spawns one process per data item and gathers one result from each.
  """

  @doc """
  Spawns `module.function(scheduler_pid, item, args)` for every item in `data`
  and collects the results the spawned processes send back.

  Results are returned in reverse order of arrival. Returns `[]` for empty `data`.
  """
  def schedule(module, function, data, args) do
    scheduler = self()

    data
    |> Enum.map(fn item -> spawn(module, function, [scheduler, item, args]) end)
    |> collect_results([])
  end

  @doc """
  Receives `{child, result}` messages until one has been consumed for the last
  outstanding entry of `processes`, prepending each `result` to `results`.
  """
  # Nothing outstanding: return immediately instead of waiting for a message
  # that will never arrive (e.g. an empty directory).
  def collect_results([], results), do: results

  # Exactly one process left: its message completes the run.
  def collect_results([_last], results) do
    receive do
      {_child, result} -> [result | results]
    end
  end

  def collect_results(processes, results) do
    receive do
      {child, result} ->
        collect_results(List.delete(processes, child), [result | results])
    end
  end
end

# Count matches of the pattern in every entry of test-files/, one worker
# process per entry. File.cd!/2 runs the function inside that directory and
# then reverts to the previous working directory, even if the function raises.
File.cd!("test-files/", fn ->
  IO.puts("Start word count")

  results = Scheduler.schedule(WordCounter, :count, File.ls!(), "Relmmiad")

  # Enum.each because only the printing side effect is wanted.
  # ~ts (not ~s) so UTF-8 file names are printed as Unicode text.
  Enum.each(results, fn {file, count} ->
    :io.format("~-30ts     ~10B~n", [file, count])
  end)

  IO.puts("End word count")
end)

C2f77d9bcc71d1f9ddabd89a8ddf9615_pragsmall
10 Jul 2015, 07:44
Rebecca Skinner (6 posts)

I found this to be a very confusingly-worded exercise - it implies we should be changing the Scheduler module only (“take the scheduler code and update it”), but then combined with “run one server process per file”, it doesn’t make a lot of sense.

Also, what kind of ‘experiments’ with concurrency can be run when we won’t have a varying number of processes, as in the previous exercise? 100 files is a small enough sample set that doubling it (or more) wouldn’t make much of a difference, unless they were very large files.

Generic-user-small
06 Aug 2015, 11:47
Stefan Chrobot (14 posts)

Here’s my generalized scheduler:

defmodule Scheduler do
  @moduledoc """
  A generalized work scheduler.

  `run/3` starts a pool of linked worker processes, hands each work item to
  the next worker that reports `:ready`, and returns the results in the same
  order as the items appear in the work queue.
  """

  @doc """
  Applies `func` to every item of `work_queue` using `process_count` workers.

  Returns a list of `{work_item, result}` tuples in work-queue order.
  `process_count` must be a positive integer.
  """
  # The guard matters: `1..0` is a two-element descending range, so a count of
  # 0 would otherwise start two workers.
  def run(process_count, func, work_queue)
      when is_integer(process_count) and process_count > 0 do
    scheduler_pid = self()

    1..process_count
    |> Enum.map(fn _ -> spawn_link(__MODULE__, :worker, [scheduler_pid, func]) end)
    |> schedule(work_queue)
  end

  @doc """
  Tags every work item with its 1-based position: `[{1, item1}, {2, item2}, ...]`.
  """
  def order_work_queue(work_queue) do
    1
    |> Stream.iterate(&(&1 + 1))
    |> Enum.zip(work_queue)
  end

  @doc """
  Worker loop: announces `{:ready, pid}` to the scheduler, then either
  processes one `{{item_id, work_item}, reply_to}` message and loops, or
  stops on `{:shutdown}`.
  """
  def worker(scheduler_pid, func) do
    send(scheduler_pid, {:ready, self()})

    receive do
      {{item_id, work_item}, reply_to} ->
        send(reply_to, {:result, {item_id, work_item, func.(work_item)}, self()})
        # Keep talking to whichever process handed out the work item.
        worker(reply_to, func)

      {:shutdown} ->
        :ok
    end
  end

  @doc """
  Scheduler loop. Hands out items from `ordered_work_queue` to ready workers,
  shuts workers down once the queue is empty, and returns the accumulated
  `{item_id, {work_item, result}}` tuples when no workers remain.
  """
  def scheduler([], _ordered_work_queue, results) do
    results
  end

  def scheduler(workers, ordered_work_queue, results) do
    receive do
      {:ready, worker_pid} ->
        # Match on the queue shape instead of calling length/1 in a guard,
        # which would walk the whole queue for every message.
        case ordered_work_queue do
          [next | tail] ->
            send(worker_pid, {next, self()})
            scheduler(workers, tail, results)

          [] ->
            send(worker_pid, {:shutdown})
            scheduler(workers -- [worker_pid], ordered_work_queue, results)
        end

      {:result, {item_id, work_item, result}, _worker_pid} ->
        scheduler(workers, ordered_work_queue, [{item_id, {work_item, result}} | results])
    end
  end

  @doc """
  Runs `work_queue` through the given `workers` and returns
  `{work_item, result}` tuples sorted back into work-queue order.
  """
  def schedule(workers, work_queue) do
    workers
    |> scheduler(order_work_queue(work_queue), [])
    |> Enum.sort_by(fn {item_id, _} -> item_id end)
    |> Enum.map(fn {_item_id, result} -> result end)
  end
end

It returns the results in the same order that they appear in the work queue (this may or may not make sense for a specific use case). I also use spawn_link to bring down the scheduler if a worker process crashes.

Here’s the benchmark for the “cat” search task:

defmodule Funcs do
  @moduledoc """
  Helpers for the "cat" search benchmark.
  """

  @doc """
  Returns how many times the lowercase sequence `cat` occurs in `binary`.
  """
  def count_cats(binary), do: length(Regex.scan(~r/cat/, binary))

  @doc """
  Reads the file at `path` (raising if it cannot be read) and counts the
  occurrences of `cat` in its contents.
  """
  def count_cats_in_file(path), do: count_cats(File.read!(path))
end

# Benchmark: run the "cat" count over every entry of the directory with
# 1..max_process_count worker processes and print the elapsed time of each run.
dir = "some/dir/with/jpgs"
File.cd!(dir)
work_queue = File.ls!()

max_process_count = 10

Enum.each(1..max_process_count, fn process_count ->
  run_args = [process_count, &Funcs.count_cats_in_file/1, work_queue]
  {time, result} = :timer.tc(Scheduler, :run, run_args)

  # Print the results and the timing-table header for the first run only.
  if process_count == 1 do
    IO.puts(inspect(result))
    IO.puts("\n #   time (s)")
  end

  # :timer.tc reports microseconds; show seconds.
  seconds = time / 1000000.0
  :io.format("~2B     ~.2f~n", [process_count, seconds])
end)

And the results:

 #   time (s)
 1     4.20
 2     3.26
 3     2.34
 4     2.41
 5     2.37
 6     2.37
 7     2.38
 8     2.38
 9     2.38
10     2.36
20160310-gunnar_pragsmall
02 May 2016, 16:52
Felipe Juarez Murillo (9 posts)
defmodule Cat.Word do
  @moduledoc """
  Worker process that counts case-insensitive occurrences of "cat" in files.
  """

  @doc """
  Worker loop.

  Sends `{:ready, pid}` to `scheduler`, then waits for either
  `{:find, absname, client}` (replies to `client` with
  `{:answer, absname, count, pid}` and loops) or `{:shutdown}` (exits normally).
  """
  def cat(scheduler) do
    # `self()` needs its parentheses: a bare `self` is parsed as a variable.
    send(scheduler, {:ready, self()})

    receive do
      {:find, absname, client} ->
        send(client, {:answer, absname, word_finder(absname), self()})
        cat(scheduler)

      {:shutdown} ->
        exit(:normal)
    end
  end

  # Reads the whole file (raising if it cannot be read) and counts the matches.
  defp word_finder(absname) do
    data = File.read!(absname)
    ~r/cat/i |> Regex.scan(data) |> length()
  end
end

defmodule Cat.Scheduler do
  @moduledoc """
  Runs one worker process per regular file in a directory and prints a table
  of the counts the workers report.
  """

  # ~ts (not ~s) so UTF-8 file names are printed as Unicode text.
  @row_format "~-50ts | ~-11s |~n"

  @doc """
  Counts matches in every regular file directly inside `path`.

  Spawns `module.func(scheduler_pid)` once per file, distributes the file
  names to the workers, and prints the results sorted by descending count.
  Entries that are not regular files (e.g. sub-directories) are skipped.
  Returns `:ok`.
  """
  def run(path, module, func) do
    # Only regular files are queued: a worker that fails on a directory would
    # never answer, and the scheduler would wait for it forever.
    to_calculate =
      path
      |> File.ls!()
      |> Enum.map(&Path.join(path, &1))
      |> Enum.filter(&File.regular?/1)

    scheduler = self()

    # One worker per file; mapping over the file list (rather than a
    # 1..n range) yields no workers at all when there are no files.
    to_calculate
    |> Enum.map(fn _file -> spawn(module, func, [scheduler]) end)
    |> schedule_processes(to_calculate, [])
    |> print_table()
  end

  # All workers have been shut down: return results, highest count first.
  defp schedule_processes([], _queue, results) do
    Enum.sort(results, fn {_, n1}, {_, n2} -> n1 >= n2 end)
  end

  defp schedule_processes(processes, queue, results) do
    receive do
      {:ready, pid} ->
        case queue do
          [next | tail] ->
            send(pid, {:find, next, self()})
            schedule_processes(processes, tail, results)

          [] ->
            # No work left for this worker; retire it.
            send(pid, {:shutdown})
            schedule_processes(List.delete(processes, pid), queue, results)
        end

      {:answer, absname, result, _pid} ->
        schedule_processes(processes, queue, [{absname, result} | results])
    end
  end

  # Prints a header row followed by one row per {absname, counter} result.
  defp print_table(results) do
    :io.format(@row_format, ["absname", "cat counter"])

    Enum.each(results, fn {absname, counter} ->
      :io.format(@row_format, [absname, to_string(counter)])
    end)
  end
end

# Scan the files in the book's example directory, using Cat.Word.cat/1 as the
# worker entry point.
book_dir = "/Users/makingdevs/Documents/elixir_book"
Cat.Scheduler.run(book_dir, Cat.Word, :cat)

Result:

absname                                            | cat counter |
/Users/makingdevs/Documents/elixir_book/cat.exs    | 6           |
/Users/makingdevs/Documents/elixir_book/Elixir.Cat | 5           |
/Users/makingdevs/Documents/elixir_book/Elixir.Cat | 3           |
/Users/makingdevs/Documents/elixir_book/spawn4.ex  | 0           |
/Users/makingdevs/Documents/elixir_book/spawn3.ex  | 0           |
/Users/makingdevs/Documents/elixir_book/spawn2.ex  | 0           |
/Users/makingdevs/Documents/elixir_book/spawn1.ex  | 0           |
/Users/makingdevs/Documents/elixir_book/spawn-basi | 0           |
/Users/makingdevs/Documents/elixir_book/pmap.exs   | 0           |
/Users/makingdevs/Documents/elixir_book/monitor1.e | 0           |
/Users/makingdevs/Documents/elixir_book/lists_and_ | 0           |
/Users/makingdevs/Documents/elixir_book/link3.exs  | 0           |
/Users/makingdevs/Documents/elixir_book/link2.exs  | 0           |
/Users/makingdevs/Documents/elixir_book/link1.exs  | 0           |
/Users/makingdevs/Documents/elixir_book/fib.exs    | 0           |
/Users/makingdevs/Documents/elixir_book/exercise_p | 0           |
/Users/makingdevs/Documents/elixir_book/exercise_p | 0           |
/Users/makingdevs/Documents/elixir_book/Elixir.Spa | 0           |
/Users/makingdevs/Documents/elixir_book/Elixir.Spa | 0           |
/Users/makingdevs/Documents/elixir_book/Elixir.MyL | 0           |
/Users/makingdevs/Documents/elixir_book/chain.ex   | 0           |
Generic-user-small
10 Feb 2017, 00:19
Oleg (15 posts)

I was struggling to understand the code flow of the scheduler, so I rewrote it in a more logical way (with debug messages and comments). Now I get it.

Also, you’d need a large dataset to work with here. If I process all the Elixir files I have made so far (looking for the “def” substring), I see no difference between 1 and 10 processes. Only when I crammed in a :timer.sleep call could I see a difference.

defmodule DeFinder do
  @moduledoc """
  Worker that counts occurrences of "def" in files on behalf of a scheduler.
  """

  @doc """
  Worker loop.

  Tells `caller` it is ready, then handles one message: `{:find, counter,
  filename}` is answered with `{:result, {counter, {filename, count}}, pid}`
  before looping; `{:shutdown}` makes the process exit normally.
  """
  def find(caller) do
    send(caller, {:ready, self()})

    receive do
      {:shutdown} ->
        exit(:normal)

      {:find, counter, filename} ->
        reply = {:result, {counter, find_in(filename)}, self()}
        send(caller, reply)
        find(caller)
    end
  end

  # Returns {filename, number_of_matches}; raises if the file cannot be read.
  defp find_in(filename) do
    contents = File.read!(filename)

    # Artificial delay: the search alone finishes too quickly for the
    # benchmark to show any difference between process counts.
    :timer.sleep(10)

    matches = Regex.scan(~r/def/, contents)
    {filename, length(matches)}
  end
end

defmodule Scheduler do
  @moduledoc """
  Distributes a queue of items over a pool of linked worker processes and
  returns the workers' results in queue order.
  """

  @doc """
  Starts `number_of_processes` linked workers running `module.func(scheduler_pid)`
  and schedules `queue` across them.
  """
  def run(number_of_processes, module, func, queue) do
    workers =
      Enum.map(1..number_of_processes, fn _ ->
        spawn_link(module, func, [self()])
      end)

    schedule(workers, queue)
  end

  @doc """
  Scheduler loop.

  Reacts to `{:ready, pid}` by sending the next queue item (tagged with a
  countdown number) or, when the queue is empty, a shutdown request. Collects
  `{:result, result, pid}` messages. Raises on any other message. Returns the
  results in the order in which their items appeared in the queue.
  """
  def schedule(processes, queue, results \\ []) do
    receive do
      {:ready, pid} ->
        hand_out(pid, processes, queue, results)

      {:result, result, _pid} ->
        schedule(processes, queue, [result | results])

      _ ->
        raise "Unexpected message"
    end
  end

  # Queue exhausted: ask the worker to stop, then either keep waiting for the
  # remaining workers or finish.
  defp hand_out(pid, processes, [], results) do
    send(pid, {:shutdown})

    case processes do
      [_, _ | _] ->
        schedule(List.delete(processes, pid), [], results)

      _ ->
        # Results arrive in any order; the countdown number attached to each
        # one restores the queue order (highest number = first item).
        results
        |> Enum.sort(fn {n1, _}, {n2, _} -> n1 >= n2 end)
        |> Enum.map(fn {_counter, result} -> result end)
    end
  end

  # Work available: the number of items still queued doubles as the
  # countdown tag used to reorder the results later.
  defp hand_out(pid, processes, [current | rest] = queue, results) do
    counter = length(queue)
    send(pid, {:find, counter, current})
    schedule(processes, rest, results)
  end
end

# Benchmark: search every .ex/.exs file below the current directory with
# 1 to 10 worker processes and print the elapsed time of each run.
filenames = Path.wildcard("**/*.{exs,ex}")

Enum.each(1..10, fn num_processes ->
  run_args = [num_processes, DeFinder, :find, filenames]
  {time, result} = :timer.tc(Scheduler, :run, run_args)

  # Print the results and the timing-table header for the first run only.
  if num_processes == 1 do
    IO.puts(inspect(result))
    IO.puts("\n # time (s)")
  end

  # :timer.tc reports microseconds; show seconds.
  seconds = time / 1_000_000.0
  :io.format("~2B     ~.2f~n", [num_processes, seconds])
end)
Generic-user-small
01 Jul 2017, 14:46
Larry Prall (8 posts)

I did it similarly to everyone else:

defmodule Cats do
  @moduledoc """
  Worker that counts the whole words "cat"/"cats" (case-insensitive) in files.
  """

  @doc """
  Worker loop.

  Announces `{:ready, pid}` to `scheduler`, then answers one
  `{:count, file, client}` request with `{:answer, file, count_or_error, pid}`
  and loops, or exits normally on `{:shutdown}`.
  """
  def count(scheduler) do
    send(scheduler, {:ready, self()})

    receive do
      {:shutdown} ->
        exit(:normal)

      {:count, file, client} ->
        send(client, {:answer, file, count_cats(file), self()})
        count(scheduler)
    end
  end

  @doc """
  Returns the number of matches in `file`, or `{:error, reason}` when the
  file cannot be read.
  """
  def count_cats(file) do
    # A failed read falls through `with` unchanged as {:error, reason}.
    with {:ok, body} <- File.read(file) do
      length(Regex.scan(~r{\bcats?\b}i, body))
    end
  end
end

defmodule Scheduler do
  @moduledoc """
  Starts one worker process per file and gathers the workers' answers.
  """

  @doc """
  Spawns `module.func(scheduler_pid)` once per entry in `files`, hands the
  entries out to the workers, and returns `{file, answer}` tuples sorted by
  file name.
  """
  def run(module, func, files) do
    workers = for _ <- 1..length(files), do: spawn(module, func, [self()])
    schedule_processes(workers, files, [])
  end

  # Main loop: record answers, and give each ready worker work or a shutdown.
  defp schedule_processes(processes, queue, results) do
    receive do
      {:answer, number, result, _pid} ->
        schedule_processes(processes, queue, [{number, result} | results])

      {:ready, pid} ->
        dispatch(pid, processes, queue, results)
    end
  end

  # Work left: send the next file to the ready worker.
  defp dispatch(pid, processes, [next | tail], results) do
    send(pid, {:count, next, self()})
    schedule_processes(processes, tail, results)
  end

  # Queue empty: retire the worker; finish once it was the last one tracked.
  defp dispatch(pid, processes, [], results) do
    send(pid, {:shutdown})

    case processes do
      [_, _ | _] ->
        schedule_processes(List.delete(processes, pid), [], results)

      _ ->
        Enum.sort(results, fn {n1, _}, {n2, _} -> n1 <= n2 end)
    end
  end
end

# Count "cat"/"cats" in every entry of the current directory, one worker
# process per entry, then print a summary and a table of the files with hits.
files = File.ls!()

{time, result} = :timer.tc(Scheduler, :run, [Cats, :count, files])

total = length(files)

# Entries that could not be read carry {:error, reason} instead of a count.
read = Enum.filter(result, fn {_name, count} -> is_integer(count) end)

# Files with at least one match, highest count first.
have_cats =
  read
  |> Enum.filter(fn {_name, count} -> count > 0 end)
  |> Enum.sort(fn {_, count1}, {_, count2} -> count1 > count2 end)

IO.puts("Examined #{total} files")
IO.puts("Read #{length(read)} files")
IO.puts("Of those, #{length(have_cats)} files had instances of \"cat\"")
IO.puts("time: #{time}")

# ~ts (not ~s) for the file name so UTF-8 names are printed as Unicode text
# rather than as Latin-1 bytes.
format = "~5s ... ~-70ts ~n"
:io.format(format, ["cats", "filename"])

Enum.each(have_cats, fn {name, count} ->
  :io.format(format, [to_string(count), name])
end)

I originally tried it with only one process, and it took a bit over five minutes to scan a directory with over 3700 files (many of which were directories – OK, I’m a packrat).

In present form, with one process per file it takes under a minute. For comparison, running an egrep for the same string in the same directory has taken around an hour and it’s still going…

In a somewhat smaller directory:

habu:tst larry$ elixir ../../spawn/cats.exs

Examined 11 files

Read 10 files

Of those, 8 files had instances of “cat”

time: 143292

cats … filename
8 … Every Star Trek top 10 list ever invented in one e mail.rtf
5 … cats
2 … cats.exs
1 … dogs
1 … cith.pdf
1 … cats.exs.save
1 … cats-1.exs
1 … Allan Quatermain.txt
habu:tst larry$

The top of output from the large directory:

habu:Documents larry$ elixir elixir/spawn/cats.exs

Examined 3740 files

Read 3224 files

Of those, 363 files had instances of “cat”

time: 53156789

cats … filename
704 … Archero.sparseimage
178 … ACCD_Habu_1.dmg
130 … F430Action.MOV
121 … Archive.pgd
92 … Cine.pgd
78 … F430Action.m4v
72 … CompleteBridgeJournal.pdf
63 … Jean Webster.dmg
60 … iWork ‘09.zip
59 … Bookmarks.plist
50 … Utilities.zip
47 … Hound of the Baskervilles - Eng-Rus.dmg
45 … diffs iMacAppsByKind
45 … Terminal Saved Output
28 … TRS English US 1-8.dmg
26 … hist.txt
26 … 12monkeys.hist …

As you can see, it found “cat” or “cats” in some binary files, but egrep finds it in the same files.

Addendum: The egrep finally finished…

Binary file המסך המפוצל Lovelorn.pdf matches

egrep: イ家江島.rtfd: Is a directory

egrep: 沖縄の島唄ベスト・コレクション List.rtfd: Is a directory

real 74m28.101s

user 38m18.604s

sys 1m45.124s

You must be logged in to comment