Merge pull request #2 from pngwjpgh/fix/conduit-1.2
Port to conduit 1.2
This commit is contained in:
commit
786c58e967
3
.gitignore
vendored
3
.gitignore
vendored
@ -1,2 +1,5 @@
|
|||||||
*~
|
*~
|
||||||
dist
|
dist
|
||||||
|
*.sw[a-z]
|
||||||
|
.cabal-sandbox/
|
||||||
|
cabal.sandbox.config
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
Name: conduit-resumablesink
|
Name: conduit-resumablesink
|
||||||
Version: 0.1.1
|
Version: 0.2
|
||||||
Synopsis: Allows conduit to resume sinks to feed multiple sources into it.
|
Synopsis: Allows conduit to resume sinks to feed multiple sources into it.
|
||||||
Description:
|
Description:
|
||||||
@conduit-resumablesink@ is a solution to the problem where you have a @conduit@
|
@conduit-resumablesink@ is a solution to the problem where you have a @conduit@
|
||||||
@ -20,8 +20,8 @@ Library
|
|||||||
Exposed-modules: Data.Conduit.ResumableSink
|
Exposed-modules: Data.Conduit.ResumableSink
|
||||||
Build-depends:
|
Build-depends:
|
||||||
base >= 4 && < 5,
|
base >= 4 && < 5,
|
||||||
conduit >= 1.0.5 && <1.1,
|
conduit >= 1.2 && <1.3,
|
||||||
void >= 0.6 && < 0.7
|
void >= 0.6 && < 0.8
|
||||||
ghc-options: -Wall
|
ghc-options: -Wall
|
||||||
|
|
||||||
test-suite test
|
test-suite test
|
||||||
@ -34,6 +34,7 @@ test-suite test
|
|||||||
hspec >= 1.3,
|
hspec >= 1.3,
|
||||||
bytestring,
|
bytestring,
|
||||||
void,
|
void,
|
||||||
|
resourcet,
|
||||||
transformers
|
transformers
|
||||||
ghc-options: -Wall
|
ghc-options: -Wall
|
||||||
|
|
||||||
|
|||||||
@ -5,65 +5,49 @@ module Data.Conduit.ResumableSink (
|
|||||||
where
|
where
|
||||||
|
|
||||||
import Data.Conduit.Internal
|
import Data.Conduit.Internal
|
||||||
import Data.Conduit
|
|
||||||
import Data.Void
|
import Data.Void
|
||||||
|
|
||||||
-- |
|
-- |
|
||||||
data ResumableSink m i r = ResumableSink (Sink i m r)
|
data ResumableSink i m r = ResumableSink (Sink i m r)
|
||||||
|
|
||||||
-- | Connects a new source to a resumable sink. The result will be Right an updated
|
-- | Connects a new source to a resumable sink. The result will be Right an updated
|
||||||
-- ResumableSink or Left result if the Sink completes.
|
-- ResumableSink or Left result if the Sink completes.
|
||||||
connectResumeSink
|
connectResumeSink :: Monad m => Source m i -> ResumableSink i m r -> m (Either r (ResumableSink i m r))
|
||||||
:: Monad m => Source m i -> ResumableSink m i r -> m (Either r (ResumableSink m i r))
|
connectResumeSink (ConduitM left') (ResumableSink (ConduitM right')) = go (return ()) (left' Done) (right' Done)
|
||||||
connectResumeSink left0 (ResumableSink right0) =
|
|
||||||
go (return ()) left0 right0
|
|
||||||
where
|
where
|
||||||
go :: Monad m => m () -> Source m i -> Sink i m r -> m (Either r (ResumableSink m i r))
|
go :: Monad m
|
||||||
go leftFinal left right =
|
=> m ()
|
||||||
case unConduitM right of
|
-> Pipe () () i () m ()
|
||||||
Done r -> leftFinal >> (return . Left $ r)
|
-> Pipe i i Void () m r
|
||||||
PipeM mp -> mp >>= go leftFinal left . ConduitM
|
-> m (Either r (ResumableSink i m r))
|
||||||
HaveOutput _ _ o -> absurd o
|
go final (NeedInput cont0 _ ) right = go final (cont0 ()) right
|
||||||
Leftover p i -> go leftFinal (ConduitM $ HaveOutput (unConduitM left) leftFinal i) $ ConduitM p
|
go final (Done ()) right = return . Right . ResumableSink $ ConduitM (\finalize -> right >>= finalize)
|
||||||
NeedInput rp _ ->
|
go final (PipeM pm) right = pm >>= \left -> go final left right
|
||||||
case unConduitM left of
|
go final (Leftover left ()) right = go final left right
|
||||||
Leftover p () -> go leftFinal (ConduitM p) right
|
go final0 (HaveOutput left1 final1 o) (NeedInput cont0 _) = go (final0 >> final1) left1 (cont0 o)
|
||||||
HaveOutput left' leftFinal' o -> go leftFinal' (ConduitM left') (ConduitM $ rp o)
|
go _ _ (HaveOutput _ _ o) = absurd o
|
||||||
NeedInput _ lc -> go leftFinal (ConduitM $ lc ()) right
|
go final _ (Done r) = Left r <$ final
|
||||||
Done () -> return . Right $ ResumableSink right
|
go final left (PipeM pm) = pm >>= go final left
|
||||||
PipeM mp -> mp >>= \left' -> go leftFinal (ConduitM left') right
|
go final left (Leftover right i) = go final (HaveOutput left (return ()) i) right
|
||||||
|
|
||||||
-- | Converts a sink into a ResumableSink that can be used with ++$$
|
-- | Converts a sink into a ResumableSink that can be used with ++$$
|
||||||
newResumableSink :: Monad m => Sink i m r -> ResumableSink m i r
|
newResumableSink :: Monad m => Sink i m r -> ResumableSink i m r
|
||||||
newResumableSink = ResumableSink
|
newResumableSink = ResumableSink
|
||||||
|
|
||||||
-- | Closes a ResumableSink and gets the final result.
|
-- | Closes a ResumableSink and gets the final result.
|
||||||
closeResumableSink :: Monad m => ResumableSink m i r -> m r
|
closeResumableSink :: Monad m => ResumableSink i m r -> m r
|
||||||
closeResumableSink (ResumableSink sink) =
|
closeResumableSink (ResumableSink sink) = runConduit $ return () =$= sink
|
||||||
go (unConduitM sink)
|
|
||||||
where
|
|
||||||
go right =
|
|
||||||
case right of
|
|
||||||
Leftover p i -> do
|
|
||||||
res <- connectResumeSink (ConduitM $ HaveOutput (return ()) (return ()) i) (ResumableSink $ ConduitM p)
|
|
||||||
case res of
|
|
||||||
Left r -> return r
|
|
||||||
Right rs -> closeResumableSink rs
|
|
||||||
HaveOutput _ _ o -> absurd o
|
|
||||||
NeedInput _ r -> go (r ())
|
|
||||||
Done r -> return r
|
|
||||||
PipeM mp -> mp >>= go
|
|
||||||
|
|
||||||
-- | Connects a source and a sink. The result will be Right a
|
-- | Connects a source and a sink. The result will be Right a
|
||||||
-- ResumableSink or Left result if the Sink completes.
|
-- ResumableSink or Left result if the Sink completes.
|
||||||
(+$$) :: Monad m => Source m i -> Sink i m r -> m (Either r (ResumableSink m i r))
|
(+$$) :: Monad m => Source m i -> Sink i m r -> m (Either r (ResumableSink i m r))
|
||||||
source +$$ sink = source `connectResumeSink` newResumableSink sink
|
source +$$ sink = source `connectResumeSink` newResumableSink sink
|
||||||
|
|
||||||
-- | Connects a new source to a resumable sink. The result will be Right an updated
|
-- | Connects a new source to a resumable sink. The result will be Right an updated
|
||||||
-- ResumableSink or Left result if the Sink completes.
|
-- ResumableSink or Left result if the Sink completes.
|
||||||
(++$$) :: Monad m => Source m i -> ResumableSink m i r -> m (Either r (ResumableSink m i r))
|
(++$$) :: Monad m => Source m i -> ResumableSink i m r -> m (Either r (ResumableSink i m r))
|
||||||
(++$$) = connectResumeSink
|
(++$$) = connectResumeSink
|
||||||
|
|
||||||
-- | Attaches a source to a resumable sink, finishing the sink and returning a result.
|
-- | Attaches a source to a resumable sink, finishing the sink and returning a result.
|
||||||
(-++$$) :: Monad m => Source m i -> ResumableSink m i r -> m r
|
(-++$$) :: Monad m => Source m i -> ResumableSink i m r -> m r
|
||||||
source -++$$ ResumableSink sink = source $$ sink
|
source -++$$ ResumableSink sink = source $$ sink
|
||||||
|
|||||||
@ -4,16 +4,17 @@ import qualified Data.Conduit.List as C
|
|||||||
import Data.Conduit.ResumableSink
|
import Data.Conduit.ResumableSink
|
||||||
import Data.IORef
|
import Data.IORef
|
||||||
import Control.Monad.IO.Class
|
import Control.Monad.IO.Class
|
||||||
|
import Control.Monad.Trans.Resource as R
|
||||||
|
|
||||||
main :: IO ()
|
main :: IO ()
|
||||||
main = hspec $ describe "use resumable sink" $ do
|
main = hspec $ describe "use resumable sink" $ do
|
||||||
it "behaves like normal conduit when -++$$ used immediately" $ do
|
it "behaves like normal conduit when -++$$ used immediately" $ do
|
||||||
r <- C.runResourceT $
|
r <- R.runResourceT $
|
||||||
C.sourceList ["hello", "world"] -++$$ newResumableSink C.consume
|
C.sourceList ["hello", "world"] -++$$ newResumableSink C.consume
|
||||||
r `shouldBe` ["hello", "world"]
|
r `shouldBe` ["hello", "world"]
|
||||||
|
|
||||||
it "sink can be resumed" $ do
|
it "sink can be resumed" $ do
|
||||||
r <- C.runResourceT $ do
|
r <- R.runResourceT $ do
|
||||||
Right r1 <- C.sourceList ["hello", "world"] +$$ C.consume
|
Right r1 <- C.sourceList ["hello", "world"] +$$ C.consume
|
||||||
C.sourceList ["hello", "world"] -++$$ r1
|
C.sourceList ["hello", "world"] -++$$ r1
|
||||||
r `shouldBe` ["hello", "world", "hello", "world"]
|
r `shouldBe` ["hello", "world", "hello", "world"]
|
||||||
@ -22,7 +23,7 @@ main = hspec $ describe "use resumable sink" $ do
|
|||||||
s <- newIORef (0 :: Int, 0 :: Int, 0 :: Int)
|
s <- newIORef (0 :: Int, 0 :: Int, 0 :: Int)
|
||||||
let clean f _ = liftIO $ modifyIORef s f
|
let clean f _ = liftIO $ modifyIORef s f
|
||||||
|
|
||||||
r <- C.runResourceT $ do
|
r <- R.runResourceT $ do
|
||||||
Right r1 <-
|
Right r1 <-
|
||||||
C.addCleanup (clean incA) (C.sourceList ["hello", "world"])
|
C.addCleanup (clean incA) (C.sourceList ["hello", "world"])
|
||||||
+$$ C.addCleanup (clean incC) C.consume
|
+$$ C.addCleanup (clean incC) C.consume
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user