Apache Spark – 1 (Spark SQL w/ Jupyter)

Büyük veri setleri üzerinde performanslı analiz ve uygulamalar geliştirme imkanı sunan in-memory hesaplama yeteneğine sahip merkezi bir analitik işleme motorudur. Kendi sayfasındaki açıklaması da şöyledir; “Apache Spark™ is a unified analytics engine for large-scale data processing.”

Spark temelde bir ETL(Extract Transform Load) süreci ihtiyacına yanıt vermektedir. Bu süreç verinin bir veya birden fazla kaynaktan toplanıp, istenen dönüşümler gerçekleştirilip yeni bir kaynağa aktarılmasıdır. Yapılabilecek işlemler; Aggregating, Cleaning, Deduplicating, Filtering, Joining, Sorting, Validating vb.

Görselde de görüldüğü gibi 5 temel parçadan meydana gelir. Bu yazıda örneklemesi yapılacak olan parça Spark SQL parçası olacaktır.

Kurulum

Kurulum senaryosu; docker üzerinde 1 master node 2 worker node olacak şekilde Bitnami‘nin dockerhub üzerindeki imajı üzerinden gerçekleştirilecektir. Senaryo için kullanılacak compose aşağıdaki gibidir.

 spark_compose.yaml
services:
  spark:
    image: docker.io/bitnami/spark:3
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - '8080:8080'
  spark-worker-1:
    image: docker.io/bitnami/spark:3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
  spark-worker-2:
    image: docker.io/bitnami/spark:3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no

Aşağıdaki şekilde çalıştırılıp containerlar yaratılır.

alimesut@alimesut-XPS:~/DEV/ProjectSpark$ docker-compose -f spark_compose.yaml up -d

Kurulum tamamlandıktan sonra localhost:8080 adresinden master node‘a erişim sağlanabilir.

Jupyter IDE kurulumuna ayrıca değinilmeyecek olup linkteki kurulum dökümanından kurulum sağlanabilir. Ardından aşağıdaki komut ile de Python üzerinden Spark kullanımına olanak sağlayan pyspark modülü indirilip örneklere geçilebilir.

alimesut@alimesut-XPS:~/DEV/ProjectSpark$ pip install pyspark

Import ve Temel Bazı Fonksiyonlar

 Kullanılacak modüller import edilir.
from pyspark.sql import SparkSession, functions, Window
import pyspark
 Spark ortamına erişim için session yaratılır.
spark = SparkSession.\
    builder.\
    master('local[*]').\
    appName('firstapp').\
    getOrCreate()
 worldcities.csv dosyası proje klasöründen dataframe’e aktarılır.
df = spark.read.csv('worldcities.csv', header=True)

DataFrame‘de yer alan veri setine spark içinde yer alan Hive üzerinden sorgulanırmış gibi direkt SQL sorgusu yapılmasına olanak sağlayan geçici bir view yaratılır. Bu view session bazlıdır.

 CreateTempView
df.createOrReplaceTempView("worldcities")
 worldcities veri setinin içeriği aşağıdaki şekildedir.
df.select("*").show(10)

OUTPUT:
+-----------+-----------+--------+--------+------------+----+----+----------------+-------+----------+----------+
|       city| city_ascii|     lat|     lng|     country|iso2|iso3|      admin_name|capital|population|        id|
+-----------+-----------+--------+--------+------------+----+----+----------------+-------+----------+----------+
|      Tokyo|      Tokyo| 35.6897|139.6922|       Japan|  JP| JPN|           Tōkyō|primary|  37977000|1392685764|
|    Jakarta|    Jakarta| -6.2146|106.8451|   Indonesia|  ID| IDN|         Jakarta|primary|  34540000|1360771077|
|      Delhi|      Delhi| 28.6600| 77.2300|       India|  IN| IND|           Delhi|  admin|  29617000|1356872604|
|     Mumbai|     Mumbai| 18.9667| 72.8333|       India|  IN| IND|     Mahārāshtra|  admin|  23355000|1356226629|
|     Manila|     Manila| 14.6000|120.9833| Philippines|  PH| PHL|          Manila|primary|  23088000|1608618140|
|   Shanghai|   Shanghai| 31.1667|121.4667|       China|  CN| CHN|        Shanghai|  admin|  22120000|1156073548|
|  São Paulo|  Sao Paulo|-23.5504|-46.6339|      Brazil|  BR| BRA|       São Paulo|  admin|  22046000|1076532519|
|      Seoul|      Seoul| 37.5600|126.9900|Korea, South|  KR| KOR|           Seoul|primary|  21794000|1410836482|
|Mexico City|Mexico City| 19.4333|-99.1333|      Mexico|  MX| MEX|Ciudad de México|primary|  20996000|1484247881|
|  Guangzhou|  Guangzhou| 23.1288|113.2590|       China|  CN| CHN|       Guangdong|  admin|  20902000|1156237133|
+-----------+-----------+--------+--------+------------+----+----+----------------+-------+----------+----------+
only showing top 10 rows

Örnek Senaryo 1

Temp View üzerinde SQL kullanılarak belirli bir paralel ile meridyen aralığındaki şehirlerin bilgisi aşağıdaki şekilde çekilir.

 Örnek Senaryo 1 – SQL
spark.sql("SELECT city, lat, lng, country \
           FROM worldcities \
           WHERE (lat BETWEEN 37.0000 AND 38.0000) \
               AND (lng BETWEEN 40.0000 AND 41.0000) \
           ORDER BY country,city \
           LIMIT 10").show()

OUTPUT:
+----------+-------+-------+-------+
|      city|    lat|    lng|country|
+----------+-------+-------+-------+
|    ‘Āmūdā|37.1042|40.9300|  Syria|
|     Derik|37.3644|40.2689| Turkey|
|Diyarbakır|37.9108|40.2367| Turkey|
| Kızıltepe|37.1939|40.5861| Turkey|
|    Mardin|37.3167|40.7378| Turkey|
|     Çınar|37.7256|40.4147| Turkey|
+----------+-------+-------+-------+

SQL ile yukarıda yapılan sorgunun Spark fonksiyonlarıyla kullanımı aşağıdaki şekildedir.

  Örnek Senaryo 1 – Spark Fonksiyon
df\
.select("city","lat","lng","country")\
.where(df.lat.between("37","38") & df.lng.between("40","41"))\
.orderBy("country","city")\
.limit(10)\
.show()

OUTPUT:
+----------+-------+-------+-------+
|      city|    lat|    lng|country|
+----------+-------+-------+-------+
|    ‘Āmūdā|37.1042|40.9300|  Syria|
|     Derik|37.3644|40.2689| Turkey|
|Diyarbakır|37.9108|40.2367| Turkey|
| Kızıltepe|37.1939|40.5861| Turkey|
|    Mardin|37.3167|40.7378| Turkey|
|     Çınar|37.7256|40.4147| Turkey|
+----------+-------+-------+-------+
  Her ülkeye ait kaç şehir olduğu aşağıdaki şekilde sorgulanabilir.
(df.groupBy("country").count()).orderBy("count",ascending=False).show(10)

OUTPUT:
+--------------+-----+
|       country|count|
+--------------+-----+
| United States| 7824|
|        Brazil| 3371|
|       Germany| 2624|
|         Italy| 2124|
|        France| 2017|
|United Kingdom| 1814|
|   Philippines| 1533|
|         China| 1498|
|        Russia| 1487|
|         Spain| 1035|
+--------------+-----+
only showing top 10 rows

Örnek Senaryo 2 – Aggregation & Windowing

Aggregation ve Windowing fonksiyon yeteneklerini sınamak için SQL ile yapılan aşağıdaki sorguda; ülkelerin 15M’den fazla nüfusa sahip şehirlerinin bazı türetilmiş verileri mevcut.

  • RN kolonu ile ülkelerin kendi içinde nüfusu az olandan çok olana sıralanması sağlanır. (ROW NUMBER)
  • SM kolonu ile ülkelerin toplam nüfusu hesaplanır. Her bir birim 1 Milyon ölçeğindedir. (SUM)
  • CNT kolonu ile ülkelerin kayıtlı şehir sayısı hesaplanır. (COUNT)
  • DR kolonu ile toplam nüfusun şehir sayısına oranı hesaplanır ve şehirlerin ülke bazındaki yoğunlukları bulunur. (Density Ratio)
  Örnek Senaryo 2 – SQL
spark.sql("SELECT country,city,population, \
    ROW_NUMBER() OVER (PARTITION BY country ORDER BY city ASC) as RN, \
    ROUND(SUM(population/1000000) OVER(PARTITION BY country),3) as SM, \
    COUNT(city) OVER(PARTITION BY country) as CNT, \
    ROUND((SUM(population/1000000) OVER(PARTITION BY country))/(COUNT(city) OVER(PARTITION BY country)),3) AS DR \
    FROM worldcities WHERE population > 15000000").show(30)

OUTPUT:
+-------------+------------+----------+---+------+---+------+
|      country|        city|population| RN|    SM|CNT|    DR|
+-------------+------------+----------+---+------+---+------+
|       Russia|      Moscow|  17125000|  1|17.125|  1|17.125|
|  Philippines|      Manila|  23088000|  1|23.088|  1|23.088|
|       Turkey|    Istanbul|  15154000|  1|15.154|  1|15.154|
|    Argentina|Buenos Aires|  16157000|  1|16.157|  1|16.157|
|        China|     Beijing|  19433000|  1|78.384|  4|19.596|
|        China|   Guangzhou|  20902000|  2|78.384|  4|19.596|
|        China|    Shanghai|  22120000|  3|78.384|  4|19.596|
|        China|    Shenzhen|  15929000|  4|78.384|  4|19.596|
|        India|       Delhi|  29617000|  1|70.532|  3|23.511|
|        India|     Kolkāta|  17560000|  2|70.532|  3|23.511|
|        India|      Mumbai|  23355000|  3|70.532|  3|23.511|
|United States|    New York|  18713220|  1|18.713|  1|18.713|
|      Nigeria|       Lagos|  15279000|  1|15.279|  1|15.279|
|   Bangladesh|       Dhaka|  15443000|  1|15.443|  1|15.443|
|     Thailand|     Bangkok|  17066000|  1|17.066|  1|17.066|
|       Mexico| Mexico City|  20996000|  1|20.996|  1|20.996|
|    Indonesia|     Jakarta|  34540000|  1| 34.54|  1| 34.54|
| Korea, South|       Seoul|  21794000|  1|21.794|  1|21.794|
|       Brazil|   São Paulo|  22046000|  1|22.046|  1|22.046|
|        Japan|       Tokyo|  37977000|  1|37.977|  1|37.977|
|        Egypt|       Cairo|  19372000|  1|19.372|  1|19.372|
+-------------+------------+----------+---+------+---+------+

Yukarıda yapılan örnekteki RN kısmı Spark’taki windowing fonksiyonalitesi kullanılarak aşağıdaki şekilde gerçekleştirilir. Dikkat edilirse önce where ifadesi kullanılarak veri önce küçültülüp ardından oluşan set üzerinde de işlem yapmak mümkündür ve ihtiyaç dahilinde daha performanslı bir kullanım olacaktır.

  row_number() örneği
df\
.where(df.population > 15000000)\
.select("country","city","population", 
          functions.row_number().over(Window.partitionBy("country").orderBy("city")).alias("RN"))\
.show(50)

OUTPUT:
+-------------+------------+----------+---+
|      country|        city|population| RN|
+-------------+------------+----------+---+
|       Russia|      Moscow|  17125000|  1|
|  Philippines|      Manila|  23088000|  1|
|       Turkey|    Istanbul|  15154000|  1|
|    Argentina|Buenos Aires|  16157000|  1|
|        China|     Beijing|  19433000|  1|
|        China|   Guangzhou|  20902000|  2|
|        China|    Shanghai|  22120000|  3|
|        China|    Shenzhen|  15929000|  4|
|        India|       Delhi|  29617000|  1|
|        India|     Kolkāta|  17560000|  2|
|        India|      Mumbai|  23355000|  3|
|United States|    New York|  18713220|  1|
|      Nigeria|       Lagos|  15279000|  1|
|   Bangladesh|       Dhaka|  15443000|  1|
|     Thailand|     Bangkok|  17066000|  1|
|       Mexico| Mexico City|  20996000|  1|
|    Indonesia|     Jakarta|  34540000|  1|
| Korea, South|       Seoul|  21794000|  1|
|       Brazil|   São Paulo|  22046000|  1|
|        Japan|       Tokyo|  37977000|  1|
|        Egypt|       Cairo|  19372000|  1|
+-------------+------------+----------+---+

SQL ile gerçekleştirdiğimiz Örnek Senaryo 2’nin Spark fonksiyonalitesi kullanılarak gerçekleştirilmiş hali aşağıdaki şekildedir.

  Örnek Senaryo 2 – Spark Fonksiyonu
df\
.where(df.population > 15000000)\
.select("country","city","population",
          functions.row_number().over(Window.partitionBy("country").orderBy("city")).alias("RN"),
          functions.round(functions.sum(df.population/1000000).over(Window.partitionBy("country")),3).alias("SM"),
          functions.count("city").over(Window.partitionBy("country")).alias("CNT"),
          functions.round(((functions.sum(df.population/1000000).over(Window.partitionBy("country")))/
              (functions.count("city").over(Window.partitionBy("country")))),3).alias("DR"))\
.show(50)
#.orderBy("population", ascending=False)\

OUTPUT:
+-------------+------------+----------+---+------+---+------+
|      country|        city|population| RN|    SM|CNT|    DR|
+-------------+------------+----------+---+------+---+------+
|       Russia|      Moscow|  17125000|  1|17.125|  1|17.125|
|  Philippines|      Manila|  23088000|  1|23.088|  1|23.088|
|       Turkey|    Istanbul|  15154000|  1|15.154|  1|15.154|
|    Argentina|Buenos Aires|  16157000|  1|16.157|  1|16.157|
|        China|     Beijing|  19433000|  1|78.384|  4|19.596|
|        China|   Guangzhou|  20902000|  2|78.384|  4|19.596|
|        China|    Shanghai|  22120000|  3|78.384|  4|19.596|
|        China|    Shenzhen|  15929000|  4|78.384|  4|19.596|
|        India|       Delhi|  29617000|  1|70.532|  3|23.511|
|        India|     Kolkāta|  17560000|  2|70.532|  3|23.511|
|        India|      Mumbai|  23355000|  3|70.532|  3|23.511|
|United States|    New York|  18713220|  1|18.713|  1|18.713|
|      Nigeria|       Lagos|  15279000|  1|15.279|  1|15.279|
|   Bangladesh|       Dhaka|  15443000|  1|15.443|  1|15.443|
|     Thailand|     Bangkok|  17066000|  1|17.066|  1|17.066|
|       Mexico| Mexico City|  20996000|  1|20.996|  1|20.996|
|    Indonesia|     Jakarta|  34540000|  1| 34.54|  1| 34.54|
| Korea, South|       Seoul|  21794000|  1|21.794|  1|21.794|
|       Brazil|   São Paulo|  22046000|  1|22.046|  1|22.046|
|        Japan|       Tokyo|  37977000|  1|37.977|  1|37.977|
|        Egypt|       Cairo|  19372000|  1|19.372|  1|19.372|
+-------------+------------+----------+---+------+---+------+

Compose dosyasına ve kodların yer aldığı Notebook dosyasına linkteki Github projesinden erişilebilir; https://github.com/alimesutk/ProjectSpark

Serinin devamında görüşmek üzere 🖐

Kaynaklar:
https://github.com/bitnami/bitnami-docker-spark
https://towardsdatascience.com/installing-pyspark-with-java-8-on-ubuntu-18-04-6a9dea915b5b
https://sparkbyexamples.com/spark/using-groupby-on-dataframe/
https://sparkbyexamples.com/spark/spark-sql-window-functions/
https://jupyter.org/install

__________________________________________________________________________

Ali Mesut Karadeniz

Yorum bırakın