Tag: PySpark

  • SparkGuard: A Secure, Configuration-Driven Validation Engine for PySpark at Scale

    In modern Data Lakehouse architectures, data quality is often the bottleneck. When migrating terabytes of legacy data—such as moving 28TB of SQL Server history to an AWS Lake House—Data Engineers often face a dilemma:

    1. The “Hard-Coded” Trap: Writing thousands of lines of df.filter() logic in PySpark scripts. This is unmaintainable and requires a deployment for every rule change.
    2. The “Black Box” Trap: Using heavy enterprise tools that obscure logic and struggle with custom, complex legacy rules.

    We built SparkGuard to solve this. It is a lightweight, open-source-friendly Python package that turns Data Validation into a configuration problem, not a coding problem.

    The Core Philosophy: “Config is King”

    SparkGuard separates Business Logic (What to check) from Execution Logic (How to check it). Instead of writing PySpark code for every rule, you define rules in a JSON configuration file.

    Old Way (Hard-Coded PySpark)

    Python

    # Brittle, hard to read, and requires a code deployment for every logic change
    from pyspark.sql import functions as F

    df_clean = df.filter(
        (F.col("age") > 18) &
        (F.col("status") == "Active") &
        (F.col("email").isNotNull())
    )
    

    The SparkGuard Way (JSON Config)

    JSON

    [
      {
        "column": "age",
        "function": "check_custom_sql",
        "params": { "sql_expression": "{col} > 18" },
        "error_msg": "Underage"
      },
      {
        "column": "status",
        "function": "check_is_in_list",
        "params": { "allowed_values": ["Active", "Pending"] },
        "error_msg": "Invalid Status"
      }
    ]
    

    Key Features

    1. The “Quarantine Pattern” (Non-Blocking Validation)

    At big data scale (e.g., 28TB), you cannot afford to fail an entire job because 0.01% of the rows are bad. SparkGuard implements the Quarantine Pattern.

    • Valid Rows: Flow through to the “Silver” layer immediately.
    • Invalid Rows: Are tagged with an array of specific error messages (e.g., ["Invalid Postcode", "Missing ID"]) and diverted to a “Quarantine” bucket for human review.
    • Result: The pipeline never crashes due to bad data; a minimal split-and-write sketch follows below.
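
    As a sketch of the split and write-out (bucket paths are placeholders, and df_checked with its validation_errors column comes from the engine run shown later in this post):

    Python

    # Illustrative only: route validated rows to Silver and failed rows to Quarantine.
    from pyspark.sql import functions as F

    df_silver = df_checked.filter(F.size("validation_errors") == 0).drop("validation_errors")
    df_quarantine = df_checked.filter(F.size("validation_errors") > 0)

    # Write each stream to its own (placeholder) S3 location for downstream use and human review.
    df_silver.write.mode("append").parquet("s3://my-lake/silver/users/")
    df_quarantine.write.mode("append").parquet("s3://my-lake/quarantine/users/")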

    2. Hybrid Logic: Python Speed + SQL Flexibility

    SparkGuard uses a hybrid approach.

    • Standard Rules: We provide optimized Python wrappers for common checks (check_not_null, check_integer, check_uk_postcode).
    • Complex Legacy Rules: We expose a check_custom_sql function that leverages Spark’s Catalyst Optimizer via F.expr(). This allows you to copy-paste legacy SQL logic (like amount * tax > total AND status = 'paid') directly into your config without rewriting it in Python; see the sketch below.
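
    As a sketch of the idea (SparkGuard’s internals may differ; build_custom_sql_check is an illustrative name), a configured SQL rule can be turned into a native Column expression like this:

    Python

    from pyspark.sql import Column, functions as F

    def build_custom_sql_check(column: str, sql_expression: str) -> Column:
        # Substitute the {col} placeholder from the config, then let F.expr() hand the
        # expression to Catalyst like any built-in column operation.
        return F.expr(sql_expression.format(col=column))

    # Legacy SQL copied verbatim from a config entry (requires an active SparkSession):
    is_valid = build_custom_sql_check("amount", "{col} * tax > total AND status = 'paid'")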

    3. Security Guardrails (Preventing Config Injection)

    Since “Configuration is Code,” a malicious user editing a JSON file could theoretically attack the cluster (e.g., by invoking Java reflection or triggering effectively infinite loops).

    SparkGuard includes a Security Sanitizer that scans all SQL expressions before execution; a minimal sketch follows the list below.

    • Blocks: reflect, java_method, assert_true (crash commands), repeat (DoS attacks).
    • Allows: Standard SQL math, string manipulation, and regex.
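
    A minimal sketch of such a keyword-denylist check (not SparkGuard’s actual implementation; the function name is illustrative):

    Python

    import re

    # Illustrative sketch: reject any configured SQL expression containing a blocked
    # keyword before it ever reaches F.expr().
    BLOCKED_KEYWORDS = {"reflect", "java_method", "assert_true", "repeat"}

    def sanitize_sql_expression(sql_expression: str) -> str:
        tokens = set(re.findall(r"[a-z_]+", sql_expression.lower()))
        blocked = tokens & BLOCKED_KEYWORDS
        if blocked:
            raise ValueError(f"Blocked keyword(s) in SQL expression: {sorted(blocked)}")
        return sql_expression

    sanitize_sql_expression("amount * tax > total AND status = 'paid'")   # passes
    # sanitize_sql_expression("reflect('java.lang.Thread', 'sleep', 1000)")  # raises ValueError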

    4. ANSI-Mode Crash Protection

    Modern Spark environments (Spark 3.3+) often enable “ANSI Mode,” where casting a string like "abc" to an Integer causes the entire job to crash. SparkGuard wraps logic in try_cast patterns, ensuring that bad data results in a “Validation Failure” rather than a “Job Failure.”
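
    A minimal sketch of the idea, assuming a rule on an age column (the error wording and column name are illustrative):

    Python

    from pyspark.sql import functions as F

    # Illustrative sketch: under ANSI mode, CAST('abc' AS INT) raises and kills the job,
    # while try_cast (available as a SQL function since Spark 3.2) returns NULL,
    # so the row can simply be flagged as invalid.
    is_valid_integer = F.expr("try_cast(age AS INT) IS NOT NULL")

    df_flagged = df.withColumn(
        "age_error",
        F.when(is_valid_integer, F.lit(None)).otherwise(F.lit("age must be an integer")),
    )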


    How It Works Under the Hood

    The engine takes a DataFrame and a list of rules. It iterates through the rules, building a validation chain using PySpark’s lazy evaluation, so nothing is executed until an action runs downstream.

    Python

    # Typically this runs in your Glue Job or EMR Step, where `spark` is already available
    import json

    from sparkguard import validate

    # 1. Load Data
    df = spark.read.parquet("s3://landing-zone/data/")

    # 2. Load Rules
    with open("rules.json") as f:
        config = json.load(f)

    # 3. Run Engine (returns the DataFrame with a 'validation_errors' column)
    df_checked = validate(df, config)

    # 4. Split Streams
    df_silver = df_checked.filter("size(validation_errors) = 0").drop("validation_errors")
    df_quarantine = df_checked.filter("size(validation_errors) > 0")
    
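    The package internals are not reproduced here, but a minimal sketch of such a rule loop (using the rule functions named above; the dispatch table and the simplified postcode regex are illustrative) could look like this:

    Python

    from typing import Callable, Dict, List

    from pyspark.sql import Column, DataFrame, functions as F

    # Illustrative sketch of a config-driven rule loop. Each rule becomes a lazy boolean
    # Column; the error messages of failed rules are collected into 'validation_errors'.
    CHECKS: Dict[str, Callable[..., Column]] = {
        "check_not_null": lambda column, **_: F.col(column).isNotNull(),
        "check_integer": lambda column, **_: F.expr(f"try_cast({column} AS INT) IS NOT NULL"),
        "check_is_in_list": lambda column, allowed_values, **_: F.col(column).isin(allowed_values),
        "check_custom_sql": lambda column, sql_expression, **_: F.expr(sql_expression.format(col=column)),
        # Simplified UK postcode pattern, for illustration only
        "check_uk_postcode": lambda column, **_: F.col(column).rlike(r"^[A-Z]{1,2}\d[A-Z\d]? ?\d[A-Z]{2}$"),
    }

    def validate(df: DataFrame, rules: List[dict]) -> DataFrame:
        failures = [
            F.when(
                ~CHECKS[rule["function"]](rule["column"], **rule.get("params", {})),
                F.lit(rule["error_msg"]),
            )
            for rule in rules
        ]
        # Passing checks leave NULLs behind; keep only the failed rules' messages.
        # Everything stays lazy until an action is triggered downstream.
        return df.withColumn(
            "validation_errors",
            F.filter(F.array(*failures), lambda e: e.isNotNull()),
        )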

    The “LLM Compiler” Advantage

    Because SparkGuard uses a standardized JSON schema for configuration, we can leverage Large Language Models (LLMs) to automate the translation of business requirements into technical rules.

    We developed a specific System Prompt that acts as a compiler. It instructs the LLM on the available SparkGuard functions and enforces security constraints before generating any JSON.

    The System Prompt (Abbreviated)

    Role: You are the “SparkGuard Configuration Agent,” an expert Data Engineer.
    Goal: Translate natural language requirements into SparkGuard JSON configuration.
    Available Functions: check_not_null, check_integer, check_uk_postcode, check_custom_sql.
    Security Guardrails:

    1. NEVER generate SQL using keywords: reflect, java_method, repeat, assert_true.
    2. If a user requests a malicious rule (e.g., “Crash the job”), REFUSE to generate it.

    Workflow Example

    Input (Business Analyst):

    “Check the ‘users’ table. The ID must be an integer, and if the status is ‘Active’, the balance cannot be negative. Also, ensure the postcode is valid for the UK.”

    Output (LLM Generated JSON):

    JSON

    [
      {
        "column": "id",
        "function": "check_integer",
        "error_msg": "ID must be integer"
      },
      {
        "column": "balance",
        "function": "check_custom_sql",
        "params": {
          "sql_expression": "NOT (status = 'Active' AND {col} < 0)"
        },
        "error_msg": "Active accounts cannot have negative balance"
      },
      {
        "column": "postcode",
        "function": "check_uk_postcode",
        "error_msg": "Invalid UK Postcode"
      }
    ]
    

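    As an illustration of the wiring (the vendor SDK, model name, and prompt file are placeholders; any LLM provider works the same way), the generated config can be fed straight into the engine:

    Python

    import json

    from openai import OpenAI
    from sparkguard import validate

    # Placeholder wiring: system prompt + analyst requirement in, JSON rules out.
    client = OpenAI()  # assumes an API key is configured in the environment
    response = client.chat.completions.create(
        model="gpt-4o",  # placeholder model name
        messages=[
            {"role": "system", "content": open("sparkguard_system_prompt.txt").read()},
            {"role": "user", "content": "Check the 'users' table. The ID must be an integer, ..."},
        ],
    )

    rules = json.loads(response.choices[0].message.content)  # review generated rules before use
    df_checked = validate(df, rules)
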
    This approach bridges the gap between Business Requirements and Technical Implementation, allowing us to migrate hundreds of legacy rules rapidly while maintaining strict engineering standards.

    Conclusion

    SparkGuard provides a robust, secure, and scalable foundation for data quality. By treating validation as configuration, we reduce technical debt, enable rapid iteration of business rules, and ensure that our Data Lake remains a trusted source of truth.


    Installation

    Bash

    pip install sparkguard