Skip to content

Commit

Permalink
refactor: is ready Admin logic to AppState
Browse files Browse the repository at this point in the history
  • Loading branch information
steve-chavez committed May 8, 2024
1 parent 1374178 commit 1b584f7
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 63 deletions.
20 changes: 8 additions & 12 deletions src/PostgREST/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ module PostgREST.Admin
) where

import qualified Data.Aeson as JSON
import qualified Hasql.Session as SQL
import qualified Network.HTTP.Types.Status as HTTP
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp
Expand All @@ -28,28 +27,25 @@ import qualified PostgREST.Config as Config

import Protolude

runAdmin :: AppConfig -> AppState -> Warp.Settings -> IO ()
runAdmin conf@AppConfig{configAdminServerPort} appState settings =
runAdmin :: AppState -> Warp.Settings -> IO ()
runAdmin appState settings = do
AppConfig{configAdminServerPort} <- AppState.getConfig appState
whenJust (AppState.getSocketAdmin appState) $ \adminSocket -> do
observer $ AdminStartObs configAdminServerPort
void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp
where
adminApp = admin appState conf
adminApp = admin appState
observer = AppState.getObserver appState

-- | PostgREST admin application
admin :: AppState.AppState -> AppConfig -> Wai.Application
admin appState appConfig req respond = do
admin :: AppState.AppState -> Wai.Application
admin appState req respond = do
isMainAppReachable <- isRight <$> reachMainApp (AppState.getSocketREST appState)
isSchemaCacheLoaded <- AppState.getSchemaCacheLoaded appState
isConnectionUp <-
if configDbChannelEnabled appConfig
then AppState.getIsListenerOn appState
else isRight <$> AppState.usePool appState (SQL.sql "SELECT 1")
isLoaded <- AppState.isLoaded appState

case Wai.pathInfo req of
["ready"] ->
respond $ Wai.responseLBS (if isMainAppReachable && isConnectionUp && isSchemaCacheLoaded then HTTP.status200 else HTTP.status503) [] mempty
respond $ Wai.responseLBS (if isMainAppReachable && isLoaded then HTTP.status200 else HTTP.status503) [] mempty
["live"] ->
respond $ Wai.responseLBS (if isMainAppReachable then HTTP.status200 else HTTP.status503) [] mempty
["config"] -> do
Expand Down
2 changes: 1 addition & 1 deletion src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ run appState = do
-- reload schema cache + config on NOTIFY
AppState.runListener conf appState

Admin.runAdmin conf appState (serverSettings conf)
Admin.runAdmin appState (serverSettings conf)

let app = postgrest configLogLevel appState (AppState.connectionWorker appState)

Expand Down
110 changes: 60 additions & 50 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ module PostgREST.AppState
, destroy
, getConfig
, getSchemaCache
, getIsListenerOn
, getMainThreadId
, getPgVersion
, getRetryNextIn
, getTime
, getJwtCache
, getSocketREST
, getSocketAdmin
, getSchemaCacheLoaded
, init
, initSockets
, initWithPool
Expand All @@ -28,6 +26,7 @@ module PostgREST.AppState
, connectionWorker
, runListener
, getObserver
, isLoaded
) where

import qualified Data.Aeson as JSON
Expand Down Expand Up @@ -90,14 +89,14 @@ data AppState = AppState
, statePgVersion :: IORef PgVersion
-- | No schema cache at the start. Will be filled in by the connectionWorker
, stateSchemaCache :: IORef (Maybe SchemaCache)
-- | If schema cache is loaded
, stateSchemaCacheLoaded :: IORef Bool
-- | The schema cache status
, stateSCacheStatus :: IORef SchemaCacheStatus
-- | The connection status
, stateConnStatus :: IORef ConnectionStatus
-- | starts the connection worker with a debounce
, debouncedConnectionWorker :: IO ()
-- | Binary semaphore used to sync the listener(NOTIFY reload) with the connectionWorker.
, stateListener :: MVar ()
-- | State of the LISTEN channel, used for the admin server checks
, stateIsListenerOn :: IORef Bool
-- | Config that can change at runtime
, stateConf :: IORef AppConfig
-- | Time used for verifying JWT expiration
Expand All @@ -118,6 +117,20 @@ data AppState = AppState
, stateMetrics :: Metrics.MetricsState
}

-- | Schema cache status
data SchemaCacheStatus
= SCLoaded
| SCPending
| SCFatalFail
deriving Eq

-- | Current database connection status
data ConnectionStatus
= ConnEstablished
| ConnPending
| ConnFatalFail Text
deriving Eq

type AppSockets = (NS.Socket, Maybe NS.Socket)


Expand All @@ -138,10 +151,10 @@ initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do
appState <- AppState pool
<$> newIORef minimumPgVersion -- assume we're in a supported version when starting, this will be corrected on a later step
<*> newIORef Nothing
<*> newIORef False
<*> newIORef SCPending
<*> newIORef ConnPending
<*> pure (pure ())
<*> newEmptyMVar
<*> newIORef False
<*> newIORef conf
<*> mkAutoUpdate defaultUpdateSettings { updateAction = getCurrentTime }
<*> myThreadId
Expand Down Expand Up @@ -286,29 +299,33 @@ waitListener = takeMVar . stateListener
signalListener :: AppState -> IO ()
signalListener appState = void $ tryPutMVar (stateListener appState) ()

getIsListenerOn :: AppState -> IO Bool
getIsListenerOn = readIORef . stateIsListenerOn
isConnEstablished :: AppState -> IO Bool
isConnEstablished x = do
conf <- getConfig x
if configDbChannelEnabled conf
then do -- if the listener is enabled, we can be sure the connection status is always up to date
st <- readIORef $ stateConnStatus x
return $ st == ConnEstablished
else -- otherwise the only way to check the connection is to make a query
isRight <$> usePool x (SQL.sql "SELECT 1")

putIsListenerOn :: AppState -> Bool -> IO ()
putIsListenerOn = atomicWriteIORef . stateIsListenerOn
isLoaded :: AppState -> IO Bool
isLoaded x = do
scacheStatus <- readIORef $ stateSCacheStatus x
connEstablished <- isConnEstablished x
return $ scacheStatus == SCLoaded && connEstablished

getSchemaCacheLoaded :: AppState -> IO Bool
getSchemaCacheLoaded = readIORef . stateSchemaCacheLoaded
putSCacheStatus :: AppState -> SchemaCacheStatus -> IO ()
putSCacheStatus = atomicWriteIORef . stateSCacheStatus

putSchemaCacheLoaded :: AppState -> Bool -> IO ()
putSchemaCacheLoaded = atomicWriteIORef . stateSchemaCacheLoaded
putConnStatus :: AppState -> ConnectionStatus -> IO ()
putConnStatus = atomicWriteIORef . stateConnStatus

getObserver :: AppState -> ObservationHandler
getObserver = stateObserver

-- | Schema cache status
data SCacheStatus
= SCLoaded
| SCOnRetry
| SCFatalFail

-- | Load the SchemaCache by using a connection from the pool.
loadSchemaCache :: AppState -> IO SCacheStatus
loadSchemaCache :: AppState -> IO SchemaCacheStatus
loadSchemaCache appState@AppState{stateObserver=observer} = do
conf@AppConfig{..} <- getConfig appState
(resultTime, result) <-
Expand All @@ -323,24 +340,17 @@ loadSchemaCache appState@AppState{stateObserver=observer} = do
Nothing -> do
putSchemaCache appState Nothing
observer $ SchemaCacheNormalErrorObs e
putSchemaCacheLoaded appState False
return SCOnRetry
putSCacheStatus appState SCPending
return SCPending

Right sCache -> do
putSchemaCache appState $ Just sCache
observer $ SchemaCacheQueriedObs resultTime
(t, _) <- timeItT $ observer $ SchemaCacheSummaryObs $ showSummary sCache
observer $ SchemaCacheLoadedObs t
putSchemaCacheLoaded appState True
putSCacheStatus appState SCLoaded
return SCLoaded

-- | Current database connection status data ConnectionStatus
data ConnectionStatus
= NotConnected
| Connected PgVersion
| FatalConnectionError Text
deriving (Eq)

-- | The purpose of this worker is to obtain a healthy connection to pg and an
-- up-to-date schema cache(SchemaCache). This method is meant to be called
-- multiple times by the same thread, but does nothing if the previous
Expand All @@ -358,20 +368,19 @@ internalConnectionWorker appState@AppState{stateObserver=observer} = work
work = do
AppConfig{..} <- getConfig appState
observer DBConnectAttemptObs
connected <- establishConnection appState
case connected of
FatalConnectionError reason ->
connStatus <- establishConnection appState
case connStatus of
ConnFatalFail reason ->
-- Fatal error when connecting
observer (ExitFatalObs reason) >> killThread (getMainThreadId appState)
NotConnected ->
-- Unreachable because establishConnection will keep trying to connect, unless disable-recovery is turned on
ConnPending ->
unless configDbPoolAutomaticRecovery
$ observer ExitDBNoRecoveryObs >> killThread (getMainThreadId appState)
Connected actualPgVersion -> do
ConnEstablished -> do
-- Procede with initialization
putPgVersion appState actualPgVersion
when configDbChannelEnabled $
signalListener appState
actualPgVersion <- getPgVersion appState
observer (DBConnectedObs $ pgvFullName actualPgVersion)
-- this could be fail because the connection drops, but the loadSchemaCache will pick the error and retry again
-- We cannot retry after it fails immediately, because db-pre-config could have user errors. We just log the error and continue.
Expand All @@ -381,7 +390,7 @@ internalConnectionWorker appState@AppState{stateObserver=observer} = work
SCLoaded ->
-- do nothing and proceed if the load was successful
return ()
SCOnRetry ->
SCPending ->
-- retry reloading the schema cache
work
SCFatalFail ->
Expand Down Expand Up @@ -415,23 +424,26 @@ establishConnection appState@AppState{stateObserver=observer} =
observer $ ConnectionPgVersionErrorObs e
case checkIsFatal e of
Just reason ->
return $ FatalConnectionError reason
Nothing ->
return NotConnected
return $ ConnFatalFail reason
Nothing -> do
putConnStatus appState ConnPending
return ConnPending
Right version ->
if version < minimumPgVersion then
return . FatalConnectionError $
return . ConnFatalFail $
"Cannot run in this PostgreSQL version, PostgREST needs at least "
<> pgvName minimumPgVersion
else
return . Connected $ version
else do
putConnStatus appState ConnEstablished
putPgVersion appState version
return ConnEstablished

shouldRetry :: RetryStatus -> ConnectionStatus -> IO Bool
shouldRetry rs isConnSucc = do
AppConfig{..} <- getConfig appState
let
delay = fromMaybe 0 (rsPreviousDelay rs) `div` backoffMicroseconds
itShould = NotConnected == isConnSucc && configDbPoolAutomaticRecovery
itShould = ConnPending == isConnSucc && configDbPoolAutomaticRecovery
when itShould $ observer $ ConnectionRetryObs delay
when itShould $ putRetryNextIn appState delay
return itShould
Expand Down Expand Up @@ -503,7 +515,6 @@ listener appState@AppState{stateObserver=observer} conf@AppConfig{..} = do
case dbOrError of
Right db -> do
observer $ DBListenerStart dbChannel
putIsListenerOn appState True
SQL.listen db $ SQL.toPgIdentifier dbChannel
SQL.waitForNotifications handleNotification db

Expand All @@ -517,7 +528,6 @@ listener appState@AppState{stateObserver=observer} conf@AppConfig{..} = do
handleFinally dbChannel True err = do
-- if the thread dies, we try to recover
observer $ DBListenerFailRecoverObs True dbChannel err
putIsListenerOn appState False
-- assume the pool connection was also lost, call the connection worker
connectionWorker appState
-- retry the listener
Expand Down

0 comments on commit 1b584f7

Please sign in to comment.