PromptBase

Prompt Details

Model
(claude-4-6-sonnet)
Token size
2,815
Example input
[METADATA_INPUT]:
Pipeline_ID | Source_System | Source_Table | Target_Table | Load_Mode | Schedule | Active
P001 | CRM_DB | customers | dim_customer | full | 02:00 daily | Y
P002 | ERP_DB | orders | fact_orders | append | 01:00 daily | Y
P003 | API_Salesforce | opportunities | stg_opportunities | upsert | NULL | Y
P004 | CRM_DB | contacts | dim_contact | full | 02:00 daily | N

[BUSINESS_CONTEXT]: "Finance and sales reporting, dashboards live by 7am"
[COMPLIANCE_REQUIREMENTS]: "GDPR — customer PII in CRM_DB"
[STALENESS_THRESHOLD]: "60 days"
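The pipe-delimited metadata block above is easy to load programmatically before auditing it. A minimal Python sketch (field names follow the header row above; treating the literal string "NULL" as a missing value is an assumption about how the source system exports empty schedules):

```python
# Parse the pipe-delimited metadata table above into a list of dicts.
raw = """Pipeline_ID | Source_System | Source_Table | Target_Table | Load_Mode | Schedule | Active
P001 | CRM_DB | customers | dim_customer | full | 02:00 daily | Y
P002 | ERP_DB | orders | fact_orders | append | 01:00 daily | Y
P003 | API_Salesforce | opportunities | stg_opportunities | upsert | NULL | Y
P004 | CRM_DB | contacts | dim_contact | full | 02:00 daily | N"""

lines = [line.strip() for line in raw.splitlines()]
header = [h.strip() for h in lines[0].split("|")]
rows = [dict(zip(header, (c.strip() for c in line.split("|")))) for line in lines[1:]]

# Assumption: the export writes the literal string "NULL" for a missing schedule.
for row in rows:
    if row["Schedule"] == "NULL":
        row["Schedule"] = None

# Active pipelines with no run trigger — the first red flag the audit raises.
print([r["Pipeline_ID"] for r in rows if r["Active"] == "Y" and r["Schedule"] is None])
# → ['P003']
```

The same parsing works for CSV or SQL result sets; only the split delimiter changes.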
Example output
Running the prompt against your test data now.

```json
{
  "metadata_ingestion_summary": {
    "pipelines_identified": 4,
    "field_categories": {
      "source_fields": "Partial — source system and table present, no connection alias, no schema prefix",
      "target_fields": "Partial — target table present, no target schema or system defined",
      "transformation_fields": "Missing — no transformation reference, dbt model, or SP name on any pipeline",
      "orchestration_fields": "Partial — schedule present on 3 of 4 rows; P003 schedule is NULL; no dependency, priority, or parallelism fields",
      "control_fields": "Partial — active flag present; no environment flag, no owner, no version/checksum",
      "quality_fields": "Missing — no row count check, null threshold, reconciliation flag, or data classification on any row",
      "audit_fields": "Missing — no run_id, start_time, end_time, status, error_message, or rows_processed"
    },
    "immediate_critical_flags": [
      "P003 schedule is NULL — pipeline is active but has no defined run trigger",
      "GDPR applies to CRM_DB (P001, P004) — no data_classification field exists to enforce PII controls",
      "Audit fields entirely absent — no runtime trace for 4 pipelines feeding live finance dashboards",
      "No environment flag — same config rows could be deployed to dev and prod without distinction",
      "P004 is inactive (Active=N) but still in the estate — no metadata distinguishes temporary disable from permanent decommission"
    ]
  },
  "pipeline_risk_ratings": [
    {
      "pipeline_id_or_name": "P001 — CRM_DB.customers → dim_customer",
      "overall_risk": "Critical",
      "extraction_risk": "High",
      "transformation_risk": "Critical",
      "loading_risk": "High",
      "orchestration_risk": "Medium",
      "key_issues": [
        "Full-refresh load with no backup flag — truncates dim_customer with zero recovery path if source is bad",
        "No transformation reference — orchestrator cannot derive what logic runs between source and target",
        "GDPR: customers table contains PII; no data_classification field, no masking or retention control derivable from metadata",
        "No schema prefix on source or target — CRM_DB.customers is ambiguous in multi-schema environments",
        "No reconciliation flag — no post-load row count validation defined for a dimension that feeds live dashboards"
      ]
    },
    {
      "pipeline_id_or_name": "P002 — ERP_DB.orders → fact_orders",
      "overall_risk": "High",
      "extraction_risk": "High",
      "transformation_risk": "Critical",
      "loading_risk": "High",
      "orchestration_risk": "High",
      "key_issues": [
        "Append load mode with no watermark column or CDC flag — no incremental strategy defined, risk of full re-extract or duplicate rows on retry",
        "No transformation reference — fact table load logic is entirely invisible in metadata",
        "No parallelism limit — if ERP feed retries, fact_orders could receive duplicate appends from concurrent runs",
        "No dependency field — P002 runs at 01:00, P001 at 02:00; if fact_orders joins dim_customer, load order creates referential gap during the 01:00–02:00 window",
        "No upsert key columns defined — append without dedup key means no mechanism to correct duplicate rows post-load"
      ]
    },
    {
      "pipeline_id_or_name": "P003 — API_Salesforce.opportunities → stg_opportunities",
      "overall_risk": "Critical",
      "extraction_risk": "High",
      "transformation_risk": "Critical",
      "loading_risk": "High",
      "orchestration_risk": "Critical",
      "key_issues": [
        "Schedule is NULL — pipeline is marked Active=Y but has no run trigger; it will never execute or execute only on manual intervention",
        "Upsert load mode with no conflict resolution key defined — orchestrator cannot perform upsert without knowing which column(s) identify a record",
        "No transformation reference",
        "API source has no schema drift handling — Salesforce API field changes will hit stg_opportunities with no alert or fail action defined",
        "No SLA or dependency defined — if this feeds sales reporting, a NULL schedule means dashboard gap with no detection mechanism"
      ]
    },
    {
      "pipeline_id_or_name": "P004 — CRM_DB.contacts → dim_contact",
      "overall_risk": "High",
      "extraction_risk": "Medium",
      "transformation_risk": "Critical",
      "loading_risk": "High",
      "orchestration_risk": "Low",
      "key_issues": [
        "Active=N with no decommission flag or disable_reason field — cannot distinguish 'temporarily paused' from 'retired pipeline'",
        "Full-refresh with no backup flag — same risk as P001 if re-enabled",
        "GDPR: contacts table is PII; same data_classification gap as P001",
        "No transformation reference",
        "No owner field — if P004 needs to be re-enabled, there is no accountable party identified in metadata"
      ]
    }
  ],
  "metadata_schema_gaps": [
    {
      "missing_field": "transformation_reference",
      "severity": "Critical",
      "impact": "Orchestrator cannot derive what logic to execute between extraction and load. All 4 pipelines have unknown transformation behavior. This is not just a metadata gap — it makes the framework non-metadata-driven for its most complex step.",
      "recommended_field_definition": "transformation_ref VARCHAR(255) — dbt model name, stored procedure name, or script path. NULL = pass-through (raw copy, no transformation)."
    },
    {
      "missing_field": "upsert_key_columns",
      "severity": "Critical",
      "impact": "P003 is load_mode=upsert with no key defined. Upsert without a conflict key is operationally undefined — the orchestrator must hardcode the key or the load will fail/duplicate.",
      "recommended_field_definition": "upsert_key_columns VARCHAR(500) — comma-separated list of columns forming the upsert natural key. Required when load_mode = upsert or merge."
    },
    {
      "missing_field": "data_classification",
      "severity": "Critical",
      "impact": "GDPR applies to CRM_DB (P001, P004). Without a data_classification field, PII masking, retention enforcement, and access controls cannot be applied dynamically. Compliance controls must be hardcoded per pipeline — defeating metadata-driven governance.",
      "recommended_field_definition": "data_classification VARCHAR(50) — values: PUBLIC, INTERNAL, CONFIDENTIAL, PII, PII_SENSITIVE. Drives masking rules, retention policy, and access tier."
    },
    {
      "missing_field": "audit_fields (run_id, start_time, end_time, status, error_message, rows_processed)",
      "severity": "Critical",
      "impact": "No runtime trace exists for any pipeline. For finance dashboards with a 7am SLA, failure detection relies on the orchestrator alone — no metadata-level audit trail for investigation or SLA breach analysis.",
      "recommended_field_definition": "Separate pipeline_run_log table: run_id UUID, pipeline_id FK, start_time TIMESTAMP, end_time TIMESTAMP, status VARCHAR(20), rows_extracted INT, rows_loaded INT, error_message TEXT, triggered_by VARCHAR(100)."
    },
    {
      "missing_field": "environment_flag",
      "severity": "Critical",
      "impact": "Same config rows are indistinguishable between dev, test, and prod. A config change tested in dev can be promoted to prod with no metadata barrier.",
      "recommended_field_definition": "environment VARCHAR(10) — values: DEV, TEST, UAT, PROD. Orchestrator filters by environment at runtime. Default: PROD for existing rows."
    },
    {
      "missing_field": "incremental_strategy / watermark_column",
      "severity": "High",
      "impact": "P002 uses append with no defined incremental strategy. Without a watermark or CDC flag, extraction scope is undefined — full extract on every run or silent gap depending on orchestrator defaults.",
      "recommended_field_definition": "incremental_strategy VARCHAR(20) — values: FULL_REFRESH, WATERMARK, CDC, API_CURSOR. watermark_column VARCHAR(100) — the column used for incremental filtering when strategy = WATERMARK."
    },
    {
      "missing_field": "backup_before_load flag",
      "severity": "High",
      "impact": "P001 and P004 use full-refresh (truncate-load). No backup flag means truncation is irreversible if source data is corrupt or incomplete. Finance dimension tables are recovery-critical.",
      "recommended_field_definition": "backup_before_load BOOLEAN DEFAULT FALSE — when TRUE and load_mode = full, orchestrator snapshots target before truncate. Required TRUE for all dimension tables in finance context."
    },
    {
      "missing_field": "dependency_pipeline_ids",
      "severity": "High",
      "impact": "P002 (fact_orders, 01:00) likely depends on P001 (dim_customer, 02:00) for referential integrity. No dependency is declared — the orchestrator cannot enforce load order, creating a 60-minute window where facts load against a stale or mid-refresh dimension.",
      "recommended_field_definition": "depends_on_pipeline_ids VARCHAR(500) — comma-separated pipeline_ids that must complete before this pipeline runs."
    },
    {
      "missing_field": "parallelism_limit",
      "severity": "High",
      "impact": "No pipeline defines whether it can run concurrently with itself. On retry, P002 (append) could produce duplicate fact rows with no protection.",
      "recommended_field_definition": "max_concurrent_runs SMALLINT DEFAULT 1 — maximum simultaneous executions of this pipeline. 1 = no concurrency allowed (recommended default for append and full-refresh modes)."
    },
    {
      "missing_field": "owner / team",
      "severity": "High",
      "impact": "No accountability field on any pipeline. P004 is disabled with no owner on record. Pipeline failures at 3am have no defined escalation path from metadata.",
      "recommended_field_definition": "owner_email VARCHAR(255), owner_team VARCHAR(100). Used for alerting and escalation routing."
    },
    {
      "missing_field": "schema_drift_action",
      "severity": "High",
      "impact": "P003 extracts from Salesforce API. API schema changes are common. No drift action defined — orchestrator behavior on unexpected field is unknown (silent drop, load failure, or corrupt row).",
      "recommended_field_definition": "schema_drift_action VARCHAR(20) — values: FAIL, ALERT_CONTINUE, IGNORE. Required for API and external system sources."
    },
    {
      "missing_field": "sla_deadline",
      "severity": "High",
      "impact": "Business context states dashboards must be live by 7am. No SLA field exists — breaches are discovered by analysts, not detected by the pipeline estate.",
      "recommended_field_definition": "sla_deadline TIME — latest acceptable completion time. Orchestrator triggers alert if pipeline end_time exceeds sla_deadline."
    },
    {
      "missing_field": "disable_reason / decommission_flag",
      "severity": "Medium",
      "impact": "P004 Active=N with no context. No way to distinguish temporary pause, bug hold, or permanent retirement. Risk of accidental re-enable.",
      "recommended_field_definition": "disable_reason VARCHAR(255), decommissioned BOOLEAN DEFAULT FALSE, decommission_date DATE."
    },
    {
      "missing_field": "reconciliation_required",
      "severity": "Medium",
      "impact": "No pipeline declares whether a post-load row count or checksum validation is required. For finance reporting, reconciliation should be opt-out, not opt-in.",
      "recommended_field_definition": "reconciliation_required BOOLEAN DEFAULT TRUE, reconciliation_type VARCHAR(20) — values: ROW_COUNT, CHECKSUM, BUSINESS_RULE."
    },
    {
      "missing_field": "source_schema / target_schema",
      "severity": "Medium",
      "impact": "Source and target references lack schema prefixes. In multi-schema databases, table names alone are ambiguous and connection-dependent.",
      "recommended_field_definition": "source_schema VARCHAR(100), target_schema VARCHAR(100) — explicit schema prefix separate from table name."
    },
    {
      "missing_field": "retry_count / retry_interval",
      "severity": "Medium",
      "impact": "Retry behavior is entirely absent from metadata. Retry logic is hardcoded in the orchestrator or absent — inconsistent behavior across pipelines.",
      "recommended_field_definition": "retry_count SMALLINT DEFAULT 3, retry_interval_minutes SMALLINT DEFAULT 5."
    },
    {
      "missing_field": "version / last_modified / modified_by",
      "severity": "Medium",
      "impact": "No version tracking. Unauthorized or accidental config changes are undetectable. With GDPR scope, config change audit is a compliance requirement.",
      "recommended_field_definition": "config_version SMALLINT DEFAULT 1, last_modified TIMESTAMP, modified_by VARCHAR(100)."
    }
  ],
  "automation_completeness": {
    "decisions_hardcoded_outside_metadata": [
      "Transformation logic — what runs between extract and load is unknown from metadata alone",
      "Upsert conflict resolution key for P003 — orchestrator must hardcode or fail",
      "Incremental extraction scope for P002 — watermark column unknown",
      "Retry behavior — count and interval not in metadata",
      "Backup behavior before full-refresh — not derivable from metadata",
      "Pipeline execution order — no dependency graph in metadata",
      "PII masking and GDPR controls — no data_classification to drive dynamic rules",
      "SLA monitoring — no deadline field for alerting logic",
      "Environment targeting — all rows treated as prod by default",
      "Parallelism enforcement — concurrency limits not defined"
    ],
    "metadata_driven_coverage": "~28% of pipeline decisions are derivable from current metadata (source system, target table, load mode, schedule, active flag). The remaining ~72% require orchestrator hardcoding or tribal knowledge."
  },
  "prioritized_fix_plan": [
    {
      "priority": 1,
      "fix": "Add upsert_key_columns to P003 immediately",
      "type": "Config correction",
      "impact": "P003 cannot execute a valid upsert without a conflict key. This is a blocker for any run of this pipeline.",
      "effort": "Low",
      "implementation_approach": "Identify the natural key for opportunities (likely opportunity_id or external_id). Add upsert_key_columns field to the metadata table, populate P003. Add NOT NULL constraint when load_mode = upsert."
    },
    {
      "priority": 2,
      "fix": "Resolve P003 NULL schedule",
      "type": "Config correction",
      "impact": "Active pipeline with no trigger. Either define a schedule/dependency trigger or set Active=N with a disable_reason until trigger is agreed.",
      "effort": "Low",
      "implementation_approach": "Determine if P003 should be event-triggered (e.g., after Salesforce sync) or time-scheduled. Add schedule or dependency_pipeline_ids. If trigger is unknown, set Active=N with disable_reason='schedule_pending'."
    },
    {
      "priority": 3,
      "fix": "Add data_classification field and classify P001, P004 as PII",
      "type": "Schema addition",
      "impact": "GDPR compliance requires PII identification at the pipeline metadata level. Without it, masking and retention controls cannot be applied dynamically.",
      "effort": "Medium",
      "implementation_approach": "ALTER TABLE to add data_classification VARCHAR(50). Set P001=PII, P004=PII, P002=INTERNAL, P003=CONFIDENTIAL. Define downstream masking rules keyed to this field in the orchestration layer."
    },
    {
      "priority": 4,
      "fix": "Add backup_before_load flag; set TRUE for P001 and P004",
      "type": "Schema addition",
      "impact": "Full-refresh pipelines truncating finance dimensions with no backup is a data loss risk for dashboards with a 7am SLA.",
      "effort": "Medium",
      "implementation_approach": "Add backup_before_load BOOLEAN DEFAULT FALSE. Set TRUE for all full-refresh dimension pipelines. Implement pre-truncate snapshot in orchestrator keyed to this flag."
    },
    {
      "priority": 5,
      "fix": "Add dependency_pipeline_ids; declare P002 dependency on P001",
      "type": "Schema addition",
      "impact": "Eliminates the referential integrity gap where fact_orders loads at 01:00 against a dim_customer that does not refresh until 02:00.",
      "effort": "Medium",
      "implementation_approach": "Add depends_on_pipeline_ids VARCHAR(500). Set P002.depends_on_pipeline_ids = 'P001'. Orchestrator waits for P001 completion before triggering P002. Reconsider P002 schedule — may need to shift to post-02:00."
    },
    {
      "priority": 6,
      "fix": "Create pipeline_run_log audit table",
      "type": "Schema addition",
      "impact": "No runtime audit trail for any pipeline feeding live finance dashboards. SLA breach investigation requires this as a foundation.",
      "effort": "Medium",
      "implementation_approach": "Create pipeline_run_log(run_id UUID PK, pipeline_id FK, start_time TIMESTAMP, end_time TIMESTAMP, status VARCHAR(20), rows_extracted INT, rows_loaded INT, error_message TEXT, triggered_by VARCHAR(100)). Orchestrator writes one row per execution."
    },
    {
      "priority": 7,
      "fix": "Add sla_deadline field; set 07:00 for P001, P002, P003",
      "type": "Schema addition",
      "impact": "Dashboard SLA of 7am is currently invisible to the pipeline estate. Breaches are discovered by analysts.",
      "effort": "Low",
      "implementation_approach": "Add sla_deadline TIME. Set P001, P002, P003 = '07:00:00'. Orchestrator compares end_time from run_log against sla_deadline and triggers alert channel."
    },
    {
      "priority": 8,
      "fix": "Add transformation_reference field to all pipelines",
      "type": "Schema addition",
      "impact": "Core metadata-driven principle: orchestrator must be able to derive what transformation runs from metadata. Currently 0% coverage.",
      "effort": "High",
      "implementation_approach": "Add transformation_ref VARCHAR(255) NULL (NULL = raw pass-through, no transform). Work with pipeline owners to document the actual transformation logic reference for each pipeline. This is an audit and documentation exercise as much as a schema change."
    },
    {
      "priority": 9,
      "fix": "Add environment_flag to all rows; default existing rows to PROD",
      "type": "Schema addition",
      "impact": "Prevents dev/test config from being applied to prod pipelines. Required before any dev or UAT instances of this metadata table exist.",
      "effort": "Low",
      "implementation_approach": "Add environment VARCHAR(10) DEFAULT 'PROD'. UPDATE existing rows SET environment='PROD'. Orchestrator filters WHERE environment = {runtime_env} on startup."
    },
    {
      "priority": 10,
      "fix": "Add owner_email and owner_team to all pipelines",
      "type": "Schema addition",
      "impact": "3am failure on a finance pipeline currently has no defined escalation path from metadata.",
      "effort": "Low",
      "implementation_approach": "Add owner_email VARCHAR(255), owner_team VARCHAR(100). Populate from pipeline ownership records. Wire to alerting system for failure and SLA breach notifications."
    },
    {
      "priority": 11,
      "fix": "Add incremental_strategy and watermark_column; define P002 strategy",
      "type": "Schema addition",
      "impact": "P002 append load with undefined extraction scope risks full re-extract or duplicate rows on every run.",
      "effort": "Medium",
      "implementation_approach": "Add incremental_strategy VARCHAR(20), watermark_column VARCHAR(100). Set P002.incremental_strategy='WATERMARK', P002.watermark_column='order_date' or 'updated_at' (confirm with source team). P001, P004 = 'FULL_REFRESH'. P003 = 'API_CURSOR'."
    },
    {
      "priority": 12,
      "fix": "Add max_concurrent_runs; default to 1 for append and full-refresh pipelines",
      "type": "Schema addition",
      "impact": "Protects P002 (append) from duplicate rows on retry. Protects P001, P004 (full-refresh) from overlapping truncate-load cycles.",
      "effort": "Low",
      "implementation_approach": "Add max_concurrent_runs SMALLINT DEFAULT 1. All 4 pipelines should default to 1. Orchestrator enforces this limit before triggering a new run."
    }
  ],
  "what_is_working_well": [
    "Active flag is present — pipeline enablement/disablement is controllable at the metadata level (though needs disable_reason to be useful)",
    "Load mode is defined for all 4 pipelines — the metadata does communicate the target loading strategy, even if some modes lack supporting fields (upsert key, incremental strategy)",
    "Source system and target table are consistently present — the basic routing of data is readable from metadata",
    "Schedule is defined for 3 of 4 pipelines — time-based triggering is partially covered",
    "The estate is small (4 pipelines, per your note) — schema additions can be applied cleanly before scale makes retrofitting expensive"
  ],
  "recommended_schema_additions": "
    -- Priority 1-3 additions (blockers and compliance)
    ALTER TABLE etl_pipeline_config ADD COLUMN upsert_key_columns VARCHAR(500) NULL;
    ALTER TABLE etl_pipeline_config ADD COLUMN data_classification VARCHAR(50) NOT NULL DEFAULT 'INTERNAL';
    ALTER TABLE etl_pipeline_config ADD COLUMN environment VARCHAR(10) NOT NULL DEFAULT 'PROD';
    ALTER TABLE etl_pipeline_config ADD COLUMN backup_before_load BOOLEAN NOT NULL DEFAULT FALSE;
    ALTER TABLE etl_pipeline_config ADD COLUMN disable_reason VARCHAR(255) NULL;
    ALTER TABLE etl_pipeline_config ADD COLUMN decommissioned BOOLEAN NOT NULL DEFAULT FALSE;

    -- Priority 4-7 additions (operational reliability)
    ALTER TABLE etl_pipeline_config ADD COLUMN depends_on_pipeline_ids VARCHAR(500) NULL;
    ALTER TABLE etl_pipeline_config ADD COLUMN sla_deadline TIME NULL;
    ALTER TABLE etl_pipeline_config ADD COLUMN owner_email VARCHAR(255) NULL;
    ALTER TABLE etl_pipeline_config ADD COLUMN owner_team VARCHAR(100) NULL;
    ALTER TABLE etl_pipeline_config ADD COLUMN max_concurrent_runs SMALLINT NOT NULL DEFAULT 1;
    ALTER TABLE etl_pipeline_config ADD COLUMN retry_count SMALLINT NOT NULL DEFAULT 3;
    ALTER TABLE etl_pipeline_config ADD COLUMN retry_interval_minutes SMALLINT NOT NULL DEFAULT 5;

    -- Priority 8-11 additions (metadata completeness)
    ALTER TABLE etl_pipeline_config ADD COLUMN transformation_ref VARCHAR(255) NULL;
    ALTER TABLE etl_pipeline_config ADD COLUMN incremental_strategy VARCHAR(20) NULL;
    ALTER TABLE etl_pipeline_config ADD COLUMN watermark_column VARCHAR(100) NULL;
    ALTER TABLE etl_pipeline_config ADD COLUMN schema_drift_action VARCHAR(20) NULL DEFAULT 'FAIL';
    ALTER TABLE etl_pipeline_config ADD COLUMN source_schema VARCHAR(100) NULL;
    ALTER TABLE etl_pipeline_config ADD COLUMN target_schema VARCHAR(100) NULL;
    ALTER TABLE etl_pipeline_config ADD COLUMN reconciliation_required BOOLEAN NOT NULL DEFAULT TRUE;
    ALTER TABLE etl_pipeline_config ADD COLUMN reconciliation_type VARCHAR(20) NULL DEFAULT 'ROW_COUNT';
    ALTER TABLE etl_pipeline_config ADD COLUMN config_version SMALLINT NOT NULL DEFAULT 1;
    ALTER TABLE etl_pipeline_config ADD COLUMN last_modified TIMESTAMP NULL;
    ALTER TABLE etl_pipeline_config ADD COLUMN modified_by VARCHAR(100) NULL;

    -- Audit log table (separate, written at runtime)
    CREATE TABLE pipeline_run_log (
      run_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
      pipeline_id VARCHAR(50) NOT NULL REFERENCES etl_pipeline_config(pipeline_id),
      environment VARCHAR(10) NOT NULL,
      start_time TIMESTAMP NOT NULL,
      end_time TIMESTAMP NULL,
      status VARCHAR(20) NOT NULL, -- RUNNING, SUCCESS, FAILED, SKIPPED
      rows_extracted INT NULL,
      rows_loaded INT NULL,
      error_message TEXT NULL,
      triggered_by VARCHAR(100) NULL,
      sla_breached BOOLEAN NOT NULL DEFAULT FALSE
    );
  "
}
```

---

**Quick summary for your review:**

Metadata-driven coverage is at roughly 28% right now — the schema defines *what* moves data but almost nothing about *how*, *safely*, or *accountably*. Three things need attention before anything else:

1. **P003 is broken as-is** — active, no schedule, no upsert key. It will never run, and if it does it can't complete the load.
2. **GDPR exposure is real** — no data_classification field means CRM PII (P001, P004) has zero metadata-level governance hook.
3. **The 7am SLA is invisible to the estate** — no audit log, no SLA field, no dependency ordering between P001 and P002. A failure at 01:30 won't be detected until analysts arrive at 7.

The DDL at the bottom is copy-paste ready. Priority 1–3 additions are the ones to ship first. Best of luck! 🎯
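The blocking flags in the example report (an active pipeline with no trigger, an upsert with no conflict key) can be enforced as pre-deployment lint rules over the metadata table. A minimal Python sketch — field names follow the example metadata, `lint_pipeline` is a hypothetical helper, and the two rules shown are illustrative, not the full audit:

```python
# Minimal metadata lint: reject config rows the audit report flags as blockers.
# Field names follow the example metadata table; rules are illustrative only.

def lint_pipeline(row):
    """Return a list of human-readable violations for one metadata row."""
    violations = []
    # Rule 1: an active pipeline must have a run trigger (schedule or dependency).
    if row.get("Active") == "Y" and not row.get("Schedule") and not row.get("Depends_On"):
        violations.append(f"{row['Pipeline_ID']}: active but no schedule or dependency trigger")
    # Rule 2: upsert mode is operationally undefined without a conflict key.
    if row.get("Load_Mode") == "upsert" and not row.get("Upsert_Key_Columns"):
        violations.append(f"{row['Pipeline_ID']}: load_mode=upsert with no upsert_key_columns")
    return violations

pipelines = [
    {"Pipeline_ID": "P002", "Load_Mode": "append", "Schedule": "01:00 daily", "Active": "Y"},
    {"Pipeline_ID": "P003", "Load_Mode": "upsert", "Schedule": None, "Active": "Y"},
]

problems = [v for row in pipelines for v in lint_pipeline(row)]
for p in problems:
    print(p)
```

Run as a CI gate on the config table, this catches a P003-style row before the orchestrator ever tries to execute it.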
🌀 Claude

ETL Architecture Auditors

Add to Cart
Instant access
Usage rights: Commercial use
Money-back guarantee
By purchasing this prompt, you agree to our terms of service
✅ Ingests ETL metadata tables in any format: SQL rows, JSON, CSV, or plain description
✅ Audits the metadata design itself — not just the pipelines it describes
✅ Identifies hardcoded values, missing control fields, and automation gaps
✅ Flags pipelines with conflicting or incomplete metadata before they fail in production
✅ Delivers per-pipeline risk ratings and a metadata schema improvement plan
Updated 1 week ago