Pipelines

2011-10-15 16:25 +0000

Hardly a day passes without me using pipelines in my shell. Say I want to watch an Apache log file for requests from a certain IP address:

tail -f access.log | grep 207.46.195.223

Oh, and I only care about the paths:

tail -f access.log | grep 207.46.195.223 | cut -d ' ' -f7

Granted, that's a bit coarse but it does the job and is ready at hand. Pipelines are great! It occurred to me that they actually work a lot like functional programming: the individual programs that make up the pipeline merely act on their inputs and produce outputs without sharing any state. Okay, they may write to files and whatnot or even communicate among each other via back channels, but that's secondary special-case trickery. The expressiveness and elegance of the pipeline construct lies in the simplicity of its parts. And it's a very robust interface that composes well. The tar program is nearly as old as Unix itself and yet in 2011 you can pipe the chicken-doc repository, obtained directly from Jim's server by means of a protocol from the 90s, straight into it without it batting an eye:

curl http://3e8.org/pub/chicken-doc/chicken-doc-repo.tgz | sudo tar zx

Try that with JavaBeans.

Can I Get That With Parentheses, Please?

The log example above could be expressed in (Chicken) Scheme like this:

(use extras data-structures srfi-1 srfi-13)

(call-with-input-file "access.log"
  (lambda (file)
    (for-each
     print 
     (map (lambda (x)
            (list-ref (string-split x " ") 6))
          (filter (lambda (x)
                    (string-contains x "207.46.195.223"))
                  (read-lines file))))))

Well, this is not quite the same: it reads the whole file and doesn't follow it for changes like tail -f does, but it's close enough. This is not the only difference though: note how first all lines are read, then all lines are filtered, then all filtered lines are split, and finally all results are printed. This is very different from how the pipeline works: grep can process lines as soon as they have been passed on by tail, and cut can process lines as soon as they are passed on by grep, while the Scheme code completely buffers each step in between. Depending on the amount of data this can be quite inefficient. Also, this way something like tail -f is impossible to implement.¹ We could change the program to process the data in a similar fashion:

(use extras data-structures srfi-13)

(call-with-input-file "access.log"
  (lambda (file)
    (let loop ((line (read-line file)))
      (unless (eof-object? line)
        (when (string-contains line "207.46.195.223")
          (print (list-ref (string-split line " ") 6)))
        (loop (read-line file))))))

This still won't follow file changes (for that to work we would have to use something like inotify or kqueue) but at least it won't buffer intermediate results like the previous version did. Also, the code has pretty much turned inside out and looks much more imperative now. In this simple case that is even an improvement in readability, but in more complicated situations it can quickly deteriorate into an entangled mess of spaghetti code.

There's another problem though: say we were reading from a network connection rather than from a file, and instead of calling string-contains we did a database lookup that's really, really slow because the database runs somewhere in the cloud on a VM that's sharing a host system with 49 other VMs. What would happen? The whole pipeline would have to wait for that one lookup operation to finish its work. In the meantime, no data would be read from the network connection and in the worst case it might even time out. Clearly, that's not a very good use of resources!
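To make this concrete, here is the sequential loop again with the lookup stubbed in. Everything named here is made up for illustration; the thread-sleep! merely stands in for the database round trip:

(use extras data-structures srfi-13 srfi-18)

;; Hypothetical slow predicate: pretend the string-contains check
;; involves a database round trip that takes half a second.
(define (blacklisted? line)
  (thread-sleep! 0.5)
  (string-contains line "207.46.195.223"))

(call-with-input-file "access.log"
  (lambda (file)
    (let loop ((line (read-line file)))
      (unless (eof-object? line)
        (when (blacklisted? line)
          (print (list-ref (string-split line " ") 6)))
        (loop (read-line file))))))

While blacklisted? sleeps, read-line isn't called at all, so everything upstream simply has to wait.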

Parallel Universe

A solution to this would be to parallelize the parts of the pipeline. In fact, that's exactly what happens in a shell pipeline: as each part is a separate process, the OS will schedule them accordingly or, in the presence of multiple processors, actually run them in parallel. In our little Scheme program we could also fork and have each pipeline part executed in a separate process, but that's rather expensive. Alternatively there are threads, which in most Scheme implementations (such as Chicken) are implemented as light-weight green threads. We could run each part of the pipeline in a separate thread and communicate between them using shared queues. Since there is no queue implementation in RnRS and Chicken's core queue data structure is not thread-safe, this would involve quite a bit of mutex juggling. I'll skip that step in the evolution of our program and leave it as an exercise for the masochists among the readers. Let's use Christian's mailbox-threads egg instead. It attaches a queue to each thread through which the thread can receive arbitrary messages from other threads. If you have Chicken installed on your system you should be able to install it with

chicken-install mailbox-threads
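In case you haven't used it before, here is a minimal sketch of the messaging primitives, just two threads playing ping-pong:

(use mailbox-threads srfi-18)

;; The echo thread waits for a (sender . value) pair in its mailbox
;; and sends the value straight back to the sender.
(define echo
  (thread-start!
   (lambda ()
     (let ((msg (thread-receive)))
       (thread-send (car msg) (cdr msg))))))

(thread-send echo (cons (current-thread) 'hello))
(print (thread-receive)) ; prints: hello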

Well then, let's have one thread per pipeline part:

(use mailbox-threads extras data-structures srfi-13 srfi-18)

(define printer
  (thread-start!
   (lambda ()
     (let loop ()
       (let ((msg (thread-receive)))
         (unless (eof-object? msg)
           (print msg)
           (thread-yield!)
           (loop)))))))

(define splitter
  (thread-start!
   (lambda ()
     (let loop ()
       (let ((msg (thread-receive)))
         (if (eof-object? msg)
             (thread-send printer msg)
             (begin
               (thread-send printer 
                            (list-ref (string-split msg " ") 6))
               (thread-yield!)
               (loop))))))))

(define filter
  (thread-start!
   (lambda ()
     (let loop ()
       (let ((msg (thread-receive)))
         (if (eof-object? msg)
             (thread-send splitter msg)
             (begin
               (when (string-contains msg "207.46.195.223")
                 (thread-send splitter msg)
                 (thread-yield!))
               (loop))))))))

(define reader
  (thread-start!
   (lambda ()
     (call-with-input-file "access.log"
       (lambda (file)
         (let loop ()
           (let ((line (read-line file)))
             (thread-send filter line)
             (thread-yield!)
             (unless (eof-object? line)
               (loop)))))))))

(thread-join! printer)

Note that we have to define the parts in reverse order to be able to send messages from one pipeline part to its successor.² Although the code is very verbose it should be easy to follow: each part fulfills its respective task and passes the result on to the next thread (if any). Each sending thread also tries to be cooperative by yielding control before trying to receive the next message. This way each message should flow through the whole pipeline as quickly as possible. #!eof objects are always forwarded down the chain so the threads know when to shut down.³ Since we want each line to flow through the whole pipeline before the program exits, the main thread joins on the printer thread as it's the end of the pipeline.

Comparing the performance of these two implementations is pretty pointless for this simple case. But we can simulate expensive operations by sprinkling some thread-sleep! in the code. Adding a (thread-sleep! 0.1) before the string-contains call and a (thread-sleep! 0.2) before the string-split call simulates a slow filtering and an even slower splitting operation. The latter will only be run on lines matching our condition. For a file of 50 lines (9.1K) with 2 matches in it I get the following results:

Without pipeline: 0.12s user 0.02s system 2% cpu 6.168 total
With pipeline:    0.10s user 0.02s system 2% cpu 5.156 total

So we already gained a bit! This shows quite well that in the non-pipelined version, read-line has to wait for each thread-sleep! to finish while in the pipelined version it can keep going. Obviously this will consume more memory so it may make sense to cap the thread mailbox capacity in some scenarios.

The obvious drawback of this approach is its redundancy. This calls for an abstraction!

(pipeline ...)

Let's make up a function pipeline that accepts a start function and a rest argument parts of further functions. The start function is supposed to produce input values for the pipeline. When it returns, the pipeline shuts down. To be able to send values into the pipeline (i.e. to the next thread) it is passed a send function. The parts functions receive such a function as well (except for the last one as it can't sensibly send values anywhere). In addition, they receive all further arguments passed to the send function by their respective preceding thread. Our explicit and verbose program from above could then be expressed more concisely like this:

(use mailbox-threads extras data-structures srfi-1 srfi-13 srfi-18 ports)

(thread-join!
 (pipeline (lambda (send)
             (with-input-from-file "access.log"
               (lambda ()
                 (port-for-each send read-line))))
           (lambda (send line)
             (when (string-contains line "207.46.195.223")
               (send line)))
           (lambda (send line)
             (send (list-ref (string-split line " ") 6)))
           (lambda (line)
             (print line))))

For additional conciseness we use port-for-each from Chicken's ports unit. The implementation of pipeline is slightly complex:

(define end-of-pipeline '(end-of-pipeline))

(define (end-of-pipeline? x)
  (eq? end-of-pipeline x))

(define (pipeline-sender thread)
  (lambda msg
    (thread-send thread msg)
    (thread-yield!)))

(define (make-pipeline parts)
  (fold-right
   (lambda (process next+end)
     (if next+end
         (let* ((next (car next+end))
                (send (pipeline-sender next)))
           (cons (thread-start! 
                  (lambda ()
                    (let loop ()
                      (let ((msg (thread-receive)))
                        (if (end-of-pipeline? msg)
                            (thread-send next msg)
                            (begin
                              (apply process send msg)
                              (loop)))))))
                 (cdr next+end)))
         (let ((end (thread-start!
                     (lambda ()
                       (let loop ()
                         (let ((msg (thread-receive)))
                           (unless (end-of-pipeline? msg)
                             (apply process msg)
                             (thread-yield!)
                             (loop))))))))
           (cons end end))))
   #f
   parts))

(define (pipeline start . parts)
  (let* ((next+end (make-pipeline parts))
         (next (car next+end))
         (end (cdr next+end))
         (send (pipeline-sender next)))
    (thread-start!
     (lambda ()
       (start send)
       (thread-send next end-of-pipeline)))
    end))

Just like in the explicit version above we need to walk the parts functions from the end to the head (thus fold-right) so we can pass the next thread to its respective predecessor. We also have to preserve the end, i.e. the thread for the last part, to be able to return it from the pipeline function. This is desirable because other threads may want to join on it to wait for the pipeline's completion just like we did in the verbose version. In our example above we do exactly this to join the primordial thread on the pipeline's end so that the program doesn't exit before the pipeline has finished its work. Once the start function has returned (i.e. it has finished producing values) we send the special end-of-pipeline message that stops all participating threads.

It may make sense to have a pipeline! function as well which always joins the current thread as that seems to be a pretty common thing to do:

(define (pipeline! . parts)
  (thread-join! (apply pipeline parts)))

There's More To Be Had

Now that we have the basics covered there is another obvious limitation: Each pipeline part is executed in only one thread. But what keeps the filtering or splitting functions from running multiple times in parallel? As they are both pure functions that should work without problems. And it would improve the pipeline's performance in case one of those operations takes a long time or has to block. One way to achieve this is to create another function which parallelizes a pipeline part similar to xargs -P or GNU Parallel in shell pipelines. Let's create (pipeline-fork num process) which spawns num threads for the regular pipeline function process. It would be used like this:

(pipeline! (lambda (send)
             (with-input-from-file "access.log"
               (lambda ()
                 (port-for-each send read-line))))
           (pipeline-fork 4
             (lambda (send line)
               (when (string-contains line "207.46.195.223")
                 (send line))))
           (pipeline-fork 2
             (lambda (send line)
               (send (list-ref (string-split line " ") 6))))
           (lambda (line)
             (print line)))

For this to work we also have to change make-pipeline a bit because we need to hook into the end-of-pipeline passing phase in order to know when to shut down the forked threads. I have decided to implement it in such a way that make-pipeline now also accepts pairs in parts. If a pair is found, it will expect the actual processing function in its car and a thunk in the cdr that gets called when the end of the pipeline is reached. Once that thunk returns, the remaining pipeline is shut down. In code that would be:

(define (make-pipeline parts)
  (fold-right
   (lambda (part next+end)
     (if next+end
         (let* ((next (car next+end))
                (send (pipeline-sender next))
                (process (if (pair? part)
                             (car part)
                             part))
                (handle-end (lambda ()
                              (thread-send next end-of-pipeline)))
                (handle-end (if (pair? part)
                                (lambda ()
                                  ((cdr part))
                                  (handle-end))
                                handle-end)))
           (cons (thread-start!
                  (lambda ()
                    (let loop ()
                      (let ((msg (thread-receive)))
                        (if (end-of-pipeline? msg)
                            (handle-end)
                            (begin
                              (apply process send msg)
                              (loop)))))))
                 (cdr next+end)))
         (let ((end (thread-start!
                     (lambda ()
                       (let loop ()
                         (let ((msg (thread-receive)))
                           (unless (end-of-pipeline? msg)
                             (apply part msg)
                             (thread-yield!)
                             (loop))))))))
           (cons end end))))
   #f
   parts))
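By the way, this pair convention is useful beyond forking. As a quick illustration (a sketch, not part of the actual code), here is a part that copies every line to a side file and uses its end thunk to close the port once the pipeline shuts down:

;; Hypothetical (process . end-thunk) part: write each line to a side
;; file (write-line comes from the extras unit), pass it on unchanged,
;; and close the port at end-of-pipeline.
(define (tee-part filename)
  (let ((port (open-output-file filename)))
    (cons (lambda (send line)
            (write-line line port)
            (send line))
          (lambda ()
            (close-output-port port)))))

Since make-pipeline accepts pairs anywhere in parts, (tee-part "matches.log") can be dropped into a pipeline just like a plain function.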

And with this we can implement pipeline-fork like this:

(define (pipeline-fork num process)
  (let* ((threads (list->vector
                   (map (lambda _
                          (thread-start!
                           (lambda ()
                             (let loop ()
                               (let ((msg (thread-receive)))
                                 (unless (end-of-pipeline? msg)
                                   (apply process msg)
                                   (loop)))))))
                        (iota num))))
         (next (let ((next-num num))
                 (lambda ()
                   (set! next-num (modulo (+ next-num 1) num))
                   next-num))))
    (cons (lambda msg
            (thread-send (vector-ref threads (next)) msg))
          (lambda ()
            (let loop ((n 0))
              (when (< n num)
                (let ((thread (vector-ref threads n)))
                  (thread-send thread end-of-pipeline)
                  (thread-join! thread))
                (loop (+ n 1))))))))

As you can see we simply distribute the messages in a round-robin fashion among the forked threads. This could certainly be made smarter but does the trick for most cases already. Using the benchmark setup from above I get the following results:

1  filter,   1 splitter:  0.11s user 0.04s system  3% cpu 5.199 total
2  filters,  1 splitter:  0.09s user 0.03s system  4% cpu 2.634 total
4  filters,  2 splitters: 0.08s user 0.03s system  7% cpu 1.420 total
20 filters, 10 splitters: 0.08s user 0.02s system 24% cpu 0.418 total

Nice! Note though that we might not receive all results in the same order as they appear in the log file. How about adding a flag to pipeline-fork that keeps messages in order? (One rough idea is sketched below.) What other kinds of pipeline features could be useful? As usual you can find the complete code at gitorious.⁴ Feel free to hack it and let me know if you have a patch!
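Here is that rough order-keeping idea (a sketch only, none of this is in the repository): have the producer tag each value with a sequence number, let the forked parts pass the tag through, and put a part after the fork that buffers results until the next expected tag arrives. This assumes every tag eventually shows up, so any filtering would have to happen before the tagging.

(use srfi-69)

;; Hypothetical order-restoring part: buffer out-of-order messages in a
;; hash table keyed by sequence number and release them in order.
(define (reordering-part)
  (let ((pending (make-hash-table))
        (expected 0))
    (lambda (send n . vals)
      (hash-table-set! pending n vals)
      (let loop ()
        (when (hash-table-exists? pending expected)
          (apply send (hash-table-ref pending expected))
          (hash-table-delete! pending expected)
          (set! expected (+ expected 1))
          (loop))))))

A keep-order flag on pipeline-fork could then boil down to wrapping the forked part between such a tagging part and (reordering-part).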


1
At least in Scheme, where we don't have built-in lazy sequences like, for example, in Clojure.
2
An alternative way is to first define all identifiers and then set! them afterwards.
3
This is actually bad practice as we are overloading the meaning of #!eof to signify the end of the pipeline. Bear with me though, we'll fix this later on.
4
It already contains a function not featured in this article called pipeline-collect. Its last function receives a send function as well, and all values sent through it are added to a list. This list is the return value of pipeline-collect. For completeness there is also pipeline-collect! which is, of course, the joining version of pipeline-collect.