From 688f326b6e3679e559aeccb3447863eb4a0c00fa Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Wed, 8 Feb 2017 14:48:58 +0530 Subject: [PATCH] Make parallel upload also resume an existing upload --- minio-hs.cabal | 2 + src/Network/Minio/Data/Crypto.hs | 16 +++++- src/Network/Minio/PutObject.hs | 99 +++++++++++++++++++------------- 3 files changed, 74 insertions(+), 43 deletions(-) diff --git a/minio-hs.cabal b/minio-hs.cabal index 7f7b417..fbc5cb3 100644 --- a/minio-hs.cabal +++ b/minio-hs.cabal @@ -43,6 +43,7 @@ library , cryptonite-conduit , data-default , exceptions + , extra , filepath , http-client , http-conduit @@ -87,6 +88,7 @@ test-suite minio-hs-test , cryptonite-conduit , data-default , exceptions + , extra , filepath , http-client , http-conduit diff --git a/src/Network/Minio/Data/Crypto.hs b/src/Network/Minio/Data/Crypto.hs index 800f4c3..620c823 100644 --- a/src/Network/Minio/Data/Crypto.hs +++ b/src/Network/Minio/Data/Crypto.hs @@ -4,6 +4,7 @@ module Network.Minio.Data.Crypto , hashSHA256FromSource , hashMD5 + , hashMD5FromSource , hmacSHA256 , hmacSHA256RawBS @@ -32,6 +33,18 @@ hashSHA256FromSource src = do sinkSHA256Hash :: Monad m => C.Consumer ByteString m (Digest SHA256) sinkSHA256Hash = sinkHash +hashMD5 :: ByteString -> ByteString +hashMD5 = digestToBase16 . hashWith MD5 + +hashMD5FromSource :: Monad m => C.Producer m ByteString -> m ByteString +hashMD5FromSource src = do + digest <- src C.$$ sinkMD5Hash + return $ digestToBase16 digest + where + -- To help with type inference + sinkMD5Hash :: Monad m => C.Consumer ByteString m (Digest MD5) + sinkMD5Hash = sinkHash + hmacSHA256 :: ByteString -> ByteString -> HMAC SHA256 hmacSHA256 message key = hmac key message @@ -43,6 +56,3 @@ digestToBS = convert digestToBase16 :: ByteArrayAccess a => a -> ByteString digestToBase16 = convertToBase Base16 - -hashMD5 :: ByteString -> ByteString -hashMD5 = digestToBase16 . hashWith MD5 diff --git a/src/Network/Minio/PutObject.hs b/src/Network/Minio/PutObject.hs index b777141..ac99930 100644 --- a/src/Network/Minio/PutObject.hs +++ b/src/Network/Minio/PutObject.hs @@ -5,11 +5,14 @@ module Network.Minio.PutObject ) where -import qualified Data.Conduit as C -import qualified Data.Conduit.Combinators as CC -import qualified Data.Conduit.Binary as CB -import qualified Data.List as List +import Control.Monad.Extra (loopM) +import qualified Data.ByteString as B import qualified Data.ByteString.Lazy as LB +import qualified Data.Conduit as C +import Data.Conduit.Binary (sourceHandleRange) +import qualified Data.Conduit.Binary as CB +import qualified Data.Conduit.Combinators as CC +import qualified Data.List as List import qualified Data.Map.Strict as Map import Lib.Prelude @@ -47,6 +50,7 @@ data ObjectData m = ODFile FilePath (Maybe Int64) -- ^ Takes filepath and option -- | Put an object from ObjectData. This high-level API handles -- objects of all sizes, and even if the object size is unknown. 
 putObject :: Bucket -> Object -> ObjectData Minio -> Minio ETag
+putObject b o (ODStream src sizeMay) = sequentialMultipartUpload b o sizeMay src
 putObject b o (ODFile fp sizeMay) = do
   hResE <- withNewHandle fp $ \h ->
     liftM2 (,) (isHandleSeekable h) (getFileSize h)
@@ -72,7 +76,6 @@ putObject b o (ODFile fp sizeMay) = do
       | isSeekable -> parallelMultipartUpload b o fp size
       | otherwise -> sequentialMultipartUpload b o (Just size) $
                      CB.sourceFile fp
-putObject b o (ODStream src sizeMay) = sequentialMultipartUpload b o sizeMay src
 
 -- | Select part sizes - the logic is that the minimum part-size will
 -- be 64MiB. TODO: write quickcheck tests.
@@ -85,16 +88,35 @@ selectPartSizes size = List.zip3 [1..] partOffsets partSizes
     partSizes = replicate (fromIntegral numParts) partSize ++ lastPart
     partOffsets = List.scanl' (+) 0 partSizes
 
+-- | Returns the PartInfo of an already uploaded part if it matches
+-- the local payload's size and MD5 hash; such a part can be skipped.
+checkUploadNeeded :: Payload -> PartNumber
+                  -> Map.Map PartNumber ListPartInfo
+                  -> Minio (Maybe PartInfo)
+checkUploadNeeded payload n pmap =
+  case Map.lookup n pmap of
+    Nothing -> return Nothing
+    Just (ListPartInfo _ etag size _) -> do
+      -- hash the local data only when a part with this number exists.
+      (md5hash, pSize) <- case payload of
+        PayloadBS bs -> return (hashMD5 bs, fromIntegral $ B.length bs)
+        PayloadH h off len -> liftM (, len) $
+          hashMD5FromSource $ sourceHandleRange h (Just $ fromIntegral off)
+          (Just $ fromIntegral len)
+      return $ bool Nothing (Just (PartInfo n etag)) $
+        md5hash == encodeUtf8 etag && size == pSize
+
 parallelMultipartUpload :: Bucket -> Object -> FilePath -> Int64
                         -> Minio ETag
 parallelMultipartUpload b o filePath size = do
+  (uidMay, pmap) <- getExistingUpload b o
+
+  -- get a new upload id if needed.
+  uploadId <- maybe (newMultipartUpload b o []) return uidMay
+
   let partSizeInfo = selectPartSizes size
 
-  -- get new upload id.
-  uploadId <- newMultipartUpload b o []
-
   -- perform upload with 10 threads
-  uploadedPartsE <- limitedMapConcurrently 10 (uploadPart uploadId) partSizeInfo
+  uploadedPartsE <- limitedMapConcurrently 10
+                    (uploadPart pmap uploadId) partSizeInfo
 
   -- if there were any errors, rethrow exception.
   mapM_ throwM $ lefts uploadedPartsE
@@ -102,20 +124,29 @@ parallelMultipartUpload b o filePath size = do
   -- if we get here, all parts were successfully uploaded.
   completeMultipartUpload b o uploadId $ rights uploadedPartsE
   where
-    uploadPart uploadId (partNum, offset, sz) = withNewHandle filePath $
-      \h -> putObjectPart b o uploadId partNum [] $ PayloadH h offset sz
+    uploadPart pmap uploadId (partNum, offset, sz) =
+      withNewHandle filePath $ \h -> do
+        let payload = PayloadH h offset sz
+        pInfoMay <- checkUploadNeeded payload partNum pmap
+        -- upload the part only when it is not already present.
+        maybe
+          (putObjectPart b o uploadId partNum [] payload)
+          return pInfoMay
 
 -- | Upload multipart object from conduit source sequentially
 sequentialMultipartUpload :: Bucket -> Object -> Maybe Int64
                           -> C.Producer Minio ByteString -> Minio ETag
 sequentialMultipartUpload b o sizeMay src = do
-  (uidMay, pinfos) <- getExistingUpload b o
+  (uidMay, pmap) <- getExistingUpload b o
 
   -- get a new upload id if needed.
   uploadId <- maybe (newMultipartUpload b o []) return uidMay
 
   -- upload parts in loop
-  uploadedParts <- loop pinfos uploadId rSrc partSizeInfo []
+  -- the resumable source is threaded through the loop state, so each
+  -- iteration continues reading where the previous one stopped.
+  uploadedParts <- loopM (loopFunc pmap uploadId) (rSrc, partSizeInfo, [])
 
   -- complete multipart upload
   completeMultipartUpload b o uploadId uploadedParts
@@ -123,44 +154,32 @@ sequentialMultipartUpload b o sizeMay src = do
     rSrc = C.newResumableSource src
     partSizeInfo = selectPartSizes $ maybe maxObjectSize identity sizeMay
 
-    -- returns partinfo if part is already uploaded.
-    checkUploadNeeded :: LByteString -> PartNumber
-                      -> Map.Map PartNumber ListPartInfo
-                      -> Maybe PartInfo
-    checkUploadNeeded lbs n pmap = do
-      pinfo@(ListPartInfo _ etag size _) <- Map.lookup n pmap
-      bool Nothing (return (PartInfo n etag)) $
-        LB.length lbs == size &&
-        hashMD5 (LB.toStrict lbs) == encodeUtf8 etag
-
     -- make a sink that consumes only `s` bytes
     limitedSink s = CB.isolate (fromIntegral s) C.=$= CB.sinkLbs
 
     -- FIXME: test, confirm and remove traceShowM statements
-    loop _ _ _ [] uparts = return $ reverse uparts
-    loop pinfos uid rSource ((partNum, _, size):ps) u = do
-
-      -- load data from resume-able source into bytestring.
+    loopFunc _ _ (_, [], uparts) = return $ Right $ reverse uparts
+    loopFunc pmap uid (rSource, (partNum, _, size):ps, uparts) = do
+      -- load data from the resumable source into a bytestring.
       (newSource, buf) <- rSource C.$$++ (limitedSink size)
 
       traceShowM "psize: "
      traceShowM (LB.length buf)
 
-      case checkUploadNeeded buf partNum pinfos of
-        Just pinfo -> loop pinfos uid newSource ps (pinfo:u)
+      let payload = PayloadBS $ LB.toStrict buf
+      partMay <- checkUploadNeeded payload partNum pmap
+      case partMay of
+        -- the part is already uploaded; skip it and continue.
+        Just pinfo -> return $ Left (newSource, ps, pinfo:uparts)
         Nothing -> do
-          pInfo <- putObjectPart b o uid partNum [] $
-                   PayloadBS $ LB.toStrict buf
-
+          -- upload the part
+          pInfo <- putObjectPart b o uid partNum [] payload
           if LB.length buf == size
 
-            -- upload the full size part.
-            then loop pinfos uid newSource ps (pInfo:u)
+            -- got a full-size part; continue with the rest.
+            then return $ Left (newSource, ps, pInfo:uparts)
 
-            -- got a smaller part, so its the last one.
-            else do traceShowM (("Found a piece with length < than "::[Char]) ++ show size ++ " - uploading as last and quitting.")
+            -- got a smaller part, so it's the last one.
+            else do traceShowM (("Found a piece with length less than "::[Char]) ++ show size ++ " - uploading as last and quitting.")
                     finalData <- newSource C.$$+- (limitedSink size)
                     traceShowM "finalData size:"
                     traceShowM (LB.length finalData)
-                    return $ reverse (pInfo:u)
+                    return $ Right $ reverse (pInfo:uparts)
 
 -- | Looks for incomplete uploads for an object. Returns the first one
 -- if there are many.
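
A note on the Data/Crypto.hs change: hashMD5FromSource mirrors the existing hashSHA256FromSource, draining a conduit producer through cryptonite-conduit's sinkHash and Base16-encoding the resulting digest. Below is a minimal standalone sketch of the same pattern; the function name hexMD5FromSource and the test values are illustrative, not part of the patch:

{-# LANGUAGE OverloadedStrings #-}

import           Crypto.Hash (Digest, MD5)
import           Crypto.Hash.Conduit (sinkHash)
import           Data.ByteArray.Encoding (Base (Base16), convertToBase)
import           Data.ByteString (ByteString)
import qualified Data.Conduit as C
import qualified Data.Conduit.List as CL

-- Hash a stream of chunks to a Base16-encoded MD5 digest, the same
-- pattern hashMD5FromSource uses in the patch above.
hexMD5FromSource :: Monad m => C.Producer m ByteString -> m ByteString
hexMD5FromSource src = do
  digest <- src C.$$ sinkMD5
  return $ convertToBase Base16 digest
  where
    -- the annotation fixes the digest type, so GHC can pick the MD5
    -- instance of sinkHash (the "help with type inference" comment).
    sinkMD5 :: Monad m => C.Consumer ByteString m (Digest MD5)
    sinkMD5 = sinkHash

main :: IO ()
main = do
  hex <- hexMD5FromSource $ CL.sourceList ["hello, ", "world"]
  print hex  -- MD5 of the concatenated chunks, hex-encoded

Without the inner type annotation the digest type is ambiguous, since sinkHash is polymorphic over all hash algorithms.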
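A note on the resume logic now shared by both upload paths: getExistingUpload returns the id of the first incomplete multipart upload for the object (if any) together with a map of its already-uploaded parts, and checkUploadNeeded skips a part only when the part number, size, and MD5 (compared against the server-reported ETag) all match. The decision reduces to the predicate sketched below, using simplified stand-in types rather than the library's own:

{-# LANGUAGE OverloadedStrings #-}

import           Data.ByteString (ByteString)
import           Data.Int (Int64)
import qualified Data.Map.Strict as Map

type PartNumber = Int

-- Simplified stand-in for the patch's ListPartInfo.
data UploadedPart = UploadedPart
  { upETag :: ByteString  -- hex MD5 of the part, as reported by the server
  , upSize :: Int64       -- part size in bytes
  }

-- True when the part must be (re-)uploaded: there is no uploaded part
-- with this number, or its size or MD5 differs from the local data.
uploadNeeded :: PartNumber -> ByteString -> Int64
             -> Map.Map PartNumber UploadedPart -> Bool
uploadNeeded n localMD5 localSize parts =
  case Map.lookup n parts of
    Nothing -> True
    Just p  -> upETag p /= localMD5 || upSize p /= localSize

main :: IO ()
main = do
  -- "9001..." is the MD5 of "abc"
  let parts = Map.fromList [(1, UploadedPart "900150983cd24fb0d6963f7d28e17f72" 3)]
  print $ uploadNeeded 1 "900150983cd24fb0d6963f7d28e17f72" 3 parts  -- False: skip
  print $ uploadNeeded 2 "900150983cd24fb0d6963f7d28e17f72" 3 parts  -- True: upload

Matching on size alone would wrongly skip a part whose content changed between runs; the MD5 comparison is what makes the resume safe.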
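A note on the control-flow change in sequentialMultipartUpload: the hand-written recursive loop is replaced with loopM from the extra package, whose step function returns Left to continue with a new state or Right to finish with a result (loopM :: Monad m => (a -> m (Either a b)) -> a -> m b). The loop state is the triple of the resumable source, the remaining part descriptions, and the accumulated PartInfos. A tiny self-contained example of the idiom (names are illustrative):

import Control.Monad.Extra (loopM)

-- Sum list elements, stopping early at the first negative one. The
-- state (remaining list, accumulator) plays the same role as the
-- (source, remaining parts, uploaded parts) state in loopFunc.
sumUntilNegative :: Monad m => [Int] -> m Int
sumUntilNegative xs0 = loopM step (xs0, 0)
  where
    step ([], acc) = return $ Right acc          -- done; yield the result
    step (x:xs, acc)
      | x < 0     = return $ Right acc           -- stop early
      | otherwise = return $ Left (xs, acc + x)  -- continue with new state

main :: IO ()
main = print =<< sumUntilNegative [1, 2, 3, -1, 4]  -- prints 6

Keeping all iteration state in one explicit tuple is also what makes it visible that the resumable source must be part of that state, rather than captured once outside the loop.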