Hive array operations

In this article we describe how to add items to and remove items from an array column in Hive using PySpark. We use two array functions to accomplish this:

  • array_union
  • array_except

UPDATE: the output might contain duplicate rows. Use groupBy with agg to merge them across multiple columns. You may need the following functions:

  • array_distinct
  • flatten
  • collect_set

Ref: you may find this article useful: Generating Synthetic Data Using HIVE SQL

Problem description

We have a table containing a list of business units that are linked by a common entity, say a company number. For example, a business listed in the PAYE table is also linked to a company number. Similarly, a business unit might be linked to a VAT reference, which may in turn be linked to the same company number.

An example of the business link table is given below:

+----+-----+----------+--------------+
|luid| crns|   vatrefs|      payerefs|
+----+-----+----------+--------------+
|   1| [c1]|[v67, v66]|    [p71, p61]|
|   2| [c7]|      null|          null|
|   3| null|      [v2]|          [p1]|
|   4| null|     [v24]|         [p10]|
|   5| null|      null|[p2, p23, p93]|
|   6| null|[v46, v51]|    [p12, p53]|
|   7| null|     [v89]|         [p23]|
|   9| [c2]|      null|          null|
|  10| [c4]|      null|          null|
|  11|[c14]|      null|          null|
|  12| null|     [v34]|    [p46, p57]|
|  19| null|     [v78]|          null|
|  22| null|      [v4]|         [p11]|
+----+-----+----------+--------------+

Link and unlink business unit

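Some PAYE and VAT units currently sit under a business unit of their own and need to be linked to the business unit of a given company number. After linking, each unit must be removed from its original business unit. In the tables below, c_luid is the business unit that holds the company number (crn), while p_luid or v_luid is the business unit where the reference currently sits. Where c_luid is null, the crn is added to the current business unit instead.

Examples of PAYE and VAT units to be linked and unlinked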

PAYE
+------+---+-------+------+
|c_luid|crn|payeref|p_luid|
+------+---+-------+------+
|     1| c1|     p1|     3|
|     9| c2|     p2|     5|
|    10| c4|    p10|     4|
|     2| c7|    p46|    12|
|  null| c3|    p11|    22|
|  null|c12|    p23|     7|
+------+---+-------+------+

VAT
+------+---+------+------+
|c_luid|crn|vatref|v_luid|
+------+---+------+------+
|     1| c1|   v34|    12|
|    11|c14|    v2|     3|
|    10| c4|   v24|     4|
|  null|c20|   v46|     6|
|  null|c33|   v78|    19|
+------+---+------+------+

Approach

  • Unlink each unit from its current position, and also take care of units without a business unit id (c_luid is null)
  • Link units to the new business unit id
  • Deal with null arrays, whose size is reported as -1
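
The last point matters because Spark's array_union and array_except return null when either input array is null, so a unit whose vatrefs is null would never receive a new reference. The following is a minimal plain-Python sketch of that behaviour, with None standing in for a SQL null. The helpers array_union, array_except and or_empty are illustrative models written for this example, not the Spark functions themselves.

```python
from typing import List, Optional

Array = Optional[List[str]]  # None models a SQL null array


def array_union(a: Array, b: Array) -> Array:
    """Model of array_union: null if either side is null, else a distinct union."""
    if a is None or b is None:
        return None
    out: List[str] = []
    for item in a + b:
        if item not in out:
            out.append(item)
    return out


def array_except(a: Array, b: Array) -> Array:
    """Model of array_except: null if either side is null, else items of a not in b."""
    if a is None or b is None:
        return None
    out: List[str] = []
    for item in a:
        if item not in b and item not in out:
            out.append(item)
    return out


def or_empty(a: Array) -> List[str]:
    """Replace a null array with an empty one before combining."""
    return [] if a is None else a


# A unit with null vatrefs: the union stays null unless the null is replaced first.
print(array_union(None, ["v34"]))                # None
print(array_union(or_empty(None), ["v34"]))      # ['v34']
print(array_except(["v67", "v66"], ["v66"]))     # ['v67']
```

In PySpark the same idea is commonly expressed by wrapping the column in F.coalesce with an empty array before calling array_union or array_except; that step is an assumption here and is not shown in the code of this article.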

Adding an item to an array

from pyspark.sql import functions as F

# link VAT: append vatref to the vatrefs of the target unit (c_luid)
vat_link = paye_unlink.join(vat, [paye_unlink.luid == vat.c_luid], 'left')
vat_link = (vat_link.withColumn('vatrefs',
                     F.when((F.col('c_luid').isNotNull()) & (F.col('vatref').isNotNull()),
                             F.array_union(F.col('vatrefs'), F.array(F.col('vatref'))))
                     .otherwise(F.col('vatrefs')))
            )

Removing an item from an array

# unlink VAT: remove vatref from the unit it currently sits in (v_luid);
# if the row has a crn but no c_luid, add the crn to the current unit instead
vat_join = fact_updated.join(vat,[fact_updated.luid == vat.v_luid],'left')
vat_unlink  = (vat_join.withColumn('vatrefs',
                                          F.when((F.col('luid')== F.col('v_luid')) &
                                                 (F.col('c_luid').isNotNull()) &
                                                 (F.col('vatref').isNotNull()),
                                           F.array_except('vatrefs',F.array(F.col('vatref'))))
                                           .otherwise(F.col('vatrefs')))
                        .withColumn('crns',
                            F.when((F.col('luid') == F.col('v_luid')) &
                              (F.col('c_luid').isNull()) &
                              (F.col('crn').isNotNull()),
                              F.array_union(F.col('crns'),F.array(F.col('crn'))))
                            .otherwise(F.col('crns')))
              )

Removing duplicate rows

# dataframe with all units in the database
fact_updated = spark.sql("""
with fact(luid,crns,vatrefs,payerefs) as (
select 1,array('c1'),array('v67','v66'),array('p71','p61') union all
select 1,array('c1'),array('v68','v66'),array('p71','p61') union all
select 9,array('c2'),null,null union all
select 11,array('c14'),null,null)
select * from fact
""")

# removing duplicates
fact_after_remove = (fact_updated.groupBy('luid','crns').agg(
    F.array_distinct(F.flatten(F.collect_set("vatrefs"))).alias('vatrefs'),
    F.array_distinct(F.flatten(F.collect_set("payerefs"))).alias('payerefs'))
     )
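
To make the aggregation easier to follow, here is a small plain-Python model of what the groupBy step computes on the four sample rows: per (luid, crns) group, the arrays are collected, flattened and de-duplicated. The helper merge_arrays is a hypothetical name introduced for this sketch. It keeps first-seen order, whereas Spark's collect_set does not guarantee any order.

```python
# Same sample rows as the fact CTE; None stands for a SQL null array.
rows = [
    (1, ["c1"], ["v67", "v66"], ["p71", "p61"]),
    (1, ["c1"], ["v68", "v66"], ["p71", "p61"]),
    (9, ["c2"], None, None),
    (11, ["c14"], None, None),
]


def merge_arrays(arrays):
    """Model of array_distinct(flatten(collect_set(col))): nulls are skipped."""
    merged = []
    for arr in arrays:
        if arr is None:  # collect_set ignores null values
            continue
        for item in arr:
            if item not in merged:
                merged.append(item)
    return merged


# Group by (luid, crns); lists are not hashable, so crns is stored as a tuple.
groups = {}
for luid, crns, vatrefs, payerefs in rows:
    vat_arrays, paye_arrays = groups.setdefault((luid, tuple(crns)), ([], []))
    vat_arrays.append(vatrefs)
    paye_arrays.append(payerefs)

deduped = [
    (luid, list(crns), merge_arrays(v), merge_arrays(p))
    for (luid, crns), (v, p) in groups.items()
]

for row in deduped:
    print(row)
```

The two rows for luid 1 collapse into one row whose vatrefs contains v67, v66 and v68, and groups whose arrays are all null end up with empty arrays rather than nulls.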

Hive random sampling

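Use distribute by rand() together with a where clause and sort by rand(). Hive distributes the rows randomly across the reducers and sorts them randomly on each reducer, so the rows returned once the limit is applied should be approximately random. The where clause keeps only a small fraction of the rows (about 0.01% here), which reduces the amount of data to be shuffled.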

select * from my_table
where rand() <= 0.0001
distribute by rand()
sort by rand()
limit 10000;

Expected output

+----+-----+---------------+--------------+
|luid| crns|        vatrefs|      payerefs|
+----+-----+---------------+--------------+
|   1| [c1]|[v67, v66, v34]|[p71, p61, p1]|
|   2| [c7]|             []|         [p46]|
|   3|   []|             []|            []|
|   4|   []|             []|            []|
|   5|   []|             []|    [p23, p93]|
|   6|[c20]|     [v46, v51]|    [p12, p53]|
|   7|[c12]|          [v89]|         [p23]|
|   9| [c2]|             []|          [p2]|
|  10| [c4]|          [v24]|         [p10]|
|  11|[c14]|           [v2]|            []|
|  12|   []|             []|         [p57]|
|  19|[c33]|          [v78]|            []|
|  22| [c3]|           [v4]|         [p11]|
+----+-----+---------------+--------------+
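
As a cross-check of the table above, the linking rules can be modelled in plain Python and applied to the sample data from this article. This is only a sketch under stated assumptions: null arrays are replaced by empty lists up front, the moves are applied one row at a time rather than through joins, and the helpers add and move are hypothetical names introduced for the example.

```python
# Business link table; None stands for a SQL null array.
fact = {
    1: {"crns": ["c1"], "vatrefs": ["v67", "v66"], "payerefs": ["p71", "p61"]},
    2: {"crns": ["c7"], "vatrefs": None, "payerefs": None},
    3: {"crns": None, "vatrefs": ["v2"], "payerefs": ["p1"]},
    4: {"crns": None, "vatrefs": ["v24"], "payerefs": ["p10"]},
    5: {"crns": None, "vatrefs": None, "payerefs": ["p2", "p23", "p93"]},
    6: {"crns": None, "vatrefs": ["v46", "v51"], "payerefs": ["p12", "p53"]},
    7: {"crns": None, "vatrefs": ["v89"], "payerefs": ["p23"]},
    9: {"crns": ["c2"], "vatrefs": None, "payerefs": None},
    10: {"crns": ["c4"], "vatrefs": None, "payerefs": None},
    11: {"crns": ["c14"], "vatrefs": None, "payerefs": None},
    12: {"crns": None, "vatrefs": ["v34"], "payerefs": ["p46", "p57"]},
    19: {"crns": None, "vatrefs": ["v78"], "payerefs": None},
    22: {"crns": None, "vatrefs": ["v4"], "payerefs": ["p11"]},
}

# Each move is (c_luid, crn, reference, luid where the reference currently sits).
paye = [(1, "c1", "p1", 3), (9, "c2", "p2", 5), (10, "c4", "p10", 4),
        (2, "c7", "p46", 12), (None, "c3", "p11", 22), (None, "c12", "p23", 7)]
vat = [(1, "c1", "v34", 12), (11, "c14", "v2", 3), (10, "c4", "v24", 4),
       (None, "c20", "v46", 6), (None, "c33", "v78", 19)]


def add(items, item):
    """Append item unless it is already present (like array_union with one item)."""
    return items if item in items else items + [item]


def move(table, moves, column):
    """Apply the unlink and link rules for one reference column."""
    for c_luid, crn, ref, current in moves:
        if c_luid is None:
            # No target unit: keep the reference and record the crn on the current unit.
            table[current]["crns"] = add(table[current]["crns"], crn)
        else:
            # Unlink from the current unit, then link to the target unit.
            table[current][column] = [r for r in table[current][column] if r != ref]
            table[c_luid][column] = add(table[c_luid][column], ref)


# Replace null arrays with empty lists before applying the moves.
result = {luid: {k: list(v or []) for k, v in row.items()} for luid, row in fact.items()}
move(result, vat, "vatrefs")
move(result, paye, "payerefs")

for luid, row in result.items():
    print(luid, row["crns"], row["vatrefs"], row["payerefs"])
```

Each printed row matches the corresponding row of the expected output, for example luid 1 ends with vatrefs [v67, v66, v34] and payerefs [p71, p61, p1].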

Code listing

# unlink VAT; if a row has a crn but no c_luid, add the crn to the unit instead
vat_join = fact_updated.join(vat,[fact_updated.luid == vat.v_luid],'left')
vat_unlink  = (vat_join.withColumn('vatrefs',
                                          F.when((F.col('luid')== F.col('v_luid')) &
                                                 (F.col('c_luid').isNotNull()) &
                                                 (F.col('vatref').isNotNull()),
                                           F.array_except('vatrefs',F.array(F.col('vatref'))))
                                           .otherwise(F.col('vatrefs')))
                        .withColumn('crns',
                            F.when((F.col('luid') == F.col('v_luid')) & 
                              (F.col('c_luid').isNull()) & 
                              (F.col('crn').isNotNull()),
                              F.array_union(F.col('crns'),F.array(F.col('crn'))))
                            .otherwise(F.col('crns')))

              )
vat_unlink = vat_unlink.drop('c_luid','crn','vatref','v_luid')

# unlink PAYE
paye_join = vat_unlink.join(paye,[vat_unlink.luid == paye.p_luid],'left')
paye_unlink  = (paye_join.withColumn('payerefs',
                                          F.when((F.col('luid')== F.col('p_luid')) &
                                                 (F.col('c_luid').isNotNull()) &
                                                 (F.col('payeref').isNotNull()),
                                           F.array_except('payerefs',F.array(F.col('payeref'))))
                                           .otherwise(F.col('payerefs')))
                        .withColumn('crns',
                            F.when((F.col('luid') == F.col('p_luid')) & 
                              (F.col('c_luid').isNull()) & 
                              (F.col('crn').isNotNull()),
                              F.array_union(F.col('crns'),F.array(F.col('crn'))))
                            .otherwise(F.col('crns')))

              )
paye_unlink = paye_unlink.drop('c_luid','crn','payeref','p_luid')

# link VAT
vat_link = paye_unlink.join(vat,[paye_unlink.luid==vat.c_luid],'left')
vat_link = (vat_link.withColumn('vatrefs',
                     F.when((F.col('c_luid').isNotNull()) & (F.col('vatref').isNotNull()),
                             F.array_union(F.col('vatrefs'),F.array(F.col('vatref'))))
                     .otherwise(F.col('vatrefs')))
            )
vat_link = vat_link.drop('c_luid','crn','vatref','v_luid')

# link PAYE
paye_link = vat_link.join(paye,[vat_link.luid==paye.c_luid],'left')
paye_link = (paye_link.withColumn('payerefs',
                     F.when((F.col('c_luid').isNotNull()) & (F.col('payeref').isNotNull()),
                             F.array_union(F.col('payerefs'),F.array(F.col('payeref'))))
         .otherwise(F.col('payerefs')))
            )
paye_link = paye_link.drop('c_luid','crn','payeref','p_luid')
# remove duplicate rows: apply the groupBy/agg step from "Removing duplicate rows" to paye_link