Multipart upload bug fixes and test fixes
This commit is contained in:
parent
915d099112
commit
0509d90ef1
@ -43,7 +43,6 @@ library
|
||||
, cryptonite-conduit
|
||||
, data-default
|
||||
, exceptions
|
||||
, extra
|
||||
, filepath
|
||||
, http-client
|
||||
, http-conduit
|
||||
@ -87,8 +86,8 @@ test-suite minio-hs-test
|
||||
, cryptonite
|
||||
, cryptonite-conduit
|
||||
, data-default
|
||||
, directory
|
||||
, exceptions
|
||||
, extra
|
||||
, filepath
|
||||
, http-client
|
||||
, http-conduit
|
||||
@ -103,6 +102,7 @@ test-suite minio-hs-test
|
||||
, tasty-hunit
|
||||
, tasty-quickcheck
|
||||
, tasty-smallcheck
|
||||
, temporary
|
||||
, text
|
||||
, time
|
||||
, transformers
|
||||
|
||||
@ -2,10 +2,10 @@ module Network.Minio.PutObject
|
||||
(
|
||||
putObject
|
||||
, ObjectData(..)
|
||||
, selectPartSizes
|
||||
) where
|
||||
|
||||
|
||||
import Control.Monad.Extra (loopM)
|
||||
import qualified Data.ByteString as B
|
||||
import qualified Data.ByteString.Lazy as LB
|
||||
import qualified Data.Conduit as C
|
||||
@ -81,7 +81,10 @@ putObject b o (ODFile fp sizeMay) = do
|
||||
selectPartSizes :: Int64 -> [(PartNumber, Int64, Int64)]
|
||||
selectPartSizes size = List.zip3 [1..] partOffsets partSizes
|
||||
where
|
||||
partSize = max (64 * oneMiB) (size `div` maxMultipartParts)
|
||||
ceil :: Double -> Int64
|
||||
ceil = ceiling
|
||||
partSize = max (64 * oneMiB) (ceil $ fromIntegral size /
|
||||
fromIntegral maxMultipartParts)
|
||||
(numParts, lastPartSize) = size `divMod` partSize
|
||||
lastPart = filter (> 0) [lastPartSize]
|
||||
partSizes = replicate (fromIntegral numParts) partSize ++ lastPart
|
||||
@ -141,41 +144,40 @@ sequentialMultipartUpload b o sizeMay src = do
|
||||
uploadId <- maybe (newMultipartUpload b o []) return uidMay
|
||||
|
||||
-- upload parts in loop
|
||||
let
|
||||
rSrc = C.newResumableSource src
|
||||
partSizeInfo = selectPartSizes $ maybe maxObjectSize identity sizeMay
|
||||
|
||||
uploadedParts <- loopM (loopFunc pmap uploadId rSrc) (partSizeInfo, [])
|
||||
uploadedParts <- uploadPartsSequentially b o uploadId pmap sizeMay src
|
||||
|
||||
-- complete multipart upload
|
||||
completeMultipartUpload b o uploadId uploadedParts
|
||||
|
||||
uploadPartsSequentially :: Bucket -> Object -> UploadId
|
||||
-> Map PartNumber ListPartInfo -> Maybe Int64
|
||||
-> C.Source Minio ByteString -> Minio [PartInfo]
|
||||
uploadPartsSequentially b o uid pmap sizeMay src' = do
|
||||
let
|
||||
rSrc = C.newResumableSource src'
|
||||
partSizes = selectPartSizes $ maybe maxObjectSize identity sizeMay
|
||||
|
||||
loopIt rSrc partSizes []
|
||||
|
||||
where
|
||||
-- make a sink that consumes only `s` bytes
|
||||
limitedSink s = CB.isolate (fromIntegral s) C.=$= CB.sinkLbs
|
||||
|
||||
-- FIXME: test, confirm and remove traceShowM statements
|
||||
loopFunc _ _ _ ([], uparts) = return $ Right $ reverse uparts
|
||||
loopFunc pmap uid rSource (((partNum, _, size):ps), uparts) = do
|
||||
(newSource, buf) <- rSource C.$$++ (limitedSink size)
|
||||
traceShowM "psize: "
|
||||
traceShowM (LB.length buf)
|
||||
loopIt _ [] uparts = return $ reverse uparts
|
||||
loopIt src ((n, _, size):ps) uparts = do
|
||||
(newSrc, buf) <- src C.$$++ (limitedSink size)
|
||||
|
||||
let payload = PayloadBS $ LB.toStrict buf
|
||||
partMay <- checkUploadNeeded payload partNum pmap
|
||||
case partMay of
|
||||
Just pinfo -> return $ Left (ps, pinfo:uparts)
|
||||
Nothing -> do
|
||||
-- upload the part
|
||||
pInfo <- putObjectPart b o uid partNum [] payload
|
||||
if LB.length buf == size
|
||||
then return $ Left (ps, pInfo:uparts)
|
||||
let buflen = LB.length buf
|
||||
payload = PayloadBS $ LB.toStrict buf
|
||||
|
||||
partMay <- checkUploadNeeded payload n pmap
|
||||
|
||||
if buflen == 0
|
||||
then return $ reverse uparts
|
||||
else do pInfo <- maybe (putObjectPart b o uid n [] payload)
|
||||
return partMay
|
||||
loopIt newSrc ps (pInfo:uparts)
|
||||
|
||||
-- got a smaller part, so its the last one.
|
||||
else do traceShowM (("Found a piece with length < than "::[Char]) ++ show size ++ " - uploading as last and quitting.")
|
||||
finalData <- newSource C.$$+- (limitedSink size)
|
||||
traceShowM "finalData size:"
|
||||
traceShowM (LB.length finalData)
|
||||
return $ Right $ reverse (pInfo:uparts)
|
||||
|
||||
-- | Looks for incomplete uploads for an object. Returns the first one
|
||||
-- if there are many.
|
||||
|
||||
95
test/Spec.hs
95
test/Spec.hs
@ -5,10 +5,15 @@ import Test.Tasty.HUnit
|
||||
|
||||
import Lib.Prelude
|
||||
|
||||
import System.Directory (getTemporaryDirectory)
|
||||
import qualified System.IO as SIO
|
||||
import System.IO.Temp (openBinaryTempFile, withSystemTempDirectory)
|
||||
|
||||
import Control.Monad.Trans.Resource (runResourceT)
|
||||
import Data.Conduit (($$))
|
||||
import qualified Control.Monad.Trans.Resource as R
|
||||
import qualified Data.ByteString as BS
|
||||
import Data.Conduit (($$), yield)
|
||||
import qualified Data.Conduit as C
|
||||
import qualified Data.Conduit.Binary as CB
|
||||
import Data.Conduit.Combinators (sinkList)
|
||||
import Data.Default (Default(..))
|
||||
import qualified Data.Text as T
|
||||
@ -16,6 +21,7 @@ import qualified Data.Text as T
|
||||
import Network.Minio
|
||||
import Network.Minio.Data
|
||||
import Network.Minio.S3API
|
||||
import Network.Minio.Utils
|
||||
import Network.Minio.XmlGenerator.Test
|
||||
import Network.Minio.XmlParser.Test
|
||||
|
||||
@ -50,6 +56,29 @@ properties = testGroup "Properties" [] -- [scProps, qcProps]
|
||||
-- (n :: Integer) >= 3 QC.==> x^n + y^n /= (z^n :: Integer)
|
||||
-- ]
|
||||
|
||||
-- conduit that generates random binary stream of given length
|
||||
randomDataSrc :: MonadIO m => Int64 -> C.Producer m ByteString
|
||||
randomDataSrc s' = genBS s'
|
||||
where
|
||||
oneMiB = 1024*1024
|
||||
|
||||
concatIt bs n = BS.concat $ replicate (fromIntegral q) bs ++
|
||||
[BS.take (fromIntegral r) bs]
|
||||
where (q, r) = n `divMod` fromIntegral (BS.length bs)
|
||||
|
||||
genBS s = do
|
||||
w8s <- liftIO $ generate $ Q.vectorOf 64 (Q.choose (0, 255))
|
||||
let byteArr64 = BS.pack w8s
|
||||
if s < oneMiB
|
||||
then yield $ concatIt byteArr64 s
|
||||
else do yield $ concatIt byteArr64 oneMiB
|
||||
genBS (s - oneMiB)
|
||||
|
||||
mkRandFile :: R.MonadResource m => Int64 -> m FilePath
|
||||
mkRandFile size = do
|
||||
dir <- liftIO $ getTemporaryDirectory
|
||||
randomDataSrc size C.$$ CB.sinkTempFile dir "miniohstest.random"
|
||||
|
||||
funTestBucketPrefix :: Text
|
||||
funTestBucketPrefix = "miniohstest-"
|
||||
|
||||
@ -97,30 +126,72 @@ liveServerUnitTests = testGroup "Unit tests against a live server"
|
||||
step "delete object works"
|
||||
deleteObject bucket "lsb-release"
|
||||
|
||||
, funTestWithBucket "Multipart Upload Test" $ \step bucket -> do
|
||||
, funTestWithBucket "Basic Multipart Test" $ \step bucket -> do
|
||||
let object = "newmpupload"
|
||||
|
||||
step "create new multipart upload"
|
||||
uid <- newMultipartUpload bucket object []
|
||||
liftIO $ (T.length uid > 0) @? ("Got an empty multipartUpload Id.")
|
||||
|
||||
step "put object parts 1..10"
|
||||
h <- liftIO $ SIO.openBinaryFile "/tmp/inputfile" SIO.ReadMode
|
||||
let mb15 = 15 * 1024 * 1024
|
||||
partInfo <- forM [1..10] $ \pnum ->
|
||||
putObjectPart bucket object uid pnum [] $ PayloadH h 0 mb15
|
||||
randFile <- mkRandFile mb15
|
||||
|
||||
step "put object parts 1 of 1"
|
||||
h <- liftIO $ SIO.openBinaryFile randFile SIO.ReadMode
|
||||
partInfo <- putObjectPart bucket object uid 1 [] $ PayloadH h 0 mb15
|
||||
|
||||
step "complete multipart"
|
||||
etag <- completeMultipartUpload bucket object uid partInfo
|
||||
void $ completeMultipartUpload bucket object uid [partInfo]
|
||||
|
||||
step $ "completeMultipart success - got etag: " ++ show etag
|
||||
|
||||
step $ "Retrieve the created object"
|
||||
fGetObject bucket object "/tmp/newUpload"
|
||||
destFile <- mkRandFile 0
|
||||
step $ "Retrieve the created object and check size"
|
||||
fGetObject bucket object destFile
|
||||
gotSize <- withNewHandle destFile getFileSize
|
||||
liftIO $ gotSize == Right (Just mb15) @?
|
||||
"Wrong file size of put file after getting"
|
||||
|
||||
step $ "Cleanup actions"
|
||||
deleteObject bucket object
|
||||
|
||||
, funTestWithBucket "Multipart test with unknown object size" $
|
||||
\step bucket -> do
|
||||
let obj = "mpart"
|
||||
|
||||
step "Prepare"
|
||||
let mb100 = 100 * 1024 * 1024
|
||||
rFile <- mkRandFile mb100
|
||||
|
||||
step "Upload multipart file."
|
||||
putObjectFromSource bucket obj (CB.sourceFile rFile) Nothing
|
||||
|
||||
step "Retrieve and verify file size"
|
||||
destFile <- mkRandFile 0
|
||||
fGetObject bucket obj destFile
|
||||
gotSize <- withNewHandle destFile getFileSize
|
||||
liftIO $ gotSize == Right (Just mb100) @?
|
||||
"Wrong file size of put file after getting"
|
||||
|
||||
step $ "Cleanup actions"
|
||||
deleteObject bucket obj
|
||||
|
||||
, funTestWithBucket "Multipart test with non-seekable file" $
|
||||
\step bucket -> do
|
||||
let obj = "mpart"
|
||||
mb100 = 100 * 1024 * 1024
|
||||
|
||||
step "Upload multipart file."
|
||||
void $ putObject bucket obj $ ODFile "/dev/zero" (Just mb100)
|
||||
|
||||
step "Retrieve and verify file size"
|
||||
destFile <- mkRandFile 0
|
||||
fGetObject bucket obj destFile
|
||||
gotSize <- withNewHandle destFile getFileSize
|
||||
liftIO $ gotSize == Right (Just mb100) @?
|
||||
"Wrong file size of put file after getting"
|
||||
|
||||
step $ "Cleanup actions"
|
||||
deleteObject bucket obj
|
||||
|
||||
, funTestWithBucket "Basic listObjects Test" $ \step bucket -> do
|
||||
step "put 10 objects"
|
||||
forM_ [1..10::Int] $ \s ->
|
||||
|
||||
Loading…
Reference in New Issue
Block a user