From c724a9e02284401c4c4ab3bc7c368c5c80500652 Mon Sep 17 00:00:00 2001 From: Jays2Kings Date: Sat, 10 Apr 2021 19:36:46 -0400 Subject: [PATCH] Fixing crashes in migration + rx bridge from upstream Co-Authored-By: arkon <4098258+arkon@users.noreply.github.com> --- .../data/library/LibraryUpdateService.kt | 6 +- .../eu/kanade/tachiyomi/source/LocalSource.kt | 10 ++- .../java/eu/kanade/tachiyomi/source/Source.kt | 20 ++--- .../manga/process/MigrationListController.kt | 2 +- .../util/system/RxCoroutineBridge.kt | 85 +++++++++++++++++++ 5 files changed, 104 insertions(+), 19 deletions(-) create mode 100644 app/src/main/java/eu/kanade/tachiyomi/util/system/RxCoroutineBridge.kt diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/library/LibraryUpdateService.kt b/app/src/main/java/eu/kanade/tachiyomi/data/library/LibraryUpdateService.kt index 54729c8a03..5b05933d91 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/library/LibraryUpdateService.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/library/LibraryUpdateService.kt @@ -16,6 +16,7 @@ import eu.kanade.tachiyomi.data.database.models.Category import eu.kanade.tachiyomi.data.database.models.Chapter import eu.kanade.tachiyomi.data.database.models.LibraryManga import eu.kanade.tachiyomi.data.database.models.Manga +import eu.kanade.tachiyomi.data.database.models.toMangaInfo import eu.kanade.tachiyomi.data.download.DownloadManager import eu.kanade.tachiyomi.data.download.DownloadService import eu.kanade.tachiyomi.data.image.coil.MangaFetcher @@ -28,6 +29,7 @@ import eu.kanade.tachiyomi.data.track.TrackManager import eu.kanade.tachiyomi.source.SourceManager import eu.kanade.tachiyomi.source.model.SManga import eu.kanade.tachiyomi.source.model.toMangaInfo +import eu.kanade.tachiyomi.source.model.toSChapter import eu.kanade.tachiyomi.source.model.toSManga import eu.kanade.tachiyomi.source.online.HttpSource import eu.kanade.tachiyomi.util.chapter.syncChaptersWithSource @@ -396,8 +398,8 @@ class LibraryUpdateService( notifier.showProgressNotification(manga, progress, mangaToUpdate.size) val source = sourceManager.get(manga.source) as? HttpSource ?: return false val fetchedChapters = withContext(Dispatchers.IO) { - source.fetchChapterList(manga).toBlocking().single() - } ?: emptyList() + source.getChapterList(manga.toMangaInfo()).map { it.toSChapter() } + } if (fetchedChapters.isNotEmpty()) { val newChapters = syncChaptersWithSource(db, fetchedChapters, manga, source) if (newChapters.first.isNotEmpty()) { diff --git a/app/src/main/java/eu/kanade/tachiyomi/source/LocalSource.kt b/app/src/main/java/eu/kanade/tachiyomi/source/LocalSource.kt index 5b68d14d4d..2b2bc29890 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/source/LocalSource.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/source/LocalSource.kt @@ -5,12 +5,15 @@ import com.google.gson.Gson import com.google.gson.GsonBuilder import com.google.gson.JsonObject import eu.kanade.tachiyomi.R +import eu.kanade.tachiyomi.data.database.models.toMangaInfo import eu.kanade.tachiyomi.source.model.Filter import eu.kanade.tachiyomi.source.model.FilterList import eu.kanade.tachiyomi.source.model.MangasPage import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.source.model.SChapter import eu.kanade.tachiyomi.source.model.SManga +import eu.kanade.tachiyomi.source.model.toMangaInfo +import eu.kanade.tachiyomi.source.model.toSChapter import eu.kanade.tachiyomi.util.chapter.ChapterRecognition import eu.kanade.tachiyomi.util.lang.compareToCaseInsensitiveNaturalOrder import eu.kanade.tachiyomi.util.storage.DiskUtil @@ -18,6 +21,7 @@ import eu.kanade.tachiyomi.util.storage.EpubFile import eu.kanade.tachiyomi.util.system.ImageUtil import junrar.Archive import junrar.rarfile.FileHeader +import kotlinx.coroutines.runBlocking import rx.Observable import timber.log.Timber import java.io.File @@ -124,7 +128,7 @@ class LocalSource(private val context: Context) : CatalogueSource { // Copy the cover from the first chapter found. if (thumbnail_url == null) { - val chapters = fetchChapterList(this).toBlocking().first() + val chapters = runBlocking { getChapterList(toMangaInfo()).map { it.toSChapter() } } if (chapters.isNotEmpty()) { try { val dest = updateCover(chapters.last(), this) @@ -172,7 +176,7 @@ class LocalSource(private val context: Context) : CatalogueSource { // Copy the cover from the first chapter found. if (manga.thumbnail_url == null) { - val chapters = fetchChapterList(manga).toBlocking().first() + val chapters = runBlocking { getChapterList(manga.toMangaInfo()).map { it.toSChapter() } } if (chapters.isNotEmpty()) { try { val dest = updateCover(chapters.last(), manga) @@ -186,7 +190,7 @@ class LocalSource(private val context: Context) : CatalogueSource { } fun updateMangaInfo(manga: SManga) { - val directory = getBaseDirectories(context).mapNotNull { File(it, manga.url) }.find { + val directory = getBaseDirectories(context).map { File(it, manga.url) }.find { it.exists() } ?: return val gson = GsonBuilder().setPrettyPrinting().create() diff --git a/app/src/main/java/eu/kanade/tachiyomi/source/Source.kt b/app/src/main/java/eu/kanade/tachiyomi/source/Source.kt index 6772622897..d13fbbf242 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/source/Source.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/source/Source.kt @@ -10,6 +10,7 @@ import eu.kanade.tachiyomi.source.model.toMangaInfo import eu.kanade.tachiyomi.source.model.toPageUrl import eu.kanade.tachiyomi.source.model.toSChapter import eu.kanade.tachiyomi.source.model.toSManga +import eu.kanade.tachiyomi.util.system.awaitSingle import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import rx.Observable @@ -65,12 +66,10 @@ interface Source : tachiyomi.source.Source { */ @Suppress("DEPRECATION") override suspend fun getMangaDetails(manga: MangaInfo): MangaInfo { - return withContext(Dispatchers.IO) { - val sManga = manga.toSManga() - val networkManga = fetchMangaDetails(sManga).toBlocking().single() - sManga.copyFrom(networkManga) - sManga.toMangaInfo() - } + val sManga = manga.toSManga() + val networkManga = fetchMangaDetails(sManga).awaitSingle() + sManga.copyFrom(networkManga) + return sManga.toMangaInfo() } /** @@ -78,9 +77,7 @@ interface Source : tachiyomi.source.Source { */ @Suppress("DEPRECATION") override suspend fun getChapterList(manga: MangaInfo): List { - return withContext(Dispatchers.IO) { - fetchChapterList(manga.toSManga()).toBlocking().single().map { it.toChapterInfo() } - } + return fetchChapterList(manga.toSManga()).awaitSingle().map { it.toChapterInfo() } } /** @@ -88,10 +85,7 @@ interface Source : tachiyomi.source.Source { */ @Suppress("DEPRECATION") override suspend fun getPageList(chapter: ChapterInfo): List { - return withContext(Dispatchers.IO) { - fetchPageList(chapter.toSChapter()).toBlocking().single() - .map { it.toPageUrl() } - } + return fetchPageList(chapter.toSChapter()).awaitSingle().map { it.toPageUrl() } } } diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/migration/manga/process/MigrationListController.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/migration/manga/process/MigrationListController.kt index df881eb9b6..899db83744 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/migration/manga/process/MigrationListController.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/migration/manga/process/MigrationListController.kt @@ -356,8 +356,8 @@ class MigrationListController(bundle: Bundle? = null) : launchUI { val result = CoroutineScope(migratingManga.manga.migrationJob).async { val localManga = smartSearchEngine.networkToLocalManga(manga, source.id) - val chapters = source.getChapterList(localManga.toMangaInfo()).map { it.toSChapter() } try { + val chapters = source.getChapterList(localManga.toMangaInfo()).map { it.toSChapter() } syncChaptersWithSource(db, chapters, localManga, source) } catch (e: Exception) { return@async null diff --git a/app/src/main/java/eu/kanade/tachiyomi/util/system/RxCoroutineBridge.kt b/app/src/main/java/eu/kanade/tachiyomi/util/system/RxCoroutineBridge.kt new file mode 100644 index 0000000000..eff6c82835 --- /dev/null +++ b/app/src/main/java/eu/kanade/tachiyomi/util/system/RxCoroutineBridge.kt @@ -0,0 +1,85 @@ +package eu.kanade.tachiyomi.util.system + +import kotlinx.coroutines.CancellableContinuation +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import rx.Emitter +import rx.Observable +import rx.Subscriber +import rx.Subscription +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +/* + * Util functions for bridging RxJava and coroutines. Taken from TachiyomiEH/SY. + */ + +suspend fun Observable.awaitSingle(): T = single().awaitOne() + +private suspend fun Observable.awaitOne(): T = suspendCancellableCoroutine { cont -> + cont.unsubscribeOnCancellation( + subscribe( + object : Subscriber() { + override fun onStart() { + request(1) + } + + override fun onNext(t: T) { + cont.resume(t) + } + + override fun onCompleted() { + if (cont.isActive) cont.resumeWithException( + IllegalStateException( + "Should have invoked onNext" + ) + ) + } + + override fun onError(e: Throwable) { + /* + * Rx1 observable throws NoSuchElementException if cancellation happened before + * element emission. To mitigate this we try to atomically resume continuation with exception: + * if resume failed, then we know that continuation successfully cancelled itself + */ + val token = cont.tryResumeWithException(e) + if (token != null) { + cont.completeResume(token) + } + } + } + ) + ) +} + +internal fun CancellableContinuation.unsubscribeOnCancellation(sub: Subscription) = + invokeOnCancellation { sub.unsubscribe() } + +fun runAsObservable( + block: suspend () -> T, + backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE +): Observable { + return Observable.create( + { emitter -> + val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) { + try { + emitter.onNext(block()) + emitter.onCompleted() + } catch (e: Throwable) { + // Ignore `CancellationException` as error, since it indicates "normal cancellation" + if (e !is CancellationException) { + emitter.onError(e) + } else { + emitter.onCompleted() + } + } + } + emitter.setCancellation { job.cancel() } + }, + backpressureMode + ) +}