Mulesoft Toplu Veri İşleme (Batch Processing)

Mulesoft Toplu Veri İşleme (Batch Processing)

MuleSoft’ta Batch Processing, büyük veri kümelerini toplu olarak ve yüksek verimlilikle işlemek için kullanılan bir yöntemdir. Bu yöntem, iş yüklerini paralel olarak çalıştırarak maksimum verim sağlar.

Batch Processing aşağıdakileri yapmak için idealdir;

  • İki sistemi gerçek zamanlı olarak senkronize etmek. Örneğin ERP’den Salesforce’a hesap güncelleme ayrıntılarının aktarımı,
  • ETL aracı — dosyaların veya DB kayıtlarının diğer sistemlere aktarılması

Batch Processing, gelen yükü (payload) ayrı kayıtlara bölerek ve her kaydı eşzamansız (asynchronously) olarak işlemciler üzerinde çalıştırır. Tek tek kayıtları kendi veri deposuna (persistence store) kaydeder. Bu, uygulamanın çökmesi durumunda durduğu noktadan devam edeceği için Toplu işleme için gerekli güvenilirliği sağlayacaktır.

Batch İş Akışı

Batch iş akışı, üç ana fazdan oluşur:

Input Fazı

  • Gelen veriler toplu olarak alınır.
  • Veriler, bireysel işlere bölünür.

Load and Dispatch Fazı

  • İşler, çeşitli iş parçacıklarına dağıtılır.
  • İşler paralel olarak işlenir.

On Complete Fazı

  • Tüm işler tamamlandığında sonuçlar toplanır.
  • Nihai çıktı oluşturulur.

Önemli Kavramlar ve Bileşenler

  • Batch Steps: Her batch iş akışı, bir dizi adımdan oluşur. Her adım, belirli bir işlemi temsil eder.
  • Batch Records: İşlem yapılacak bireysel veri birimleridir.
  • Batch Job Instance: Her batch iş akışı, bir job örneği olarak çalıştırılır.
  • Batch Job Execution: Batch iş akışının yürütülme sürecidir.

Avantajları

  • Paralel İşleme: Çoklu iş parçacığı kullanarak veriyi daha hızlı işleme.
  • Hata Yönetimi: Her iş bağımsız olduğundan, bir işte hata oluştuğunda diğerleri etkilenmez.
  • Ölçeklenebilirlik: Büyük veri kümeleri rahatça işlenebilir.

Uygulama Senaryoları

  • Veritabanı Senkronizasyonu: İki veritabanı arasındaki verileri senkronize etmek.
  • Dosya İşleme ve Dönüşümü: Büyük dosyaların işlenmesi ve formatlarının dönüştürülmesi.
  • Gerçek Zamanlı Veri Analitiği: Büyük veri kümelerinin analiz edilmesi.

Kullanım Örnekleri

  1. Veritabanı Senkronizasyonu
  • Bir veritabanından verileri alarak başka bir veritabanına aktarır ve senkronize eder.
  • Bu işlem sırasında her kayıt bağımsız olarak işlenir.

2. Dosya İşleme

  • Büyük dosya kümelerini okuyup işleyerek veritabanına kaydeder.
  • Her dosya kaydı ayrı bir iş olarak ele alınır ve işlenir.

3. Veri Analitiği

  • Büyük veri kümelerini analiz ederek anlamlı sonuçlar çıkarır.
  • Analiz işlemleri paralel olarak gerçekleştirilir.

Teknik Detaylar ve Konfigürasyon

  • Batch Job Configurations: Batch işlerinin nasıl konfigüre edileceğini belirler.
  • Error Handling: Hata yönetimi ve hataların nasıl ele alınacağını belirler.
  • Chunk Size: Her bir iş parçacığında işlenecek veri miktarını belirler.

Batch Processing bileşenleri içine yerleştirdiğiniz işlemciler kayıtlar üzerinde hareket eder. Her kayıt bir Mule event’e benzer, İşlemciler payload anahtar sözcüğünü kullanarak her bir kayıt yüküne erişebilir, bunları değiştirebilir ve yönlendirebilir ve vars sözcüğünü kullanarak Mule değişkenleri üzerinde çalışabilir. Ancak, Toplu İş bileşeninin girdisindeki Mule özniteliklerine (attributes) erişemez veya bunları değiştiremezler.

Aşağıdaki örnekte, Batch Processing gerçekleştiren ve bu işlemin sonuçlarını içeren bir rapor döndüren bir Mule Flow XML yapısı özetlenmektedir:

Not: Örnekteki <processor placeholder />, Mule mesajlarını, kayıtlarını ve toplu işleme raporunu işleyen işlemleri, Mule çekirdek bileşenleri vb. gibi aşamaları örneklemeyi amaçlar.

<flow name=”mule-flow” >

<! — processor that triggers the flow →

<event source placeholder />
<! — message processors →

<processor placeholder />
<processor placeholder />

<! — Batch Job component →

<batch:job name=”Batch_Job”/>
<! — record processing occurs within process-records →

<batch:process-records >
<! — Batch Step component →

<batch:step name=”Batch_Step”/>
<! — processors that act on records →

<processor placeholder />

<processor placeholder />

<! — Batch Aggregator component →

<batch:aggregator />
<! — processor that acts arrays of records →

<processor placeholder />

</batch:aggregator>

</batch:step>

<! — another Batch Step component →

<batch:step name=”Batch_Step1"/>
<! — processor that acts on records →

<processor placeholder />

</batch:step>

</batch:process-records>

<! — processing of a batch job report takes place in on-complete →

<batch:on-complete>
<! — processor for result of a batch job →

<processor placeholder />

</batch:on-complete>

</batch:job>

</flow>

</mule>
  1. Mule event kaynağı Mule Flow’unu tetikler. Yaygın event kaynakları, Anypoint Connector for HTTP’den (HTTP Connector) bir HTTP Listener, bir Zamanlayıcı bileşeni veya yeni dosyaları yoklayan bir bağlayıcı işlemi gibi dinleyicilerdir,
  2. Batch Job component akışı üstünde bulunan işlemciler genellikle Batch Job component tüketmesi için bir mesajı alır ve gerekirse hazırlar. Örneğin, bir HTTP request işlemi işlenecek verileri alabilir ve bir Transform Message bileşenindeki bir DataWeave komut dosyası verileri Batch Job component’nin alması için geçerli bir biçime dönüştürebilir.
  3. Batch Job component akıştaki bir üst akış işlemcisinden bir mesaj aldığında, Load ve Dispatch aşaması başlar. Bu aşamada, bileşen girdiyi kayıt olarak işlenmek üzere hazırlar; bu işlemenin gerçekleşeceği bir toplu iş örneği oluşturmayı da içerir.
  4. Batch Job örneği <process-records/> öğesine ulaştığında batch job yürütülür. Bu noktada, İşlem aşaması başlar. Tüm kayıtları işleme bu aşamada gerçekleşir.
  5. Her Batch Step bileşeni, kayıtlardaki verileri dönüştürmek, yönlendirmek, zenginleştirmek veya değiştirmek için bir kayıt üzerinde işlem yapan bir veya daha fazla işlemci içerir. Örneğin, işlenen kayıtları tek tek (one-by-one) harici bir sunucuya aktarmak için bir bağlayıcı işlemi yapılandırabilirsiniz.
  6. Batch Aggregator bileşeni isteğe bağlıdır. Bir Batch Step bileşenine yalnızca bir tane ekleyebilirsiniz. Bir Batch Aggregator içindeki ilk işlemci (processor), bir dizi kaydı girdi olarak kabul edebilmelidir. Batch aggregation, işlenmiş kayıtlardan oluşan bir diziyi harici bir sunucuya yüklemek için kullanışlıdır. Diğer işlemcilerin kayıtları tek tek işleyebilmesi için dizi üzerinde yineleme yapan For Each gibi bileşenler kullanmak da mümkündür. Batch Aggregator bileşeni, kayıtların nasıl işleneceğini belirtmek için bir akış (streaming) veya boyut (size) ayarı gerektirir.
  7. Ek Batch Step bileşenleri isteğe bağlıdır. Bu örnek bir Batch Aggregator bileşeni içermez.
  8. Kayıtlar tüm Batch Step Adımı bileşenlerinden geçtikten sonra, Mule Batch Step örneğini tamamlar ve sonuçları, işleme sırasında hangi kayıtların başarılı ve hangilerinin başarısız olduğunu gösteren bir nesnede raporlar. Sonucu <batch:on-complete /> içinde bir Logger veya başka bir işlemci ile alabilir veya günlüğe kaydedebilirsiniz.

Batch Job bileşeni, bir toplu iş örneğindeki tüm kayıt işlemleri bittikten sonra gerçekleşen “On Complete” aşamasında kayıtları tüketir ve işlenen kayıtları Batch Job bileşeni dışındaki harici bileşenlere yaymaz. Ancak, birçok Mule bileşeni gibi, Batch Job bileşeni de Batch Job dışındaki bir alt bileşenin Batch Job bileşeninin aldığı önceden işlenmiş yüke erişmek için kullanabileceği bir hedef değişkeni (target) özelliği ayarlamak için bir yol sağlar.

Batch Job Bileşenine Geçerli Input

Akış bir üst akış olayı tarafından tetiklendiğinde, Batch Job bileşeni Mule mesaj girişi üzerinde örtük bir bölme işlemi gerçekleştirir. İşlem, JSON ve XML yüklerinin yanı sıra tüm Java yinelenebilirlerini, yineleyicilerini veya dizilerini kabul eder.

Bölme işlemiyle uyumlu olmayan bir veri biçimiyle çalışıyorsanız, Batch Job bileşenine girmeden önce payload (yükü) desteklenen bir biçime dönüştürün.

Mule Variable (Değişken) Yayılımı

Bir batch job örneğindeki her kayıt, Batch Job bileşeninin girdisinden Mule Event Variables’ı devralır. İşlem aşamasında çalışan Batch Step ve Batch Aggregator bileşenlerinde bu değişkenlere erişebilir ve değerlerini değiştirebilir ve bu bileşenlerde yeni değişkenler oluşturabilirsiniz. Bu aşamadaki her bir kayıt için değişkenler (ve bunlarda yapılan değişiklikler) bir Batch Job bileşeni içindeki her bir Batch Step ve Aggregator bileşenine yayılır. Örneğin, R1 kaydı Mule variable varName’i “hello” olarak ayarlarsa, R2 kaydı varName’i “world” olarak ayarlarsa ve R3 kaydı bu variable ayarlamazsa, bir sonraki Batch Step bileşeninde R1 “hello” değerine, R2 “world” değerine sahip olur ve R3 bu değişken için null döndürür.

Benzer bir senaryoda, Batch Job bileşeninden önceki akışta varName ayarlanırsa ve kayıt 3 (R3) varName değerini değiştirmezse, R3 bu değeri devralır. Bu noktayı açıklamak için, Batch Job bileşenine [1,2,3,4] dizisinden oluşan bir yük ve “my variable before batch job” değerine sahip bir Mule variable varName gönderen bir Mule flow varsayalım. Aşağıdaki örnek, birinci ve ikinci kayıtlarda varName için yeni bir değer ayarlamak için Choice yönlendiricisini (choice) kullanır, ancak üçüncü kayıtta ayarlamaz. Sonraki kayıtlar da yeni bir varName değeri ayarlar.

Loglar bir batch job örneği için aşağıdaki mesajları yazdırır (okunabilirlik için düzenlenmiştir):



INFO …PRINT VARIABLE BEFORE BATCH: “my variable before batch job”



INFO …PRINT VARIABLE VALUES AFTER BATCH JOB: “my variable before batch job”

INFO …PRINT RECORD NUMBER: 1

INFO …R1: PRINT VARIABLE for PAYLOAD is 1: “hello”

INFO …PRINT RECORD NUMBER: 2

INFO …R2: PRINT VARIABLE for PAYLOAD is 2: “world”

INFO …PRINT RECORD NUMBER: 3

INFO …R3: PRINT VARIABLE for PAYLOAD is 3: “my variable before batch job”

INFO …PRINT RECORD NUMBER: 4

INFO …Rn: PRINT DEFAULT VARIABLE: “some other value”



INFO …PRINT VARIABLES IN SECOND STEP: {payload 1=”hello”}

INFO …PRINT VARIABLES IN SECOND STEP: {payload 2=”world”}

INFO …PRINT VARIABLES IN SECOND STEP: {payload 3=”my variable before batch job”}

INFO …PRINT VARIABLES IN SECOND STEP: {payload 4=”some other value”}

XML Detayları

<?xml version=”1.0" encoding=”UTF-8"?>

<mule xmlns:batch=”http://www.mulesoft.org/schema/mule/batch"

xmlns=”http://www.mulesoft.org/schema/mule/core"

xmlns:doc=”http://www.mulesoft.org/schema/mule/documentation"

xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation=”http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd

http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd">

<flow name=”batch-variables-ex”>

<scheduler doc:name=”Scheduler”>

<scheduling-strategy>

<fixed-frequency frequency=”45" timeUnit=”SECONDS” />

</scheduling-strategy>

</scheduler>

<! — Set Payload →

<set-payload value=”#[[1,2,3,4]]” doc:name=”Set Payload” />

<! — Set Variable →

<set-variable value=’”my variable before batch job”’

doc:name=”Set Variable” variableName=”varName” />

<! — Log Variable →

<logger level=”INFO” doc:name=”Logger”

message=”#[vars.varName]” category=”PRINT VARIABLE BEFORE BATCH” />

<! — Batch Job →

<batch:job jobName=”batch-variables-exBatch_Job”>

<batch:process-records>

<! — First Batch Step →

<batch:step name=”Batch_Step”>

<logger level=”INFO” doc:name=”Logger” message=”#[payload]”

category=”PRINT RECORD NUMBER” />

<choice doc:name=”Choice”>

<! — First record, R1 →

<when expression=”#[payload == 1]”>

<set-variable value=’”hello”’

doc:name=”Set Variable” variableName=”varName” />

<logger level=”INFO” doc:name=”Logger”

message=”#[vars.varName]”

category=”R1: PRINT VARIABLE for PAYLOAD is 1" />

</when>

<! — Second record, R2 →

<when expression=”#[payload == 2]”>

<set-variable value=’”world”’

doc:name=”Set Variable” variableName=”varName” />

<logger level=”INFO” doc:name=”Logger”

category=”R2: PRINT VARIABLE for PAYLOAD is 2"

message=”#[vars.varName]” />

</when>

<! — Third record, R3 →

<when expression=”#[payload == 3]”>

<logger level=”INFO” doc:name=”Logger”

message=”#[vars.varName]”

category=”R3: PRINT VARIABLE for PAYLOAD is 3" />

</when>

<! — Other records →

<otherwise>

<set-variable value=’”some other value”’

doc:name=”Set Variable” variableName=”varName” />

<logger level=”INFO” doc:name=”Logger”

category=”Rn: PRINT DEFAULT VARIABLE” message=”#[vars.varName]” />

</otherwise>

</choice>

</batch:step>

<! — Second Batch Step →

<batch:step name=”Batch_Step1">

<logger level=”INFO” doc:name=”Logger”

message=’#[(“payload “ ++ payload as String) : vars.varName]’

category=”PRINT VARIABLES IN SECOND STEP” />

</batch:step>

</batch:process-records>

</batch:job>

<! — Log Variables After Batch Job →

<logger level=”INFO” doc:name=”Logger”

message=”#[vars.varName]”

category=”PRINT VARIABLE VALUES AFTER BATCH JOB” />

</flow>

</mule>

Üçüncü kaydın (R3) Batch Job bileşeninden önce ayarlanan varName değerini devraldığına dikkat edin. R3'ten sonraki tüm kayıtlar variable’ını “başka bir değere” ayarlar. İkinci Batch Step’te kaydedilen kayıtlar, ilk Batch Step’teki değişkenlerin değerlerini devralır. Batch Job bileşeninden sonra, Batch Job örneği işlemeyi bitirmeden önce asenkron olarak işlenen Logger, Batch Job yürütülmeden önce varName değerini de alır, “my variable before batch job”.

İşlem aşaması sırasında Mule variables’ta yapılan değişiklikler ve eklemeler On Complete aşamasına yayılmaz. Yalnızca tetikleyici olaydan Batch Job bileşenine devralınan değişkenler On Complete aşamasına yayılır ve batchJobInstanceId gibi toplu iş raporunun bir parçası olan standart değişkenler de bu aşamada bulunur. On Complete fazında faz boyunca devam eden ancak On Complete sona erdikten sonra devam etmeyen bir değişken oluşturmak mümkündür.

Örneğin, Tamamlandı aşamasının yeni bir değişken (myOnCompleteVar) ayarladığını ve bu aşamada bulunan tüm değişkenleri (vars) günlüğe kaydettiğini varsayalım.

<batch:on-complete>

<set-variable value=”Hello On Complete Variable”

doc:name=”Set Variable” variableName=”myOnCompleteVar” />

<logger level=”INFO” doc:name=”Logger” message=”#[vars]”

category=”PRINT ON COMPLETE VARIABLES” />

</batch:on-complete>

On Complete raporunda, batch job örneği için son set değişkenlerini bulabilirsiniz:

INFO … PRINT ON COMPLETE VARIABLES: {

varName=TypedValue[value: ‘“my variable before batch job”’, dataType: ‘SimpleDataType{type=java.lang.String, mimeType=’*/*; charset=UTF-8'}’],

batchJobInstanceId=TypedValue[value: ‘869ae510–5c84–11ed-bc38–147ddaaf4f97’, dataType: ‘SimpleDataType{type=java.lang.String, mimeType=’*/*’}’],

myOnCompleteVar=TypedValue[value: ‘Hello On Complete Variable’, dataType: ‘SimpleDataType{type=java.lang.String, mimeType=’*/*; charset=UTF-8'}’]}

Rapor nesnesi örneğindeki değişkenler aşağıdaki gibidir:

  • varName, Batch Job bileşenine ulaşmadan önce oluşturulan bir variable örneğidir. Bu variable , bileşene girdiğinde sahip olduğu değeri korur. İşlem aşaması sırasında değerde yapılan herhangi bir değişiklik On Complete aşamasında korunmaz.
  • batchJobInstanceId bir batch job örneğini tanımlayan standart bir değişkendir.
  • myOnCompleteVar, On Complete aşamasında oluşturulan bir variable örneğidir. Raporda görünür ancak On Complete aşaması sona erdikten sonra kalıcı olmaz.

Batch Job bileşeninden aşağı doğru yalnızca varName kalıcı olur. Kayıtlar ve nitelikleri gibi, kayıtların değişkenleri de Batch Job bileşeni içinde tamamen tüketilir. Batch Job örneği, akışın geri kalanından asenkron olarak yürütülür, bu nedenle Process veya On Complete aşamalarında oluşturulan hiçbir variable Batch Job bileşeninin dışında kalıcı olmaz.

Error Handling

Tüm bir batch job’ın başarısız olmasını önlemek için, Batch Job bileşeni meydana gelen herhangi bir kayıt düzeyindeki başarısızlığı ele alabilir. Ayrıca, tek tek kayıtlar üzerinde değişkenler ayarlayabilir veya kaldırabilirsiniz, böylece batch job işleme sırasında Mule bir batch job örneğindeki kayıtları bir variable’a göre yönlendirebilir veya başka şekilde hareket edebilir.

Logicalbond (Mulesoft Partner and Reseller)

Logicalbond, bir MuleSoft Türkiye Yetkili Satıcısı ve İş Ortağıdır. Küçük, orta ölçekli, kurumsal ve stratejik müşteriler için güvenilir entegrasyon çözümleri üretir.

Sertifikalı danışmanlarımız, müşterilerimizin gelişen ihtiyaçlarına uyum sağlamalarına yardımcı olurken aynı zamanda inovasyonu ve dijital dönüşümü destekleyen görev açısından kritik çözümler sağlama konusunda yeteneklidir.

Logicalbond’un güvenilir iş ortağınız olmasına izin verin, biz de kuruluşunuzun ve ekibinizin sürdürülebilir dijital değerler oluşturmasına yardımcı olalım. Daha fazla bilgi için lütfen www.logicalbond.com adresini ziyaret edin.