Spark SQL Walkthrough — Same Exam Questions, SQL Style

If the exam says "use Spark SQL" or "use DataFrame" instead of RDD, this file is your savior. Same questions as WALKTHROUGH.md, same datasets — different approach.

Key idea: with Spark SQL, you write regular SQL queries (SELECT ... FROM ... GROUP BY ...) on top of your data — much shorter than RDD code.


PART A — Spark SQL Concepts in 60 Seconds

A.1 — Two ways to query in Spark

There are two equivalent styles — both run through the same engine (Catalyst):

Style 1: Plain SQL strings (easier for SQL people)

df.createOrReplaceTempView("rides")
spark.sql("SELECT city, SUM(fare) FROM rides GROUP BY city").show()

Style 2: DataFrame API (Python method-chaining)

from pyspark.sql.functions import sum as _sum
df.groupBy("city").agg(_sum("fare").alias("total")).show()

Both produce the same result. Use whichever you find clearer. The exam usually accepts either.

💡 Recommendation: If the question literally says "Spark SQL", use Style 1 (SQL strings). If it says "DataFrame", use Style 2.

A.2 — The two new lines you need

# 1. Read CSV → DataFrame
df = spark.read.csv("rides.csv", header=True, inferSchema=True)

# 2. Register the DataFrame as an SQL table named "rides"
df.createOrReplaceTempView("rides")

# Now you can query it:
spark.sql("SELECT * FROM rides").show()

| Line | Plain English |
| --- | --- |
| spark.read.csv(...) | Reads the file as a DataFrame (table). header=True → first row is column names. inferSchema=True → auto-detects types. |
| df.createOrReplaceTempView("rides") | Tells Spark "treat this DataFrame as a SQL table called rides". |
| spark.sql("...") | Runs the SQL string and returns a DataFrame. Use .show() to display it. |

That's literally it. The rest is just SQL.

A.3 — DataFrame API common functions

For Style 2, import these:

from pyspark.sql.functions import sum as _sum, avg, count, max as _max, min as _min, desc, asc

⚠️ sum, max, min exist in plain Python too — that's why we rename them with _sum, _max, _min to avoid confusion.
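
If you'd rather not rename anything, a common alternative is to import the functions module under an alias and qualify every call (a sketch; either import style is fine on the exam):

from pyspark.sql import functions as F

# F.sum / F.max / F.min can never shadow the Python built-ins
df.groupBy("city").agg(F.sum("fare").alias("total")).show()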


PART B — SET 1 (rides.csv) — Spark SQL Versions

Same dataset:

ride_id, driver_id, distance, fare, city
1, D1, 10, 200, Mumbai
2, D2, 5,  120, Delhi
3, D1, 8,  160, Mumbai
4, D3, 12, 250, Pune
5, D2, 7,  140, Delhi

B.0 — Always-first cell (same as before)

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("BDA").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

Then create the CSV (same as in WALKTHROUGH.md Part D.0).
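
If you don't have that cell handy, here is a minimal sketch that writes the Set 1 sample data to rides.csv (the exact cell in WALKTHROUGH.md may differ):

# Minimal sketch: write the sample dataset used below to rides.csv
csv_text = """ride_id,driver_id,distance,fare,city
1,D1,10,200,Mumbai
2,D2,5,120,Delhi
3,D1,8,160,Mumbai
4,D3,12,250,Pune
5,D2,7,140,Delhi"""
with open("rides.csv", "w") as f:
    f.write(csv_text)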


B.1 — Question (a): Total fare per city — Spark SQL way

Style 1: Pure SQL string

# 1. Read CSV as DataFrame
df = spark.read.csv("rides.csv", header=True, inferSchema=True)
df.show()

# 2. Register as a temp view (a SQL "table")
df.createOrReplaceTempView("rides")

# 3. Run SQL
result = spark.sql("""
    SELECT city, SUM(fare) AS total_fare
    FROM rides
    GROUP BY city
""")
result.show()

# 4. Save result to a CSV folder (replaces the saveAsTextFile from RDD)
import shutil, os
if os.path.exists("output_a_sql"): shutil.rmtree("output_a_sql")
result.write.csv("output_a_sql", header=True)

Line by line:

1. Read CSV

df = spark.read.csv("rides.csv", header=True, inferSchema=True)
  • spark.read.csv(...) returns a DataFrame (table).
  • header=True → first row = column names (ride_id, driver_id, ...).
  • inferSchema=True → auto-detects types (so distance is int, not string).

2. Register the table

df.createOrReplaceTempView("rides")
  • Tells Spark: "treat this DataFrame as if it were a SQL table called rides."
  • Now spark.sql("SELECT ... FROM rides ...") works.
  • "TempView" because it disappears when SparkSession ends. Doesn't write anywhere on disk.

3. Run the query

result = spark.sql("""
    SELECT city, SUM(fare) AS total_fare
    FROM rides
    GROUP BY city
""")
result.show()

This is regular SQL — same as in MySQL/PostgreSQL.

| Part of query | Meaning |
| --- | --- |
| SELECT city, SUM(fare) | Pick the city column, and the sum of fare |
| AS total_fare | Name the new column total_fare (without it, the column would be called sum(fare)) |
| FROM rides | From the table named rides (which we registered above) |
| GROUP BY city | Group rows that share the same city, then run SUM on each group |

4. Save

result.write.csv("output_a_sql", header=True)
  • df.write.csv(folder, header=True) → saves DataFrame as CSV files inside a folder.
  • header=True → write column names as the first row.
  • Like saveAsTextFile, the output folder must NOT already exist — that's why we delete it first (or use .mode("overwrite"); see Part G).

Expected output:

+------+----------+
|  city|total_fare|
+------+----------+
|Mumbai|       360|
| Delhi|       260|
|  Pune|       250|
+------+----------+

Style 2: DataFrame API (alternative)

from pyspark.sql.functions import sum as _sum

df = spark.read.csv("rides.csv", header=True, inferSchema=True)
result = df.groupBy("city").agg(_sum("fare").alias("total_fare"))
result.show()

| Line | Plain English |
| --- | --- |
| df.groupBy("city") | Group rows that have the same city |
| .agg(_sum("fare").alias("total_fare")) | For each group, sum the fare column and name it total_fare |
| .show() | Print the result table |

Both styles produce the exact same output. Choose one and stick with it.


B.2 — Question (b): Linear Regression — distance → fare

💡 The Linear Regression code is the SAME whether the exam says "use Spark SQL" or not — because the ML pipeline (pyspark.ml) always uses DataFrames. You never write RDD-style ML code here.

If they ask you to load the data via Spark SQL first, just do:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Load via Spark SQL
df = spark.read.csv("rides.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("rides")

# Optional: select only the columns we need via SQL
ml_df = spark.sql("SELECT distance, fare FROM rides")
ml_df.show()

# Now standard ML flow
assembler = VectorAssembler(inputCols=["distance"], outputCol="features")
data = assembler.transform(ml_df).select("features", ml_df["fare"].alias("label"))

lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(data)

predictions = model.transform(data)
predictions.select("features", "label", "prediction").show()

print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)

The only NEW thing is using spark.sql(...) to select the columns you need. Everything after VectorAssembler(...) is identical to the regular walkthrough.
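
If the question adds a follow-up like "predict the fare for a 15 km ride" (hypothetical here, but a common extension), reuse the same assembler and model on a one-row DataFrame:

# Hypothetical follow-up: predict fare for an unseen distance (15 km)
new_df = spark.createDataFrame([(15,)], ["distance"])
model.transform(assembler.transform(new_df)).select("distance", "prediction").show()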


B.3 — Question (c): Graph — same as before

💡 NetworkX is the same. Spark SQL is for tabular data; graphs use NetworkX which doesn't change. Use the code from WALKTHROUGH.md Part D.3.

If they want you to extract the graph data via SQL first (rare):

import networkx as nx
import pandas as pd

# Load + use SQL to filter columns
df = spark.read.csv("rides.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("rides")
edges_df = spark.sql("SELECT driver_id, city FROM rides")

# Convert Spark DataFrame → pandas DataFrame
pdf = edges_df.toPandas()

# Now standard NetworkX
G = nx.DiGraph()
for d in pdf["driver_id"].unique(): G.add_node(d, type="driver")
for c in pdf["city"].unique():      G.add_node(c, type="city")
for _, row in pdf.iterrows():
    G.add_edge(row["driver_id"], row["city"])

print("Vertices:", list(G.nodes))
print("Edges:", list(G.edges))
print("Degree centrality:", nx.degree_centrality(G))

The new line is edges_df.toPandas() → converts a Spark DataFrame back to pandas, which NetworkX can use easily.


PART C — SET 2 (delivery.csv) — Spark SQL Versions

Same dataset:

delivery_id, agent_id, distance, delivery_time, zone
1, A1, 5,  30, Zone1
2, A2, 8,  45, Zone2
3, A1, 6,  35, Zone1
4, A3, 10, 60, Zone3
5, A2, 7,  40, Zone2

C.1 — Question (a): Total delivery_time per zone, sorted DESC — Spark SQL way

Style 1: SQL string

df = spark.read.csv("delivery.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("delivery")

result = spark.sql("""
    SELECT zone, SUM(delivery_time) AS total_time
    FROM delivery
    GROUP BY zone
    ORDER BY total_time DESC
""")
result.show()

import shutil, os
if os.path.exists("output_a_sql"): shutil.rmtree("output_a_sql")
result.write.csv("output_a_sql", header=True)

What's new vs Set 1: One extra line in the SQL

ORDER BY total_time DESC

| Part | Meaning |
| --- | --- |
| ORDER BY | Sort the result |
| total_time | Sort by this column (the new aliased name) |
| DESC | Descending (biggest first). Use ASC for ascending (the default). |

Expected output:

+-----+----------+
| zone|total_time|
+-----+----------+
|Zone2|        85|
|Zone1|        65|
|Zone3|        60|
+-----+----------+

Style 2: DataFrame API

from pyspark.sql.functions import sum as _sum, desc

df = spark.read.csv("delivery.csv", header=True, inferSchema=True)
result = (
    df.groupBy("zone")
      .agg(_sum("delivery_time").alias("total_time"))
      .orderBy(desc("total_time"))
)
result.show()

| Line | Plain English |
| --- | --- |
| df.groupBy("zone") | Group rows by zone |
| .agg(_sum("delivery_time").alias("total_time")) | Sum delivery_time per group, call it total_time |
| .orderBy(desc("total_time")) | Sort by total_time, biggest first |

C.2 — Question (b): Linear Regression — distance → delivery_time

Same as Set 1 (b), only file/column names change:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

df = spark.read.csv("delivery.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("delivery")

ml_df = spark.sql("SELECT distance, delivery_time FROM delivery")

assembler = VectorAssembler(inputCols=["distance"], outputCol="features")
data = assembler.transform(ml_df).select("features", ml_df["delivery_time"].alias("label"))

lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(data)
predictions = model.transform(data)
predictions.select("features", "label", "prediction").show()

print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)

C.3 — Question (c): Graph Agent → Zone

Same as Set 1 (c), with Agent → Zone renames. Use NetworkX.
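
For completeness, here is the Set 1 code with those renames applied (same logic; only the file and column names change):

import networkx as nx

# Load + use SQL to pick the edge columns
df = spark.read.csv("delivery.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("delivery")
edges_df = spark.sql("SELECT agent_id, zone FROM delivery")

# Convert Spark DataFrame → pandas DataFrame
pdf = edges_df.toPandas()

# Standard NetworkX: agents and zones as nodes, one edge per delivery
G = nx.DiGraph()
for a in pdf["agent_id"].unique(): G.add_node(a, type="agent")
for z in pdf["zone"].unique():     G.add_node(z, type="zone")
for _, row in pdf.iterrows():
    G.add_edge(row["agent_id"], row["zone"])

print("Vertices:", list(G.nodes))
print("Edges:", list(G.edges))
print("Degree centrality:", nx.degree_centrality(G))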


PART D — Extra Spark SQL Patterns You Might Need

If the exam throws something else at you, here are common patterns ready to copy.

D.1 — Filtering rows

SQL:

spark.sql("SELECT * FROM rides WHERE distance > 7").show()
spark.sql("SELECT * FROM rides WHERE city = 'Mumbai'").show()

DataFrame API:

df.filter(df.distance > 7).show()
df.filter(df.city == "Mumbai").show()

D.2 — Counting rows per group

SQL:

spark.sql("SELECT city, COUNT(*) AS num_rides FROM rides GROUP BY city").show()

DataFrame API:

from pyspark.sql.functions import count
df.groupBy("city").agg(count("*").alias("num_rides")).show()

D.3 — Average per group

SQL:

spark.sql("SELECT city, AVG(fare) AS avg_fare FROM rides GROUP BY city").show()

DataFrame API:

from pyspark.sql.functions import avg
df.groupBy("city").agg(avg("fare").alias("avg_fare")).show()

D.4 — Multiple aggregations at once

SQL:

spark.sql("""
    SELECT city,
           SUM(fare) AS total,
           AVG(fare) AS average,
           COUNT(*) AS num_rides,
           MAX(fare) AS max_fare,
           MIN(fare) AS min_fare
    FROM rides
    GROUP BY city
""").show()

DataFrame API:

from pyspark.sql.functions import sum as _sum, avg, count, max as _max, min as _min
df.groupBy("city").agg(
    _sum("fare").alias("total"),
    avg("fare").alias("average"),
    count("*").alias("num_rides"),
    _max("fare").alias("max_fare"),
    _min("fare").alias("min_fare")
).show()

D.5 — Sorting

SQL:

spark.sql("SELECT * FROM rides ORDER BY fare DESC").show()
spark.sql("SELECT * FROM rides ORDER BY city ASC, fare DESC").show()

DataFrame API:

from pyspark.sql.functions import desc, asc
df.orderBy(desc("fare")).show()
df.orderBy(asc("city"), desc("fare")).show()

D.6 — Top N rows

SQL:

spark.sql("SELECT * FROM rides ORDER BY fare DESC LIMIT 3").show()

DataFrame API:

df.orderBy(desc("fare")).limit(3).show()

D.7 — Distinct values

SQL:

spark.sql("SELECT DISTINCT city FROM rides").show()

DataFrame API:

df.select("city").distinct().show()

D.8 — Joining two tables

SQL:

df_drivers = spark.read.csv("drivers.csv", header=True, inferSchema=True)
df_drivers.createOrReplaceTempView("drivers")

spark.sql("""
    SELECT r.ride_id, r.fare, d.name
    FROM rides r
    JOIN drivers d ON r.driver_id = d.driver_id
""").show()

DataFrame API:

df.join(df_drivers, on="driver_id", how="inner").show()
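
If the key columns had different names in the two tables (say the drivers file used id instead of driver_id — hypothetical here), on= with a single string won't work; pass a column expression instead:

# Hypothetical: join when the key columns are named differently
df.join(df_drivers, df.driver_id == df_drivers.id, "inner").show()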

D.9 — Adding a computed column

SQL:

spark.sql("SELECT *, fare/distance AS fare_per_km FROM rides").show()

DataFrame API:

df.withColumn("fare_per_km", df.fare / df.distance).show()

D.10 — Finding the city with the highest total fare

SQL:

spark.sql("""
    SELECT city, SUM(fare) AS total
    FROM rides
    GROUP BY city
    ORDER BY total DESC
    LIMIT 1
""").show()

DataFrame API:

from pyspark.sql.functions import sum as _sum, desc
df.groupBy("city").agg(_sum("fare").alias("total")).orderBy(desc("total")).limit(1).show()

PART E — RDD vs Spark SQL — When to Use Which?

| Question says... | Use this approach |
| --- | --- |
| "RDD", "reduceByKey", "key-value pairs", "map/filter" | RDD style (WALKTHROUGH.md) |
| "Spark SQL", "SQL query", "SELECT" | SQL string style (spark.sql("SELECT ...")) |
| "DataFrame", "groupBy", "aggregate" | DataFrame API style (df.groupBy(...).agg(...)) |
| "Linear Regression", "Train a model", "ML" | Always DataFrames (this file or WALKTHROUGH.md — same code) |
| "Graph", "NetworkX", "centrality" | NetworkX with pandas (always) |

If the question says nothing specific (just "find total fare per city"), either RDD or Spark SQL works. SQL is usually shorter and more readable, so go with SQL when you can.


PART F — Pattern Skeleton (Spark SQL Edition)

Memorize this 4-line skeleton:

# 1. Read CSV
df = spark.read.csv(FILE, header=True, inferSchema=True)

# 2. Register as SQL table
df.createOrReplaceTempView(TABLE_NAME)

# 3. Run SQL
result = spark.sql("SELECT ... FROM TABLE_NAME ...")

# 4. Show or save
result.show()
result.write.csv(OUTPUT_FOLDER, header=True)

Then write whatever SQL the question is asking for in step 3. That's literally the whole pattern.


PART G — Common Errors & Fixes

| Error | What's wrong | Fix |
| --- | --- | --- |
| Table or view not found: rides | You forgot to register the temp view | Run df.createOrReplaceTempView("rides") first |
| cannot resolve 'fare' given input columns: ... | Wrong column name (typo or case) | Check exact names with df.columns |
| Path output_a_sql already exists | Folder exists from a previous run | Delete it with shutil.rmtree(...) first, OR use .mode("overwrite") |
| AnalysisException: GROUP BY ... | Selected a column that's not in GROUP BY and not aggregated | Either add it to GROUP BY or wrap it in SUM/AVG/etc. |
| Plain Python sum/max/min misbehave after an import | from pyspark.sql.functions import sum shadows Python's built-in sum | Rename on import: from pyspark.sql.functions import sum as _sum |
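
A quick diagnostic cell for the first two errors (plain PySpark introspection, nothing exam-specific):

print(df.columns)                   # exact column names: catches typos and case issues
df.printSchema()                    # names plus the types inferSchema detected
print(spark.catalog.listTables())   # registered temp views: is "rides" actually there?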

Pro tip: use mode("overwrite") instead of manually deleting

result.write.mode("overwrite").csv("output_a_sql", header=True)

This auto-overwrites the folder if it exists, no need for shutil.rmtree.


PART H — Side-By-Side Cheat Sheet

| Task | RDD | Spark SQL string | DataFrame API |
| --- | --- | --- | --- |
| Read | sc.textFile("f.csv") | spark.read.csv("f.csv", header=True, inferSchema=True) | (same as Spark SQL) |
| Register table | n/a | df.createOrReplaceTempView("t") | not needed |
| Sum per group | pairs.reduceByKey(lambda a,b: a+b) | SELECT k, SUM(v) FROM t GROUP BY k | df.groupBy("k").agg(_sum("v")) |
| Filter | rdd.filter(lambda x: x[1] > 10) | SELECT * FROM t WHERE v > 10 | df.filter(df.v > 10) |
| Sort desc | rdd.sortBy(lambda x: x[1], ascending=False) | ORDER BY v DESC | df.orderBy(desc("v")) |
| Top 5 | rdd.takeOrdered(5, key=lambda x: -x[1]) | ORDER BY v DESC LIMIT 5 | df.orderBy(desc("v")).limit(5) |
| Save | rdd.saveAsTextFile("out") | df.write.csv("out", header=True) | (same) |
| Show | print(rdd.collect()) | df.show() | df.show() |

PART I — 30-Second Pre-Exam Recap

  1. Spark SQL = read CSV → register temp view → run SQL string → show/save.
  2. Two styles: SQL string OR DataFrame API. Either works. Pick by question wording.
  3. ML and Graph code stays the same regardless.
  4. SQL pattern is shorter than RDD — when in doubt, use SQL.
  5. Always run the SparkSession cell first.

You now have all three styles covered:

| File | What it teaches |
| --- | --- |
| WALKTHROUGH.md | RDD style (reduceByKey, map, filter) |
| SPARK_SQL_WALKTHROUGH.md | Spark SQL + DataFrame API |
| EXAM_GUIDE.md | Quick reference + setup |
| VIVA_PREP.md | 200 viva Q&A |

Whichever way the exam asks the question, you have the template. 🚀
