CREATE OR REPLACE PROCEDURE CR_ARCHIVE_REGISTRY.COMPARE_TABLES(cur_tables IN VARCHAR2, l_batch_no IN NUMBER)
AS
    -- Purpose:
    --   Merges one table from the production schema into the archive schema in
    --   row-number batches, writes one summary row per batch to
    --   CR_ARCHIVE_REGISTRY.merge_log, and records a DISCREPANCY row when the
    --   merged row count differs from the archive-side reconciliation count.
    --
    -- Parameters:
    --   cur_tables  Unqualified table name. Used to look up the merge statement
    --               fragments in tmp_date_col_mapper and to build the dynamic
    --               count queries, so it is validated as a simple SQL name.
    --   l_batch_no  Batch number recorded for the first batch; incremented by
    --               one for every following batch.
    --
    -- Transactions:
    --   COMMITs after every successful batch. On a batch failure the batch is
    --   rolled back, a row is written to sp_ael_arch_error_log and committed,
    --   and the exception is re-raised to the caller.
    --
    -- Raises:
    --   Whatever the failing statement raised (re-raised unchanged), plus the
    --   DBMS_ASSERT error when cur_tables is not a simple SQL name and
    --   NO_DATA_FOUND / TOO_MANY_ROWS from the tmp_date_col_mapper lookup.
    --
    -- Logging granularity:
    --   The dynamic MERGE text cannot reference PL/SQL collections, and a single
    --   RETURNING list could not separate inserted keys from updated keys, so
    --   each batch is logged as one 'MERGE' row carrying SQL%ROWCOUNT instead of
    --   one row per affected key.
    l_prod_schema    CONSTANT VARCHAR2(30) := 'CR_5_0_X';
    l_arch_schema    CONSTANT VARCHAR2(30) := 'CR_ARCHIVE';
    l_batch_size     CONSTANT NUMBER := 500000;

    l_table_name     VARCHAR2(128);   -- cur_tables after DBMS_ASSERT validation
    l_err_text       VARCHAR2(3000);
    l_log_exists     PLS_INTEGER;

    -- Dynamic statements
    sql_stmt         VARCHAR2(4000);
    merge_stmt       CLOB;
    ip_merge_stmt_1  CLOB;
    ip_merge_stmt_2  CLOB;

    -- Batch window. Local names deliberately differ from the batch_number /
    -- batch_start columns of the log tables so the static INSERTs below can
    -- never be resolved against a column instead of the variable.
    l_total_rows     NUMBER := 0;
    l_batch_start    NUMBER := 1;
    l_batch_end      NUMBER;
    l_batch_number   NUMBER;

    -- Per-batch counters. Kept separate from l_total_rows, which is the loop bound.
    l_rows_merged    NUMBER;
    l_source_rows    NUMBER;
    l_target_rows    NUMBER;

    -- ORA-00955: name is already used by an existing object
    e_name_in_use    EXCEPTION;
    PRAGMA EXCEPTION_INIT(e_name_in_use, -955);

    -- Writes the current exception to DBMS_OUTPUT. Must be called from inside
    -- an exception handler; p_handler_line is the handler's own source line.
    PROCEDURE print_diagnostics(p_handler_line IN PLS_INTEGER) IS
    BEGIN
        dbms_output.put_line('Error in processing tables: ' || cur_tables || ' in batch');
        dbms_output.put_line('Error raised in: ' || $$plsql_unit || ' at line ' || p_handler_line || ' - ' || sqlerrm);
        dbms_output.put_line('ERROR_STACK: ' || DBMS_UTILITY.FORMAT_ERROR_STACK);
        dbms_output.put_line('ERROR_BACKTRACE: ' || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE);
    END print_diagnostics;
BEGIN
    -- The table name is concatenated into dynamic SQL below; reject anything
    -- that is not a plain identifier before doing any work.
    l_table_name := DBMS_ASSERT.SIMPLE_SQL_NAME(cur_tables);

    -- Step 1: Ensure the logging table exists.
    -- The dictionary is checked first so the DDL (and its implicit commit) only
    -- runs when the table is really missing; this also avoids the
    -- CREATE TABLE IF NOT EXISTS syntax, which older Oracle releases reject.
    -- NOTE(review): the INSERTs into merge_log below are static SQL, so this
    -- unit only compiles when merge_log already exists; consider moving this
    -- DDL to a deployment script.
    SELECT COUNT(*)
      INTO l_log_exists
      FROM all_tables
     WHERE owner = 'CR_ARCHIVE_REGISTRY'
       AND table_name = 'MERGE_LOG';

    IF l_log_exists = 0 THEN
        BEGIN
            EXECUTE IMMEDIATE '
CREATE TABLE CR_ARCHIVE_REGISTRY.merge_log (
log_id NUMBER GENERATED ALWAYS AS IDENTITY,
table_name VARCHAR2(50),
record_id VARCHAR2(50),
column_name VARCHAR2(50),
old_value VARCHAR2(4000),
new_value VARCHAR2(4000),
action VARCHAR2(20),
batch_number NUMBER,
log_timestamp TIMESTAMP DEFAULT SYSTIMESTAMP
)';
        EXCEPTION
            WHEN e_name_in_use THEN
                NULL;  -- another session created it between the check and the DDL
        END;
    END IF;

    -- Step 2: Fetch the two halves of the MERGE statement for this table.
    SELECT merge_stmt_1, merge_stmt_2
      INTO ip_merge_stmt_1, ip_merge_stmt_2
      FROM tmp_date_col_mapper
     WHERE table_name = cur_tables;

    -- Step 3: Total number of source rows; this is the upper bound of the batch loop.
    sql_stmt := 'SELECT COUNT(*) FROM ' || l_prod_schema || '.' || l_table_name;
    EXECUTE IMMEDIATE sql_stmt INTO l_total_rows;

    -- Step 4: Build the MERGE once; the batch window is supplied through binds.
    -- NOTE(review): assumes merge_stmt_1 ends inside an inline view that exposes
    -- a row-number column named rn, and that the ")" below closes that view
    -- before merge_stmt_2 continues -- confirm against tmp_date_col_mapper.
    merge_stmt := ip_merge_stmt_1
               || ' WHERE rn BETWEEN :batch_start AND :batch_end) '
               || ip_merge_stmt_2;

    l_batch_number := l_batch_no;

    WHILE l_batch_start <= l_total_rows LOOP
        l_batch_end := LEAST(l_batch_start + l_batch_size - 1, l_total_rows);

        BEGIN
            -- Step 5: Execute the merge for this batch window.
            EXECUTE IMMEDIATE merge_stmt USING l_batch_start, l_batch_end;
            l_rows_merged := SQL%ROWCOUNT;

            -- One summary log row per batch.
            INSERT INTO CR_ARCHIVE_REGISTRY.merge_log (
                table_name, record_id, column_name, old_value, new_value, action, batch_number
            )
            VALUES (
                cur_tables, NULL, 'ROWS_MERGED', NULL, TO_CHAR(l_rows_merged), 'MERGE', l_batch_number
            );

            -- Output progress
            dbms_output.put_line('For Table: ' || cur_tables || ' with Batch from ' || l_batch_start || ' - ' || l_batch_end);
            dbms_output.put_line('Rows affected: ' || l_rows_merged);

            -- Step 6: Reconciliation counts for the same window.
            -- NOTE(review): both queries filter on a column named rn in the base
            -- tables; if rn only exists inside the MERGE inline view these fail
            -- with ORA-00904 -- confirm the tables really carry that column.
            -- The former always-true date predicate on the source query was
            -- dropped; it did not filter any rows.
            sql_stmt := 'SELECT COUNT(*) FROM ' || l_prod_schema || '.' || l_table_name
                     || ' WHERE rn BETWEEN :batch_start AND :batch_end';
            EXECUTE IMMEDIATE sql_stmt INTO l_source_rows USING l_batch_start, l_batch_end;
            dbms_output.put_line('Reconciliation - Rows updated in source table: ' || l_source_rows);

            sql_stmt := 'SELECT COUNT(*) FROM ' || l_arch_schema || '.' || l_table_name
                     || ' WHERE rn BETWEEN :batch_start AND :batch_end'
                     || ' AND AUDIT_CREATE_DATE IS NOT NULL';
            EXECUTE IMMEDIATE sql_stmt INTO l_target_rows USING l_batch_start, l_batch_end;
            dbms_output.put_line('Reconciliation - Rows inserted in target table: ' || l_target_rows);

            -- Step 7: Log a discrepancy when the merged count and the archive count differ.
            IF l_rows_merged != l_target_rows THEN
                INSERT INTO CR_ARCHIVE_REGISTRY.merge_log (
                    table_name, record_id, column_name, old_value, new_value, action, batch_number
                )
                VALUES (
                    cur_tables, NULL, 'COUNT_MISMATCH', TO_CHAR(l_rows_merged), TO_CHAR(l_target_rows), 'DISCREPANCY', l_batch_number
                );
            END IF;

            -- Commit after each batch
            COMMIT;
        EXCEPTION
            WHEN OTHERS THEN
                print_diagnostics($$plsql_line);
                -- Capture the error text before any further SQL runs, and truncate
                -- it: backtrace plus stack can exceed the 3000-byte buffer, which
                -- would raise ORA-06502 here and hide the original error.
                l_err_text := SUBSTRB(
                    dbms_utility.format_error_backtrace || ' ' || dbms_utility.format_error_stack,
                    1, 3000
                );
                -- Undo the failed batch, persist the error, then propagate it.
                ROLLBACK;
                INSERT INTO sp_ael_arch_error_log (sp_name, cur_table, batch_number, batch_start, l_error_message)
                VALUES ($$plsql_unit, cur_tables, l_batch_number, l_batch_start, l_err_text);
                COMMIT;
                RAISE;
        END;

        -- Step 8: Move to the next batch
        l_batch_start := l_batch_end + 1;
        l_batch_number := l_batch_number + 1;
    END LOOP;

    -- Final commit
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        print_diagnostics($$plsql_line);
        ROLLBACK;
        RAISE;
END COMPARE_TABLES;
/