Skip to content

Commit

Permalink
Implement articles pagination (#322)
Browse files Browse the repository at this point in the history
* Implement articles pagination
* Implement better findRemoteKey logic
* Implement article order by paging event
* Reference article by aTag instead of eventId

---------

Co-authored-by: Aleksandar Ilic <[email protected]>
  • Loading branch information
markocic and AleksandarIlic authored Feb 21, 2025
1 parent 62f72bb commit 535ae09
Show file tree
Hide file tree
Showing 10 changed files with 2,152 additions and 44 deletions.
1,979 changes: 1,979 additions & 0 deletions app/schemas/net.primal.android.db.PrimalDatabase/57.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import net.primal.android.articles.api.model.ArticleHighlightsRequestBody
import net.primal.android.articles.api.model.ArticleHighlightsResponse
import net.primal.android.articles.api.model.ArticleResponse
import net.primal.android.core.serialization.json.NostrJson
import net.primal.android.core.serialization.json.NostrJsonImplicitNulls
import net.primal.android.core.serialization.json.decodeFromStringOrNull
import net.primal.android.networking.di.PrimalCacheApiClient
import net.primal.android.networking.primal.PrimalApiClient
Expand Down Expand Up @@ -54,7 +55,7 @@ class ArticlesApiImpl @Inject constructor(
val queryResult = primalApiClient.query(
message = PrimalCacheFilter(
primalVerb = PrimalVerb.MEGA_FEED_DIRECTIVE,
optionsJson = NostrJson.encodeToString(body),
optionsJson = NostrJsonImplicitNulls.encodeToString(body),
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,24 @@ import androidx.paging.LoadType
import androidx.paging.PagingState
import androidx.paging.RemoteMediator
import androidx.room.withTransaction
import java.time.Instant
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.withContext
import net.primal.android.articles.api.ArticlesApi
import net.primal.android.articles.api.model.ArticleFeedRequestBody
import net.primal.android.articles.api.model.ArticleResponse
import net.primal.android.articles.db.Article
import net.primal.android.articles.db.ArticleFeedCrossRef
import net.primal.android.core.coroutines.CoroutineDispatcherProvider
import net.primal.android.db.PrimalDatabase
import net.primal.android.networking.primal.retryNetworkCall
import net.primal.android.networking.sockets.errors.WssException
import net.primal.android.nostr.ext.mapNotNullAsArticleDataPO
import net.primal.android.nostr.ext.orderByPagingIfNotNull
import net.primal.android.nostr.model.primal.content.ContentPrimalPaging
import net.primal.android.notes.db.FeedPostRemoteKey
import timber.log.Timber

@OptIn(ExperimentalPagingApi::class)
class ArticleFeedMediator(
Expand All @@ -25,49 +33,156 @@ class ArticleFeedMediator(
private val dispatcherProvider: CoroutineDispatcherProvider,
) : RemoteMediator<Int, Article>() {

private val lastRequests: MutableMap<LoadType, Pair<ArticleFeedRequestBody, Long>> = mutableMapOf()

override suspend fun initialize(): InitializeAction {
val latestRemoteKey = withContext(dispatcherProvider.io()) {
database.feedPostsRemoteKeys().findLatestByDirective(directive = feedSpec)
}

return latestRemoteKey?.let {
if (it.cachedAt.isTimestampOlderThan(duration = INITIALIZE_CACHE_EXPIRY)) {
InitializeAction.LAUNCH_INITIAL_REFRESH
} else {
InitializeAction.SKIP_INITIAL_REFRESH
}
} ?: InitializeAction.LAUNCH_INITIAL_REFRESH
}

@Suppress("ReturnCount")
override suspend fun load(loadType: LoadType, state: PagingState<Int, Article>): MediatorResult {
val pageSize = state.config.pageSize
return try {
val response = withContext(dispatcherProvider.io()) {
retryNetworkCall {
articlesApi.getArticleFeed(
body = ArticleFeedRequestBody(
spec = feedSpec,
userId = userId,
limit = pageSize,
),
)
val nextUntil = when (loadType) {
LoadType.APPEND -> findLastRemoteKey(state = state)?.sinceId
?: run {
Timber.d("APPEND no remote key found exit.")
return MediatorResult.Success(endOfPaginationReached = true)
}

LoadType.PREPEND -> {
Timber.d("PREPEND end of pagination exit.")
return MediatorResult.Success(endOfPaginationReached = true)
}

LoadType.REFRESH -> null
}

return try {
val response = fetchArticles(
pageSize = state.config.pageSize,
nextUntil = nextUntil,
loadType = loadType,
)

processAndPersistToDatabase(response = response, clearFeed = loadType == LoadType.REFRESH)

MediatorResult.Success(endOfPaginationReached = false)
} catch (error: WssException) {
MediatorResult.Error(error)
} catch (_: RepeatingRequestBodyException) {
Timber.d("RepeatingRequestBody exit.")
MediatorResult.Success(endOfPaginationReached = true)
}
}

@Throws(RepeatingRequestBodyException::class)
private suspend fun fetchArticles(
pageSize: Int,
nextUntil: Long?,
loadType: LoadType,
): ArticleResponse {
val request = ArticleFeedRequestBody(
spec = feedSpec,
userId = userId,
limit = pageSize,
until = nextUntil,
)

lastRequests[loadType]?.let { (lastRequest, lastRequestAt) ->
if (request == lastRequest && !lastRequestAt.isRequestCacheExpired() && loadType != LoadType.REFRESH) {
throw RepeatingRequestBodyException()
}
}

val connections = response.articles.mapNotNullAsArticleDataPO().map {
val response = withContext(dispatcherProvider.io()) {
retryNetworkCall {
articlesApi.getArticleFeed(
body = request,
)
}
}

lastRequests[loadType] = request to Instant.now().epochSecond
return response
}

private suspend fun findLastRemoteKey(state: PagingState<Int, Article>): FeedPostRemoteKey? {
val lastItemATag = state.lastItemOrNull()?.data?.aTag
?: findLastItemOrNull()?.articleATag

return withContext(dispatcherProvider.io()) {
lastItemATag?.let { database.feedPostsRemoteKeys().findByEventId(eventId = lastItemATag) }
?: database.feedPostsRemoteKeys().findLatestByDirective(directive = feedSpec)
}
}

private suspend fun findLastItemOrNull(): ArticleFeedCrossRef? =
withContext(dispatcherProvider.io()) {
database.articleFeedsConnections().findLastBySpec(spec = feedSpec)
}

private suspend fun processAndPersistToDatabase(response: ArticleResponse, clearFeed: Boolean) {
val connections = response.articles
.orderByPagingIfNotNull(pagingEvent = response.paging)
.mapNotNullAsArticleDataPO().map {
ArticleFeedCrossRef(
spec = feedSpec,
articleId = it.articleId,
articleATag = it.aTag,
articleAuthorId = it.authorId,
)
}

withContext(dispatcherProvider.io()) {
database.withTransaction {
if (loadType == LoadType.REFRESH) {
database.articleFeedsConnections().deleteConnectionsBySpec(spec = feedSpec)
}
withContext(dispatcherProvider.io()) {
database.withTransaction {
if (clearFeed) {
database.feedPostsRemoteKeys().deleteByDirective(feedSpec)
database.articleFeedsConnections().deleteConnectionsBySpec(spec = feedSpec)
}

if (connections.isNotEmpty()) {
database.articleFeedsConnections().connect(data = connections)
}
database.articleFeedsConnections().connect(data = connections)

response.persistToDatabaseAsTransaction(
userId = userId,
database = database,
)
}
response.persistToDatabaseAsTransaction(
userId = userId,
database = database,
)
}

MediatorResult.Success(endOfPaginationReached = true)
} catch (error: WssException) {
MediatorResult.Error(error)
connections.processRemoteKeys(pagingEvent = response.paging)
}
}

private fun List<ArticleFeedCrossRef>.processRemoteKeys(pagingEvent: ContentPrimalPaging?) {
if (pagingEvent?.sinceId != null && pagingEvent.untilId != null) {
val remoteKeys = this.map {
FeedPostRemoteKey(
eventId = it.articleATag,
directive = feedSpec,
sinceId = pagingEvent.sinceId,
untilId = pagingEvent.untilId,
cachedAt = Instant.now().epochSecond,
)
}
database.feedPostsRemoteKeys().upsert(remoteKeys)
}
}

private fun Long.isTimestampOlderThan(duration: Long) = (Instant.now().epochSecond - this) > duration

private fun Long.isRequestCacheExpired() = isTimestampOlderThan(duration = LAST_REQUEST_EXPIRY)

private inner class RepeatingRequestBodyException : RuntimeException()

companion object {
private val LAST_REQUEST_EXPIRY = 10.seconds.inWholeSeconds
private val INITIALIZE_CACHE_EXPIRY = 3.minutes.inWholeSeconds
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ interface ArticleDao {
CASE WHEN MutedUserData.userId IS NOT NULL THEN 1 ELSE 0 END AS isMuted
FROM ArticleData
INNER JOIN ArticleFeedCrossRef
ON ArticleFeedCrossRef.articleId = ArticleData.articleId
ON ArticleFeedCrossRef.articleATag = ArticleData.aTag
AND ArticleFeedCrossRef.articleAuthorId = ArticleData.authorId
LEFT JOIN EventUserStats ON EventUserStats.eventId = ArticleData.eventId AND EventUserStats.userId = :userId
LEFT JOIN MutedUserData ON MutedUserData.userId = ArticleData.authorId
WHERE ArticleFeedCrossRef.spec = :spec AND isMuted = 0
ORDER BY ArticleData.publishedAt DESC
ORDER BY ArticleFeedCrossRef.position ASC
""",
)
fun feed(spec: String, userId: String): PagingSource<Int, Article>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,17 @@ package net.primal.android.articles.db

import androidx.room.Entity
import androidx.room.Index
import androidx.room.PrimaryKey

@Entity(
primaryKeys = [
"spec",
"articleId",
"articleAuthorId",
],
indices = [
Index(value = ["spec"]),
Index(value = ["articleId"]),
Index(value = ["articleAuthorId"]),
Index(value = ["spec", "articleATag", "articleAuthorId"], unique = true),
],
)
data class ArticleFeedCrossRef(
@PrimaryKey(autoGenerate = true)
val position: Long = 0,
val spec: String,
val articleId: String,
val articleATag: String,
val articleAuthorId: String,
)
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ interface ArticleFeedCrossRefDao {
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun connect(data: List<ArticleFeedCrossRef>)

@Query("SELECT * FROM ArticleFeedCrossRef WHERE spec = :spec ORDER BY position DESC LIMIT 1")
fun findLastBySpec(spec: String): ArticleFeedCrossRef?

@Query("DELETE FROM ArticleFeedCrossRef WHERE spec = :spec")
fun deleteConnectionsBySpec(spec: String)
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ import net.primal.android.wallet.db.WalletTransactionData
ArticleFeedCrossRef::class,
HighlightData::class,
],
version = 56,
version = 57,
exportSchema = true,
)
@TypeConverters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ class NostrSocketClient(
chunks.forEachIndexed { index, chunk ->
val prefix = if (incoming) "<--" else "-->"
val suffix = if (index == chunksCount - 1) "[$url]" else ""
Timber.d("$prefix $chunk $suffix")
Timber.tag(if (incoming) "NostrSocketClientIncoming" else "NostrSocketClientOutgoing")
.d("$prefix $chunk $suffix")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package net.primal.android.nostr.ext

import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.decodeFromJsonElement
import net.primal.android.core.serialization.json.NostrJson
import net.primal.android.nostr.model.NostrEvent
import net.primal.android.nostr.model.primal.PrimalEvent
import net.primal.android.nostr.model.primal.content.ContentPrimalPaging
import timber.log.Timber

fun JsonObject?.asNostrEventOrNull(): NostrEvent? {
Expand All @@ -27,3 +27,10 @@ fun JsonObject?.asPrimalEventOrNull(): PrimalEvent? {
null
}
}

fun List<NostrEvent>.orderByPagingIfNotNull(pagingEvent: ContentPrimalPaging?): List<NostrEvent> {
if (pagingEvent == null) return this

val eventsMap = this.associateBy { it.id }
return pagingEvent.elements.mapNotNull { eventsMap[it] }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ interface FeedPostRemoteKeyDao {
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun upsert(data: List<FeedPostRemoteKey>)

@Query("SELECT * FROM FeedPostRemoteKey WHERE eventId = :eventId LIMIT 1")
fun findByEventId(eventId: String): FeedPostRemoteKey?

@Query(
"""
SELECT * FROM FeedPostRemoteKey
Expand All @@ -23,6 +26,9 @@ interface FeedPostRemoteKeyDao {
directive: String,
): FeedPostRemoteKey?

@Query("SELECT * FROM FeedPostRemoteKey WHERE (directive = :directive) ORDER BY cachedAt DESC LIMIT 1")
fun findLatestByDirective(directive: String): FeedPostRemoteKey?

@Query("SELECT cachedAt FROM FeedPostRemoteKey WHERE (directive = :directive) ORDER BY cachedAt DESC LIMIT 1")
fun lastCachedAt(directive: String): Long?

Expand Down

0 comments on commit 535ae09

Please sign in to comment.