Поиск на сайте: Расширенный поиск


Новые программы oszone.net Читать ленту новостей RSS
EF StartUp Manager - программа для слежением за программами, которые запускаются вместе с загрузкой операционной системы...
EF Mailbox Manager - программа для периодической проверки неограниченного количество аккаунтов электронной почты. Вы мож...
Программа позволяет узнать, чем занято место на жестком диске. Сканирует папки очень быстро и выводит наглядную диаграмм...
HashMyFiles - это маленькая утилита, которая предоставляет возможность рассчета контрольных сумм MD5 и SHA1 для одного и...
Файловый менеджер с очень малыми системными требованиями, но тем не менее с большими возможностями. Программа имеет ориг...
OSzone.net Microsoft Разработка приложений Облако/Azure Сбор и анализ телеметрических данных с помощью Microsoft Azure Services RSS

Сбор и анализ телеметрических данных с помощью Microsoft Azure Services

Текущий рейтинг: 0 (проголосовало 0)
 Посетителей: 1010 | Просмотров: 1163 (сегодня 0)  Шрифт: - +

Каждое устройство на основе датчика генерирует телеметрические данные. Вся ценность — в их интерпретации. В мире потребителей водитель смотрит на приборную панель своей машины, чтобы понять, как его стиль вождения влияет на расход топлива и движение. В промышленности сравнение температуры машины со средней по другим машинам в заводском цехе может помочь оператору выявить риски сбоя и предпринять профилактическое обслуживание.

Эти сценарии требуют сбора телеметрических данных от десятков или сотен тысяч подключенных устройств. Еще важнее, что вам нужно анализировать эти данные, чтобы обеспечить их осмысленную визуализацию и получить полную аналитическую картину. При работе с такими огромными объемами данных инфраструктуры больших данных (Big Data) вроде Hadoop предоставляют прочный фундамент для обработки данных, который можно масштабировать в соответствии с базой установленных устройств.

В этой статье вы узнаете, как создать простую архитектуру сбора телеметрической информации, используя Microsoft Azure Service Bus. Затем вы проанализируете эти данные с возможностью масштабирования, применяя Hadoop-сервис Microsoft Azure, который называется HDInsight.

Архитектура решения

В предыдущих выпусках этой рубрики Бруно Теркали и Рикардо Виллалобос показали, как с помощью Service Bus установить управляющий канал для взаимодействия с подключенным объектом. В этой статье я буду использовать Service Bus как промежуточный уровень для буферизации сообщений телеметрии, посылаемых устройством.

Устройства будут «общаться» с Service Bus напрямую, отправляя сообщения телеметрии в выделенный Topic (рис. 1). Затем одна или более подписок будет извлекать эти сообщения из очереди в рабочую роль и сохранять их как плоские файлы в Blob Storage. Тогда кластер Hadoop сможет использовать эти входные файлы для выполнения анализа и вычислений.

*
Увеличить


Рис. 1. Базовая схема решения по телеметрии больших данных

Raspberry PiRaspberry Pi
AMQPAMQP
Service Bus TopicService Bus Topic
SubscriptionПодписка
Worker RoleРабочая роль
Input FilesВходные файлы
Blob StorageBlob Storage
Pig JobЗадание Pig
HDinsightHDinsight

Эта архитектура имеет то преимущество, что отделяет разные части друг от друга. Service Bus действует как посредник и может буферизовать данные, если рабочие роли не успевают их считывать достаточно быстро. Вы можете отслеживать длину очереди и на основе этого автоматически масштабировать уровень рабочих ролей.

Подписки также полезны для выполнения простой фильтрации входящих данных и их распределения по разным серверным уровням обработки. Например, у вас могли бы быть подписка Urgent, которая посылала бы сообщения в систему оповещения реального времени, и подписка Everything, которая захватывала бы все данные для последующего анализа.

Поскольку рабочие роли просто перемещают данные в хранилище — будь то Hadoop Distributed File System (HDFS) или Blob Storage, оно отделено от той части Hadoop, которая отвечает за обработку. Она может работать независимо от ритма поступления данных. Вы могли бы сделать так, чтобы кластер HDInsight работал постоянно. Это позволило бы постоянно обрабатывать небольшие пакеты и сократить задержки, связанные с вычислениями. А возможно, вы предпочли бы экономить деньги и запускать кластер HDInsight лишь раз в день, чтобы сразу выполнить все накопившиеся вычисления. Поддерживается и смешанный вариант.

Сбор телеметрических данных с помощью Service Bus

Azure Service Bus позволяет посылать сообщения в Topic по одному из двух протоколов: HTTP или AMQP. В случае подключенных устройств с ограниченной пропускной способностью AMQP дает некоторые преимущества. Это эффективный, двоичный, надежный и портируемый протокол. Он также предоставляет библиотеки для многих языков, исполняющих сред и операционных систем. Это дает вам гибкость при подключении своего устройства напрямую к Service Bus для отправки сообщений телеметрии.

Для проверки этого подхода я воспользовался платой Raspberry Pi, способной передавать данные температурных и других датчиков, и библиотекой Apache Qpid Proton AMQP. Proton — сокращенная до абсолютного минимума, портируемая библиотека, которую можно компилировать в самых разнообразных средах, где будут отправляться AMQP-сообщения. Она полностью поддерживает Azure Service Bus. Подробнее о библиотеке Proton AMQP см. по ссылке bit.ly/1icc6Ag.

Для этого примера я скомпилировал библиотеку Proton непосредственно на плате Raspberry Pi. После этого я воспользовался привязками Python для написания простого скрипта, который захватывает показания датчиков с USB-порта и посылает их в Azure Service Bus, как показано на рис. 2.

Рис. 2. Код на Python в Raspberry Pi для захвата показаний датчиков

#!/usr/bin/python
import sys
import commands
import re
import uuid
import serial
from proton import *
# Идентификатор устройства
id = uuid.getnode()
# Адрес Topic
address = "amqps://owner:key@address.servicebus.windows.net/telemetry"
# Открытие последовательного порта
ser = serial.Serial('/dev/ttyACM0', 9600)
# Создание объектов Proton
messenger = Messenger()
while True:
 # Read values from Arduino in the form K1:V1_K2:V2_...
 temp = ser.readline().rstrip('\r\n')
 print temp
 # Создание AMQP-ссобщения
 message = Message()
 # Инициализация свойств
 message.properties = dict()
 message.properties[symbol("did")] = symbol(id)
 # Сопоставление строк со списком, символизация,
 # создание словаря и объединение
 pairs=map(lambda x:x.split(':'), temp.split('_'))
 symbols = map(lambda x:(symbol(x[0]),int(x[1])), pairs)
 message.properties.update(dict(symbols))
 message.address = address
 messenger.put(message)
 messenger.send()

Скрипт на Python напрямую адресуется к Azure Service Bus Topic с именем «telemetry». Он использует строку подключения, которая включает стандартный маркер аутентификации Service Bus и указывает на применение протокола AMQP. В реальной среде вам понадобилось бы задействовать более серьезный механизм аутентификации, чтобы избежать компрометации своих параметров подключения.

Допустим, что значительное количество этих устройств Raspberry начинают собирать данные. Каждое из них посылает идентификатор устройства (Device ID, DID), который впоследствии используется для вычисления средних температур. В данном примере DID генерируется модулем UUID, который извлекает MAC-адрес системы.

Плата Arduino Esplora, подключенная к Raspberry Pi по USB, собирает показания. Esplora — плата из серии «все в одном» со встроенными датчиками. Это упрощает считывание температуры или других параметров окружающей среды и их отправку по последовательной шине. Скрипт на Python на другой стороне USB-кабеля считывает выходные значения. Пример схемы Arduino, которая выводит значения датчиков в последовательный порт, показана на рис. 3.

Рис. 3. Arduino-код для сбора показаний Raspberry Pi

void loop()
{
  int celsius = Esplora.readTemperature(DEGREES_C);
  int loudness = Esplora.readMicrophone();
  int light = Esplora.readLightSensor();
  Serial.print("T:");
  Serial.print(celsius);
  Serial.print("_");
  Serial.print("M:");
  Serial.print(loudness);
  Serial.print("_");
  Serial.print("L:");
  Serial.print(light);
  Serial.println();
 // Задержка на секунду
  delay(1000);
}

Выбор варианта развертывания больших данных

У вас есть несколько вариантов выбора того, какой тип решения Hadoop вы задействуете для анализа данных. Выбор типа развертывания будет диктовать, как и где вам потребуется агрегировать данные для анализа.

Azure предлагает превосходное решение через HDInsight. Оно предоставляет инфраструктуру Hadoop как сервис. Этот вариант Hadoop, основанный на Hortonworks Data Platform (HDP) for Windows, поставляется с коннектором, позволяющим заданиям напрямую обращаться к входным данным из Azure Blob Storage.

Это означает, что тогда вам не понадобится работающий кластер Hadoop для приема входных файлов. Вы можете загружать файлы в контейнер Blob Storage, которым HDInsight воспользуется позже. При анализе пакета файлов вы можете запустить кластер HDInsight за несколько минут, выполнить серию заданий за пару часов и выключить его. Это выльется в гораздо меньшие счета за вычислительные ресурсы.

С другой стороны, если вы предпочтете развернуть стандартный вариант Hadoop, такой как HDP, или Cloudera Distribution в виртуальных машинах Azure (VM), вы будете отвечать за поддержание кластера в актуальном состоянии. Кроме того, вам потребуется должным образом сконфигурировать его для оптимальной работы. Этот подход имеет смысл, если вы намерены использовать адаптированные компоненты Hadoop, не входящие в HDInsight, например HBase, как механизм хранения.

Сохранение телеметрических данных в Blob Storage

Извлечение данных из Azure Service Bus — процесс простой. Используйте рабочую роль как «читатель» или «слушатель» подписки. Затем аккумулируйте сообщения во входные файлы, которые может читать HDInsight.

Первым делом подготовьте одну или несколько подписок в своем Topic в Azure Service Bus. Это даст вам некоторую свободу при разбиении или распределении потока данных в зависимости от требований. Как минимум, создание подписки, захватывающей все, для сохранения всех входящих сообщений будет удачной идеей. Кроме того, в подписках Azure Service Bus можно применять фильтры. Это будет создавать дополнительные потоки специфических сообщений. Пример создания Topic и подписок с использованием C# и библиотеки Azure Service Bus SDK приведен на рис. 4.

Рис. 4. Подписка в Azure Service Bus

var namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);
//Создание Topic
if (!namespaceManager.TopicExists("telemetry"))
{
  namespaceManager.CreateTopic("telemetry");
}
// Создание подписки, захватывающей все
if (!namespaceManager.SubscriptionExists("telemetry", "all"))
{
  namespaceManager.CreateSubscription("telemetry", "all");
}
//Создание подписки для оповещений
if (!namespaceManager.SubscriptionExists("telemetry", "alert"))
{
  SqlFilter alertFilter = new SqlFilter("type = 99");
  namespaceManager.CreateSubscription("telemetry",
  "alert", alertFilter);
}

Создав подписку в Azure Service Bus, можно принимать и сохранять сообщения. В этом примере используется формат CSV, который легко читать и понимать как компьютерам, так и человеку. Чтобы как можно быстрее считать входящее сообщение, рабочая роль создает ряд объектов Task (здесь их десять). Кроме того, пакеты сообщений считываются асинхронными методами, а не по одному за раз. Подписка all и тема telemetry будут принимать эти сообщения (рис. 5).

Рис. 5. Прием сообщений от подписки и их сохранение в Blob Storage

SubscriptionClient client =
  SubscriptionClient.CreateFromConnectionString(connectionString,
  "telemetry", "all", ReceiveMode.ReceiveAndDelete);
List<Task> tasks = new List<Task>();
for (int i = 0; i < NBTASKS; i++)
{
  var id = i; // оповещение о замыкании
  Task t = Task.Run(async () =>
  {
    BlobStorageWriter writer = new BlobStorageWriter(id);
    while (true)
    {
      var messages = await client.ReceiveBatchAsync(BATCH_SIZE);
      foreach (var message in messages)
      {
        try
        {
          await writer.WriteOneLine(TelemetryMessage.Stringify(message));
        }
        catch (Exception ex)
        {
          Trace.TraceError(ex.Message);
        }
      }
    }
  });
  tasks.Add(t);
}
Task.WaitAll(tasks.ToArray());

Метод TelemetryMessage.Stringify просто возвращает строку текста в формате CSV, который содержит телеметрические данные. Он также извлекает содержимое некоторых полезных полей из заголовков Azure Service Bus, таких как Message ID или Enqueued Time.

BlobStorageWriter.WriteOneLine предназначен для записи строки непосредственно в Blob. Поскольку доступно десять параллельных задач, влияние будет оказано на то же количество Blob одновременно. WriteOneLine также время от времени циклически сдвигает файлы для отправки в HDInsight. Я использую два параметра, чтобы решить, когда следует переключаться на новый файл: количество строк, записанных в файл, и время с момента создания Blob (например, создаем новый файл через каждый час или по достижении миллиона строка в нем). Этот метод использует асинхронные вызовы, чтобы избежать блокировки при записи сообщений в поток данных Blob (рис. 6).

Рис. 6. Запись данных из сообщений в Azure Blob

public async Task WriteOneLine(string line)
{
  var bytes = Encoding.UTF8.GetBytes(string.Format("{0}\n", line));
  await destinationStream.WriteAsync(bytes, 0, bytes.Length);
  TimeSpan ts = DateTime.Now - startBlobTime;
  if (++linesWritten > MAX_LINES || ts.TotalSeconds > MAX_SECONDS)
  {
    Trace.TraceInformation(
      "Wrote " + linesWritten + " lines to " + currentBlob.Name);
    GetNextBlob();
    linesWritten = 0;
  }
}

Данные, извлеченные из сообщений телеметрии, содержатся в конечных файлах в таком виде:

145268284e8e498282e20b01170634df,test,24,980,21,2014-03-14 13:43:32
dbb52a3cf690467d8401518fc5e266fd,test,24,980,21,2014-03-14 13:43:32
e9b5f508ef8c4d1e8d246162c02e7732,test,24,980,21,2014-03-14 13:43:32

Они включают Message ID, Device ID, три показания датчиков и дату постановки сообщения в очередь. Этот формат легко разобрать на следующем этапе.

Анализ данных с применением HDInsight

Самое впечатляющее преимущество HDInsight в том, что вы можете запустить полный кластер Hadoop, выполнить задание и удалить кластер непосредственно из командной строки. Вам не потребуется входить в VM или выполнять какую-то специфическую настройку. Вы можете подготавливать HDInsight и управлять им с помощью Windows PowerShell в Windows или кросс-платформенных утилит командной строки для Mac или Linux.

Интегрированные командлеты Azure PowerShell можно скачать по ссылке bit.ly/1tGirZk. Эти командлеты включают все, что нужно для управления вашей инфраструктурой Azure, в том числе кластерами HDInsight. Импортировав свои параметры публикации и выбрав подписку по умолчанию, вы должны указать лишь одну командную строку для создания нового кластера HDInsight:

New-AzureHDInsightCluster -Name "hditelemetry" -Location "North Europe" -DefaultStorageAccountName "telemetry.blob.core.windows.net" -DefaultStorageAccountKey "storage-account-key" -DefaultStorageContainerName "data" -ClusterSizeInNodes 4

Эта команда инструктирует кластер HDInsight использовать существующие Storage Account и Container как корень файловой системы. Именно так он будет обращаться ко всем телеметрическим данным, генерируемым процессом сбора. Вы также можете указать, сколько рабочих узлов (worker nodes) должен использовать кластер в зависимости от объема данных и какая степень параллелизма вам нужна.

После того как кластер подготовлен и запущен, вы можете разрешать доступ у удаленного рабочего стола. Тогда в головной узел смогут входить другие пользователи и запускать интерактивный сеанс со стандартными командами и утилитами Hadoop. Однако намного быстрее использовать удаленные команды, задействовав преимущества Windows PowerShell для запуска заданий Map Reduce, Hive или Pig.

Я использовал задание Pig для вычисления среднего значения температуры. Pig изначально разрабатывали в Yahoo. Он позволяет тем, кто работает с Hadoop, сосредоточиться в большей мере на анализе крупных наборов данных и меньше тратить времени на написание программ для сопоставления и преобразования данных. Скрипт на Pig, как правило, включает три этапа.

  1. Загрузка данных, которыми вы хотите манипулировать.
  2. Запуск серии преобразований данных (транслируемых в набор задач сопоставления и преобразования).
  3. Вывод результатов на экран или сохранение результатов в каком-либо файле.

Следующий пример показывает, как это обычно делается, выполняя скрип интерактивно на этапе Exploratory Data Analysis (EDA) с помощью интерпретатора Pig:

data = load '/telemetry*.csv' using PigStorage(',') as (id:chararray, did:chararray, temp:int, light:int, mic:int, timestamp:datetime);
data1 = group data by did;
data2 = foreach data1 generate group as did, COUNT(data), AVG(data.temp);
dump data2;

Если вы наберете этот скрипт непосредственно в интерпретаторе Pig, он покажет таблицу, содержащую ряд точек данных (температур) и среднее измеренное значение для каждого DID. Как видите, синтаксис Pig весьма явный. Разные этапы манипуляций над данными четко разделяются:

  • первое выражение load используется для загрузки данных из CSV-файлов, описывающих имена и типы входных полей;
  • затем данные группируются по DID или для каждого устройства;
  • конечный набор данных генерируется с помощью агрегирующих функций наподобие COUNT и AVG.

Как только скрипт отлажен, эту задачу можно автоматизировать с помощью Windows PowerShell. Используйте командлет New-AzureHDInsightPigJobDefinition для инициализации задания Pig созданным скриптом. Затем, используя Start-AzureHDInsightJob и Wait-AzureHDInsightJob, запустите задание и ждите его завершения (рис. 7). После этого вы можете получить результаты через Get-AzureHDInsightJobOutput.

Рис. 7. Вставка, анализ и запуск заданий в HDInsight

$PigScript = "data = load '/telemetry*.csv' using PigStorage(',') as (id:chararray, did:chararray, temp:int, light:int, mic:int, timestamp:datetime);" +
"data1 = group data by did;" +
"data2 = foreach data1 generate group as did, COUNT(data), AVG(data.temp);" +
"dump data2;"
# Определение задания Pig
$pigJobDefinition = New-AzureHDInsightPigJobDefinition -Query $PigScript
# Запуск задания
$pigJob = Start-AzureHDInsightJob -Cluster "hditelemetry" -JobDefinition $pigJobDefinition
# Ожидание завершения задания
Wait-AzureHDInsightJob -Job $pigJob -WaitTimeoutInSeconds 3600
# Получение результатов задания
Get-AzureHDInsightJobOutput -Cluster "hditelemetry" -JobId $pigJob.JobId –StandardOutput

Результаты отображаются в консоли командной строки примерно так:

C:\> Get-AzureHDInsightJobOutput -Cluster "hditelemetry" -JobId $pigJob.JobId
(test,29091,24.0)
(49417795060,3942,30.08371385083714)

В данном случае было довольно много тестовых измерений и получено примерно 4000 показаний от Raspberry Pi. Среднее значение этих показаний равно округленно 30 градусам.

Заключение

Azure Service Bus — надежный и быстрый способ сбора данных от любых устройств. Чтобы хранить и анализировать эти данные, вам нужен отказоустойчивый механизм хранения и анализа. Azure HDInsight абстрагирует процесс создания и поддержания кластера Hadoop для работы с таким хранилищем. Это высокомасштабируемое решение, которое можно конфигурировать и автоматизировать, используя такие средства, как Windows PowerShell или интерфейс командной строки Mac/Linux в Azure.

Автор: Томас Конте  •  Иcточник: msdn.microsoft.com  •  Опубликована: 16.12.2014
Нашли ошибку в тексте? Сообщите о ней автору: выделите мышкой и нажмите CTRL + ENTER
Теги:   Microsoft Azure Services.


Оценить статью:
Вверх
Комментарии посетителей RSS

Чтобы оставить комментарий, зарегистрируйтесь или войдите с учетной записью социальной сети.