Skip to content

Commit

Permalink
Add upsertStreaming (fix #115)
Browse files Browse the repository at this point in the history
  • Loading branch information
oyvindberg committed Jul 10, 2024
1 parent 2b83b5c commit 7c765b8
Show file tree
Hide file tree
Showing 726 changed files with 6,677 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ trait PersonRepo {
def update(row: PersonRow)(implicit c: Connection): Boolean
def updateFieldValues(compositeId: PersonId, fieldValues: List[PersonFieldValue[?]])(implicit c: Connection): Boolean
def upsert(unsaved: PersonRow)(implicit c: Connection): PersonRow
def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import anorm.SimpleSql
import anorm.SqlStringInterpolation
import anorm.ToStatement
import java.sql.Connection
import scala.annotation.nowarn
import testdb.hardcoded.customtypes.Defaulted
import typo.dsl.DeleteBuilder
import typo.dsl.SelectBuilder
Expand Down Expand Up @@ -148,4 +149,15 @@ class PersonRepoImpl extends PersonRepo {
.executeInsert(PersonRow.rowParser(1).single)

}
override def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
SQL"create temporary table person_TEMP (like compositepk.person) on commit drop".execute(): @nowarn
streamingInsert(s"""copy person_TEMP("one", "two", "name") from stdin""", batchSize, unsaved)(PersonRow.text, c): @nowarn
SQL"""insert into compositepk.person("one", "two", "name")
select * from person_TEMP
on conflict ("one", "two")
do update set
"name" = EXCLUDED."name"
;
drop table person_TEMP;""".executeUpdate()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,10 @@ class PersonRepoMock(toRow: Function1[PersonRowUnsaved, PersonRow],
map.put(unsaved.compositeId, unsaved): @nowarn
unsaved
}
override def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
unsaved.foreach { row =>
map += (row.compositeId -> row)
}
unsaved.size
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ trait FootballClubRepo {
def update(row: FootballClubRow)(implicit c: Connection): Boolean
def updateFieldValues(id: FootballClubId, fieldValues: List[FootballClubFieldValue[?]])(implicit c: Connection): Boolean
def upsert(unsaved: FootballClubRow)(implicit c: Connection): FootballClubRow
def upsertStreaming(unsaved: Iterator[FootballClubRow], batchSize: Int = 10000)(implicit c: Connection): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import anorm.SimpleSql
import anorm.SqlStringInterpolation
import anorm.ToStatement
import java.sql.Connection
import scala.annotation.nowarn
import typo.dsl.DeleteBuilder
import typo.dsl.SelectBuilder
import typo.dsl.SelectBuilderSql
Expand Down Expand Up @@ -130,4 +131,15 @@ class FootballClubRepoImpl extends FootballClubRepo {
.executeInsert(FootballClubRow.rowParser(1).single)

}
override def upsertStreaming(unsaved: Iterator[FootballClubRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
SQL"create temporary table football_club_TEMP (like myschema.football_club) on commit drop".execute(): @nowarn
streamingInsert(s"""copy football_club_TEMP("id", "name") from stdin""", batchSize, unsaved)(FootballClubRow.text, c): @nowarn
SQL"""insert into myschema.football_club("id", "name")
select * from football_club_TEMP
on conflict ("id")
do update set
"name" = EXCLUDED."name"
;
drop table football_club_TEMP;""".executeUpdate()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,10 @@ class FootballClubRepoMock(map: scala.collection.mutable.Map[FootballClubId, Foo
map.put(unsaved.id, unsaved): @nowarn
unsaved
}
override def upsertStreaming(unsaved: Iterator[FootballClubRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
unsaved.foreach { row =>
map += (row.id -> row)
}
unsaved.size
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ trait MaritalStatusRepo {
def selectByIdsTracked(ids: Array[MaritalStatusId])(implicit c: Connection): Map[MaritalStatusId, MaritalStatusRow]
def update: UpdateBuilder[MaritalStatusFields, MaritalStatusRow]
def upsert(unsaved: MaritalStatusRow)(implicit c: Connection): MaritalStatusRow
def upsertStreaming(unsaved: Iterator[MaritalStatusRow], batchSize: Int = 10000)(implicit c: Connection): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import anorm.SQL
import anorm.SimpleSql
import anorm.SqlStringInterpolation
import java.sql.Connection
import scala.annotation.nowarn
import typo.dsl.DeleteBuilder
import typo.dsl.SelectBuilder
import typo.dsl.SelectBuilderSql
Expand Down Expand Up @@ -103,4 +104,15 @@ class MaritalStatusRepoImpl extends MaritalStatusRepo {
.executeInsert(MaritalStatusRow.rowParser(1).single)

}
override def upsertStreaming(unsaved: Iterator[MaritalStatusRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
SQL"create temporary table marital_status_TEMP (like myschema.marital_status) on commit drop".execute(): @nowarn
streamingInsert(s"""copy marital_status_TEMP("id") from stdin""", batchSize, unsaved)(MaritalStatusRow.text, c): @nowarn
SQL"""insert into myschema.marital_status("id")
select * from marital_status_TEMP
on conflict ("id")
do update set
;
drop table marital_status_TEMP;""".executeUpdate()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,10 @@ class MaritalStatusRepoMock(map: scala.collection.mutable.Map[MaritalStatusId, M
map.put(unsaved.id, unsaved): @nowarn
unsaved
}
override def upsertStreaming(unsaved: Iterator[MaritalStatusRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
unsaved.foreach { row =>
map += (row.id -> row)
}
unsaved.size
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ trait PersonRepo {
def update(row: PersonRow)(implicit c: Connection): Boolean
def updateFieldValues(id: PersonId, fieldValues: List[PersonFieldValue[?]])(implicit c: Connection): Boolean
def upsert(unsaved: PersonRow)(implicit c: Connection): PersonRow
def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import anorm.SimpleSql
import anorm.SqlStringInterpolation
import anorm.ToStatement
import java.sql.Connection
import scala.annotation.nowarn
import testdb.hardcoded.customtypes.Defaulted
import testdb.hardcoded.myschema.football_club.FootballClubId
import testdb.hardcoded.myschema.marital_status.MaritalStatusId
Expand Down Expand Up @@ -231,4 +232,25 @@ class PersonRepoImpl extends PersonRepo {
.executeInsert(PersonRow.rowParser(1).single)

}
override def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
SQL"create temporary table person_TEMP (like myschema.person) on commit drop".execute(): @nowarn
streamingInsert(s"""copy person_TEMP("id", "favourite_football_club_id", "name", "nick_name", "blog_url", "email", "phone", "likes_pizza", "marital_status_id", "work_email", "sector", "favorite_number") from stdin""", batchSize, unsaved)(PersonRow.text, c): @nowarn
SQL"""insert into myschema.person("id", "favourite_football_club_id", "name", "nick_name", "blog_url", "email", "phone", "likes_pizza", "marital_status_id", "work_email", "sector", "favorite_number")
select * from person_TEMP
on conflict ("id")
do update set
"favourite_football_club_id" = EXCLUDED."favourite_football_club_id",
"name" = EXCLUDED."name",
"nick_name" = EXCLUDED."nick_name",
"blog_url" = EXCLUDED."blog_url",
"email" = EXCLUDED."email",
"phone" = EXCLUDED."phone",
"likes_pizza" = EXCLUDED."likes_pizza",
"marital_status_id" = EXCLUDED."marital_status_id",
"work_email" = EXCLUDED."work_email",
"sector" = EXCLUDED."sector",
"favorite_number" = EXCLUDED."favorite_number"
;
drop table person_TEMP;""".executeUpdate()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,10 @@ class PersonRepoMock(toRow: Function1[PersonRowUnsaved, PersonRow],
map.put(unsaved.id, unsaved): @nowarn
unsaved
}
override def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
unsaved.foreach { row =>
map += (row.id -> row)
}
unsaved.size
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ trait PersonRepo {
def update(row: PersonRow)(implicit c: Connection): Boolean
def updateFieldValues(compositeId: PersonId, fieldValues: List[PersonFieldValue[?]])(implicit c: Connection): Boolean
def upsert(unsaved: PersonRow)(implicit c: Connection): PersonRow
def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import anorm.SimpleSql
import anorm.SqlStringInterpolation
import anorm.ToStatement
import java.sql.Connection
import scala.annotation.nowarn
import testdb.hardcoded.customtypes.Defaulted
import typo.dsl.DeleteBuilder
import typo.dsl.SelectBuilder
Expand Down Expand Up @@ -148,4 +149,15 @@ class PersonRepoImpl extends PersonRepo {
.executeInsert(PersonRow.rowParser(1).single)

}
override def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
SQL"create temporary table person_TEMP (like compositepk.person) on commit drop".execute(): @nowarn
streamingInsert(s"""copy person_TEMP("one", "two", "name") from stdin""", batchSize, unsaved)(PersonRow.text, c): @nowarn
SQL"""insert into compositepk.person("one", "two", "name")
select * from person_TEMP
on conflict ("one", "two")
do update set
"name" = EXCLUDED."name"
;
drop table person_TEMP;""".executeUpdate()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,10 @@ class PersonRepoMock(toRow: Function1[PersonRowUnsaved, PersonRow],
map.put(unsaved.compositeId, unsaved): @nowarn
unsaved
}
override def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
unsaved.foreach { row =>
map += (row.compositeId -> row)
}
unsaved.size
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ trait FootballClubRepo {
def update(row: FootballClubRow)(implicit c: Connection): Boolean
def updateFieldValues(id: FootballClubId, fieldValues: List[FootballClubFieldValue[?]])(implicit c: Connection): Boolean
def upsert(unsaved: FootballClubRow)(implicit c: Connection): FootballClubRow
def upsertStreaming(unsaved: Iterator[FootballClubRow], batchSize: Int = 10000)(implicit c: Connection): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import anorm.SimpleSql
import anorm.SqlStringInterpolation
import anorm.ToStatement
import java.sql.Connection
import scala.annotation.nowarn
import typo.dsl.DeleteBuilder
import typo.dsl.SelectBuilder
import typo.dsl.SelectBuilderSql
Expand Down Expand Up @@ -130,4 +131,15 @@ class FootballClubRepoImpl extends FootballClubRepo {
.executeInsert(FootballClubRow.rowParser(1).single)

}
override def upsertStreaming(unsaved: Iterator[FootballClubRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
SQL"create temporary table football_club_TEMP (like myschema.football_club) on commit drop".execute(): @nowarn
streamingInsert(s"""copy football_club_TEMP("id", "name") from stdin""", batchSize, unsaved)(FootballClubRow.text, c): @nowarn
SQL"""insert into myschema.football_club("id", "name")
select * from football_club_TEMP
on conflict ("id")
do update set
"name" = EXCLUDED."name"
;
drop table football_club_TEMP;""".executeUpdate()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,10 @@ class FootballClubRepoMock(map: scala.collection.mutable.Map[FootballClubId, Foo
map.put(unsaved.id, unsaved): @nowarn
unsaved
}
override def upsertStreaming(unsaved: Iterator[FootballClubRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
unsaved.foreach { row =>
map += (row.id -> row)
}
unsaved.size
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ trait MaritalStatusRepo {
def selectByIdsTracked(ids: Array[MaritalStatusId])(implicit c: Connection): Map[MaritalStatusId, MaritalStatusRow]
def update: UpdateBuilder[MaritalStatusFields, MaritalStatusRow]
def upsert(unsaved: MaritalStatusRow)(implicit c: Connection): MaritalStatusRow
def upsertStreaming(unsaved: Iterator[MaritalStatusRow], batchSize: Int = 10000)(implicit c: Connection): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import anorm.SQL
import anorm.SimpleSql
import anorm.SqlStringInterpolation
import java.sql.Connection
import scala.annotation.nowarn
import typo.dsl.DeleteBuilder
import typo.dsl.SelectBuilder
import typo.dsl.SelectBuilderSql
Expand Down Expand Up @@ -103,4 +104,15 @@ class MaritalStatusRepoImpl extends MaritalStatusRepo {
.executeInsert(MaritalStatusRow.rowParser(1).single)

}
override def upsertStreaming(unsaved: Iterator[MaritalStatusRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
SQL"create temporary table marital_status_TEMP (like myschema.marital_status) on commit drop".execute(): @nowarn
streamingInsert(s"""copy marital_status_TEMP("id") from stdin""", batchSize, unsaved)(MaritalStatusRow.text, c): @nowarn
SQL"""insert into myschema.marital_status("id")
select * from marital_status_TEMP
on conflict ("id")
do update set
;
drop table marital_status_TEMP;""".executeUpdate()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,10 @@ class MaritalStatusRepoMock(map: scala.collection.mutable.Map[MaritalStatusId, M
map.put(unsaved.id, unsaved): @nowarn
unsaved
}
override def upsertStreaming(unsaved: Iterator[MaritalStatusRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
unsaved.foreach { row =>
map += (row.id -> row)
}
unsaved.size
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ trait PersonRepo {
def update(row: PersonRow)(implicit c: Connection): Boolean
def updateFieldValues(id: PersonId, fieldValues: List[PersonFieldValue[?]])(implicit c: Connection): Boolean
def upsert(unsaved: PersonRow)(implicit c: Connection): PersonRow
def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import anorm.SimpleSql
import anorm.SqlStringInterpolation
import anorm.ToStatement
import java.sql.Connection
import scala.annotation.nowarn
import testdb.hardcoded.customtypes.Defaulted
import testdb.hardcoded.myschema.football_club.FootballClubId
import testdb.hardcoded.myschema.marital_status.MaritalStatusId
Expand Down Expand Up @@ -231,4 +232,25 @@ class PersonRepoImpl extends PersonRepo {
.executeInsert(PersonRow.rowParser(1).single)

}
override def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
SQL"create temporary table person_TEMP (like myschema.person) on commit drop".execute(): @nowarn
streamingInsert(s"""copy person_TEMP("id", "favourite_football_club_id", "name", "nick_name", "blog_url", "email", "phone", "likes_pizza", "marital_status_id", "work_email", "sector", "favorite_number") from stdin""", batchSize, unsaved)(PersonRow.text, c): @nowarn
SQL"""insert into myschema.person("id", "favourite_football_club_id", "name", "nick_name", "blog_url", "email", "phone", "likes_pizza", "marital_status_id", "work_email", "sector", "favorite_number")
select * from person_TEMP
on conflict ("id")
do update set
"favourite_football_club_id" = EXCLUDED."favourite_football_club_id",
"name" = EXCLUDED."name",
"nick_name" = EXCLUDED."nick_name",
"blog_url" = EXCLUDED."blog_url",
"email" = EXCLUDED."email",
"phone" = EXCLUDED."phone",
"likes_pizza" = EXCLUDED."likes_pizza",
"marital_status_id" = EXCLUDED."marital_status_id",
"work_email" = EXCLUDED."work_email",
"sector" = EXCLUDED."sector",
"favorite_number" = EXCLUDED."favorite_number"
;
drop table person_TEMP;""".executeUpdate()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,10 @@ class PersonRepoMock(toRow: Function1[PersonRowUnsaved, PersonRow],
map.put(unsaved.id, unsaved): @nowarn
unsaved
}
override def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
unsaved.foreach { row =>
map += (row.id -> row)
}
unsaved.size
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ trait PersonRepo {
def update(row: PersonRow): ConnectionIO[Boolean]
def updateFieldValues(compositeId: PersonId, fieldValues: List[PersonFieldValue[?]]): ConnectionIO[Boolean]
def upsert(unsaved: PersonRow): ConnectionIO[PersonRow]
def upsertStreaming(unsaved: Stream[ConnectionIO, PersonRow], batchSize: Int = 10000): ConnectionIO[Int]
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,17 @@ class PersonRepoImpl extends PersonRepo {
returning "one", "two", "name"
""".query(using PersonRow.read).unique
}
override def upsertStreaming(unsaved: Stream[ConnectionIO, PersonRow], batchSize: Int = 10000): ConnectionIO[Int] = {
for {
_ <- sql"create temporary table person_TEMP (like compositepk.person) on commit drop".update.run
_ <- new FragmentOps(sql"""copy person_TEMP("one", "two", "name") from stdin""").copyIn(unsaved, batchSize)(using PersonRow.text)
res <- sql"""insert into compositepk.person("one", "two", "name")
select * from person_TEMP
on conflict ("one", "two")
do update set
"name" = EXCLUDED."name"
;
drop table person_TEMP;""".update.run
} yield res
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,14 @@ class PersonRepoMock(toRow: Function1[PersonRowUnsaved, PersonRow],
unsaved
}
}
override def upsertStreaming(unsaved: Stream[ConnectionIO, PersonRow], batchSize: Int = 10000): ConnectionIO[Int] = {
unsaved.compile.toList.map { rows =>
var num = 0
rows.foreach { row =>
map += (row.compositeId -> row)
num += 1
}
num
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ trait FootballClubRepo {
def update(row: FootballClubRow): ConnectionIO[Boolean]
def updateFieldValues(id: FootballClubId, fieldValues: List[FootballClubFieldValue[?]]): ConnectionIO[Boolean]
def upsert(unsaved: FootballClubRow): ConnectionIO[FootballClubRow]
def upsertStreaming(unsaved: Stream[ConnectionIO, FootballClubRow], batchSize: Int = 10000): ConnectionIO[Int]
}
Loading

0 comments on commit 7c765b8

Please sign in to comment.