Об Apache Spark
2017-10-30
А вот вам заметки полного нуба об Apache Spark.
Именно Apache Spark™. А то есть ещё какой-то веб-фреймворк Spark. Не говоря уже о Twilight Sparkle. Плохое название. Гуглите осторожнее.
Apache Spark — это такая штука для распределённых вычислений. Эту всякую бигдату обрабатывать. Причём это не платформа для распределённых вычислений, типа Hadoop, а скорее фреймворк для распределённых вычислений. На нём можно писать эти распределённые вычисления. На Scala, Java, Python или R. Примерно однотипно на всех языках. А выполнять эти распределённые вычисления уже на кластере в Hadoop (точнее YARN) или в Apache Mesos. (Господи, всюду Апач.)
Чертовски привлекательно, что можно обойтись и без кластера, а запустить Спарк в локальной JVM. При этом, если у вас достаточно мощная машинка, и HDFS вы всё равно не используете, а значит, локальность данных вас особо не волнует, даже в таком локальном запуске можно сожрать изрядно оперативы и ядер ЦПУ, и перемолоть изрядную кучку данных.
Как-то так:
$ $SPARK_HOME/bin/spark-submit \
--class "my.runners.App" \
--master local[4] \
target/scala-2.11/my-assembly-0.1.jar \
--data-mongo-uri mongodb://... \
--target-mongo-uri mongodb://...
Вот так локально мы и перемалываем несколько десятков гигабайт недельных данных за пару десятков минут, чтобы найти некоторые аномалии...
R я не знаю. Python в Spark не пробовал. А Spark изначально запилен на Scala. Пришлось немного освоить Scala.
Согласно последней политике продвижения Scala, все Scala продукты должны иметь хороший Java API. И у Спарка он есть. Так и написано в жалком подобии явадоков: вот этот метод для Скалы, а вот этот для Явы.
На Яве получается заметно многословнее, чем на Скале. Нет имплиситов, поэтому тот же Encoder везде надо явно втыкать. Encoder — это такая штука, которая описывает "схему" обрабатываемых данных. Данные же нужно гонять между узлами кластера, и для этого надо понимать, какой они "формы". Примитивные типы, case классы Scala, некоторые коллекции поддерживаются из коробки. Для Java ещё java beans тоже энкодятся. Как правило, этого достаточно.
Но я ведь попробовал пописать на Kotlin.
Осторожно,
spark-kotlin
— это для веб-фреймворка,
а не для Apache Spark.
И на Котлине мне не понравилось.
Уж лучше слегка научиться Скале.
Имплиситов в Котлине нет. Энкодеры надо прописывать явно. Data классы Котлина поддерживаются при этом не в Kotlin-way.
Вот такой data class работать не будет:
data class KotlinDataClass(
val a: String
)
Потому что это не java bean. Тут конструктор не умолчательный. И сеттеров нет.
Нужно сделать java bean:
data class JavaBean(
var a: String? = null
)
Вот теперь будет работать в Спарке. Хотя выглядит чудовищно с точки зрения Котлина.
Пользоваться приходится API для Java,
JavaSparkContext
и JavaRDD
.
Потому что скаловых коллекций Котлин не умеет
(слава богу).
Бывает,
что Котлин видит пару методов с разными сигнатурами.
Один принимает скаловую функцию.
Другой принимает какой-нибудь org.apache.spark.api.java.function.MapFunction
.
Второй метод явно добавили для Java API.
Но Котлин эти сигнатуры не различает.
Приходится явно функциональный литерал Котлина
приводить к типу Function.
Уродливо.
import org.apache.spark.api.java.function.MapFunction
//...
return df.map(
MapFunction { o: MyObject ->
//...
},
MySchema.myEncoder
)
//...
Через пару дней мучений, я всё же решил: Скала, так Скала.
Всё в Спарке крутится вокруг трёх штук,
которые являются развитием одной и той же идеи.
Это RDD (Resilient Distributed Datasets),
DataFrame и Dataset.
Их можно прочитать,
из файлов (в том числе и HDFS) или из БД.
Их можно записать,
в файлы (в том числе и HDFS) или в БД.
Над ними можно делать операции.
Примерно те же операции,
что и над Stream, что в Java 8:
map()
, flatMap()
, filter()
, reduce()
, groupBy()
, вот это всё.
Только вот весь бигдатный набор данных
будет побит на партиции,
и каждая партиция будет обработана на воркере в кластере,
а результат будет собран в кучку на том узле,
который инициировал вычисления
и называется Driver.
Разница между RDD, DataFrame и Dataset в типизации. RDD исторически был первым API Спарка. И он умеет работать только с кортежами (которые tuple). Т.е. все вот эти мапы и прочее вы будете делать с кортежами.
Dataset (и DataFrame) — это более новый API Спарка. В DataFrame вы имеете дело со схемой данных. Известны имена и типы столбцов. Соответственно, по именам можно столбцы извлекать, удалять из набора данных, и всё такое. Можно об этом думать как об очень длинной SQL таблице. Даже некоторые операции можно выражать на подмножестве SQL.
В Dataset вы имеете дело с объектами.
Типа ORM.
Очень удобно для таких объектов использовать case классы Скалы.
На самом деле,
технически DataFrame не существует, это Dataset[Row]
.
Но в работе гораздо удобнее более типизированный Dataset[MyCaseClass]
.
Вы задаёте класс при загрузке данных,
и сразу получаете датасет нужных объектов.
По ходу манипуляций у вас получатся другие объекты,
и их снова можно сохранить в какую-нибудь коллекцию
какой-нибудь БД.
Схема (т.е. набор и типы колонок) вполне успешно самостоятельно выводится из набора полей класса. Но есть возможность указать её самостоятельно. Это полезно, если у вас в какой-нибудь Mongo коллекции завалялись документы разных форм. Тогда указание схемы позволит выбрать только документы нужной формы, пригодные для обработки.
Читать/загружать наши датасеты можно из файлов, из любой реляционной БД через JDBC, из MongoDB. Если читаем из HDFS, вовсю работает локальность данных. Т.е. воркерам достанется на обработку та партиция, которая расположена на том же узле. Если читаем из реляционной БД, на локальность данных всем насрать, как я понимаю. Если читаем из MongoDB, может учитываться расположение шардов, если они у вас есть. Разбиение на партиции — это обязанность коннектора к БД. Для Монги есть свой коннектор, который понимает шарды. Для JDBC есть один общий коннектор, который может работать с любыми JDBC драйверами.
Это бигдата.
Читать в датасет
вам придётся всю таблицу или коллекцию.
Целиком.
Впрочем,
в MongoDB есть возможность
задать шаг aggregate,
который будет вставлен в самом начале.
Очень имеет смысл добавить туда $match
,
который выберет только то,
что нужно.
Коннектор к MongoDB
читает очень странно.
На мой взгляд.
Сначала делается большой aggregate()
,
в котором выполняется $sample
,
выбираются только _id
порядка 10% всех документов.
А потом уже выбираются все документы,
с разбиением по диапазонам (от и до) _id
.
Вообще-то _id
не образуют континуум,
совпадающий с запрошенными данными,
хоть они и упорядочены.
В результате,
как минимум,
некоторые документы бывают пропущены,
ибо их _id
не попали в первоначальный $sample
.
Может в Big Data так принято,
но мне такой алгоритм кажется странным.
В бигдате много странного и непривычного. Спарк не умеет обновлять данные. Он легко может создать новую таблицу или коллекцию и выгрузить туда весь датасет, который получился в результате вычислений. Но дальше уже ваша забота, куда эту таблицу засунуть. Как правило, её нужно как-то смержить с имеющимися данными. Например, как-то так:
INSERT IGNORE INTO existing_data (...)
SELECT ... FROM spark_result;
В случае с MongoDB можно попробовать сделать апдейт и в Spark.
Опять таки,
всё зависит от коннектора.
Коннектор MongoDB умеет обращать внимание на _id
документа.
И если в датасете есть _id
,
будет сделан upsert, а не insert.
Таким образом,
можно загрузить датасет оригинальной коллекции
и сджойнить с результатами обработки,
обновить объекты.
А потом обновлённый результат выгрузить в оригинальную коллекцию,
обновить документы.
Это всё работает,
но только если вы вызовете правильный метод
MongoSpark.save[D](dataset: Dataset[D], writeConfig: WriteConfig)
.
Другие методы сохранения в MongoDB
плевать хотели на _id
.
Ну и лучше так не делать.
Во-первых,
вам придётся грузить в Спарк на одну коллекцию больше.
Во-вторых,
joinWith()
— не самая лёгкая операция для Спарка.
В-третьих,
вы полностью перепишете оригинальную коллекцию,
и все изменения,
внесённые в неё третьей стороной
с момента загрузки в Спарк,
до момента выгрузки из Спарка,
будут потеряны.
Проще выгрузить из Спарка результат вычислений в отдельную коллекцию,
а потом смержить уже средствами Монги.
Это небыстро, но надёжно.
db.spark_result.find().forEach(
function(doc) { db.existing_data.update(
{ ...update key... },
{ $set: { ...update operation... } },
{ upsert: true, multi: true })
})
Apache Spark — типичный инструмент Big Data. Всё что вы можете: загрузить громадный объем данных, обработать его по кусочками, в параллель на узлах кластера, и выгрузить результат снова одним большим куском данных. Никакой модификации имеющихся данных, только создание новых. И при этом ещё могут быть погрешности в чтении :)
А иногда надо разветвиться. Подсчитать на исходных данных какую-нибудь тяжёлую статистику. А потом, по этой статистике, родить несколько разных результатов. Теоретически, для каждого результата нужен свой конвейер. Но тогда придётся для каждого конвейера заново вычитывать исходные данные и считать статистику. Скучно и неоптимально.
Поэтому в Спарке есть кэш.
Метод persist()
.
Любой шаг конвейера можно сохранить в кэш.
И начать новый конвейер с этого кэша.
Будет быстро.
Главное,
чтобы памяти хватило.
А не хватит памяти,
можно использовать диск.
На воркерах.
Будет медленее,
но это всё равно быстрее,
чем вычитывать всё заново из внешней БД.
Тем более,
что Спарк хранит в кэше объекты,
сериализованные с помощью Kryo.
Главное,
всё правильно подтюнить.
Похоже, в тех случаях, когда всё равно нужно перелопатить добрые сотни миллионов записей, чтобы получить какие-то результаты, Apache Spark является отличной альтернативой попытке перелопатить это средствами самой БД. Выйдет быстрее (потому что обработка будет делаться на других узлах, а не в БД) и значительно гибче (потому что это всё же код на Scala, где можно делать почти всё, что угодно). Вот только хочется нормального API и для Kotlin :)