From 84f6e979917ed991edb8f52a66f96ce7ce5fbfb3 Mon Sep 17 00:00:00 2001 From: Andrew Miller Date: Thu, 20 Dec 2012 23:00:39 +1300 Subject: [PATCH] Initial version of the library --- .gitignore | 2 + LICENSE | 31 ++++++++++++++ conduit-resumablesink.cabal | 41 ++++++++++++++++++ hssrc/Data/Conduit/ResumableSink.hs | 66 +++++++++++++++++++++++++++++ test/main.hs | 31 ++++++++++++++ 5 files changed, 171 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 conduit-resumablesink.cabal create mode 100644 hssrc/Data/Conduit/ResumableSink.hs create mode 100644 test/main.hs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..733412c --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*~ +dist diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..e6084e0 --- /dev/null +++ b/LICENSE @@ -0,0 +1,31 @@ +Copyright (c) 2012 Andrew Miller + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + * Neither the name of Isaac Jones nor the names of other + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/conduit-resumablesink.cabal b/conduit-resumablesink.cabal new file mode 100644 index 0000000..7fa703f --- /dev/null +++ b/conduit-resumablesink.cabal @@ -0,0 +1,41 @@ +Name: conduit-resumablesink +Version: 0.1 +Synopsis: Allows conduit to resume sinks to feed multiple sources into it. +Description: + @conduit-resumablesink@ is a solution to the problem where you have a @conduit@ + sink and you want to feed multiple sources into it as each source is exhausted. + This is essentially the opposite of the ResumableSource functionality supplied + with conduit. +License: BSD3 +License-file: LICENSE +Author: Andrew Miller +Maintainer: andrew@amxl.com +Category: Data, Conduit +Build-type: Simple +Cabal-version: >=1.8 +Homepage: http://github.com/A1kmm/conduit-resumablesink + +Library + Hs-Source-Dirs: hssrc + Exposed-modules: Data.Conduit.ResumableSink + Build-depends: base >= 4 && < 5, + conduit >= 0.5 && <0.6, + void >= 0.5.5 && < 0.6 + ghc-options: -Wall + +test-suite test + hs-source-dirs: test + main-is: main.hs + type: exitcode-stdio-1.0 + build-depends: conduit, + conduit-resumablesink, + base, + hspec >= 1.3, + bytestring, + void, + transformers + ghc-options: -Wall + +source-repository head + type: git + location: git://github.com/A1kmm/conduit-resumablesink.git diff --git a/hssrc/Data/Conduit/ResumableSink.hs b/hssrc/Data/Conduit/ResumableSink.hs new file mode 100644 index 0000000..d1ea890 --- /dev/null +++ b/hssrc/Data/Conduit/ResumableSink.hs @@ -0,0 +1,66 @@ +module Data.Conduit.ResumableSink ( + ResumableSink(..), connectResumeSink, newResumableSink, closeResumableSink, + (+$$), (++$$), (-++$$) + ) +where + +import Data.Conduit.Internal +import Data.Conduit +import Data.Void + +-- | +data ResumableSink m i r = ResumableSink (Sink i m r) + +-- | Connects a new source to a resumable sink. The result will be Right an updated +-- ResumableSink or Left result if the Sink completes. +connectResumeSink + :: Monad m => Source m i -> ResumableSink m i r -> m (Either r (ResumableSink m i r)) +connectResumeSink left0 (ResumableSink right0) = + go (return ()) left0 right0 + where + go :: Monad m => m () -> Source m i -> Sink i m r -> m (Either r (ResumableSink m i r)) + go leftFinal left right = + case right of + Done r -> leftFinal >> (return . Left $ r) + PipeM mp -> mp >>= go leftFinal left + HaveOutput _ _ o -> absurd o + Leftover p i -> go leftFinal (HaveOutput left leftFinal i) p + NeedInput rp _ -> + case left of + Leftover p () -> go leftFinal p right + HaveOutput left' leftFinal' o -> go leftFinal' left' (rp o) + NeedInput _ lc -> go leftFinal (lc ()) right + Done () -> return . Right $ ResumableSink right + PipeM mp -> mp >>= \left' -> go leftFinal left' right + +-- | Converts a sink into a ResumableSink that can be used with ++$$ +newResumableSink :: Monad m => Sink i m r -> ResumableSink m i r +newResumableSink s = ResumableSink s + +-- | Closes a ResumableSink and gets the final result. +closeResumableSink :: Monad m => ResumableSink m i r -> m r +closeResumableSink (ResumableSink sink) = + go sink + where + go right = + case right of + Leftover p i -> do + res <- connectResumeSink (HaveOutput (return ()) (return ()) i) (ResumableSink p) + case res of + Left r -> return r + Right rs -> closeResumableSink rs + HaveOutput _ _ o -> absurd o + NeedInput _ r -> go (r ()) + Done r -> return r + PipeM mp -> mp >>= go + +(+$$) :: Monad m => Source m i -> Sink i m r -> m (Either r (ResumableSink m i r)) +source +$$ sink = source `connectResumeSink` (newResumableSink sink) + +(++$$) :: Monad m => Source m i -> ResumableSink m i r -> m (Either r (ResumableSink m i r)) +(++$$) = connectResumeSink + +(-++$$) :: Monad m => Source m i -> ResumableSink m i r -> m r +source -++$$ (ResumableSink sink) = do + r <- source $$ sink + return r diff --git a/test/main.hs b/test/main.hs new file mode 100644 index 0000000..0b2a0eb --- /dev/null +++ b/test/main.hs @@ -0,0 +1,31 @@ +import Test.Hspec +import qualified Data.Conduit as C +import qualified Data.Conduit.List as C +import Data.Conduit.ResumableSink +import Data.IORef +import Control.Monad.IO.Class + +main :: IO () +main = hspec $ do + describe "use resumable sink" $ do + it "behaves like normal conduit when -++$$ used immediately" $ do + r <- C.runResourceT $ + (C.sourceList ["hello", "world"]) -++$$ (newResumableSink C.consume) + r `shouldBe` ["hello", "world"] + + it "sink can be resumed" $ do + r <- C.runResourceT $ do + Right r1 <- ((C.sourceList ["hello", "world"]) +$$ C.consume) + (C.sourceList ["hello", "world"]) -++$$ r1 + r `shouldBe` ["hello", "world", "hello", "world"] + + it "does correct cleanup" $ do + s <- newIORef (0 :: Int, 0 :: Int, 0 :: Int) + r <- C.runResourceT $ do + Right r1 <- + ((C.addCleanup (const . liftIO $ modifyIORef s (\(a,b,c) -> (a + 1, b, c))) (C.sourceList ["hello", "world"])) +$$ + C.addCleanup (const . liftIO $ modifyIORef s (\(a,b,c) -> (a,b,c+1))) (C.consume)) + ((C.addCleanup (const . liftIO $ modifyIORef s (\(a, b, c) -> (a, b + 1, c))) (C.sourceList ["hello", "world"]))) -++$$ r1 + `shouldBe` ["hello", "world", "hello", "world"] + sfinal <- readIORef s + sfinal `shouldBe` (1, 1, 1)