diff --git a/src/Network/Minio/PutObject.hs b/src/Network/Minio/PutObject.hs index 845ab45..33132b2 100644 --- a/src/Network/Minio/PutObject.hs +++ b/src/Network/Minio/PutObject.hs @@ -7,11 +7,11 @@ module Network.Minio.PutObject import qualified Data.ByteString as B -import qualified Data.ByteString.Lazy as LB import qualified Data.Conduit as C import Data.Conduit.Binary (sourceHandleRange) import qualified Data.Conduit.Binary as CB import qualified Data.Conduit.Combinators as CC +import qualified Data.Conduit.List as CL import qualified Data.List as List import qualified Data.Map.Strict as Map @@ -144,40 +144,29 @@ sequentialMultipartUpload b o sizeMay src = do uploadId <- maybe (newMultipartUpload b o []) return uidMay -- upload parts in loop - uploadedParts <- uploadPartsSequentially b o uploadId pmap sizeMay src + let partSizes = selectPartSizes $ maybe maxObjectSize identity sizeMay + (pnums, _, sizes) = List.unzip3 partSizes + uploadedParts <- src + C..| chunkBSConduit sizes + C..| CL.map PayloadBS + C..| checkAndUpload uploadId pmap pnums + C.$$ CC.sinkList -- complete multipart upload completeMultipartUpload b o uploadId uploadedParts -uploadPartsSequentially :: Bucket -> Object -> UploadId - -> Map PartNumber ListPartInfo -> Maybe Int64 - -> C.Source Minio ByteString -> Minio [PartInfo] -uploadPartsSequentially b o uid pmap sizeMay src' = do - let - rSrc = C.newResumableSource src' - partSizes = selectPartSizes $ maybe maxObjectSize identity sizeMay - - loopIt rSrc partSizes [] - where - -- make a sink that consumes only `s` bytes - limitedSink s = CB.isolate (fromIntegral s) C.=$= CB.sinkLbs - - loopIt _ [] uparts = return $ reverse uparts - loopIt src ((n, _, size):ps) uparts = do - (newSrc, buf) <- src C.$$++ (limitedSink size) - - let buflen = LB.length buf - payload = PayloadBS $ LB.toStrict buf - - partMay <- checkUploadNeeded payload n pmap - - if buflen == 0 - then return $ reverse uparts - else do pInfo <- maybe (putObjectPart b o uid n [] payload) - return partMay - loopIt newSrc ps (pInfo:uparts) - + checkAndUpload _ _ [] = return () + checkAndUpload uid pmap (pn:pns) = do + payloadMay <- C.await + case payloadMay of + Nothing -> return () + Just payload -> do partMay <- lift $ checkUploadNeeded payload pn pmap + pinfo <- maybe + (lift $ putObjectPart b o uid pn [] payload) + return partMay + C.yield pinfo + checkAndUpload uid pmap pns -- | Looks for incomplete uploads for an object. Returns the first one -- if there are many. diff --git a/src/Network/Minio/Utils.hs b/src/Network/Minio/Utils.hs index 0f91eff..31bc0e1 100644 --- a/src/Network/Minio/Utils.hs +++ b/src/Network/Minio/Utils.hs @@ -3,9 +3,11 @@ module Network.Minio.Utils where import qualified Control.Concurrent.Async.Lifted as A import qualified Control.Concurrent.QSem as Q import qualified Control.Exception.Lifted as ExL +import qualified Control.Monad.Catch as MC import Control.Monad.Trans.Control (liftBaseOp_, StM) import qualified Control.Monad.Trans.Resource as R -import qualified Control.Monad.Catch as MC + +import qualified Data.ByteString as B import qualified Data.Conduit as C import Data.Text.Encoding.Error (lenientDecode) import qualified Network.HTTP.Client as NClient @@ -136,3 +138,22 @@ mkQuery k mv = (k,) <$> mv -- don't use it with mandatory query params with empty value. mkOptionalParams :: [(Text, Maybe Text)] -> HT.Query mkOptionalParams params = HT.toQuery $ (uncurry mkQuery) <$> params + +chunkBSConduit :: (Monad m, Integral a) + => [a] -> C.Conduit ByteString m ByteString +chunkBSConduit s = loop 0 [] s + where + loop _ _ [] = return () + loop n readChunks (size:sizes) = do + bsMay <- C.await + case bsMay of + Nothing -> if n > 0 + then C.yield $ B.concat readChunks + else return () + Just bs -> if n + fromIntegral (B.length bs) >= size + then do let (a, b) = B.splitAt (fromIntegral $ size - n) bs + chunkBS = B.concat $ readChunks ++ [a] + C.yield chunkBS + loop (fromIntegral $ B.length b) [b] sizes + else loop (n + fromIntegral (B.length bs)) + (readChunks ++ [bs]) (size:sizes) diff --git a/test/Spec.hs b/test/Spec.hs index 06b569f..db77959 100644 --- a/test/Spec.hs +++ b/test/Spec.hs @@ -7,7 +7,6 @@ import Lib.Prelude import System.Directory (getTemporaryDirectory) import qualified System.IO as SIO -import System.IO.Temp (openBinaryTempFile, withSystemTempDirectory) import qualified Control.Monad.Trans.Resource as R import qualified Data.ByteString as BS @@ -225,10 +224,7 @@ liveServerUnitTests = testGroup "Unit tests against a live server" , funTestWithBucket "multipart" $ \step bucket -> do step "upload large object" - -- fPutObject bucket "big" "/tmp/large" - -- putObject bucket "big" ("/dev/zero") - etag <- putObject bucket "big" (ODFile "/dev/zero" $ Just $ 1024*1024*100) - traceShowM etag + void $ putObject bucket "big" (ODFile "/dev/zero" $ Just $ 1024*1024*100) step "cleanup" deleteObject bucket "big" @@ -245,7 +241,7 @@ liveServerUnitTests = testGroup "Unit tests against a live server" step "put object parts 1..10" inputFile <- mkRandFile mb15 h <- liftIO $ SIO.openBinaryFile inputFile SIO.ReadMode - forM [1..10] $ \pnum -> + forM_ [1..10] $ \pnum -> putObjectPart bucket object uid pnum [] $ PayloadH h 0 mb15 step "fetch list parts" @@ -296,7 +292,7 @@ liveServerUnitTests = testGroup "Unit tests against a live server" step "put object parts 1..10" inputFile <- mkRandFile mb15 h <- liftIO $ SIO.openBinaryFile inputFile SIO.ReadMode - forM [1..10] $ \pnum -> + forM_ [1..10] $ \pnum -> putObjectPart bucket object uid pnum [] $ PayloadH h 0 mb15 step "fetch list parts"