
Привет блог. Сегодня я решил рассмотреть конкретный пример — parallel map Kotlin c использованием корутин. А именно мы посмотрим как можно реализовать параллельные запросы с помощью сопрограмм.
Для чего нужны параллельные запросы?
Сначала нужно узнать — для чего может потребоваться параллельно выполнять запросы к серверу? Примеров масса: скачивание двух картинок и их совмещение, получение зависимых данных из Сети и запись в БД. Наконец — многопоточный парсинг, когда за 1 раз нужно получить много данных.
С этим разобрались, теперь давайте посмотрим как вообще можно реализовать параллельное выполнение операций? Первое что приходит на ум — Thread. Но писать класс для этого мы конечно не будем, а возьмем первого «кандидата» в наш список — parallelStream() из Java. Аналога которого в Котлин кстати до сих пор нет.
Для тестирования возьмем всем знакомый jsonplaceholder.typicode.com и будем от него по API получать список комментариев. Не забудем измерить время выполнения с помощью measureTimeMillis { }.
Допустим метод getHttpAsync получает асинхронно данные из Сети. Сперва проверим время по дефолту
val time = measureTimeMillis {
val result = (1..10).toList()
.map { getHttpAsync("https://jsonplaceholder.typicode.com/comments/$it") }
result.forEach { println(it) }
}
println("Async: $time ms") // Async: 1493 ms
10 запусков подряд. Среднее время 1493 ms. Не густо. Давайте применим магию parallelStream()
val timeStrParallel = measureTimeMillis {
val result = (1..10).toList()
.parallelStream()
.map { getHttpAsync("https://jsonplaceholder.typicode.com/comments/$it") }
result.forEach { println(it) }
}
println("ParallelStream: $timeStrParallel ms") // ParallelStream: 579 ms
579 ms в среднем — вот это уже достойный результат! И можно было бы остановиться на этом, но мы конечно не будем. Продолжим изыскания дальше.
Parallel Map — пример из сети
Покопавшись в Интернете, я нашел пример, который тиражируется на нескольких сайтах. Автор пытаясь добиться параллельного выполнения пишет свою реализацию мапы для коллекций — pmap
suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
map { async { f(it) } }.awaitAll()
}
Здесь в принципе все просто. Автор расширяет объект Iterable с помощью функции pmap, которая с помощью async.awaitAll() дожидается выполнения результата каждой f(it), которая объявлена как suspend. Возвращается у нас List<B>. Давайте затестим!
val pmapTime = measureTimeMillis {
runBlocking {
val mapResult = (1..10).toList()
.pmap { getHttpAsync("https://jsonplaceholder.typicode.com/comments/$it") }
println(mapResult)
}
}
println("PmapTime: $pmapTime ms") // PmapTime: 1558 ms
И наш результат… Барабанная дробь! 1558 ms. Как так-то? Ведь запросы выполняются и правда параллельно, но дюже медленно. Но здесь не хватает одной важной детали — контекста (context). Перепишем немного метод
Обновленная версия…
suspend fun <T, V> Iterable<T>.parallelMap(func: suspend (T) -> V): Iterable<V> = coroutineScope {
map { element ->
async(Dispatchers.IO) { func(element) }
}.awaitAll()
}
val myPmapTime = measureTimeMillis {
runBlocking {
val mapResult = (1..10).toList()
.parallelMap { getHttpAsync("https://jsonplaceholder.typicode.com/comments/$it") }
println(mapResult)
}
}
println("MyPmapTime: $myPmapTime ms") // MyPmapTime: 607 ms
А вот это уже получше — 607 ms и все благодаря вот этому — async(Dispatchers.IO). Контекст Dispatchers.IO предоставляет максимум возможных потоков, из-за чего операции ввода/вывода становятся быстрее. Использовать его можно для операций получения данных из Интернета, чтения файла и т.д.
Для высоконагруженных «процессорных» операций, связанных с расчетом данных следует применять Dispatchers.Default
Если не хотите расширять Iterable, можно по тому же принципу набросать простой метод, который принимая Range делает тоже самое
suspend fun <T>parallelHttp(range: Iterable<T>) = coroutineScope {
range.map { elem ->
async(Dispatchers.IO) {
getHttpAsync("https://jsonplaceholder.typicode.com/comments/$elem")
}
}.awaitAll()
}
val parallelHttpTime = measureTimeMillis {
runBlocking {
println(parallelHttp(1..10))
}
}
println("ParallelHttp: $parallelHttpTime ms") // ParallelHttp: 589 ms
В примере выше нет обработки ошибок, так как он показательный. Но если она потребуется, то можно использовать стандартный try catch { } и все будет работать.
Результаты и выводы
Давайте все сведем в таблицу и сделаем краткие выводы. Думаю все наглядно представлено.
Метод выполнения | Время исполнения (ms) |
---|---|
Async | 1493 |
parallelStream() | 579 |
pMap | 1558 |
parallelMap | 607 |
ParallelHttp | 589 |
По итогу, если Вам действительно нужно параллельно выполнять какие-либо операции, то лучшим выбором будет async.awaitAll(). Он же пригодится для реализации parallel map kotlin. Закономерный вопрос: «А почему не parallelStream?». Все дело в том, что обработка ошибок там тот еще «геморрой». Мне кажется, что более читаемо и наглядно использовать async в связке с try catch { }.
На это все. В следующих статьях я продолжу разбираться с корутинами. Возможно, накидаю какой-нибудь парсер данных.
Спасибо за внимание!
P.S.
Кстати подробную документацию по корутинам можно найти по этой ссылке.