diff --git a/minio-hs.cabal b/minio-hs.cabal index 56e3eec..e0f997a 100644 --- a/minio-hs.cabal +++ b/minio-hs.cabal @@ -64,6 +64,7 @@ library , MultiParamTypeClasses , MultiWayIf , RankNTypes + , ScopedTypeVariables , TypeFamilies , TupleSections diff --git a/src/Lib/Prelude.hs b/src/Lib/Prelude.hs index a5549c3..f761999 100644 --- a/src/Lib/Prelude.hs +++ b/src/Lib/Prelude.hs @@ -11,5 +11,6 @@ module Lib.Prelude import Protolude as Exports import Data.Time as Exports (UTCTime) +import Data.Maybe as Exports (catMaybes, listToMaybe) import Control.Monad.Catch as Exports (throwM, MonadThrow, MonadCatch) diff --git a/src/Network/Minio/PutObject.hs b/src/Network/Minio/PutObject.hs index f34a8fe..f7e633e 100644 --- a/src/Network/Minio/PutObject.hs +++ b/src/Network/Minio/PutObject.hs @@ -5,7 +5,6 @@ module Network.Minio.PutObject ) where -import qualified Control.Monad.Trans.Resource as R import qualified Data.Conduit as C import qualified Data.Conduit.Binary as CB import qualified Data.List as List @@ -44,15 +43,16 @@ data ObjectData m = ODFile FilePath (Maybe Int64) -- ^ Takes filepath and option -- objects of all sizes, and even if the object size is unknown. putObject :: Bucket -> Object -> ObjectData Minio -> Minio ETag putObject b o (ODFile fp sizeMay) = do - isSeekable <- isFileSeekable fp + hResE <- withNewHandle fp $ \h -> do + isSeekable <- isHandleSeekable h + handleSizeMay <- getFileSize h + return (isSeekable, handleSizeMay) - -- FIXME: allocateReadFile may return exceptions and shortcircuit - finalSizeMay <- maybe (do (rKey, h) <- allocateReadFile fp - sizeE <- getFileSize h - R.release rKey - return $ hush $ sizeE - ) - (return . Just) sizeMay + (isSeekable, handleSizeMay) <- either (const $ return (False, Nothing)) return + hResE + + -- prefer given size to queried size. + let finalSizeMay = listToMaybe $ catMaybes [sizeMay, handleSizeMay] case finalSizeMay of -- unable to get size, so assume non-seekable file and max-object size @@ -62,11 +62,9 @@ putObject b o (ODFile fp sizeMay) = do -- got file size, so check for single/multipart upload Just size -> if | size <= 64 * oneMiB -> do - (rKey, h) <- allocateReadFile fp - etag <- putObjectSingle b o [] h 0 size - R.release rKey - return etag - | size > maxObjectSize -> R.throwM $ ValidationError $ + resE <- withNewHandle fp (\h -> putObjectSingle b o [] h 0 size) + either throwM return resE + | size > maxObjectSize -> throwM $ ValidationError $ MErrVPutSizeExceeded size | isSeekable -> parallelMultipartUpload b o fp size | otherwise -> sequentialMultipartUpload b o (Just size) $ @@ -93,18 +91,18 @@ parallelMultipartUpload b o filePath size = do uploadId <- newMultipartUpload b o [] -- perform upload with 10 threads - uploadedParts <- limitedMapConcurrently 10 (uploadPart uploadId) partSizeInfo + uploadedPartsE <- limitedMapConcurrently 10 (uploadPart uploadId) partSizeInfo - completeMultipartUpload b o uploadId uploadedParts + -- if there were any errors, rethrow exception. + mapM_ throwM $ lefts uploadedPartsE + + -- if we get here, all parts were successfully uploaded. + completeMultipartUpload b o uploadId $ rights uploadedPartsE where - uploadPart uploadId (partNum, offset, sz) = do - (rKey, h) <- allocateReadFile filePath - pInfo <- putObjectPart b o uploadId partNum [] $ PayloadH h offset sz - R.release rKey - return pInfo + uploadPart uploadId (partNum, offset, sz) = withNewHandle filePath $ + \h -> putObjectPart b o uploadId partNum [] $ PayloadH h offset sz --- | Upload multipart object from conduit source sequentially without --- object size information. +-- | Upload multipart object from conduit source sequentially sequentialMultipartUpload :: Bucket -> Object -> Maybe Int64 -> C.Producer Minio ByteString -> Minio ETag sequentialMultipartUpload b o sizeMay src = do @@ -145,3 +143,15 @@ sequentialMultipartUpload b o sizeMay src = do pInfo <- putObjectPart b o uid partNum [] $ PayloadBS $ LB.toStrict buf return $ reverse (pInfo:u) + +-- | Looks for incomplete uploads for an object. Returns the first one +-- if there are many. +getExistingUpload :: Bucket -> Object + -> Minio (Maybe (UploadId, [ListPartInfo])) +getExistingUpload b o = do + uploadsRes <- listIncompleteUploads' b (Just o) Nothing Nothing Nothing + case uiUploadId <$> listToMaybe (lurUploads uploadsRes) of + Nothing -> return Nothing + Just uid -> do + lpr <- listIncompleteParts' b o uid Nothing Nothing + return $ Just (uid, lprParts lpr) diff --git a/src/Network/Minio/S3API.hs b/src/Network/Minio/S3API.hs index e7b4e42..ae2e084 100644 --- a/src/Network/Minio/S3API.hs +++ b/src/Network/Minio/S3API.hs @@ -213,7 +213,7 @@ abortMultipartUpload bucket object uploadId = do -- | List incomplete multipart uploads. listIncompleteUploads' :: Bucket -> Maybe Text -> Maybe Text -> Maybe Text - -> Maybe Text -> Minio ListUploadsResult + -> Maybe Text -> Minio ListUploadsResult listIncompleteUploads' bucket prefix delimiter keyMarker uploadIdMarker = do resp <- executeRequest $ def { riMethod = HT.methodGet , riBucket = Just bucket diff --git a/src/Network/Minio/Utils.hs b/src/Network/Minio/Utils.hs index b43a218..0f91eff 100644 --- a/src/Network/Minio/Utils.hs +++ b/src/Network/Minio/Utils.hs @@ -5,6 +5,7 @@ import qualified Control.Concurrent.QSem as Q import qualified Control.Exception.Lifted as ExL import Control.Monad.Trans.Control (liftBaseOp_, StM) import qualified Control.Monad.Trans.Resource as R +import qualified Control.Monad.Catch as MC import qualified Data.Conduit as C import Data.Text.Encoding.Error (lenientDecode) import qualified Network.HTTP.Client as NClient @@ -26,17 +27,43 @@ allocateReadFile fp = do openReadFile f = ExL.try $ IO.openBinaryFile f IO.ReadMode cleanup = either (const $ return ()) IO.hClose +-- | Queries the file size from the handle. Catches any file operation +-- exceptions and returns Nothing instead. getFileSize :: (R.MonadResourceBase m, R.MonadResource m) - => Handle -> m (Either IOException Int64) -getFileSize h = ExL.try $ liftIO $ fromIntegral <$> IO.hFileSize h + => Handle -> m (Maybe Int64) +getFileSize h = do + resE <- liftIO $ try $ fromIntegral <$> IO.hFileSize h + case resE of + Left (_ :: IOException) -> return Nothing + Right s -> return $ Just s -isFileSeekable :: (R.MonadResource m, R.MonadResourceBase m) - => FilePath -> m Bool -isFileSeekable fp = do - (rKey, h) <- allocateReadFile fp +-- | Queries if handle is seekable. Catches any file operation +-- exceptions and return False instead. +isHandleSeekable :: (R.MonadResource m, R.MonadResourceBase m) + => Handle -> m Bool +isHandleSeekable h = do resE <- liftIO $ try $ IO.hIsSeekable h - R.release rKey - either (throwM . MEFile) return resE + case resE of + Left (_ :: IOException) -> return False + Right v -> return v + +-- | Helper function that opens a handle to the filepath and performs +-- the given action on it. Exceptions of type MError are caught and +-- returned - both during file handle allocation and when the action +-- is run. +withNewHandle :: (R.MonadResourceBase m, R.MonadResource m, MonadCatch m) + => FilePath -> (Handle -> m a) -> m (Either MError a) +withNewHandle fp fileAction = do + -- opening a handle can throw MError exception. + handleE <- MC.try $ allocateReadFile fp + either (return . Left) doAction handleE + where + doAction (rkey, h) = do + -- fileAction may also throw MError exception, so we catch and + -- return it. + resE <- MC.try $ fileAction h + R.release rkey + return resE lookupHeader :: HT.HeaderName -> [HT.Header] -> Maybe ByteString