Selam bu yazımda Sqoop ile ilgili aldığım notları paylaşacağım.
Sqoopu RDBMS’ten yani SQL’den HDFS üzerine veri aktarırken kullanırız. İsmi ise çok cafcaflı, Sql ve Hadoop birleşiminden gelmekte.
Sqoopun güzel noktası, pasif durumdayken herhangi bir kaynak gerektirmez.
Çalıştığı zaman MapReduce uygular ve Yarndan kaynak alır.
Fakat Nifi gibi alternatifler sürekli kaynak tüketir. Bu araçlar için arkada sürekli bir sunucu çalışmak zorunda.
Kullanacağınız SQL’in JDBC driverını aşağıdaki dizine eklemeniz gerekmekte. Sonrasında Sqoopa vereceğiniz driver parametresi ile Sqoop bu driver içinde class araması yapar.
/opt/manual/sqoop/lib/ | grep postgresql
Sqoop Modları
eval:
girilen bilgiler kontrol edilir.
import:
veritabanından veri getir RDBMS → Hadoop (HDFS)
export:
veritabanına veri yükle. Hadoop → İlişkisel veri tabanı
Eval
Sqoop ile veri aktarımı yapmadan önce bağlantıları kontrol etmekte fayda var. Bunun için eval parametresi kullanılır.
--connect jdbc:postgresql://localhost:5432/train_db \
--driver org.postgresql.Driver \
--username pg_user --password secretPassword \
--query "select count(1) from customers"
Sqoop ile Postgres üzerinden HDFS’e veri aktarımı
#1 HDFS üzerinde bir klasör oluştur.
hdfs dfs -mkdir -p /user/train/output_data/sqoop_import
#2 Eval test et
sqoop eval --connect jdbc:postgresql://localhost:5432/traindb \\
--driver org.postgresql.Driver \\
--username pg_user --password secretPassword \\
--query "select rownumber, customerid from churn limit 3"
-----------------------------
| rownumber | customerid |
-----------------------------
| 1 | 15634602 |
| 2 | 15647311 |
| 3 | 15619304 |
-----------------------------
💡 Sqoop bu transfer işlemini yaparken Mapping
yani paralel olarak işlem yapabiliyor. Bunun için -m parametresi kullanılır ve hangi sütuna göre parçalara ayrılacağı belirtilir.
Belirtilen sütun primary key gibi otomatik artan bir sütun olmalı. Yani min-max ile map sayısını ayarlayabileceği bir sütun…
Bu veri seti için rownumber biçilmiş kaftan.
#3 mapping sayısı ne olmalı?
sqoop eval --connect jdbc:postgresql://localhost:5432/traindb \\
--driver org.postgresql.Driver \\
--username train --password secretPassword \\
--query "select min(rownumber) minrow, max(rownumber) maxrow from churn limit 3"
-----------------------------
| minrow | maxrow |
-----------------------------
| 1 | 10000 |
-----------------------------
Yukarıdaki senaryoda 4 mapping seçersek şu şekilde bir tablo olacak:
1. Parça | 0 – 2500 |
2. Parça | 2500 – 5000 |
3. Parça | 5000 – 7500 |
4. Parça | 7500 – 1000 |
Import
sqoop import: PostgreSQL üzerinden Hadoopa veri getir.
💡 Sqoop her zaman HDFS üzerinden olaya bakıyor.
#1 Postgres üzerinden HDFS’e veriyi gönderelim
sqoop import --connect jdbc:postgresql://localhost:5432/traindb \\
--driver org.postgresql.Driver \\
--username pg_user --password secretPassword \\
--table churn \\
--m 4 \\
--split-by rownumber \\
--target-dir /user/train/output_data/sqoop_import/churn
-table churn
veri tabanındaki hangi tablo?
--m 4
belirtilen sütuna göre dört parçaya böl.
--split-by rownumber
mappingte hangi parçaya göre bölünecek? otomatik artan olmalı
--target-dir
hdfs üzerinde nereye gönderilecek?
Dikkat daha önce /user/train/output_data/sqoop_import
diye bir dizin oluşturduk, ekstra olarak bir klasör daha veriyorum, Sqoop verileri bu dizine aktaracak.
Resource Managerdan bu işlemleri takip edebilirsin.
localhost:8088
#2 Hedef dizini kontrol et.
[train@localhost play]$ hdfs dfs -ls /user/train/output_data/sqoop_import/churn
Found 5 items
-rw-r--r-- 1 train supergroup 0 2020-09-18 06:26 /user/train/output_data/sqoop_import/churn/_SUCCESS
-rw-r--r-- 1 train supergroup 169844 2020-09-18 06:26 /user/train/output_data/sqoop_import/churn/part-m-00000
-rw-r--r-- 1 train supergroup 171064 2020-09-18 06:26 /user/train/output_data/sqoop_import/churn/part-m-00001
-rw-r--r-- 1 train supergroup 170654 2020-09-18 06:26 /user/train/output_data/sqoop_import/churn/part-m-00002
-rw-r--r-- 1 train supergroup 170774 2020-09-18 06:26 /user/train/output_data/sqoop_import/churn/part-m-00003
💡 4 Mapping verdiğimiz için 4 dosya oluştu! Herbirinde 25.000 gözlem var.
Hedef Klasörü Silme
Aynı dizinde tekrar aynı işlemi yapmaya çalışırsak hata alırız. (File Already Exists)
Bunun için -target-dir
öncesinde -delete-target-dir
eklenmeli. Bu HDFS’teki dizini import işlemi öncesi siler.
Örneğin:
sqoop import --connect jdbc:postgresql://localhost:5432/traindb \\
--driver org.postgresql.Driver \\
--username pg_user --password secretPassword \\
--table churn \\
--m 4 \\
--split-by rownumber \\
--delete-target-dir \\
--target-dir /user/train/output_data/sqoop_import/churn
Burada klasör ismini tarih ile dinamik yaparak bu durumdan kurtulmak mümkün.
Örneğin:
now=$(date+”%Y_%m_%d_%H_%M_%s”)
Açıkça şifre yazma, Password File oluştur.
Bu işi yapacaksak amatörlüğe yer yok, historyde hasssas bilgiler tutmayalım.
Bunun için bir password dosyası oluşturup, yetkilerini ayarlayarak parametre olarak bu dosyayı vermek ideal bir çözüm olacaktır.
echo -n "secretPassword" > ~/sqoop.password
chmod 400 ~/sqoop.password
Kullanım Şekli:
sqoop eval --connect jdbc:postgresql://localhost:5432/traindb \\
--username pg_user \\
--password-file file:///home/train/sqoop.password \\
--query "select rownumber, customerid from churn limit 3"
Sqoop – Hive
Sqoop ile HDFS üzerine veri aktarırken aynı zamanda Hive’da şema oluşturmak da mümkün.
#1 Hive import et
sqoop import --connect jdbc:postgresql://localhost/traindb \\
--driver org.postgresql.Driver \\
--username pg_user --password secretPassword \\
--query "select * from churn WHERE \\$CONDITIONS" \\
--m 4 --split-by rownumber \\
--hive-import --create-hive-table --hive-table test1.churn \\
--target-dir /tmp/churn
WHERE \$CONDITIONS" \ | Queryde where olsa da olmasa da yazmak durumundayız |
--hive-import | hive yüklenecek |
--create-hive-table | hive’da tablo yok sen oluştur |
--hive-table test1.churn | tablo ismi |
--target-dir /tmp/churn | HDFS dizini |
#2 Dbeaverdan kontrol edelim
describe churn;
col_name |data_type|comment|
---------------|---------|-------|
rownumber |int | |
customerid |int | |
surname |string | |
creditscore |int | |
geography |string | |
gender |string | |
age |int | |
tenure |int | |
balance |double | |
numofproducts |int | |
hascrcard |int | |
isactivemember |int | |
estimatedsalary|double | |
exited |int | |
select count(1) from churn;
_c0 |
-----|
10000|
Dikkat!
#3 Aynı tabloyu tekrar yazmak istiyorsan
--create-hive-table
yerine--hive-overwrite
kullan.-delete-target-dir
ekle ya da directory değiştir.
Örnek Kullanım:
sqoop import --connect jdbc:postgresql://localhost/traindb \\
--driver org.postgresql.Driver \\
--username pg_user --password secretPassword \\
--query "select * from churn WHERE \\$CONDITIONS" \\
--m 4 --split-by rownumber \\
--hive-import --hive-overwrite --hive-table test1.churn \\
--delete-target-dir --target-dir /tmp/churn
#4 Hiçbir şey yazmasak Append
olur.
sqoop import --connect jdbc:postgresql://localhost/traindb \\
--driver org.postgresql.Driver \\
--username pg_user --password secretPassword \\
--query "select * from churn WHERE \\$CONDITIONS" \\
--m 4 --split-by rownumber \\
--hive-import --hive-table test1.churn \\
--delete-target-dir --target-dir /tmp/churn
#5 Tablo ismi ile import
sqoop import --connect jdbc:postgresql://localhost/traindb \\
--driver org.postgresql.Driver --username pg_user --password secretPassword \\
--table churn --m 4 --split-by rownumber \\
--hive-import --hive-overwrite --hive-table test1.churn --target-dir /tmp/churn3
#6 Dikkat! Eğer otomatik artan (incremental) bir sütun yoksa -m 1
yap.
#7 Where koşulu ile import
sqoop import --connect jdbc:postgresql://localhost/traindb \\
--driver org.postgresql.Driver --username pg_user --password secretPassword \\
--query "select * from public.churn where exited = 1 AND \\$CONDITIONS" \\
--m 4 --split-by rownumber \\
--hive-import --create-hive-table --hive-table test1.churn_exited_1 --target-dir /tmp/churn_exited_1
#8 Bashscript şeklinde cronjob scriptleri oluşturulabilir.
#!/bin/bash
sqoop import --connect jdbc:postgresql://localhost/traindb --driver org.postgresql.Driver --username pg_user --password secretPassword --query "select * from churn WHERE \\$CONDITIONS" --m 4 --split-by rownumber --hive-import --create-hive-table --hive-table test2.churn --target-dir /tmp/churn
./run_sqoop.sh > sqoop.log 2>1&
Export
Çok yaygın bir kullanım değil, HDFS üzerinden SQL tarafına veri aktarmak için kullanılır.
#1 truncate target table churn
truncate table public.churn;
select * from public.churn limit 5;
rownumber|customerid|surname|creditscore|geography|gender|age|tenure|balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|
---------|----------|-------|-----------|---------|------|---|------|-------|-------------|---------|--------------|---------------|------|
#2 Export
sqoop export --connect jdbc:postgresql://localhost/traindb \\
--driver org.postgresql.Driver \\
--username pg_user -password secretPassword \\
--export-dir /user/train/output_data/sqoop_import/churn \\
-m 4 --table public.churn