Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Working on the code lab to understand coroutines and channels. #36

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions src/contributors/Contributors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ enum class Variant {
CHANNELS // Request7Channels
}

interface Contributors: CoroutineScope {
interface Contributors : CoroutineScope {

val job: Job

Expand Down Expand Up @@ -58,38 +58,46 @@ interface Contributors: CoroutineScope {
val users = loadContributorsBlocking(service, req)
updateResults(users, startTime)
}

BACKGROUND -> { // Blocking a background thread
loadContributorsBackground(service, req) { users ->
SwingUtilities.invokeLater {
updateResults(users, startTime)
}
}
}

CALLBACKS -> { // Using callbacks
loadContributorsCallbacks(service, req) { users ->
SwingUtilities.invokeLater {
updateResults(users, startTime)
}
}
}

SUSPEND -> { // Using coroutines
launch {
val users = loadContributorsSuspend(service, req)
updateResults(users, startTime)
}.setUpCancellation()
}

CONCURRENT -> { // Performing requests concurrently
launch {
launch(Dispatchers.Default) {
val users = loadContributorsConcurrent(service, req)
updateResults(users, startTime)
withContext(Dispatchers.Main) {
updateResults(users, startTime)
}
}.setUpCancellation()
}

NOT_CANCELLABLE -> { // Performing requests in a non-cancellable way
launch {
val users = loadContributorsNotCancellable(service, req)
updateResults(users, startTime)
}.setUpCancellation()
}

PROGRESS -> { // Showing progress
launch(Dispatchers.Default) {
loadContributorsProgress(service, req) { users, completed ->
Expand All @@ -99,6 +107,7 @@ interface Contributors: CoroutineScope {
}
}.setUpCancellation()
}

CHANNELS -> { // Performing requests concurrently and showing progress
launch(Dispatchers.Default) {
loadContributorsChannels(service, req) { users, completed ->
Expand Down Expand Up @@ -178,8 +187,7 @@ interface Contributors: CoroutineScope {
val params = getParams()
if (params.username.isEmpty() && params.password.isEmpty()) {
removeStoredParams()
}
else {
} else {
saveParams(params)
}
}
Expand Down
13 changes: 6 additions & 7 deletions src/contributors/GitHubService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,24 @@ import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.OkHttpClient
import retrofit2.Call
import retrofit2.Response
import retrofit2.Retrofit
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory
import retrofit2.http.GET
import retrofit2.http.Path
import java.util.Base64
import java.util.*

interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
fun getOrgReposCall(
suspend fun getOrgReposCall(
@Path("org") org: String
): Call<List<Repo>>
): Response<List<Repo>>

@GET("repos/{owner}/{repo}/contributors?per_page=100")
fun getRepoContributorsCall(
suspend fun getRepoContributorsCall(
@Path("owner") owner: String,
@Path("repo") repo: String
): Call<List<User>>
): Response<List<User>>
}

@Serializable
Expand All @@ -36,7 +35,7 @@ data class Repo(
@Serializable
data class User(
val login: String,
val contributions: Int
var contributions: Int
)

@Serializable
Expand Down
9 changes: 7 additions & 2 deletions src/tasks/Aggregation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,10 @@ TODO: Write aggregation code.
The corresponding test can be found in test/tasks/AggregationKtTest.kt.
You can use 'Navigate | Test' menu action (note the shortcut) to navigate to the test.
*/
fun List<User>.aggregate(): List<User> =
this
fun List<User>.aggregate(): List<User> {
return this
.groupBy { user -> user.login }
.map { (_, users) ->
users.first().copy(contributions = users.sumOf { user -> user.contributions })
}.sortedByDescending { user -> user.contributions }
}
35 changes: 19 additions & 16 deletions src/tasks/Request1Blocking.kt
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
package tasks

import contributors.*
import contributors.GitHubService
import contributors.RequestData
import contributors.User
import retrofit2.Response

fun loadContributorsBlocking(service: GitHubService, req: RequestData) : List<User> {
val repos = service
.getOrgReposCall(req.org)
.execute() // Executes request and blocks the current thread
.also { logRepos(req, it) }
.body() ?: emptyList()

return repos.flatMap { repo ->
service
.getRepoContributorsCall(req.org, repo.name)
.execute() // Executes request and blocks the current thread
.also { logUsers(repo, it) }
.bodyList()
}.aggregate()
fun loadContributorsBlocking(service: GitHubService, req: RequestData): List<User> {
// val repos = service
// .getOrgReposCall(req.org)
// .execute() // Executes request and blocks the current thread
// .also { logRepos(req, it) }
// .bodyList()
//
// return repos.flatMap { repo ->
// service
// .getRepoContributorsCall(req.org, repo.name)
// .execute() // Executes request and blocks the current thread
// .also { logUsers(repo, it) }
// .bodyList()
// }.aggregate()
return emptyList()
}

fun <T> Response<List<T>>.bodyList(): List<T> {
return body() ?: emptyList()
}
}
4 changes: 2 additions & 2 deletions src/tasks/Request2Background.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ import kotlin.concurrent.thread

fun loadContributorsBackground(service: GitHubService, req: RequestData, updateResults: (List<User>) -> Unit) {
thread {
loadContributorsBlocking(service, req)
updateResults(loadContributorsBlocking(service, req))
}
}
}
37 changes: 20 additions & 17 deletions src/tasks/Request3Callbacks.kt
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
package tasks

import contributors.*
import contributors.GitHubService
import contributors.RequestData
import contributors.User
import contributors.log
import retrofit2.Call
import retrofit2.Callback
import retrofit2.Response
import java.util.*
import java.util.concurrent.atomic.AtomicInteger

fun loadContributorsCallbacks(service: GitHubService, req: RequestData, updateResults: (List<User>) -> Unit) {
service.getOrgReposCall(req.org).onResponse { responseRepos ->
logRepos(req, responseRepos)
val repos = responseRepos.bodyList()
val allUsers = mutableListOf<User>()
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->
logUsers(repo, responseUsers)
val users = responseUsers.bodyList()
allUsers += users
}
}
// TODO: Why this code doesn't work? How to fix that?
updateResults(allUsers.aggregate())
}
// service.getOrgReposCall(req.org).onResponse { responseRepos ->
// logRepos(req, responseRepos)
// val repos = responseRepos.bodyList()
// val allUsers = Collections.synchronizedList(mutableListOf<User>())
// val countDownLatch = CountDownLatch(repos.size)
// for (repo in repos) {
// service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->
// logUsers(repo, responseUsers)
// val users = responseUsers.bodyList()
// allUsers += users
// countDownLatch.countDown()
// }
// }
// countDownLatch.await()
// updateResults(allUsers.aggregate())
// }
}

inline fun <T> Call<T>.onResponse(crossinline callback: (Response<T>) -> Unit) {
Expand Down
13 changes: 11 additions & 2 deletions src/tasks/Request4Suspend.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,14 @@ package tasks
import contributors.*

suspend fun loadContributorsSuspend(service: GitHubService, req: RequestData): List<User> {
TODO()
}
val repos = service
.getOrgReposCall(req.org)
.also { logRepos(req, it) }
.bodyList()

return repos.flatMap { repo ->
service.getRepoContributorsCall(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}.aggregate()
}
23 changes: 20 additions & 3 deletions src/tasks/Request5Concurrent.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
package tasks

import contributors.*
import kotlinx.coroutines.*
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope

suspend fun loadContributorsConcurrent(service: GitHubService, req: RequestData): List<User> = coroutineScope {
TODO()
}
val repos = service
.getOrgReposCall(req.org)
.also { logRepos(req, it) }
.bodyList()

val deferredListOfUsers = repos.map { repo ->
async {
log("starting loading for ${repo.name}")
service.getRepoContributorsCall(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}
}

val list = deferredListOfUsers.awaitAll()
list.flatten().aggregate()
}