Add support for S3Select API (#108)

This commit is contained in:
Aditya Manthramurthy 2019-03-08 15:54:36 -08:00 committed by Harshavardhana
parent ab7d04bb59
commit 72bf08129c
12 changed files with 818 additions and 16 deletions

View File

@ -28,7 +28,7 @@ awsCI { connectAccesskey = "your-access-key"
|[`listObjects`](#listObjects)|[`fPutObject`](#fPutObject)||
|[`listObjectsV1`](#listObjectsV1)|[`copyObject`](#copyObject)||
|[`listIncompleteUploads`](#listIncompleteUploads)|[`removeObject`](#removeObject)||
|[`bucketExists`](#bucketExists)|||
|[`bucketExists`](#bucketExists)|[`selectObjectContent`](#selectObjectContent)||
## 1. Connecting and running operations on the storage service
@ -743,6 +743,59 @@ main = do
Right _ -> putStrLn "Removed incomplete upload successfully"
```
<a name="selectObjectContent"></a>
### selectObjectContent :: Bucket -> Object -> SelectRequest -> Minio (ConduitT () EventMessage Minio ())
Removes an ongoing multipart upload of an object from the service
__Parameters__
In the expression `selectObjectContent bucketName objectName selReq`
the parameters are:
|Param |Type |Description |
|:---|:---| :---|
| `bucketName` | _Bucket_ (alias for `Text`) | Name of the bucket |
| `objectName` | _Object_ (alias for `Text`) | Name of the object |
| `selReq` | _SelectRequest_ | Select request parameters |
__SelectRequest record__
This record is created using `selectRequest`. Please refer to the Haddocks for further information.
__Return Value__
The return value can be used to read individual `EventMessage`s in the response. Please refer to the Haddocks for further information.
|Return type | Description |
|:---|:---|
| _Minio (C.conduitT () EventMessage Minio ())_ | A Conduit source of `EventMessage` values. |
__Example__
```haskell
{-# Language OverloadedStrings #-}
import Network.Minio
import qualified Conduit as C
main :: IO ()
main = do
let
bucket = "mybucket"
object = "myobject"
res <- runMinio minioPlayCI $ do
let sr = selectRequest "Select * from s3object"
defaultCsvInput defaultCsvOutput
res <- selectObjectContent bucket object sr
C.runConduit $ res C..| getPayloadBytes C..| C.stdoutC
case res of
Left _ -> putStrLn "Failed!"
Right _ -> putStrLn "Success!"
```
<a name="BucketExists"></a>
### bucketExists :: Bucket -> Minio Bool
Checks if a bucket exists.

50
examples/SelectObject.hs Executable file
View File

@ -0,0 +1,50 @@
#!/usr/bin/env stack
-- stack --resolver lts-13.1 runghc --package minio-hs
--
-- Minio Haskell SDK, (C) 2019 Minio, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
{-# LANGUAGE OverloadedStrings #-}
import Network.Minio
import qualified Conduit as C
import Control.Monad (when)
import qualified Data.ByteString.Lazy as LB
import Prelude
main :: IO ()
main = do
let bucket = "selectbucket"
object = "1.csv"
content = "Name,Place,Temperature\n"
<> "James,San Jose,76\n"
<> "Alicia,San Leandro,88\n"
<> "Mark,San Carlos,90\n"
res <- runMinio minioPlayCI $ do
exists <- bucketExists bucket
when (not exists) $
makeBucket bucket Nothing
C.liftIO $ putStrLn "Uploading csv object"
putObject bucket object (C.sourceLazy content) Nothing defaultPutObjectOptions
let sr = selectRequest "Select * from s3object" defaultCsvInput defaultCsvOutput
res <- selectObjectContent bucket object sr
C.runConduit $ res C..| getPayloadBytes C..| C.stdoutC
print res

View File

@ -41,6 +41,7 @@ library
, Network.Minio.ListOps
, Network.Minio.PresignedOperations
, Network.Minio.PutObject
, Network.Minio.SelectAPI
, Network.Minio.Sign.V4
, Network.Minio.Utils
, Network.Minio.XmlGenerator
@ -50,6 +51,7 @@ library
, protolude >= 0.2 && < 0.3
, aeson >= 1.2
, base64-bytestring >= 1.0
, binary >= 0.8.5.0
, bytestring >= 0.10
, case-insensitive >= 1.2
, conduit >= 1.3
@ -57,6 +59,7 @@ library
, containers >= 0.5
, cryptonite >= 0.25
, cryptonite-conduit >= 0.2
, digest >= 0.0.1
, directory
, filepath >= 1.4
, http-client >= 0.5
@ -64,12 +67,14 @@ library
, http-types >= 0.12
, ini
, memory >= 0.14
, raw-strings-qq >= 1
, resourcet >= 1.2
, text >= 1.2
, time >= 1.8
, transformers >= 0.5
, unliftio >= 0.2
, unliftio-core >= 0.1
, unordered-containers >= 0.2
, xml-conduit >= 1.8
default-language: Haskell2010
default-extensions: BangPatterns
@ -120,6 +125,7 @@ test-suite minio-hs-live-server-test
, Network.Minio.PresignedOperations
, Network.Minio.PutObject
, Network.Minio.S3API
, Network.Minio.SelectAPI
, Network.Minio.Sign.V4
, Network.Minio.TestHelpers
, Network.Minio.Utils
@ -131,11 +137,12 @@ test-suite minio-hs-live-server-test
, Network.Minio.XmlParser.Test
, Network.Minio.JsonParser
, Network.Minio.JsonParser.Test
build-depends: base
build-depends: base >= 4.7 && < 5
, minio-hs
, protolude >= 0.1.6
, aeson
, base64-bytestring
, binary
, bytestring
, case-insensitive
, conduit
@ -143,6 +150,7 @@ test-suite minio-hs-live-server-test
, containers
, cryptonite
, cryptonite-conduit
, digest
, directory
, filepath
, http-client
@ -151,6 +159,7 @@ test-suite minio-hs-live-server-test
, ini
, memory
, QuickCheck
, raw-strings-qq >= 1
, resourcet
, tasty
, tasty-hunit
@ -162,6 +171,7 @@ test-suite minio-hs-live-server-test
, transformers
, unliftio
, unliftio-core
, unordered-containers
, xml-conduit
if !flag(live-test)
buildable: False
@ -170,11 +180,12 @@ test-suite minio-hs-test
type: exitcode-stdio-1.0
hs-source-dirs: test, src
main-is: Spec.hs
build-depends: base
build-depends: base >= 4.7 && < 5
, minio-hs
, protolude >= 0.1.6
, aeson
, base64-bytestring
, binary
, bytestring
, case-insensitive
, conduit
@ -183,6 +194,7 @@ test-suite minio-hs-test
, cryptonite
, cryptonite-conduit
, filepath
, digest
, directory
, http-client
, http-conduit
@ -190,6 +202,7 @@ test-suite minio-hs-test
, ini
, memory
, QuickCheck
, raw-strings-qq >= 1
, resourcet
, tasty
, tasty-hunit
@ -201,6 +214,7 @@ test-suite minio-hs-test
, transformers
, unliftio
, unliftio-core
, unordered-containers
, xml-conduit
ghc-options: -Wall -threaded -rtsopts -with-rtsopts=-N
default-language: Haskell2010
@ -230,6 +244,7 @@ test-suite minio-hs-test
, Network.Minio.PresignedOperations
, Network.Minio.PutObject
, Network.Minio.S3API
, Network.Minio.SelectAPI
, Network.Minio.Sign.V4
, Network.Minio.TestHelpers
, Network.Minio.Utils

View File

@ -158,6 +158,9 @@ module Network.Minio
, removeObject
, removeIncompleteUpload
-- ** Select Object Content with SQL
, module Network.Minio.SelectAPI
-- * Presigned Operations
-------------------------
, UrlExpiry
@ -207,6 +210,7 @@ import Network.Minio.Errors
import Network.Minio.ListOps
import Network.Minio.PutObject
import Network.Minio.S3API
import Network.Minio.SelectAPI
import Network.Minio.Utils
-- | Lists buckets.

View File

@ -14,6 +14,7 @@
-- limitations under the License.
--
{-# LANGUAGE CPP #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE TypeFamilies #-}
module Network.Minio.Data where
@ -25,6 +26,7 @@ import Control.Monad.IO.Unlift (MonadUnliftIO, UnliftIO (..),
import Control.Monad.Trans.Resource
import qualified Data.ByteString as B
import Data.CaseInsensitive (mk)
import qualified Data.HashMap.Strict as H
import qualified Data.Ini as Ini
import qualified Data.Map as Map
import Data.String (IsString (..))
@ -511,6 +513,237 @@ data Notification = Notification
defaultNotification :: Notification
defaultNotification = Notification [] [] []
--------------------------------------------------------------------------
-- Select API Related Types
--------------------------------------------------------------------------
-- | SelectRequest represents the Select API call. Use the
-- `selectRequest` function to create a value of this type.
data SelectRequest = SelectRequest
{ srExpression :: Text
, srExpressionType :: ExpressionType
, srInputSerialization :: InputSerialization
, srOutputSerialization :: OutputSerialization
, srRequestProgressEnabled :: Maybe Bool
} deriving (Eq, Show)
data ExpressionType = SQL
deriving (Eq, Show)
-- | InputSerialization represents format information of the input
-- object being queried. Use one of the smart constructors such as
-- `defaultCsvInput` as a starting value, and add compression info
-- using `setInputCompressionType`
data InputSerialization = InputSerialization
{ isCompressionType :: Maybe CompressionType
, isFormatInfo :: InputFormatInfo
} deriving (Eq, Show)
data CompressionType = CompressionTypeNone
| CompressionTypeGzip
| CompressionTypeBzip2
deriving (Eq, Show)
data InputFormatInfo = InputFormatCSV CSVInputProp
| InputFormatJSON JSONInputProp
| InputFormatParquet
deriving (Eq, Show)
-- | defaultCsvInput returns InputSerialization with default CSV
-- format, and without any compression setting.
defaultCsvInput :: InputSerialization
defaultCsvInput = InputSerialization Nothing (InputFormatCSV defaultCSVProp)
-- | linesJsonInput returns InputSerialization with JSON line based
-- format with no compression setting.
linesJsonInput :: InputSerialization
linesJsonInput = InputSerialization Nothing
(InputFormatJSON $ JSONInputProp JSONTypeLines)
-- | documentJsonInput returns InputSerialization with JSON document
-- based format with no compression setting.
documentJsonInput :: InputSerialization
documentJsonInput = InputSerialization Nothing
(InputFormatJSON $ JSONInputProp JSONTypeDocument)
-- | defaultParquetInput returns InputSerialization with Parquet
-- format, and no compression setting.
defaultParquetInput :: InputSerialization
defaultParquetInput = InputSerialization Nothing InputFormatParquet
-- | setInputCompressionType sets the compression type for the input
-- of the SelectRequest
setInputCompressionType :: CompressionType -> SelectRequest
-> SelectRequest
setInputCompressionType c i =
let is = srInputSerialization i
is' = is { isCompressionType = Just c }
in i { srInputSerialization = is' }
-- | defaultCsvOutput returns OutputSerialization with default CSV
-- format.
defaultCsvOutput :: OutputSerialization
defaultCsvOutput = OutputSerializationCSV defaultCSVProp
-- | defaultJsonInput returns OutputSerialization with default JSON
-- format.
defaultJsonOutput :: OutputSerialization
defaultJsonOutput = OutputSerializationJSON (JSONOutputProp Nothing)
-- | selectRequest is used to build a `SelectRequest`
-- value. @selectRequest query inputSer outputSer@ represents a
-- SelectRequest with the SQL query text given by @query@, the input
-- serialization settings (compression format and format information)
-- @inputSer@ and the output serialization settings @outputSer@.
selectRequest :: Text -> InputSerialization -> OutputSerialization
-> SelectRequest
selectRequest sqlQuery inputSer outputSer =
SelectRequest { srExpression = sqlQuery
, srExpressionType = SQL
, srInputSerialization = inputSer
, srOutputSerialization = outputSer
, srRequestProgressEnabled = Nothing
}
-- | setRequestProgressEnabled sets the flag for turning on progress
-- messages when the Select response is being streamed back to the
-- client.
setRequestProgressEnabled :: Bool -> SelectRequest -> SelectRequest
setRequestProgressEnabled enabled sr =
sr { srRequestProgressEnabled = Just enabled }
type CSVInputProp = CSVProp
-- | CSVProp represents CSV format properties. It is built up using
-- the Monoid instance.
data CSVProp = CSVProp (H.HashMap Text Text)
deriving (Eq, Show)
#if (__GLASGOW_HASKELL__ >= 804)
instance Semigroup CSVProp where
(CSVProp a) <> (CSVProp b) = CSVProp (b <> a)
#endif
instance Monoid CSVProp where
mempty = CSVProp mempty
#if (__GLASGOW_HASKELL__ < 804)
mappend (CSVProp a) (CSVProp b) = CSVProp (b <> a)
#endif
defaultCSVProp :: CSVProp
defaultCSVProp = mempty
recordDelimiter :: Text -> CSVProp
recordDelimiter = CSVProp . H.singleton "RecordDelimiter"
fieldDelimiter :: Text -> CSVProp
fieldDelimiter = CSVProp . H.singleton "FieldDelimiter"
quoteCharacter :: Text -> CSVProp
quoteCharacter = CSVProp . H.singleton "QuoteCharacter"
quoteEscapeCharacter :: Text -> CSVProp
quoteEscapeCharacter = CSVProp . H.singleton "QuoteEscapeCharacter"
-- | FileHeaderInfo specifies information about column headers for CSV
-- format.
data FileHeaderInfo
= FileHeaderNone -- ^ No column headers are present
| FileHeaderUse -- ^ Headers are present and they should be used
| FileHeaderIgnore -- ^ Header are present, but should be ignored
deriving (Eq, Show)
fileHeaderInfo :: FileHeaderInfo -> CSVProp
fileHeaderInfo = CSVProp . H.singleton "FileHeaderInfo" . toString
where
toString FileHeaderNone = "NONE"
toString FileHeaderUse = "USE"
toString FileHeaderIgnore = "IGNORE"
commentCharacter :: Text -> CSVProp
commentCharacter = CSVProp . H.singleton "Comments"
allowQuotedRecordDelimiter :: CSVProp
allowQuotedRecordDelimiter = CSVProp $ H.singleton "AllowQuotedRecordDelimiter" "TRUE"
-- | Set the CSV format properties in the InputSerialization.
setInputCSVProps :: CSVProp -> InputSerialization -> InputSerialization
setInputCSVProps p is = is { isFormatInfo = InputFormatCSV p }
-- | Set the CSV format properties in the OutputSerialization.
outputCSVFromProps :: CSVProp -> OutputSerialization
outputCSVFromProps p = OutputSerializationCSV p
data JSONInputProp = JSONInputProp { jsonipType :: JSONType }
deriving (Eq, Show)
data JSONType = JSONTypeDocument | JSONTypeLines
deriving (Eq, Show)
-- | OutputSerialization represents output serialization settings for
-- the SelectRequest. Use `defaultCsvOutput` or `defaultJsonOutput` as
-- a starting point.
data OutputSerialization = OutputSerializationJSON JSONOutputProp
| OutputSerializationCSV CSVOutputProp
deriving (Eq, Show)
type CSVOutputProp = CSVProp
-- | quoteFields is an output serialization parameter
quoteFields :: QuoteFields -> CSVProp
quoteFields q = CSVProp $ H.singleton "QuoteFields" $
case q of
QuoteFieldsAsNeeded -> "ASNEEDED"
QuoteFieldsAlways -> "ALWAYS"
data QuoteFields = QuoteFieldsAsNeeded | QuoteFieldsAlways
deriving (Eq, Show)
data JSONOutputProp = JSONOutputProp { jsonopRecordDelimiter :: Maybe Text }
deriving (Eq, Show)
-- | Set the output record delimiter for JSON format
outputJSONFromRecordDelimiter :: Text -> OutputSerialization
outputJSONFromRecordDelimiter t =
OutputSerializationJSON (JSONOutputProp $ Just t)
-- Response related types
-- | An EventMessage represents each kind of message received from the server.
data EventMessage = ProgressEventMessage { emProgress :: Progress }
| StatsEventMessage { emStats :: Stats }
| RequestLevelErrorMessage { emErrorCode :: Text
, emErrorMessage :: Text
}
| RecordPayloadEventMessage { emPayloadBytes :: ByteString }
deriving (Eq, Show)
data MsgHeaderName = MessageType
| EventType
| ContentType
| ErrorCode
| ErrorMessage
deriving (Eq, Show)
msgHeaderValueType :: Word8
msgHeaderValueType = 7
type MessageHeader = (MsgHeaderName, Text)
data Progress = Progress { pBytesScanned :: Int64
, pBytesProcessed :: Int64
, pBytesReturned :: Int64
}
deriving (Eq, Show)
type Stats = Progress
--------------------------------------------------------------------------
-- Select API Related Types End
--------------------------------------------------------------------------
-- | Represents different kinds of payload that are used with S3 API
-- requests.
data Payload = PayloadBS ByteString
@ -530,8 +763,8 @@ data AdminReqInfo = AdminReqInfo {
, ariQueryParams :: Query
}
data S3ReqInfo = S3ReqInfo {
riMethod :: Method
data S3ReqInfo = S3ReqInfo
{ riMethod :: Method
, riBucket :: Maybe Bucket
, riObject :: Maybe Object
, riQueryParams :: Query

View File

@ -51,6 +51,7 @@ data ServiceErr = BucketAlreadyExists
| NoSuchBucket
| InvalidBucketName
| NoSuchKey
| SelectErr Text Text
| ServiceErr Text Text
deriving (Show, Eq)

View File

@ -0,0 +1,302 @@
--
-- Minio Haskell SDK, (C) 2017-2019 Minio, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
module Network.Minio.SelectAPI
(
-- | The `selectObjectContent` allows querying CSV, JSON or Parquet
-- format objects in AWS S3 and in Minio using SQL Select
-- statements. This allows significant reduction of data transfer
-- from object storage for computation-intensive tasks, as relevant
-- data is filtered close to the storage.
selectObjectContent
, SelectRequest
, selectRequest
-- *** Input Serialization
-------------------------
, InputSerialization
, defaultCsvInput
, linesJsonInput
, documentJsonInput
, defaultParquetInput
, setInputCSVProps
, CompressionType(..)
, setInputCompressionType
-- *** CSV Format details
------------------------
-- | CSV format options such as delimiters and quote characters are
-- specified using using the functions below. Options are combined
-- monoidally.
, CSVProp
, recordDelimiter
, fieldDelimiter
, quoteCharacter
, quoteEscapeCharacter
, commentCharacter
, allowQuotedRecordDelimiter
, FileHeaderInfo(..)
, fileHeaderInfo
, QuoteFields(..)
, quoteFields
-- *** Output Serialization
-------------------------
, OutputSerialization
, defaultCsvOutput
, defaultJsonOutput
, outputCSVFromProps
, outputJSONFromRecordDelimiter
-- *** Progress messages
------------------------
, setRequestProgressEnabled
-- *** Interpreting Select output
--------------------------------------------
-- | The conduit returned by `selectObjectContent` returns values of
-- the `EventMessage` data type. This returns the query output
-- messages formatted according to the chosen output serialization,
-- interleaved with progress messages (if enabled by
-- `setRequestProgressEnabled`), and at the end a statistics
-- message.
--
-- If the application is interested in only the payload, then
-- `getPayloadBytes` can be used. For example to simply print the
-- payload to stdout:
--
-- > resultConduit <- selectObjectContent bucket object mySelectRequest
-- > runConduit $ resultConduit .| getPayloadBytes .| stdoutC
--
-- Note that runConduit, the connect operator (.|) and stdoutC are
-- all from the "conduit" package.
, getPayloadBytes
, EventMessage(..)
, Progress(..)
, Stats
) where
import Conduit ((.|))
import qualified Conduit as C
import qualified Data.Binary as Bin
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
import Data.Digest.CRC32 (crc32, crc32Update)
import qualified Network.HTTP.Conduit as NC
import qualified Network.HTTP.Types as HT
import UnliftIO (MonadUnliftIO)
import Lib.Prelude
import Network.Minio.API
import Network.Minio.Data
import Network.Minio.Errors
import Network.Minio.Utils
import Network.Minio.XmlGenerator
import Network.Minio.XmlParser
data EventStreamException = ESEPreludeCRCFailed
| ESEMessageCRCFailed
| ESEUnexpectedEndOfStream
| ESEDecodeFail [Char]
| ESEInvalidHeaderType
| ESEInvalidHeaderValueType
| ESEInvalidMessageType
deriving (Eq, Show)
instance Exception EventStreamException
-- chunkSize in bytes is 32KiB
chunkSize :: Int
chunkSize = 32 * 1024
parseBinary :: Bin.Binary a => ByteString -> IO a
parseBinary b = do
case Bin.decodeOrFail $ LB.fromStrict b of
Left (_, _, msg) -> throwIO $ ESEDecodeFail msg
Right (_, _, r) -> return r
bytesToHeaderName :: Text -> IO MsgHeaderName
bytesToHeaderName t = case t of
":message-type" -> return MessageType
":event-type" -> return EventType
":content-type" -> return ContentType
":error-code" -> return ErrorCode
":error-message" -> return ErrorMessage
_ -> throwIO ESEInvalidHeaderType
parseHeaders :: MonadUnliftIO m
=> Word32 -> C.ConduitM ByteString a m [MessageHeader]
parseHeaders 0 = return []
parseHeaders hdrLen = do
bs1 <- readNBytes 1
n :: Word8 <- liftIO $ parseBinary bs1
headerKeyBytes <- readNBytes $ fromIntegral n
let headerKey = decodeUtf8Lenient headerKeyBytes
headerName <- liftIO $ bytesToHeaderName headerKey
bs2 <- readNBytes 1
headerValueType :: Word8 <- liftIO $ parseBinary bs2
when (headerValueType /= 7) $ throwIO ESEInvalidHeaderValueType
bs3 <- readNBytes 2
vLen :: Word16 <- liftIO $ parseBinary bs3
headerValueBytes <- readNBytes $ fromIntegral vLen
let headerValue = decodeUtf8Lenient headerValueBytes
m = (headerName, headerValue)
k = 1 + fromIntegral n + 1 + 2 + fromIntegral vLen
ms <- parseHeaders (hdrLen - k)
return (m:ms)
-- readNBytes returns N bytes read from the string and throws an
-- exception if N bytes are not present on the stream.
readNBytes :: MonadUnliftIO m => Int -> C.ConduitM ByteString a m ByteString
readNBytes n = do
b <- LB.toStrict <$> (C.takeCE n .| C.sinkLazy)
if B.length b /= n
then throwIO ESEUnexpectedEndOfStream
else return b
crcCheck :: MonadUnliftIO m
=> C.ConduitM ByteString ByteString m ()
crcCheck = do
b <- readNBytes 12
n :: Word32 <- liftIO $ parseBinary $ B.take 4 b
preludeCRC :: Word32 <- liftIO $ parseBinary $ B.drop 8 b
when (crc32 (B.take 8 b) /= preludeCRC) $
throwIO ESEPreludeCRCFailed
-- we do not yield the checksum
C.yield $ B.take 8 b
-- 12 bytes have been read off the current message. Now read the
-- next (n-12)-4 bytes and accumulate the checksum, and yield it.
let startCrc = crc32 b
finalCrc <- accumulateYield (fromIntegral n-16) startCrc
bs <- readNBytes 4
expectedCrc :: Word32 <- liftIO $ parseBinary bs
when (finalCrc /= expectedCrc) $
throwIO ESEMessageCRCFailed
-- we unconditionally recurse - downstream figures out when to
-- quit reading the stream
crcCheck
where
accumulateYield n checkSum = do
let toRead = min n chunkSize
b <- readNBytes toRead
let c' = crc32Update checkSum b
n' = n - B.length b
C.yield b
if n' > 0
then accumulateYield n' c'
else return c'
handleMessage :: MonadUnliftIO m => C.ConduitT ByteString EventMessage m ()
handleMessage = do
b1 <- readNBytes 4
msgLen :: Word32 <- liftIO $ parseBinary b1
b2 <- readNBytes 4
hdrLen :: Word32 <- liftIO $ parseBinary b2
hs <- parseHeaders hdrLen
let payloadLen = msgLen - hdrLen - 16
getHdrVal h = fmap snd . headMay . filter ((h ==) . fst)
eventHdrValue = getHdrVal EventType hs
msgHdrValue = getHdrVal MessageType hs
errCode = getHdrVal ErrorCode hs
errMsg = getHdrVal ErrorMessage hs
case msgHdrValue of
Just "event" -> do
case eventHdrValue of
Just "Records" -> passThrough $ fromIntegral payloadLen
Just "Cont" -> return ()
Just "Progress" -> do
bs <- readNBytes $ fromIntegral payloadLen
progress <- parseSelectProgress bs
C.yield $ ProgressEventMessage progress
Just "Stats" -> do
bs <- readNBytes $ fromIntegral payloadLen
stats <- parseSelectProgress bs
C.yield $ StatsEventMessage stats
Just "End" -> return ()
_ -> throwIO ESEInvalidMessageType
when (eventHdrValue /= Just "End") handleMessage
Just "error" -> do
let reqMsgMay = RequestLevelErrorMessage <$> errCode <*> errMsg
maybe (throwIO ESEInvalidMessageType) C.yield reqMsgMay
_ -> throwIO ESEInvalidMessageType
where
passThrough 0 = return ()
passThrough n = do
let c = min n chunkSize
b <- readNBytes c
C.yield $ RecordPayloadEventMessage b
passThrough $ n - B.length b
selectProtoConduit :: MonadUnliftIO m
=> C.ConduitT ByteString EventMessage m ()
selectProtoConduit = crcCheck .| handleMessage
-- | selectObjectContent calls the SelectRequest on the given
-- object. It returns a Conduit of event messages that can be consumed
-- by the client.
selectObjectContent :: Bucket -> Object -> SelectRequest
-> Minio (C.ConduitT () EventMessage Minio ())
selectObjectContent b o r = do
let reqInfo = defaultS3ReqInfo { riMethod = HT.methodPost
, riBucket = Just b
, riObject = Just o
, riPayload = PayloadBS $ mkSelectRequest r
, riNeedsLocation = False
, riQueryParams = [("select", Nothing), ("select-type", Just "2")]
}
--print $ mkSelectRequest r
resp <- mkStreamRequest reqInfo
return $ NC.responseBody resp .| selectProtoConduit
-- | A helper conduit that returns only the record payload bytes.
getPayloadBytes :: MonadIO m => C.ConduitT EventMessage ByteString m ()
getPayloadBytes = do
evM <- C.await
case evM of
Just v -> do
case v of
RecordPayloadEventMessage b -> C.yield b
RequestLevelErrorMessage c m -> liftIO $ throwIO $ SelectErr c m
_ -> return ()
getPayloadBytes
Nothing -> return ()

View File

@ -18,11 +18,12 @@ module Network.Minio.XmlGenerator
( mkCreateBucketConfig
, mkCompleteMultipartUploadRequest
, mkPutNotificationRequest
, mkSelectRequest
) where
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Map as M
import qualified Data.HashMap.Strict as H
import qualified Data.Text as T
import Text.XML
@ -35,7 +36,7 @@ import Network.Minio.Data
mkCreateBucketConfig :: Text -> Region -> ByteString
mkCreateBucketConfig ns location = LBS.toStrict $ renderLBS def bucketConfig
where
s3Element n = Element (s3Name ns n) M.empty
s3Element n = Element (s3Name ns n) mempty
root = s3Element "CreateBucketConfiguration"
[ NodeElement $ s3Element "LocationConstraint"
[ NodeContent location]
@ -47,12 +48,12 @@ mkCompleteMultipartUploadRequest :: [PartTuple] -> ByteString
mkCompleteMultipartUploadRequest partInfo =
LBS.toStrict $ renderLBS def cmur
where
root = Element "CompleteMultipartUpload" M.empty $
root = Element "CompleteMultipartUpload" mempty $
map (NodeElement . mkPart) partInfo
mkPart (n, etag) = Element "Part" M.empty
[ NodeElement $ Element "PartNumber" M.empty
mkPart (n, etag) = Element "Part" mempty
[ NodeElement $ Element "PartNumber" mempty
[NodeContent $ T.pack $ show n]
, NodeElement $ Element "ETag" M.empty
, NodeElement $ Element "ETag" mempty
[NodeContent etag]
]
cmur = Document (Prologue [] Nothing []) root []
@ -67,9 +68,9 @@ toXML ns node = LBS.toStrict $ renderLBS def $
Document (Prologue [] Nothing []) (xmlNode node) []
where
xmlNode :: XNode -> Element
xmlNode (XNode name nodes) = Element (s3Name ns name) M.empty $
xmlNode (XNode name nodes) = Element (s3Name ns name) mempty $
map (NodeElement . xmlNode) nodes
xmlNode (XLeaf name content) = Element (s3Name ns name) M.empty
xmlNode (XLeaf name content) = Element (s3Name ns name) mempty
[NodeContent content]
class ToXNode a where
@ -100,3 +101,65 @@ getFRXNode (FilterRule n v) = XNode "FilterRule" [ XLeaf "Name" n
mkPutNotificationRequest :: Text -> Notification -> ByteString
mkPutNotificationRequest ns = toXML ns . toXNode
mkSelectRequest :: SelectRequest -> ByteString
mkSelectRequest r = LBS.toStrict $ renderLBS def sr
where
sr = Document (Prologue [] Nothing []) root []
root = Element "SelectRequest" mempty $
[ NodeElement (Element "Expression" mempty
[NodeContent $ srExpression r])
, NodeElement (Element "ExpressionType" mempty
[NodeContent $ show $ srExpressionType r])
, NodeElement (Element "InputSerialization" mempty $
inputSerializationNodes $ srInputSerialization r)
, NodeElement (Element "OutputSerialization" mempty $
outputSerializationNodes $ srOutputSerialization r)
] ++ maybe [] reqProgElem (srRequestProgressEnabled r)
reqProgElem enabled = [NodeElement
(Element "RequestProgress" mempty
[NodeElement
(Element "Enabled" mempty
[NodeContent
(if enabled then "TRUE" else "FALSE")]
)
]
)
]
inputSerializationNodes is = comprTypeNode (isCompressionType is) ++
[NodeElement $ formatNode (isFormatInfo is)]
comprTypeNode (Just c) = [NodeElement $ Element "CompressionType" mempty
[NodeContent $
if | c == CompressionTypeNone -> "NONE"
| c == CompressionTypeGzip -> "GZIP"
| c == CompressionTypeBzip2 -> "BZIP2"
]
]
comprTypeNode Nothing = []
kvElement (k, v) = Element (Name k Nothing Nothing) mempty [NodeContent v]
formatNode (InputFormatCSV (CSVProp h)) =
Element "CSV" mempty
(map NodeElement $ map kvElement $ H.toList h)
formatNode (InputFormatJSON p) =
Element "JSON" mempty
[NodeElement
(Element "Type" mempty
[NodeContent $
if | jsonipType p == JSONTypeDocument -> "DOCUMENT"
| jsonipType p == JSONTypeLines -> "LINES"
]
)
]
formatNode InputFormatParquet = Element "Parquet" mempty []
outputSerializationNodes (OutputSerializationJSON j) =
[NodeElement (Element "JSON" mempty $
rdElem $ jsonopRecordDelimiter j)]
outputSerializationNodes (OutputSerializationCSV (CSVProp h)) =
[NodeElement $ Element "CSV" mempty
(map NodeElement $ map kvElement $ H.toList h)]
rdElem Nothing = []
rdElem (Just t) = [NodeElement $ Element "RecordDelimiter" mempty
[NodeContent t]]

View File

@ -26,8 +26,10 @@ module Network.Minio.XmlParser
, parseListPartsResponse
, parseErrResponse
, parseNotification
, parseSelectProgress
) where
import qualified Data.ByteString.Lazy as LB
import Data.List (zip3, zip4, zip5)
import qualified Data.Map as Map
import qualified Data.Text as T
@ -261,3 +263,13 @@ parseNotification xmldata = do
s3Elem ns "FilterRule" &| getFilterRule ns
return $ NotificationConfig id arn events
(Filter $ FilterKey $ FilterRules rules)
parseSelectProgress :: MonadIO m => ByteString -> m Progress
parseSelectProgress xmldata = do
r <- parseRoot $ LB.fromStrict xmldata
let bScanned = T.concat $ r $/ element "BytesScanned" &/ content
bProcessed = T.concat $ r $/element "BytesProcessed" &/ content
bReturned = T.concat $ r $/element "BytesReturned" &/ content
Progress <$> parseDecimal bScanned
<*> parseDecimal bProcessed
<*> parseDecimal bReturned

View File

@ -15,7 +15,7 @@
# resolver:
# name: custom-snapshot
# location: "./custom-snapshot.yaml"
resolver: lts-11.1
resolver: lts-13.1
# User packages to be built.
# Various formats can be used as shown in the example below.

View File

@ -14,12 +14,14 @@
-- limitations under the License.
--
{-# LANGUAGE QuasiQuotes #-}
module Network.Minio.XmlGenerator.Test
( xmlGeneratorTests
) where
import Test.Tasty
import Test.Tasty.HUnit
import Text.RawString.QQ (r)
import Lib.Prelude
@ -33,6 +35,7 @@ xmlGeneratorTests = testGroup "XML Generator Tests"
[ testCase "Test mkCreateBucketConfig" testMkCreateBucketConfig
, testCase "Test mkCompleteMultipartUploadRequest" testMkCompleteMultipartUploadRequest
, testCase "Test mkPutNotificationRequest" testMkPutNotificationRequest
, testCase "Test mkSelectRequest" testMkSelectRequest
]
testMkCreateBucketConfig :: Assertion
@ -95,3 +98,46 @@ testMkPutNotificationRequest =
[ObjectCreated] defaultFilter
]
]
testMkSelectRequest :: Assertion
testMkSelectRequest = mapM_ assertFn cases
where
assertFn (a, b) = assertEqual "selectRequest XML should match: " b $ mkSelectRequest a
cases = [ ( SelectRequest "Select * from S3Object" SQL
(InputSerialization (Just CompressionTypeGzip)
(InputFormatCSV $ fileHeaderInfo FileHeaderIgnore
<> recordDelimiter "\n"
<> fieldDelimiter ","
<> quoteCharacter "\""
<> quoteEscapeCharacter "\""
))
(OutputSerializationCSV $ quoteFields QuoteFieldsAsNeeded
<> recordDelimiter "\n"
<> fieldDelimiter ","
<> quoteCharacter "\""
<> quoteEscapeCharacter "\""
)
(Just False)
, [r|<?xml version="1.0" encoding="UTF-8"?><SelectRequest><Expression>Select * from S3Object</Expression><ExpressionType>SQL</ExpressionType><InputSerialization><CompressionType>GZIP</CompressionType><CSV><QuoteCharacter>&#34;</QuoteCharacter><RecordDelimiter>
</RecordDelimiter><FileHeaderInfo>IGNORE</FileHeaderInfo><QuoteEscapeCharacter>&#34;</QuoteEscapeCharacter><FieldDelimiter>,</FieldDelimiter></CSV></InputSerialization><OutputSerialization><CSV><QuoteCharacter>&#34;</QuoteCharacter><QuoteFields>ASNEEDED</QuoteFields><RecordDelimiter>
</RecordDelimiter><QuoteEscapeCharacter>&#34;</QuoteEscapeCharacter><FieldDelimiter>,</FieldDelimiter></CSV></OutputSerialization><RequestProgress><Enabled>FALSE</Enabled></RequestProgress></SelectRequest>|]
)
, ( setRequestProgressEnabled False $
setInputCompressionType CompressionTypeGzip $
selectRequest "Select * from S3Object" documentJsonInput
(outputJSONFromRecordDelimiter "\n")
, [r|<?xml version="1.0" encoding="UTF-8"?><SelectRequest><Expression>Select * from S3Object</Expression><ExpressionType>SQL</ExpressionType><InputSerialization><CompressionType>GZIP</CompressionType><JSON><Type>DOCUMENT</Type></JSON></InputSerialization><OutputSerialization><JSON><RecordDelimiter>
</RecordDelimiter></JSON></OutputSerialization><RequestProgress><Enabled>FALSE</Enabled></RequestProgress></SelectRequest>|]
)
, ( setRequestProgressEnabled False $
setInputCompressionType CompressionTypeNone $
selectRequest "Select * from S3Object" defaultParquetInput
(outputCSVFromProps $ quoteFields QuoteFieldsAsNeeded
<> recordDelimiter "\n"
<> fieldDelimiter ","
<> quoteCharacter "\""
<> quoteEscapeCharacter "\"")
, [r|<?xml version="1.0" encoding="UTF-8"?><SelectRequest><Expression>Select * from S3Object</Expression><ExpressionType>SQL</ExpressionType><InputSerialization><CompressionType>NONE</CompressionType><Parquet/></InputSerialization><OutputSerialization><CSV><QuoteCharacter>&#34;</QuoteCharacter><QuoteFields>ASNEEDED</QuoteFields><RecordDelimiter>
</RecordDelimiter><QuoteEscapeCharacter>&#34;</QuoteEscapeCharacter><FieldDelimiter>,</FieldDelimiter></CSV></OutputSerialization><RequestProgress><Enabled>FALSE</Enabled></RequestProgress></SelectRequest>|]
)
]

View File

@ -14,15 +14,16 @@
-- limitations under the License.
--
{-# LANGUAGE QuasiQuotes #-}
module Network.Minio.XmlParser.Test
(
xmlParserTests
( xmlParserTests
) where
import qualified Data.Map as Map
import Data.Time (fromGregorian)
import Test.Tasty
import Test.Tasty.HUnit
import Text.RawString.QQ (r)
import UnliftIO (MonadUnliftIO)
import Lib.Prelude
@ -43,6 +44,7 @@ xmlParserTests = testGroup "XML Parser Tests"
, testCase "Test parseListPartsResponse" testParseListPartsResponse
, testCase "Test parseCopyObjectResponse" testParseCopyObjectResponse
, testCase "Test parseNotification" testParseNotification
, testCase "Test parseSelectProgress" testParseSelectProgress
]
tryValidationErr :: (MonadUnliftIO m) => m a -> m (Either MErrV a)
@ -356,3 +358,24 @@ testParseNotification = do
forM_ cases $ \(xmldata, val) -> do
result <- runExceptT $ runTestNS $ parseNotification xmldata
eitherValidationErr result (@?= val)
-- | Tests parsing of both progress and stats
testParseSelectProgress :: Assertion
testParseSelectProgress = do
let cases = [ ([r|<?xml version="1.0" encoding="UTF-8"?>
<Progress>
<BytesScanned>512</BytesScanned>
<BytesProcessed>1024</BytesProcessed>
<BytesReturned>1024</BytesReturned>
</Progress>|] , Progress 512 1024 1024)
, ([r|<?xml version="1.0" encoding="UTF-8"?>
<Stats>
<BytesScanned>512</BytesScanned>
<BytesProcessed>1024</BytesProcessed>
<BytesReturned>1024</BytesReturned>
</Stats>|], Progress 512 1024 1024)
]
forM_ cases $ \(xmldata, progress) -> do
result <- runExceptT $ parseSelectProgress xmldata
eitherValidationErr result (@?= progress)