from pyspark.sql import SparkSession
# Create (or reuse) the SparkSession that drives all SQL/DataFrame work below.
spark=SparkSession.builder.appName('SparkSQL2').getOrCreate()
# Bare expression: in a notebook cell this displays the SparkSession summary.
spark
import pandas as pd
from pyspark.sql import Row
# Creation of the list from where the RDD is going to be created
# Source data: each record is an "accountNumber,transactionAmount" CSV string.
# 'SB*' accounts are savings-bank accounts; CR10001 and the -10 record are
# deliberately bad data used by the queries below.
acTransList = ["SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400", "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56", "SB10009,30","SB10010,7000", "CR10001,7000", "SB10002,-10"]
# Build the DataFrame: parallelize the list into an RDD, split each CSV
# record, and convert it to a Row with a typed (float) amount so Spark can
# infer the schema (accNo: string, tranAmount: double).
acTransDF = spark.sparkContext.parallelize(acTransList)\
    .map(lambda trans: trans.split(","))\
    .map(lambda p: Row(accNo=p[0], tranAmount=float(p[1]))).toDF()
# Register a temporary view so the DataFrame can be queried with SQL.
acTransDF.createOrReplaceTempView("trans")
# Print the inferred schema of the DataFrame (accNo: string, tranAmount: double)
acTransDF.printSchema()
root
|-- accNo: string (nullable = true)
|-- tranAmount: double (nullable = true)
# Display the DataFrame contents (show() prints up to the first 20 rows)
acTransDF.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|SB10010| 7000.0|
|CR10001| 7000.0|
|SB10002| -10.0|
+-------+----------+
RDD — aggregating the DataFrame's contents through its underlying RDD
# Mix DataFrame and RDD-style operations: project the good transaction
# amounts into a plain RDD once, then aggregate it with reduce().
# NOTE: relies on goodTransRecords, created in a later cell of this notebook
# transcript but executed earlier in the original session.
goodAmounts = goodTransRecords.rdd.map(lambda t: t.tranAmount)
# Total of the good transaction amounts.
sumAmountByMixing = goodAmounts.reduce(lambda x, y: x + y)
sumAmountByMixing
# Largest good transaction amount.
maxAmountByMixing = goodAmounts.reduce(lambda x, y: max(x, y))
maxAmountByMixing
# Smallest good transaction amount.
minAmountByMixing = goodAmounts.reduce(lambda x, y: min(x, y))
minAmountByMixing
SELECT — querying the registered temporary view with Spark SQL
# Good transactions: savings-bank ('SB') accounts with a positive amount.
goodTransQuery = "SELECT accNo, tranAmount FROM trans WHERE accNo like 'SB%' AND tranAmount > 0"
goodTransRecords = spark.sql(goodTransQuery)
# Expose the result as a temporary view for the follow-up SQL queries.
goodTransRecords.createOrReplaceTempView("goodtrans")
# Display the good transaction records.
goodTransRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|SB10010| 7000.0|
+-------+----------+
# Bad account records: any account whose number does not start with 'SB'.
badAccountQuery = "SELECT accNo, tranAmount FROM trans WHERE accNo NOT like 'SB%'"
badAccountRecords = spark.sql(badAccountQuery)
badAccountRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|CR10001| 7000.0|
+-------+----------+
# Bad amount records: transactions with a negative amount.
badAmountQuery = "SELECT accNo, tranAmount FROM trans WHERE tranAmount < 0"
badAmountRecords = spark.sql(badAmountQuery)
# Display the bad amount records.
badAmountRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10002| -10.0|
+-------+----------+
# Combine the two bad-record DataFrames (non-'SB' accounts + negative
# amounts) into one; union() concatenates rows without de-duplicating.
badTransRecords = badAccountRecords.union(badAmountRecords)
# Show the first few records of the DataFrame
badTransRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|CR10001| 7000.0|
|SB10002| -10.0|
+-------+----------+
# Total of all good transaction amounts, computed in SQL.
sumQuery = "SELECT sum(tranAmount)as sum FROM goodtrans"
sumAmount = spark.sql(sumQuery)
# Display the single-row aggregate result.
sumAmount.show()
+-------+
| sum|
+-------+
|28486.0|
+-------+
# Maximum good transaction amount, computed in SQL.
maxQuery = "SELECT max(tranAmount) as max FROM goodtrans"
maxAmount = spark.sql(maxQuery)
# Display the single-row aggregate result.
maxAmount.show()
+-------+
| max|
+-------+
|10000.0|
+-------+
# Distinct good (savings-bank) account numbers, ordered for stable display.
# An alternative is to query the already-filtered view instead:
#   SELECT DISTINCT accNo FROM goodtrans
# (same account set here, but without the ORDER BY guarantee).
goodAccNoQuery = "SELECT DISTINCT accNo FROM trans WHERE accNo like 'SB%' ORDER BY accNo"
goodAccNos = spark.sql(goodAccNoQuery)
# Display the good account numbers.
goodAccNos.show()
+-------+
| accNo|
+-------+
|SB10001|
|SB10002|
|SB10003|
|SB10004|
|SB10005|
|SB10006|
|SB10007|
|SB10008|
|SB10009|
|SB10010|
+-------+
FILTER — the same queries expressed with the DataFrame API instead of SQL
# Good transaction records via the DataFrame API: restrict to savings-bank
# accounts first, then keep only positive amounts.
sbTransactions = acTransDF.filter("accNo like 'SB%'")
goodTransRecords = sbTransactions.filter("tranAmount > 0")
# Display the good transaction records.
goodTransRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|SB10010| 7000.0|
+-------+----------+
# From the good records, keep only transactions strictly above 1000.
highValueTransRecords = goodTransRecords.filter("tranAmount > 1000")
# Show the first few records of the DataFrame
highValueTransRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10006| 10000.0|
|SB10010| 7000.0|
+-------+----------+
# Bad account records via the DataFrame API: account numbers not starting
# with 'SB' (rebinds the name previously assigned by the SQL version).
badAccountRecords = acTransDF.filter("accNo NOT like 'SB%'")
# Show the first few records of the DataFrame
badAccountRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|CR10001| 7000.0|
+-------+----------+
# Bad amount records via the DataFrame API: negative transaction amounts.
badAmountRecords = acTransDF.filter("tranAmount < 0")
# Show the first few records of the DataFrame
badAmountRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10002| -10.0|
+-------+----------+
# Combine the two bad-record DataFrames into one; union() concatenates
# rows without de-duplicating.
badTransRecords = badAccountRecords.union(badAmountRecords)
# Show the first few records of the DataFrame
badTransRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|CR10001| 7000.0|
|SB10002| -10.0|
+-------+----------+
# Distinct good account numbers via the DataFrame API, sorted for display.
sbRecords = acTransDF.filter("accNo like 'SB%'")
goodAccNos = sbRecords.select("accNo").distinct().orderBy("accNo")
# Display the good account numbers.
goodAccNos.show()
+-------+
| accNo|
+-------+
|SB10001|
|SB10002|
|SB10003|
|SB10004|
|SB10005|
|SB10006|
|SB10007|
|SB10008|
|SB10009|
|SB10010|
+-------+