Rabbit MQ -Work Queues

3.cü bölümde çalışma kuyruklarını inceleyeceğiz; İlk bölümde bir kuyruğa ait mesajı gönderdik ve karşı taraftan karşıladık.

https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html

Çalışma kuyruğu oluşturulmasının nedeni, kaynağın uzun sürecek olan bir görevin tamamlanmasını beklemeden sıraya almasıdır. Bu yüzden uzun sürecek olan görevleri birden çok çalışan arasında dağıtmak için kullanılacak bir Çalışma Kuyruğu ile deneme yapacağız.

System.Threading namespace i altındaki Thread.Sleep() kullanarak gerçekten meşgulmüşüz gibi bu örneği yapacağız.

Bir dize göndereceğiz ve dizedeki nokta sayısını bekleme süresi olarak alacağız; her nokta “işin” bir saniyesini hesaba katacaktır. Örneğin, “Hello…” datası bu görev için üç saniye kadar sürecektir.

RabbitMQ_NewTask ve RabbitMQ_NewTaskWorker isimli 2 yeni console uygulaması daha açıyorum ve rabbitmq.client ları da yine paket olarak ekliyoruz manage nugets ten.

Round-Robin Dispatching

Round-Robin için güzel bir tanımlamayı aşağıya bırakıyorum 🙂

https://eksisozluk.com/entry/32776342

“kısıtlı kaynaklara, kaynak sayısından fazla unsurun/işin erişmeye ve kullanmaya çalışması durumunda, her işin, sistemde bu yöntem için tanımlanmış en küçük zaman dilimi boyunca işlendiği; en küçük zaman diliminde bitmeyen işlerin sırada bekleyen bir sonraki iş ile değiştirildiği; dolayısıyla da bellek/kaydedici/kuyruk birimlerini de bitmeyen işlerin durumlarının saklanması gereği nedeniyle içermek zorunda olan zamanlama/kaynak yönetimi yöntemi. yöntemin olumlu yanlarından en önde geleni, bu yöntem üzerine çalışacak sistemin tasarımının oldukça kolay olmasıdır. yöntemin olumsuz yönlerinden en önde gelenleri ise sistem tasarımının bellek/kaydedici türü unsurları barındırmak zorunda oluşu ve kısmen ya da tamamen rastsal (örneğin poisson) süreçler ile beslendiğinde oldukça düşük başarım göstermesi sayılabilir.”

Bir işi paralel hale getirerek, birikmiş iş listesini daha fazla işçi ekleyerek çözme yöntemi tanımlıyor.

İlk olarak 2 worker örneğini çalıştıralım (vs 2019 için 2 tane worker uygulamasını aynı anda açmak gibi), yani bu durumda toplamda 3 konsol uygulaması açık olmalı.

  • newtask yani sender/publisher pozisyonundaki uygulama kodlarımız;

Burada yaptığımız değişiklikle uygulamayı defalarca başlatmak yerine, bir thread yapısı oluşturuyorum;

thread içinde çağırdığım methodumun içeriği de;

Burada şunu yapıyoruz; bir array var,burada dizi içinde sırasıyla tek nokta çift nokta 3 nokta olarak string verilerimiz var.

Bunların içinde dönüp her defasında mesajın içeriğini dinamik olarak değiştiriyorum. Sonrada gelen dizi sayısına göre bir sleep işlemi gerçekleştiriyorum. Sonraki işlem ise önceki yazı dizilerimizden de anlaşılacağı üzere mesajın byte a çevrilmesi ve task_queue kuyruğunda gönderimi.

  • Alıcı pozisyonundaki newtaskworker uygulama kodlarımız da aşağıdaki gibidir. Burada da consumer ile okuduğumuz datanın içindeki nokta sayısına göre yine bir sleep işlemi ile meşgul ediyoruz uygulamayı

İlk olarak worker ları çalıştırıyoruz;

Newtask yani sender uygulamamızı çalıştırıyoruz, işlemleri devam ederken worker ları da izliyoruz;

Aşağıdaki görseldeki gibi sender sırasıyla 4 mesajı da gönderiyor, alıcı tarafındaki uygulamalar ise işi bölüşmüş oldular.

0-2 ve 4.cü mesajları biri, 1 ve 3.cü mesajları da diğeri almış oldu.Bu kod mimarisiyle de Round-Robin Dispatching e uygun olmuş oldu.

Github kodları için;

https://github.com/erolakgul/RabbitMQ_NewTask

https://github.com/erolakgul/RabbitMQ_NewTaskWorker

Message acknowledgment

Verilen bir görevi işlemek biraz zaman alabilir. Consumer tarafındaki uygulama, uzun sürecek bir işleme başlayabilir ve de kısmi olarak işi bitirebilir. Rabbit mq consumer’a bir mesaj ilettiğinde bunu silinmek üzere işaretler. Dolayıısyla kısmi olarak biten iş sonrası o anki worker işlemi sonlandırılırsa mesajı kaybedebiliriz. Ayrıca worker’a gönderilen diğer mesajlarda kaybolabilir.

Hiçbir görevi kaybetmek istemiyoruz. Bir worker sonlanırsa, görevin başka bir worker’a teslim edilmesini isteriz. Bir mesajın asla kaybolmadığından emin olmak için RabbitMQ, mesaj bildirimleri özelliğini kullanacağız. Consumer tarafından RabbitMQ’ya belirli bir mesajın alındığını, işlendiğini ve RabbitMQ’nun onu silmekte özgür olduğunu bildirmek için bir onay(bilgi) gönderilebilir.

Consumer , bir onay göndermeden sonlanırsa (kanalın kapanması, bağlantının kopması veya TCP bağlantısının kesilmesi gibi), RabbitMQ bir mesajın tam olarak işlenmediğini anlar ve yeniden kuyruğa alır. Aynı anda çevrimiçi olan başka Consumer varsa, bunu hızlı bir şekilde başka bir tüketiciye yeniden iletir. Bu şekilde, worker lar ara sıra sonlansa bile mesajın kaybolmadığından emin olmaya çalışacağız..

Fair Dispatch

Adil Gönderim olarak çevirebileceğimiz yöntem şunu söylüyor bize, eğer aynı anda bir mesaj gönderimi varsa ve 2 worker’ımız varsa bu yöntem ile her 2 işçi arasında bu mesaj gönderimi dağıtabiliriz. Böylece mesajın alındığından emin olduğumuzda diğer mesajın kaybolmadığını ve worker ın her mesajı aldığını görebiliyoruz.

RabbitMQ’nun mesaj kuyruğa girdiğinde sadece bir mesaj gönderiyor. Bir tüketici için onaylanmamış mesajların sayısına bakmaz. Her n’inci mesajı körü körüne n’inci tüketiciye gönderir.

Bu davranışı değiştirmek için prefetchCount = 1 ayarıyla BasicQos yöntemini kullanabiliriz. Bu, RabbitMQ’ya bir işçiye aynı anda birden fazla mesaj vermemesini söyler. Veya başka bir deyişle, bir işçiye bir öncekini işleyip onaylayana kadar yeni bir mesaj göndermeyin. Bunun yerine, onu hala meşgul olmayan bir sonraki çalışana gönderir.

Şimdi kod tarafında yapacağımız değişikliklere bakalım;

sender da durable parametresini true gönderiyoruz, kuyruk ismini ise değiştiriyoruz çünkü parametreler değişiyor, farklı bir kuyruk ile gitmesi gerekiyor.

Buradaki kodu ise, for içine alıyorum,aynı süre sonra gelen bir mesajı aynı anda 2 kere göndereceğim. Yani aynı anda “1.ci Mesaj” isminde bir mesajı 2 kere göndereceğim.

Consumer tarafında ise 2 worker ım olacak, ve o aynı gelen mesajın her biri 2 işçiye de pay edilecek.

Alıcı tarafta ise şu değişiklikleri yaptık, durable yine true değeri atandı, her bir worker ın o an için tek mesaj alabileceğini BasicQos ile belirttik ve son olarak da BasicAck ile mesaj gönderildiğini onaylayan yapıyı kurduk.

tüm uygulamaları çalıştırdığımızda;

publisher ın aynı mesajları 2 kere gönderdiğini, çalışan 2 workerın da her mesajı paylaştıklarını görmüş oluyoruz.

Github kodları için yukarıda verilen linkler geçerlidir.

Bir Cevap Yazın

Aşağıya bilgilerinizi girin veya oturum açmak için bir simgeye tıklayın:

WordPress.com Logosu

WordPress.com hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap /  Değiştir )

Twitter resmi

Twitter hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap /  Değiştir )

Facebook fotoğrafı

Facebook hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap /  Değiştir )

Connecting to %s