Параллельные запросы в Kotlin — пишем свой Parallel Map

параллельные запросы в Котлин
parallel-map-kotlin

Привет блог. Сегодня я решил рассмотреть конкретный пример — parallel map Kotlin c использованием корутин. А именно мы посмотрим как можно реализовать параллельные запросы с помощью сопрограмм.

Для чего нужны параллельные запросы?

Сначала нужно узнать — для чего может потребоваться параллельно выполнять запросы к серверу? Примеров масса: скачивание двух картинок и их совмещение, получение зависимых данных из Сети и запись в БД. Наконец — многопоточный парсинг, когда за 1 раз нужно получить много данных.

С этим разобрались, теперь давайте посмотрим как вообще можно реализовать параллельное выполнение операций? Первое что приходит на ум — Thread. Но писать класс для этого мы конечно не будем, а возьмем первого «кандидата» в наш список — parallelStream() из Java. Аналога которого в Котлин кстати до сих пор нет.

Для тестирования возьмем всем знакомый jsonplaceholder.typicode.com и будем от него по API получать список комментариев. Не забудем измерить время выполнения с помощью measureTimeMillis { }.

Допустим метод getHttpAsync получает асинхронно данные из Сети. Сперва проверим время по дефолту

val time = measureTimeMillis {
    val result = (1..10).toList()
        .map { getHttpAsync("https://jsonplaceholder.typicode.com/comments/$it") }
    
    result.forEach { println(it) }
}
println("Async: $time ms") // Async: 1493 ms

10 запусков подряд. Среднее время 1493 ms. Не густо. Давайте применим магию parallelStream()

val timeStrParallel = measureTimeMillis {
    val result = (1..10).toList()
        .parallelStream()
        .map { getHttpAsync("https://jsonplaceholder.typicode.com/comments/$it") }
    result.forEach { println(it) }
}
println("ParallelStream: $timeStrParallel ms") // ParallelStream: 579 ms

579 ms в среднем — вот это уже достойный результат! И можно было бы остановиться на этом, но мы конечно не будем. Продолжим изыскания дальше.

Parallel Map — пример из сети

Покопавшись в Интернете, я нашел пример, который тиражируется на нескольких сайтах. Автор пытаясь добиться параллельного выполнения пишет свою реализацию мапы для коллекций — pmap

suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}

Здесь в принципе все просто. Автор расширяет объект Iterable с помощью функции pmap, которая с помощью async.awaitAll() дожидается выполнения результата каждой f(it), которая объявлена как suspend. Возвращается у нас List<B>. Давайте затестим!

val pmapTime = measureTimeMillis {
    runBlocking {
        val mapResult = (1..10).toList()
            .pmap { getHttpAsync("https://jsonplaceholder.typicode.com/comments/$it") }
        println(mapResult)
    }
}
println("PmapTime: $pmapTime ms") // PmapTime: 1558 ms

И наш результат… Барабанная дробь! 1558 ms. Как так-то? Ведь запросы выполняются и правда параллельно, но дюже медленно. Но здесь не хватает одной важной детали — контекста (context). Перепишем немного метод

Обновленная версия…

suspend fun <T, V> Iterable<T>.parallelMap(func: suspend (T) -> V): Iterable<V> = coroutineScope {
    map { element ->
        async(Dispatchers.IO) { func(element) }
    }.awaitAll()
}

val myPmapTime = measureTimeMillis {
    runBlocking {
        val mapResult = (1..10).toList()
            .parallelMap { getHttpAsync("https://jsonplaceholder.typicode.com/comments/$it") }
        println(mapResult)
    }
}
println("MyPmapTime: $myPmapTime ms") // MyPmapTime: 607 ms

А вот это уже получше — 607 ms и все благодаря вот этому — async(Dispatchers.IO). Контекст Dispatchers.IO предоставляет максимум возможных потоков, из-за чего операции ввода/вывода становятся быстрее. Использовать его можно для операций получения данных из Интернета, чтения файла и т.д.

Для высоконагруженных «процессорных» операций, связанных с расчетом данных следует применять Dispatchers.Default

Если не хотите расширять Iterable, можно по тому же принципу набросать простой метод, который принимая Range делает тоже самое

suspend fun <T>parallelHttp(range: Iterable<T>) = coroutineScope {
    range.map { elem ->
        async(Dispatchers.IO) {
            getHttpAsync("https://jsonplaceholder.typicode.com/comments/$elem")
        }
    }.awaitAll()
}

val parallelHttpTime = measureTimeMillis {
    runBlocking {
        println(parallelHttp(1..10))
    }
}
println("ParallelHttp: $parallelHttpTime ms") // ParallelHttp: 589 ms

В примере выше нет обработки ошибок, так как он показательный. Но если она потребуется, то можно использовать стандартный try catch { } и все будет работать.

Результаты и выводы

Давайте все сведем в таблицу и сделаем краткие выводы. Думаю все наглядно представлено.

Метод выполненияВремя исполнения (ms)
Async1493
parallelStream()579
pMap1558
parallelMap607
ParallelHttp589

По итогу, если Вам действительно нужно параллельно выполнять какие-либо операции, то лучшим выбором будет async.awaitAll(). Он же пригодится для реализации parallel map kotlin. Закономерный вопрос: «А почему не parallelStream?». Все дело в том, что обработка ошибок там тот еще «геморрой». Мне кажется, что более читаемо и наглядно использовать async в связке с try catch { }.

На это все. В следующих статьях я продолжу разбираться с корутинами. Возможно, накидаю какой-нибудь парсер данных.

Спасибо за внимание!

P.S.

Кстати подробную документацию по корутинам можно найти по этой ссылке.

2 комментария

  1. Hello there! I simply would like to give you a huge thumbs up for the great information you have right here on this post. Leslie Dugas

  2. Thanks for this method. I am will usin it in my next project. Winne Edmund Fabozzi

Оставить ответ