Skip to content

Commit

Permalink
fix: update all users when getting their data [AR-5057] (#2201)
Browse files Browse the repository at this point in the history
* Commit with unresolved merge conflicts outside of

* chore: solve conflicts

* chore: avoid calling upsert when lists are empty

* chore: avoid calling upsert when lists are empty

---------

Co-authored-by: Tommaso Piazza <[email protected]>
  • Loading branch information
vitorhugods and tmspzz authored Nov 8, 2023
1 parent 33129c1 commit 323edb6
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.id.IdMapper
import com.wire.kalium.logic.data.id.NetworkQualifiedId
import com.wire.kalium.logic.data.id.QualifiedID
import com.wire.kalium.logic.data.id.SelfTeamIdProvider
import com.wire.kalium.logic.data.id.toApi
import com.wire.kalium.logic.data.id.toDao
import com.wire.kalium.logic.data.id.toModel
Expand All @@ -36,10 +37,8 @@ import com.wire.kalium.logic.data.session.SessionRepository
import com.wire.kalium.logic.data.team.Team
import com.wire.kalium.logic.data.team.TeamMapper
import com.wire.kalium.logic.data.user.type.UserEntityTypeMapper
import com.wire.kalium.logic.data.user.type.isFederated
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.failure.SelfUserDeleted
import com.wire.kalium.logic.data.id.SelfTeamIdProvider
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.fold
Expand Down Expand Up @@ -159,12 +158,12 @@ internal class UserDataSource internal constructor(
) : UserRepository {

/**
* In case of federated users, we need to refresh their info every time.
* Since the current backend implementation at wire does not emit user events across backends.
* Stores the last time a user's details were fetched from remote.
*
* This is an in-memory cache, to help avoid unnecessary requests in a time window.
* @see Event.User.Update
* @see USER_DETAILS_MAX_AGE
*/
private val federatedUsersExpirationCache = ConcurrentMutableMap<UserId, Instant>()
private val userDetailsRefreshInstantCache = ConcurrentMutableMap<UserId, Instant>()

override suspend fun fetchSelfUser(): Either<CoreFailure, Unit> = wrapApiRequest { selfApi.getSelfInfo() }
.flatMap { userDTO ->
Expand All @@ -190,7 +189,9 @@ internal class UserDataSource internal constructor(
.map { userEntity ->
userEntity?.let { userMapper.fromUserDetailsEntityToOtherUser(userEntity) }
}.onEach { otherUser ->
processFederatedUserRefresh(userId, otherUser)
if (otherUser != null) {
refreshUserDetailsIfNeeded(userId)
}
}

override suspend fun getUsersWithOneOnOneConversation(): List<OtherUser> {
Expand All @@ -199,16 +200,19 @@ internal class UserDataSource internal constructor(
}

/**
* Only in case of federated users and if it's expired or not cached, we fetch and refresh the user info.
* Only refresh user profiles if it wasn't fetched recently.
*
* @see userDetailsRefreshInstantCache
* @see USER_DETAILS_MAX_AGE
*/
private suspend fun processFederatedUserRefresh(userId: UserId, otherUser: OtherUser?) {
if (otherUser != null && otherUser.userType.isFederated()
&& federatedUsersExpirationCache[userId]?.let { DateTimeUtil.currentInstant() > it } != false
) {
private suspend fun refreshUserDetailsIfNeeded(userId: UserId) {
val now = DateTimeUtil.currentInstant()
val wasFetchedRecently = userDetailsRefreshInstantCache[userId]?.let { now < it + USER_DETAILS_MAX_AGE } ?: false
if (!wasFetchedRecently) {
fetchUserInfo(userId).also {
kaliumLogger.d("Federated user, refreshing user info from API after $FEDERATED_USER_TTL")
kaliumLogger.d("Federated user, refreshing user info from API after $USER_DETAILS_MAX_AGE")
}
federatedUsersExpirationCache[userId] = DateTimeUtil.currentInstant().plus(FEDERATED_USER_TTL)
userDetailsRefreshInstantCache[userId] = now
}
}

Expand Down Expand Up @@ -261,22 +265,18 @@ internal class UserDataSource internal constructor(
val selfUserTeamId = selfTeamIdProvider().getOrNull()?.value
val teamMembers = listUserProfileDTO
.filter { userProfileDTO -> userProfileDTO.isTeamMember(selfUserTeamId, selfUserDomain) }
val otherUsers = listUserProfileDTO
.filter { userProfileDTO -> !userProfileDTO.isTeamMember(selfUserTeamId, selfUserDomain) }

userDAO.upsertUsers(
teamMembers.map { userProfileDTO ->
.map { userProfileDTO ->
userMapper.fromUserProfileDtoToUserEntity(
userProfile = userProfileDTO,
connectionState = ConnectionEntity.State.ACCEPTED,
userTypeEntity = userDAO.observeUserDetailsByQualifiedID(userProfileDTO.id.toDao())
.firstOrNull()?.userType ?: UserTypeEntity.STANDARD
)
}
)

userDAO.upsertUsers(
otherUsers.map { userProfileDTO ->
val otherUsers = listUserProfileDTO
.filter { userProfileDTO -> !userProfileDTO.isTeamMember(selfUserTeamId, selfUserDomain) }
.map { userProfileDTO ->
userMapper.fromUserProfileDtoToUserEntity(
userProfile = userProfileDTO,
connectionState = ConnectionEntity.State.NOT_CONNECTED,
Expand All @@ -289,7 +289,13 @@ internal class UserDataSource internal constructor(
)
)
}
)
if (teamMembers.isNotEmpty()) {
userDAO.upsertUsers(teamMembers)
}

if (otherUsers.isNotEmpty()) {
userDAO.upsertUsers(otherUsers)
}
}

override suspend fun fetchUsersIfUnknownByIds(ids: Set<UserId>): Either<CoreFailure, Unit> = wrapStorageRequest {
Expand Down Expand Up @@ -495,7 +501,16 @@ internal class UserDataSource internal constructor(

companion object {
internal const val SELF_USER_ID_KEY = "selfUserID"
internal val FEDERATED_USER_TTL = 5.minutes

/**
* Maximum age for user details.
*
* The USER_DETAILS_MAX_AGE constant represents the maximum age in minutes that user details can be considered valid. After
* this duration, the user details should be refreshed.
*
* This is needed because some users don't get `user.update` events, so we need to refresh their details every so often.
*/
internal val USER_DETAILS_MAX_AGE = 5.minutes
internal const val BATCH_SIZE = 500
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,9 @@ class UserRepositoryTest {
}

@Test
fun givenAKnownNOTFederatedUser_whenGettingFromDb_thenShouldNotRefreshItsDataFromAPI() = runTest {
fun givenAKnownUser_whenGettingFromDb_thenShouldRefreshItsDataFromAPI() = runTest {
val (arrangement, userRepository) = Arrangement()
.withUserDaoReturning(TestUser.DETAILS_ENTITY.copy(userType = UserTypeEntity.STANDARD))
.withUserDaoReturning(TestUser.DETAILS_ENTITY)
.withSuccessfulGetUsersInfo()
.arrange()

Expand All @@ -344,18 +344,18 @@ class UserRepositoryTest {
verify(arrangement.userDetailsApi)
.suspendFunction(arrangement.userDetailsApi::getUserInfo)
.with(any())
.wasNotInvoked()
.wasInvoked(exactly = once)
verify(arrangement.userDAO)
.suspendFunction(arrangement.userDAO::upsertUsers)
.with(any())
.wasNotInvoked()
.wasInvoked(exactly = once)
}
}

@Test
fun givenAKnownFederatedUser_whenGettingFromDbAndCacheValid_thenShouldNOTRefreshItsDataFromAPI() = runTest {
fun givenAKnownUser_whenGettingFromDbAndCacheValid_thenShouldNOTRefreshItsDataFromAPI() = runTest {
val (arrangement, userRepository) = Arrangement()
.withUserDaoReturning(TestUser.DETAILS_ENTITY.copy(userType = UserTypeEntity.FEDERATED))
.withUserDaoReturning(TestUser.DETAILS_ENTITY)
.withSuccessfulGetUsersInfo()
.arrange()

Expand All @@ -369,7 +369,7 @@ class UserRepositoryTest {
verify(arrangement.userDAO)
.suspendFunction(arrangement.userDAO::upsertUsers)
.with(any())
.wasInvoked(exactly = twice)
.wasInvoked(exactly = once)
}

val resultSecondTime = userRepository.getKnownUser(TestUser.USER_ID)
Expand Down

0 comments on commit 323edb6

Please sign in to comment.