From 16f7626c1151e2cced8ffa817f524e24140fbf32 Mon Sep 17 00:00:00 2001
From: Jonny007-MKD
Date: Fri, 3 Apr 2020 22:19:39 +0200
Subject: [PATCH] Reimplement sanerenamix in python3

---
 editSeriesCache.py |  28 +++
 saneRenamix.py     | 473 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 501 insertions(+)
 create mode 100644 editSeriesCache.py
 create mode 100644 saneRenamix.py

diff --git a/editSeriesCache.py b/editSeriesCache.py
new file mode 100644
index 0000000..70a2899
--- /dev/null
+++ b/editSeriesCache.py
@@ -0,0 +1,28 @@
+import os
+import pickle
+import sys
+
+workingDir = os.path.dirname(os.path.realpath(__file__))
+path = os.path.join(workingDir, "series.cache")
+
+def loadCache():
+    if not os.path.isfile(path): return None
+    try:
+        with open(path, 'rb') as f:
+            cache = pickle.load(f)
+            return cache
+    except Exception as e:
+        return None
+
+def writeCache(cache):
+    with open(path, 'wb') as f:
+        pickle.dump(cache, f)
+
+cache = loadCache()
+print(cache)
+sys.exit()
+if cache:
+    cache.pop("Ein Fall für TKKG (2014)", None)
+    cache.pop("Ein Fall für TKKG", None)
+    cache.pop("Ein Fall fuer TKKG", None)
+    writeCache(cache)
diff --git a/saneRenamix.py b/saneRenamix.py
new file mode 100644
index 0000000..42e255f
--- /dev/null
+++ b/saneRenamix.py
@@ -0,0 +1,473 @@
+#!/usr/bin/env python3
+
+
+class Config:
+    ApiKey = "2C9BB45EFB08AD3B"
+    ProductName = "SaneRenamix for OTR v1.0"
+    DefaultLanguage = "de"
+
+importErrors = []
+import argparse
+import logging
+import re
+import pickle
+import os
+import csv
+import sys
+import requests
+from datetime import datetime
+try:
+    import urllib.request
+except ModuleNotFoundError:
+    importErrors.append("urllib")
+try:
+    import tvdbsimple as tvdb
+    tvdb.keys.API_KEY = "a76ca23091901c5bf5c32de77c29a52a"
+except ModuleNotFoundError:
+    importErrors.append("tvdbsimple")
+if importErrors:
+    raise ModuleNotFoundError('please pip install these modules: ' + ', '.join(importErrors))
+
+
+workingDir = os.path.dirname(os.path.realpath(__file__))
+
+
+class ExitCode:
+    GeneralError = 1  # invalid argument option, missing parameter
+    WrongArguments = 2
+    Aborted = 3  # Ctrl+C
+    SeriesNotFoundInTvDB = 10
+    SeriesNotFoundInEPG = 11
+    SeveralPossibleSeriesFound = 12
+    NoInfoForThisEpisode = 20
+    NoEpisodeTitleFoundInEPG = 21
+    DownloadingEPGFailed = 40
+    DownloadingListFromTvDBFailed = 41
+
+def parseArgs():
+    parser = argparse.ArgumentParser(description = "Create a sane name for OTR media files\nby Leroy Foerster & Jonny007-MKD")
+    parser.add_argument("--file", "-f", type=str, required=True, help="Name of the file that shall be renamed")
+    parser.add_argument("--nocache", "-c", action="store_true", help="Disables the usage of the local cache")
+    parser.add_argument("--silent", "-s", action="store_true", help="Output only the new filename")
+    parser.add_argument("--language", "-l", type=str, help="Language code for TvDB (de, en, fr, ...)", default=Config.DefaultLanguage)
+    return parser.parse_args()
+
+class EpisodeInfo:
+    def __init__(self):
+        self.season = None  # int
+        self.episode = None  # int
+        self.seriesName = None  # str
+        self.episodeTitle = None  # str
+        self.maybeEpisodeTitle = None  # str
+        self.datetime = None  # datetime
+        self.sender = None  # str
+        self.description = None  # str
+        self.fileSuffix = None  # str
+
+    def __str__(self):
+        result = ""
+        if self.seriesName: result += self.seriesName + " "
+        if self.season: result += f"S{self.season:02d}"
+        if self.episode: result += f"E{self.episode:02d}"
+        if result: result += " "
+        if self.episodeTitle: result += self.episodeTitle
+        if result: return result
+        return f"{self.datetime} {self.sender} {self.description}"
+
+
+def analyzeFilename(filename: str):
+    logging.debug(f"analyzeFilename({filename})")
+
+    result = EpisodeInfo()
+
+    def makeDatetime(date: str, time: str):
+        yy, mm, dd = date.split('.')
+        HH, MM = time.split('-')
+        return datetime(int(yy)+2000, int(mm), int(dd), int(HH), int(MM))
+
+    # S00_E00_Series
+    found = re.search(r"^S(\d\d)_E(\d\d)_", filename)
+    if found:
+        result.season = int(found.group(1))
+        result.episode = int(found.group(2))
+        filename = filename[len("S00_E00_"):]
+        logging.debug(f" found info at beginning: S{result.season:02d}E{result.episode:02d}. filename = {filename}")
+
+    found = re.search(r"^(.*?)(_S(\d\d)E(\d\d))?_(\d\d.\d\d.\d\d)_(\d\d-\d\d)_([^_]+)_(\d+)_.+?\.(.+)", filename)
+    if not found:
+        raise Exception("Regex did not match filename")
+    result.seriesName = found.group(1).replace("_", " ")
+    result.datetime = makeDatetime(found.group(5), found.group(6))
+    result.sender = found.group(7)
+    result.fileSuffix = found.group(9)
+    if found.group(2):
+        result.season = int(found.group(3))
+        result.episode = int(found.group(4))
+    logging.info(f" found info: {result.seriesName}, {result.datetime}, {result.sender}, S{result.season}E{result.episode}, {result.fileSuffix}")
+    return result
+
+def convertTitle(title: str, lang: str):
+    title = title.replace(" s ", "'s ")
+    if title.endswith(" s"): title = title[0:-2] + "'s"
+
+    if lang == "de":
+        title = title.replace("Ae", "Ä").replace("Oe", "Ö").replace("Ue", "Ü")
+        title = title.replace("ae", "ä").replace("oe", "ö").replace("ue", "ü")
+    return title
+
+def getSeriesId(info: EpisodeInfo, args: dict):
+    logging.debug(f"getSeriesId()")
+
+    def loadCache():
+        if args.nocache: return None
+        logging.debug(f" loadCache()")
+        path = os.path.join(workingDir, "series.cache")
+        if not os.path.isfile(path): return None
+        try:
+            with open(path, 'rb') as f:
+                cache = pickle.load(f)
+                logging.debug(f" {len(cache)} entries loaded")
+                return cache
+        except Exception as e:
+            logging.debug(f" pickle load failed: {e}")
+            return None
+
+    def fromCache(cache):
+        logging.debug(f" fromCache()")
+        if not cache: return (None, None)
+
+        words = info.seriesName.split(' ')
+        for i in range(len(words), 0, -1):
+            title2 = " ".join(words[0:i])
+            titles = set([title2, convertTitle(title2, args.language)])
+            logging.debug(" trying '" + "' and '".join(titles) + "'")
+
+            for title3 in titles:
+                if title3 in cache:
+                    (id, niceName) = cache[title3]
+                    logging.debug(f" found {id} as {niceName}")
+                    return (id, niceName)
+        logging.debug(f" found nothing")
+        return (None, None)
+
+    def fromTvdb():
+        logging.debug(f" fromTvdb()")
+
+        words = info.seriesName.split(' ')
+        regex = re.compile("[^a-zA-Z0-9 ]")
+        allResults = []
+        for i in range(len(words), 0, -1):
+            title2 = " ".join(words[0:i])
+            titles = set([title2, convertTitle(title2, args.language)])
+            logging.debug(" trying '" + "' and '".join(titles) + "'")
+
+            for title3 in titles:
+                try:
+                    responses = None
+                    responses = tvdb.Search().series(title3, language=args.language)
+                except requests.exceptions.HTTPError as e:
+                    logging.debug(f" Exception {type(e)}: {e}")
+                if not responses: continue
+                allResults.extend(responses)
+                for r in responses:
+                    if r["seriesName"] == title3:
+                        logging.debug(f' found {r["id"]} as {r["seriesName"]}')
+                        return r["id"], r["seriesName"]
+                title3 = regex.sub("", title3).lower()
+                for r in responses:
+                    if regex.sub("", r["seriesName"]).lower() == title3:
+                        logging.debug(f' found {r["id"]} as {r["seriesName"]}')
+                        return r["id"], r["seriesName"]
+
+        allIds = set([ r["id"] for r in allResults ])
+        if len(allIds) > 1:
+            uniqueResults = {}
+            for r in allResults:
+                if r["id"] in uniqueResults: continue
+                uniqueResults[r["id"]] = r
+            logging.debug(f' found several series: {[ r["seriesName"] for r in uniqueResults.values()]}')
+            sys.exit(ExitCode.SeveralPossibleSeriesFound)
+        if len(allIds) == 1:
+            r = allResults[0]
+            logging.debug(f' found {r["id"]} as {r["seriesName"]}')
+            return r["id"], r["seriesName"]
+
+        logging.debug(f" nothing found")
+        sys.exit(ExitCode.SeriesNotFoundInTvDB)
+
+    def writeCache(id: int, names: list, cache: dict):
+        if args.nocache: return
+        logging.debug(f" writeCache({id}, {names})")
+        if not cache:
+            cache = {}
+        for name in names:
+            cache[name] = (id, niceName)
+
+        path = os.path.join(workingDir, "series.cache")
+        with open(path, 'wb') as f:
+            pickle.dump(cache, f)
+
+    def checkWhetherSeriesNameContainsEpisodeName(niceName: str):
+        regex = re.compile("[^a-zA-Z0-9 ]")
+        niceNames = set([niceName.lower(), regex.sub("", niceName.lower())])
+        seriesNames = []
+        for n in [info.seriesName, convertTitle(info.seriesName, args.language)]:
+            seriesNames.append(n.lower())
+            seriesNames.append(regex.sub("", n.lower()))
+        seriesNames = set(seriesNames)
+
+        for niceName2 in niceNames:
+            for seriesName in seriesNames:
+                if seriesName.startswith(niceName2):
+                    numWordsNiceName = len(niceName2.split(" "))
+                    seriesNameWords = info.seriesName.split(" ")
+                    info.maybeEpisodeTitle = " ".join(seriesNameWords[numWordsNiceName:])
+                    return " ".join(seriesNameWords[:numWordsNiceName])
+
+
+    # Load good series name. First from cache, then from TvDB
+    cache = loadCache()
+    (id, niceName) = fromCache(cache)
+    if id:
+        checkWhetherSeriesNameContainsEpisodeName(niceName)
+    else:
+        (id, niceName) = fromTvdb()
+        if id:
+            names = [ niceName ]
+            thirdName = checkWhetherSeriesNameContainsEpisodeName(niceName)
+            if thirdName: names.append(thirdName)
+            else: names.append(info.seriesName)
+            writeCache(id, names, cache)
+
+    if not id: return None
+
+    if id:
+        info.seriesName = niceName
+
+    return id
+
+def getEpgData(info: EpisodeInfo):
+    logging.debug("getEpgData()")
+    filename = f"epg-{info.datetime.strftime('%y.%m.%d')}.csv"
+    filepath = os.path.join(workingDir, filename)
+
+    def downloadEpg():
+        url = f"https://www.onlinetvrecorder.com/epg/csv/epg_{info.datetime.strftime('%Y_%m_%d')}.csv"
+        logging.debug(f" downloadEpg(): {url}")
+        request = urllib.request.urlopen(url)
+        if request.getcode() == 200:
+            data = request.read().decode('latin-1')
+            with open(filepath, 'w') as f:
+                f.write(data)
+            return data.split('\n')
+        else:
+            raise Exception(f"Downloading EPG data failed: {request.getcode()}")
+
+    def loadEpgFromFile():
+        if os.path.isfile(filepath):
+            logging.debug(f" loadEpgFromFile()")
+            with open(filepath) as f:
+                return f.readlines()
+        return None
+
+    def findEpgEntry(data: list):
+        logging.debug(f" findEpgEntry()")
+        regex = re.compile("[^a-zA-Z0-9]")
+        beginn = info.datetime.strftime('%d.%m.%Y %H:%M:%S')
+        sender = regex.sub("", info.sender).lower()
+
+        reader = csv.DictReader(data, delimiter=';')
+        for entry in reader:
+            #logging.debug(f' {entry["beginn"]} == {beginn} && {entry["sender"]} == {sender}?')
+            if not entry["beginn"] == beginn: continue
+            if not regex.sub("", entry["sender"]).lower() == sender: continue
+            logging.debug(f' found entry: {entry["text"]}')
+            return entry
+        return None
+
+    data = loadEpgFromFile()
+    if not data: data = downloadEpg()
+    entry = findEpgEntry(data)
+    if entry:
+        def removePrefix(complete: str, prefix: str):
+            return complete[len(prefix):].strip(" \t-_,.") if complete.startswith(prefix) else complete
+        entry["text"] = entry["text"].strip(" \t-_,.")
+        entry["text"] = removePrefix(entry["text"], entry["titel"])
+        entry["text"] = removePrefix(entry["text"], info.seriesName)
+
+        info.title = entry["titel"]
+        info.description = entry["text"]
+        logging.debug(f" set: {info.description}")
+
+class Episodes:
+    def __init__(self, seriesID: int, args: dict):
+        self.seriesID = seriesID
+        self.args = args
+        self.path = os.path.join(workingDir, f"episode-{seriesID}.cache")
+        self.fromCache = None
+
+    def _loadCache(self):
+        if self.args.nocache: return None
+        logging.debug(f" loadCache()")
+        if not os.path.isfile(self.path): return None
+        try:
+            with open(self.path, 'rb') as f:
+                cache = pickle.load(f)
+                return cache
+        except Exception as e:
+            logging.debug(f" pickle load failed: {e}")
+            return None
+
+    def _fromTvdb(self):
+        logging.debug(f" fromTvdb()")
+
+        try:
+            episodes = tvdb.series.Series_Episodes(self.seriesID, language=self.args.language).all()
+            return episodes
+        except Exception as e:
+            logging.error(f" Exception: {e}")
+            return None
+
+    def _writeCache(self, episodes):
+        if self.args.nocache: return
+        with open(self.path, 'wb') as f:
+            pickle.dump(episodes, f)
+
+    def get(self):
+        episodes = None
+        # First call: try to load from cache
+        if not self.fromCache:
+            self.fromCache = True
+            episodes = self._loadCache()
+        if self.fromCache == True:
+            # Second call or if cache does not exist: try to load from TvDB
+            if not episodes:
+                episodes = self._fromTvdb()
+                if episodes:
+                    self.fromCache = False
+                    self._writeCache(episodes)
+        return episodes
+
+
+def getEpisodeTitleFromEpgData(info: EpisodeInfo, seriesID: int, args: dict):
+    logging.debug(f"getEpisodeTitleFromEpgData()")
+    if not info.description:
+        logging.debug(f" no description")
+        return  # Nothing we can do about it :(
+    E = Episodes(seriesID, args)
+    regex = re.compile("[^a-zA-Z0-9 ]")
+
+    def get(dct: dict, keys: list):
+        for k in keys:
+            if k in dct: return dct[k]
+        return None
+
+    def saveInfo(foundEpisode: dict):
+        info.season = get(foundEpisode, ["airedSeason", "dvdSeason"])
+        info.episode = get(foundEpisode, ["airedEpisodeNumber", "dvdEpisodeNumber"])
+        info.episodeTitle = foundEpisode["episodeName"]
+        logging.debug(f' found: S{info.season:02d}E{info.episode:02d}')
+
+    d = info.description
+    def doSearch(searchFunc):
+        for delims in [ (".",""), (".",","), (",", ""), (",",".") ]:
+            if not delims[0] in d or not delims[1] in d: continue
+            split = d.split(delims[0])
+            if delims[1]: split = split[1].split(delims[1])
+            split = split[0].strip()
+
+            words = split.split(" ")
+            for i in range(len(words)):  # cut off one word at a time from the beginning
+                for j in range(len(words), i, -1):  # cut off one word at a time from the end
+                    title = " ".join(words[i:j])
+                    logging.debug(f' trying "{title}"')
+                    found = searchFunc(title)
+                    if found:
+                        saveInfo(found)
+                        return True
+        return False
+
+    for i in range(2):  # Try once from cache and once from TvDB
+        episodes = E.get()
+        if not episodes: continue  # Nothing we can do about it :(
+        episodesByName = { e["episodeName"]: e for e in episodes }
+
+        if info.maybeEpisodeTitle and info.maybeEpisodeTitle in episodesByName:
+            saveInfo(episodesByName[info.maybeEpisodeTitle])
+            return
+
+        logging.debug(" searching for a matching episode name (exactly)")
+        def searchByName(title: str):
+            return episodesByName.get(title, None)
+        found = doSearch(searchByName)
+        if found: return
+
+        logging.debug(" searching for a matching episode name more liberally")
+        episodesByName2 = { regex.sub("", e["episodeName"]): e for e in episodes }
+        def searchByName2(title: str):
+            return episodesByName2.get(regex.sub("", title), None)
+        found = doSearch(searchByName2)
+        if found: return
+
+        logging.debug(" searching for a matching description (startswith)")
+        def searchByOverview(overview: str):
+            results = [ e for e in episodes if e["overview"] and e["overview"].startswith(overview) ]
+            if len(results) == 1: return results[0]
+            return None
+        found = doSearch(searchByOverview)
+        if found: return
+
+        logging.debug(" searching for a matching description more liberally (startswith)")
+        def searchByOverview2(overview: str):
+            results = [ e for e in episodes if e["overview"] and regex.sub("", e["overview"]).startswith(regex.sub("", overview)) ]
+            if len(results) == 1: return results[0]
+            return None
+        found = doSearch(searchByOverview2)
+        if found: return
+
+
+def getEpisodeTitleFromTvdb(info: EpisodeInfo, seriesID: int, args: dict):
+    logging.debug("getEpisodeTitleFromTvdb()")
+    episodes = Episodes(seriesID, args).get()
+    if not episodes: return  # Nothing we can do :(
+
+    def get(dct: dict, keys: list):
+        for k in keys:
+            if k in dct: return dct[k]
+        return None
+
+    for e in episodes:
+        season = get(e, ["airedSeason", "dvdSeason"])
+        episode = get(e, ["airedEpisodeNumber", "dvdEpisodeNumber"])
+        if season == info.season and episode == info.episode:
+            info.episodeTitle = e["episodeName"]
+            return
+
+
+def printResult(info: EpisodeInfo):
+    if info.seriesName and info.season and info.episode:
+        episodeTitle = info.episodeTitle.replace(' ', '.') if info.episodeTitle else ""
+        print(f"{info.seriesName.replace(' ', '.')}..S{info.season:02d}E{info.episode:02d}..{episodeTitle}.{info.fileSuffix}")
+        sys.exit(0)
+    else:
+        sys.exit(ExitCode.NoEpisodeTitleFoundInEPG)
+
+
+if __name__ == '__main__':
+    args = parseArgs()
+    logging.basicConfig(level=logging.WARNING if args.silent else logging.DEBUG, format="%(asctime)s %(message)s")
+    info = analyzeFilename(args.file)
+    id = getSeriesId(info, args)
+    if not id: sys.exit(ExitCode.SeriesNotFoundInTvDB)
+    if not info.season or not info.episode:
+        getEpgData(info)
+        getEpisodeTitleFromEpgData(info, id, args)
+    if not info.episodeTitle:
+        getEpisodeTitleFromTvdb(info, id, args)
+    printResult(info)