Monday, May 5, 2025

Testing



CREATE OR REPLACE PROCEDURE CR_ARCHIVE_REGISTRY.COMPARE_TABLES(cur_tables IN VARCHAR2, l_batch_no IN NUMBER)
AS
    l_prod_schema VARCHAR2(30) := 'CR_5_0_X';
    l_arch_schema VARCHAR2(30) := 'CR_ARCHIVE';
    l_error_message VARCHAR2(3000);

    -- Variables for table names and statements
    sql_stmt VARCHAR2(4000);
    merge_stmt CLOB;
    ip_merge_stmt_1 CLOB;
    ip_merge_stmt_2 CLOB;

    ip_tbl_last_update_col VARCHAR2(1000);
    ip_tbl_create_date_col VARCHAR2(1000);
    ip_tbl_primary_col VARCHAR2(1000);

    -- Record to store affected primary keys (for logging)
    TYPE rec_table IS RECORD (
        rn NUMBER
    );
    TYPE t_rec_table IS TABLE OF rec_table;
    v_inserted_rows t_rec_table;
    v_updated_rows t_rec_table;

    batch_size NUMBER := 500000;
    total_rows NUMBER := 0;
    batch_start NUMBER := 1;
    batch_end NUMBER;
    batch_number NUMBER;

BEGIN
    -- Step 1: Create or ensure the logging table exists
    EXECUTE IMMEDIATE '
        CREATE TABLE IF NOT EXISTS CR_ARCHIVE_REGISTRY.merge_log (
            log_id NUMBER GENERATED ALWAYS AS IDENTITY,
            table_name VARCHAR2(50),
            record_id VARCHAR2(50),
            column_name VARCHAR2(50),
            old_value VARCHAR2(4000),
            new_value VARCHAR2(4000),
            action VARCHAR2(20),
            batch_number NUMBER,
            log_timestamp TIMESTAMP DEFAULT SYSTIMESTAMP
        )';

    -- Step 2: Get the table's important column names
    SELECT last_updated_column, create_date, primary_key, merge_stmt_1, merge_stmt_2
    INTO ip_tbl_last_update_col, ip_tbl_create_date_col, ip_tbl_primary_col, ip_merge_stmt_1, ip_merge_stmt_2
    FROM tmp_date_col_mapper
    WHERE table_name = cur_tables;

    -- Step 3: Get the total number of rows to process
    sql_stmt := 'SELECT COUNT(*) FROM cr_5_0_x.' || cur_tables;
    EXECUTE IMMEDIATE sql_stmt INTO total_rows;

    -- Batch processing loop
    batch_number := l_batch_no;
    WHILE batch_start <= total_rows LOOP
        batch_end := batch_start + batch_size - 1;
        IF batch_end > total_rows THEN
            batch_end := total_rows;
        END IF;

        -- Step 4: Construct the merge statement dynamically for each batch
        merge_stmt := ip_merge_stmt_1 || ' '
                   || ' TO CLOB(''WHERE rn BETWEEN ' || batch_start || ' AND ' || batch_end || ') '
                   || ' TO CLOB('') '
                   || ' TO CLOB('') '
                   || ip_merge_stmt_2 || ' '
                   || ' RETURNING ' || ip_tbl_primary_col || ' BULK COLLECT INTO v_inserted_rows, v_updated_rows';

        -- Step 5: Execute the merge statement
        BEGIN
            EXECUTE IMMEDIATE merge_stmt;

            -- Log inserted rows
            FOR i IN 1..v_inserted_rows.COUNT LOOP
                INSERT INTO CR_ARCHIVE_REGISTRY.merge_log (
                    table_name, record_id, column_name, old_value, new_value, action, batch_number
                )
                SELECT cur_tables, v_inserted_rows(i).rn, 'INSERT', NULL, NULL, 'INSERT', batch_number
                FROM DUAL;
            END LOOP;

            -- Log updated rows
            FOR i IN 1..v_updated_rows.COUNT LOOP
                INSERT INTO CR_ARCHIVE_REGISTRY.merge_log (
                    table_name, record_id, column_name, old_value, new_value, action, batch_number
                )
                SELECT cur_tables, v_updated_rows(i).rn, 'UPDATE', NULL, NULL, 'UPDATE', batch_number
                FROM DUAL;
            END LOOP;

            -- Output progress
            dbms_output.put_line('For Table: ' || cur_tables || ' with Batch from ' || batch_start || ' - ' || batch_end);
            dbms_output.put_line('Rows affected: ' || (v_inserted_rows.COUNT + v_updated_rows.COUNT));

            -- Step 6: Reconciliation step for inserts and updates
            sql_stmt := 'SELECT COUNT(*) FROM cr_5_0_x.' || cur_tables || ' WHERE rn BETWEEN ' || batch_start || ' AND ' || batch_end
                     || ' AND (SYSDATE-COALESCE(AUDIT_LAST_UPDATE_DATE, AUDIT_CREATE_DATE, SYSDATE)) = (SYSDATE-COALESCE(AUDIT_LAST_UPDATE_DATE, AUDIT_CREATE_DATE, SYSDATE))';
            EXECUTE IMMEDIATE sql_stmt INTO total_rows;
            dbms_output.put_line('Reconciliation - Rows updated in source table: ' || total_rows);

            sql_stmt := 'SELECT COUNT(*) FROM cr_archive.' || cur_tables || ' WHERE rn BETWEEN ' || batch_start || ' AND ' || batch_end
                     || ' AND AUDIT_CREATE_DATE IS NOT NULL';
            EXECUTE IMMEDIATE sql_stmt INTO total_rows;
            dbms_output.put_line('Reconciliation - Rows inserted in target table: ' || total_rows);

            -- Step 7: Log discrepancies if counts don't match
            IF v_inserted_rows.COUNT + v_updated_rows.COUNT != total_rows THEN
                INSERT INTO CR_ARCHIVE_REGISTRY.merge_log (
                    table_name, record_id, column_name, old_value, new_value, action, batch_number
                )
                VALUES (cur_tables, NULL, 'COUNT_MISMATCH', v_inserted_rows.COUNT + v_updated_rows.COUNT, total_rows, 'DISCREPANCY', batch_number);
            END IF;

            -- Commit after each batch
            COMMIT;

        EXCEPTION
            WHEN OTHERS THEN
                dbms_output.put_line('Error in processing tables: ' || cur_tables || ' in batch');
                dbms_output.put_line('Error raised in: ' || $$plsql_unit || ' at line ' || $$plsql_line || ' - ' || sqlerrm);
                dbms_output.put_line('ERROR_STACK: ' || DBMS_UTILITY.FORMAT_ERROR_STACK);
                dbms_output.put_line('ERROR_BACKTRACE: ' || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE);

                -- Rollback the current batch, insert the error log, and continue with the next table
                ROLLBACK;
                l_error_message := dbms_utility.format_error_backtrace || ' ' || dbms_utility.format_error_stack;
                INSERT INTO sp_ael_arch_error_log (sp_name, cur_table, batch_number, batch_start, l_error_message)
                VALUES ($$plsql_unit, cur_tables, batch_number, batch_start, l_error_message);
                COMMIT;
                RAISE;
        END;

        -- Step 8: Move to the next batch
        batch_start := batch_end + 1;
        batch_number := batch_number + 1;
    END LOOP;

    -- Final commit
    COMMIT;

EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error in processing tables: ' || cur_tables || ' in batch');
        dbms_output.put_line('Error raised in: ' || $$plsql_unit || ' at line ' || $$plsql_line || ' - ' || sqlerrm);
        dbms_output.put_line('ERROR_STACK: ' || DBMS_UTILITY.FORMAT_ERROR_STACK);
        dbms_output.put_line('ERROR_BACKTRACE: ' || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE);
        ROLLBACK;
        RAISE;
END;
/

Check out our other content

Check out other tags:

Most Popular Articles