Представим, что у нас есть некоторая функция, для выполнения которой требуется некоторое время.
def slowCalculation(i: Int)(implicit ec: ExecutionContext): Future[Int] = Future {
log(s"slowCalculation $i")
Thread.sleep(1000)
i
}С помощью Future.traverse мы можем применить данную функцию для некоторого списка данных. При этом выполнив конкурентный (это не значит параллельный) запуск фьюч.
val someData = List(1, 2, 3, 4)
Future.traverse(someData)(slowCalculation)В функции slowCalculation вместо Thread.sleep(1000) может быть все что угодно. Например, обращение в базу данных или поход в некоторый сервис. Так же может произойти такая ситуация, что размер списка someData может быть равен не 4, а 400000. И поскольку Future жадная, то у нас запуститься 400000 прожорливых фьюч. Это может привести к непредвиденным последствиям (кстати, это отличный способ заддосить свой внутренний или внешний сервис).
Чтобы обезопасить себя от такого поведения напишем функцию batchTraverse для порционного применения функции к списку данных.
def batchTraverse[A, B](input: Seq[A], batchSize: Int)(f: A => Future[B]): Future[Seq[B]] = {
input.grouped(batchSize)
.map(batch => () => Future.traverse(batch)(f))
.foldLeft(Future.successful(Seq[B]())) {
(accF, batchF) => for {
acc <- accF
batch <- batchF()
} yield acc ++ batch
}
}После чего можем запустить функцию порционно по 10 штук пока не будет обработан весь список.
Await.result(batchTraverse(0 to 400000, 10)(slowCalculation), Duration.Inf)- Примечание: в ходе экспериментов не забывайте о контексте, в котором будет производиться запуск фьюч. А лучше создать свой
ExecutionContextс фиксированным размером пула потоков.
val threadPool = Executors.newFixedThreadPool(20)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(threadPool)