From 2e4e1a94c9bd01eb60a42a2e71b76704cf9cabb2 Mon Sep 17 00:00:00 2001 From: Steffen Date: Wed, 24 Apr 2024 18:01:44 +0200 Subject: [PATCH] refactor(avs): rewrite AVS synch (WIP) --- models/avs.model | 2 +- src/Database/Esqueleto/Utils.hs | 7 +++ src/Handler/Utils/Avs.hs | 75 +++++++++++++++++++----------- src/Jobs/Handler/SynchroniseAvs.hs | 41 ++++++++++++++-- src/Utils/DB.hs | 11 +++-- src/Utils/Set.hs | 6 ++- 6 files changed, 105 insertions(+), 37 deletions(-) diff --git a/models/avs.model b/models/avs.model index 47d3f48b3..067fb9d21 100644 --- a/models/avs.model +++ b/models/avs.model @@ -29,6 +29,6 @@ UserAvs AvsSync user UserId -- Note: we need to lookup UserAvs Entity anyway, so no benefit from storing AvsPersonId here creationTime UTCTime - pause Day Maybe + pause Day Maybe -- Don't synch if last synch after this day, otherwise synch UniqueAvsSyncUser user deriving Generic \ No newline at end of file diff --git a/src/Database/Esqueleto/Utils.hs b/src/Database/Esqueleto/Utils.hs index 3ca29691b..499cded08 100644 --- a/src/Database/Esqueleto/Utils.hs +++ b/src/Database/Esqueleto/Utils.hs @@ -52,6 +52,7 @@ module Database.Esqueleto.Utils , day, day', dayMaybe, interval, diffDays, diffTimes , exprLift , explicitUnsafeCoerceSqlExprValue + , truncateTable , module Database.Esqueleto.Utils.TH ) where @@ -68,6 +69,8 @@ import qualified Database.Esqueleto.PostgreSQL as E import qualified Database.Esqueleto.Internal.Internal as E import Database.Esqueleto.Utils.TH +import qualified Database.Persist.Postgresql as P + import qualified Data.Text as Text import qualified Data.Text.Lazy as Lazy (Text) import qualified Data.ByteString.Lazy as Lazy (ByteString) @@ -768,3 +771,7 @@ instance (PersistField a1, PersistField a2, PersistField b, Finite a1, Finite a2 ] (E.else_ $ E.else_ $ E.veryUnsafeCoerceSqlExprValue (E.nothing :: E.SqlExpr (E.Value (Maybe ())))) + +truncateTable :: (MonadIO m, BackendCompatible SqlBackend backend, PersistEntity record) + => record -> ReaderT backend m () +truncateTable tbl = E.rawExecute ("TRUNCATE TABLE " <> P.tableName tbl <> " RESTART IDENTITY") [] \ No newline at end of file diff --git a/src/Handler/Utils/Avs.hs b/src/Handler/Utils/Avs.hs index c3d2a8c65..f501a830e 100644 --- a/src/Handler/Utils/Avs.hs +++ b/src/Handler/Utils/Avs.hs @@ -12,7 +12,10 @@ module Handler.Utils.Avs ( guessAvsUser - , upsertAvsUserById, upsertAvsUserByCard + , upsertAvsUserByCard + , upsertAvsUserById + , updateAvsUserByIds + , linktoAvsUserByUIDs -- , getLicence, getLicenceDB, getLicenceByAvsId -- not supported by interface , AvsLicenceDifferences(..) , setLicence, setLicenceAvs, setLicencesAvs @@ -312,20 +315,49 @@ updateRecord ent new (CheckAvsUpdate up l) = lensRec = fieldLensVal up in ent & lensRec .~ newval --- | Update given AvsPersonId by querying AVS for each; update only, no insertion! Uses batch mechanism, but single query may throw -updateAvsUserByIds :: Set AvsPersonId -> DB (Set (AvsPersonId, UserId)) -updateAvsUserByIds apids0 = do - apids <- Set.fromList <$> E.filterExists UserAvsPersonId apids0 +-- | shall not throw, updates exisitng and attempts to link users with yet unknown AVSIDs + + + +linktoAvsUserByUIDs :: Set UserId -> Handler () +linktoAvsUserByUIDs = error "TODO: Not yet implemented." + +-- | Like `updateAvsUserByIds`, but exceptions are not caught here to allow rollbacks +updateAvsUserById :: AvsPersonId -> DB (Maybe UserId) +updateAvsUserById apid = do + AvsResponseContact adcs <- avsQuery $ AvsQueryContact $ Set.singleton $ AvsObjPersonId apid + let res = Set.filter ((== apid) . avsContactPersonID) adcs + snd <<$>> traverseJoin updateAvsUserByADC (Set.lookupMax res) + +-- | Update given AvsPersonIds by querying AVS for each; update only, no insertion! Uses batch mechanism abd should not throw. Each user dealt within own runDB, i.e. own DB transaction +updateAvsUserByIds :: Set AvsPersonId -> Handler (Set (AvsPersonId, UserId)) +updateAvsUserByIds apids = do + -- apids <- Set.fromList <$> E.filterExists UserAvsPersonId apids0 --not needed anymore, we expect the set to be linked AvsResponseContact adcs <- avsQuery $ AvsQueryContact $ Set.mapMonotonic AvsObjPersonId apids -- automatically batched let requestedAnswers = Set.filter (view (_avsContactPersonID . to (`Set.member` apids))) adcs -- should not occur, neither should one apid occur multiple times within the response (if so, all responses processed here in random order) - res <- foldMapM procResp requestedAnswers - let missing = Set.toList $ Set.difference apids $ Set.map fst res + (oks,bad) <- foldlM procResp mempty requestedAnswers + let missing = Set.toList $ Set.difference (Set.difference apids $ Set.map fst oks) bad unless (null missing) $ do now <- liftIO getCurrentTime - updateWhere [UserAvsPersonId <-. missing] [UserAvsLastSynch =. now, UserAvsLastSynchError =. Just "Contact unknown for AvsPersonId"] -- all others were already marked as updated - return res + runDB $ updateWhere [UserAvsPersonId <-. missing] [UserAvsLastSynch =. now, UserAvsLastSynchError =. Just "Avs contact info unknown for AvsPersonId"] -- all others were already marked as updated + return oks where - procResp (AvsDataContact apid newAvsPersonInfo newAvsFirmInfo) = fmap maybeMonoid . runMaybeT $ do + procResp :: (Set (AvsPersonId, UserId), Set AvsPersonId) -> AvsDataContact -> Handler (Set (AvsPersonId, UserId), Set AvsPersonId) + procResp (accOk, accBad) adc = do + let errHandler e = runDB $ do + let apid = avsContactPersonID adc + now <- liftIO getCurrentTime + updateBy (UniqueUserAvsId apid) [UserAvsLastSynch =. now, UserAvsLastSynchError =. Just (tshow e)] + return (accOk, Set.insert apid accBad) + updateAvsUserByADC' :: DB (Set (AvsPersonId, UserId), Set AvsPersonId) + updateAvsUserByADC' = do + res <- updateAvsUserByADC adc + return (maybeInsert res accOk, accBad) + catchAll (runDB updateAvsUserByADC') errHandler + + +updateAvsUserByADC :: AvsDataContact -> DB (Maybe (AvsPersonId, UserId)) +updateAvsUserByADC (AvsDataContact apid newAvsPersonInfo newAvsFirmInfo) = runMaybeT $ do (Entity uaId usravs) <- MaybeT $ getBy $ UniqueUserAvsId apid let usrId = userAvsUser usravs usr <- MaybeT $ get usrId @@ -420,7 +452,7 @@ updateAvsUserByIds apids0 = do repsertSuperiorSupervisor (Just newCompanyId) newAvsFirmInfo usrId -- ensure firmInfo superior is at least normal supervisor, must be executed after updating company default supervisors update usrId $ usr_up2 `mcons` usr_up1 -- update user eventually update uaId avs_ups -- update stored avsinfo for future updates - return $ Set.singleton (apid, usrId) + return (apid, usrId) -- createAvsUserById :: Set AvsPersonId -> Handler (Set (AvsPersonId, UserId)) ??? -- | Create new user from AVS-Id. Will throw an AvsException if this is not possible, e.g. due to Uniqueness Constraints @@ -461,12 +493,8 @@ createAvsUserById api = do | otherwise -> throwM $ AvsUserUnknownByAvs api (Just uid, Nothing) -> runDB $ do -- link with matching exisitng user insert_ $ usrAvs uid Nothing -- company info should cause the user to be associated with the company during the update - updRes <- updateAvsUserByIds $ Set.singleton api -- no loop, since updateAvsUserByIds does not call createAvsUserById - case Set.toList updRes of - [(api',uid')] | api == api' -> return uid' -- && uid == uid' -> return uid - | otherwise -> throwM $ AvsIdMismatch api api' - [] -> throwM $ AvsUserUnknownByAvs api - _ -> throwM $ AvsUserAmbiguous api + updRes <- updateAvsUserById api -- no loop, since updateAvsUserById does not call createAvsUserById + maybe (throwM $ AvsUserUnknownByAvs api) return updRes (Nothing, Nothing) -> do Entity{entityKey=cid, entityVal=cmp} <- runDB $ upsertAvsCompany firmInfo Nothing -- individual runDB, since no need to rollback let pinPass = avsFullCardNo2pin <$> usrCardNo @@ -617,15 +645,10 @@ upsertAvsUserByCard persNo = do -- Throws errors if the avsInterface in unavailable or new user would violate uniqueness constraints upsertAvsUserById :: AvsPersonId -> Handler UserId upsertAvsUserById api = do - upd <- runDB (updateAvsUserByIds $ Set.singleton api) - case Set.toList upd of - [] -> createAvsUserById api - [(api',uid)] - | api == api' -> return uid - | otherwise -> throwM $ AvsIdMismatch api api' - -- error $ "Handler.Utils.Avs.updateAvsUserByIds returned unasked user with AvsPersonId " <> show api' <> " for queried AvsPersonId " <> show api <> "." - (_:_:_) -> throwM $ AvsUserAmbiguous api - + upd <- runDB (updateAvsUserById api) + case upd of + Nothing -> createAvsUserById api + (Just uid) -> return uid -- Licences setLicence :: (PersistUniqueRead backend, MonadThrow m, diff --git a/src/Jobs/Handler/SynchroniseAvs.hs b/src/Jobs/Handler/SynchroniseAvs.hs index 2a2f2a31d..1a937221f 100644 --- a/src/Jobs/Handler/SynchroniseAvs.hs +++ b/src/Jobs/Handler/SynchroniseAvs.hs @@ -6,21 +6,27 @@ module Jobs.Handler.SynchroniseAvs ( dispatchJobSynchroniseAvs , dispatchJobSynchroniseAvsId , dispatchJobSynchroniseAvsUser - , dispatchJobSynchroniseAvsNext - , dispatchJobSynchroniseAvsQueue + , dispatchJobSynchroniseAvsNext -- internal only + , dispatchJobSynchroniseAvsQueue -- internal only + , dispatchJobSynchroniseAvsQueue' -- internal only TODO replace unprimed ) where import Import +import qualified Data.Set as Set +import qualified Data.Conduit.List as C + +import Database.Esqueleto.Experimental ((:&)(..)) +import qualified Database.Esqueleto.Experimental as E -- needs TypeApplications Lang-Pragma -- import qualified Database.Esqueleto.Legacy as E hiding (upsert) -- import qualified Database.Esqueleto.PostgreSQL as E --- import qualified Database.Esqueleto.Utils as E +import qualified Database.Esqueleto.Utils as E -import qualified Data.Conduit.List as C import Jobs.Queue import Handler.Utils.Avs +-- pause is a date in the past; don't synch again if the last synch was after pause dispatchJobSynchroniseAvs :: Natural -> Natural -> Natural -> Maybe Day -> JobHandler UniWorX dispatchJobSynchroniseAvs numIterations epoch iteration pause = JobHandlerException . runDB $ do @@ -105,3 +111,30 @@ dispatchJobSynchroniseAvsQueue = JobHandlerException $ do -- needed, since JobSynchroniseAvsQueue cannot requeue itself due to JobNoQueueSame (and having no parameters) dispatchJobSynchroniseAvsNext :: JobHandler UniWorX dispatchJobSynchroniseAvsNext = JobHandlerException $ void $ queueJob JobSynchroniseAvsQueue + + +dispatchJobSynchroniseAvsQueue' :: JobHandler UniWorX +dispatchJobSynchroniseAvsQueue' = JobHandlerException $ do + (unlinked,linked) <- runDB $ do + jobs <- E.select (do + (avsSync :& usrAvs) <- E.from $ E.table @AvsSync + `E.leftJoin` E.table @UserAvs + `E.on` (\(avsSync :& usrAvs) -> avsSync E.^. AvsSyncUser E.=?. usrAvs E.?. UserAvsUser) + let pause = avsSync E.^. AvsSyncPause + lastSync = usrAvs E.?. UserAvsLastSynch + E.where_ $ E.isNothing pause + E.||. E.isNothing lastSync + E.||. pause E.>. E.dayMaybe lastSync + return (avsSync E.^. AvsSyncId, avsSync E.^. AvsSyncUser, usrAvs E.?. UserAvsPersonId) + ) + let (syncIds, unlinked, linked) = foldl' discernJob mempty jobs + E.deleteWhere [AvsSyncId <-. syncIds] + return (unlinked, linked) + + void $ updateAvsUserByIds linked + void $ linktoAvsUserByUIDs unlinked + -- we do not reschedule failed synchs here in order to avoid a loop + where + discernJob (accSync, accUid, accApi) (E.Value k, _, E.Value (Just api)) = (k:accSync, accUid, Set.insert api accApi) + discernJob (accSync, accUid, accApi) (E.Value k, E.Value uid, E.Value Nothing ) = (k:accSync, Set.insert uid accUid, accApi) + \ No newline at end of file diff --git a/src/Utils/DB.hs b/src/Utils/DB.hs index cf792bf6d..fdad68adf 100644 --- a/src/Utils/DB.hs +++ b/src/Utils/DB.hs @@ -30,14 +30,15 @@ import GHC.Stack (HasCallStack, CallStack, callStack) -- import Control.Monad.Trans.Reader (withReaderT) --- | Obtain the record projection from the EntityField value -getFieldEnt :: PersistEntity record => EntityField record typ -> Entity record -> typ +-- | Obtain a record projection from an EntityField +getFieldEnt :: PersistEntity record => EntityField record typ -> Entity record -> typ getFieldEnt = view . fieldLens -getField :: PersistEntity record => EntityField record typ -> record -> typ +getField :: PersistEntity record => EntityField record typ -> record -> typ getField = view . fieldLensVal -fieldLensVal :: PersistEntity record => EntityField record field -> Lens' record field +-- | Obtain a lens from an EntityField +fieldLensVal :: PersistEntity record => EntityField record typ -> Lens' record typ fieldLensVal f = entityLens . fieldLens f where entityLens :: Lens' record (Entity record) @@ -45,7 +46,7 @@ fieldLensVal f = entityLens . fieldLens f getVal :: record -> Entity record getVal = Entity (error "fieldLensVal unexpectectly required an entity key") -- this is safe, since the lens is only used locally setVal :: record -> Entity record -> record - setVal _ = entityVal + setVal _ = entityVal emptyOrIn :: PersistField typ diff --git a/src/Utils/Set.hs b/src/Utils/Set.hs index 79e11c662..f895cd098 100644 --- a/src/Utils/Set.hs +++ b/src/Utils/Set.hs @@ -13,6 +13,7 @@ module Utils.Set , setFromFunc , mapIntersectNotOne , set2NonEmpty +, maybeInsert ) where import qualified Data.List.NonEmpty as NonEmpty @@ -81,8 +82,11 @@ setPartitionEithers = (,) <$> setMapMaybeMonotonic (preview _Left) <*> setMapMay setFromFunc :: (Finite k, Ord k) => (k -> Bool) -> Set k setFromFunc = Set.fromList . flip filter universeF - -- | convert a Set to NonEmpty, inserting a default value if necessary set2NonEmpty :: a -> Set a -> NonEmpty.NonEmpty a set2NonEmpty _ (Set.toList -> h:t) = h NonEmpty.:| t set2NonEmpty d _ = d NonEmpty.:| [] + +maybeInsert :: Ord a => Maybe a -> Set a -> Set a +maybeInsert Nothing = id +maybeInsert (Just k) = Set.insert k \ No newline at end of file