api: Support single PUT on stream uploads (#84)
Current functionality only supported stream based uploads using multipart, this meant that even for small file sizes we were performing multipart operations which is slower and unnecessary. This PR fixes this behavior.
This commit is contained in:
parent
6d20558098
commit
02b28bc100
@ -22,11 +22,12 @@ module Network.Minio.PutObject
|
||||
) where
|
||||
|
||||
|
||||
import qualified Data.Conduit as C
|
||||
import qualified Data.Conduit.Binary as CB
|
||||
import qualified Data.ByteString.Lazy as LBS
|
||||
import qualified Data.Conduit as C
|
||||
import qualified Data.Conduit.Binary as CB
|
||||
import qualified Data.Conduit.Combinators as CC
|
||||
import qualified Data.Conduit.List as CL
|
||||
import qualified Data.List as List
|
||||
import qualified Data.Conduit.List as CL
|
||||
import qualified Data.List as List
|
||||
|
||||
import Lib.Prelude
|
||||
|
||||
@ -59,7 +60,19 @@ data ObjectData m
|
||||
-- objects of all sizes, and even if the object size is unknown.
|
||||
putObjectInternal :: Bucket -> Object -> PutObjectOptions
|
||||
-> ObjectData Minio -> Minio ETag
|
||||
putObjectInternal b o opts (ODStream src sizeMay) = sequentialMultipartUpload b o opts sizeMay src
|
||||
putObjectInternal b o opts (ODStream src sizeMay) = do
|
||||
case sizeMay of
|
||||
-- unable to get size, so assume non-seekable file and max-object size
|
||||
Nothing -> sequentialMultipartUpload b o opts (Just maxObjectSize) src
|
||||
|
||||
-- got file size, so check for single/multipart upload
|
||||
Just size ->
|
||||
if | size <= 64 * oneMiB -> do
|
||||
bs <- C.runConduit $ src C..| CB.sinkLbs
|
||||
putObjectSingle' b o (pooToHeaders opts) $ LBS.toStrict bs
|
||||
| size > maxObjectSize -> throwM $ MErrVPutSizeExceeded size
|
||||
| otherwise -> sequentialMultipartUpload b o opts (Just size) src
|
||||
|
||||
putObjectInternal b o opts (ODFile fp sizeMay) = do
|
||||
hResE <- withNewHandle fp $ \h ->
|
||||
liftM2 (,) (isHandleSeekable h) (getFileSize h)
|
||||
|
||||
@ -42,6 +42,7 @@ module Network.Minio.S3API
|
||||
---------------------------------
|
||||
, putBucket
|
||||
, ETag
|
||||
, putObjectSingle'
|
||||
, putObjectSingle
|
||||
, copyObjectSingle
|
||||
|
||||
@ -90,6 +91,7 @@ module Network.Minio.S3API
|
||||
) where
|
||||
|
||||
import Control.Monad.Catch (Handler (..), catches)
|
||||
import qualified Data.ByteString as BS
|
||||
import qualified Data.Conduit as C
|
||||
import Data.Default (def)
|
||||
import qualified Data.Text as T
|
||||
@ -143,6 +145,28 @@ putBucket bucket location = void $
|
||||
maxSinglePutObjectSizeBytes :: Int64
|
||||
maxSinglePutObjectSizeBytes = 5 * 1024 * 1024 * 1024
|
||||
|
||||
putObjectSingle' :: Bucket -> Object -> [HT.Header] -> ByteString -> Minio ETag
|
||||
putObjectSingle' bucket object headers bs = do
|
||||
let size = fromIntegral (BS.length bs)
|
||||
-- check length is within single PUT object size.
|
||||
when (size > maxSinglePutObjectSizeBytes) $
|
||||
throwM $ MErrVSinglePUTSizeExceeded size
|
||||
|
||||
-- content-length header is automatically set by library.
|
||||
resp <- executeRequest $
|
||||
def { riMethod = HT.methodPut
|
||||
, riBucket = Just bucket
|
||||
, riObject = Just object
|
||||
, riHeaders = headers
|
||||
, riPayload = PayloadBS bs
|
||||
}
|
||||
|
||||
let rheaders = NC.responseHeaders resp
|
||||
etag = getETagHeader rheaders
|
||||
maybe
|
||||
(throwM MErrVETagHeaderNotFound)
|
||||
return etag
|
||||
|
||||
-- | PUT an object into the service. This function performs a single
|
||||
-- PUT object call, and so can only transfer objects upto 5GiB.
|
||||
putObjectSingle :: Bucket -> Object -> [HT.Header] -> Handle -> Int64
|
||||
|
||||
@ -124,6 +124,29 @@ lowLevelMultipartTest = funTestWithBucket "Low-level Multipart Test" $
|
||||
step "Cleanup actions"
|
||||
removeObject bucket object
|
||||
|
||||
putObjectSizeTest :: TestTree
|
||||
putObjectSizeTest = funTestWithBucket "PutObject of conduit source with size" $
|
||||
\step bucket -> do
|
||||
-- putObject test (conduit source, size specified)
|
||||
let obj = "msingle"
|
||||
mb1 = 1 * 1024 * 1024
|
||||
|
||||
step "Prepare for putObject with from source with size."
|
||||
rFile <- mkRandFile mb1
|
||||
|
||||
step "Upload single file."
|
||||
putObject bucket obj (CB.sourceFile rFile) (Just mb1) def
|
||||
|
||||
step "Retrieve and verify file size"
|
||||
destFile <- mkRandFile 0
|
||||
fGetObject bucket obj destFile def
|
||||
gotSize <- withNewHandle destFile getFileSize
|
||||
liftIO $ gotSize == Right (Just mb1) @?
|
||||
"Wrong file size of put file after getting"
|
||||
|
||||
step "Cleanup actions"
|
||||
deleteObject bucket obj
|
||||
|
||||
putObjectNoSizeTest :: TestTree
|
||||
putObjectNoSizeTest = funTestWithBucket "PutObject of conduit source with no size" $
|
||||
\step bucket -> do
|
||||
@ -280,6 +303,7 @@ liveServerUnitTests = testGroup "Unit tests against a live server"
|
||||
, listingTest
|
||||
, highLevelListingTest
|
||||
, lowLevelMultipartTest
|
||||
, putObjectSizeTest
|
||||
, putObjectNoSizeTest
|
||||
, funTestWithBucket "Multipart Tests" $
|
||||
\step bucket -> do
|
||||
|
||||
Loading…
Reference in New Issue
Block a user