Happstack and Streaming: Part 3: Implementation (Sort Of)
Implementation
As a proof of concept for streaming with Happstack, here’s a simple web application that implements each of the three approaches discussed earlier. To keep things simple, the data we’ll be streaming is a series of timestamps taken from the system clock at regular intervals. It’s not a particularly useful application, but it is simple and it uses IO, which is all we’re looking for here. To make things a bit more interesting, the web app supports each approach in two different ways: first, streaming a finite number of timestamps and then ending the stream, and second, streaming an endless series of timestamps until the browser closes the connection.
Specifically, the web app will support the following six paths:
- /pipe/limited: A finite number of timestamps, using an OS-level pipe.
- /pipe/infinite: An infinite number of timestamps, using an OS-level pipe.
- /chan/limited: A finite number of timestamps, using a Chan.
- /chan/infinite: An infinite number of timestamps, using a Chan.
- /manual/limited: A finite number of timestamps, using unsafeInterleaveIO manually.
- /manual/infinite: An infinite number of timestamps, using unsafeInterleaveIO manually.
First, let’s get all the module imports out of the way. There’s nothing particularly interesting about any of them, so I won’t comment on them further.
```haskell
{-# LANGUAGE FlexibleContexts #-}
module Main (main) where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan (Chan, getChanContents, newChan, writeChan)
import Control.Monad (liftM, MonadPlus, msum, when)
import Control.Monad.Trans (liftIO, MonadIO)
import Data.Time.Clock (diffUTCTime, getCurrentTime)
import Happstack.Server.HTTP.Types (Method (..), noContentLength, nullConf, Response, resultBS)
import Happstack.Server.SimpleHTTP (dir, FilterMonad, internalServerError, methodSP, nullDir,
                                    ServerMonad, simpleHTTP, toResponse)
import System.IO (Handle, hClose, hFlush, hPrint, hPutStrLn, stderr)
import System.IO.Unsafe (unsafeInterleaveIO)
import System.Posix.IO (createPipe, fdToHandle)
import qualified Data.ByteString.Char8 as S
import qualified Data.ByteString.Lazy.Char8 as L
```
Next is the code that sets up the Happstack server with the six paths mentioned above.
```haskell
main :: IO ()
main = simpleHTTP nullConf root

root :: (ServerMonad m, MonadPlus m, MonadIO m, FilterMonad Response m) => m Response
root = msum [ dir "pipe"   $ subdir outputPipe
            , dir "chan"   $ subdir outputChan
            , dir "manual" $ subdir outputManual
            ]

-- Each approach gets a "limited" path (counter counts down) and an
-- "infinite" path (counter never changes).
subdir :: (ServerMonad m, MonadPlus m, MonadIO m) => ((Int -> Int) -> IO Response) -> m Response
subdir output = msum [ dir "limited"  $ nullDir >> methodSP GET (liftIO $ output decr)
                     , dir "infinite" $ nullDir >> methodSP GET (liftIO $ output id)
                     ]
  where
    decr n = n - 1
```
To support both finite and infinite streams, each stream generator takes an initial counter and a decrement function. After generating each timestamp, it applies the decrement function to the counter, and it stops once the counter reaches zero. For finite streams, the decrement function subtracts one from the counter. For infinite streams, it’s the identity function (id), so the counter never reaches zero.
Yeah, it’s kind of an ugly hack, but it’s good enough for this.
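The counter-and-decrement scheme can be tried in isolation. This sketch uses a hypothetical pure function, countTicks, that mirrors the recursion in the generators below (stop at zero, otherwise emit a value and recurse on the decremented counter). It produces only a list of counter values, with no clock or IO involved.

```haskell
-- Hypothetical pure stand-in for the stream generators: emit the current
-- counter value, then recurse on the decremented counter, stopping at zero.
countTicks :: (Int -> Int) -> Int -> [Int]
countTicks _ 0 = []
countTicks decr n = n : countTicks decr (decr n)

main :: IO ()
main = do
  -- Finite stream: subtract one each time, so 30 values are produced.
  print (length (countTicks (subtract 1) 30))  -- prints 30
  -- Infinite stream: id never reaches zero, so only take a prefix.
  print (take 3 (countTicks id 30))            -- prints [30,30,30]
```

Because the infinite case is just a list that never ends, anything consuming it has to be lazy, which is exactly the property the three approaches below are trying to preserve all the way to the browser.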
Here’s the code for generating a stream using OS-level pipes:
```haskell
outputPipe :: (Int -> Int) -> IO Response
outputPipe decr = do
    h  <- pipeClock decr limitedCount
    -- Lazy IO: the pipe is read only as Happstack consumes the ByteString.
    bs <- L.hGetContents h
    return $ streamBS bs

pipeClock :: (Int -> Int) -> Int -> IO Handle
pipeClock decr n = do
    (readFd, writeFd) <- createPipe
    writeH <- fdToHandle writeFd
    -- Writer thread: one timestamp per interval into the write end.
    forkIO $ output writeH n `catch` abort writeH
    fdToHandle readFd
  where
    -- Closing the write end gives the reader EOF, ending a finite stream.
    output h 0 = do
        hPutStrLn stderr "closing pipe"
        hClose h
    output h n = do
        now <- getCurrentTime
        hPrint h now
        hFlush h
        tick now
        threadDelay interval
        output h (decr n)
    abort h e = do
        hPutStrLn stderr $ "caught error: " ++ show e
        hClose h
```
The above code creates a pipe. A new thread is forked off to repeatedly write timestamps into the pipe. For a finite stream, the new thread closes its end of the pipe when it’s done writing. Meanwhile, the original thread uses lazy IO to read the entire contents of the pipe into a lazy ByteString, which gets passed back down to Happstack for sending to the browser.
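The pipe mechanics can be exercised without Happstack. The following is a minimal sketch assuming a POSIX system (it uses the same System.Posix.IO functions as the code above); the helper pipeLines and the "line N" payload are hypothetical. A forked writer puts a few lines into the pipe and closes its end, and the reading side gets them back through L.hGetContents.

```haskell
import Control.Concurrent (forkIO)
import Control.Monad (forM_)
import System.IO (hClose, hPutStrLn)
import System.Posix.IO (createPipe, fdToHandle)
import qualified Data.ByteString.Lazy.Char8 as L

-- Hypothetical helper: fork a writer that puts n lines into an OS pipe and
-- then closes the write end; read everything back from the other end lazily.
pipeLines :: Int -> IO [String]
pipeLines n = do
  (readFd, writeFd) <- createPipe
  writeH <- fdToHandle writeFd
  _ <- forkIO $ do
    forM_ [1 .. n] $ \i -> hPutStrLn writeH ("line " ++ show i)
    hClose writeH                 -- flushes and signals EOF to the reader
  readH <- fdToHandle readFd
  bs <- L.hGetContents readH      -- lazy: reads happen as the result is consumed
  return (map L.unpack (L.lines bs))

main :: IO ()
main = pipeLines 3 >>= mapM_ putStrLn
```

If the writer never closed its end, the list would simply never end, which corresponds to the /pipe/infinite case.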
Here’s the code for generating a stream using an IO channel:
```haskell
outputChan :: (Int -> Int) -> IO Response
outputChan decr = do
    ch     <- chanClock decr limitedCount
    chunks <- getChanContents ch
    -- An empty chunk marks the end of a finite stream.
    let bs = L.fromChunks $ takeWhile (not . S.null) chunks
    return $ streamBS bs

chanClock :: (Int -> Int) -> Int -> IO (Chan S.ByteString)
chanClock decr n = do
    ch <- newChan
    forkIO $ output ch n
    return ch
  where
    output ch 0 = do
        hPutStrLn stderr "done writing to channel"
        writeChan ch S.empty
    output ch n = do
        now <- getCurrentTime
        writeChan ch $ chunkify now
        tick now
        threadDelay interval
        output ch (decr n)
```
The above code does the same basic thing, just with a channel of strict ByteStrings instead of a pipe. A new thread is forked off to write timestamp values to the channel. For a finite stream, the new thread writes an empty ByteString to indicate that it’s done. Meanwhile, the original thread uses lazy IO to read the entire contents of the channel into a (lazy) list, up until an empty ByteString. It then lazily combines the individual strict ByteStrings into a single lazy ByteString that it passes down to Happstack.
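The channel-plus-sentinel idea also works on its own. In this minimal sketch, the helper chanStream and its numeric payload are hypothetical. A forked thread writes a few strict chunks followed by an empty one, and the reader turns the lazy getChanContents list into a lazy ByteString, stopping at the sentinel.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (getChanContents, newChan, writeChan)
import Control.Monad (forM_)
import qualified Data.ByteString.Char8 as S
import qualified Data.ByteString.Lazy.Char8 as L

-- Hypothetical helper: stream n numbered lines through a Chan, using an
-- empty strict ByteString as the end-of-stream sentinel.
chanStream :: Int -> IO L.ByteString
chanStream n = do
  ch <- newChan
  _ <- forkIO $ do
    forM_ [1 .. n] $ \i -> writeChan ch (S.pack (show i ++ "\n"))
    writeChan ch S.empty              -- sentinel: no more data
  chunks <- getChanContents ch        -- lazy and never-ending on its own
  return (L.fromChunks (takeWhile (not . S.null) chunks))

main :: IO ()
main = chanStream 3 >>= L.putStr      -- prints 1, 2 and 3 on separate lines
```

The takeWhile is what makes the finite case terminate: getChanContents never ends by itself, so without a sentinel the reader would wait forever for the next element.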
The two approaches above each use a new thread that writes a timestamp, waits for the specified interval, writes another timestamp, and so on, so the two snippets look pretty similar in basic structure. Not so when using unsafeInterleaveIO directly:
```haskell
outputManual :: (Int -> Int) -> IO Response
outputManual decr = do
    bs <- manualClock decr limitedCount
    return $ streamBS bs

manualClock :: (Int -> Int) -> Int -> IO L.ByteString
manualClock decr n = do
    now <- getCurrentTime
    tick now
    allFutureChunks <- unsafeInterleaveIO $ ticksAfter now
    -- Zip with the countdown first: zip inspects its first list first, so a
    -- finite stream stops without forcing (and sleeping for) an extra tick.
    let futureChunks = map snd $ zip countdown allFutureChunks
    return . L.fromChunks $ chunkify now : futureChunks
  where
    ticksAfter since = do
        -- Sleep only for whatever part of the interval is left.
        now <- getCurrentTime
        let delta = diffUTCTime now since
        when (delta < interval / 1000000) $
            threadDelay (round (interval - delta * 1000000))
        now' <- getCurrentTime
        tick now'
        -- Defer the rest of the list until it is demanded.
        futureChunks <- unsafeInterleaveIO $ ticksAfter now'
        return $ chunkify now' : futureChunks
    countdown = takeWhile (> 0) $ iterate decr (decr n)
```
Here, all the action takes place in a single thread. Again, the strategy is to combine a bunch of strict ByteStrings, one per timestamp, into a single lazy ByteString containing everything. It generates the first timestamp immediately, but defers building the rest of the list until it’s needed. When the rest is needed, it again generates the first timestamp of the remaining list immediately and defers building the rest, and so on.
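The "run one action now, defer the rest" pattern is the core of manualClock, and it can be isolated from timestamps entirely. In this minimal sketch, the helper lazyRepeatIO is hypothetical and an IORef counter stands in for the clock. This makes it visible that each action runs only when its list cell is demanded.

```haskell
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import System.IO.Unsafe (unsafeInterleaveIO)

-- Hypothetical helper: run the action once now, and defer the rest of the
-- list behind unsafeInterleaveIO so each later action runs only when its
-- cons cell is demanded.
lazyRepeatIO :: IO a -> IO [a]
lazyRepeatIO act = do
  x  <- act
  xs <- unsafeInterleaveIO (lazyRepeatIO act)
  return (x : xs)

main :: IO ()
main = do
  counter <- newIORef (0 :: Int)
  -- Each run of `next` bumps the counter and returns its new value.
  let next = atomicModifyIORef' counter (\c -> (c + 1, c + 1))
  xs <- lazyRepeatIO next
  print (take 3 xs)             -- prints [1,2,3]
  readIORef counter >>= print   -- prints 3: only three actions have run
```

Moving the unsafeInterleaveIO call (for example, wrapping the first action too, or leaving it off the recursive call) changes when the side effects happen, which is why its placement matters so much.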
It’s worth noting that this approach is the most difficult to write, since putting unsafeInterleaveIO in the right place is critical to making it work. Also, since everything happens in a single thread, it’s strictly speaking no longer good enough to just delay for the desired interval between timestamps, because there’s no telling how much time has been spent in other parts of the code. Instead, the code checks the clock twice each time around: first to figure out how long to delay, if at all, and second to actually get the timestamp. Not surprisingly, it’s also the hardest of the three to read.
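The first clock check boils down to a small calculation. The sketch below is a restructured version of the delay logic in ticksAfter, not the code above verbatim. The hypothetical remainingDelay takes the interval in microseconds plus the previous and current times, and clamps the result at zero with max instead of guarding the delay with when.

```haskell
import Data.Time.Clock (UTCTime, addUTCTime, diffUTCTime)
import Data.Time.Clock.POSIX (posixSecondsToUTCTime)

-- Hypothetical helper: how many microseconds to sleep so that the next
-- timestamp lands roughly intervalUs after the previous one (since),
-- given the current time (now). Never returns a negative delay.
remainingDelay :: Int -> UTCTime -> UTCTime -> Int
remainingDelay intervalUs since now =
  max 0 (intervalUs - round (diffUTCTime now since * 1000000))

main :: IO ()
main = do
  let t0 = posixSecondsToUTCTime 0
  -- 30 ms already spent elsewhere: wait the remaining 70 ms.
  print (remainingDelay 100000 t0 (addUTCTime 0.03 t0))  -- prints 70000
  -- Already 150 ms late: don't wait at all.
  print (remainingDelay 100000 t0 (addUTCTime 0.25 t0))  -- prints 0
```

The second clock read still matters even with this helper: threadDelay only guarantees a minimum delay, so the timestamp itself should come from the clock after waking up.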
Finally, there are some odds and ends that bear mentioning. Here’s a simple function that converts a time value into a strict ByteString:
```haskell
-- Show the value and terminate it with a newline.
chunkify :: Show a => a -> S.ByteString
chunkify = S.pack . (++ "\n") . show
```
Here’s a helper function that prints out a message whenever a new timestamp is generated, so we can watch the app server’s progress:
```haskell
tick :: Show a => a -> IO ()
tick x = hPutStrLn stderr $ "tick " ++ show x
```
As you can see from the type signatures of those two utility functions, there’s nothing about them that’s specific to time values; any value that can be converted to a string (i.e., any instance of the Show class) works. We just happen to use them only with UTCTime values.
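A quick illustration of that generality is to feed chunkify (the same definition as above) a few non-time values. The sample values are arbitrary.

```haskell
import qualified Data.ByteString.Char8 as S

-- Same definition as above: show the value and append a newline.
chunkify :: Show a => a -> S.ByteString
chunkify = S.pack . (++ "\n") . show

main :: IO ()
main = do
  S.putStr (chunkify (42 :: Int))          -- prints 42
  S.putStr (chunkify (Just 'x'))           -- prints Just 'x'
  S.putStr (chunkify [1.5, 2 :: Double])   -- prints [1.5,2.0]
```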
Anyway, here’s a simple function for converting a lazy ByteString into the Response object that Happstack ultimately wants:
```haskell
-- Status 200, and no Content-Length header, so sending can start right away.
streamBS :: L.ByteString -> Response
streamBS = noContentLength . resultBS 200
```
We explicitly tell Happstack not to add a Content-Length header. To compute it, Happstack would have to measure the length of the entire response, so it couldn’t send anything until it had seen the whole thing, which defeats the point of what we’re trying to accomplish. For an infinite stream, it would never send anything at all.
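To see why the length is the problem, here is a small standalone sketch (plain bytestring code, not Happstack). It builds an endless lazy ByteString, the hypothetical endless, from repeated strict chunks standing in for an infinite timestamp stream. Consuming a prefix works fine, while asking for its length would never return.

```haskell
import qualified Data.ByteString.Char8 as S
import qualified Data.ByteString.Lazy.Char8 as L

-- An endless lazy ByteString built from repeated strict chunks,
-- standing in for an infinite timestamp stream.
endless :: L.ByteString
endless = L.fromChunks (repeat (S.pack "tick\n"))

main :: IO ()
main = do
  -- Consuming a prefix is fine: only the first 10 bytes are demanded.
  L.putStr (L.take 10 endless)     -- prints "tick" on two lines
  -- Computing the length (as a Content-Length header would require)
  -- never finishes, so it is left commented out:
  -- print (L.length endless)
```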
Last, and also least, here are the constants specifying the time interval between timestamps, and how many timestamps to generate in a finite stream before stopping:
```haskell
interval :: Num a => a
interval = 100000 -- microseconds

limitedCount :: Num a => a
limitedCount = 30
```
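Declaring the constants as `Num a => a` is what lets the same interval serve as an Int microsecond count for threadDelay and as a NominalDiffTime in manualClock’s arithmetic. A minimal illustration, reusing the definition above:

```haskell
import Data.Time.Clock (NominalDiffTime)

-- Same polymorphic constant as above: one definition, many numeric types.
interval :: Num a => a
interval = 100000 -- microseconds

main :: IO ()
main = do
  -- As an Int, it's the argument threadDelay expects.
  print (interval :: Int)                           -- prints 100000
  -- As a NominalDiffTime, dividing by 10^6 converts it to seconds,
  -- which is how manualClock compares it with diffUTCTime results.
  print (interval / 1000000 :: NominalDiffTime)
```

With a monomorphic signature such as `interval :: Int`, the manualClock code would need explicit conversions (e.g. fromIntegral) at each use.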
That’s the entirety of the code. If you have GHC installed, you can go ahead and compile the program and play around with it to find out whether or not any of the approaches we’ve tried actually work. Since we didn’t pass any configuration parameters to Happstack, it will start a web server on port 8000 that you can point your favorite browser to, at any of the six paths we defined.
Stay tuned for Part 4, where we’ll see why (spoiler alert!) none of the three approaches actually work.