Büyük veri setleri üzerinde performanslı analiz ve uygulamalar geliştirme imkanı sunan in-memory hesaplama yeteneğine sahip merkezi bir analitik işleme motorudur. Kendi sayfasındaki açıklaması da şöyledir; “Apache Spark™ is a unified analytics engine for large-scale data processing.”
Spark temelde bir ETL(Extract Transform Load) süreci ihtiyacına yanıt vermektedir. Bu süreç verinin bir veya birden fazla kaynaktan toplanıp, istenen dönüşümler gerçekleştirilip yeni bir kaynağa aktarılmasıdır. Yapılabilecek işlemler; Aggregating, Cleaning, Deduplicating, Filtering, Joining, Sorting, Validating vb.
Görselde de görüldüğü gibi 5 temel parçadan meydana gelir. Bu yazıda örneklemesi yapılacak olan parça Spark SQL parçası olacaktır.
Kurulum
Kurulum senaryosu; docker üzerinde 1 master node 2 worker node olacak şekilde Bitnami‘nin dockerhub üzerindeki imajı üzerinden gerçekleştirilecektir. Senaryo için kullanılacak compose aşağıdaki gibidir.
services:
spark:
image: docker.io/bitnami/spark:3
environment:
- SPARK_MODE=master
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
ports:
- '8080:8080'
spark-worker-1:
image: docker.io/bitnami/spark:3
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark:7077
- SPARK_WORKER_MEMORY=1G
- SPARK_WORKER_CORES=1
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
spark-worker-2:
image: docker.io/bitnami/spark:3
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark:7077
- SPARK_WORKER_MEMORY=1G
- SPARK_WORKER_CORES=1
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
Aşağıdaki şekilde çalıştırılıp containerlar yaratılır.
alimesut@alimesut-XPS:~/DEV/ProjectSpark$ docker-compose -f spark_compose.yaml up -d
Kurulum tamamlandıktan sonra localhost:8080 adresinden master node‘a erişim sağlanabilir.
Jupyter IDE kurulumuna ayrıca değinilmeyecek olup linkteki kurulum dökümanından kurulum sağlanabilir. Ardından aşağıdaki komut ile de Python üzerinden Spark kullanımına olanak sağlayan pyspark modülü indirilip örneklere geçilebilir.
alimesut@alimesut-XPS:~/DEV/ProjectSpark$ pip install pyspark
Import ve Temel Bazı Fonksiyonlar
from pyspark.sql import SparkSession, functions, Window
import pyspark
spark = SparkSession.\
builder.\
master('local[*]').\
appName('firstapp').\
getOrCreate()
df = spark.read.csv('worldcities.csv', header=True)
DataFrame‘de yer alan veri setine spark içinde yer alan Hive üzerinden sorgulanırmış gibi direkt SQL sorgusu yapılmasına olanak sağlayan geçici bir view yaratılır. Bu view session bazlıdır.
df.createOrReplaceTempView("worldcities")
df.select("*").show(10)
OUTPUT:
+-----------+-----------+--------+--------+------------+----+----+----------------+-------+----------+----------+
| city| city_ascii| lat| lng| country|iso2|iso3| admin_name|capital|population| id|
+-----------+-----------+--------+--------+------------+----+----+----------------+-------+----------+----------+
| Tokyo| Tokyo| 35.6897|139.6922| Japan| JP| JPN| Tōkyō|primary| 37977000|1392685764|
| Jakarta| Jakarta| -6.2146|106.8451| Indonesia| ID| IDN| Jakarta|primary| 34540000|1360771077|
| Delhi| Delhi| 28.6600| 77.2300| India| IN| IND| Delhi| admin| 29617000|1356872604|
| Mumbai| Mumbai| 18.9667| 72.8333| India| IN| IND| Mahārāshtra| admin| 23355000|1356226629|
| Manila| Manila| 14.6000|120.9833| Philippines| PH| PHL| Manila|primary| 23088000|1608618140|
| Shanghai| Shanghai| 31.1667|121.4667| China| CN| CHN| Shanghai| admin| 22120000|1156073548|
| São Paulo| Sao Paulo|-23.5504|-46.6339| Brazil| BR| BRA| São Paulo| admin| 22046000|1076532519|
| Seoul| Seoul| 37.5600|126.9900|Korea, South| KR| KOR| Seoul|primary| 21794000|1410836482|
|Mexico City|Mexico City| 19.4333|-99.1333| Mexico| MX| MEX|Ciudad de México|primary| 20996000|1484247881|
| Guangzhou| Guangzhou| 23.1288|113.2590| China| CN| CHN| Guangdong| admin| 20902000|1156237133|
+-----------+-----------+--------+--------+------------+----+----+----------------+-------+----------+----------+
only showing top 10 rows
Örnek Senaryo 1
Temp View üzerinde SQL kullanılarak belirli bir paralel ile meridyen aralığındaki şehirlerin bilgisi aşağıdaki şekilde çekilir.
spark.sql("SELECT city, lat, lng, country \
FROM worldcities \
WHERE (lat BETWEEN 37.0000 AND 38.0000) \
AND (lng BETWEEN 40.0000 AND 41.0000) \
ORDER BY country,city \
LIMIT 10").show()
OUTPUT:
+----------+-------+-------+-------+
| city| lat| lng|country|
+----------+-------+-------+-------+
| ‘Āmūdā|37.1042|40.9300| Syria|
| Derik|37.3644|40.2689| Turkey|
|Diyarbakır|37.9108|40.2367| Turkey|
| Kızıltepe|37.1939|40.5861| Turkey|
| Mardin|37.3167|40.7378| Turkey|
| Çınar|37.7256|40.4147| Turkey|
+----------+-------+-------+-------+
SQL ile yukarıda yapılan sorgunun Spark fonksiyonlarıyla kullanımı aşağıdaki şekildedir.
df\
.select("city","lat","lng","country")\
.where(df.lat.between("37","38") & df.lng.between("40","41"))\
.orderBy("country","city")\
.limit(10)\
.show()
OUTPUT:
+----------+-------+-------+-------+
| city| lat| lng|country|
+----------+-------+-------+-------+
| ‘Āmūdā|37.1042|40.9300| Syria|
| Derik|37.3644|40.2689| Turkey|
|Diyarbakır|37.9108|40.2367| Turkey|
| Kızıltepe|37.1939|40.5861| Turkey|
| Mardin|37.3167|40.7378| Turkey|
| Çınar|37.7256|40.4147| Turkey|
+----------+-------+-------+-------+
(df.groupBy("country").count()).orderBy("count",ascending=False).show(10)
OUTPUT:
+--------------+-----+
| country|count|
+--------------+-----+
| United States| 7824|
| Brazil| 3371|
| Germany| 2624|
| Italy| 2124|
| France| 2017|
|United Kingdom| 1814|
| Philippines| 1533|
| China| 1498|
| Russia| 1487|
| Spain| 1035|
+--------------+-----+
only showing top 10 rows
Örnek Senaryo 2 – Aggregation & Windowing
Aggregation ve Windowing fonksiyon yeteneklerini sınamak için SQL ile yapılan aşağıdaki sorguda; ülkelerin 15M’den fazla nüfusa sahip şehirlerinin bazı türetilmiş verileri mevcut.
- RN kolonu ile ülkelerin kendi içinde nüfusu az olandan çok olana sıralanması sağlanır. (ROW NUMBER)
- SM kolonu ile ülkelerin toplam nüfusu hesaplanır. Her bir birim 1 Milyon ölçeğindedir. (SUM)
- CNT kolonu ile ülkelerin kayıtlı şehir sayısı hesaplanır. (COUNT)
- DR kolonu ile toplam nüfusun şehir sayısına oranı hesaplanır ve şehirlerin ülke bazındaki yoğunlukları bulunur. (Density Ratio)
spark.sql("SELECT country,city,population, \
ROW_NUMBER() OVER (PARTITION BY country ORDER BY city ASC) as RN, \
ROUND(SUM(population/1000000) OVER(PARTITION BY country),3) as SM, \
COUNT(city) OVER(PARTITION BY country) as CNT, \
ROUND((SUM(population/1000000) OVER(PARTITION BY country))/(COUNT(city) OVER(PARTITION BY country)),3) AS DR \
FROM worldcities WHERE population > 15000000").show(30)
OUTPUT:
+-------------+------------+----------+---+------+---+------+
| country| city|population| RN| SM|CNT| DR|
+-------------+------------+----------+---+------+---+------+
| Russia| Moscow| 17125000| 1|17.125| 1|17.125|
| Philippines| Manila| 23088000| 1|23.088| 1|23.088|
| Turkey| Istanbul| 15154000| 1|15.154| 1|15.154|
| Argentina|Buenos Aires| 16157000| 1|16.157| 1|16.157|
| China| Beijing| 19433000| 1|78.384| 4|19.596|
| China| Guangzhou| 20902000| 2|78.384| 4|19.596|
| China| Shanghai| 22120000| 3|78.384| 4|19.596|
| China| Shenzhen| 15929000| 4|78.384| 4|19.596|
| India| Delhi| 29617000| 1|70.532| 3|23.511|
| India| Kolkāta| 17560000| 2|70.532| 3|23.511|
| India| Mumbai| 23355000| 3|70.532| 3|23.511|
|United States| New York| 18713220| 1|18.713| 1|18.713|
| Nigeria| Lagos| 15279000| 1|15.279| 1|15.279|
| Bangladesh| Dhaka| 15443000| 1|15.443| 1|15.443|
| Thailand| Bangkok| 17066000| 1|17.066| 1|17.066|
| Mexico| Mexico City| 20996000| 1|20.996| 1|20.996|
| Indonesia| Jakarta| 34540000| 1| 34.54| 1| 34.54|
| Korea, South| Seoul| 21794000| 1|21.794| 1|21.794|
| Brazil| São Paulo| 22046000| 1|22.046| 1|22.046|
| Japan| Tokyo| 37977000| 1|37.977| 1|37.977|
| Egypt| Cairo| 19372000| 1|19.372| 1|19.372|
+-------------+------------+----------+---+------+---+------+
Yukarıda yapılan örnekteki RN kısmı Spark’taki windowing fonksiyonalitesi kullanılarak aşağıdaki şekilde gerçekleştirilir. Dikkat edilirse önce where ifadesi kullanılarak veri önce küçültülüp ardından oluşan set üzerinde de işlem yapmak mümkündür ve ihtiyaç dahilinde daha performanslı bir kullanım olacaktır.
df\
.where(df.population > 15000000)\
.select("country","city","population",
functions.row_number().over(Window.partitionBy("country").orderBy("city")).alias("RN"))\
.show(50)
OUTPUT:
+-------------+------------+----------+---+
| country| city|population| RN|
+-------------+------------+----------+---+
| Russia| Moscow| 17125000| 1|
| Philippines| Manila| 23088000| 1|
| Turkey| Istanbul| 15154000| 1|
| Argentina|Buenos Aires| 16157000| 1|
| China| Beijing| 19433000| 1|
| China| Guangzhou| 20902000| 2|
| China| Shanghai| 22120000| 3|
| China| Shenzhen| 15929000| 4|
| India| Delhi| 29617000| 1|
| India| Kolkāta| 17560000| 2|
| India| Mumbai| 23355000| 3|
|United States| New York| 18713220| 1|
| Nigeria| Lagos| 15279000| 1|
| Bangladesh| Dhaka| 15443000| 1|
| Thailand| Bangkok| 17066000| 1|
| Mexico| Mexico City| 20996000| 1|
| Indonesia| Jakarta| 34540000| 1|
| Korea, South| Seoul| 21794000| 1|
| Brazil| São Paulo| 22046000| 1|
| Japan| Tokyo| 37977000| 1|
| Egypt| Cairo| 19372000| 1|
+-------------+------------+----------+---+
SQL ile gerçekleştirdiğimiz Örnek Senaryo 2’nin Spark fonksiyonalitesi kullanılarak gerçekleştirilmiş hali aşağıdaki şekildedir.
df\
.where(df.population > 15000000)\
.select("country","city","population",
functions.row_number().over(Window.partitionBy("country").orderBy("city")).alias("RN"),
functions.round(functions.sum(df.population/1000000).over(Window.partitionBy("country")),3).alias("SM"),
functions.count("city").over(Window.partitionBy("country")).alias("CNT"),
functions.round(((functions.sum(df.population/1000000).over(Window.partitionBy("country")))/
(functions.count("city").over(Window.partitionBy("country")))),3).alias("DR"))\
.show(50)
#.orderBy("population", ascending=False)\
OUTPUT:
+-------------+------------+----------+---+------+---+------+
| country| city|population| RN| SM|CNT| DR|
+-------------+------------+----------+---+------+---+------+
| Russia| Moscow| 17125000| 1|17.125| 1|17.125|
| Philippines| Manila| 23088000| 1|23.088| 1|23.088|
| Turkey| Istanbul| 15154000| 1|15.154| 1|15.154|
| Argentina|Buenos Aires| 16157000| 1|16.157| 1|16.157|
| China| Beijing| 19433000| 1|78.384| 4|19.596|
| China| Guangzhou| 20902000| 2|78.384| 4|19.596|
| China| Shanghai| 22120000| 3|78.384| 4|19.596|
| China| Shenzhen| 15929000| 4|78.384| 4|19.596|
| India| Delhi| 29617000| 1|70.532| 3|23.511|
| India| Kolkāta| 17560000| 2|70.532| 3|23.511|
| India| Mumbai| 23355000| 3|70.532| 3|23.511|
|United States| New York| 18713220| 1|18.713| 1|18.713|
| Nigeria| Lagos| 15279000| 1|15.279| 1|15.279|
| Bangladesh| Dhaka| 15443000| 1|15.443| 1|15.443|
| Thailand| Bangkok| 17066000| 1|17.066| 1|17.066|
| Mexico| Mexico City| 20996000| 1|20.996| 1|20.996|
| Indonesia| Jakarta| 34540000| 1| 34.54| 1| 34.54|
| Korea, South| Seoul| 21794000| 1|21.794| 1|21.794|
| Brazil| São Paulo| 22046000| 1|22.046| 1|22.046|
| Japan| Tokyo| 37977000| 1|37.977| 1|37.977|
| Egypt| Cairo| 19372000| 1|19.372| 1|19.372|
+-------------+------------+----------+---+------+---+------+
Compose dosyasına ve kodların yer aldığı Notebook dosyasına linkteki Github projesinden erişilebilir; https://github.com/alimesutk/ProjectSpark
Serinin devamında görüşmek üzere 🖐
Kaynaklar:
https://github.com/bitnami/bitnami-docker-spark
https://towardsdatascience.com/installing-pyspark-with-java-8-on-ubuntu-18-04-6a9dea915b5b
https://sparkbyexamples.com/spark/using-groupby-on-dataframe/
https://sparkbyexamples.com/spark/spark-sql-window-functions/
https://jupyter.org/install
__________________________________________________________________________
Ali Mesut Karadeniz