This repository has been archived on 2024-10-24. You can view files and clone it, but cannot push or open issues or pull requests.
fradrive-old/src/Jobs/Handler/Files.hs

426 lines
23 KiB
Haskell

{-# LANGUAGE BangPatterns #-}
module Jobs.Handler.Files
( dispatchJobPruneSessionFiles
, dispatchJobPruneUnreferencedFiles
, dispatchJobInjectFiles, dispatchJobRechunkFiles
, dispatchJobDetectMissingFiles
) where
import Import hiding (matching, maximumBy, init)
import Database.Persist.Sql (deleteWhereCount)
import qualified Database.Esqueleto as E
import qualified Database.Esqueleto.PostgreSQL as E
import qualified Database.Esqueleto.Utils as E
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.List as C (mapMaybe, unfoldM)
import Handler.Utils.Minio
import qualified Network.Minio as Minio
import Crypto.Hash (hashDigestSize, digestFromByteString)
import Data.List ((!!), unfoldr, maximumBy, init, genericLength)
import qualified Data.ByteString as ByteString
import Data.Bits (Bits(shiftR))
import qualified Data.Map.Strict as Map
import Control.Monad.Random.Lazy (evalRand, mkStdGen)
import System.Random.Shuffle (shuffleM)
import System.IO.Unsafe
import Handler.Utils.Files (sourceFileDB)
import Control.Monad.Logger (askLoggerIO, runLoggingT)
import System.Clock
import qualified Data.Set as Set
import qualified Data.Sequence as Seq
import Jobs.Queue (YesodJobDB)
-- | Job handler that deletes every session file whose 'SessionFileTouched'
-- timestamp lies further in the past than the configured
-- @_appSessionFilesExpire@ duration and afterwards logs the number of
-- deleted rows.
dispatchJobPruneSessionFiles :: JobHandler UniWorX
dispatchJobPruneSessionFiles = JobHandlerAtomicWithFinalizer pruneExpired reportDeleted
  where
    -- Database part: compute the cutoff and delete everything older than it.
    pruneExpired = hoist lift $ do
      maxAge <- getsYesod $ view _appSessionFilesExpire
      cutoff <- addUTCTime (negate maxAge) <$> liftIO getCurrentTime
      deleteWhereCount [ SessionFileTouched <. cutoff ]

    -- Finalizer: receives the number of rows deleted above.
    reportDeleted n =
      $logInfoS "PruneSessionFiles" [st|Deleted #{n} expired session files|]
-- | For a given file content hash, build one query per place in the schema
-- that can refer to that hash.  Each query yields at least one row iff the
-- corresponding table references the hash; callers in this module combine
-- them via @E.any E.exists@.
--
-- The last query covers chunk locks: it matches a 'FileChunkLock' whose hash
-- equals the chunk hash of some 'FileContentEntry' belonging to the file.
fileReferences :: E.SqlExpr (E.Value FileContentReference) -> [E.SqlQuery ()]
fileReferences fRef =
  [ E.from $ \row -> E.where_ (row E.^. CourseApplicationFileContent    E.==. fHash)
  , E.from $ \row -> E.where_ (row E.^. MaterialFileContent             E.==. fHash)
  , E.from $ \row -> E.where_ (row E.^. CourseNewsFileContent           E.==. fHash)
  , E.from $ \row -> E.where_ (row E.^. SheetFileContent                E.==. fHash)
  , E.from $ \row -> E.where_ (row E.^. CourseAppInstructionFileContent E.==. fHash)
  , E.from $ \row -> E.where_ (E.just (row E.^. AllocationMatchingLog)  E.==. fHash)
  , E.from $ \row -> E.where_ (row E.^. SubmissionFileContent           E.==. fHash)
  , E.from $ \row -> E.where_ (row E.^. SessionFileContent              E.==. fHash)
  , E.from $ \row -> E.where_ (E.just (row E.^. FileLockContent)        E.==. fHash)
  , E.from $ \chunkLock -> E.where_ . E.exists . E.from $ \entry ->
      E.where_ $ E.just (entry E.^. FileContentEntryHash) E.==. fHash
           E.&&. chunkLock E.^. FileChunkLockHash E.==. E.subSelectForeign entry FileContentEntryChunkHash (E.^. FileContentChunkHash)
  ]
  where
    -- Most referencing columns are nullable, so compare against the
    -- @Maybe@-lifted hash.
    fHash = E.just fRef
-- | Stream every file content reference found inside workflow data.
--
-- Four columns are scanned one after the other (definition graphs, instance
-- graphs, workflow graphs and workflow states); from each selected value all
-- present (@Just@) file reference contents reachable via
-- @typesCustom \@WorkflowChildren@ are yielded.
workflowFileReferences :: MonadResource m => ConduitT () FileContentReference (SqlPersistT m) ()
workflowFileReferences = do
  E.selectSource (E.from $ \definition -> return (definition E.^. WorkflowDefinitionGraph))
    .| awaitForever (\row -> mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield (E.unValue row))
  E.selectSource (E.from $ \inst -> return (inst E.^. WorkflowInstanceGraph))
    .| awaitForever (\row -> mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield (E.unValue row))
  E.selectSource (E.from $ \workflow -> return (workflow E.^. WorkflowWorkflowGraph))
    .| awaitForever (\row -> mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield (E.unValue row))
  E.selectSource (E.from $ \workflow -> return (workflow E.^. WorkflowWorkflowState))
    .| awaitForever (\row -> mapMOf_ (typesCustom @WorkflowChildren . _fileReferenceContent . _Just) yield (E.unValue row))
-- | Job handler that looks for file content which is still referenced but
-- available neither in the database nor in the upload cache bucket.
--
-- The database part ('act') returns, per kind of reference, the non-empty set
-- of references that have no 'FileContentEntry' and for which no object was
-- seen while listing the upload bucket.  The finalizer ('fin') reports the
-- counts via 'observeMissingFiles' and logs the affected references.
dispatchJobDetectMissingFiles :: JobHandler UniWorX
dispatchJobDetectMissingFiles = JobHandlerAtomicWithFinalizer act fin
  where
    act :: YesodJobDB UniWorX (Map Text (NonNull (Set FileContentReference)))
    act = hoist lift $ do
      uploadBucket <- getsYesod $ view _appUploadCacheBucket

      let -- True iff the database holds no 'FileContentEntry' for the reference.
          isMissingFromDb :: FileContentReference -> DB Bool
          isMissingFromDb ref = fmap null . E.select . E.from $ \fileContentEntry -> do
            E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val ref
            E.limit 1
            return $ fileContentEntry E.^. FileContentEntryHash

      -- References (grouped by kind) without any matching 'FileContentEntry'.
      missingDb <- execWriterT $ do
        tellM . forM trackedReferences $ \refQuery ->
          fmap (Set.fromList . mapMaybe E.unValue) . E.select $ do
            ref <- refQuery
            E.where_ . E.not_ $ E.isNothing ref
            E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry ->
              E.where_ $ E.just (fileContentEntry E.^. FileContentEntryHash) E.==. ref
            E.distinctOnOrderBy [E.asc ref] $ return ref
        -- Workflow references are extracted from serialized values and thus
        -- cannot be filtered inside a single SQL query.  They must still be
        -- restricted to those absent from the database; otherwise every
        -- workflow file that lives in the database and is no longer in the
        -- upload cache would be reported as missing.
        tellM . fmap (Map.singleton "workflows") . runConduit $
             workflowFileReferences
          .| C.filterM isMissingFromDb
          .| C.foldMap Set.singleton

      let allMissingDb :: Set Minio.Object
          allMissingDb = setOf (folded . folded . re minioFileReference) missingDb

          -- Walks the bucket listing and yields those candidates for which no
          -- listed object was encountered: everything smaller than the current
          -- object is emitted, the object itself is dropped, everything larger
          -- is carried on; whatever remains at the end is emitted as well.
          -- NOTE(review): this relies on the listing being sorted ascending
          -- by object name — confirm against the Minio API.
          filterMissingDb :: forall m. Monad m
                          => Set Minio.Object
                          -> ConduitT Minio.ListItem (Set Minio.Object) m ()
          filterMissingDb remaining = maybeT (yield remaining) $ do
            nextMinio <- MaybeT await
            remaining' <- case nextMinio of
              Minio.ListItemObject oi -> do
                let (missingMinio, remaining') = Set.split (Minio.oiObject oi) remaining
                lift $ yield missingMinio
                return remaining'
              _other -> return remaining
            lift $ filterMissingDb remaining'

      -- If the Minio computation yields no result, every reference missing
      -- from the database is treated as missing from the bucket as well.
      allMissingMinio <- maybeT (return $ fold missingDb) . hoistMaybeM . runAppMinio . runMaybeT . runConduit $
           transPipe lift (Minio.listObjects uploadBucket Nothing True)
        .| filterMissingDb allMissingDb
        .| C.foldMapE (setOf minioFileReference)

      -- Keep only references missing in both places; drop kinds without any.
      return $ Map.mapMaybe (fromNullable . Set.intersection allMissingMinio) missingDb

    fin :: Map Text (NonNull (Set FileContentReference)) -> Handler ()
    fin missingCounts = do
      -- Report a count (possibly 0) for every tracked kind of reference.
      -- NOTE(review): the "workflows" kind is logged below but is not part of
      -- 'trackedReferences' and therefore never reported here — confirm
      -- whether that is intended.
      forM_ (Map.keysSet trackedReferences) $ \refIdent ->
        observeMissingFiles refIdent . maybe 0 olength $ missingCounts Map.!? refIdent

      iforM_ missingCounts $ \refIdent missingFiles
        -> let missingRefs = unlines . map tshow . Set.toList $ toNullable missingFiles
            in $logErrorS "MissingFiles" [st|#{refIdent}: #{olength missingFiles}\n#{missingRefs}|]

      when (Map.null missingCounts) $
        $logInfoS "MissingFiles" [st|No missing files|]

    -- Queries selecting the (nullable) content reference of every table that
    -- is checked, keyed by the path piece derived from the entity's name.
    trackedReferences = Map.fromList $ over (traverse . _1) nameToPathPiece
      [ (''CourseApplicationFile,    E.from $ \appFile   -> return $ appFile   E.^. CourseApplicationFileContent   )
      , (''MaterialFile,             E.from $ \matFile   -> return $ matFile   E.^. MaterialFileContent            )
      , (''CourseNewsFile,           E.from $ \newsFile  -> return $ newsFile  E.^. CourseNewsFileContent          )
      , (''SheetFile,                E.from $ \sheetFile -> return $ sheetFile E.^. SheetFileContent               )
      , (''CourseAppInstructionFile, E.from $ \appInstr  -> return $ appInstr  E.^. CourseAppInstructionFileContent)
      , (''SubmissionFile,           E.from $ \subFile   -> return $ subFile   E.^. SubmissionFileContent          )
      , (''SessionFile,              E.from $ \sessFile  -> return $ sessFile  E.^. SessionFileContent             )
      , (''AllocationMatching,       E.from $ \matching  -> return . E.just $ matching E.^. AllocationMatchingLog  )
      ]
-- | Process-global memo table used by 'dispatchJobPruneUnreferencedFiles':
-- maps a number of iterations to the list of (lower, upper) hash bounds
-- computed for it.
--
-- Created with 'unsafePerformIO' so that it can live at the top level; the
-- @NOINLINE@ pragma is required so that only a single 'TVar' is ever created.
pruneUnreferencedFilesIntervalsCache :: TVar (Map Natural [(Maybe FileContentChunkReference, Maybe FileContentChunkReference)])
pruneUnreferencedFilesIntervalsCache = unsafePerformIO (newTVarIO mempty)
{-# NOINLINE pruneUnreferencedFilesIntervalsCache #-}
-- | Job handler performing one iteration of pruning unreferenced file content.
--
-- The space of chunk hashes is split into intervals (one per @numIterations@,
-- a single one if that is 0).  The list of intervals is shuffled
-- deterministically using @epoch@ as seed and @iteration@ (modulo the number
-- of intervals) selects the interval this run is restricted to.
--
-- Within that interval the database action
--
--   1. marks chunks as unreferenced (keeping the earlier timestamp on conflict),
--   2. removes such marks for chunks that are referenced (again), including
--      references from workflows,
--   3. deletes file content entries whose marks are older than the configured
--      keep duration, and
--   4. deletes marked chunks older than the keep duration that no entry uses.
--
-- It returns the number of deleted entries, the number of deleted chunks and
-- the accumulated size of the deleted chunks; the finalizer reports these via
-- metrics and log messages.
dispatchJobPruneUnreferencedFiles :: Natural -> Natural -> Natural -> JobHandler UniWorX
dispatchJobPruneUnreferencedFiles numIterations epoch iteration = JobHandlerAtomicWithFinalizer act fin
where
act = hoist lift $ do
now <- liftIO getCurrentTime
interval <- getsYesod $ view _appPruneUnreferencedFilesInterval
-- A negative keep duration is clamped to zero.
keep <- fmap (max 0) . getsYesod $ view _appKeepUnreferencedFiles
let
-- Digest size in bytes of the hash underlying 'FileContentChunkReference'.
-- The @error@ value is only passed to fix the hash type; judging by its
-- message it is expected never to be forced.
chunkHashBytes :: forall h.
( Unwrapped FileContentChunkReference ~ Digest h )
=> Integer
chunkHashBytes = fromIntegral (hashDigestSize (error "hashDigestSize inspected argument" :: h))
chunkHashBits = chunkHashBytes * 8
-- Number of distinct hash values, i.e. the exclusive upper bound of the
-- whole hash space when hashes are read as integers.
base :: Integer
base = 2 ^ chunkHashBits
intervals :: [Integer]
-- | Exclusive upper bounds
intervals
| numIterations <= 0 = pure base
| otherwise = go protoIntervals ^.. folded . _1
where
-- Starting from equally sized intervals (see 'protoIntervals'), each step
-- raises the bound of the entry with the largest second component and
-- the bounds of all entries above it by one and reduces that second
-- component by @numIterations@; this repeats until the largest bound
-- equals @base@.
go [] = []
go ints
| maximumOf (folded . _1) ints == Just base = ints
| otherwise = go $ lts ++ over _1 succ (over _2 (subtract $ toInteger numIterations) closest) : map (over _1 succ) gts
where
closest = maximumBy (comparing $ view _2) ints
(lts, geqs) = partition (((>) `on` view _1) closest) ints
gts = filter (((<) `on` view _1) closest) geqs
-- | Exclusive upper bounds
-- Each entry pairs @i * quotient@ with the remainder of dividing @base@
-- by @numIterations@.
protoIntervals :: [(Integer, Integer)]
protoIntervals = [ over _1 (i *) $ base `divMod` toInteger numIterations
| i <- [1 .. toInteger numIterations]
]
-- Pairs of (inclusive lower, exclusive upper) bounds converted to digests;
-- the lower bound of an interval is the upper bound of its predecessor
-- (0 for the first one).
-- NOTE(review): `assertM' (> 0)` presumably turns non-positive bounds into
-- Nothing, and 'digestFromByteString' presumably fails for @base@ itself
-- (which needs one byte more than a digest), so that both ends of the hash
-- space become open bounds — confirm.
intervalsDgsts' = zipWith (curry . over both $ toDigest <=< assertM' (> 0)) (0 : init intervals) intervals
-- Serializes the integer as big-endian bytes, left-padded with zero bytes
-- up to the digest size, and tries to read that as a digest.
toDigest :: Integer -> Maybe FileContentChunkReference
toDigest = fmap (review _Wrapped) . digestFromByteString . pad . ByteString.pack . reverse . unfoldr step
where step i
| i <= 0 = Nothing
| otherwise = Just (fromIntegral i, i `shiftR` 8)
pad bs
| toInteger (ByteString.length bs) >= chunkHashBytes = bs
| otherwise = pad $ ByteString.cons 0 bs
-- Look the bounds up in the global cache (keyed by @numIterations@); on a
-- miss the freshly computed list is stored (fully evaluated via @force@).
intervalsDgsts <- atomically $ do
cachedDgsts <- readTVar pruneUnreferencedFilesIntervalsCache
case Map.lookup numIterations cachedDgsts of
Just c -> return c
Nothing -> do
modifyTVar' pruneUnreferencedFilesIntervalsCache $ force . Map.insert numIterations intervalsDgsts'
return intervalsDgsts'
let
-- Deterministic permutation of the intervals, seeded by @epoch@.
permIntervalsDgsts = shuffleM intervalsDgsts `evalRand` mkStdGen (hash epoch)
-- The interval handled by this run; the index wraps around.
(minBoundDgst, maxBoundDgst) = permIntervalsDgsts !! fromIntegral (toInteger iteration `mod` genericLength permIntervalsDgsts)
-- Restricts a chunk hash to @[minBoundDgst, maxBoundDgst)@; a bound that
-- is Nothing imposes no restriction.
chunkIdFilter :: E.SqlExpr (E.Value FileContentChunkReference) -> E.SqlExpr (E.Value Bool)
chunkIdFilter cRef = E.and $ catMaybes
[ minBoundDgst <&> \b -> cRef E.>=. E.val b
, maxBoundDgst <&> \b -> cRef E.<. E.val b
]
$logDebugS "PruneUnreferencedFiles" . tshow $ (minBoundDgst, maxBoundDgst)
-- Step 1: insert an unreferenced-mark (timestamp @now@) for every chunk in
-- the interval for which the reference check below does not succeed; on
-- conflict the smaller of the existing and the new timestamp is kept.
-- NOTE(review): `E.subSelectOr` presumably combines the per-entry results
-- with a boolean OR — confirm.  The @error@ passed to the unique constraint
-- is, judging by its message, expected never to be forced.
E.insertSelectWithConflict
(UniqueFileContentChunkUnreferenced $ error "insertSelectWithConflict inspected constraint")
(E.from $ \fileContentChunk -> do
E.where_ . E.not_ . E.subSelectOr . E.from $ \fileContentEntry -> do
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunk E.^. FileContentChunkId
return $ E.any E.exists (fileReferences $ fileContentEntry E.^. FileContentEntryHash)
E.where_ . chunkIdFilter $ fileContentChunk E.^. FileContentChunkHash
return $ FileContentChunkUnreferenced E.<# (fileContentChunk E.^. FileContentChunkId) E.<&> E.val now
)
(\current excluded ->
[ FileContentChunkUnreferencedSince E.=. E.min (current E.^. FileContentChunkUnreferencedSince) (excluded E.^. FileContentChunkUnreferencedSince) ]
)
-- Step 2a: drop marks (within the interval) of chunks used by an entry whose
-- file is referenced according to 'fileReferences'.
E.delete . E.from $ \fileContentChunkUnreferenced -> do
E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
return $ E.any E.exists (fileReferences $ fileContentEntry E.^. FileContentEntryHash)
E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
-- Step 2b: the same for files referenced from workflows; the references are
-- streamed and handled in batches of @chunkSize@.
let unmarkWorkflowFiles (otoList -> fRefs) = E.delete . E.from $ \fileContentChunkUnreferenced -> do
E.where_ . E.subSelectOr . E.from $ \fileContentEntry -> do
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
return $ fileContentEntry E.^. FileContentEntryHash `E.in_` E.valList fRefs
E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
chunkSize = 100
in runConduit $ workflowFileReferences .| C.map Seq.singleton .| C.chunksOfE chunkSize .| C.mapM_ unmarkWorkflowFiles
let
-- Step 3 candidates: file hashes for which the latest unreferenced-mark
-- among their marked chunks inside the interval is older than @now - keep@.
-- Files without any such mark are not selected.
getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> do
let unreferencedSince = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunkUnreferenced) -> do
E.on $ fileContentEntry' E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
return . E.max_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince
E.where_ $ E.maybe E.false (E.<. E.val (addUTCTime (-keep) now)) unreferencedSince
E.groupBy $ fileContentEntry E.^. FileContentEntryHash
E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryHash ]
return $ fileContentEntry E.^. FileContentEntryHash
-- Deletes all entries of one file; counts 1 if at least one row was removed.
deleteEntry :: _ -> DB (Sum Natural)
deleteEntry (E.Value fRef) =
bool 0 1 . (> 0) <$> deleteWhereCount [FileContentEntryHash ==. fRef]
-- NOTE(review): `takeWhileTime` presumably stops passing candidates once the
-- given duration (a third of the job interval) has elapsed — confirm.
Sum deletedEntries <- runConduit $
getEntryCandidates
.| takeWhileTime (interval / 3)
.| C.mapM deleteEntry
.| C.fold
let
-- Step 4 candidates: marks inside the interval that are older than
-- @now - keep@ and whose chunk is not used by any remaining entry; yields
-- the chunk key together with the length of the chunk's content.
getChunkCandidates = E.selectSource . E.from $ \fileContentChunkUnreferenced -> do
E.where_ $ fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedSince E.<. E.val (addUTCTime (-keep) now)
E.where_ . E.not_ . E.exists . E.from $ \fileContentEntry ->
E.where_ $ fileContentEntry E.^. FileContentEntryChunkHash E.==. fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
E.where_ . chunkIdFilter $ E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash (E.^. FileContentChunkHash)
return ( fileContentChunkUnreferenced E.^. FileContentChunkUnreferencedHash
, E.subSelectForeign fileContentChunkUnreferenced FileContentChunkUnreferencedHash $ E.length_ . (E.^. FileContentChunkContent)
)
-- Removes the mark first, then the chunk itself; returns the number of
-- deleted chunk rows and the size reported by the candidate query.
deleteChunk :: _ -> DB (Sum Natural, Sum Word64)
deleteChunk (E.Value cRef, E.Value size) = do
deleteWhere [ FileContentChunkUnreferencedHash ==. cRef ]
(, Sum size) . fromIntegral <$> deleteWhereCount [FileContentChunkHash ==. unFileContentChunkKey cRef]
-- NOTE(review): `persistentTokenBucketTakeC'` presumably rate-limits the
-- deletions by chunk size using the 'TokenBucketPruneFiles' bucket — confirm.
(Sum deletedChunks, Sum deletedChunkSize) <- runConduit $
getChunkCandidates
.| takeWhileTime (interval / 3)
.| persistentTokenBucketTakeC' TokenBucketPruneFiles (view $ _2 . _Value :: _ -> Word64)
.| C.mapM deleteChunk
.| C.fold
return (deletedEntries, deletedChunks, deletedChunkSize)
-- Finalizer: report the results of the database action via metrics and logs.
fin (deletedEntries, deletedChunks, deletedChunkSize) = do
observeDeletedUnreferencedFiles deletedEntries
$logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedEntries} long-unreferenced files|]
observeDeletedUnreferencedChunks deletedChunks deletedChunkSize
$logInfoS "PruneUnreferencedFiles" [st|Deleted #{tshow deletedChunks} chunks (#{textBytes deletedChunkSize})|]
-- | Job handler that moves files from the upload cache bucket into the
-- database.
--
-- Every object listed in the bucket whose name parses as a file reference is
-- streamed into the database.  If the hash computed while sinking equals the
-- hash encoded in the object's name the object is removed from the bucket,
-- otherwise an error is logged.  The number and total size of injected files
-- are reported via 'observeInjectedFiles' and a final log message.
-- If the surrounding 'MaybeT' computation fails the handler ends without
-- further action.
dispatchJobInjectFiles :: JobHandler UniWorX
dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
uploadBucket <- getsYesod $ view _appUploadCacheBucket
interval <- getsYesod $ view _appInjectFiles
let
-- Keep only proper objects whose name can be read as a file reference.
extractReference (Minio.ListItemObject oi) = (oi, ) <$> Minio.oiObject oi ^? minioFileReference
extractReference _ = Nothing
-- Handles a single object; returns (1, object size) if content was sunk
-- into the database and 'mempty' otherwise.
injectOrDelete :: (Minio.ObjectInfo, FileContentReference)
-> Handler (Sum Natural, Sum Word64)
injectOrDelete (objInfo, fRef) = do
let obj = Minio.oiObject objInfo
sz = fromIntegral $ Minio.oiSize objInfo
fRef' <- runDB $ do
-- 'chunkVar' hands chunks from the download over to the database sink,
-- which runs asynchronously; @Just chunk@ carries data, @Nothing@ ends the
-- stream (it terminates the @unfoldM@ below).
chunkVar <- newEmptyTMVarIO
dbAsync <- allocateLinkedAsync $ do
-- Block until the first value has been put into 'chunkVar' before
-- starting to sink.
atomically $ isEmptyTMVar chunkVar >>= guard . not
sinkFileDB False $ C.unfoldM (\x -> fmap (, x) <$> atomically (takeTMVar chunkVar)) ()
logger <- askLoggerIO
-- 'didSend' is False if the Minio computation produced no result.
-- NOTE(review): presumably this covers Minio being unavailable and the
-- object no longer existing (see `catchIfMaybeT minioIsDoesNotExist`) — confirm.
didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
-- Consumes the object's byte stream, logs progress and forwards every
-- chunk to the database sink via 'chunkVar'.
let sendChunks = go 0 0 Nothing =<< liftIO (getTime Monotonic)
where
-- Arguments: number of chunks seen so far, bytes seen so far, time of the
-- last progress report (if any), start time.
go :: forall m. MonadIO m => Natural -> Int64 -> Maybe TimeSpec -> TimeSpec -> ConduitT ByteString Void m ()
go c accsz lastReport startT = do
currT <- liftIO $ getTime Monotonic
chunk' <- await
whenIsJust chunk' $ \chunk -> do
let csz = fromIntegral $ olength chunk
!c' = succ c
!sz' = accsz + csz
-- NOTE(review): the threshold of 5 is compared against a difference of
-- `toRational` applied to 'TimeSpec' values; if that conversion yields
-- nanoseconds (as in recent versions of the clock package) a report is
-- due on practically every chunk instead of every 5 seconds, and the
-- ETA below is computed in the same unit — confirm.
!lastReport'
| toRational currT - toRational (fromMaybe startT lastReport) > 5 = Just currT
| otherwise = lastReport
when (csz > 0) $ do
-- Percentage of the object transferred so far.
let p :: Centi
p = realToFrac $ (toInteger sz' % toInteger sz) * 100
-- Estimated remaining time, extrapolated from the throughput so far;
-- unavailable while no bytes have been transferred yet.
eta :: Maybe Integer
eta = do
accsz' <- assertM' (/= 0) accsz
return . ceiling $ (toRational currT - toRational startT) / fromIntegral accsz' * (fromIntegral sz - fromIntegral accsz)
-- Log when a report is due or the whole object has been transferred.
when (lastReport' /= lastReport || sz' >= fromIntegral sz) $
flip runLoggingT logger . $logInfoS "InjectFiles" . mconcat $ catMaybes
[ pure [st|Sinking chunk ##{tshow c} (#{textBytes csz}): #{textBytes sz'}/#{textBytes sz} (#{tshow p}%)|]
, eta <&> \eta' -> [st| ETA #{textDuration eta'}|]
, pure "..."
]
atomically . putTMVar chunkVar $ Just chunk
go c' sz' lastReport' startT
lift . runConduit $ Minio.gorObjectStream objRes .| sendChunks
return True
-- Without a download the sink is cancelled; otherwise the end of the stream
-- is signalled and the hash computed by the sink is awaited.
if
| not didSend -> Nothing <$ cancel dbAsync
| otherwise -> do
atomically $ putTMVar chunkVar Nothing
Just <$> waitAsync dbAsync
-- True iff content was sunk and its hash equals the one from the object name.
let matchesFRef = is _Just $ assertM (== fRef) fRef'
-- Remove the object from the bucket on a match (an object that is already
-- gone is ignored).
-- NOTE(review): the error branch is also taken when nothing was sunk
-- (@fRef'@ is Nothing), in which case the message is misleading — confirm
-- whether that is intended.
if | matchesFRef ->
maybeT (return ()) . runAppMinio . handleIf minioIsDoesNotExist (const $ return ()) $ Minio.removeObject uploadBucket obj
| otherwise ->
$logErrorS "InjectFiles" [st|Minio object #{obj}'s content does not match it's name (content hash: #{tshow fRef'} /= name hash: #{tshow fRef})|]
-- Counted as injected whenever content was sunk, regardless of the hash check.
return . bool mempty (Sum 1, Sum sz) $ is _Just fRef'
-- Pipeline: list bucket, keep objects with a file reference, optionally limit
-- the run time to half the configured interval, pass through the
-- 'TokenBucketInjectFiles' token bucket keyed by object size, inject, report
-- metrics per object and sum up the results.
-- NOTE(review): `takeWhileTime` and `persistentTokenBucketTakeC'` are assumed
-- to limit run time and throughput respectively — confirm.
(Sum injectedFiles, Sum injectedSize) <-
runConduit $ transPipe runAppMinio (Minio.listObjects uploadBucket Nothing True)
.| C.mapMaybe extractReference
.| maybe (C.map id) (takeWhileTime . (/ 2)) interval
.| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFiles $ views _1 Minio.oiSize)
.| C.mapM (lift . injectOrDelete)
.| C.mapM (\res@(Sum inj, Sum sz) -> res <$ observeInjectedFiles inj sz)
.| C.fold
$logInfoS "InjectFiles" [st|Injected #{tshow injectedFiles} files from upload cache into database (#{textBytes injectedSize})|]
-- | Exceptions raised while re-chunking files stored in the database.
data RechunkFileException
  = -- | Re-sinking a file's content produced a different content hash.
    RechunkFileExceptionHashMismatch
      { oldHash :: FileContentReference -- ^ hash the file was stored under
      , newHash :: FileContentReference -- ^ hash obtained after re-chunking
      }
  deriving (Eq, Ord, Show, Generic, Typeable)
  deriving anyclass (Exception)
-- | Job handler that re-chunks files stored in the database.
--
-- Candidates are all files having at least one chunk that is not marked as
-- content based.  Each candidate is read back from the database and sunk
-- again; the resulting hash must equal the original one, otherwise
-- 'RechunkFileExceptionHashMismatch' is thrown.  The number and total size of
-- re-chunked files are handed to the finalizer, which reports them via
-- 'observeRechunkedFiles' and a log message.
dispatchJobRechunkFiles :: JobHandler UniWorX
dispatchJobRechunkFiles = JobHandlerAtomicWithFinalizer act fin
  where
    act = hoist lift $ do
      interval <- getsYesod $ view _appRechunkFiles
      let
        -- One row per file hash that still has a chunk which is not content
        -- based, together with the summed length of all of the file's chunks.
        getEntryCandidates = E.selectSource . E.from $ \fileContentEntry -> E.distinctOnOrderBy [E.asc $ fileContentEntry E.^. FileContentEntryHash] $ do
          E.where_ . E.exists . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
            E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
            E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
            E.where_ . E.not_ $ fileContentChunk E.^. FileContentChunkContentBased

          let size = E.subSelectMaybe . E.from $ \(fileContentEntry' `E.InnerJoin` fileContentChunk) -> do
                E.on $ fileContentChunk E.^. FileContentChunkId E.==. fileContentEntry' E.^. FileContentEntryChunkHash
                E.where_ $ fileContentEntry' E.^. FileContentEntryHash E.==. fileContentEntry E.^. FileContentEntryHash
                return $ E.sum_ (E.length_ $ fileContentChunk E.^. FileContentChunkContent:: E.SqlExpr (E.Value Word64))

          return ( fileContentEntry E.^. FileContentEntryHash
                 , size
                 )

        -- Reads the file from the database and sinks it again; the content
        -- hash must be unchanged.
        rechunkFile :: FileContentReference -> Word64 -> DB (Sum Natural, Sum Word64)
        rechunkFile fRef sz = do
          fRef' <- sinkFileDB True $ sourceFileDB fRef
          unless (fRef == fRef') $
            throwM $ RechunkFileExceptionHashMismatch fRef fRef'
          return (Sum 1, Sum sz)

      -- Candidates without a known size are skipped.
      -- NOTE(review): `takeWhileTime` and `persistentTokenBucketTakeC'` are
      -- assumed to limit run time (half the configured interval, if any) and
      -- throughput (by file size) respectively — confirm.
      (Sum rechunkedFiles, Sum rechunkedSize) <- runConduit $
        getEntryCandidates
        .| C.mapMaybe (\(E.Value fRef, E.Value sz) -> (fRef, ) <$> sz)
        .| maybe (C.map id) (takeWhileTime . (/ 2)) interval
        .| persistentTokenBucketTakeC' TokenBucketRechunkFiles (view _2 :: _ -> Word64)
        .| C.mapM (uncurry rechunkFile)
        .| C.fold

      return (rechunkedFiles, rechunkedSize)

    fin (rechunkedFiles, rechunkedSize) = do
      observeRechunkedFiles rechunkedFiles rechunkedSize
      -- The size is printed exactly as in the sibling job handlers' messages;
      -- the former trailing " bytes" duplicated the unit that 'textBytes'
      -- output already carries in the other log messages of this module.
      $logInfoS "RechunkFiles" [st|Rechunked #{tshow rechunkedFiles} files in database (#{textBytes rechunkedSize})|]