
  • Hive array operations

    In this article we describe how to add items to, and remove items from, array columns in Hive using PySpark. We use two array functions to accomplish this task:

    • array_union
    • array_except
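    Both functions behave like set operations that keep the order of the first array. The sketch below is a plain-Python model of them, not Spark code. It assumes that Spark returns distinct elements in first-seen order and returns null when either argument is null. That null behaviour is worth confirming on your Spark version. The function names mirror the Spark functions, but the implementations are our own.

```python
# Plain-Python model of Spark's array_union / array_except semantics.
# Assumption: Spark returns distinct elements in first-seen order and
# returns null (None here) when either argument is null.


def array_union(a, b):
    """Elements of a followed by new elements of b, without duplicates."""
    if a is None or b is None:
        return None
    return list(dict.fromkeys(a + b))


def array_except(a, b):
    """Elements of a that are not in b, without duplicates."""
    if a is None or b is None:
        return None
    return [x for x in dict.fromkeys(a) if x not in b]


print(array_union(['v67', 'v66'], ['v34']))        # ['v67', 'v66', 'v34']
print(array_except(['p2', 'p23', 'p93'], ['p2']))  # ['p23', 'p93']
print(array_union(None, ['c20']))                  # None
```

    The last call shows why null arrays need attention before linking: a unit with no VAT references yet would stay null instead of gaining the new reference.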

    UPDATE: the output might contain duplicate rows. Use groupBy with agg to merge them across multiple columns. You may need the following functions:

    • array_distinct
    • flatten
    • collect_set

    Ref: you may also find the article Generating Synthetic Data Using HIVE SQL useful.

    Problem description

    We have a table that lists business units linked by a common entity, for example a company number (the crns column). A business listed in the PAYE table may also be linked to a company number. Similarly, a business unit might be linked to a VAT reference, which may in turn be linked to the same company number.

    An example of the business link table is given below:

    +----+-----+----------+--------------+
    |luid| crns|   vatrefs|      payerefs|
    +----+-----+----------+--------------+
    |   1| [c1]|[v67, v66]|    [p71, p61]|
    |   2| [c7]|      null|          null|
    |   3| null|      [v2]|          [p1]|
    |   4| null|     [v24]|         [p10]|
    |   5| null|      null|[p2, p23, p93]|
    |   6| null|[v46, v51]|    [p12, p53]|
    |   7| null|     [v89]|         [p23]|
    |   9| [c2]|      null|          null|
    |  10| [c4]|      null|          null|
    |  11|[c14]|      null|          null|
    |  12| null|     [v34]|    [p46, p57]|
    |  19| null|     [v78]|          null|
    |  22| null|      [v4]|         [p11]|
    +----+-----+----------+--------------+
    

    Link and unlink business units

    Some business units sit on their own and need to be linked to a given company number. After linking, the reference must be removed from the business unit it originally belonged to.

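    Examples of PAYE and VAT references to be linked and unlinked are shown below. In these tables, c_luid is the unit that holds the company number crn (null if no unit holds it yet). p_luid or v_luid is the unit that currently holds the PAYE or VAT reference.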

    PAYE
    +------+---+-------+------+
    |c_luid|crn|payeref|p_luid|
    +------+---+-------+------+
    |     1| c1|     p1|     3|
    |     9| c2|     p2|     5|
    |    10| c4|    p10|     4|
    |     2| c7|    p46|    12|
    |  null| c3|    p11|    22|
    |  null|c12|    p23|     7|
    +------+---+-------+------+
    
    VAT
    +------+---+------+------+
    |c_luid|crn|vatref|v_luid|
    +------+---+------+------+
    |     1| c1|   v34|    12|
    |    11|c14|    v2|     3|
    |    10| c4|   v24|     4|
    |  null|c20|   v46|     6|
    |  null|c33|   v78|    19|
    +------+---+------+------+

    Approach

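    • Unlink the reference from its current unit; if the company number has no business unit id (c_luid is null), add the company number to the current unit instead
    • Link the reference to its new business unit id (c_luid)
    • Handle null arrays: with Spark's default (non-ANSI) settings size() returns -1 for a null array, and array_union/array_except return null if either input is null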

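    The last point is easy to miss. The plain-Python model below illustrates the problem, assuming Spark's default behaviour where size() of a null array is -1. fill_null_arrays is a hypothetical helper. It replaces null array columns with empty arrays before any array_union or array_except call.

```python
# Plain-Python model of the null-array problem (not Spark code).
# Assumption: Spark's size() returns -1 for a null array under its
# default (non-ANSI) settings.

ARRAY_COLUMNS = ('crns', 'vatrefs', 'payerefs')


def spark_size(arr):
    """Model of size(): -1 for a null array, else the element count."""
    return -1 if arr is None else len(arr)


def fill_null_arrays(row, columns=ARRAY_COLUMNS):
    """Hypothetical helper: copy of row with null array columns set to []."""
    fixed = dict(row)
    for col in columns:
        if fixed.get(col) is None:
            fixed[col] = []
    return fixed


unit = {'luid': 2, 'crns': ['c7'], 'vatrefs': None, 'payerefs': None}
print(spark_size(unit['vatrefs']))   # -1
clean = fill_null_arrays(unit)
print(spark_size(clean['vatrefs']))  # 0
print(clean)  # {'luid': 2, 'crns': ['c7'], 'vatrefs': [], 'payerefs': []}
```

    In PySpark, one possible equivalent is F.coalesce(F.col('vatrefs'), F.array().cast('array<string>')) applied to each array column. This is worth testing on your Spark version, since the element type of an empty array() changed in Spark 3.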
    Adding an item to an array

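    from pyspark.sql import functions as F

    # link VAT: attach each VAT reference to its target unit (luid == c_luid)
    # note: null vatrefs must already be replaced by empty arrays, because
    # array_union returns null when either argument is null
    vat_link = paye_unlink.join(vat, [paye_unlink.luid == vat.c_luid], 'left')
    vat_link = vat_link.withColumn(
        'vatrefs',
        F.when(F.col('c_luid').isNotNull() & F.col('vatref').isNotNull(),
               F.array_union(F.col('vatrefs'), F.array(F.col('vatref'))))
         .otherwise(F.col('vatrefs')))
    # drop the VAT columns so the later PAYE join has no ambiguous names
    vat_link = vat_link.drop('c_luid', 'crn', 'vatref', 'v_luid')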

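    The following plain-Python model (not Spark code) shows what the join plus when/otherwise does row by row. It assumes null arrays have already been replaced by empty lists, and link_left_join is a hypothetical name. Like a left join, it emits one row per matching reference, which is one way the duplicate rows mentioned in the UPDATE can appear.

```python
# Plain-Python model of the "link VAT" step (not Spark code): a LEFT JOIN
# of units to VAT rows on luid == c_luid, then when/otherwise with array_union.
# Assumption: null arrays in the units were already replaced by [].


def array_union(a, b):
    """Distinct elements of a then b, in first-seen order."""
    return list(dict.fromkeys(a + b))


def link_left_join(units, refs, ref_key, array_key):
    """One output row per (unit, matching ref) pair, like a left join."""
    out = []
    for unit in units:
        # a null c_luid never equals a luid, as in SQL
        matches = [r for r in refs if r['c_luid'] == unit['luid']]
        for ref in matches or [None]:  # unmatched units survive once
            row = dict(unit)
            if ref is not None and ref[ref_key] is not None:
                row[array_key] = array_union(row[array_key], [ref[ref_key]])
            out.append(row)
    return out


units = [
    {'luid': 1, 'crns': ['c1'], 'vatrefs': ['v67', 'v66']},
    {'luid': 11, 'crns': ['c14'], 'vatrefs': []},
    {'luid': 19, 'crns': ['c33'], 'vatrefs': ['v78']},
]
vat = [
    {'c_luid': 1, 'crn': 'c1', 'vatref': 'v34', 'v_luid': 12},
    {'c_luid': 11, 'crn': 'c14', 'vatref': 'v2', 'v_luid': 3},
    {'c_luid': None, 'crn': 'c33', 'vatref': 'v78', 'v_luid': 19},
]
for row in link_left_join(units, vat, 'vatref', 'vatrefs'):
    print(row)
```

    Adding a second VAT row whose c_luid is 1 would produce two output rows for luid 1, each carrying only one of the new references.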
    Removing an item from an array

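    # unlink VAT: remove the reference from the unit that currently holds it
    # (luid == v_luid); if the company number has no unit (c_luid is null),
    # add the crn to the current unit instead
    vat_join = fact_updated.join(vat, [fact_updated.luid == vat.v_luid], 'left')
    vat_unlink = (vat_join
                  .withColumn('vatrefs',
                              F.when((F.col('luid') == F.col('v_luid')) &
                                     (F.col('c_luid').isNotNull()) &
                                     (F.col('vatref').isNotNull()),
                                     F.array_except(F.col('vatrefs'), F.array(F.col('vatref'))))
                               .otherwise(F.col('vatrefs')))
                  .withColumn('crns',
                              F.when((F.col('luid') == F.col('v_luid')) &
                                     (F.col('c_luid').isNull()) &
                                     (F.col('crn').isNotNull()),
                                     F.array_union(F.col('crns'), F.array(F.col('crn'))))
                               .otherwise(F.col('crns'))))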
    
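    The unlink step applies two rules. If the reference moves to an existing unit, remove it from its current unit. If no unit holds the company number yet, attach the company number to the current unit instead. Below is a plain-Python model of that logic (not Spark code). unlink_left_join is a hypothetical name, and null arrays are assumed to be empty lists already.

```python
# Plain-Python model of the "unlink VAT" step (not Spark code): a LEFT JOIN
# of units to VAT rows on luid == v_luid, then the two when/otherwise rules.
# Assumption: null arrays in the units were already replaced by [].


def array_union(a, b):
    return list(dict.fromkeys(a + b))


def array_except(a, b):
    return [x for x in dict.fromkeys(a) if x not in b]


def unlink_left_join(units, refs, ref_key, array_key, holder_key):
    out = []
    for unit in units:
        matches = [r for r in refs if r[holder_key] == unit['luid']]
        for ref in matches or [None]:  # unmatched units survive once
            row = dict(unit)
            if ref is not None:
                if ref['c_luid'] is not None and ref[ref_key] is not None:
                    # the reference moves to another unit: remove it here
                    row[array_key] = array_except(row[array_key], [ref[ref_key]])
                if ref['c_luid'] is None and ref['crn'] is not None:
                    # no unit holds the company number yet: attach it here
                    row['crns'] = array_union(row['crns'], [ref['crn']])
            out.append(row)
    return out


units = [
    {'luid': 2, 'crns': ['c7'], 'vatrefs': []},
    {'luid': 6, 'crns': [], 'vatrefs': ['v46', 'v51']},
    {'luid': 12, 'crns': [], 'vatrefs': ['v34']},
]
vat = [
    {'c_luid': 1, 'crn': 'c1', 'vatref': 'v34', 'v_luid': 12},
    {'c_luid': None, 'crn': 'c20', 'vatref': 'v46', 'v_luid': 6},
]
for row in unlink_left_join(units, vat, 'vatref', 'vatrefs', 'v_luid'):
    print(row)
```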

    Removing duplicate rows

    from pyspark.sql import functions as F

    # dataframe with all units in the database
    # (assumes an active SparkSession named `spark`; luid 1 appears twice on purpose)
    fact_updated = spark.sql("""
    with fact(luid,crns,vatrefs,payerefs) as (
    select 1,array('c1'),array('v67','v66'),array('p71','p61') union all
    select 1,array('c1'),array('v68','v66'),array('p71','p61') union all
    select 9,array('c2'),null,null union all
    select 11,array('c14'),null,null)
    select * from fact
    """)

    # removing duplicates: merge the arrays of rows that share luid and crns
    fact_after_remove = (fact_updated.groupBy('luid', 'crns').agg(
        F.array_distinct(F.flatten(F.collect_set('vatrefs'))).alias('vatrefs'),
        F.array_distinct(F.flatten(F.collect_set('payerefs'))).alias('payerefs')))

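    The plain-Python model below (not Spark code) mirrors what the groupBy/agg above computes. collect_set gathers the distinct non-null arrays per group, flatten concatenates them, and array_distinct drops repeated elements. merge_duplicates is a hypothetical name. Spark does not guarantee the element order of collect_set, so the model promises the same elements but not the same order.

```python
# Plain-Python model of groupBy('luid', 'crns') with
# array_distinct(flatten(collect_set(...))) (not Spark code).


def merge_duplicates(rows, key_cols=('luid', 'crns'),
                     array_cols=('vatrefs', 'payerefs')):
    """Merge rows sharing key_cols; array columns become distinct unions."""
    groups = {}
    for row in rows:
        # lists are unhashable, so array-valued key columns become tuples
        key = tuple(tuple(row[c]) if isinstance(row[c], list) else row[c]
                    for c in key_cols)
        merged = groups.setdefault(key, {c: [] for c in array_cols})
        for c in array_cols:
            if row[c] is not None:  # collect_set ignores nulls
                # flatten + array_distinct: append unseen elements only
                merged[c] = list(dict.fromkeys(merged[c] + row[c]))
    result = []
    for key, merged in groups.items():
        out = {c: list(v) if isinstance(v, tuple) else v
               for c, v in zip(key_cols, key)}
        out.update(merged)
        result.append(out)
    return result


rows = [
    {'luid': 1, 'crns': ['c1'], 'vatrefs': ['v67', 'v66'], 'payerefs': ['p71', 'p61']},
    {'luid': 1, 'crns': ['c1'], 'vatrefs': ['v68', 'v66'], 'payerefs': ['p71', 'p61']},
    {'luid': 9, 'crns': ['c2'], 'vatrefs': None, 'payerefs': None},
    {'luid': 11, 'crns': ['c14'], 'vatrefs': None, 'payerefs': None},
]
for row in merge_duplicates(rows):
    print(row)
```

    Groups whose arrays are all null come out as empty arrays, because collect_set over only nulls yields an empty array.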
    Hive random sampling

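    Combine a where clause on rand() with distribute by rand() and sort by rand(). The where clause keeps roughly 0.01% of the rows (rand() <= 0.0001). distribute by rand() sends the surviving rows to reducers at random, and sort by rand() orders them randomly within each reducer. As a result, the rows that remain when limit is applied should form a random sample.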

    select * from my_table
    where rand() <= 0.0001
    distribute by rand()
    sort by rand()
    limit 10000;
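    The same idea can be sketched in plain Python to show what each clause contributes. This is a simplification, not how Hive executes the query. A single shuffle stands in for distributing rows to reducers and sorting within each one, and sample_rows is a hypothetical name.

```python
import random


def sample_rows(rows, fraction, limit, seed=None):
    """Simplified analogue of: where rand() <= fraction
    distribute by rand() sort by rand() limit <limit>."""
    rng = random.Random(seed)
    kept = [row for row in rows if rng.random() <= fraction]  # where rand() <= fraction
    rng.shuffle(kept)  # stands in for distribute by rand() / sort by rand()
    return kept[:limit]  # limit


sample = sample_rows(range(100_000), 0.01, 50, seed=7)
print(len(sample), sample[:5])
```

    As in the Hive query, the filter fraction should leave more rows than the limit; otherwise limit returns fewer rows than requested.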

    Expected output

    +----+-----+---------------+--------------+
    |luid| crns|        vatrefs|      payerefs|
    +----+-----+---------------+--------------+
    |   1| [c1]|[v67, v66, v34]|[p71, p61, p1]|
    |   2| [c7]|             []|         [p46]|
    |   3|   []|             []|            []|
    |   4|   []|             []|            []|
    |   5|   []|             []|    [p23, p93]|
    |   6|[c20]|     [v46, v51]|    [p12, p53]|
    |   7|[c12]|          [v89]|         [p23]|
    |   9| [c2]|             []|          [p2]|
    |  10| [c4]|          [v24]|         [p10]|
    |  11|[c14]|           [v2]|            []|
    |  12|   []|             []|         [p57]|
    |  19|[c33]|          [v78]|            []|
    |  22| [c3]|           [v4]|         [p11]|
    +----+-----+---------------+--------------+
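    As a sanity check, the whole pipeline can be modelled in plain Python on the sample tables. This sketch is not Spark code. It replaces null arrays with empty lists first and then applies the references one after another. That matches the join-based version only because each unit matches at most one reference per step in this sample.

```python
# Plain-Python model of unlink VAT -> unlink PAYE -> link VAT -> link PAYE.
# Assumptions: null arrays become [] first; refs are applied sequentially,
# which equals the join version when each unit matches at most one ref.


def array_union(a, b):
    return list(dict.fromkeys(a + b))


def array_except(a, b):
    return [x for x in dict.fromkeys(a) if x not in b]


units = [
    (1, ['c1'], ['v67', 'v66'], ['p71', 'p61']), (2, ['c7'], None, None),
    (3, None, ['v2'], ['p1']), (4, None, ['v24'], ['p10']),
    (5, None, None, ['p2', 'p23', 'p93']), (6, None, ['v46', 'v51'], ['p12', 'p53']),
    (7, None, ['v89'], ['p23']), (9, ['c2'], None, None), (10, ['c4'], None, None),
    (11, ['c14'], None, None), (12, None, ['v34'], ['p46', 'p57']),
    (19, None, ['v78'], None), (22, None, ['v4'], ['p11']),
]
# rows are (c_luid, crn, reference, luid currently holding the reference)
paye = [(1, 'c1', 'p1', 3), (9, 'c2', 'p2', 5), (10, 'c4', 'p10', 4),
        (2, 'c7', 'p46', 12), (None, 'c3', 'p11', 22), (None, 'c12', 'p23', 7)]
vat = [(1, 'c1', 'v34', 12), (11, 'c14', 'v2', 3), (10, 'c4', 'v24', 4),
       (None, 'c20', 'v46', 6), (None, 'c33', 'v78', 19)]

# replace null arrays with [] (avoids size() == -1 and null array_union results)
fact = {luid: {'crns': c or [], 'vatrefs': v or [], 'payerefs': p or []}
        for luid, c, v, p in units}


def unlink(fact, refs, key):
    for c_luid, crn, ref, holder in refs:
        unit = fact.get(holder)
        if unit is None:
            continue
        if c_luid is not None and ref is not None:
            unit[key] = array_except(unit[key], [ref])  # ref moves away
        elif c_luid is None and crn is not None:
            unit['crns'] = array_union(unit['crns'], [crn])  # tag holder with crn


def link(fact, refs, key):
    for c_luid, crn, ref, holder in refs:
        if c_luid is not None and ref is not None and c_luid in fact:
            fact[c_luid][key] = array_union(fact[c_luid][key], [ref])


unlink(fact, vat, 'vatrefs')
unlink(fact, paye, 'payerefs')
link(fact, vat, 'vatrefs')
link(fact, paye, 'payerefs')

for luid in sorted(fact):
    print(luid, fact[luid]['crns'], fact[luid]['vatrefs'], fact[luid]['payerefs'])
```

    The printed rows should match the expected output table row for row.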

    Code listing

    from pyspark.sql import functions as F

    # assumes null arrays in fact_updated were already replaced by empty arrays
    # unlink VAT; if the company number has no unit (c_luid is null), add the crn to the current unit instead
    vat_join = fact_updated.join(vat, [fact_updated.luid == vat.v_luid], 'left')
    vat_unlink = (vat_join
                  .withColumn('vatrefs',
                              F.when((F.col('luid') == F.col('v_luid')) &
                                     (F.col('c_luid').isNotNull()) &
                                     (F.col('vatref').isNotNull()),
                                     F.array_except(F.col('vatrefs'), F.array(F.col('vatref'))))
                               .otherwise(F.col('vatrefs')))
                  .withColumn('crns',
                              F.when((F.col('luid') == F.col('v_luid')) &
                                     (F.col('c_luid').isNull()) &
                                     (F.col('crn').isNotNull()),
                                     F.array_union(F.col('crns'), F.array(F.col('crn'))))
                               .otherwise(F.col('crns'))))
    vat_unlink = vat_unlink.drop('c_luid', 'crn', 'vatref', 'v_luid')
    
    # unlink PAYE: same rules as for VAT, matching the current holder on p_luid
    paye_join = vat_unlink.join(paye, [vat_unlink.luid == paye.p_luid], 'left')
    paye_unlink = (paye_join
                   .withColumn('payerefs',
                               F.when((F.col('luid') == F.col('p_luid')) &
                                      (F.col('c_luid').isNotNull()) &
                                      (F.col('payeref').isNotNull()),
                                      F.array_except(F.col('payerefs'), F.array(F.col('payeref'))))
                                .otherwise(F.col('payerefs')))
                   .withColumn('crns',
                               F.when((F.col('luid') == F.col('p_luid')) &
                                      (F.col('c_luid').isNull()) &
                                      (F.col('crn').isNotNull()),
                                      F.array_union(F.col('crns'), F.array(F.col('crn'))))
                                .otherwise(F.col('crns'))))
    paye_unlink = paye_unlink.drop('c_luid', 'crn', 'payeref', 'p_luid')
    
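    # link VAT (as in "Adding an item to an array"); drop the VAT columns
    # so the PAYE join below does not produce ambiguous c_luid/crn columns
    vat_link = paye_unlink.join(vat, [paye_unlink.luid == vat.c_luid], 'left')
    vat_link = vat_link.withColumn(
        'vatrefs',
        F.when(F.col('c_luid').isNotNull() & F.col('vatref').isNotNull(),
               F.array_union(F.col('vatrefs'), F.array(F.col('vatref'))))
         .otherwise(F.col('vatrefs')))
    vat_link = vat_link.drop('c_luid', 'crn', 'vatref', 'v_luid')

    # link PAYE
    paye_link = vat_link.join(paye, [vat_link.luid == paye.c_luid], 'left')
    paye_link = paye_link.withColumn(
        'payerefs',
        F.when(F.col('c_luid').isNotNull() & F.col('payeref').isNotNull(),
               F.array_union(F.col('payerefs'), F.array(F.col('payeref'))))
         .otherwise(F.col('payerefs')))
    paye_link = paye_link.drop('c_luid', 'crn', 'payeref', 'p_luid')

    # finally, remove duplicate rows with groupBy/agg as in "Removing duplicate rows"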