refactor(avs): rewrite AVS synch (WIP)
This commit is contained in:
parent
a52c8a6ad7
commit
2e4e1a94c9
@ -29,6 +29,6 @@ UserAvs
|
||||
AvsSync
|
||||
user UserId -- Note: we need to lookup UserAvs Entity anyway, so no benefit from storing AvsPersonId here
|
||||
creationTime UTCTime
|
||||
pause Day Maybe
|
||||
pause Day Maybe -- Don't synch if last synch after this day, otherwise synch
|
||||
UniqueAvsSyncUser user
|
||||
deriving Generic
|
||||
@ -52,6 +52,7 @@ module Database.Esqueleto.Utils
|
||||
, day, day', dayMaybe, interval, diffDays, diffTimes
|
||||
, exprLift
|
||||
, explicitUnsafeCoerceSqlExprValue
|
||||
, truncateTable
|
||||
, module Database.Esqueleto.Utils.TH
|
||||
) where
|
||||
|
||||
@ -68,6 +69,8 @@ import qualified Database.Esqueleto.PostgreSQL as E
|
||||
import qualified Database.Esqueleto.Internal.Internal as E
|
||||
import Database.Esqueleto.Utils.TH
|
||||
|
||||
import qualified Database.Persist.Postgresql as P
|
||||
|
||||
import qualified Data.Text as Text
|
||||
import qualified Data.Text.Lazy as Lazy (Text)
|
||||
import qualified Data.ByteString.Lazy as Lazy (ByteString)
|
||||
@ -768,3 +771,7 @@ instance (PersistField a1, PersistField a2, PersistField b, Finite a1, Finite a2
|
||||
]
|
||||
(E.else_ $ E.else_ $ E.veryUnsafeCoerceSqlExprValue (E.nothing :: E.SqlExpr (E.Value (Maybe ()))))
|
||||
|
||||
|
||||
truncateTable :: (MonadIO m, BackendCompatible SqlBackend backend, PersistEntity record)
|
||||
=> record -> ReaderT backend m ()
|
||||
truncateTable tbl = E.rawExecute ("TRUNCATE TABLE " <> P.tableName tbl <> " RESTART IDENTITY") []
|
||||
@ -12,7 +12,10 @@
|
||||
|
||||
module Handler.Utils.Avs
|
||||
( guessAvsUser
|
||||
, upsertAvsUserById, upsertAvsUserByCard
|
||||
, upsertAvsUserByCard
|
||||
, upsertAvsUserById
|
||||
, updateAvsUserByIds
|
||||
, linktoAvsUserByUIDs
|
||||
-- , getLicence, getLicenceDB, getLicenceByAvsId -- not supported by interface
|
||||
, AvsLicenceDifferences(..)
|
||||
, setLicence, setLicenceAvs, setLicencesAvs
|
||||
@ -312,20 +315,49 @@ updateRecord ent new (CheckAvsUpdate up l) =
|
||||
lensRec = fieldLensVal up
|
||||
in ent & lensRec .~ newval
|
||||
|
||||
-- | Update given AvsPersonId by querying AVS for each; update only, no insertion! Uses batch mechanism, but single query may throw
|
||||
updateAvsUserByIds :: Set AvsPersonId -> DB (Set (AvsPersonId, UserId))
|
||||
updateAvsUserByIds apids0 = do
|
||||
apids <- Set.fromList <$> E.filterExists UserAvsPersonId apids0
|
||||
-- | shall not throw, updates exisitng and attempts to link users with yet unknown AVSIDs
|
||||
|
||||
|
||||
|
||||
linktoAvsUserByUIDs :: Set UserId -> Handler ()
|
||||
linktoAvsUserByUIDs = error "TODO: Not yet implemented."
|
||||
|
||||
-- | Like `updateAvsUserByIds`, but exceptions are not caught here to allow rollbacks
|
||||
updateAvsUserById :: AvsPersonId -> DB (Maybe UserId)
|
||||
updateAvsUserById apid = do
|
||||
AvsResponseContact adcs <- avsQuery $ AvsQueryContact $ Set.singleton $ AvsObjPersonId apid
|
||||
let res = Set.filter ((== apid) . avsContactPersonID) adcs
|
||||
snd <<$>> traverseJoin updateAvsUserByADC (Set.lookupMax res)
|
||||
|
||||
-- | Update given AvsPersonIds by querying AVS for each; update only, no insertion! Uses batch mechanism abd should not throw. Each user dealt within own runDB, i.e. own DB transaction
|
||||
updateAvsUserByIds :: Set AvsPersonId -> Handler (Set (AvsPersonId, UserId))
|
||||
updateAvsUserByIds apids = do
|
||||
-- apids <- Set.fromList <$> E.filterExists UserAvsPersonId apids0 --not needed anymore, we expect the set to be linked
|
||||
AvsResponseContact adcs <- avsQuery $ AvsQueryContact $ Set.mapMonotonic AvsObjPersonId apids -- automatically batched
|
||||
let requestedAnswers = Set.filter (view (_avsContactPersonID . to (`Set.member` apids))) adcs -- should not occur, neither should one apid occur multiple times within the response (if so, all responses processed here in random order)
|
||||
res <- foldMapM procResp requestedAnswers
|
||||
let missing = Set.toList $ Set.difference apids $ Set.map fst res
|
||||
(oks,bad) <- foldlM procResp mempty requestedAnswers
|
||||
let missing = Set.toList $ Set.difference (Set.difference apids $ Set.map fst oks) bad
|
||||
unless (null missing) $ do
|
||||
now <- liftIO getCurrentTime
|
||||
updateWhere [UserAvsPersonId <-. missing] [UserAvsLastSynch =. now, UserAvsLastSynchError =. Just "Contact unknown for AvsPersonId"] -- all others were already marked as updated
|
||||
return res
|
||||
runDB $ updateWhere [UserAvsPersonId <-. missing] [UserAvsLastSynch =. now, UserAvsLastSynchError =. Just "Avs contact info unknown for AvsPersonId"] -- all others were already marked as updated
|
||||
return oks
|
||||
where
|
||||
procResp (AvsDataContact apid newAvsPersonInfo newAvsFirmInfo) = fmap maybeMonoid . runMaybeT $ do
|
||||
procResp :: (Set (AvsPersonId, UserId), Set AvsPersonId) -> AvsDataContact -> Handler (Set (AvsPersonId, UserId), Set AvsPersonId)
|
||||
procResp (accOk, accBad) adc = do
|
||||
let errHandler e = runDB $ do
|
||||
let apid = avsContactPersonID adc
|
||||
now <- liftIO getCurrentTime
|
||||
updateBy (UniqueUserAvsId apid) [UserAvsLastSynch =. now, UserAvsLastSynchError =. Just (tshow e)]
|
||||
return (accOk, Set.insert apid accBad)
|
||||
updateAvsUserByADC' :: DB (Set (AvsPersonId, UserId), Set AvsPersonId)
|
||||
updateAvsUserByADC' = do
|
||||
res <- updateAvsUserByADC adc
|
||||
return (maybeInsert res accOk, accBad)
|
||||
catchAll (runDB updateAvsUserByADC') errHandler
|
||||
|
||||
|
||||
updateAvsUserByADC :: AvsDataContact -> DB (Maybe (AvsPersonId, UserId))
|
||||
updateAvsUserByADC (AvsDataContact apid newAvsPersonInfo newAvsFirmInfo) = runMaybeT $ do
|
||||
(Entity uaId usravs) <- MaybeT $ getBy $ UniqueUserAvsId apid
|
||||
let usrId = userAvsUser usravs
|
||||
usr <- MaybeT $ get usrId
|
||||
@ -420,7 +452,7 @@ updateAvsUserByIds apids0 = do
|
||||
repsertSuperiorSupervisor (Just newCompanyId) newAvsFirmInfo usrId -- ensure firmInfo superior is at least normal supervisor, must be executed after updating company default supervisors
|
||||
update usrId $ usr_up2 `mcons` usr_up1 -- update user eventually
|
||||
update uaId avs_ups -- update stored avsinfo for future updates
|
||||
return $ Set.singleton (apid, usrId)
|
||||
return (apid, usrId)
|
||||
|
||||
-- createAvsUserById :: Set AvsPersonId -> Handler (Set (AvsPersonId, UserId)) ???
|
||||
-- | Create new user from AVS-Id. Will throw an AvsException if this is not possible, e.g. due to Uniqueness Constraints
|
||||
@ -461,12 +493,8 @@ createAvsUserById api = do
|
||||
| otherwise -> throwM $ AvsUserUnknownByAvs api
|
||||
(Just uid, Nothing) -> runDB $ do -- link with matching exisitng user
|
||||
insert_ $ usrAvs uid Nothing -- company info should cause the user to be associated with the company during the update
|
||||
updRes <- updateAvsUserByIds $ Set.singleton api -- no loop, since updateAvsUserByIds does not call createAvsUserById
|
||||
case Set.toList updRes of
|
||||
[(api',uid')] | api == api' -> return uid' -- && uid == uid' -> return uid
|
||||
| otherwise -> throwM $ AvsIdMismatch api api'
|
||||
[] -> throwM $ AvsUserUnknownByAvs api
|
||||
_ -> throwM $ AvsUserAmbiguous api
|
||||
updRes <- updateAvsUserById api -- no loop, since updateAvsUserById does not call createAvsUserById
|
||||
maybe (throwM $ AvsUserUnknownByAvs api) return updRes
|
||||
(Nothing, Nothing) -> do
|
||||
Entity{entityKey=cid, entityVal=cmp} <- runDB $ upsertAvsCompany firmInfo Nothing -- individual runDB, since no need to rollback
|
||||
let pinPass = avsFullCardNo2pin <$> usrCardNo
|
||||
@ -617,15 +645,10 @@ upsertAvsUserByCard persNo = do
|
||||
-- Throws errors if the avsInterface in unavailable or new user would violate uniqueness constraints
|
||||
upsertAvsUserById :: AvsPersonId -> Handler UserId
|
||||
upsertAvsUserById api = do
|
||||
upd <- runDB (updateAvsUserByIds $ Set.singleton api)
|
||||
case Set.toList upd of
|
||||
[] -> createAvsUserById api
|
||||
[(api',uid)]
|
||||
| api == api' -> return uid
|
||||
| otherwise -> throwM $ AvsIdMismatch api api'
|
||||
-- error $ "Handler.Utils.Avs.updateAvsUserByIds returned unasked user with AvsPersonId " <> show api' <> " for queried AvsPersonId " <> show api <> "."
|
||||
(_:_:_) -> throwM $ AvsUserAmbiguous api
|
||||
|
||||
upd <- runDB (updateAvsUserById api)
|
||||
case upd of
|
||||
Nothing -> createAvsUserById api
|
||||
(Just uid) -> return uid
|
||||
|
||||
-- Licences
|
||||
setLicence :: (PersistUniqueRead backend, MonadThrow m,
|
||||
|
||||
@ -6,21 +6,27 @@ module Jobs.Handler.SynchroniseAvs
|
||||
( dispatchJobSynchroniseAvs
|
||||
, dispatchJobSynchroniseAvsId
|
||||
, dispatchJobSynchroniseAvsUser
|
||||
, dispatchJobSynchroniseAvsNext
|
||||
, dispatchJobSynchroniseAvsQueue
|
||||
, dispatchJobSynchroniseAvsNext -- internal only
|
||||
, dispatchJobSynchroniseAvsQueue -- internal only
|
||||
, dispatchJobSynchroniseAvsQueue' -- internal only TODO replace unprimed
|
||||
) where
|
||||
|
||||
import Import
|
||||
|
||||
import qualified Data.Set as Set
|
||||
import qualified Data.Conduit.List as C
|
||||
|
||||
import Database.Esqueleto.Experimental ((:&)(..))
|
||||
import qualified Database.Esqueleto.Experimental as E -- needs TypeApplications Lang-Pragma
|
||||
-- import qualified Database.Esqueleto.Legacy as E hiding (upsert)
|
||||
-- import qualified Database.Esqueleto.PostgreSQL as E
|
||||
-- import qualified Database.Esqueleto.Utils as E
|
||||
import qualified Database.Esqueleto.Utils as E
|
||||
|
||||
import qualified Data.Conduit.List as C
|
||||
import Jobs.Queue
|
||||
|
||||
import Handler.Utils.Avs
|
||||
|
||||
-- pause is a date in the past; don't synch again if the last synch was after pause
|
||||
dispatchJobSynchroniseAvs :: Natural -> Natural -> Natural -> Maybe Day -> JobHandler UniWorX
|
||||
dispatchJobSynchroniseAvs numIterations epoch iteration pause
|
||||
= JobHandlerException . runDB $ do
|
||||
@ -105,3 +111,30 @@ dispatchJobSynchroniseAvsQueue = JobHandlerException $ do
|
||||
-- needed, since JobSynchroniseAvsQueue cannot requeue itself due to JobNoQueueSame (and having no parameters)
|
||||
dispatchJobSynchroniseAvsNext :: JobHandler UniWorX
|
||||
dispatchJobSynchroniseAvsNext = JobHandlerException $ void $ queueJob JobSynchroniseAvsQueue
|
||||
|
||||
|
||||
dispatchJobSynchroniseAvsQueue' :: JobHandler UniWorX
|
||||
dispatchJobSynchroniseAvsQueue' = JobHandlerException $ do
|
||||
(unlinked,linked) <- runDB $ do
|
||||
jobs <- E.select (do
|
||||
(avsSync :& usrAvs) <- E.from $ E.table @AvsSync
|
||||
`E.leftJoin` E.table @UserAvs
|
||||
`E.on` (\(avsSync :& usrAvs) -> avsSync E.^. AvsSyncUser E.=?. usrAvs E.?. UserAvsUser)
|
||||
let pause = avsSync E.^. AvsSyncPause
|
||||
lastSync = usrAvs E.?. UserAvsLastSynch
|
||||
E.where_ $ E.isNothing pause
|
||||
E.||. E.isNothing lastSync
|
||||
E.||. pause E.>. E.dayMaybe lastSync
|
||||
return (avsSync E.^. AvsSyncId, avsSync E.^. AvsSyncUser, usrAvs E.?. UserAvsPersonId)
|
||||
)
|
||||
let (syncIds, unlinked, linked) = foldl' discernJob mempty jobs
|
||||
E.deleteWhere [AvsSyncId <-. syncIds]
|
||||
return (unlinked, linked)
|
||||
|
||||
void $ updateAvsUserByIds linked
|
||||
void $ linktoAvsUserByUIDs unlinked
|
||||
-- we do not reschedule failed synchs here in order to avoid a loop
|
||||
where
|
||||
discernJob (accSync, accUid, accApi) (E.Value k, _, E.Value (Just api)) = (k:accSync, accUid, Set.insert api accApi)
|
||||
discernJob (accSync, accUid, accApi) (E.Value k, E.Value uid, E.Value Nothing ) = (k:accSync, Set.insert uid accUid, accApi)
|
||||
|
||||
@ -30,14 +30,15 @@ import GHC.Stack (HasCallStack, CallStack, callStack)
|
||||
|
||||
-- import Control.Monad.Trans.Reader (withReaderT)
|
||||
|
||||
-- | Obtain the record projection from the EntityField value
|
||||
getFieldEnt :: PersistEntity record => EntityField record typ -> Entity record -> typ
|
||||
-- | Obtain a record projection from an EntityField
|
||||
getFieldEnt :: PersistEntity record => EntityField record typ -> Entity record -> typ
|
||||
getFieldEnt = view . fieldLens
|
||||
|
||||
getField :: PersistEntity record => EntityField record typ -> record -> typ
|
||||
getField :: PersistEntity record => EntityField record typ -> record -> typ
|
||||
getField = view . fieldLensVal
|
||||
|
||||
fieldLensVal :: PersistEntity record => EntityField record field -> Lens' record field
|
||||
-- | Obtain a lens from an EntityField
|
||||
fieldLensVal :: PersistEntity record => EntityField record typ -> Lens' record typ
|
||||
fieldLensVal f = entityLens . fieldLens f
|
||||
where
|
||||
entityLens :: Lens' record (Entity record)
|
||||
@ -45,7 +46,7 @@ fieldLensVal f = entityLens . fieldLens f
|
||||
getVal :: record -> Entity record
|
||||
getVal = Entity (error "fieldLensVal unexpectectly required an entity key") -- this is safe, since the lens is only used locally
|
||||
setVal :: record -> Entity record -> record
|
||||
setVal _ = entityVal
|
||||
setVal _ = entityVal
|
||||
|
||||
|
||||
emptyOrIn :: PersistField typ
|
||||
|
||||
@ -13,6 +13,7 @@ module Utils.Set
|
||||
, setFromFunc
|
||||
, mapIntersectNotOne
|
||||
, set2NonEmpty
|
||||
, maybeInsert
|
||||
) where
|
||||
|
||||
import qualified Data.List.NonEmpty as NonEmpty
|
||||
@ -81,8 +82,11 @@ setPartitionEithers = (,) <$> setMapMaybeMonotonic (preview _Left) <*> setMapMay
|
||||
setFromFunc :: (Finite k, Ord k) => (k -> Bool) -> Set k
|
||||
setFromFunc = Set.fromList . flip filter universeF
|
||||
|
||||
|
||||
-- | convert a Set to NonEmpty, inserting a default value if necessary
|
||||
set2NonEmpty :: a -> Set a -> NonEmpty.NonEmpty a
|
||||
set2NonEmpty _ (Set.toList -> h:t) = h NonEmpty.:| t
|
||||
set2NonEmpty d _ = d NonEmpty.:| []
|
||||
|
||||
maybeInsert :: Ord a => Maybe a -> Set a -> Set a
|
||||
maybeInsert Nothing = id
|
||||
maybeInsert (Just k) = Set.insert k
|
||||
Loading…
Reference in New Issue
Block a user