If the exam says "use Spark SQL" or "use DataFrame" instead of RDD, this file is your savior. Same questions as WALKTHROUGH.md, same datasets, different approach.

Key idea: with Spark SQL, you write regular SQL queries (SELECT ... FROM ... GROUP BY ...) on top of your data, which is much shorter than RDD code.
There are two equivalent styles; both run through the same engine (Catalyst).

Style 1: SQL strings

```python
df.createOrReplaceTempView("rides")
spark.sql("SELECT city, SUM(fare) FROM rides GROUP BY city").show()
```

Style 2: DataFrame API

```python
from pyspark.sql.functions import sum as _sum
df.groupBy("city").agg(_sum("fare").alias("total")).show()
```

Both produce the same result. Use whichever you find clearer. The exam usually accepts either.
💡 Recommendation: If the question literally says "Spark SQL", use Style 1 (SQL strings). If it says "DataFrame", use Style 2.
```python
# 1. Read CSV → DataFrame
df = spark.read.csv("rides.csv", header=True, inferSchema=True)

# 2. Register the DataFrame as an SQL table named "rides"
df.createOrReplaceTempView("rides")

# Now you can query it:
spark.sql("SELECT * FROM rides").show()
```

| Line | Plain English |
|---|---|
| `spark.read.csv(...)` | Reads the file as a DataFrame (table). `header=True` → first row is column names. `inferSchema=True` → auto-detects types. |
| `df.createOrReplaceTempView("rides")` | Tells Spark "treat this DataFrame as a SQL table called `rides`". |
| `spark.sql("...")` | Runs the SQL string and returns a DataFrame. Use `.show()` to display. |
That's literally it. The rest is just SQL.
For Style 2, import these:

```python
from pyspark.sql.functions import sum as _sum, avg, count, max as _max, min as _min, desc, asc
```

⚠️ `sum`, `max`, `min` exist in plain Python too; that's why we rename them to `_sum`, `_max`, `_min` to avoid confusion.
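If you'd rather not rename anything, another option (not required, just an alternative to the import line above) is to import the functions module under an alias and call everything through it. A minimal sketch, assuming `df` is the rides DataFrame from the quick-start above:

```python
# Alternative: keep pyspark functions behind an alias so nothing shadows Python's built-ins
from pyspark.sql import functions as F

df.groupBy("city").agg(F.sum("fare").alias("total"), F.avg("fare").alias("average")).show()
```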
Same dataset:

```
ride_id, driver_id, distance, fare, city
1, D1, 10, 200, Mumbai
2, D2, 5, 120, Delhi
3, D1, 8, 160, Mumbai
4, D3, 12, 250, Pune
5, D2, 7, 140, Delhi
```
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("BDA").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
```

Then create the CSV (same as in WALKTHROUGH.md Part D.0).
```python
# 1. Read CSV as DataFrame
df = spark.read.csv("rides.csv", header=True, inferSchema=True)
df.show()

# 2. Register as a temp view (a SQL "table")
df.createOrReplaceTempView("rides")

# 3. Run SQL
result = spark.sql("""
    SELECT city, SUM(fare) AS total_fare
    FROM rides
    GROUP BY city
""")
result.show()

# 4. Save result to a CSV folder (replaces the saveAsTextFile from RDD)
import shutil, os
if os.path.exists("output_a_sql"): shutil.rmtree("output_a_sql")
result.write.csv("output_a_sql", header=True)
```

`df = spark.read.csv("rides.csv", header=True, inferSchema=True)`

- `spark.read.csv(...)` returns a DataFrame (table).
- `header=True` → first row = column names (`ride_id`, `driver_id`, ...).
- `inferSchema=True` → auto-detects types (so `distance` is int, not string).
`df.createOrReplaceTempView("rides")`

- Tells Spark: "treat this DataFrame as if it were a SQL table called `rides`."
- Now `spark.sql("SELECT ... FROM rides ...")` works.
- "TempView" because it disappears when the SparkSession ends. It doesn't write anything to disk.
```python
result = spark.sql("""
    SELECT city, SUM(fare) AS total_fare
    FROM rides
    GROUP BY city
""")
result.show()
```

This is regular SQL, the same as in MySQL/PostgreSQL.
| Part of query | Meaning |
|---|---|
| `SELECT city, SUM(fare)` | Pick the `city` column, and the sum of `fare` |
| `AS total_fare` | Name the new column `total_fare` (without it, the column would be called `sum(fare)`) |
| `FROM rides` | From the table named `rides` (which we registered above) |
| `GROUP BY city` | Group rows that share the same `city`, then run `SUM` on each group |
`result.write.csv("output_a_sql", header=True)`

- `df.write.csv(folder, header=True)` → saves the DataFrame as CSV files inside a folder.
- `header=True` → write column names as the first row.
- Like `saveAsTextFile`, the folder must NOT already exist; that's why we delete it first.
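One optional tweak, not something the question asks for: `write.csv` produces a folder with one part file per partition. If you want a single CSV part file inside the folder, coalesce to one partition first (the folder name here is made up for illustration):

```python
# Optional: force a single part file inside the output folder
result.coalesce(1).write.csv("output_a_sql_single", header=True)
```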
Expected output of `result.show()`:

```
+------+----------+
|  city|total_fare|
+------+----------+
|Mumbai|       360|
| Delhi|       260|
|  Pune|       250|
+------+----------+
```
```python
from pyspark.sql.functions import sum as _sum

df = spark.read.csv("rides.csv", header=True, inferSchema=True)
result = df.groupBy("city").agg(_sum("fare").alias("total_fare"))
result.show()
```

| Line | Plain English |
|---|---|
| `df.groupBy("city")` | Group rows that have the same `city` |
| `.agg(_sum("fare").alias("total_fare"))` | For each group, sum the `fare` column and name it `total_fare` |
| `.show()` | Print the result table |
Both styles produce the exact same output. Choose one and stick with it.
💡 The Linear Regression code is the SAME whether the exam says "use Spark SQL" or not — because the ML pipeline always uses DataFrames. There is no RDD-style ML code.
If they ask you to load the data via Spark SQL first, just do:
```python
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Load via Spark SQL
df = spark.read.csv("rides.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("rides")

# Optional: select only the columns we need via SQL
ml_df = spark.sql("SELECT distance, fare FROM rides")
ml_df.show()

# Now standard ML flow
assembler = VectorAssembler(inputCols=["distance"], outputCol="features")
data = assembler.transform(ml_df).select("features", ml_df["fare"].alias("label"))

lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(data)

predictions = model.transform(data)
predictions.select("features", "label", "prediction").show()
print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)
```

The only NEW thing is using `spark.sql(...)` to filter columns. Everything after `VectorAssembler(...)` is identical to the regular walkthrough.
💡 NetworkX is the same. Spark SQL is for tabular data; graphs use NetworkX, which doesn't change. Use the code from WALKTHROUGH.md Part D.3.
If they want you to extract the graph data via SQL first (rare):
```python
import networkx as nx
import pandas as pd

# Load + use SQL to filter columns
df = spark.read.csv("rides.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("rides")
edges_df = spark.sql("SELECT driver_id, city FROM rides")

# Convert Spark DataFrame → pandas DataFrame
pdf = edges_df.toPandas()

# Now standard NetworkX
G = nx.DiGraph()
for d in pdf["driver_id"].unique(): G.add_node(d, type="driver")
for c in pdf["city"].unique(): G.add_node(c, type="city")
for _, row in pdf.iterrows():
    G.add_edge(row["driver_id"], row["city"])

print("Vertices:", list(G.nodes))
print("Edges:", list(G.edges))
print("Degree centrality:", nx.degree_centrality(G))
```

The new line is `edges_df.toPandas()` → converts a Spark DataFrame back to pandas, which NetworkX can use easily.
Same dataset:

```
delivery_id, agent_id, distance, delivery_time, zone
1, A1, 5, 30, Zone1
2, A2, 8, 45, Zone2
3, A1, 6, 35, Zone1
4, A3, 10, 60, Zone3
5, A2, 7, 40, Zone2
```
df = spark.read.csv("delivery.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("delivery")
result = spark.sql("""
SELECT zone, SUM(delivery_time) AS total_time
FROM delivery
GROUP BY zone
ORDER BY total_time DESC
""")
result.show()
import shutil, os
if os.path.exists("output_a_sql"): shutil.rmtree("output_a_sql")
result.write.csv("output_a_sql", header=True)ORDER BY total_time DESC| Part | Meaning |
|---|---|
ORDER BY |
Sort the result |
total_time |
Sort by this column (the new aliased name) |
DESC |
Descending (biggest first). Use ASC for ascending (default). |
Expected output:

```
+-----+----------+
| zone|total_time|
+-----+----------+
|Zone2|        85|
|Zone1|        65|
|Zone3|        60|
+-----+----------+
```
```python
from pyspark.sql.functions import sum as _sum, desc

df = spark.read.csv("delivery.csv", header=True, inferSchema=True)
result = (
    df.groupBy("zone")
      .agg(_sum("delivery_time").alias("total_time"))
      .orderBy(desc("total_time"))
)
result.show()
```

| Line | Plain English |
|---|---|
| `df.groupBy("zone")` | Group rows by `zone` |
| `.agg(_sum("delivery_time").alias("total_time"))` | Sum `delivery_time` per group, call it `total_time` |
| `.orderBy(desc("total_time"))` | Sort by `total_time`, biggest first |
Same as Set 1 (b), only file/column names change:
```python
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

df = spark.read.csv("delivery.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("delivery")
ml_df = spark.sql("SELECT distance, delivery_time FROM delivery")

assembler = VectorAssembler(inputCols=["distance"], outputCol="features")
data = assembler.transform(ml_df).select("features", ml_df["delivery_time"].alias("label"))

lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(data)

predictions = model.transform(data)
predictions.select("features", "label", "prediction").show()
print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)
```

Same as Set 1 (c), with Agent → Zone renames. Use NetworkX, as in the sketch below.
If the exam throws something else at you, here are common patterns ready to copy.
spark.sql("SELECT * FROM rides WHERE distance > 7").show()
spark.sql("SELECT * FROM rides WHERE city = 'Mumbai'").show()df.filter(df.distance > 7).show()
df.filter(df.city == "Mumbai").show()spark.sql("SELECT city, COUNT(*) AS num_rides FROM rides GROUP BY city").show()from pyspark.sql.functions import count
df.groupBy("city").agg(count("*").alias("num_rides")).show()spark.sql("SELECT city, AVG(fare) AS avg_fare FROM rides GROUP BY city").show()from pyspark.sql.functions import avg
df.groupBy("city").agg(avg("fare").alias("avg_fare")).show()spark.sql("""
SELECT city,
SUM(fare) AS total,
AVG(fare) AS average,
COUNT(*) AS num_rides,
MAX(fare) AS max_fare,
MIN(fare) AS min_fare
FROM rides
GROUP BY city
""").show()from pyspark.sql.functions import sum as _sum, avg, count, max as _max, min as _min
df.groupBy("city").agg(
_sum("fare").alias("total"),
avg("fare").alias("average"),
count("*").alias("num_rides"),
_max("fare").alias("max_fare"),
_min("fare").alias("min_fare")
).show()spark.sql("SELECT * FROM rides ORDER BY fare DESC").show()
spark.sql("SELECT * FROM rides ORDER BY city ASC, fare DESC").show()from pyspark.sql.functions import desc, asc
df.orderBy(desc("fare")).show()
df.orderBy(asc("city"), desc("fare")).show()spark.sql("SELECT * FROM rides ORDER BY fare DESC LIMIT 3").show()df.orderBy(desc("fare")).limit(3).show()spark.sql("SELECT DISTINCT city FROM rides").show()df.select("city").distinct().show()df_drivers = spark.read.csv("drivers.csv", header=True, inferSchema=True)
df_drivers.createOrReplaceTempView("drivers")
spark.sql("""
SELECT r.ride_id, r.fare, d.name
FROM rides r
JOIN drivers d ON r.driver_id = d.driver_id
""").show()df.join(df_drivers, on="driver_id", how="inner").show()spark.sql("SELECT *, fare/distance AS fare_per_km FROM rides").show()df.withColumn("fare_per_km", df.fare / df.distance).show()spark.sql("""
SELECT city, SUM(fare) AS total
FROM rides
GROUP BY city
ORDER BY total DESC
LIMIT 1
""").show()from pyspark.sql.functions import sum as _sum, desc
df.groupBy("city").agg(_sum("fare").alias("total")).orderBy(desc("total")).limit(1).show()| Question says... | Use this approach |
|---|---|
| "RDD", "reduceByKey", "key-value pairs", "map/filter" | RDD style (WALKTHROUGH.md) |
| "Spark SQL", "SQL query", "SELECT" | SQL string style (spark.sql("SELECT ...")) |
| "DataFrame", "groupBy", "aggregate" | DataFrame API style (df.groupBy(...).agg(...)) |
| "Linear Regression", "Train a model", "ML" | Always DataFrame (this file or WALKTHROUGH.md — same code) |
| "Graph", "NetworkX", "centrality" | NetworkX with pandas (always) |
If the question says nothing specific (just "find total fare per city"), either RDD or Spark SQL works. SQL is usually shorter and more readable, so go with SQL when you can.
Memorize this 4-step skeleton:

```python
# 1. Read CSV
df = spark.read.csv(FILE, header=True, inferSchema=True)

# 2. Register as SQL table
df.createOrReplaceTempView(TABLE_NAME)

# 3. Run SQL
result = spark.sql("SELECT ... FROM TABLE_NAME ...")

# 4. Show or save
result.show()
result.write.csv(OUTPUT_FOLDER, header=True)
```

Then write whatever SQL the question is asking for in step 3. That's literally the whole pattern.
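For example, here is the skeleton filled in for a made-up question ("average delivery time per zone"); the output folder name is just an illustration:

```python
# Skeleton filled in for a hypothetical question: average delivery time per zone
df = spark.read.csv("delivery.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("delivery")
result = spark.sql("SELECT zone, AVG(delivery_time) AS avg_time FROM delivery GROUP BY zone")
result.show()
result.write.mode("overwrite").csv("output_avg_time", header=True)
```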
| Error | What's wrong | Fix |
|---|---|---|
| `Table or view not found: rides` | You forgot to register the temp view | Run `df.createOrReplaceTempView("rides")` first |
| `cannot resolve 'fare' given input columns: ...` | Wrong column name (typo or case) | Check exact names with `df.columns` |
| `Path output_a_sql already exists` | Folder exists from a previous run | Delete with `shutil.rmtree(...)` first, OR use `.mode("overwrite")` (snippet below) |
| `AnalysisException: GROUP BY ...` | Selected a column that's not in GROUP BY and not aggregated | Either add it to GROUP BY or wrap it in SUM/AVG/etc. (example below) |
| `cannot import name 'sum'` | Importing `sum` shadows Python's built-in | Rename: `from pyspark.sql.functions import sum as _sum` |
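To make the GROUP BY row concrete, here are illustrative queries against the `rides` view (made up for this table, not exam questions):

```python
# Fails: driver_id is neither in GROUP BY nor inside an aggregate
# spark.sql("SELECT city, driver_id, SUM(fare) FROM rides GROUP BY city").show()

# Works: group by it as well (or drop it from SELECT, or wrap it in an aggregate)
spark.sql("SELECT city, driver_id, SUM(fare) AS total FROM rides GROUP BY city, driver_id").show()
```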
result.write.mode("overwrite").csv("output_a_sql", header=True)This auto-overwrites the folder if it exists, no need for shutil.rmtree.
| Task | RDD | Spark SQL string | DataFrame API |
|---|---|---|---|
| Read | `sc.textFile("f.csv")` | `spark.read.csv("f.csv", header=True, inferSchema=True)` | (same as Spark SQL) |
| Register table | n/a | `df.createOrReplaceTempView("t")` | not needed |
| Sum per group | `pairs.reduceByKey(lambda a,b: a+b)` | `SELECT k, SUM(v) FROM t GROUP BY k` | `df.groupBy("k").agg(_sum("v"))` |
| Filter | `rdd.filter(lambda x: x[1] > 10)` | `SELECT * FROM t WHERE v > 10` | `df.filter(df.v > 10)` |
| Sort desc | `rdd.sortBy(lambda x: x[1], ascending=False)` | `ORDER BY v DESC` | `df.orderBy(desc("v"))` |
| Top 5 | `rdd.takeOrdered(5, lambda x: -x[1])` | `... LIMIT 5` (after order) | `df.orderBy(desc("v")).limit(5)` |
| Save | `rdd.saveAsTextFile("out")` | `df.write.csv("out", header=True)` | (same) |
| Show | `print(rdd.collect())` | `df.show()` | `df.show()` |
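To see the table in action, here is the "sum of fare per city" task written out in all three styles (a sketch; it assumes the rides.csv created earlier and the `sc`/`spark` objects from the setup cell):

```python
# RDD style (covered in WALKTHROUGH.md)
rdd = sc.textFile("rides.csv")
header = rdd.first()
pairs = (rdd.filter(lambda line: line != header)
            .map(lambda line: [p.strip() for p in line.split(",")])
            .map(lambda p: (p[4], float(p[3]))))   # (city, fare)
print(pairs.reduceByKey(lambda a, b: a + b).collect())

# Spark SQL string style
df = spark.read.csv("rides.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("rides")
spark.sql("SELECT city, SUM(fare) AS total FROM rides GROUP BY city").show()

# DataFrame API style
from pyspark.sql.functions import sum as _sum
df.groupBy("city").agg(_sum("fare").alias("total")).show()
```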
- Spark SQL = read CSV → register temp view → run SQL string → show/save.
- Two styles: SQL string OR DataFrame API. Either works. Pick by question wording.
- ML and Graph code stays the same regardless.
- SQL pattern is shorter than RDD — when in doubt, use SQL.
- Always run the SparkSession cell first.
You now have all three styles covered:
| File | What it teaches |
|---|---|
WALKTHROUGH.md |
RDD style (reduceByKey, map, filter) |
SPARK_SQL_WALKTHROUGH.md |
Spark SQL + DataFrame API |
EXAM_GUIDE.md |
Quick reference + setup |
VIVA_PREP.md |
200 viva Q&A |
Whichever way the exam asks the question, you have the template. 🚀