суббота, 30 мая 2015 г.

Перемалываем 2.2 Тб данных в Amazon EC2 с помощью Apache Spark

В качестве личного проекта  с перерывами попиливаю проект "химического поиска". Я задался целью создать "умный" поиск химических веществ. Под словом "умный" я подразумеваю возможность отвечать на вопросы на простом человеческом языке. Поэтому в проекте есть место технологиям из области компьютерной лингвистики или NLP (natural language processing). А где NLP там и работа с большими объёмами текстов, алгоритмы машинного обучения и много других интересных вещей.

Для того, чтобы попробовать одну из моих идей мне понадобился корпус осмысленных N-грамм (из литературы) содержащих внутри себя названия химических веществ.

N-грамма это просто кусочек из N последовательноых слов, взятых из текста. Чаще всего говорят об униграммах, биграммах и триграммах.  Например, предложение Ethanol may be administered as an antidote to methanol poisoning содержит следующие триграммы: Ethanol may be; may be administered; be administered as; administered as an; as an antidote; an antidote to; antidote to methanol.

В свободном доступе есть 2.2 террабайтный корпус n-грамм (n=1..5) сгенерированный Google по текстам книг за несколько столетий. Каждая запись об n-грамме содержит так же год и количество повторений в данный год, примерно так:

Google предоставляет веб-интерфейс, с помощью которого можно, например, сравнить популярность Ленина и Сталина в разные годы. Или, скажем, сравнить популярность слов "эксплуатация" и "эксплоатация". Если же нужен доступ ко всему списку n-грамм, то весь датасет можно скачать по кусочкам здесь (вообще-то вроде как существует две версии датасета-2009 и 2012 годов). Но есть способ гораздо лучше: Amazon предоставляет удобный доступ к этому корпусу (вроде как первой версии, 2009 года), выложив его в своё хранилище S3 (наряду с несколькими другими открытыми датасетами). Это делает привлекательным использование сервиса EC2, поскольку при определённых условиях (аренда машин в той же локации, где хостится датасет) можно выиграть в скорости избежать расходов на передачу данных.

Недавно я прочитал туториал к Apache Spark. Это фреймворк для кластерных вычислений очень похожий на Scoobi, о котором я писал раньше. Относительно недавно он стал "top level" проектом в Apache Software Foundation, что позитивно сказывается на развитии проекта: популярность в сообществе растёт, новые версии выходят регулярно и часто. Разумеется, мне тоже было интресно познакомиться со Spark поближе.

Итак, нашлась подходящая задача: будем молотить 2.2 террабайтный корпус n-грамм кластером из машин, арендованных в Amazon EC2. В качестве программной платформы будем использовать Apache Spark, в котором будем запускать довольно нехитрый "скрипт" на Scala. В дальнешем я буду взаимозаменяемо использовать слова "скрипт", "Spark-скрипт", "Spark-программа" и тому подобные. Всё это одно и то же: нехитрая программка написанная на языке Scala (и выложенная на GitHub). Логика очень простая: программа просматривает список n-грамм и проверяет, входит ли в n-грамму название одного из примерно 12000 химических веществ (список взят из википедии). Если да - то сохраняем эту n-грамму в выходной файл. Попутно считаем ежегодные количества повторения n-граммы в книгах за все года и выводим в результат суммарное количество повторений. Написание программ под Spark очень напоминает написание программ под Scoobi и написание цепочек преобразований над коллекциями Scala: в Scoobi и Spark необходимо описать трансформации с помощью привычных  функций (map, filter, flatMap, и.т.д.) Только эти трансформации применяются не к коллекциям Scala API, а к распределённым "спискам" (RDD в Spark, Distributed List в Scoobi), которые могут обрабатываться параллельно в кластере. Документация Apache Spark содержит отличную вводную, так что не вижу особого смысла разбирать свой скрипт более подробно (разумеется, готов ответить на любые вопросы в коментариях). 

Сначала надо скачать Apache Spark (на момент написания заметки актуальная версия была 1.3.1). Затем его надо собрать: для Ububntu: ставим переменную JAVA_HOME (например, в моём случае так: export JAVA_HOME=/usr/lib/jvm/java-8-oracle в других системах путь может отличаться), устанавливаем maven из репозитория Ubuntu (дистрибутив Spark 1.3.1 содержит maven внутри себя, так что устанавливать предварительно maven не обязательно), запускаем скрипт make_distribution.sh. Скрипт предупредил меня, что Java 1.6 не установлена (ещё бы, у меня стоит Java 1.8). Вводим 'y' (всё равно прдолжить). Ждём окончания сборки Spark (на моём стареньком ноутбуке заняло минут 15-20).

Теперь займёмся подготовкой Scala-программы, которая делает необходимую обработку данных. Понадобится git и sbt. Клонируем исходники с github и собираем Spark-скрипт (в результате должен получиться *.jar файл).

git clone https://github.com/AlexanderSavochkin/BookNgrams-filter
 
cd BookNgrams-filter
 
sbt clean compile assembly

В директории проекта должна появиться поддиректория target в которой можно найти готовый jar-файл, примерно c таким путём и названием: ../BookNgrams-filter/target/scala-2.10/ngramsfilter-assembly-1.0.jar .

Spark-программы можно запускать и локально, что и рекомендуется сделать предварительно, чтобы сперва убедиться, что ваша программа делает то что надо на маленьком объёме данных (скормив мальенький файлик testngramsfile.txt).

%/ПУТЬ К SPARK/bin/spark-submit --class net.chwise.dataaquisition.textmining.NGramFilter --master local[2] ./ngramsfilter-assembly-1.0.jar -d -p -g testngramsfile.txt -o result -t list-compoundnames.normalized.txt

Необязательный флаг -d заставляет скрипт сохранять промежуточные вычисления, что сильно помогает в отладке. Файл с названиями химических веществ list-compoundnames.normalized.txt можно взять в поддиректории data github-проекта.

Теперь перейдём к запуску нашей Spark-программы в настоящем клстере, который мы арендуем в сервисе Amazon AWS. Вместе со Saprk'ом поставляется скрипт управления Spark-кластером в Amazon EC2. Путь к нему: %SPARK_HOME/ec2/spark-ec2 . Этот скрипт позволяет запускать/останавливать кластер, узнавать информацию о кластере, одним словом управлять им. В общем это, похоже,  заточенный под Spark аналог Apache Whirr, (о котором я писал в одном из предыдущих постов).

Итак, в самом начале генрируем в панели управления Amazon AWS пару ключей (об этом я тоже писал). Устанавливаем переменные окружения  AWS_ACCESS_KEY_ID и AWS_SECRET_ACCESS_KEY.  Значения для этих ключей можно узнать здесь: https://portal.aws.amazon.com/gp/aws/securityCredentials.

%export AWS_ACCESS_KEY_ID=... 
%export AWS_SECRET_ACCESS_KEY=...

Пришло время запускать кластер. Документация Amazon говорит, что датасет Google Books Ngrams хостится в регионе us-east-1, и, следовательно надо запускать кластер в этом же регионе чтобы избежать дополнительных трат на передачу данных. Кроме того этот датасет хранится в запакованном виде, поэтому нужно предпринять некоторые дополнительные действия (я потратил очень много сил, пытаясь заставить Spark читать сжатые данные в формате lzo, пока не нашёл этой ссылки, огромное спасибо автору).

Сначала стоит запустить "кластер" из одной машины (жирным шрифтом показано, как задать количество машин в кластере и регион) и проверить работоспособность программы на относительно маленьком подмножествет данных, например, только на униграммах. Запускаем кластер так (про то, как получить файл с ключом, который у меня называется spark-cluster.pem, я тоже писал в посте про запуск Scoobi в Amazon EC2):

%SPARK_HOME/ec2/spark-ec2 -k spark-cluster -i /home/asavochkin/Work/Projects/ChWiSe/spark-cluster.pem -s 1 --region=us-east-1 --user-data=/home/asavochkin/Work/Projects/ChWiSe/lzo-scripts/lzo.sh launch test-spark-cluster

Setting up security groups...
Creating security group test-spark-cluster-master
Creating security group test-spark-cluster-slaves
Searching for existing cluster test-spark-cluster...
Spark AMI: ami-5bb18832
Launching instances...
Launched 1 slaves in us-east-1d, regid = r-5b9c33ba
Launched master in us-east-1d, regid = r-769c3397
Waiting for all instances in cluster to enter 'ssh-ready' state..............

....пропустим большое количество вывода.
Теперь можно залогиниться на master-машину:

%SPARK_HOME/ec2/spark-ec2 -k spark-cluster -i /home/asavochkin/Work/Projects/ChWiSe/spark-cluster.pem login test-spark-cluster

Если всё прошло гладко, то после этокй команды мы окажемся в консоли мастер-ноды кластера. С этой машины можно запускать spark-программы, загружать данные в/из HDFS и.т.д. В общем, основное рабочее окружение при работе с кластером - здесь, в консоли мастер-машины. Установим здесь всё что требуется (нам нужны только git и sbt):

>sudo yum install git

>yum install -y http://dl.bintray.com/sbt/rpm/sbt-0.13.5.rpm
 

Клонируем репозиторий...

>git clone https://github.com/AlexanderSavochkin/BookNgrams-filter

Собираем скрипт...

cd BookNgrams-filter
 
sbt clean compile assembly


создаём в кластерной файловой системе HDFS поддиректорию ngrams

~/ephemeral-hdfs/bin/hadoop fs -mkdir ngrams 
 
Кладём в кластерную поддиректорию ngrams список названий веществ

~/ephemeral-hdfs/bin/hadoop fs -put list-compoundnames.normalized.txt ngrams

Наконец, запускаем то, что собрали.

#Это всё одна строчка!
~/spark/bin/spark-submit --jars local:/root/hadoop-lzo/target/hadoop-lzo-0.4.20-SNAPSHOT.jar  --class net.chwise.dataaquisition.textmining.NGramFilter ngramsfilter-assembly-1.0.jar -t ngrams/list-compoundnames.normalized.txt -g s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram/data -o ngrams/chemical-ngrams.tsv

Проверим, что всё стработало как задуманно. Результат должен появиться в поддиректории ngrams/chemical-ngrams.tsv.

~/ephemeral-hdfs/bin/hadoop fs -ls ngrams/chemical-ngrams.tsv
 
Скачиваем результат из кластера смотрим, что получилось

~/ephemeral-hdfs/bin/hadoop fs -get ngrams/chemical-ngrams.tsv

Когда кластер больше не нужен, можно "выключить" его такой командой:

$SPARK_HOME/ec2/spark-ec2 -k spark-cluster -i /home/asavochkin/Work/Projects/ChWiSe/spark-cluster.pem destroy test-spark-cluster

С этого момента мы ни за что не платим (но и серверов в нашем распоряжении больше нет).

Теперь очередь настоящего кластера из... пусть будет из 6 машин.

%SPARK_HOME/ec2/spark-ec2 -k spark-cluster -i /home/asavochkin/Work/Projects/ChWiSe/spark-cluster.pem -s 6 --region=us-east-1 --user-data=/home/asavochkin/Work/Projects/ChWiSe/lzo-scripts/lzo.sh launch test-spark-cluster

Снова устанавливаем весь необходимый софт как описано выше (git, sbt), забираем исходники проекта с github. собираем, копируем список названий веществ в кластер и запускаем нашу обработку данных.

#Это всё одна строчка!
~/spark/bin/spark-submit --jars local:/root/hadoop-lzo/target/hadoop-lzo-0.4.20-SNAPSHOT.jar  --class net.chwise.dataaquisition.textmining.NGramFilter ngramsfilter-assembly-1.0.jar -t ngrams/list-compoundnames.normalized.txt -g s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram/data,s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/2gram/data,s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/3gram/data,s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/4gram/data,s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/5gram/data,s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-gb-all/1gram/data,s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-gb-all/2gram/data,s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-gb-all/3gram/data,s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-gb-all/4gram/data,s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-gb-all/5gram/data -o ngrams/chemical-ngrams.tsv

Обработка всех англоязычнх n-gramm заняла почти сутки. Размер результата примерно 64 Mb. Результат выглядиит так:
...
copper and sulphuric acid       464
histamine . they are    89
the dynamics of oxygen  64
a higher phosphate      65
than propane .  80

...

Не знаю, поможет ли он мне улучшить поиск веществ, но опыт со Spark наверняка будет полезен.

Стоит упомянуть об удобной возможности приостанавливать spark-кластер:
%SPARK_HOME/ec2/spark-ec2 -k spark-cluster -i /home/asavochkin/Work/Projects/ChWiSe/spark-cluster.pem stop test-spark-cluster

и потом, при необходимости перезапускать его

%SPARK_HOME/ec2/spark-ec2 -k spark-cluster -i /home/asavochkin/Work/Projects/ChWiSe/spark-cluster.pem start test-spark-cluster

В этом "приостановленном" состоянии инстанс стоит нам гораздо меньше денег, приходится платить только лишь за хранение образа (что на порядок дешевле). Хоть это и небесплатно, зато после перезапуска не нужно с самого начала устанавливать git, sbt, собирать проект и.т.д. Однако данные в ephemeral hdfs не переживут подобной приостановки.