Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Is there anyway to stream the agg query? #3043

Open
dataoperandz opened this issue Apr 26, 2024 · 1 comment
Open

Is there anyway to stream the agg query? #3043

dataoperandz opened this issue Apr 26, 2024 · 1 comment

Comments

@dataoperandz
Copy link

Hello,

I need to get the Aggregation result. is there any possibility to stream agg query using stream publisher?

search(Indexes(s"$indexPrefix*")).size(0).aggregations(termsAgg("distinct_field_agg", field))

@igor-vovk
Copy link
Contributor

igor-vovk commented Jul 2, 2024

Not sure if I understood correctly, but here is the implementation to stream results using fs2 library I've written:

object StreamingSearch {

  def apply[F[_] : Async, A: ClassTag : Decoder](client: ElasticClient, query: SearchRequest): Stream[F, A] = {
    require(query.sorts.nonEmpty, "Search request must have at least one sort")

    val emptySearchAfter: Option[Seq[Any]] = None

    Stream.unfoldChunkEval(emptySearchAfter) { sa =>
      client
        .execute(query.searchAfter(sa.getOrElse(Seq.empty)))
        .map { result =>
          val r = result.result

          if r.nonEmpty then
            val seq         = r.to[A]
            val searchAfter = r.hits.hits.last.sort

            Some(Chunk.from(seq), searchAfter)
          else None
        }
    }
  }

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants