Do library updates from up to 5 sources concurrently (but in coroutines and queue system)

0 help from arkon on this one
pull/3372/head
Jay 4 years ago
parent 7a8f9373ba
commit f3d4e87542

@ -43,7 +43,12 @@ import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext
import rx.Observable
import rx.Subscription
@ -100,11 +105,28 @@ class LibraryUpdateService(
private val mangaToUpdate = mutableListOf<LibraryManga>()
private val mangaToUpdateMap = mutableMapOf<Long, List<LibraryManga>>()
private val categoryIds = mutableSetOf<Int>()
// List containing new updates
private val newUpdates = mutableMapOf<LibraryManga, Array<Chapter>>()
val count = AtomicInteger(0)
val jobCount = AtomicInteger(0)
// List containing categories that get included in downloads.
private val categoriesToDownload =
preferences.downloadNewCategories().getOrDefault().map(String::toInt)
// Boolean to determine if user wants to automatically download new chapters.
private val downloadNew: Boolean = preferences.downloadNew().getOrDefault()
// Boolean to determine if DownloadManager has downloads
private var hasDownloads = false
private val requestSemaphore = Semaphore(5)
// For updates delete removed chapters if not preference is set as well
private val deleteRemoved by lazy {
preferences.deleteRemovedChapters().get() != 1
@ -116,15 +138,10 @@ class LibraryUpdateService(
private val progressNotification by lazy {
NotificationCompat.Builder(this, Notifications.CHANNEL_LIBRARY)
.setContentTitle(getString(R.string.app_name))
.setSmallIcon(R.drawable.ic_refresh_white_24dp_img)
.setLargeIcon(notificationBitmap)
.setOngoing(true)
.setOnlyAlertOnce(true)
.setColor(ContextCompat.getColor(this, R.color.colorAccent))
.addAction(
R.drawable.ic_clear_grey_24dp_img,
getString(android.R.string.cancel),
cancelIntent
.setSmallIcon(R.drawable.ic_refresh_white_24dp_img).setLargeIcon(notificationBitmap)
.setOngoing(true).setOnlyAlertOnce(true)
.setColor(ContextCompat.getColor(this, R.color.colorAccent)).addAction(
R.drawable.ic_clear_grey_24dp_img, getString(android.R.string.cancel), cancelIntent
)
}
@ -132,6 +149,7 @@ class LibraryUpdateService(
* Defines what should be updated within a service execution.
*/
enum class Target {
CHAPTERS, // Manga chapters
DETAILS, // Manga metadata
TRACKING // Tracking metadata
@ -206,14 +224,38 @@ class LibraryUpdateService(
}
fun removeListener(listener: LibraryServiceListener) {
if (this.listener == listener)
this.listener = null
if (this.listener == listener) this.listener = null
}
}
private fun addManga(mangaToAdd: List<LibraryManga>) {
for (manga in mangaToAdd) {
if (mangaToUpdate.none { it.id == manga.id }) mangaToUpdate.add(manga)
val distinctManga = mangaToAdd.filter { it !in mangaToUpdate }
mangaToUpdate.addAll(distinctManga)
distinctManga.groupBy { it.source }.forEach {
// if added queue items is a new source not in the async list or an async list has
// finished running
if (mangaToUpdateMap[it.key].isNullOrEmpty()) {
mangaToUpdateMap[it.key] = it.value
jobCount.andIncrement
val handler = CoroutineExceptionHandler { _, exception ->
Timber.e(exception)
}
GlobalScope.launch(handler) {
val hasDLs = requestSemaphore.withPermit {
updateMangaInSource(
it.key, downloadNew, categoriesToDownload
)
}
hasDownloads = hasDownloads || hasDLs
jobCount.andDecrement
if (job?.isCancelled != false) {
finishUpdates()
}
}
} else {
val list = mangaToUpdateMap[it.key] ?: emptyList()
mangaToUpdateMap[it.key] = (list + it.value)
}
}
}
@ -355,50 +397,73 @@ class LibraryUpdateService(
}
private suspend fun updateChaptersJob(mangaToAdd: List<LibraryManga>) {
// List containing categories that get included in downloads.
val categoriesToDownload =
preferences.downloadNewCategories().getOrDefault().map(String::toInt)
// Boolean to determine if user wants to automatically download new chapters.
val downloadNew = preferences.downloadNew().getOrDefault()
// Boolean to determine if DownloadManager has downloads
var hasDownloads = false
// Initialize the variables holding the progress of the updates.
var count = 0
mangaToUpdate.addAll(mangaToAdd)
while (count < mangaToUpdate.size) {
val shouldDownload = (downloadNew && (categoriesToDownload.isEmpty() ||
mangaToUpdate[count].category in categoriesToDownload ||
db.getCategoriesForManga(mangaToUpdate[count]).executeOnIO()
.any { (it.id ?: -1) in categoriesToDownload }))
if (updateMangaChapters(mangaToUpdate[count], count, shouldDownload)) {
hasDownloads = true
mangaToUpdateMap.putAll(mangaToAdd.groupBy { it.source })
coroutineScope {
jobCount.andIncrement
val list = mangaToUpdateMap.keys.map { source ->
async {
requestSemaphore.withPermit {
updateMangaInSource(source, downloadNew, categoriesToDownload)
}
}
}
count++
val results = list.awaitAll()
hasDownloads = hasDownloads || results.any { it }
jobCount.andDecrement
finishUpdates()
}
}
private fun finishUpdates() {
if (jobCount.get() != 0) return
if (newUpdates.isNotEmpty()) {
showResultNotification(newUpdates)
if (preferences.refreshCoversToo().getOrDefault() && job?.isCancelled == false) {
updateDetails(newUpdates.map { it.key }).observeOn(Schedulers.io())
.doOnCompleted {
updateDetails(newUpdates.map { it.key }).observeOn(Schedulers.io()).doOnCompleted {
cancelProgressNotification()
if (downloadNew && hasDownloads) {
DownloadService.start(this)
}
}
.subscribeOn(Schedulers.io()).subscribe {}
}.subscribeOn(Schedulers.io()).subscribe {}
} else if (downloadNew && hasDownloads) {
DownloadService.start(this)
}
}
cancelProgressNotification()
}
private suspend fun updateMangaInSource(
source: Long,
downloadNew: Boolean,
categoriesToDownload: List<Int>
): Boolean {
if (mangaToUpdateMap[source] == null) return false
var count = 0
var hasDownloads = false
while (count < mangaToUpdateMap[source]!!.size) {
val shouldDownload =
(downloadNew && (categoriesToDownload.isEmpty() || mangaToUpdateMap[source]!![count].category in categoriesToDownload || db.getCategoriesForManga(
mangaToUpdateMap[source]!![count]
).executeOnIO().any { (it.id ?: -1) in categoriesToDownload }))
if (updateMangaChapters(
mangaToUpdateMap[source]!![count], this.count.andIncrement, shouldDownload
)
) {
hasDownloads = true
}
count++
}
mangaToUpdateMap[source] = emptyList()
return hasDownloads
}
private suspend fun updateMangaChapters(
manga: LibraryManga,
progess: Int,
progress: Int,
shouldDownload: Boolean
):
Boolean {
@ -407,7 +472,7 @@ class LibraryUpdateService(
if (job?.isCancelled == true) {
throw java.lang.Exception("Job was cancelled")
}
showProgressNotification(manga, progess, mangaToUpdate.size)
showProgressNotification(manga, progress, mangaToUpdate.size)
val source = sourceManager.get(manga.source) as? HttpSource ?: return false
val fetchedChapters = withContext(Dispatchers.IO) {
source.fetchChapterList(manga).toBlocking().single()
@ -441,7 +506,7 @@ class LibraryUpdateService(
}
}
fun downloadChapters(manga: Manga, chapters: List<Chapter>) {
private fun downloadChapters(manga: Manga, chapters: List<Chapter>) {
// we need to get the chapters from the db so we have chapter ids
val mangaChapters = db.getChapters(manga).executeAsBlocking()
val dbChapters = chapters.map {

Loading…
Cancel
Save