Skip to content

Commit

Permalink
Support execution of zio statements - add repeatZIO operator (#31)
Browse files Browse the repository at this point in the history
* Support execution of zio statements

* Update src/main/scala/zio/cassandra/session/Session.scala

Co-authored-by: Sergey Rublev <[email protected]>

* cleanup

* removed execute zio

Co-authored-by: Sergey Rublev <[email protected]>
  • Loading branch information
adwells-ds and narma authored Dec 17, 2022
1 parent 3f82de1 commit 77dcbe4
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 5 deletions.
37 changes: 37 additions & 0 deletions src/it/scala/zio/cassandra/session/SessionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import zio.{ Chunk, Scope, ZIO }

import java.net.InetSocketAddress
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
import com.datastax.oss.driver.api.core.cql.PreparedStatement

object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils {

Expand Down Expand Up @@ -156,6 +158,41 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils {
inserted <- session.execute(batch)
result <- session.selectFirst(cqlConst"select count(*) from $table".as[Long])
} yield assertTrue(inserted, result.contains(3))
},
test("select continuously over multiple key partitions") {
val partitionSize = 10L
val partitionNr = new AtomicLong(0L)

// we do not have template interface for now,
// hence have to manually set the values
def selectStatement(ps: PreparedStatement) = ZIO.attempt {
val pn = partitionNr.getAndIncrement()
ps.bind()
.setLong(0, pn)
.setLong(1, pn * partitionSize)
.setLong(2, (pn + 1) * partitionSize)
}

for {
session <- ZIO.service[Session]
tbl = "table_" + UUID.randomUUID().toString.replaceAll("-", "_")
table = s"$keyspace.$tbl"
_ <- session.execute(
cqlConst"create table $table(id text, p_nr bigint, seq_nr bigint, primary key((id, p_nr), seq_nr))"
)
records = Chunk.fromIterable(0L.until(37L))
_ <- records.mapZIO { i =>
session.execute(cql"""INSERT INTO ${lift(
table
)} (id, p_nr, seq_nr) VALUES('key', ${i / partitionSize}, $i)""")
}
// read all records per key partition
st <- session.prepare(
s"""select id, p_nr, seq_nr from ${table}
|where id = 'key' and p_nr = ? and seq_nr >= ? and seq_nr <= ?""".stripMargin
)
res <- session.repeatZIO(selectStatement(st)).runCount
} yield assertTrue(records.size == res.toInt)
}
)
}
28 changes: 23 additions & 5 deletions src/main/scala/zio/cassandra/session/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ trait Session {

def select(stmt: Statement[_]): Stream[Throwable, Row]

/*
* Continuously quering the effect, until empty response returned.
* Meaning that effect should provide new statement on each materialization.
*/
def repeatZIO[R](stmt: ZIO[R, Throwable, Statement[_]]): ZStream[R, Throwable, Row]

// short-cuts
def selectFirst(stmt: Statement[_]): Task[Option[Row]]

Expand Down Expand Up @@ -68,26 +74,38 @@ object Session {
override def execute(query: String): Task[AsyncResultSet] =
ZIO.fromCompletionStage(underlying.executeAsync(query))

override def select(stmt: Statement[_]): Stream[Throwable, Row] = {
def pull(ref: Ref[ZIO[Any, Option[Throwable], AsyncResultSet]]): ZIO[Any, Option[Throwable], Chunk[Row]] =
private def repeatZIO[R](
stmt: ZIO[R, Throwable, Statement[_]],
continuous: Boolean
): ZStream[R, Throwable, Row] = {
val executeOpt = stmt.flatMap(execute).mapError(Option(_))
def pull(ref: Ref[ZIO[R, Option[Throwable], AsyncResultSet]]): ZIO[R, Option[Throwable], Chunk[Row]] =
for {
io <- ref.get
rs <- io
_ <- rs match {
case _ if rs.hasMorePages =>
ref.set(ZIO.fromCompletionStage(rs.fetchNextPage()).mapError(Option(_)))
case _ if rs.currentPage().iterator().hasNext => ref.set(Pull.end)
case _ => Pull.end
case _ if rs.currentPage().iterator().hasNext =>
ref.set(if (continuous) executeOpt else Pull.end)
case _ =>
Pull.end
}
} yield Chunk.fromIterable(rs.currentPage().asScala)

ZStream.fromPull {
for {
ref <- Ref.make(execute(stmt).mapError(Option(_)))
ref <- Ref.make(executeOpt)
} yield pull(ref)
}
}

override def repeatZIO[R](stmt: ZIO[R, Throwable, Statement[_]]): ZStream[R, Throwable, Row] =
repeatZIO(stmt, continuous = true)

override def select(stmt: Statement[_]): Stream[Throwable, Row] =
repeatZIO(ZIO.succeed(stmt), continuous = false)

override def selectFirst(stmt: Statement[_]): Task[Option[Row]] =
execute(stmt).map(rs => Option(rs.one()))

Expand Down

0 comments on commit 77dcbe4

Please sign in to comment.