Поиск на сайте: Расширенный поиск


Новые программы oszone.net Читать ленту новостей RSS
CheckBootSpeed - это диагностический пакет на основе скриптов PowerShell, создающий отчет о скорости загрузки Windows 7 ...
Вы когда-нибудь хотели создать установочный диск Windows, который бы автоматически установил систему, не задавая вопросо...
Если после установки Windows XP у вас перестала загружаться Windows Vista или Windows 7, вам необходимо восстановить заг...
Программа подготовки документов и ведения учетных и отчетных данных по командировкам. Используются формы, утвержденные п...
Red Button – это мощная утилита для оптимизации и очистки всех актуальных клиентских версий операционной системы Windows...
OSzone.net Microsoft Разработка приложений Облако/Azure Windows Azure Service Bus: шаблоны обмена сообщениями с использованием сеансов RSS

Windows Azure Service Bus: шаблоны обмена сообщениями с использованием сеансов

Текущий рейтинг: 0 (проголосовало 0)
 Посетителей: 532 | Просмотров: 681 (сегодня 0)  Шрифт: - +

В одной из наших предыдущих статей мы обсуждали, насколько важно применять шаблоны обмена сообщениями (messaging patterns) в облаке, чтобы разрывать жесткие связи между решениями и способствовать поддержке легко масштабируемых программных архитектур (см. «Comparing Windows Azure Queues and Service Bus Queues» по ссылке msdn.microsoft.com/magazine/jj159884). Очереди — один из таких шаблонов обмена сообщениями, и платформа Windows Azure предлагает два основных варианта реализации этого подхода: сервисы хранилища очередей и Service Bus Queues; в обоих сценариях множество потребителей принимает и обрабатывает сообщения в очереди. Это каноническая модель поддержки варьируемой рабочей нагрузки в облаке, где можно динамически добавлять или удалять получатели в зависимости от размера очереди, тем самым предлагая механизм распределения нагрузки и преодоления сбоев для серверной части (рис. 1).

*

Рис. 1. Шаблон обмена сообщениями через очереди: каждое сообщение потребляется одним получателем

Sender 1Отправитель 1
Sender 2Отправитель 2
Sender 3Отправитель 3
Single QueueЕдиная очередь
Receiver 1Получатель 1
Receiver 2Получатель 2
Receiver 3Получатель 3


Хотя шаблон обмена сообщениями через очереди — отличное решение для простого разъединения решений, нередки ситуации, где каждому получателю требуется своя копия сообщения с возможностью отбрасывания некоторых сообщений на основе неких правил. Хороший пример сценария этого типа показан на рис. 2, иллюстрирующем проблему, с которой часто сталкиваются розничные компании при отправке информации множеству филиалов, например новых каталогов товаров или обновленного прайс-листа.

*
Рис. 2. Шаблон обмена сообщениями «публикатор/подписчик»: каждое сообщение может потребляться более чем один раз

HeadquartersГоловной офис
TopicTopic
Subscription 1Подписка 1
Subscription 2Подписка 2
Subscription 3Подписка 3
Store 1Магазин 1
Store 2Магазин 2
Store 3Магазин 3


В этих ситуациях лучше подходит шаблон «публикатор/подписчик» (publisher/subscriber), где получатели просто выражают заинтересованность в одной или более категориях сообщений, подключаясь к независимой подписке, которая содержит копию потока сообщений. Windows Azure Service Bus реализует этот шаблон на основе тем (topics) и подписок, значительно расширяя возможности управления тем, как распространяются сообщения, на основе независимых правил и фильтров. В этой статье мы поясним, как использовать эти возможности Windows Azure Service Bus на простом примере из реальной жизни, и будем исходить из следующих требований:

  1. информация о товарах должна приниматься по порядку на основе каталожной страницы;
  2. некоторые из хранилищ не содержат какие-то категории каталога, и товары в этих категориях следует отфильтровывать для каждого магазина;
  3. новая информация каталога не должна применяться к системе хранения, пока не прибудут все сообщения.

Все примеры кода в этой статье были созданы в Visual Studio 2012 с использованием C# в качестве языка программирования. Кроме того, вам понадобятся Windows Azure SDK version 1.8 для .NET-разработчиков и доступ к подписке Windows Azure.

Подготовка плана обмена сообщениями для проекта

Прежде чем писать код, нужно определить различные сущности (темы и подписки), которые станут частью рабочего процесса обмена сообщениями. Это делается через Windows Azure Portal на manage.windowsazure.com. Войдите на портал со своими удостоверениями и действуйте по следующей схеме.

  1. Щелкните значок Create New в левой нижней части Management Portal.
  2. Щелкните значок APP SERVICES, затем SERVICE BUS TOPIC и, наконец, CUSTOM CREATE (рис. 3).
  3. В первом диалоге введите название темы и выберите подходящий регион и идентификатор подписки Windows Azure. Если это ваше первое пространство имен в выбранном регионе, мастер предложит очередь пространства имен (namespace queue): [имя вашей сущности]-ns. Это значение можно изменить.
  4. Щелкните метку NEXT (стрелку, указывающую вправо), чтобы вставить оставшиеся свойства. Вы можете согласиться со значениями по умолчанию. Щелкните галочку для создания темы (topic).
  5. Щелкните значок Service Bus на навигационной панели слева, чтобы получить список всех пространств имен. Заметьте, что созданное пространство имен может появиться в списке с некоторым запозданием. На его создание и обновление интерфейса портала уходит несколько секунд.
  6. Выберите только что созданный Topic из списка и щелкните ACCESS KEY, который вы найдете в нижней части экрана. Запишите себе полную строку подключения для последующего использования.
  7. В верхней части экрана Windows Azure Portal щелкните SUBSCRIPTIONS, а затем CREATE A NEW SUBSCRIPTION. В появившемся диалоге введите название (в нашем примере мы использовали «Store1Sub») и щелкните стрелку для продолжения.
  8. На следующем экране оставьте значения по умолчанию, но обязательно пометьте параметр Enable sessions. Щелкните галочку, чтобы создать подписку. Сеансы будут использоваться подписчиками для приема сообщений по порядку.
  9. Повторите операции из пп. 7 и 8 для каждого из трех магазинов.

*
Рис. 3. Создание нового Service Bus Topic через портал Windows Azure

Создав темы и подписки, вы можете напрямую обращаться к ним и из Visual Studio. Для этого откройте Server Explorer (View | Server Explorer) и раскройте узел Windows Azure Service Bus (рис. 4). Щелкните правой кнопкой мыши этот узел Windows Azure Service Bus и выберите Add New Connection. Укажите Namespace Name, Issuer Name (обычно «владелец») и Issuer Access Key, который вы записали себе при создании пространства имен Windows Azure на портале.

*
Рис. 4. Создание темы и подписок Service Bus средствами Visual Studio

Помните, что вы можете программным способом создавать эти сущности и управлять ими через классы в пространстве имен Microsoft.ServiceBus.Messaging, включая TopicClient и SubscriptionClient, которые используются далее в этой статье.

Создав базовую структуру рабочего процесса обмена сообщениями, мы будем имитировать трафик с помощью двух консольных приложений, созданных в Visual Studio, как показано на рис. 5. Первой консольное приложение, MSDNSender, будет отправлять каталог товаров, а второе — MSDNReceiver — принимать информацию в каждом из магазинов. Анализ кода будет дан в следующих разделах. В шаблоне «публикатор/подписчик», или сокращенно Pub/Sub, программа MSDNSender является публикатором, а MSDNReceiver — подписчиком.

*
Рис. 5. Решение Visual Studio, имитирующее сценарий с каталогом товаров

Отправка каталога товаров из головного офиса

Как видно из рис. 2, головной офис (публикатор) посылает сообщения в Topic. Эта логика представлена кодом в основном файле, Program.cs, который является частью проекта MSDNSender. Program.cs инкапсулирует логику и код для отправки списка товаров как индивидуальных сообщений в Topic. Давайте рассмотрим различные части, начиная с метода Main. Заметьте, что сначала мы создаем клиент для Topic:

// Создаем topicClient, используя удостоверения для Service Bus
TopicClient topicClient =
  TopicClient.CreateFromConnectionString(
  serviceBusConnectionString, topicName);

Как только topicClient создан, публикатор может с его помощью посылать сообщения. Список товаров хранится в XML-файле ProductsCatalog.xml; в нем содержится перечень из десяти сущностей Product, которые будут преобразованы в массив объектов. Затем объекты Product отображаются на классы Catalog и Product, хранящиеся в файле Product.cs:

// Десериализуем XML-файл с объектами Product
// и сохраняем их в массиве объектов
Catalog catalog = null;
string path = "ProductsCatalog.xml";
XmlSerializer serializer = new XmlSerializer(typeof(Catalog));
StreamReader reader = new StreamReader(path);
catalog = (Catalog) serializer.Deserialize(reader);
reader.Close();

Каждый Product в массиве catalog представляет структуру, показанную на рис. 6.

Рис. 6. Представление класса для объектов Product в каталоге

public class Product
  {
    [System.Xml.Serialization.XmlElement("ProductId")]
    public string ProductId { get; set; }
    [System.Xml.Serialization.XmlElement("ProductName")]
    public string ProductName { get; set; }
    [System.Xml.Serialization.XmlElement("Category")]
    public string Category { get; set; }
    [System.Xml.Serialization.XmlElement("CatalogPage")]
    public int CatalogPage { get; set; }
    [System.Xml.Serialization.XmlElement("MSRP")]
    public double MSRP { get; set; }
    [System.Xml.Serialization.XmlElement("Store")]
    public string Store { get; set; }
  }

Внутри цикла по массиву вызов метода CreateMessage извлекает различные свойства объектов Product и присваивает их сообщению, которое потом будет отправлено. Два свойства требуют дополнительного внимания:

if (isLastProductInArray)
  message.Properties.Add("IsLastMessageInSession", "true");
message.SessionId = catalogName;

Сеансы чрезвычайно важны, потому что позволяют получателю определять, все ли сообщения, принадлежащие конкретной логической группе, приняты. В этом случае, устанавливая свойство SessionId сообщения, мы указываем, что получатель не должен использовать информацию каталога, пока не будут получены все сообщения с одинаковым значением catalogName. Кроме того, для последнего товара в массиве мы добавляем новое свойство, IsLastMessageInSession, которое позволит получателям определять, поступило ли последнее сообщение в сеансе, и каталог может быть полностью обработан. На рис. 7 показана выполняемая программа MSDNSender.

*
Рис. 7. Выполнение проекта MSDNSender

Получение каталога товаров с помощью подписок в магазинах

Теперь, когда каталог и объекты-товары отправлены в Topic и скопированы в различные подписки, переключимся на проект MSDNReceiver, где сообщения принимаются и обрабатываются. Заметьте, что в методе Main в Program.cs код создает клиент для Subscription на основе информации, предоставляемой пользователем через команду Console.ReadLine. Ожидается, что пользователи будут вводить номера своих магазинов, отражая тем самым сообщения, которые им нужно принимать. Короче говоря, каждый филиал интересуют только сообщения, применимые к соответствующему магазину:

Console.WriteLine("Enter Store Number");
string storeNumber = Console.ReadLine();
Console.WriteLine("Selecting Subscription for Store...");
// Создаем клиент подписки на Topic
SubscriptionClient subscriptionClient =
  SubscriptionClient.CreateFromConnectionString(
  serviceBusConnectionString, topicName,
  "Store" + storeNumber.Trim() + "Sub",
  ReceiveMode.PeekLock);

Поскольку мы принимаем сообщения от подписок на основе сеансов (как пояснялось в предыдущем разделе), нам нужно запросить следующее сообщение, используя строку кода:

MessageSession sessionReceiver = subscriptionClient.
  AcceptMessageSession(TimeSpan.FromSeconds(5));

В основном это означает, что клиент будет проверять наличие в подписке любых подлежащих обработке сообщений (у которых свойство SessionId не равно null), и, если в течение пяти секунд таких сообщений не попадется, время ожидания запроса истечет, после чего приложение-получатель будет завершено. С другой стороны, если сеанс найден, будет вызван метод ReceivingSessionMessages. Прежде чем перейти к этой части кода, давайте обсудим концепцию состояния сеанса. Это состояние позволяет разработчику хранить информацию, которая может использоваться, пока принимаются сообщения, относящиеся к той же транзакции. В данном случае мы используем состояние сеанса для «запоминания» последней полученной страницы каталога, а также сообщений (товаров), прибывших вне должного порядка.

С учетом сказанного рабочий процесс в коде выглядит так.

  1. Текущее сообщение принимается в методе ReceiveSessionMessages (рис. 8), который полагается в обработке на метод ProcessMessage (рис. 9).
  2. Внутри метода ProcessMessage, если сообщение принято вне последовательности, оно автоматически откладывается, а его идентификатор сохраняется в состоянии сеанса. Иначе оно помечается как «завершенное» (complete) и удаляется из подписки. Кроме того, в сеансе сохраняется следующая ожидаемая последовательность — страница каталога.
  3. После обработки текущего принятого сообщения последующий код в ReceiveSessionMessages проверяет наличие идентификаторов отложенных сообщений в сеансе и пытается обработать их снова с учетом самой последней страницы каталога.
  4. Как только в сеансе приняты все сообщения, получатель закрывается.

Рис. 8. Метод ReceivedSessionMessages

static void ReceiveSessionMessages(MessageSession receiver)
  {
    // Считываем сообщения из подписки, пока она не опустеет
    Console.WriteLine("Reading messages from subscription {0}",
      receiver.Path);
    Console.WriteLine("Receiver Type:" + receiver.GetType().Name);
    Console.WriteLine("Receiver.SessionId = " + receiver.SessionId);
    SequenceState sessionState = GetState(receiver);
    BrokeredMessage receivedMessage;
    while ((receivedMessage = receiver.Receive()) != null)
    {
      string sessionId = receiver.SessionId;
      ProcessMessage(receivedMessage, ref sessionState, receiver);
      while (sessionState.GetNextOutOfSequenceMessage() != -1)
      {
        // Вызываем обратно отложенные сообщения
        Console.WriteLine("Calling back for deferred message: Category {0},
          Message sequence {1}", receiver.SessionId,
            sessionState.GetNextSequenceId());
        receivedMessage = receiver.Receive(
          sessionState.GetNextOutOfSequenceMessage());
        ProcessMessage(receivedMessage, ref sessionState, receiver);
      }
      if (receivedMessage.Properties.ContainsKey(
        "IsLastMessageInSession"))
        break;
    }
    SetState(receiver, null);
    receiver.Close();
  }

Рис. 9. Метод ProcessMessage

static void ProcessMessage(BrokeredMessage message, ref SequenceState sessionState,
  MessageSession session = null)
  {
    if (session != null)
    {
      int messageId = Convert.ToInt32(message.Properties["CatalogPage"]);
      if (sessionState.GetNextSequenceId() == messageId)
      {
        OutputMessageInfo("RECV: ", message, "State: " + "RECEIVED");
        sessionState.SetNextSequenceId(messageId + 1);
        message.Complete();
        SetState(session, sessionState);
      }
      else
      {
        Console.WriteLine("Deferring message: Category {0}, Catalog Page {1}",
          session.SessionId, messageId);
        sessionState.AddOutOfSequenceMessage(messageId,
          message.SequenceNumber);
        message.Defer();
        SetState(session, sessionState);
      }
    }
    Thread.Sleep(receiverDelay);
  }

Учитывайте, что для этого проекта идентификаторы отложенных сообщений хранятся в состоянии сеанса и потенциально могут быть потеряны. В производственной среде мы рекомендуем для этой цели использовать какое-либо постоянное хранилище (один из вариантов — Windows Azure Tables). Заметьте: если сообщение содержит свойство IsLastMessageSessionInSession (установленное в процессе отправки), цикл сеанса завершается. Консольный вывод для проекта MSDNReceiver показан на рис. 10.

*
Рис. 10. Выполнение проекта MSDNReceiver

Подписки Windows Azure Service Bus позволяют создавать специфические правила, чтобы отфильтровывать ненужные сообщения. В данном случае было бы сравнительно легко создать правило, которое разделяет товары по категории или по номеру магазина (что мы проигнорировали в этом проекте). Правила можно создавать программным способом, напрямую через портал Windows Azure или средствами Visual Studio.

Заключение

Windows Azure Service Bus предлагает потрясающе надежную и гибкую реализацию шаблона публикации и подписки. На основе тем и подписок можно решать множество других задач. Поддержка множества отправителей, широковещательно посылающих сообщения множеству получателей, в сочетании с поддержкой логического группирования и сортировки сообщений открывает перед современными разработчиками массу возможностей. Более того, способность использовать постоянный (persistent) сеанс для отслеживания состояния упрощает логическое группирование сообщений и управление их последовательностью. В мире, где распределенные среды стали нормой, понимание того, как использовать шаблоны обмена сообщениями и соответствующие инструменты, крайне важно для нынешних архитекторов ПО, имеющих дело с облаком.

Автор: Бруно Теркали, Рикардо Виллалобос  •  Иcточник: MSDN Magazine  •  Опубликована: 05.04.2013
Нашли ошибку в тексте? Сообщите о ней автору: выделите мышкой и нажмите CTRL + ENTER
Теги:   Windows Azure Service Bus.


Оценить статью:
Вверх
Комментарии посетителей
Комментарии отключены. С вопросами по статьям обращайтесь в форум.