Improve error handling; misc
This commit is contained in:
parent
aa66ba291e
commit
43bfabd186
@ -64,6 +64,7 @@ library
|
||||
, MultiParamTypeClasses
|
||||
, MultiWayIf
|
||||
, RankNTypes
|
||||
, ScopedTypeVariables
|
||||
, TypeFamilies
|
||||
, TupleSections
|
||||
|
||||
|
||||
@ -11,5 +11,6 @@ module Lib.Prelude
|
||||
import Protolude as Exports
|
||||
|
||||
import Data.Time as Exports (UTCTime)
|
||||
import Data.Maybe as Exports (catMaybes, listToMaybe)
|
||||
|
||||
import Control.Monad.Catch as Exports (throwM, MonadThrow, MonadCatch)
|
||||
|
||||
@ -5,7 +5,6 @@ module Network.Minio.PutObject
|
||||
) where
|
||||
|
||||
|
||||
import qualified Control.Monad.Trans.Resource as R
|
||||
import qualified Data.Conduit as C
|
||||
import qualified Data.Conduit.Binary as CB
|
||||
import qualified Data.List as List
|
||||
@ -44,15 +43,16 @@ data ObjectData m = ODFile FilePath (Maybe Int64) -- ^ Takes filepath and option
|
||||
-- objects of all sizes, and even if the object size is unknown.
|
||||
putObject :: Bucket -> Object -> ObjectData Minio -> Minio ETag
|
||||
putObject b o (ODFile fp sizeMay) = do
|
||||
isSeekable <- isFileSeekable fp
|
||||
hResE <- withNewHandle fp $ \h -> do
|
||||
isSeekable <- isHandleSeekable h
|
||||
handleSizeMay <- getFileSize h
|
||||
return (isSeekable, handleSizeMay)
|
||||
|
||||
-- FIXME: allocateReadFile may return exceptions and shortcircuit
|
||||
finalSizeMay <- maybe (do (rKey, h) <- allocateReadFile fp
|
||||
sizeE <- getFileSize h
|
||||
R.release rKey
|
||||
return $ hush $ sizeE
|
||||
)
|
||||
(return . Just) sizeMay
|
||||
(isSeekable, handleSizeMay) <- either (const $ return (False, Nothing)) return
|
||||
hResE
|
||||
|
||||
-- prefer given size to queried size.
|
||||
let finalSizeMay = listToMaybe $ catMaybes [sizeMay, handleSizeMay]
|
||||
|
||||
case finalSizeMay of
|
||||
-- unable to get size, so assume non-seekable file and max-object size
|
||||
@ -62,11 +62,9 @@ putObject b o (ODFile fp sizeMay) = do
|
||||
-- got file size, so check for single/multipart upload
|
||||
Just size ->
|
||||
if | size <= 64 * oneMiB -> do
|
||||
(rKey, h) <- allocateReadFile fp
|
||||
etag <- putObjectSingle b o [] h 0 size
|
||||
R.release rKey
|
||||
return etag
|
||||
| size > maxObjectSize -> R.throwM $ ValidationError $
|
||||
resE <- withNewHandle fp (\h -> putObjectSingle b o [] h 0 size)
|
||||
either throwM return resE
|
||||
| size > maxObjectSize -> throwM $ ValidationError $
|
||||
MErrVPutSizeExceeded size
|
||||
| isSeekable -> parallelMultipartUpload b o fp size
|
||||
| otherwise -> sequentialMultipartUpload b o (Just size) $
|
||||
@ -93,18 +91,18 @@ parallelMultipartUpload b o filePath size = do
|
||||
uploadId <- newMultipartUpload b o []
|
||||
|
||||
-- perform upload with 10 threads
|
||||
uploadedParts <- limitedMapConcurrently 10 (uploadPart uploadId) partSizeInfo
|
||||
uploadedPartsE <- limitedMapConcurrently 10 (uploadPart uploadId) partSizeInfo
|
||||
|
||||
completeMultipartUpload b o uploadId uploadedParts
|
||||
-- if there were any errors, rethrow exception.
|
||||
mapM_ throwM $ lefts uploadedPartsE
|
||||
|
||||
-- if we get here, all parts were successfully uploaded.
|
||||
completeMultipartUpload b o uploadId $ rights uploadedPartsE
|
||||
where
|
||||
uploadPart uploadId (partNum, offset, sz) = do
|
||||
(rKey, h) <- allocateReadFile filePath
|
||||
pInfo <- putObjectPart b o uploadId partNum [] $ PayloadH h offset sz
|
||||
R.release rKey
|
||||
return pInfo
|
||||
uploadPart uploadId (partNum, offset, sz) = withNewHandle filePath $
|
||||
\h -> putObjectPart b o uploadId partNum [] $ PayloadH h offset sz
|
||||
|
||||
-- | Upload multipart object from conduit source sequentially without
|
||||
-- object size information.
|
||||
-- | Upload multipart object from conduit source sequentially
|
||||
sequentialMultipartUpload :: Bucket -> Object -> Maybe Int64
|
||||
-> C.Producer Minio ByteString -> Minio ETag
|
||||
sequentialMultipartUpload b o sizeMay src = do
|
||||
@ -145,3 +143,15 @@ sequentialMultipartUpload b o sizeMay src = do
|
||||
pInfo <- putObjectPart b o uid partNum [] $
|
||||
PayloadBS $ LB.toStrict buf
|
||||
return $ reverse (pInfo:u)
|
||||
|
||||
-- | Looks for incomplete uploads for an object. Returns the first one
|
||||
-- if there are many.
|
||||
getExistingUpload :: Bucket -> Object
|
||||
-> Minio (Maybe (UploadId, [ListPartInfo]))
|
||||
getExistingUpload b o = do
|
||||
uploadsRes <- listIncompleteUploads' b (Just o) Nothing Nothing Nothing
|
||||
case uiUploadId <$> listToMaybe (lurUploads uploadsRes) of
|
||||
Nothing -> return Nothing
|
||||
Just uid -> do
|
||||
lpr <- listIncompleteParts' b o uid Nothing Nothing
|
||||
return $ Just (uid, lprParts lpr)
|
||||
|
||||
@ -213,7 +213,7 @@ abortMultipartUpload bucket object uploadId = do
|
||||
|
||||
-- | List incomplete multipart uploads.
|
||||
listIncompleteUploads' :: Bucket -> Maybe Text -> Maybe Text -> Maybe Text
|
||||
-> Maybe Text -> Minio ListUploadsResult
|
||||
-> Maybe Text -> Minio ListUploadsResult
|
||||
listIncompleteUploads' bucket prefix delimiter keyMarker uploadIdMarker = do
|
||||
resp <- executeRequest $ def { riMethod = HT.methodGet
|
||||
, riBucket = Just bucket
|
||||
|
||||
@ -5,6 +5,7 @@ import qualified Control.Concurrent.QSem as Q
|
||||
import qualified Control.Exception.Lifted as ExL
|
||||
import Control.Monad.Trans.Control (liftBaseOp_, StM)
|
||||
import qualified Control.Monad.Trans.Resource as R
|
||||
import qualified Control.Monad.Catch as MC
|
||||
import qualified Data.Conduit as C
|
||||
import Data.Text.Encoding.Error (lenientDecode)
|
||||
import qualified Network.HTTP.Client as NClient
|
||||
@ -26,17 +27,43 @@ allocateReadFile fp = do
|
||||
openReadFile f = ExL.try $ IO.openBinaryFile f IO.ReadMode
|
||||
cleanup = either (const $ return ()) IO.hClose
|
||||
|
||||
-- | Queries the file size from the handle. Catches any file operation
|
||||
-- exceptions and returns Nothing instead.
|
||||
getFileSize :: (R.MonadResourceBase m, R.MonadResource m)
|
||||
=> Handle -> m (Either IOException Int64)
|
||||
getFileSize h = ExL.try $ liftIO $ fromIntegral <$> IO.hFileSize h
|
||||
=> Handle -> m (Maybe Int64)
|
||||
getFileSize h = do
|
||||
resE <- liftIO $ try $ fromIntegral <$> IO.hFileSize h
|
||||
case resE of
|
||||
Left (_ :: IOException) -> return Nothing
|
||||
Right s -> return $ Just s
|
||||
|
||||
isFileSeekable :: (R.MonadResource m, R.MonadResourceBase m)
|
||||
=> FilePath -> m Bool
|
||||
isFileSeekable fp = do
|
||||
(rKey, h) <- allocateReadFile fp
|
||||
-- | Queries if handle is seekable. Catches any file operation
|
||||
-- exceptions and return False instead.
|
||||
isHandleSeekable :: (R.MonadResource m, R.MonadResourceBase m)
|
||||
=> Handle -> m Bool
|
||||
isHandleSeekable h = do
|
||||
resE <- liftIO $ try $ IO.hIsSeekable h
|
||||
R.release rKey
|
||||
either (throwM . MEFile) return resE
|
||||
case resE of
|
||||
Left (_ :: IOException) -> return False
|
||||
Right v -> return v
|
||||
|
||||
-- | Helper function that opens a handle to the filepath and performs
|
||||
-- the given action on it. Exceptions of type MError are caught and
|
||||
-- returned - both during file handle allocation and when the action
|
||||
-- is run.
|
||||
withNewHandle :: (R.MonadResourceBase m, R.MonadResource m, MonadCatch m)
|
||||
=> FilePath -> (Handle -> m a) -> m (Either MError a)
|
||||
withNewHandle fp fileAction = do
|
||||
-- opening a handle can throw MError exception.
|
||||
handleE <- MC.try $ allocateReadFile fp
|
||||
either (return . Left) doAction handleE
|
||||
where
|
||||
doAction (rkey, h) = do
|
||||
-- fileAction may also throw MError exception, so we catch and
|
||||
-- return it.
|
||||
resE <- MC.try $ fileAction h
|
||||
R.release rkey
|
||||
return resE
|
||||
|
||||
|
||||
lookupHeader :: HT.HeaderName -> [HT.Header] -> Maybe ByteString
|
||||
|
||||
Loading…
Reference in New Issue
Block a user