SparkSQL: Select & Filter


from pyspark.sql import SparkSession

# Build the SparkSession for this notebook (getOrCreate returns the existing
# session when one is already active). The bare name on the last line echoes
# the session object as the cell's output.
spark = (
    SparkSession.builder
    .appName("SparkSQL2")
    .getOrCreate()
)
spark
import pandas as pd
from pyspark.sql import Row

# Raw transaction records, each encoded as "accountNumber,amount". This list
# is the source from which the RDD (and then the DataFrame) is created.
acTransList = [
    "SB10001,1000",
    "SB10002,1200",
    "SB10003,8000",
    "SB10004,400",
    "SB10005,300",
    "SB10006,10000",
    "SB10007,500",
    "SB10008,56",
    "SB10009,30",
    "SB10010,7000",
    "CR10001,7000",
    "SB10002,-10",
]

# acTransList = [x.split(',') for x in acTransList]

# x = []
# y = []
# for accNo,trnAmount in acTransList:
#     x.append(accNo)
#     y.append(float(trnAmount))

# df = pd.DataFrame({'accNo':x,'trnAmount':y})
# acTransDF = spark.createDataFrame(df)

# Build the transactions DataFrame: distribute the raw strings as an RDD,
# split each "account,amount" record on the comma, and wrap the pieces in a
# Row with a string accNo and a float tranAmount before converting to a
# DataFrame.
acTransDF = (
    spark.sparkContext.parallelize(acTransList)
    .map(lambda record: record.split(","))
    .map(lambda fields: Row(accNo=fields[0], tranAmount=float(fields[1])))
    .toDF()
)

# Register the DataFrame as the temporary view "trans" so that it can be
# queried by name through spark.sql(...)
acTransDF.createOrReplaceTempView("trans")
# Print the schema (column names, types and nullability) of the DataFrame
acTransDF.printSchema()
root
 |-- accNo: string (nullable = true)
 |-- tranAmount: double (nullable = true)
# Display the rows of the DataFrame as a text table (show() prints at most
# the first 20 rows by default, which covers all 12 records here)
acTransDF.show()
+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|SB10001|    1000.0|
|SB10002|    1200.0|
|SB10003|    8000.0|
|SB10004|     400.0|
|SB10005|     300.0|
|SB10006|   10000.0|
|SB10007|     500.0|
|SB10008|      56.0|
|SB10009|      30.0|
|SB10010|    7000.0|
|CR10001|    7000.0|
|SB10002|     -10.0|
+-------+----------+

RDD

# The aggregations below operate on the good transaction records: accounts
# whose number starts with 'SB' and whose amount is positive. Build them here
# from acTransDF so this section can run top-to-bottom; otherwise
# goodTransRecords is only assigned further down in the file and the first
# use below fails with a NameError.
goodTransRecords = acTransDF.filter("accNo like 'SB%'").filter("tranAmount > 0")

# Calculate the sum by mixing DataFrame and RDD-like operations: drop to the
# underlying RDD, project each Row to its amount, and fold with addition
sumAmountByMixing = goodTransRecords.rdd.map(lambda trans: trans.tranAmount).reduce(lambda a,b : a+b)
sumAmountByMixing
28486.0
# Calculate the maximum the same way, keeping the larger value at each step
maxAmountByMixing = goodTransRecords.rdd.map(lambda trans: trans.tranAmount).reduce(lambda a,b : a if a > b else b)
maxAmountByMixing
10000.0
# Calculate the minimum the same way, keeping the smaller value at each step
minAmountByMixing = goodTransRecords.rdd.map(lambda trans: trans.tranAmount).reduce(lambda a,b : a if a < b else b)
minAmountByMixing
30.0

SELECT

# Select the good transaction records with SQL: account numbers that start
# with 'SB' and amounts strictly greater than zero
goodTransRecords = spark.sql(
    "SELECT accNo, tranAmount FROM trans "
    "WHERE accNo like 'SB%' AND tranAmount > 0"
)

# Expose the result as the temporary view "goodtrans" for later SQL queries
goodTransRecords.createOrReplaceTempView("goodtrans")

# Display the good transaction records
goodTransRecords.show()
+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|SB10001|    1000.0|
|SB10002|    1200.0|
|SB10003|    8000.0|
|SB10004|     400.0|
|SB10005|     300.0|
|SB10006|   10000.0|
|SB10007|     500.0|
|SB10008|      56.0|
|SB10009|      30.0|
|SB10010|    7000.0|
+-------+----------+
# Select the bad account records with SQL: every transaction whose account
# number does NOT start with 'SB'
badAccountRecords = spark.sql(
    "SELECT accNo, tranAmount FROM trans "
    "WHERE accNo NOT like 'SB%'"
)
badAccountRecords.show()
+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|CR10001|    7000.0|
+-------+----------+
# Bad amount records: transactions with a negative tranAmount, selected
# with SQL from the "trans" view
badAmountRecords = spark.sql(
    "SELECT accNo, tranAmount FROM trans "
    "WHERE tranAmount < 0"
)
# Display the bad amount records
badAmountRecords.show()
+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|SB10002|     -10.0|
+-------+----------+
# Combine the bad account records and the bad amount records into a single
# DataFrame. DataFrame.union matches columns by position and keeps duplicate
# rows (UNION ALL semantics), so a record that is both a non-'SB' account and
# a negative amount would appear twice.
badTransRecords = badAccountRecords.union(badAmountRecords)
# Display the combined bad transaction records
badTransRecords.show()
+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|CR10001|    7000.0|
|SB10002|     -10.0|
+-------+----------+
# Calculate the sum of all good transaction amounts with SQL; the result is
# a one-row DataFrame with a single column named "sum"
sumAmount = spark.sql("SELECT sum(tranAmount)as sum FROM goodtrans")
# Display the aggregate result
sumAmount.show()
+-------+
|    sum|
+-------+
|28486.0|
+-------+
# Calculate the maximum good transaction amount with SQL; the result is a
# one-row DataFrame with a single column named "max"
maxAmount = spark.sql("SELECT max(tranAmount) as max FROM goodtrans")
# Display the aggregate result
maxAmount.show()
+-------+
|    max|
+-------+
|10000.0|
+-------+
# Select the good account numbers with SQL.
# A: distinct 'SB' account numbers taken from the full "trans" view, sorted
#    by account number.
# B: alternative query over the "goodtrans" view (no ORDER BY); it is defined
#    here but not executed -- only A is passed to spark.sql below.
A = "SELECT DISTINCT accNo FROM trans WHERE accNo like 'SB%' ORDER BY accNo"
B = "SELECT DISTINCT accNo FROM goodtrans"
goodAccNos = spark.sql(A)

# Display the distinct good account numbers
goodAccNos.show()
+-------+
|  accNo|
+-------+
|SB10001|
|SB10002|
|SB10003|
|SB10004|
|SB10005|
|SB10006|
|SB10007|
|SB10008|
|SB10009|
|SB10010|
+-------+

FILTER

# Good transaction records via the DataFrame API: keep accounts starting
# with 'SB', then keep only strictly positive amounts
goodTransRecords = (
    acTransDF
    .filter("accNo like 'SB%'")
    .filter("tranAmount > 0")
)
# Display the good transaction records
goodTransRecords.show()
+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|SB10001|    1000.0|
|SB10002|    1200.0|
|SB10003|    8000.0|
|SB10004|     400.0|
|SB10005|     300.0|
|SB10006|   10000.0|
|SB10007|     500.0|
|SB10008|      56.0|
|SB10009|      30.0|
|SB10010|    7000.0|
+-------+----------+
# High value transaction records via the DataFrame API: good transactions
# whose amount is strictly greater than 1000 (an amount of exactly 1000 is
# excluded)
highValueTransRecords = goodTransRecords.filter("tranAmount > 1000")

# Display the high value transaction records
highValueTransRecords.show()
+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|SB10002|    1200.0|
|SB10003|    8000.0|
|SB10006|   10000.0|
|SB10010|    7000.0|
+-------+----------+
# Bad account records via the DataFrame API: every transaction whose account
# number does NOT start with 'SB'
badAccountRecords = acTransDF.filter("accNo NOT like 'SB%'")
# Display the bad account records
badAccountRecords.show()
+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|CR10001|    7000.0|
+-------+----------+
# Bad amount records via the DataFrame API: transactions with a negative
# tranAmount
badAmountRecords = acTransDF.filter("tranAmount < 0")
# Display the bad amount records
badAmountRecords.show()
+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|SB10002|     -10.0|
+-------+----------+
# Combine the bad account records and the bad amount records into a single
# DataFrame. DataFrame.union matches columns by position and keeps duplicate
# rows (UNION ALL semantics), so a record that is both a non-'SB' account and
# a negative amount would appear twice.
badTransRecords = badAccountRecords.union(badAmountRecords)
# Display the combined bad transaction records
badTransRecords.show()
+-------+----------+
|  accNo|tranAmount|
+-------+----------+
|CR10001|    7000.0|
|SB10002|     -10.0|
+-------+----------+
# Good account numbers via the DataFrame API: keep 'SB' accounts, project
# the account number column, drop duplicates, and sort by account number
goodAccNos = (
    acTransDF
    .filter("accNo like 'SB%'")
    .select("accNo")
    .distinct()
    .orderBy("accNo")
)
# Display the distinct good account numbers
goodAccNos.show()
+-------+
|  accNo|
+-------+
|SB10001|
|SB10002|
|SB10003|
|SB10004|
|SB10005|
|SB10006|
|SB10007|
|SB10008|
|SB10009|
|SB10010|
+-------+