Make copyObject use SourceInfo, DestinationInfo (#73)
* Make CopyPartSource constructor private ... and make individual record accessors exported. This change allows adding of additional record members to CopyPartSource without breaking applications. * Move high-level copyObject functions to CopyObject module * Make copyObject use SourceInfo, DestinationInfo ... to allow supporting features like client-side and server-side encryption subsequently. - fix warnings in tests/LiveServer.hs
This commit is contained in:
parent
3ef237e4d1
commit
fe7aef21b7
37
docs/API.md
37
docs/API.md
@ -557,31 +557,38 @@ main = do
|
||||
```
|
||||
|
||||
<a name="copyObject"></a>
|
||||
### copyObject :: Bucket -> Object -> CopyPartSource -> Minio ()
|
||||
### copyObject :: DestinationInfo -> SourceInfo -> Minio ()
|
||||
Copies content of an object from the service to another
|
||||
|
||||
__Parameters__
|
||||
|
||||
In the expression `copyObject bucketName objectName cps` the parameters
|
||||
In the expression `copyObject dstInfo srcInfo` the parameters
|
||||
are:
|
||||
|
||||
|Param |Type |Description |
|
||||
|:---|:---| :---|
|
||||
| `bucketName` | _Bucket_ (alias for `Text`) | Name of the bucket |
|
||||
| `objectName` | _Object_ (alias for `Text`) | Name of the object |
|
||||
| `cps` | _CopyPartSource_ | A value representing properties of the source object |
|
||||
| `dstInfo` | _DestinationInfo_ | A value representing properties of the destination object |
|
||||
| `srcInfo` | _SourceInfo_ | A value representing properties of the source object |
|
||||
|
||||
|
||||
__CopyPartSource record type__
|
||||
__SourceInfo record type__
|
||||
|
||||
|Field |Type |Description |
|
||||
|:---|:---| :---|
|
||||
| `cpSource` | `Text`| Name of source object formatted as "/srcBucket/srcObject" |
|
||||
| `cpSourceRange` | `Maybe (Int64, Int64)` | Represents the byte range of source object. (0, 9) represents first ten bytes of source object|
|
||||
| `cpSourceIfMatch` | `Maybe Text` | (Optional) ETag source object should match |
|
||||
| `cpSourceIfNoneMatch` | `Maybe Text` | (Optional) ETag source object shouldn't match |
|
||||
| `cpSourceIfUnmodifiedSince` | `Maybe UTCTime` | (Optional) Time since source object wasn't modified |
|
||||
| `cpSourceIfModifiedSince` | `Maybe UTCTime` | (Optional) Time since source object was modified |
|
||||
| `srcBucket` | `Bucket` | Name of source bucket |
|
||||
| `srcObject` | `Object` | Name of source object |
|
||||
| `srcRange` | `Maybe (Int64, Int64)` | (Optional) Represents the byte range of source object. (0, 9) represents first ten bytes of source object|
|
||||
| `srcIfMatch` | `Maybe Text` | (Optional) ETag source object should match |
|
||||
| `srcIfNoneMatch` | `Maybe Text` | (Optional) ETag source object shouldn't match |
|
||||
| `srcIfUnmodifiedSince` | `Maybe UTCTime` | (Optional) Time since source object wasn't modified |
|
||||
| `srcIfModifiedSince` | `Maybe UTCTime` | (Optional) Time since source object was modified |
|
||||
|
||||
__Destination record type__
|
||||
|
||||
|Field |Type |Description |
|
||||
|:---|:---| :---|
|
||||
| `dstBucket` | `Bucket` | Name of destination bucket in server-side copyObject |
|
||||
| `dstObject` | `Object` | Name of destination object in server-side copyObject |
|
||||
|
||||
__Example__
|
||||
|
||||
@ -594,13 +601,13 @@ main = do
|
||||
let
|
||||
bucket = "mybucket"
|
||||
object = "myobject"
|
||||
srcObject = "/mybucket/srcObject"
|
||||
objectCopy = "obj-copy"
|
||||
|
||||
res <- runMinio minioPlayCI $ do
|
||||
copyObject bucket object def { cpSource = srcObject }
|
||||
copyObject def { dstBucket = bucket, dstObject = objectCopy } def { srcBucket = bucket, srcObject = object }
|
||||
|
||||
case res of
|
||||
Left e -> putStrLn $ "Failed to copyObject " ++ show srcObject"
|
||||
Left e -> putStrLn $ "Failed to copyObject " ++ show bucket ++ show "/" ++ show object
|
||||
Right _ -> putStrLn "copyObject was successful"
|
||||
```
|
||||
|
||||
|
||||
@ -21,7 +21,6 @@
|
||||
import Network.Minio
|
||||
|
||||
import Control.Monad.Catch (catchIf)
|
||||
import qualified Data.Text as T
|
||||
import Prelude
|
||||
|
||||
-- | The following example uses minio's play server at
|
||||
@ -50,10 +49,8 @@ main = do
|
||||
fPutObject bucket object localFile
|
||||
|
||||
-- 3. Copy bucket/object to bucket/objectCopy.
|
||||
copyObject bucket objectCopy def {
|
||||
cpSource = T.concat ["/", bucket, "/", object]
|
||||
}
|
||||
copyObject def {dstBucket = bucket, dstObject = objectCopy} def { srcBucket = bucket , srcObject = object }
|
||||
|
||||
case res1 of
|
||||
Left e -> putStrLn $ "copyObject failed." ++ (show e)
|
||||
Left e -> putStrLn $ "copyObject failed." ++ show e
|
||||
Right () -> putStrLn "copyObject succeeded."
|
||||
|
||||
@ -34,6 +34,7 @@ library
|
||||
, Network.Minio.Data.ByteString
|
||||
, Network.Minio.Data.Crypto
|
||||
, Network.Minio.Data.Time
|
||||
, Network.Minio.CopyObject
|
||||
, Network.Minio.Errors
|
||||
, Network.Minio.ListOps
|
||||
, Network.Minio.PresignedOperations
|
||||
@ -110,6 +111,7 @@ test-suite minio-hs-live-server-test
|
||||
other-modules: Lib.Prelude
|
||||
, Network.Minio
|
||||
, Network.Minio.API
|
||||
, Network.Minio.CopyObject
|
||||
, Network.Minio.Data
|
||||
, Network.Minio.Data.ByteString
|
||||
, Network.Minio.Data.Crypto
|
||||
@ -232,6 +234,7 @@ test-suite minio-hs-test
|
||||
, Network.Minio.Data.ByteString
|
||||
, Network.Minio.Data.Crypto
|
||||
, Network.Minio.Data.Time
|
||||
, Network.Minio.CopyObject
|
||||
, Network.Minio.Errors
|
||||
, Network.Minio.ListOps
|
||||
, Network.Minio.PresignedOperations
|
||||
|
||||
@ -91,8 +91,18 @@ module Network.Minio
|
||||
, getObject
|
||||
|
||||
-- ** Server-side copying
|
||||
, CopyPartSource(..)
|
||||
, copyObject
|
||||
, SourceInfo
|
||||
, srcBucket
|
||||
, srcObject
|
||||
, srcRange
|
||||
, srcIfMatch
|
||||
, srcIfNoneMatch
|
||||
, srcIfModifiedSince
|
||||
, srcIfUnmodifiedSince
|
||||
, DestinationInfo
|
||||
, dstBucket
|
||||
, dstObject
|
||||
|
||||
-- ** Querying
|
||||
, statObject
|
||||
@ -146,6 +156,7 @@ import qualified Data.Map as Map
|
||||
|
||||
import Lib.Prelude
|
||||
|
||||
import Network.Minio.CopyObject
|
||||
import Network.Minio.Data
|
||||
import Network.Minio.Errors
|
||||
import Network.Minio.ListOps
|
||||
@ -178,12 +189,13 @@ putObject :: Bucket -> Object -> C.Producer Minio ByteString
|
||||
putObject bucket object src sizeMay =
|
||||
void $ putObjectInternal bucket object $ ODStream src sizeMay
|
||||
|
||||
-- | Perform a server-side copy operation to create an object with the
|
||||
-- given bucket and object name from the source specification in
|
||||
-- CopyPartSource. This function performs a multipart copy operation
|
||||
-- if the new object is to be greater than 5GiB in size.
|
||||
copyObject :: Bucket -> Object -> CopyPartSource -> Minio ()
|
||||
copyObject bucket object cps = void $ copyObjectInternal bucket object cps
|
||||
-- | Perform a server-side copy operation to create an object based on
|
||||
-- the destination specification in DestinationInfo from the source
|
||||
-- specification in SourceInfo. This function performs a multipart
|
||||
-- copy operation if the new object is to be greater than 5GiB in
|
||||
-- size.
|
||||
copyObject :: DestinationInfo -> SourceInfo -> Minio ()
|
||||
copyObject dstInfo srcInfo = void $ copyObjectInternal (dstBucket dstInfo) (dstObject dstInfo) srcInfo
|
||||
|
||||
-- | Remove an object from the object store.
|
||||
removeObject :: Bucket -> Object -> Minio ()
|
||||
|
||||
92
src/Network/Minio/CopyObject.hs
Normal file
92
src/Network/Minio/CopyObject.hs
Normal file
@ -0,0 +1,92 @@
|
||||
--
|
||||
-- Minio Haskell SDK, (C) 2017 Minio, Inc.
|
||||
--
|
||||
-- Licensed under the Apache License, Version 2.0 (the "License");
|
||||
-- you may not use this file except in compliance with the License.
|
||||
-- You may obtain a copy of the License at
|
||||
--
|
||||
-- http://www.apache.org/licenses/LICENSE-2.0
|
||||
--
|
||||
-- Unless required by applicable law or agreed to in writing, software
|
||||
-- distributed under the License is distributed on an "AS IS" BASIS,
|
||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
-- See the License for the specific language governing permissions and
|
||||
-- limitations under the License.
|
||||
--
|
||||
|
||||
module Network.Minio.CopyObject where
|
||||
|
||||
import Data.Default (def)
|
||||
import qualified Data.List as List
|
||||
|
||||
import Lib.Prelude
|
||||
|
||||
import Network.Minio.Data
|
||||
import Network.Minio.Errors
|
||||
import Network.Minio.S3API
|
||||
import Network.Minio.Utils
|
||||
|
||||
|
||||
-- | Copy an object using single or multipart copy strategy.
|
||||
copyObjectInternal :: Bucket -> Object -> SourceInfo
|
||||
-> Minio ETag
|
||||
copyObjectInternal b' o srcInfo = do
|
||||
let sBucket = srcBucket srcInfo
|
||||
sObject = srcObject srcInfo
|
||||
|
||||
-- get source object size with a head request
|
||||
(ObjectInfo _ _ _ srcSize) <- headObject sBucket sObject
|
||||
|
||||
-- check that byte offsets are valid if specified in cps
|
||||
let rangeMay = srcRange srcInfo
|
||||
range = maybe (0, srcSize) identity rangeMay
|
||||
startOffset = fst range
|
||||
endOffset = snd range
|
||||
|
||||
when (isJust rangeMay &&
|
||||
or [startOffset < 0, endOffset < startOffset,
|
||||
endOffset >= fromIntegral srcSize]) $
|
||||
throwM $ MErrVInvalidSrcObjByteRange range
|
||||
|
||||
-- 1. If sz > 64MiB (minPartSize) use multipart copy, OR
|
||||
-- 2. If startOffset /= 0 use multipart copy
|
||||
let destSize = (\(a, b) -> b - a + 1 ) $
|
||||
maybe (0, srcSize - 1) identity rangeMay
|
||||
|
||||
if destSize > minPartSize || (endOffset - startOffset + 1 /= srcSize)
|
||||
then multiPartCopyObject b' o srcInfo srcSize
|
||||
|
||||
else fst <$> copyObjectSingle b' o srcInfo{srcRange = Nothing} []
|
||||
|
||||
-- | Given the input byte range of the source object, compute the
|
||||
-- splits for a multipart copy object procedure. Minimum part size
|
||||
-- used is minPartSize.
|
||||
selectCopyRanges :: (Int64, Int64) -> [(PartNumber, (Int64, Int64))]
|
||||
selectCopyRanges (st, end) = zip pns $
|
||||
map (\(x, y) -> (st + x, st + x + y - 1)) $ zip startOffsets partSizes
|
||||
where
|
||||
size = end - st + 1
|
||||
(pns, startOffsets, partSizes) = List.unzip3 $ selectPartSizes size
|
||||
|
||||
-- | Perform a multipart copy object action. Since we cannot verify
|
||||
-- existing parts based on the source object, there is no resuming
|
||||
-- copy action support.
|
||||
multiPartCopyObject :: Bucket -> Object -> SourceInfo -> Int64
|
||||
-> Minio ETag
|
||||
multiPartCopyObject b o cps srcSize = do
|
||||
uid <- newMultipartUpload b o []
|
||||
|
||||
let byteRange = maybe (0, fromIntegral $ srcSize - 1) identity $ srcRange cps
|
||||
partRanges = selectCopyRanges byteRange
|
||||
partSources = map (\(x, (start, end)) -> (x, cps {srcRange = Just (start, end) }))
|
||||
partRanges
|
||||
dstInfo = def { dstBucket = b, dstObject = o}
|
||||
|
||||
copiedParts <- limitedMapConcurrently 10
|
||||
(\(pn, cps') -> do
|
||||
(etag, _) <- copyObjectPart dstInfo cps' uid pn []
|
||||
return (pn, etag)
|
||||
)
|
||||
partSources
|
||||
|
||||
completeMultipartUpload b o uid copiedParts
|
||||
@ -14,31 +14,47 @@
|
||||
-- limitations under the License.
|
||||
--
|
||||
|
||||
{-# LANGUAGE GeneralizedNewtypeDeriving, TypeFamilies #-}
|
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
|
||||
{-# LANGUAGE TypeFamilies #-}
|
||||
module Network.Minio.Data where
|
||||
|
||||
import Control.Monad.Base
|
||||
import qualified Control.Monad.Catch as MC
|
||||
import qualified Control.Monad.Catch as MC
|
||||
import Control.Monad.Trans.Control
|
||||
import Control.Monad.Trans.Resource
|
||||
|
||||
import qualified Data.ByteString as B
|
||||
import Data.Default (Default(..))
|
||||
import qualified Data.Map as Map
|
||||
import qualified Data.Text as T
|
||||
import Data.Time (formatTime, defaultTimeLocale)
|
||||
import Network.HTTP.Client (defaultManagerSettings)
|
||||
import qualified Network.HTTP.Conduit as NC
|
||||
import Network.HTTP.Types (Method, Header, Query)
|
||||
import qualified Network.HTTP.Types as HT
|
||||
import qualified Data.ByteString as B
|
||||
import Data.Default (Default (..))
|
||||
import qualified Data.Map as Map
|
||||
import qualified Data.Text as T
|
||||
import Data.Time (defaultTimeLocale, formatTime)
|
||||
import Network.HTTP.Client (defaultManagerSettings)
|
||||
import qualified Network.HTTP.Conduit as NC
|
||||
import Network.HTTP.Types (Header, Method, Query)
|
||||
import qualified Network.HTTP.Types as HT
|
||||
import Network.Minio.Errors
|
||||
import Text.XML
|
||||
|
||||
import GHC.Show (Show(..))
|
||||
import GHC.Show (Show (..))
|
||||
|
||||
import Lib.Prelude
|
||||
|
||||
|
||||
-- | max obj size is 5TiB
|
||||
maxObjectSize :: Int64
|
||||
maxObjectSize = 5 * 1024 * 1024 * oneMiB
|
||||
|
||||
-- | minimum size of parts used in multipart operations.
|
||||
minPartSize :: Int64
|
||||
minPartSize = 64 * oneMiB
|
||||
|
||||
oneMiB :: Int64
|
||||
oneMiB = 1024 * 1024
|
||||
|
||||
-- | maximum number of parts that can be uploaded for a single object.
|
||||
maxMultipartParts :: Int64
|
||||
maxMultipartParts = 10000
|
||||
|
||||
-- TODO: Add a type which provides typed constants for region. this
|
||||
-- type should have a IsString instance to infer the appropriate
|
||||
-- constant.
|
||||
@ -65,12 +81,12 @@ awsRegionMap = Map.fromList [
|
||||
-- of the provided smart constructors or override fields of the
|
||||
-- Default instance.
|
||||
data ConnectInfo = ConnectInfo {
|
||||
connectHost :: Text
|
||||
, connectPort :: Int
|
||||
, connectAccessKey :: Text
|
||||
, connectSecretKey :: Text
|
||||
, connectIsSecure :: Bool
|
||||
, connectRegion :: Region
|
||||
connectHost :: Text
|
||||
, connectPort :: Int
|
||||
, connectAccessKey :: Text
|
||||
, connectSecretKey :: Text
|
||||
, connectIsSecure :: Bool
|
||||
, connectRegion :: Region
|
||||
, connectAutoDiscoverRegion :: Bool
|
||||
} deriving (Eq, Show)
|
||||
|
||||
@ -169,7 +185,7 @@ type ETag = Text
|
||||
-- |
|
||||
-- BucketInfo returned for list buckets call
|
||||
data BucketInfo = BucketInfo {
|
||||
biName :: Bucket
|
||||
biName :: Bucket
|
||||
, biCreationDate :: UTCTime
|
||||
} deriving (Show, Eq)
|
||||
|
||||
@ -185,102 +201,85 @@ type PartTuple = (PartNumber, ETag)
|
||||
-- | Represents result from a listing of object parts of an ongoing
|
||||
-- multipart upload.
|
||||
data ListPartsResult = ListPartsResult {
|
||||
lprHasMore :: Bool
|
||||
lprHasMore :: Bool
|
||||
, lprNextPart :: Maybe Int
|
||||
, lprParts :: [ObjectPartInfo]
|
||||
, lprParts :: [ObjectPartInfo]
|
||||
} deriving (Show, Eq)
|
||||
|
||||
|
||||
-- | Represents information about an object part in an ongoing
|
||||
-- multipart upload.
|
||||
data ObjectPartInfo = ObjectPartInfo {
|
||||
opiNumber :: PartNumber
|
||||
, opiETag :: ETag
|
||||
, opiSize :: Int64
|
||||
opiNumber :: PartNumber
|
||||
, opiETag :: ETag
|
||||
, opiSize :: Int64
|
||||
, opiModTime :: UTCTime
|
||||
} deriving (Show, Eq)
|
||||
|
||||
-- | Represents result from a listing of incomplete uploads to a
|
||||
-- bucket.
|
||||
data ListUploadsResult = ListUploadsResult {
|
||||
lurHasMore :: Bool
|
||||
, lurNextKey :: Maybe Text
|
||||
lurHasMore :: Bool
|
||||
, lurNextKey :: Maybe Text
|
||||
, lurNextUpload :: Maybe Text
|
||||
, lurUploads :: [(Object, UploadId, UTCTime)]
|
||||
, lurCPrefixes :: [Text]
|
||||
, lurUploads :: [(Object, UploadId, UTCTime)]
|
||||
, lurCPrefixes :: [Text]
|
||||
} deriving (Show, Eq)
|
||||
|
||||
-- | Represents information about a multipart upload.
|
||||
data UploadInfo = UploadInfo {
|
||||
uiKey :: Object
|
||||
uiKey :: Object
|
||||
, uiUploadId :: UploadId
|
||||
, uiInitTime :: UTCTime
|
||||
, uiSize :: Int64
|
||||
, uiSize :: Int64
|
||||
} deriving (Show, Eq)
|
||||
|
||||
-- | Represents result from a listing of objects in a bucket.
|
||||
data ListObjectsResult = ListObjectsResult {
|
||||
lorHasMore :: Bool
|
||||
lorHasMore :: Bool
|
||||
, lorNextToken :: Maybe Text
|
||||
, lorObjects :: [ObjectInfo]
|
||||
, lorObjects :: [ObjectInfo]
|
||||
, lorCPrefixes :: [Text]
|
||||
} deriving (Show, Eq)
|
||||
|
||||
-- | Represents result from a listing of objects version 1 in a bucket.
|
||||
data ListObjectsV1Result = ListObjectsV1Result {
|
||||
lorHasMore' :: Bool
|
||||
lorHasMore' :: Bool
|
||||
, lorNextMarker :: Maybe Text
|
||||
, lorObjects' :: [ObjectInfo]
|
||||
, lorObjects' :: [ObjectInfo]
|
||||
, lorCPrefixes' :: [Text]
|
||||
} deriving (Show, Eq)
|
||||
|
||||
-- | Represents information about an object.
|
||||
data ObjectInfo = ObjectInfo {
|
||||
oiObject :: Object
|
||||
oiObject :: Object
|
||||
, oiModTime :: UTCTime
|
||||
, oiETag :: ETag
|
||||
, oiSize :: Int64
|
||||
, oiETag :: ETag
|
||||
, oiSize :: Int64
|
||||
} deriving (Show, Eq)
|
||||
|
||||
data CopyPartSource = CopyPartSource {
|
||||
-- | formatted like "\/sourceBucket\/sourceObject"
|
||||
cpSource :: Text
|
||||
-- | (0, 9) means first ten bytes of the source object
|
||||
, cpSourceRange :: Maybe (Int64, Int64)
|
||||
, cpSourceIfMatch :: Maybe Text
|
||||
, cpSourceIfNoneMatch :: Maybe Text
|
||||
, cpSourceIfUnmodifiedSince :: Maybe UTCTime
|
||||
, cpSourceIfModifiedSince :: Maybe UTCTime
|
||||
-- | Represents source object in server-side copy object
|
||||
data SourceInfo = SourceInfo {
|
||||
srcBucket :: Text
|
||||
, srcObject :: Text
|
||||
, srcRange :: Maybe (Int64, Int64)
|
||||
, srcIfMatch :: Maybe Text
|
||||
, srcIfNoneMatch :: Maybe Text
|
||||
, srcIfModifiedSince :: Maybe UTCTime
|
||||
, srcIfUnmodifiedSince :: Maybe UTCTime
|
||||
} deriving (Show, Eq)
|
||||
|
||||
instance Default CopyPartSource where
|
||||
def = CopyPartSource "" def def def def def
|
||||
instance Default SourceInfo where
|
||||
def = SourceInfo "" "" def def def def def
|
||||
|
||||
cpsToHeaders :: CopyPartSource -> [HT.Header]
|
||||
cpsToHeaders cps = ("x-amz-copy-source", encodeUtf8 $ cpSource cps) :
|
||||
rangeHdr ++ zip names values
|
||||
where
|
||||
names = ["x-amz-copy-source-if-match", "x-amz-copy-source-if-none-match",
|
||||
"x-amz-copy-source-if-unmodified-since",
|
||||
"x-amz-copy-source-if-modified-since"]
|
||||
values = mapMaybe (fmap encodeUtf8 . (cps &))
|
||||
[cpSourceIfMatch, cpSourceIfNoneMatch,
|
||||
fmap formatRFC1123 . cpSourceIfUnmodifiedSince,
|
||||
fmap formatRFC1123 . cpSourceIfModifiedSince]
|
||||
rangeHdr = ("x-amz-copy-source-range",)
|
||||
. HT.renderByteRanges
|
||||
. (:[])
|
||||
. uncurry HT.ByteRangeFromTo
|
||||
<$> map (both fromIntegral) (maybeToList $ cpSourceRange cps)
|
||||
-- | Represents destination object in server-side copy object
|
||||
data DestinationInfo = DestinationInfo {
|
||||
dstBucket :: Text
|
||||
, dstObject :: Text
|
||||
} deriving (Show, Eq)
|
||||
|
||||
-- | Extract the source bucket and source object name. TODO: validate
|
||||
-- the bucket and object name extracted.
|
||||
cpsToObject :: CopyPartSource -> Maybe (Bucket, Object)
|
||||
cpsToObject cps = do
|
||||
[_, bucket, object] <- Just splits
|
||||
return (bucket, object)
|
||||
where
|
||||
splits = T.splitOn "/" $ cpSource cps
|
||||
instance Default DestinationInfo where
|
||||
def = DestinationInfo "" ""
|
||||
|
||||
-- | A data-type for events that can occur in the object storage
|
||||
-- server. Reference:
|
||||
@ -352,7 +351,7 @@ instance Default FilterRules where
|
||||
-- for objects having a suffix of ".jpg", and the `prefixRule`
|
||||
-- restricts it to objects having a prefix of "images/".
|
||||
data FilterRule = FilterRule
|
||||
{ frName :: Text
|
||||
{ frName :: Text
|
||||
, frValue :: Text
|
||||
} deriving (Show, Eq)
|
||||
|
||||
@ -362,8 +361,8 @@ type Arn = Text
|
||||
-- notification system. It could represent a Queue, Topic or Lambda
|
||||
-- Function configuration.
|
||||
data NotificationConfig = NotificationConfig
|
||||
{ ncId :: Text
|
||||
, ncArn :: Arn
|
||||
{ ncId :: Text
|
||||
, ncArn :: Arn
|
||||
, ncEvents :: [Event]
|
||||
, ncFilter :: Filter
|
||||
} deriving (Show, Eq)
|
||||
@ -374,8 +373,8 @@ data NotificationConfig = NotificationConfig
|
||||
-- described at
|
||||
-- <https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html>
|
||||
data Notification = Notification
|
||||
{ nQueueConfigurations :: [NotificationConfig]
|
||||
, nTopicConfigurations :: [NotificationConfig]
|
||||
{ nQueueConfigurations :: [NotificationConfig]
|
||||
, nTopicConfigurations :: [NotificationConfig]
|
||||
, nCloudFunctionConfigurations :: [NotificationConfig]
|
||||
} deriving (Eq, Show)
|
||||
|
||||
@ -393,14 +392,14 @@ instance Default Payload where
|
||||
def = PayloadBS ""
|
||||
|
||||
data RequestInfo = RequestInfo {
|
||||
riMethod :: Method
|
||||
, riBucket :: Maybe Bucket
|
||||
, riObject :: Maybe Object
|
||||
, riQueryParams :: Query
|
||||
, riHeaders :: [Header]
|
||||
, riPayload :: Payload
|
||||
, riPayloadHash :: Maybe ByteString
|
||||
, riRegion :: Maybe Region
|
||||
riMethod :: Method
|
||||
, riBucket :: Maybe Bucket
|
||||
, riObject :: Maybe Object
|
||||
, riQueryParams :: Query
|
||||
, riHeaders :: [Header]
|
||||
, riPayload :: Payload
|
||||
, riPayloadHash :: Maybe ByteString
|
||||
, riRegion :: Maybe Region
|
||||
, riNeedsLocation :: Bool
|
||||
}
|
||||
|
||||
@ -445,7 +444,7 @@ instance MonadBaseControl IO Minio where
|
||||
|
||||
-- | MinioConn holds connection info and a connection pool
|
||||
data MinioConn = MinioConn {
|
||||
mcConnInfo :: ConnectInfo
|
||||
mcConnInfo :: ConnectInfo
|
||||
, mcConnManager :: NC.Manager
|
||||
}
|
||||
|
||||
|
||||
@ -19,9 +19,6 @@ module Network.Minio.PutObject
|
||||
putObjectInternal
|
||||
, ObjectData(..)
|
||||
, selectPartSizes
|
||||
, copyObjectInternal
|
||||
, selectCopyRanges
|
||||
, minPartSize
|
||||
) where
|
||||
|
||||
|
||||
@ -39,20 +36,6 @@ import Network.Minio.S3API
|
||||
import Network.Minio.Utils
|
||||
|
||||
|
||||
-- | max obj size is 5TiB
|
||||
maxObjectSize :: Int64
|
||||
maxObjectSize = 5 * 1024 * 1024 * oneMiB
|
||||
|
||||
-- | minimum size of parts used in multipart operations.
|
||||
minPartSize :: Int64
|
||||
minPartSize = 64 * oneMiB
|
||||
|
||||
oneMiB :: Int64
|
||||
oneMiB = 1024 * 1024
|
||||
|
||||
maxMultipartParts :: Int64
|
||||
maxMultipartParts = 10000
|
||||
|
||||
-- | A data-type to represent the source data for an object. A
|
||||
-- file-path or a producer-conduit may be provided.
|
||||
--
|
||||
@ -95,23 +78,6 @@ putObjectInternal b o (ODFile fp sizeMay) = do
|
||||
| otherwise -> sequentialMultipartUpload b o (Just size) $
|
||||
CB.sourceFile fp
|
||||
|
||||
-- | Select part sizes - the logic is that the minimum part-size will
|
||||
-- be 64MiB.
|
||||
selectPartSizes :: Int64 -> [(PartNumber, Int64, Int64)]
|
||||
selectPartSizes size = uncurry (List.zip3 [1..]) $
|
||||
List.unzip $ loop 0 size
|
||||
where
|
||||
ceil :: Double -> Int64
|
||||
ceil = ceiling
|
||||
partSize = max minPartSize (ceil $ fromIntegral size /
|
||||
fromIntegral maxMultipartParts)
|
||||
|
||||
m = fromIntegral partSize
|
||||
loop st sz
|
||||
| st > sz = []
|
||||
| st + m >= sz = [(st, sz - st)]
|
||||
| otherwise = (st, m) : loop (st + m) sz
|
||||
|
||||
parallelMultipartUpload :: Bucket -> Object -> FilePath -> Int64
|
||||
-> Minio ETag
|
||||
parallelMultipartUpload b o filePath size = do
|
||||
@ -163,69 +129,3 @@ sequentialMultipartUpload b o sizeMay src = do
|
||||
Just payload -> do pinfo <- lift $ putObjectPart b o uid pn [] payload
|
||||
C.yield pinfo
|
||||
uploadPart' uid pns
|
||||
|
||||
-- | Copy an object using single or multipart copy strategy.
|
||||
copyObjectInternal :: Bucket -> Object -> CopyPartSource
|
||||
-> Minio ETag
|
||||
copyObjectInternal b' o cps = do
|
||||
-- validate and extract the src bucket and object
|
||||
(srcBucket, srcObject) <- maybe
|
||||
(throwM $ MErrVInvalidSrcObjSpec $ cpSource cps)
|
||||
return $ cpsToObject cps
|
||||
|
||||
-- get source object size with a head request
|
||||
(ObjectInfo _ _ _ srcSize) <- headObject srcBucket srcObject
|
||||
|
||||
-- check that byte offsets are valid if specified in cps
|
||||
when (isJust (cpSourceRange cps) &&
|
||||
or [fst range < 0, snd range < fst range,
|
||||
snd range >= fromIntegral srcSize]) $
|
||||
throwM $ MErrVInvalidSrcObjByteRange range
|
||||
|
||||
-- 1. If sz > 64MiB (minPartSize) use multipart copy, OR
|
||||
-- 2. If startOffset /= 0 use multipart copy
|
||||
let destSize = (\(a, b) -> b - a + 1 ) $
|
||||
maybe (0, srcSize - 1) identity $ cpSourceRange cps
|
||||
startOffset = maybe 0 fst $ cpSourceRange cps
|
||||
endOffset = maybe (srcSize - 1) snd $ cpSourceRange cps
|
||||
|
||||
if destSize > minPartSize || (endOffset - startOffset + 1 /= srcSize)
|
||||
then multiPartCopyObject b' o cps srcSize
|
||||
|
||||
else fst <$> copyObjectSingle b' o cps{cpSourceRange = Nothing} []
|
||||
|
||||
where
|
||||
range = maybe (0, 0) identity $ cpSourceRange cps
|
||||
|
||||
-- | Given the input byte range of the source object, compute the
|
||||
-- splits for a multipart copy object procedure. Minimum part size
|
||||
-- used is minPartSize.
|
||||
selectCopyRanges :: (Int64, Int64) -> [(PartNumber, (Int64, Int64))]
|
||||
selectCopyRanges (st, end) = zip pns $
|
||||
map (\(x, y) -> (st + x, st + x + y - 1)) $ zip startOffsets partSizes
|
||||
where
|
||||
size = end - st + 1
|
||||
(pns, startOffsets, partSizes) = List.unzip3 $ selectPartSizes size
|
||||
|
||||
-- | Perform a multipart copy object action. Since we cannot verify
|
||||
-- existing parts based on the source object, there is no resuming
|
||||
-- copy action support.
|
||||
multiPartCopyObject :: Bucket -> Object -> CopyPartSource -> Int64
|
||||
-> Minio ETag
|
||||
multiPartCopyObject b o cps srcSize = do
|
||||
uid <- newMultipartUpload b o []
|
||||
|
||||
let byteRange = maybe (0, fromIntegral $ srcSize - 1) identity $
|
||||
cpSourceRange cps
|
||||
partRanges = selectCopyRanges byteRange
|
||||
partSources = map (\(x, y) -> (x, cps {cpSourceRange = Just y}))
|
||||
partRanges
|
||||
|
||||
copiedParts <- limitedMapConcurrently 10
|
||||
(\(pn, cps') -> do
|
||||
(etag, _) <- copyObjectPart b o cps' uid pn []
|
||||
return (pn, etag)
|
||||
)
|
||||
partSources
|
||||
|
||||
completeMultipartUpload b o uid copiedParts
|
||||
|
||||
@ -51,7 +51,6 @@ module Network.Minio.S3API
|
||||
, PartTuple
|
||||
, Payload(..)
|
||||
, PartNumber
|
||||
, CopyPartSource(..)
|
||||
, newMultipartUpload
|
||||
, putObjectPart
|
||||
, copyObjectPart
|
||||
@ -252,17 +251,33 @@ putObjectPart bucket object uploadId partNumber headers payload = do
|
||||
, ("partNumber", Just $ show partNumber)
|
||||
]
|
||||
|
||||
srcInfoToHeaders :: SourceInfo -> [HT.Header]
|
||||
srcInfoToHeaders srcInfo = ("x-amz-copy-source", encodeUtf8 $ format "/{}/{}" [srcBucket srcInfo, srcObject srcInfo]) :
|
||||
rangeHdr ++ zip names values
|
||||
where
|
||||
names = ["x-amz-copy-source-if-match", "x-amz-copy-source-if-none-match",
|
||||
"x-amz-copy-source-if-unmodified-since",
|
||||
"x-amz-copy-source-if-modified-since"]
|
||||
values = mapMaybe (fmap encodeUtf8 . (srcInfo &))
|
||||
[srcIfMatch, srcIfNoneMatch,
|
||||
fmap formatRFC1123 . srcIfUnmodifiedSince,
|
||||
fmap formatRFC1123 . srcIfModifiedSince]
|
||||
rangeHdr = maybe [] (\a -> [("x-amz-copy-source-range", HT.renderByteRanges [a])])
|
||||
$ toByteRange <$> srcRange srcInfo
|
||||
toByteRange :: (Int64, Int64) -> HT.ByteRange
|
||||
toByteRange (x, y) = HT.ByteRangeFromTo (fromIntegral x) (fromIntegral y)
|
||||
|
||||
-- | Performs server-side copy of an object or part of an object as an
|
||||
-- upload part of an ongoing multi-part upload.
|
||||
copyObjectPart :: Bucket -> Object -> CopyPartSource -> UploadId
|
||||
copyObjectPart :: DestinationInfo -> SourceInfo -> UploadId
|
||||
-> PartNumber -> [HT.Header] -> Minio (ETag, UTCTime)
|
||||
copyObjectPart bucket object cps uploadId partNumber headers = do
|
||||
copyObjectPart dstInfo srcInfo uploadId partNumber headers = do
|
||||
resp <- executeRequest $
|
||||
def { riMethod = HT.methodPut
|
||||
, riBucket = Just bucket
|
||||
, riObject = Just object
|
||||
, riBucket = Just $ dstBucket dstInfo
|
||||
, riObject = Just $ dstObject dstInfo
|
||||
, riQueryParams = mkOptionalParams params
|
||||
, riHeaders = headers ++ cpsToHeaders cps
|
||||
, riHeaders = headers ++ srcInfoToHeaders srcInfo
|
||||
}
|
||||
|
||||
parseCopyObjectResponse $ NC.responseBody resp
|
||||
@ -275,17 +290,17 @@ copyObjectPart bucket object cps uploadId partNumber headers = do
|
||||
-- | Performs server-side copy of an object that is upto 5GiB in
|
||||
-- size. If the object is greater than 5GiB, this function throws the
|
||||
-- error returned by the server.
|
||||
copyObjectSingle :: Bucket -> Object -> CopyPartSource -> [HT.Header]
|
||||
copyObjectSingle :: Bucket -> Object -> SourceInfo -> [HT.Header]
|
||||
-> Minio (ETag, UTCTime)
|
||||
copyObjectSingle bucket object cps headers = do
|
||||
-- validate that cpSourceRange is Nothing for this API.
|
||||
when (isJust $ cpSourceRange cps) $
|
||||
copyObjectSingle bucket object srcInfo headers = do
|
||||
-- validate that srcRange is Nothing for this API.
|
||||
when (isJust $ srcRange srcInfo) $
|
||||
throwM MErrVCopyObjSingleNoRangeAccepted
|
||||
resp <- executeRequest $
|
||||
def { riMethod = HT.methodPut
|
||||
, riBucket = Just bucket
|
||||
, riObject = Just object
|
||||
, riHeaders = headers ++ cpsToHeaders cps
|
||||
, riHeaders = headers ++ srcInfoToHeaders srcInfo
|
||||
}
|
||||
parseCopyObjectResponse $ NC.responseBody resp
|
||||
|
||||
|
||||
@ -24,8 +24,10 @@ import qualified Control.Monad.Trans.Resource as R
|
||||
|
||||
import qualified Data.ByteString as B
|
||||
import qualified Data.ByteString.Lazy as LB
|
||||
import Data.CaseInsensitive (mk)
|
||||
import qualified Data.Conduit as C
|
||||
import qualified Data.Conduit.Binary as CB
|
||||
import qualified Data.List as List
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding.Error (lenientDecode)
|
||||
import Data.Text.Read (decimal)
|
||||
@ -35,11 +37,11 @@ import qualified Network.HTTP.Conduit as NC
|
||||
import qualified Network.HTTP.Types as HT
|
||||
import qualified Network.HTTP.Types.Header as Hdr
|
||||
import qualified System.IO as IO
|
||||
import Data.CaseInsensitive (mk)
|
||||
|
||||
|
||||
import Lib.Prelude
|
||||
|
||||
import Network.Minio.Data
|
||||
import Network.Minio.XmlParser (parseErrResponse)
|
||||
|
||||
allocateReadFile :: (R.MonadResource m, R.MonadResourceBase m, MonadCatch m)
|
||||
@ -203,3 +205,20 @@ chunkBSConduit s = loop 0 [] s
|
||||
loop (fromIntegral $ B.length b) [b] sizes
|
||||
else loop (n + fromIntegral (B.length bs))
|
||||
(readChunks ++ [bs]) (size:sizes)
|
||||
|
||||
-- | Select part sizes - the logic is that the minimum part-size will
|
||||
-- be 64MiB.
|
||||
selectPartSizes :: Int64 -> [(PartNumber, Int64, Int64)]
|
||||
selectPartSizes size = uncurry (List.zip3 [1..]) $
|
||||
List.unzip $ loop 0 size
|
||||
where
|
||||
ceil :: Double -> Int64
|
||||
ceil = ceiling
|
||||
partSize = max minPartSize (ceil $ fromIntegral size /
|
||||
fromIntegral maxMultipartParts)
|
||||
|
||||
m = fromIntegral partSize
|
||||
loop st sz
|
||||
| st > sz = []
|
||||
| st + m >= sz = [(st, sz - st)]
|
||||
| otherwise = (st, m) : loop (st + m) sz
|
||||
|
||||
@ -42,7 +42,6 @@ import System.Environment (lookupEnv)
|
||||
|
||||
import Network.Minio
|
||||
import Network.Minio.Data
|
||||
import Network.Minio.ListOps
|
||||
import Network.Minio.PutObject
|
||||
import Network.Minio.S3API
|
||||
import Network.Minio.Utils
|
||||
@ -57,8 +56,6 @@ tests = testGroup "Tests" [liveServerUnitTests]
|
||||
randomDataSrc :: MonadIO m => Int64 -> C.Producer m ByteString
|
||||
randomDataSrc s' = genBS s'
|
||||
where
|
||||
oneMiB = 1024*1024
|
||||
|
||||
concatIt bs n = BS.concat $ replicate (fromIntegral q) bs ++
|
||||
[BS.take (fromIntegral r) bs]
|
||||
where (q, r) = n `divMod` fromIntegral (BS.length bs)
|
||||
@ -166,7 +163,10 @@ highLevelListingTest = funTestWithBucket "High-level listObjects Test" $
|
||||
(map oiObject objects)
|
||||
|
||||
step "High-level listing of objects (version 1)"
|
||||
objects <- listObjectsV1 bucket Nothing True $$ sinkList
|
||||
objectsV1 <- listObjectsV1 bucket Nothing True $$ sinkList
|
||||
|
||||
liftIO $ assertEqual "Objects match failed!" (sort expectedObjects)
|
||||
(map oiObject objectsV1)
|
||||
|
||||
step "Cleanup actions"
|
||||
forM_ expectedObjects $
|
||||
@ -226,14 +226,14 @@ listingTest = funTestWithBucket "Listing Test" $ \step bucket -> do
|
||||
(map oiObject $ lorObjects res)
|
||||
|
||||
step "Simple list version 1"
|
||||
res <- listObjectsV1' bucket Nothing Nothing Nothing Nothing
|
||||
resV1 <- listObjectsV1' bucket Nothing Nothing Nothing Nothing
|
||||
let expected = sort $ map (T.concat .
|
||||
("lsb-release":) .
|
||||
(\x -> [x]) .
|
||||
T.pack .
|
||||
show) [1..10::Int]
|
||||
liftIO $ assertEqual "Objects match failed!" expected
|
||||
(map oiObject $ lorObjects' res)
|
||||
(map oiObject $ lorObjects' resV1)
|
||||
|
||||
step "Cleanup actions"
|
||||
forM_ objects $ \obj -> deleteObject bucket obj
|
||||
@ -338,8 +338,8 @@ liveServerUnitTests = testGroup "Unit tests against a live server"
|
||||
fPutObject bucket object inputFile
|
||||
|
||||
step "copy object"
|
||||
let cps = def { cpSource = format "/{}/{}" [bucket, object] }
|
||||
(etag, modTime) <- copyObjectSingle bucket objCopy cps []
|
||||
let srcInfo = def { srcBucket = bucket, srcObject = object}
|
||||
(etag, modTime) <- copyObjectSingle bucket objCopy srcInfo []
|
||||
|
||||
-- retrieve obj info to check
|
||||
ObjectInfo _ t e s <- headObject bucket objCopy
|
||||
@ -368,10 +368,11 @@ liveServerUnitTests = testGroup "Unit tests against a live server"
|
||||
liftIO $ (T.length uid > 0) @? "Got an empty multipartUpload Id."
|
||||
|
||||
step "put object parts 1-3"
|
||||
let cps' = def {cpSource = format "/{}/{}" [bucket, srcObj]}
|
||||
let srcInfo' = def { srcBucket = bucket, srcObject = srcObj }
|
||||
dstInfo' = def { dstBucket = bucket, dstObject = copyObj }
|
||||
parts <- forM [1..3] $ \p -> do
|
||||
(etag', _) <- copyObjectPart bucket copyObj cps'{
|
||||
cpSourceRange = Just ((p-1)*mb5, (p-1)*mb5 + (mb5 - 1))
|
||||
(etag', _) <- copyObjectPart dstInfo' srcInfo'{
|
||||
srcRange = Just $ (,) ((p-1)*mb5) ((p-1)*mb5 + (mb5 - 1))
|
||||
} uid (fromIntegral p) []
|
||||
return (fromIntegral p, etag')
|
||||
|
||||
@ -398,7 +399,7 @@ liveServerUnitTests = testGroup "Unit tests against a live server"
|
||||
|
||||
step "make small and large object copy"
|
||||
forM_ (zip copyObjs srcs) $ \(cp, src) ->
|
||||
copyObject bucket cp def{cpSource = format "/{}/{}" [bucket, src]}
|
||||
copyObject def {dstBucket = bucket, dstObject = cp} def{srcBucket = bucket, srcObject = src}
|
||||
|
||||
step "verify uploaded objects"
|
||||
uploadedSizes <- fmap oiSize <$> forM copyObjs (headObject bucket)
|
||||
@ -415,9 +416,10 @@ liveServerUnitTests = testGroup "Unit tests against a live server"
|
||||
fPutObject bucket src =<< mkRandFile size
|
||||
|
||||
step "copy last 10MiB of object"
|
||||
copyObject bucket copyObj def{
|
||||
cpSource = format "/{}/{}" [bucket, src]
|
||||
, cpSourceRange = Just (5 * 1024 * 1024, size - 1)
|
||||
copyObject def { dstBucket = bucket, dstObject = copyObj } def{
|
||||
srcBucket = bucket
|
||||
, srcObject = src
|
||||
, srcRange = Just $ (,) (5 * 1024 * 1024) (size - 1)
|
||||
}
|
||||
|
||||
step "verify uploaded object"
|
||||
|
||||
@ -22,6 +22,8 @@ import qualified Data.List as L
|
||||
import Lib.Prelude
|
||||
|
||||
import Network.Minio.API.Test
|
||||
import Network.Minio.CopyObject
|
||||
import Network.Minio.Data
|
||||
import Network.Minio.PutObject
|
||||
import Network.Minio.Utils.Test
|
||||
import Network.Minio.XmlGenerator.Test
|
||||
|
||||
Loading…
Reference in New Issue
Block a user