среда, 15 мая 2013 г.

Scoobi: Scala в гостях у Hadoop


Пара вводных слов: Big data, Map-Reduce, Hadoop

Без сомнения, "big data" сегодня одна из "горячих" тем в мире IT.

Эти обстоятельства заставляют и меня посматривать в сторону соответствующих технологий и интересоваться новостями в области "больших данных".

В контексте "больших данных" часто всплывает концепция MapReduce. Это модель вычислений, которая подходит для определённых классов задач, в рамках которой вычисления могут производиться параллельно на большом количестве компьютеров, объединённых в кластер. Среди задач, эффективно распараллелеливаемых с помощью MapReduce, стоит упомянуть построение инвертированного индекса, вычисление PageRank, поиск, сортировку, некоторые графовые алгоритмы и алгоритмы из области машинного обучения и.т.д. Плодотворность этого подхода базируется на том, что "если данных очень много, проще перемещать вычисления, а не данные", поэтому важной частью MapReduce-фреймворка должно быть распределённое хранилище данных. Сущность MapReduce можно выразить в следующих шагах:

  • Шаг 1. Входные данные разбиваются на "куски", которые могут быть обработаны параллельно. Например:
        -входные данные - это множество отдельных файлов. Естественно попытаться рассматривать каждый файл как отдельный "кусок данных" и обрабатывать параллельно
        -входные данные - многострочный файл, каждая строка которого может быть обработана параллельно. Можно разбивать его на отдельные строки или на диапазоны строк.
        -входные данные - диапазоны чисел, для которых нужно провести вычисления
  • Шаг 2. К каждому "куску" применяется процедура "Map", результат которой -- множество пар ключ-значение (K, V).     Вычисления Map для разных данных выполняются параллельно (часто в кластере на разных машинах).
  • Шаг 3. Результаты группируются по ключу.
  • Шаг 4. Сгруппированные по ключу результаты направляются на вход процедуры Reduce, которая "сворачивает" данные для каждого ключа в единый результат. Если операция Reduce ассоциативна, то она может быть эффективно распараллелена.

Одно из широко применяемых средств для организации MapReduce-вычислений - Apache Hadoop. Это свободный фреймворк помогающий создавать и выполнять распределённые программы. Помимо реализации модели MapReduce, Hadoop поредоставляет распределённую файловую систему. Чтобы реализовать "своё" распределённое вычисление вам как правило нужно определить собственные классы Mapper и Reducer, и некоторый "инициализирующий процесс" код.

"Родным" языком Hadoop является Java, поэтому самый прямой путь - реализовать процедуры Map и Reduce на Java. Однако есть возможность использовать и другие языки программирования с помощью Hadoop Streaming.


Подобно тому, как изучение нового языка программирования обычно начинают с написания программы, выводящей на экран "Hello world", знакомство с MapReduce начинают с задачи подсчёта слов в тексте. Пусть в распределённой файловой системе лежит множество файлов - текстовых документов. Наша задача состоит в том, чтобы подсчитать, сколько раз встречается каждое слово во всех документах.

Наша процедура Map принимает на вход документ, разбивает его на отдельные слова и порождает для каждого слова w пару (w,1). Слово w здесь является ключом, 1 - значением (См. шаг 2 в описании Map-Reduce).

На вход нашаей процедуре Reduce подаётся слово и список . Этот список формируется из объединённых результатов всех выполнений процедуры Map. Если вы используете Hadoop, то вам не нужно явно заботиться об этой группировке. Это происходит автоматически, "за кулисами".

Для реализации некоторых типичных задач можно обойтись и без программирования на Java, воспользовавшись Apache Pig, который по сути тоже представляет собой язык программирования, но более высокоуровневый и изначально ориентированный на идеологию MapReduce. Можно скомбинировать PIG и Java, написав "высокоуровневую часть" на Pig, а некоторые детали реализовать в виде UDF (User-Defined Function), написанных на Java.


Scoobi. Делаем вещи проще и быстрее с помощью Scala.


Здесь я хочу описать другой путь, основанный на языке Scala и библиотеке Scoobi. Поскольку Scala совместима с Java, и из Scala можно просто напрямую использовать объекты Java, мы можем практически построчно портировать MapReduce-код c java на scala, однако такой подход на мой взгляд не раскрывает в полной мере преймуществ Scala.

Ориентированный на Scala открытай фреймворк Scoobi разработан в NICTA. Scoobi позволяет очень кратко и выразительно описывать Map-Reduce вычисления. Так, например, подсчёт разных слов в тексте запишется так (взято из примеров Scoobi):


import com.nicta.scoobi.Scoobi._

val lines = fromTextFile("hdfs://in/...")

val counts = lines.flatMap(_.split(" "))
                .map(word => (word, 1))
                .groupByKey
                .combine(_+_)

persist(counts.toTextFile("hdfs://out/...", overwrite=true))

Разберём этот код: важной частью API Scoobi является trait (аналог интерфейса Java) DList[T]. Этот тип представляет "распределённый список", который может жить на нескольких машинах Hadoop-кластера. Функция fromTextFile возвращает распределённый список строк файла типа DList[String]. К этому списку применяется метод flatMap(_.split(" ")) . В результате на выходе получаем распределённый список отдельных слов файла (того же типа:  DList[String]). К получившемуся списку применяется метод  map(word => (word, 1)) . Здесь мы генерируем распределённый список тех самых пар ключ-значение, о которых упоминалось выше, на шаге 2 описания вычислений MapReduce. Ключом у нас являетсяы слово текста, а "значением" -- 1. Метод groupByKey соответствует шагу 3.

Здесь я хочу привести свой собственный пример, использующий Scoobi: вычисление числа π (пи) методом Монте-Карло. Помимо вычисления оценки самого числа π, мы будем вычислять и среднеквадратичное отклонение этой оценки, что позволит нам судить о точности результата.

Для вычисления числа π мы будем генерировать пары (x,y) псевдослучайных чисел, каждое из которых принадлежит отрезку [0,1]. Площадь белой четверти круга на рисунке равна π/4, а площадь ограничивающего квадрата равна 1.



Следовательно, если бросать случайные точки в квадрат, то отношение числа всех точек к числу точек, попавших в четверть-круг будет стремиться к π/4. Это, конечно, весьма тривиальный и непрактичный пример применения метода Монте-Карло, но это в данном случае и хорошо, так как позволит сосредоточится в большей степени на технических деталях, а не на математике.

При сложении очень большого количества чисел с плавающей точкой может присходить потеря точности, связанная с тем, что накопленная сумма может очень сильно отличаться по порядку величины от очередного добавляемого числа. Для борьбы с потерей точности существуют ухищрения, например алгоритм Кэхэна, в котором погрешность суммирования на пердыдущем шаге сохраняется в отдельной переменной и используется как дополнительное слагаемое при следующем суммировании. Я позаимствую эту идею для представления численной оценки чисел. Дополнительно к прочим полям хранится накопленная ошибка суммирования (см. код ниже). Операция combineEstimations комбинирует две оценки и возвращает новую, более точную оценку числовой величины (при этом комбинировании учитываются накопленные ошибки).

Для реализации этого подхода я создал scala-класс ValueEstimator.

//Класс "оценки числового значения".
//Содержит следующие поля:
//numTests - количество измерений
//mean - среднее значение
//variance - дисперсия
//meanError - накопленная ошибка
case class ValueEstimator(numTests:Long, 
                          mean:Double, 
                          variance:Double, 
                          meanError:Double) {
   //Комбинирует "этот" экземпляр ValueEstimator с другим,
   //Возвращает полученное значение оценки.
   def combineEstimations( other:ValueEstimator ):ValueEstimator = ...
}

//Это нужно написать чтобы Scoobi "понимал" наш класс
implicit val valueEstimatorFmt = mkCaseWireFormat(ValueEstimator, 
                                        ValueEstimator.unapply _)


Это позволит мне продемонстрировать ещё одну мощную возможность Scoobi: распределённый список DList может содержать элементы не только каких-то предопределённых типов, но и наших case-классов.

И так, вот наша реализация вычисления методом Mонте-Карло с помощью Scoobi:


package scoobimc

import scala.util.Random
import com.nicta.scoobi.Scoobi._
import com.nicta.scoobi.io.func.FunctionInput

import MCUtils._

object PiMonteCarlo extends ScoobiApp {

  val numSequencies = 10
  val generateSequenceLength = 10000

  def run() = {
    val input = FunctionInput.fromFunction(numSequencies)( x=>x )

    val pi = input.flatMap( generateRands _ )
             .map( checkIfInside )
             .groupByKey
             .combine( 
                (x:ValueEstimator, y:ValueEstimator) => 
                    x combineEstimations y 
              )

    //Сохраняем результат
    persist( toTextFile(pi, "output", overwrite=true))
  }

  //Проверяют, что точка x лежит внутри единичного четверть-круга
  def checkIfInside( x:(Double, Double) ) =
    if ( (x._1 * x._1) + (x._2 * x._2) < 1.0 )
      (1,ValueEstimator(1, 1.0, 0.0,0.0))
    else
      (1,ValueEstimator(1, 0.0, 0.0,0.0))

  //Генерирует случайные точки в единичном квадрате
  def generateRands( seed:Int ) = {
    val rand = new Random(seed)
    (1 to generateSequenceLength) map ( _ => (rand.nextDouble(),rand.nextDouble()) )
  }
}
как и в случае с подсчётом слов, разберём, что же здесь происходит. Вызов FunctionInput.fromFunction(numSequencies)( x=>x ) генерирует DList c последовательностью целых чисел от 0 до numSequencies. Затем к этому списку применяется метод flatMap( generateRands _ ) , который сопоставляет каждому числу списка последовательность псевдослучайных двумерных точек, длинной generateSequenceLength. Функция flatMap выдаёт эти последовательности в сконкатенированном виде, то есть единым DList-ом, который имеет длинну numSequencies*generateSequenceLength. К этому результату применяется метод map( checkIfInside ). На выходе, как обычно, получается DList, хранящий пары ключ-значение, где ключ - еденица, а значения -- обекты типа ValueEstimator. Для точек, попавших в четверть-круг, оценка 1, для непопавших - 0.

Нам остаётся найти среднее значение всех числовых оценок, которое должно стремиться к π/4.

Операция groupByKey в данном случае тривиальна, так как все элементы имеют один и тот же ключ, равный единице. Метод combine скомбинирует все наши ValueEstimator-ы, используя определённый нами метод combineEstimations. На выходе получим оценку среднего и дисперсии искомой величины, π/4. Переменная pi будет представлять собой маленький DList с единственной парой ключ-значение. Ключ - единица, значение - окончательный ValueEstimator числа π/4.

Надеюсь, этим постом мне удалось показать, что Scoobi является очень выразительным средством для организации распределённых MapReduce вычислений.

В одном из следующих постов я планирую написать, как запустить описанное выше вычисление числа π на кластере в Amazon EC2 из нескольких машин.

Ссылки

https://github.com/AlexanderSavochkin/MCwithScoobi - здесь выложены исходные коды к этому посту
Introducing Scoobi and Scalding: Scala DSLs for Hadoop MapReduce
Comparison of Hadoop Frameworks - обзор и сравнение фреймворков для Hadoop. Упоминается Scoobi и ряд альтернатив, ориентированных на Java/Scala/Clojure...