Apache Spark – 2 (Spark SQL w/ Databricks)

Veri özelinde kullanılan pek çok ürün gibi Apache Spark‘ında cloud hizmet sağlayıcıları tarafından sunulan servisleri mevcuttur. Son zamanlarda adı sıkça duyulan Databricks veri tarafında Spark’ında içinde bulunduğu bir dizi hizmeti cloud hizmeti olarak sunmaktadır.

Yazıda Databricks‘in Community Cloud hizmetinden ücretsiz olarak edinilen deneyim özelinde örnekler sunulacaktır. Databricks diğer cloud hizmet sağlayıcılar ile de entegre çalışmaktadır. Bu yazının yazıldığı tarihte AWS, Azure ve GCP içerisinde de servisleri hizmet sunmaktadır.

Databricks

Hesap açma adımından sonra açılan ana ekran aşağıdaki gibidir.

Kullanılacak notebook bir cluster üzerinde çalışacağından solda yer alan New Cluster ile açılan aşağıdaki ekrandan yeni bir cluster oluşturulur.

Ardından ana ekranda orta kısımda yer alan Import & Explore Data kısmından kullanılacak veri setleri Databricks’e eklenir. Bunun yerine notebook üzerinde localdeki yada uzak makinedeki bir veri setinin yolu ve şeması verilerekte kullanılabilir.

Veri setinin tanımlamaları seçilir.

Veri setinin nihai hali sol taraftaki bar açılıp Data kısmından görüntülenebilir.

Örnek senaryonun gerçekleştirileceği notebook için yine ana sayfadan soldaki New Notebook kısmıyla yeni bir notebook yaratılıp açılır.

Örnek Senaryo – Explain Plan

Spark ile yapılan her işlem engine tarafında gerçekleşen bir optimizasyon sürecinden geçer. Bu süreci işleten yapı Catalyst Optimizer‘dır. Bu yapı sorgunun (Query) ilgili veri üzerinde (DataFrame) nasıl işletileceğini optimize eder. Query Planning temel anlamda nasıl çalışırı anlamak için aşağıdaki akışı incelemek yeterli olacaktır.

RDBMS ile çalışanlarında aşina olduğu bu yapı explain() fonksiyonuyla görüntülenebilir. explain() fonksiyonu iki argüman alabilen bir fonksiyondur. Biri extended diğeride mode argümanlarıdır. extended default olarak False alınır. mode ise şu değerler ile çalıştırılabilir; simple, extended, codegen, cost ve formatted. Örneğe konu olan senaryo cost ve formatted parametreleri olacaktır.

 Kullanılacak modüller import edilir.
import pyspark
from pyspark.sql import SparkSession, functions, Window
 Spark Engine’e erişim için session yaratılır.
spark = SparkSession.\
    builder.\
    master('local[*]').\
    appName('spark_explain').\
    getOrCreate()
 Data altında yer alan tablolar dataframe’e aktarılır.
worldcitiesDF = spark.table('worldcities')
gdpDF = spark.table('gdp')
 worldcities ve gdp veri setlerinin içeriği aşağıdaki şekildedir.
worldcitiesDF.select("*").show(5)
gdpDF.select("*").show(5)

OUTPUT:
+-------+----------+-------+--------+-----------+----+----+-----------+-------+----------+----------+
|   city|city_ascii|    lat|     lng|    country|iso2|iso3| admin_name|capital|population|        id|
+-------+----------+-------+--------+-----------+----+----+-----------+-------+----------+----------+
|  Tokyo|     Tokyo|35.6897|139.6922|      Japan|  JP| JPN|      Tōkyō|primary|  37977000|1392685764|
|Jakarta|   Jakarta|-6.2146|106.8451|  Indonesia|  ID| IDN|    Jakarta|primary|  34540000|1360771077|
|  Delhi|     Delhi|  28.66|   77.23|      India|  IN| IND|      Delhi|  admin|  29617000|1356872604|
| Mumbai|    Mumbai|18.9667| 72.8333|      India|  IN| IND|Mahārāshtra|  admin|  23355000|1356226629|
| Manila|    Manila|   14.6|120.9833|Philippines|  PH| PHL|     Manila|primary|  23088000|1608618140|
+-------+----------+-------+--------+-----------+----+----+-----------+-------+----------+----------+
only showing top 5 rows

+---------+-------+--------------+----------------------+
|ISO3_Code|Ranking|        Coutry|Economy_Million_Dollar|
+---------+-------+--------------+----------------------+
|      USA|      1| United States|              20936600|
|      CHN|      2|         China|              14722731|
|      JPN|      3|         Japan|               5064873|
|      DEU|      4|       Germany|               3806060|
|      GBR|      5|United Kingdom|               2707744|
+---------+-------+--------------+----------------------+
only showing top 5 rows

worldcities ve gdp veri setleri iso3 ülke kodları üzerinden inner join edilir. worldcities veri setinden ülke bazında şehirlerin toplam nüfus bilgisi alınır. gdp tablosundan ise Economy_Million_Dollar kolonu alınıp yeni bir dataframe oluşturulur.

 Örnek Senaryo
explainDF = worldcitiesDF\
.join(gdpDF,worldcitiesDF.iso3 == gdpDF.ISO3_Code,"inner")\
.where(worldcitiesDF.population > 15000000)\
.select(worldcitiesDF.country,worldcitiesDF.population,gdpDF.Economy_Million_Dollar,
          functions.round(functions.sum(worldcitiesDF.population/1000000)\
                          .over(Window.partitionBy(worldcitiesDF.country)),3)\
        .alias("Sum_Population_Million"))\
.orderBy(worldcitiesDF.population, ascending=False)
 explainDF dataframe’inin içeriği aşağıdaki şekildedir.
explainDF.show(10)

OUTPUT:
+------------+----------+----------------------+----------------------+
|     country|population|Economy_Million_Dollar|Sum_Population_Million|
+------------+----------+----------------------+----------------------+
|       Japan|  37977000|               5064873|                37.977|
|   Indonesia|  34540000|               1058424|                 34.54|
|       India|  29617000|               2622984|                70.532|
|       India|  23355000|               2622984|                70.532|
| Philippines|  23088000|                361489|                23.088|
|       China|  22120000|              14722731|                78.384|
|      Brazil|  22046000|               1444733|                22.046|
|Korea, South|  21794000|               1630525|                21.794|
|      Mexico|  20996000|               1076163|                20.996|
|       China|  20902000|              14722731|                78.384|
+------------+----------+----------------------+----------------------+
only showing top 10 rows

cost modunda dataframe’in Optimized Logical Plan‘ı ve seçilmiş Physical Plan‘ı aşağıdaki gibidir. Maliyet MiB, KiB … şeklinde verinin boyutuna göre sunulur. RDBMS yapılardaki cardinality gibi düşünülebilir.

 explainDF dataframe’inin optimizer tarafından cost bazında planı aşağıdaki şekildedir.
explainDF.explain(mode="cost")

OUTPUT:
== Optimized Logical Plan ==
Sort [population#3035 DESC NULLS LAST], true, Statistics(sizeInBytes=106.9 MiB)
+- Window [country#3030, population#3035, Economy_Million_Dollar#3333, round(sum(_w0#5189) windowspecdefinition(country#3030, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())), 3) AS Sum_Population_Million#5184], [country#3030], Statistics(sizeInBytes=106.9 MiB)
   +- Project [country#3030, population#3035, Economy_Million_Dollar#3333, (cast(population#3035 as double) / 1000000.0) AS _w0#5189], Statistics(sizeInBytes=106.9 MiB)
      +- Join Inner, (iso3#3032 = ISO3_Code#3330), Statistics(sizeInBytes=184.6 MiB)
         :- Project [country#3030, iso3#3032, population#3035], Statistics(sizeInBytes=1361.0 KiB)
         :  +- Filter ((isnotnull(population#3035) AND (population#3035 > 15000000)) AND isnotnull(iso3#3032)), Statistics(sizeInBytes=4.2 MiB)
         :     +- Relation[city#3026,city_ascii#3027,lat#3028,lng#3029,country#3030,iso2#3031,iso3#3032,admin_name#3033,capital#3034,population#3035,id#3036] csv, Statistics(sizeInBytes=4.2 MiB)
         +- Project [ISO3_Code#3330, Economy_Million_Dollar#3333], Statistics(sizeInBytes=3.0 KiB)
            +- Filter isnotnull(ISO3_Code#3330), Statistics(sizeInBytes=5.2 KiB)
               +- Relation[ISO3_Code#3330,Ranking#3331,Coutry#3332,Economy_Million_Dollar#3333] csv, Statistics(sizeInBytes=5.2 KiB)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [population#3035 DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(population#3035 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#5422]
      +- Window [country#3030, population#3035, Economy_Million_Dollar#3333, round(sum(_w0#5189) windowspecdefinition(country#3030, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())), 3) AS Sum_Population_Million#5184], [country#3030]
         +- Sort [country#3030 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(country#3030, 200), ENSURE_REQUIREMENTS, [id=#5418]
               +- Project [country#3030, population#3035, Economy_Million_Dollar#3333, (cast(population#3035 as double) / 1000000.0) AS _w0#5189]
                  +- BroadcastHashJoin [iso3#3032], [ISO3_Code#3330], Inner, BuildRight, false
                     :- Filter ((isnotnull(population#3035) AND (population#3035 > 15000000)) AND isnotnull(iso3#3032))
                     :  +- FileScan csv default.worldcities[country#3030,iso3#3032,population#3035] Batched: false, DataFilters: [isnotnull(population#3035), (population#3035 > 15000000), isnotnull(iso3#3032)], Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/worldcities-4.csv], PartitionFilters: [], PushedFilters: [IsNotNull(population), GreaterThan(population,15000000), IsNotNull(iso3)], ReadSchema: struct<country:string,iso3:string,population:int>
                     +- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, string, false]),false), [id=#5414]
                        +- Filter isnotnull(ISO3_Code#3330)
                           +- FileScan csv default.gdp[ISO3_Code#3330,Economy_Million_Dollar#3333] Batched: false, DataFilters: [isnotnull(ISO3_Code#3330)], Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/GDP-4.csv], PartitionFilters: [], PushedFilters: [IsNotNull(ISO3_Code)], ReadSchema: struct<ISO3_Code:string,Economy_Million_Dollar:int>

formatted modunda dataframe’in seçilmiş Physical Plan‘ı görüntülenir, aşağıdaki gibidir. Adıyla müsemma adımları (1), (2), (3), … şeklinde numaralandırp görsel olarak daha anlışılır bir görüntü sunar.

 worldcities veri setinin içeriği aşağıdaki şekildedir.
explainDF.explain(mode="formatted")

OUTPUT:
== Physical Plan ==
AdaptiveSparkPlan (13)
+- Sort (12)
   +- Exchange (11)
      +- Window (10)
         +- Sort (9)
            +- Exchange (8)
               +- Project (7)
                  +- BroadcastHashJoin Inner BuildRight (6)
                     :- Filter (2)
                     :  +- Scan csv default.worldcities (1)
                     +- BroadcastExchange (5)
                        +- Filter (4)
                           +- Scan csv default.gdp (3)


(1) Scan csv default.worldcities
Output [3]: [country#3030, iso3#3032, population#3035]
Batched: false
Location: InMemoryFileIndex [dbfs:/FileStore/tables/worldcities-4.csv]
PushedFilters: [IsNotNull(population), GreaterThan(population,15000000), IsNotNull(iso3)]
ReadSchema: struct<country:string,iso3:string,population:int>

(2) Filter
Input [3]: [country#3030, iso3#3032, population#3035]
Condition : ((isnotnull(population#3035) AND (population#3035 > 15000000)) AND isnotnull(iso3#3032))

(3) Scan csv default.gdp
Output [2]: [ISO3_Code#3330, Economy_Million_Dollar#3333]
Batched: false
Location: InMemoryFileIndex [dbfs:/FileStore/tables/GDP-4.csv]
PushedFilters: [IsNotNull(ISO3_Code)]
ReadSchema: struct<ISO3_Code:string,Economy_Million_Dollar:int>

(4) Filter
Input [2]: [ISO3_Code#3330, Economy_Million_Dollar#3333]
Condition : isnotnull(ISO3_Code#3330)

(5) BroadcastExchange
Input [2]: [ISO3_Code#3330, Economy_Million_Dollar#3333]
Arguments: HashedRelationBroadcastMode(ArrayBuffer(input[0, string, false]),false), [id=#5782]

(6) BroadcastHashJoin
Left keys [1]: [iso3#3032]
Right keys [1]: [ISO3_Code#3330]
Join condition: None

(7) Project
Output [4]: [country#3030, population#3035, Economy_Million_Dollar#3333, (cast(population#3035 as double) / 1000000.0) AS _w0#5271]
Input [5]: [country#3030, iso3#3032, population#3035, ISO3_Code#3330, Economy_Million_Dollar#3333]

(8) Exchange
Input [4]: [country#3030, population#3035, Economy_Million_Dollar#3333, _w0#5271]
Arguments: hashpartitioning(country#3030, 200), ENSURE_REQUIREMENTS, [id=#5786]

(9) Sort
Input [4]: [country#3030, population#3035, Economy_Million_Dollar#3333, _w0#5271]
Arguments: [country#3030 ASC NULLS FIRST], false, 0

(10) Window
Input [4]: [country#3030, population#3035, Economy_Million_Dollar#3333, _w0#5271]
Arguments: [country#3030, population#3035, Economy_Million_Dollar#3333, round(sum(_w0#5271) windowspecdefinition(country#3030, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())), 3) AS Sum_Population_Million#5228], [country#3030]

(11) Exchange
Input [4]: [country#3030, population#3035, Economy_Million_Dollar#3333, Sum_Population_Million#5228]
Arguments: rangepartitioning(population#3035 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#5790]

(12) Sort
Input [4]: [country#3030, population#3035, Economy_Million_Dollar#3333, Sum_Population_Million#5228]
Arguments: [population#3035 DESC NULLS LAST], true, 0

(13) AdaptiveSparkPlan
Output [4]: [country#3030, population#3035, Economy_Million_Dollar#3333, Sum_Population_Million#5228]
Arguments: isFinalPlan=false

Kodların Databricks Notebook‘u üzerindeki görüntüsü aşağıdaki şekildedir. 

Cluster UI

Compute kısmından Spark içindeki monitor ve yönetim arayüzlerine erişilebilir. Aşağıdaki görsel notebook üzerinde çalıştırılan komutun çalışma sürecini gösterir.

Kaynaklar:

https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/
https://spark.apache.org/docs/3.1.2/sql-ref-syntax-qry-explain.html#content
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.join.html?highlight=join#pyspark.sql.DataFrame.join
https://medium.com/datalex/sparks-logical-and-physical-plans-when-why-how-and-beyond-8cd1947b605a
https://blog.clairvoyantsoft.com/apache-spark-join-strategies-e4ebc7624b06
https://sparkbyexamples.com/spark/spark-sql-dataframe-join/
https://docs.databricks.com/spark/latest/spark-sql/cbo.html
https://docs.databricks.com/data/tables.html#language-python
https://databricks.com/glossary/catalyst-optimizer
https://developer.ibm.com/blogs/how-to-understanddebug-your-spark-application-using-explain/
https://udemy.com/course/apache-spark-3-databricks-certified-associate-developer
https://udemy.com/course/apache-spark-programming-in-python-for-beginners

__________________________________________________________________________

Ali Mesut Karadeniz

Yorum bırakın