0 2K ru

Паттерн конкурирующих потребителей (Competing Consumers Pattern)

Цель шаблона разрешить нескольким параллельным консюмерам обрабатывать сообщения, полученные по тому же каналу обмена сообщениями. Этот шаблон позволяет системе обрабатывать несколько сообщений одновременно, чтобы оптимизировать пропускную способность, улучшить масштабируемость и доступность, а также сбалансировать нагрузку.

Проблема и контекст

Ожидаемо, что приложение, работающее в облаке, будет обрабатывать большое количество запросов. Вместо того, чтобы обрабатывать каждый запрос синхронно, обычным методом является передача приложением через систему обмена сообщениями другого сервиса (consumer сервис), который обрабатывает их асинхронно. Эта стратегия помогает гарантировать, что бизнес-логика в приложении не блокируется во время обработки запросов.

Количество запросов может значительно меняться со временем по многим причинам. Внезапный всплеск активности пользователей или агрегированные запросы могут вызвать непредсказуемую нагрузку. В часы пик системе может потребоваться обрабатывать многие сотни запросов в секунду, тогда как в другое время это число может быть очень небольшим. Кроме того, характер работы, выполняемой для обработки этих запросов, может сильно варьироваться. Использование одного экземпляра сервиса консюмера может привести к тому, что этот экземпляр будет зафлужен запросами, или система обмена сообщениями может быть перегружена потоком сообщений, приходящих из приложения. Чтобы справиться с этой изменяющейся рабочей нагрузкой, система может запускать несколько экземпляров обслуживания консюмеров. Однако эти потребители (консюмеры) должны координироваться, чтобы гарантировать, что каждое сообщение доставляется только одному потребителю.

Решение

Используйте очередь сообщений для реализации канала связи между приложением и экземплярами сервиса консюмера. Приложение отправляет запросы в виде сообщений в очередь, а экземпляры сервисов консюмеров получают сообщения из очереди и обрабатывают их. Этот подход позволяет одному и тому же пулу экземпляров сервиса консюмера обрабатывать сообщения из любого экземпляра приложения.

Схематически этот паттерн отображен ниже:

Паттерн конкурирующих потребителей (Competing Consumers Pattern)

Преемущества использования этого паттерна:

  • Это позволяет использовать систему с выравниванием нагрузки (load-leveled system), которая может обрабатывать большие различия в объеме запросов, отправляемых экземплярами приложения. Очередь действует как буфер между экземплярами приложения и экземплярами сервиса консюмера, что может помочь минимизировать влияние на доступность и скорость отклика как для приложения, так и для экземпляров сервиса (как описано шаблоном выравнивания нагрузки на основе очереди). Обработка сообщения, которое требует некоторой длительной обработки, не препятствует одновременной обработке других сообщений другими экземплярами сервисов консюмера.
  • Это повышает надежность. Если производитель (producer) взаимодействует напрямую с потребителем (consumer) вместо использования этого шаблона, но не отслеживает потребителя, существует высокая вероятность того, что сообщения могут быть потеряны или не обработаться в случае сбоя потребителя. В этом шаблоне сообщения не отправляются конкретному экземпляру сервиса, отказавший экземпляр сервиса не будет блокировать производителя, и сообщения могут обрабатываться любым работающим экземпляром сервиса.
  • Это не требует сложной координации между потребителями или между производителем и потребителями. Очередь сообщений гарантирует, что каждое сообщение доставляется как минимум один раз.
  • Это масштабируемо. Система может динамически увеличивать или уменьшать количество экземпляров обслуживания потребителей по мере изменения объема сообщений.
  • Это может улучшить отказоустойчивость, если очередь сообщений обеспечивает транзакционные операции чтения. Если экземпляр сервиса потребителя считывает и обрабатывает сообщение как часть транзакционной операции, и если впоследствии этот экземпляр сервиса потребителя завершается сбоем, этот шаблон может гарантировать, что сообщение будет возвращено в очередь, которая будет взята и обработана другим экземпляром сервиса потребителя.

Проблемы и вопросы при реализации

Порядок сообщений 

Порядок, в котором экземпляры сервисов консюмеров получают сообщения, не гарантируется и не обязательно отражает порядок, в котором были созданы сообщения. Разработайте систему, чтобы гарантировать, что обработка сообщений идемпотентна, потому что это поможет устранить любую зависимость от порядка, в котором обрабатываются сообщения.

Стоит отметить, что современные меседж-брокеры такие как например Azure Service Bus уже с коробки умеют гарантировать порядок сообщений и ничего самому не нужно реализовывать. 

Проектирование отказоустойчевых сервисов

Если система разработана так, чтобы обнаруживать и перезапускать неисправные экземпляры сервисов, может потребоваться реализовать обработку, выполняемую экземплярами сервисов, в качестве идемпотентных операций, чтобы минимизировать последствия получения и обработки одного сообщения более одного раза.

Обнаружение "ядовитых" сообщений

Ядовитые или Poison messages, для которых требуется доступ к недоступным ресурсам, может привести к сбою экземпляра сервиса. Система должна предотвращать возврат таких сообщений в очередь, а вместо этого собирать и хранить сведения об этих сообщениях в другом месте, чтобы их можно было проанализировать при необходимости.

Обработка результатов

Экземпляр сервиса, обрабатывающий сообщение, полностью отделен от логики приложения, которая генерирует сообщение, и они могут не иметь возможности общаться напрямую. Если экземпляр сервиса генерирует результаты, которые должны быть переданы обратно в логику приложения, эта информация должна храниться в месте, доступном для обоих, и система должна предоставить некоторое указание о завершении обработки, чтобы не допустить получение неполных данных.

Scaling системы обмена сообщениями

В крупномасштабном решении одна очередь сообщений может быть перегружена количеством сообщений и стать узким местом в системе. В этой ситуации рассмотрите возможность разделения системы обмена сообщениями для направления сообщений от определенных производителей в определенную очередь или использования балансировки нагрузки для распределения сообщений по нескольким очередям сообщений.

Обеспечение надежности системы обмена сообщениями

Надежная система обмена сообщениями необходима, чтобы гарантировать, что как только приложение помещает сообщение в очередь, оно не будет потеряно. Это важно для обеспечения того, чтобы все сообщения доставлялись хотя бы один раз.

Когда следует использовать шаблон конкурирующих потребителей?

Используйте этот шаблон, когда:

  • Рабочая нагрузка для приложения разделена на задачи, которые могут выполняться асинхронно.
  • Таски независимы и могут выполняться параллельно.
  • Объем работы сильно варьируется, что требует масштабируемого решения.
  • Решение должно обеспечивать высокую доступность и быть устойчивым в случае сбоя обработки задачи.

Не используйте этот шаблон, когда:

  • Нелегко разделить рабочую нагрузку приложения на отдельные таски, или существует высокая степень зависимости между тасками.
  • Таски должны выполняться синхронно, а логика приложения должна ждать завершения таски, прежде чем продолжить.
  • Таски должны выполняться в определенной последовательности.

Пример реализации на C#

Давайте рассмотрим пример на базе Azure.

Примером выступит следующий класс:

public class QueueManager
    {
        private readonly string queueName;
        private readonly string connectionString;
        private QueueClient client;
        private readonly ManualResetEvent pauseProcessingEvent;

        public QueueManager(string queueName, string connectionString)
        {
            this.queueName = queueName;
            this.connectionString = connectionString;
            this.pauseProcessingEvent = new ManualResetEvent(true);
        }

        public async Task SendMessagesAsync()
        {
            // Simulate sending a batch of messages to the queue.
            var messages = new List<BrokeredMessage>();

            for (int i = 0; i < 10; i++)
            {
                var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() };
                messages.Add(message);
            }

            await this.client.SendBatchAsync(messages);
        }

        public void ReceiveMessages(Func<BrokeredMessage, Task> processMessageTask)
        {
            // Setup the options for the message pump.
            var options = new OnMessageOptions();

            // When AutoComplete is disabled, you have to manually complete/abandon the messages and handle errors, if any.
            options.AutoComplete = false;
            options.MaxConcurrentCalls = 10;
            options.ExceptionReceived += this.OptionsOnExceptionReceived;

            // Use of Service Bus OnMessage message pump. The OnMessage method must be called once, otherwise an exception will occur.
            this.client.OnMessageAsync(
                async msg =>
                {
                    // Will block the current thread if Stop is called.
                    this.pauseProcessingEvent.WaitOne();

                    // Execute processing task here
                    await processMessageTask(msg);
                },
                options);
        }

        public async Task Start()
        {
            // Check queue existence.
            var manager = NamespaceManager.CreateFromConnectionString(this.connectionString);
            if (!manager.QueueExists(this.queueName))
            {
                try
                {
                    var queueDescription = new QueueDescription(this.queueName);

                    // Set the maximum delivery count for messages. A message is automatically deadlettered after this number of deliveries.  Default value is 10.
                    queueDescription.MaxDeliveryCount = 3;

                    await manager.CreateQueueAsync(queueDescription);
                }
                catch (MessagingEntityAlreadyExistsException)
                {
                    Trace.TraceWarning(
                        "MessagingEntityAlreadyExistsException Creating Queue - Queue likely already exists for path: {0}", this.queueName);
                }
                catch (MessagingException ex)
                {
                    var webException = ex.InnerException as WebException;
                    if (webException != null)
                    {
                        var response = webException.Response as HttpWebResponse;

                        // It's likely the conflicting operation being performed by the service bus is another queue create operation
                        // If we don't have a web response with status code 'Conflict' it's another exception
                        if (response == null || response.StatusCode != HttpStatusCode.Conflict)
                        {
                            throw;
                        }

                        Trace.TraceWarning("MessagingException HttpStatusCode.Conflict - Queue likely already exists or is being created or deleted for path: {0}", this.queueName);
                    }
                }
            }

            // Create the queue client. By default, the PeekLock method is used.
            this.client = QueueClient.CreateFromConnectionString(this.connectionString, this.queueName);
        }

        public async Task Stop(TimeSpan waitTime)
        {
            // Pause the processing threads
            this.pauseProcessingEvent.Reset();

            // There is no clean approach to wait for the threads to complete processing.
            // We simply stop any new processing, wait for existing thread to complete, then close the message pump and then return
            Thread.Sleep(waitTime);

            await this.client.CloseAsync();

            var manager = NamespaceManager.CreateFromConnectionString(this.connectionString);

            if (await manager.QueueExistsAsync(this.queueName))
            {
                try
                {
                    await manager.DeleteQueueAsync(this.queueName);
                }
                catch (MessagingEntityNotFoundException)
                {
                    Trace.TraceWarning(
                        "MessagingEntityNotFoundException Deleting Queue - Queue does not exist at path: {0}", this.queueName);
                }
            }
        }

        private void OptionsOnExceptionReceived(object sender, ExceptionReceivedEventArgs exceptionReceivedEventArgs)
        {
            if (exceptionReceivedEventArgs?.Exception == null)
                return;

            var exceptionMessage = exceptionReceivedEventArgs.Exception.Message;
            Trace.TraceError("Exception in QueueClient.ExceptionReceived: {0}", exceptionMessage);
        }
    }

Класс показывает, как можно создать очередь с помощью экземпляра QueueClient в обработчике события Start в веб-роли или воркер роли.

Метод SendMessagesAsync показывает, как приложение может создавать и отправлять пакет сообщений в очередь.

Метод ReceiveMessages показывает, как экземпляр службы потребителя может получать сообщения из очереди, следуя подходу, основанному на событиях. Параметр processMessageTask для метода ReceiveMessages представляет собой делегат, который ссылается на код, который запускается при получении сообщения. Этот код выполняется асинхронно.

Источник: Microsoft.

Comments:

Please log in to be able add comments.