Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick Add gp_stat_progress_dtx_recovery for observability #872

Merged
merged 9 commits into from
Jan 16, 2025
4 changes: 2 additions & 2 deletions gpMgmt/bin/gpload.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,8 +832,8 @@ def run(self):
except Exception as e:
# close fd so that not block the worker thread because of stdout/stderr pipe not finish/closed.
self.fd.close()
sys.stderr.write("\n\nWarning: gpfdist log halt because Log Thread '%s' got an exception: %s \n" % (self.getName(), str(e)))
self.gpload.log(self.gpload.WARN, "gpfdist log halt because Log Thread '%s' got an exception: %s" % (self.getName(), str(e)))
sys.stderr.write("\n\nWarning: gpfdist log halt because Log Thread '%s' got an exception: %s \n" % (self.name, str(e)))
self.gpload.log(self.gpload.WARN, "gpfdist log halt because Log Thread '%s' got an exception: %s" % (self.name, str(e)))
raise

def cli_help():
Expand Down
34 changes: 0 additions & 34 deletions gpMgmt/bin/gppylib/gparray.py
Original file line number Diff line number Diff line change
Expand Up @@ -1444,40 +1444,6 @@ def get_datadir_prefix(self):
prefix = self.coordinator.datadir[start_last_dir:start_dir_content]
return prefix

# --------------------------------------------------------------------
# If we've got recovered segments, and we have a matched-pair, we
# can update the catalog to "rebalance" back to our original primary.
def updateRoleForRecoveredSegs(self, dbURL):
"""
Marks the segment role to match the configured preferred_role.
"""

# walk our list of segments, checking to make sure that
# both members of the peer-group are in our recovered-list,
# save their content-id.
recovered_contents = []
for segPair in self.segmentPairs:
if segPair.primaryDB:
if segPair.primaryDB.dbid in self.recoveredSegmentDbids:
if segPair.mirrorDB and segPair.mirrorDB.dbid in self.recoveredSegmentDbids:
recovered_contents.append((segPair.primaryDB.content, segPair.primaryDB.dbid, segPair.mirrorDB.dbid))

with closing(dbconn.connect(dbURL, True, allowSystemTableMods = True)) as conn:
for (content_id, primary_dbid, mirror_dbid) in recovered_contents:
sql = "UPDATE gp_segment_configuration SET role=preferred_role where content = %d" % content_id
dbconn.executeUpdateOrInsert(conn, sql, 2)

# NOTE: primary-dbid (right now) is the mirror.
sql = "INSERT INTO gp_configuration_history VALUES (now(), %d, 'Reassigned role for content %d to MIRROR')" % (primary_dbid, content_id)
dbconn.executeUpdateOrInsert(conn, sql, 1)

# NOTE: mirror-dbid (right now) is the primary.
sql = "INSERT INTO gp_configuration_history VALUES (now(), %d, 'Reassigned role for content %d to PRIMARY')" % (mirror_dbid, content_id)
dbconn.executeUpdateOrInsert(conn, sql, 1)

# We could attempt to update the segments-array.
# But the caller will re-read the configuration from the catalog.

# --------------------------------------------------------------------
def addExpansionSeg(self, content, preferred_role, dbid, role,
hostname, address, port, datadir):
Expand Down
4 changes: 3 additions & 1 deletion gpMgmt/bin/gppylib/gpcatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ class GPCatalogException(Exception):
'pg_resourcetype',
'pg_resqueue',
'pg_resqueuecapability',
'pg_tablespace'
'pg_subscription',
'pg_tablespace',
'pg_transform'
]

# ============================================================================
Expand Down
1 change: 0 additions & 1 deletion gpMgmt/bin/gpsd
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import pwd
import sys
import re
from contextlib import closing
from distutils.version import LooseVersion
from optparse import OptionParser
import pgdb
from gppylib.utils import formatInsertValuesList, Escape
Expand Down
12 changes: 9 additions & 3 deletions gpMgmt/test/behave/mgmt_utils/gpcheckcat.feature
Original file line number Diff line number Diff line change
Expand Up @@ -633,17 +633,23 @@ Feature: gpcheckcat tests
Then gpcheckcat should return a return code of 3
And the user runs "dropdb mis_attr_db"

Scenario: gpcheckcat should not report dependency error from pg_default_acl
Scenario: gpcheckcat should not report dependency error from pg_default_acl, pg_subscription and pg_transform
Given database "check_dependency_error" is dropped and recreated
And the user runs "psql -d check_dependency_error -c "CREATE ROLE foo; ALTER DEFAULT PRIVILEGES FOR ROLE foo REVOKE EXECUTE ON FUNCTIONS FROM PUBLIC;""
Then psql should return a return code of 0
When the user runs "gpcheckcat check_dependency_error"
And the user runs "psql -d check_dependency_error -c "CREATE SUBSCRIPTION foo CONNECTION '' PUBLICATION bar WITH (connect = false, slot_name = NONE);""
Then psql should return a return code of 0
And the user runs "psql -d check_dependency_error -c "CREATE TRANSFORM FOR int LANGUAGE SQL (FROM SQL WITH FUNCTION prsd_lextype(internal), TO SQL WITH FUNCTION int4recv(internal));""
Then psql should return a return code of 0
When the user runs "gpcheckcat -R dependency check_dependency_error"
Then gpcheckcat should return a return code of 0
And gpcheckcat should not print "SUMMARY REPORT: FAILED" to stdout
And gpcheckcat should not print "has a dependency issue on oid" to stdout
And gpcheckcat should print "Found no catalog issue" to stdout
And the user runs "psql -d check_dependency_error -c "DROP SUBSCRIPTION foo""
And the user runs "psql -d check_dependency_error -c "DROP TRANSFORM FOR int LANGUAGE SQL""
And the user runs "dropdb check_dependency_error"
And the user runs "psql -c "DROP ROLE foo""
And the user runs "psql -d postgres -c "DROP ROLE foo""


########################### @concourse_cluster tests ###########################
Expand Down
16 changes: 16 additions & 0 deletions src/backend/catalog/system_views.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,22 @@ CREATE VIEW pg_stat_progress_copy AS
FROM pg_stat_get_progress_info('COPY') AS S
LEFT JOIN pg_database D ON S.datid = D.oid;

CREATE VIEW gp_stat_progress_dtx_recovery AS
SELECT
CASE S.param1 WHEN 0 THEN 'initializing'
WHEN 1 THEN 'recovering commited distributed transactions'
WHEN 2 THEN 'gathering in-doubt transactions'
WHEN 3 THEN 'aborting in-doubt transactions'
WHEN 4 THEN 'gathering in-doubt orphaned transactions'
WHEN 5 THEN 'managing in-doubt orphaned transactions'
END AS phase,
S.param2 AS recover_commited_dtx_total, -- total commited transactions found to recover
S.param3 AS recover_commited_dtx_completed, -- recover completed, this is always 0 after startup.
S.param4 AS in_doubt_tx_total, -- total in doubt tx found, used in startup and non-startup phase
S.param5 AS in_doubt_tx_in_progress, -- in-progress in-doubt tx, this is always 0 for startup
S.param6 AS in_doubt_tx_aborted -- aborted in-doubt tx, this can be >0 for both
FROM pg_stat_get_progress_info('DTX RECOVERY') AS S;

CREATE VIEW pg_user_mappings AS
SELECT
U.oid AS umid,
Expand Down
68 changes: 67 additions & 1 deletion src/backend/cdb/cdbdtxrecovery.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "utils/ps_status.h"
#include "postmaster/bgworker.h"
#include "pgstat.h"
#include "commands/progress.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
Expand Down Expand Up @@ -53,6 +54,10 @@ volatile int *shmNumCommittedGxacts;

static volatile sig_atomic_t got_SIGHUP = false;

static int64 in_doubt_tx_in_progress = 0;
static int64 in_doubt_tx_aborted = 0;
static int64 in_doubt_tx_total = 0;

typedef struct InDoubtDtx
{
char gid[TMGIDSIZE];
Expand Down Expand Up @@ -190,6 +195,11 @@ recoverInDoubtTransactions(void)
"Going to retry commit notification for distributed transactions (count = %d)",
*shmNumCommittedGxacts);

pgstat_progress_update_param(PROGRESS_DTX_RECOVERY_PHASE,
PROGRESS_DTX_RECOVERY_PHASE_STARTUP_RECOVER_COMMITTED_DTX);
pgstat_progress_update_param(PROGRESS_DTX_RECOVERY_COMITTED_DTX_TOTAL,
*shmNumCommittedGxacts);

for (i = 0; i < *shmNumCommittedGxacts; i++)
{
DistributedTransactionId gxid = shmCommittedGxidArray[i];
Expand All @@ -204,16 +214,27 @@ recoverInDoubtTransactions(void)
doNotifyCommittedInDoubt(gid);

RecordDistributedForgetCommitted(gxid);

pgstat_progress_update_param(PROGRESS_DTX_RECOVERY_COMITTED_DTX_COMPLETED,
i+1);
}

SIMPLE_FAULT_INJECTOR("post_progress_recovery_comitted");

*shmNumCommittedGxacts = 0;


pgstat_progress_update_param(PROGRESS_DTX_RECOVERY_PHASE,
PROGRESS_DTX_RECOVERY_PHASE_STARTUP_GATHER_IN_DOUBT_TX);

/*
* Any in-doubt transctions found will be for aborted
* transactions. Gather in-doubt transactions and issue aborts.
*/
htab = gatherRMInDoubtTransactions(0, true);

pgstat_progress_update_param(PROGRESS_DTX_RECOVERY_PHASE,
PROGRESS_DTX_RECOVERY_PHASE_STARTUP_ABORT_IN_DOUBT_TX);
/*
* go through and resolve any remaining in-doubt transactions that the
* RM's have AFTER recoverDTMInDoubtTransactions. ALL of these in doubt
Expand Down Expand Up @@ -351,6 +372,9 @@ gatherRMInDoubtTransactions(int prepared_seconds, bool raiseError)
elog(DEBUG3, "Found in-doubt transaction with GID: %s on remote RM", gid);

strncpy(lastDtx->gid, gid, TMGIDSIZE);

pgstat_progress_update_param(PROGRESS_DTX_RECOVERY_IN_DOUBT_TX_TOTAL,
++in_doubt_tx_total);
}
}
}
Expand Down Expand Up @@ -388,6 +412,9 @@ abortRMInDoubtTransactions(HTAB *htab)
elog(DTM_DEBUG3, "Aborting in-doubt transaction with gid = %s", entry->gid);

doAbortInDoubt(entry->gid);

pgstat_progress_update_param(PROGRESS_DTX_RECOVERY_IN_DOUBT_TX_ABORTED,
++in_doubt_tx_aborted);
}
}

Expand All @@ -414,10 +441,19 @@ abortOrphanedTransactions(HTAB *htab)

dtxDeformGid(entry->gid, &gxid);

if (!IsDtxInProgress(gxid))
if (IsDtxInProgress(gxid))
{
pgstat_progress_update_param(PROGRESS_DTX_RECOVERY_IN_DOUBT_TX_IN_PROGRESS,
++in_doubt_tx_in_progress);
SIMPLE_FAULT_INJECTOR("post_in_doubt_tx_in_progress");
}
else
{
elog(LOG, "Aborting orphaned transactions with gid = %s", entry->gid);
doAbortInDoubt(entry->gid);

pgstat_progress_update_param(PROGRESS_DTX_RECOVERY_IN_DOUBT_TX_ABORTED,
++in_doubt_tx_aborted);
}
}
}
Expand Down Expand Up @@ -536,6 +572,14 @@ DtxRecoveryStartRule(Datum main_arg)
return (Gp_role == GP_ROLE_DISPATCH);
}

static void
ResetInDoubtStatProgress()
{
in_doubt_tx_total = 0;
in_doubt_tx_aborted = 0;
in_doubt_tx_in_progress = 0;
}

static void
AbortOrphanedPreparedTransactions()
{
Expand All @@ -546,9 +590,15 @@ AbortOrphanedPreparedTransactions()
return;
#endif

pgstat_progress_update_param(PROGRESS_DTX_RECOVERY_PHASE,
PROGRESS_DTX_RECOVERY_PHASE_GATHER_IN_DOUBT_TX);

StartTransactionCommand();
htab = gatherRMInDoubtTransactions(gp_dtx_recovery_prepared_period, false);

pgstat_progress_update_param(PROGRESS_DTX_RECOVERY_PHASE,
PROGRESS_DTX_RECOVERY_PHASE_ABORT_IN_DOUBT_TX);

/* in case an error happens somehow. */
if (htab != NULL)
{
Expand Down Expand Up @@ -629,6 +679,9 @@ DtxRecoveryMain(Datum main_arg)
/* Connect to postgres */
BackgroundWorkerInitializeConnection(DB_FOR_COMMON_ACCESS, NULL, 0);

/* initialize progress */
pgstat_progress_start_command(PROGRESS_COMMAND_DTX_RECOVERY, InvalidOid);

/*
* Do dtx recovery process. It is possible that *shmDtmStarted is true
* here if we terminate after this code block, e.g. due to error and then
Expand All @@ -646,6 +699,8 @@ DtxRecoveryMain(Datum main_arg)
set_ps_display("");
}

pgstat_progress_end_command();

/* Fetch the gxid batch in advance. */
bumpGxid();

Expand Down Expand Up @@ -690,7 +745,18 @@ DtxRecoveryMain(Datum main_arg)
* cases so that it could respond promptly for gxid bumping given
* the abort operation might be time-consuming.
*/

/*
* The total amount of in doubt transactions are calculated per iteration
* and are destroyed after use. Keeping the old statistics may be confusing,
* so clear it up.
*/
ResetInDoubtStatProgress();
pgstat_progress_start_command(PROGRESS_COMMAND_DTX_RECOVERY, InvalidOid);

AbortOrphanedPreparedTransactions();

pgstat_progress_end_command();
}

rc = WaitLatch(&MyProc->procLatch,
Expand Down
3 changes: 2 additions & 1 deletion src/backend/gporca/scripts/convert_minirepro_5_to_6.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ def convert_insert_statement(infile, outfile):


def parseargs():
parser = argparse.ArgumentParser(description=_help, version='1.0')
parser = argparse.ArgumentParser(description=_help)

parser.add_argument('--version', action='version', version='1.0')
parser.add_argument("filepath", help="Path to minirepro file")

args = parser.parse_args()
Expand Down
3 changes: 2 additions & 1 deletion src/backend/gporca/scripts/get_debug_event_counters.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,9 @@ def processLogFile(logFileLines, allruns):


def parseargs():
parser = argparse.ArgumentParser(description=_help, version='1.0')
parser = argparse.ArgumentParser(description=_help)

parser.add_argument('--version', action='version', version='1.0')
parser.add_argument("--logFile", default="",
help="GPDB log file saved from a run with debug event counters enabled (default is to search "
"GPDB master log directory)")
Expand Down
2 changes: 2 additions & 0 deletions src/backend/utils/adt/pgstatfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,8 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
cmdtype = PROGRESS_COMMAND_BASEBACKUP;
else if (pg_strcasecmp(cmd, "COPY") == 0)
cmdtype = PROGRESS_COMMAND_COPY;
else if (pg_strcasecmp(cmd, "DTX RECOVERY") == 0)
cmdtype = PROGRESS_COMMAND_DTX_RECOVERY;
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
Expand Down
2 changes: 1 addition & 1 deletion src/backend/utils/init/postinit.c
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
* report this backend in the PgBackendStatus array, meanwhile, we do not
* want users to see auxiliary background worker like fts in pg_stat_* views.
*/
if (!bootstrap && !amAuxiliaryBgWorker())
if (!bootstrap && (!amAuxiliaryBgWorker() || IsDtxRecoveryProcess()))
pgstat_bestart();

/*
Expand Down
4 changes: 3 additions & 1 deletion src/backend/utils/misc/faultinjector.c
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ checkBgProcessSkipFault(const char* faultName)
{
/* dtx recovery process */
if (0 != strcmp("before_orphaned_check", faultName) &&
0 != strcmp("after_orphaned_check", faultName))
0 != strcmp("after_orphaned_check", faultName) &&
0 != strcmp("post_in_doubt_tx_in_progress", faultName) &&
0 != strcmp("post_progress_recovery_comitted", faultName))
{
elog(LOG, "skipped fault '%s' in dtx recovery process", faultName);
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/backend/utils/misc/guc_gp.c
Original file line number Diff line number Diff line change
Expand Up @@ -2839,7 +2839,7 @@ struct config_bool ConfigureNamesBool_gp[] =
},

{
{"gp_resource_group_bypass", PGC_SUSET, RESOURCES,
{"gp_resource_group_bypass", PGC_USERSET, RESOURCES,
gettext_noop("If the value is true, the query in this session will not be limited by resource group."),
NULL
},
Expand Down
15 changes: 15 additions & 0 deletions src/include/commands/progress.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,19 @@
#define PROGRESS_COPY_TYPE_PIPE 3
#define PROGRESS_COPY_TYPE_CALLBACK 4

/* Progress parameters for DTX recovery */
#define PROGRESS_DTX_RECOVERY_PHASE 0
#define PROGRESS_DTX_RECOVERY_COMITTED_DTX_TOTAL 1
#define PROGRESS_DTX_RECOVERY_COMITTED_DTX_COMPLETED 2
#define PROGRESS_DTX_RECOVERY_IN_DOUBT_TX_TOTAL 3
#define PROGRESS_DTX_RECOVERY_IN_DOUBT_TX_IN_PROGRESS 4
#define PROGRESS_DTX_RECOVERY_IN_DOUBT_TX_ABORTED 5

/* Phases of DTX recovery */
#define PROGRESS_DTX_RECOVERY_PHASE_STARTUP_RECOVER_COMMITTED_DTX 1
#define PROGRESS_DTX_RECOVERY_PHASE_STARTUP_GATHER_IN_DOUBT_TX 2
#define PROGRESS_DTX_RECOVERY_PHASE_STARTUP_ABORT_IN_DOUBT_TX 3
#define PROGRESS_DTX_RECOVERY_PHASE_GATHER_IN_DOUBT_TX 4
#define PROGRESS_DTX_RECOVERY_PHASE_ABORT_IN_DOUBT_TX 5

#endif
3 changes: 2 additions & 1 deletion src/include/utils/backend_progress.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ typedef enum ProgressCommandType
PROGRESS_COMMAND_CLUSTER,
PROGRESS_COMMAND_CREATE_INDEX,
PROGRESS_COMMAND_BASEBACKUP,
PROGRESS_COMMAND_COPY
PROGRESS_COMMAND_COPY,
PROGRESS_COMMAND_DTX_RECOVERY
} ProgressCommandType;

#define PGSTAT_NUM_PROGRESS_PARAM 20
Expand Down
Loading
Loading