Happstack and Streaming: Part 3: Implementation (Sort Of)

Implementation

For our proof-of-concept for trying to do streaming with Happstack, here’s a simple web application that implements each of the three possible approaches discussed earlier. To keep things simple, the data we’ll be streaming is a series of timestamps taken from the system clock at regular intervals. Not a particularly useful application, but it is something simple that uses IO, which is all we’re looking for here. To make things a bit more interesting, the web app will support each approach two different ways: first, streaming a finite number of timestamp values before ending the stream, and second, streaming an endless series of timestamp values until the browser closes the connection.

Specifically, the web app will support the following six paths:

/pipe/limited
A finite number of timestamps, using a OS-level pipe.
/pipe/infinite
An infinite number of timestamps, using an OS-level pipe.
/chan/limited
A finite number of timestamps, using a Chan.
/chan/infinite
An infinite number of timestamps, using a Chan.
/manual/limited
A finite number of timestamps, using unsafeInterleaveIO manually.
/manual/infinite
An infinite number of timestamps, using unsafeInterleaveIO manually.

First, let’s get all the module imports out of the way. There’s nothing particularly interesting about any of them, so I won’t comment on them further.

{-# LANGUAGE FlexibleContexts #-}
 
module Main (main) where
 
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan (Chan, getChanContents, newChan, writeChan)
import Control.Monad (liftM, MonadPlus, msum, when)
import Control.Monad.Trans (liftIO, MonadIO)
import Data.Time.Clock (diffUTCTime, getCurrentTime)
import Happstack.Server.HTTP.Types (Method (..), noContentLength, nullConf, Response, resultBS)
import Happstack.Server.SimpleHTTP (dir, FilterMonad, internalServerError, methodSP, nullDir,
                                    ServerMonad, simpleHTTP, toResponse)
import System.IO (Handle, hClose, hFlush, hPrint, hPutStrLn, stderr)
import System.IO.Unsafe (unsafeInterleaveIO)
import System.Posix.IO (createPipe, fdToHandle)
 
import qualified Data.ByteString.Char8 as S
import qualified Data.ByteString.Lazy.Char8 as L

Next is the code that sets up the Happstack server with the six paths mentioned above.

main :: IO ()
main = simpleHTTP nullConf root
 
root :: (ServerMonad m, MonadPlus m, MonadIO m, FilterMonad Response m) => m Response
root = msum [ dir "pipe"   $ subdir outputPipe
            , dir "chan"   $ subdir outputChan
            , dir "manual" $ subdir outputManual
            ]
 
subdir :: (ServerMonad m, MonadPlus m, MonadIO m) => ((Int -> Int) -> IO Response) -> m Response
subdir output = msum [ dir "limited"  $ nullDir >> methodSP GET (liftIO $ output decr)
                     , dir "infinite" $ nullDir >> methodSP GET (liftIO $ output id)
                     ]
    where decr n = n - 1

To support both finite and infinite streams, each stream generator takes an initial counter and a decrement function. After generating each timestamp value, it applies the decrement function to the counter, and stops if the counter reaches zero. For finite streams, the decrement function subtracts one from the counter. For infinite streams, it does nothing, so that the counter never reaches zero.

Yeah, it’s kind of an ugly hack, but it’s good enough for this.

Here’s the code for generating a stream using OS-level pipes:

outputPipe :: (Int -> Int) -> IO Response
outputPipe decr = do h <- pipeClock decr limitedCount
                     bs <- L.hGetContents h
                     return $ streamBS bs
 
pipeClock :: (Int -> Int) -> Int -> IO Handle
pipeClock decr n = do (readFd, writeFd) <- createPipe
                      writeH <- fdToHandle writeFd
                      forkIO $ output writeH n `catch` abort writeH
                      fdToHandle readFd
    where output h 0 = do hPutStrLn stderr "closing pipe"
                          hClose h
          output h n = do now <- getCurrentTime
                          hPrint h now
                          hFlush h
                          tick now
                          threadDelay interval
                          output h (decr n)
          abort h e = do hPutStrLn stderr $ "caught error: " ++ show e
                         hClose h

The above code creates a pipe. A new thread is forked off to repeatedly write timestamps into the pipe. For a finite stream, the new thread closes its end of the pipe when it’s done writing. Meanwhile, the original thread uses lazy IO to read the entire contents of the pipe into a lazy ByteString, which gets passed back down to Happstack for sending to the browser.

Here’s the code for generating a stream using an IO channel:

outputChan :: (Int -> Int) -> IO Response
outputChan decr = do ch <- chanClock decr limitedCount
                     chunks <- getChanContents ch
                     let bs = L.fromChunks $ takeWhile (not . S.null) chunks
                     return $ streamBS bs
 
chanClock :: (Int -> Int) -> Int -> IO (Chan S.ByteString)
chanClock decr n = do ch <- newChan
                      forkIO $ output ch n
                      return ch
    where output ch 0 = do hPutStrLn stderr "done writing to channel"
                           writeChan ch S.empty
          output ch n = do now <- getCurrentTime
                           writeChan ch $ chunkify now
                           tick now
                           threadDelay interval
                           output ch (decr n)

The above code does the same basic thing, just with a channel of strict ByteStrings instead of a pipe. A new thread is forked off to write timestamp values to the channel. For a finite stream, the new thread writes an empty ByteString to indicate that it’s done. Meanwhile, the orginal thread uses lazy IO to read the entire contents of the channel into a (lazy) list, up until an empty ByteString. It then lazily combines the individual strict ByteStrings into a single lazy ByteString that it then passes down to Happstack.

The two above approaches each use a new thread that writes a timestamp, waits for the specified interval, writes another timestamp, and so on. Thus, the two code snippets look pretty similar in basic structure. Not so for using unsafeInterleaveIO directly:

outputManual :: (Int -> Int) -> IO Response
outputManual decr = do bs <- manualClock decr limitedCount
                       return $ streamBS bs
 
manualClock :: (Int -> Int) -> Int -> IO L.ByteString
manualClock decr n = do now <- getCurrentTime
                        tick now
                        allFutureChunks <- unsafeInterleaveIO $ ticksAfter now
                        let futureChunks = map fst $ zip allFutureChunks countdown
                        return . L.fromChunks $ chunkify now : futureChunks
    where ticksAfter since = do now <- getCurrentTime
                                let delta = diffUTCTime now since
                                when (delta < interval / 1000000) $
                                        threadDelay (round (interval - delta * 1000000))
                                now' <- getCurrentTime
                                tick now'
                                futureChunks <- unsafeInterleaveIO $ ticksAfter now'
                                return $ chunkify now' : futureChunks
          countdown = takeWhile (> 0) $ iterate decr (decr n)

Here, all the action takes place in a single thread. Again, it takes the strategy of combining a bunch of strict ByteStrings, one per timestamp, into a lazy ByteString with everything. It generates the first timestamp immediately, but defers building the rest of the list until it’s needed. When it is needed, it again generates the first timestamp of the rest of the list immediately, and defers building the rest.

It’s worth noting that this approach is the most difficult to write, since putting unsafeInterleaveIO in the right place is critical to making it work. Also, since everything is happening in a single thread, strictly speaking it’s no longer good enough to just delay for the desired interval between timestamps, since there’s no telling how much time has been spent in other parts of the code. Instead, it needs to check the clock twice each time around: first to figure out how long to delay, if any, and second to actually get the timestamp. Not surprisingly, the code is also the hardest to read of the three.

Finally, there are some odds and ends that bear mentioning. Here’s a simple function that converts a time value into a strict ByteString:

chunkify :: Show a => a -> S.ByteString
chunkify = S.pack . (++ "\n") . show

Here’s a helper function that prints out a message whenever a new timestamp is generated, so we can watch the app server’s progress:

tick :: Show a => a -> IO ()
tick x = hPutStrLn stderr $ "tick " ++ show x

As you can see from the type signatures of those two utility functions, there’s nothing that makes them unique to time values; any value that can be converted to a string (i.e., anything belonging to the Show class), works. We just happen to only use them with UTCTimes.

Anyway, here’s a simple function for converting a lazy ByteString into the Response object that Happstack ultimately wants:

streamBS :: L.ByteString -> Response
streamBS = noContentLength . resultBS 200

We explicitly tell Happstack not to add the Content-Length header, since to do that it would need to measure the length of the entire response, which would mean it can’t send anything until it sees the entire response, which defeats the entire point of what we’re trying to accomplish.

Last, and also least, here are the constants specifying the time interval between timestamps, and how many timestamps to generate in a finite stream before stopping:

interval :: Num a => a
interval = 100000        -- microseconds
 
limitedCount :: Num a => a
limitedCount = 30

That’s the entirety of the code. If you have GHC installed, you can go ahead and compile the program and play around with it to find out whether or not any of the approaches we’ve tried actually work. Since we didn’t pass any configuration parameters to Happstack, it will start a web server on port 8000 that you can point your favorite browser to, at any of the six paths we defined.

Stay tuned for Part 4, where we’ll see why (spoiler alert!) none of the three approaches actually work.

2 Responses

  1. Unrelated to Happstack – does the cannonical Kuliniwich have sourdough or ciabatta bread?

  2. Ciabatta

Comments are closed.