Pyspark: Dataframe Row & Columns


If you've used R or even the pandas library with Python you are probably already familiar with the concept of DataFrames. Spark DataFrame expand on a lot of these concepts, allowing you to transfer that knowledge easily by understanding the simple syntax of Spark DataFrames. Remember that the main advantage to using Spark DataFrames vs those other programs is that Spark can handle data across many RDDs, huge data sets that would never fit on a single computer.

Creating Dataframe

To create dataframe first we need to create spark session

from pyspark.sql import SparkSession

# May take a little while on a local computer
spark = SparkSession.builder.appName("Basics").getOrCreate()
spark

Create Dataframe from file

Create Schema manually

Next we need to create the list of Structure fields

from pyspark.sql.types import StructField, StringType, IntegerType, StructType

data_schema = [StructField('age', IntegerType(), True),
              StructField('name', StringType(), True)]

final_struc = StructType(fields=data_schema)

df = spark.read.json('people.json', schema=final_struc)
df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
# Columns
df.columns
['age', 'name']
# Column Data Type
df.dtypes
[('age', 'bigint'), ('name', 'string')]
# Descriptive Statistic
df.describe().show()
+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+
#Showing only a data
df.show(1)
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
+----+-------+
only showing top 1 row

Dataframe Selection

# df['age'] is a pyspark.sql.column.Column
type(df['age'])
pyspark.sql.column.Column
# df['age'] will not showing any thing
df['age']
Column<b'age'>
# Select column
df.select('age')
DataFrame[age: int]
# Use show() to show the value of Dataframe
df.select('age').show()
+----+
| age|
+----+
|null|
|  30|
|  19|
+----+
# Return two Row but content will not displayed
df.head(2)
[Row(age=None, name='Michael'), Row(age=30, name='Andy')]
# Select multiple column
df.select(['age', 'name']).show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
# Select DataFrame approach
df.filter(df.age == 30).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
sql_results = spark.sql("SELECT * FROM people")
sql_results.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

Dataframe Column

Rename column

#Rename column
df.withColumnRenamed('age', 'Age').show()
+----+-------+
| Age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
# Convert to Dataframe
df.toDF('Age', 'Name').show()
+----+-------+
| Age|   Name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
# Operation/Function and Column Alias
df.withColumn('doubleage',df['age']*2).show()
+----+-------+---------+
| age|   name|doubleage|
+----+-------+---------+
|null|Michael|     null|
|  30|   Andy|       60|
|  19| Justin|       38|
+----+-------+---------+

Create new column

# Create new column based on pyspark.sql.column.Column
df.withColumn('newage', df['age']).show()
+----+-------+------+
| age|   name|newage|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    30|
|  19| Justin|    19|
+----+-------+------+

Drop column

# Drop column
df.drop('name').show()
+----+
| age|
+----+
|null|
|  30|
|  19|
+----+

Dataframe Row

# Select Row based on condition
result = df.filter(df.age == 30).collect()
row = result[0]
#Dataframe row is pyspark.sql.types.Row
type(result[0])
pyspark.sql.types.Row
# Count
row.count(30)
1
# Index
row.index(30)
0

Rows can be called to turn into dictionaries

# Return Dictionary
row.asDict().values()
dict_values([30, 'Andy'])
# Return Value in Dictionary
row.asDict()['age']
30
# Print Row as Dictionary
for item in result[0]:
#     print(type(item))
    print(item)
30
Andy
# Print Row as Dictionary
for item in row.asDict():
#     print(type(item))
    print(item)
age
name
# Print Dictionary Keys
for item in row.asDict().keys():
#     print(type(item))
    print(item)
age
name
# Print Dictionary values
for item in row.asDict().values():
#     print(type(item))
    print(item)
30
Andy