Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions Exercises/Exercise-7/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
FROM ubuntu:18.04
FROM ubuntu:22.04

RUN apt-get update && \
apt-get install -y default-jdk scala wget vim software-properties-common python3.8 python3-pip curl unzip libpq-dev build-essential libssl-dev libffi-dev python3-dev && \
apt-get install -y default-jdk scala wget vim software-properties-common python3.9 python3-pip curl unzip libpq-dev build-essential libssl-dev libffi-dev python3-dev && \
apt-get clean

RUN wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz && \
RUN wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz && \
tar xvf spark-3.5.0-bin-hadoop3.tgz && \
mv spark-3.5.0-bin-hadoop3/ /usr/local/spark && \
ln -s /usr/local/spark spark && \
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.890/aws-java-sdk-bundle-1.11.890.jar && \
mv aws-java-sdk-bundle-1.11.890.jar /spark/jars && \
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar && \
mv hadoop-aws-3.2.0.jar /spark/jars
ln -s /usr/local/spark /usr/bin/spark && \
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar && \
mv aws-java-sdk-bundle-1.12.262.jar /usr/local/spark/jars && \
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar && \
mv hadoop-aws-3.3.4.jar /usr/local/spark/jars


WORKDIR app
WORKDIR /app
COPY . /app

RUN pip3 install markupsafe==1.1.1 cryptography==3.3.2 cython==0.29.21 numpy==1.18.5 && pip3 install -r requirements.txt
RUN pip3 install --upgrade pip && \
pip3 install markupsafe==2.0.1 cryptography==38.0.4 cython==0.29.32 numpy==1.21.6 && \
pip3 install -r requirements.txt

ENV PYSPARK_PYTHON=python3
ENV PYSPARK_SUBMIT_ARGS='--packages io.delta:delta-core_2.12:0.8.0 pyspark-shell'
ENV PYSPARK_SUBMIT_ARGS='--packages io.delta:delta-core_2.12:2.4.0 pyspark-shell'
12 changes: 7 additions & 5 deletions Exercises/Exercise-7/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ date,serial_number,model,capacity_bytes,failure,smart_1_normalized,smart_1_raw,s
Your job is to read this file with `PySpark` and answer the following questions.
Answer each question by adding a new column with the answer.

1. Add the file name as a column to the DataFrame and call it `source_file`.
2. Pull the `date` located inside the string of the `source_file` column. Final data-type must be
`date` or `timestamp`, not a `string`. Call the new column `file_date`.
3. Add a new column called `brand`. It will be based on the column `model`. If the
~~1. Add the file name as a column to the DataFrame and call it `source_file`.~~

~~2. Pull the `date` located inside the string of the `source_file` column. Final data-type must be~~
~~`date` or `timestamp`, not a `string`. Call the new column `file_date`.~~

~~3. Add a new column called `brand`. It will be based on the column `model`. If the
column `model` has a space ... aka ` ` in it, split on that `space`. The value
found before the space ` ` will be considered the `brand`. If there is no
space to split on, fill in a value called `unknown` for the `brand`.
space to split on, fill in a value called `unknown` for the `brand`.~~

4. Inspect a column called `capacity_bytes`. Create a secondary DataFrame that
relates `capacity_bytes` to the `model` column, create "buckets" / "rankings" for
Expand Down
88 changes: 87 additions & 1 deletion Exercises/Exercise-7/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,96 @@
from pyspark.sql import SparkSession
import os
from zipfile import ZipFile

from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window


def main():
    """Run the Exercise 7 pipeline: load the zipped CSV and add the
    'source_file', 'file_date' and 'brand' answer columns."""
    spark = SparkSession.builder.appName("Exercise7").enableHiveSupport().getOrCreate()
    spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')

    # The compressed source data ships alongside this script.
    file_path = os.path.join(os.getcwd(), 'data/hard-drive-2022-01-01-failures.csv.zip')

    s_df = extract_csv(None, file_path, spark)
    s_df = extract_date_from_file(s_df)
    s_df = extract_brand_from_model(s_df)

    return s_df


def extract_csv(s_df: DataFrame, file_path: str, spark: SparkSession) -> DataFrame:
    '''
    Extracts the compressed CSV and loads it into a Spark DataFrame;
    During load, the file name is captured in a 'source_file' column.

    Spark's DataFrameReader.load() takes a path, not a Python file
    object, so the CSV member is extracted next to the zip first and
    Spark reads the extracted path.
    '''

    with ZipFile(file_path) as current_zip:
        # extract() returns the absolute path of the extracted member.
        csv_path = current_zip.extract(
            'hard-drive-2022-01-01-failures.csv',
            path=os.path.dirname(file_path),
        )

    s_df = (
        spark
        .read
        .format('csv')
        .option('header', True)
        .load(csv_path)
        .withColumn('source_file', F.input_file_name())
    )

    return s_df


def extract_date_from_file(s_df: DataFrame) -> DataFrame:
    '''
    Pulls the date (as a date data-type) out of the file name held in
    the 'source_file' column, into a new 'file_date' column.

    regexp_extract(str, pattern, idx) takes the column, the regex and
    the group index; group 0 is the whole match (e.g. '2022-01-01').
    '''

    s_df = s_df.withColumn(
        'file_date',
        F.to_date(
            F.regexp_extract('source_file', r'\d{4}-\d{2}-\d{2}', 0),
            'yyyy-MM-dd',
        ),
    )

    return s_df


def extract_brand_from_model(s_df: DataFrame) -> DataFrame:
    '''
    Adds a 'brand' column derived from the 'model' column.

    The model string is left-trimmed and split on a single space: when
    it contains a space, the first token is the brand; otherwise the
    brand is 'unknown'.
    '''

    s_df = (
        s_df
        .withColumn('model_name_split', F.split(F.ltrim(F.col('model')), ' '))
        .withColumn(
            'brand',
            F.when(F.size('model_name_split') > 1, F.col('model_name_split')[0])
            # F.lit, not T.lit — pyspark.sql.types has no lit() function.
            .otherwise(F.lit('unknown')),
        )
        .drop('model_name_split')
    )

    return s_df


def calculating_storage_ranking(s_df: DataFrame) -> DataFrame:
    '''
    Builds a secondary DataFrame relating `capacity_bytes` to `model`,
    ranking models by their reported storage capacity.

    Returns one row per model with its maximum capacity (cast to long,
    since the CSV loads everything as strings) and a dense
    'capacity_ranking' where 1 is the highest-capacity model.
    '''

    capacity_df = (
        s_df
        .withColumn('capacity_bytes', F.col('capacity_bytes').cast(T.LongType()))
        .groupBy('model')
        .agg(F.max('capacity_bytes').alias('capacity_bytes'))
    )

    # Dense rank so models sharing a capacity share a ranking bucket.
    ranking_window = Window.orderBy(F.col('capacity_bytes').desc())

    return capacity_df.withColumn('capacity_ranking', F.dense_rank().over(ranking_window))


if __name__ == "__main__":
Expand Down