#!/usr/bin/env python3
"""Add ``#EXTINF`` title lines to YouTube entries of an .m3u playlist.

The playlist is scanned for YouTube video/playlist URLs that are not already
preceded by an ``#EXTINF:`` line.  Their metadata is fetched from the YouTube
Data API v3 and an ``#EXTINF:123, <channel> - <title> (<detail>)`` line is
inserted above each URL.  When run as a script, a running mpv instance is then
asked (over its IPC socket) to reload the playlist.

Provenance: commit ca48a78ee6cc098696a22603c5582912cc88ebe6 by
Luke Hubmayer-Werner, 2024-06-20, "My dodgy .m3u playlist youtube metadata
fetching script" (add_playlist_yt_titles.py).
"""
import json
import logging
import re
import socket
import urllib.parse
import urllib.request
from typing import Callable, Optional

yt_api_key = 'get your own lol'
filename = '/home/luke/mpv-playlist.m3u'
mpv_socket = '/home/luke/socket-mpv'
log_filename = '/home/luke/ytlog.txt'

# Ids are sent to the API in batches of this size.
# NOTE(review): 50 is the assumed per-request id limit -- confirm against the API docs.
YT_API_MAX_IDS = 50

SI_large_prefixes = {  # Extend if necessary
    3: 'k',
    6: 'M',
    9: 'G',
    12: 'T',
    15: 'P',
    18: 'E',
}


def shorten_number(number, max_width, engineering=False, include_letter=True, include_minus=True):
    '''
    Return a nice shortened string of a number using SI prefixes.

    Large numbers only for now (no negative exponents).  The number is treated
    as a string of digits instead of taking a numeric approach, so digits are
    truncated, not rounded (``shorten_number(1234567, 5) == '1.23M'``).

    number: the value to shorten; converted with ``str()``.
    max_width: maximum width of the result; values below 3 are treated as 3.
    engineering: use the SI prefix as the decimal symbol to save a character
        (e.g. ``1M234`` instead of ``1.23M``).
    include_letter: count the SI letter and decimal point in the max width.
    include_minus: count a leading minus sign in the max width.

    Raises KeyError when the number needs a prefix missing from
    ``SI_large_prefixes``.
    '''
    max_width = max(max_width, 3)
    number = str(number)
    neg = number.startswith('-')
    num = number[1:] if neg else number
    if neg and include_minus:
        max_width -= 1
    width = len(num)
    if width <= max_width:
        return number

    if include_letter:  # Make room for the prefix (and the '.', if used)
        max_width -= 1 if engineering else 2
        max_width = max(max_width, 1)

    unit = ((width - 1) // 3) * 3  # largest multiple of 3 below the digit count
    dec_point = width - unit       # number of digits left of the decimal symbol
    prefix = SI_large_prefixes[unit]
    if engineering:
        output = num[:dec_point] + prefix + num[dec_point:max_width]
    elif dec_point < max_width:
        output = num[:dec_point] + '.' + num[dec_point:max_width] + prefix
    else:
        output = num[:dec_point] + prefix
    if neg:
        output = '-' + output
    return output


def shorten_period(string, max_terms=2, collapse_weeks=True):
    '''
    Take an ISO 8601 period string, return something human readable.

    The time component is lowercased while the date component stays uppercase,
    so month (``M``) and minute (``m``) remain distinguishable.

    string: period such as ``PT15M33S``, ``P1DT2H3M4S`` or ``P0D``.
    max_terms: keep only this many leading (most significant) terms.
    collapse_weeks: fold weeks into the day term (``P1W2D`` -> ``9D``).

    Raises ValueError when the string does not start with ``P``.
    '''
    if not string.startswith('P'):
        raise ValueError('Given string is not an ISO 8601 period string')
    # M can be Month or Minute depending on location, so split off the time
    # component.  partition() also copes with periods that have no 'T' part.
    datestr, _, timestr = string[1:].partition('T')
    date_components = re.findall(r'(\d+[YMWD])', datestr)
    time_components = re.findall(r'(\d+[hms])', timestr.lower())

    if collapse_weeks:
        new_date = []
        days = 0
        has_days = False
        for term in date_components:
            value, unit = int(term[:-1]), term[-1]
            if unit == 'W':
                days += 7 * value
                has_days = True
            elif unit == 'D':
                days += value
                has_days = True
            else:
                new_date.append(term)
        if has_days:
            # Days are the least significant date term, so they go last.
            new_date.append('{}D'.format(days))
        date_components = new_date
    components = date_components + time_components
    return ''.join(components[:max_terms])


re_yt_video_id = re.compile(r'(?<=[\?\&]v=)([a-zA-Z0-9_-]+)')
re_yt_playlist_id = re.compile(r'(?<=[\?\&]list=)([a-zA-Z0-9_-]+)')
_re_url_prefix = re.compile(r'^(?:https?://)?(?:www\.)?', re.IGNORECASE)
_YT_SHORT_DOMAIN = 'youtu.be'
_YT_DOMAINS = ('youtube.com', 'm.youtube.com')


def get_yt_video_id(url: str):
    '''
    Return the video id of a scheme-less, ``www.``-less YouTube URL, or None.

    ``youtu.be/<id>`` carries the id in the path; ``youtube.com`` and
    ``m.youtube.com`` carry it in the ``v=`` query parameter.
    '''
    domain, _, params = url.partition('/')
    domain = domain.lower()  # host names are case-insensitive, ids are not
    if domain == _YT_SHORT_DOMAIN:
        return params.partition('?')[0] or None
    if domain in _YT_DOMAINS:
        if vid_id := re_yt_video_id.search(params):
            return vid_id[0]
    return None


def get_yt_video_and_list_id(url: str):
    '''
    Return ``(video_id, playlist_id)`` for a URL; either may be None.

    A leading ``http://``/``https://`` and ``www.`` are ignored.  URLs on
    non-YouTube domains yield ``(None, None)`` even if they happen to have a
    ``list=`` query parameter.
    '''
    url = _re_url_prefix.sub('', url, count=1)
    domain = url.partition('/')[0].lower()
    if domain != _YT_SHORT_DOMAIN and domain not in _YT_DOMAINS:
        return None, None

    vid_id = get_yt_video_id(url)
    if list_id := re_yt_playlist_id.search(url):
        return vid_id, list_id[0]
    return vid_id, None


TLDs = ['com', 'biz', 'edu', 'gov', 'int', 'mil', 'moe', 'net', 'org', 'xxx', 'aero', 'asia', 'coop', 'info', 'jobs', 'name', 'museum', 'travel',]
url_regex = re.compile(r'((https?://|www\.)\S+)|(\S+\.([a-z][a-z]|{})\S*)'.format('|'.join(TLDs)), re.IGNORECASE)


def get_api_json(api_url: str, timeout: float = 10.0):
    '''
    GET ``api_url`` and return the decoded JSON body, or None on any failure.

    Failures (network errors, HTTP errors, timeouts, invalid JSON) are logged
    and swallowed on purpose: callers treat a missing response as "no data".
    KeyboardInterrupt/SystemExit are not swallowed.
    '''
    logging.debug("api url:%s", api_url)
    try:
        with urllib.request.urlopen(api_url, timeout=timeout) as response:
            return json.loads(response.read().decode('utf-8'))
    except Exception as e:  # best-effort boundary: log and report "no data"
        logging.error('url_finder error - json load fail: {}'.format(e))
        return None


def _find_untitled_entries(lines):
    '''Return ``(index, video_id, playlist_id)`` of YouTube URL lines lacking an #EXTINF, last line first.'''
    entries = []
    for i in range(len(lines) - 1, -1, -1):
        line = lines[i]
        if line.startswith('#EXTINF:'):
            continue
        # i > 0 guard: lines[-1] must not be mistaken for the "previous" line.
        if i > 0 and lines[i - 1].startswith('#EXTINF:'):
            continue
        if match := url_regex.match(line.strip()):
            vid_id, list_id = get_yt_video_and_list_id(match[0])
            if vid_id or list_id:
                entries.append((i, vid_id, list_id))
    return entries


def _describe_video(item):
    '''Build the title text for one item of a ``videos`` API response.'''
    snippet = item['snippet']
    duration = shorten_period(item['contentDetails']['duration'])
    return f"{snippet['channelTitle']} - {snippet['title']} ({duration})"


def _describe_playlist(item):
    '''Build the title text for one item of a ``playlists`` API response.'''
    snippet = item['snippet']
    count = item['contentDetails']['itemCount']
    return f"{snippet['channelTitle']} - {snippet['title']} ({count} videos)"


def _fetch_titles(resource, label, ids, describe, fetch_json):
    '''Fetch metadata for ``ids`` from API ``resource``; return ``{id: title or None}``.'''
    titles = dict.fromkeys(ids)
    for start in range(0, len(ids), YT_API_MAX_IDS):
        batch = ids[start:start + YT_API_MAX_IDS]
        query = urllib.parse.urlencode(
            {'id': ','.join(batch), 'key': yt_api_key, 'part': 'snippet,contentDetails'},
            safe=',',
        )
        response = fetch_json(f'https://www.googleapis.com/youtube/v3/{resource}?{query}')
        if not response:
            continue
        if 'items' not in response:
            logging.info(f'{label} Response has no items!')
            continue
        for item in response['items']:
            try:
                titles[item['id']] = describe(item)
            except (KeyError, TypeError, ValueError) as e:
                # One malformed item must not cost us every other title.
                logging.warning('Skipping malformed %s item: %r', resource, e)
    return titles


def process_playlist(filename: str, fetch_json: Optional[Callable[[str], Optional[dict]]] = None):
    '''
    Insert ``#EXTINF`` title lines above untitled YouTube URLs in a playlist.

    filename: path of the .m3u file; read and rewritten as UTF-8.
    fetch_json: callable taking an API URL and returning the decoded JSON
        response (or None on failure).  Defaults to ``get_api_json``.

    The file is only rewritten when at least one title line was inserted, and
    it is always written with a trailing newline so later appends stay on
    their own line.  A URL carrying both a video and a playlist id gets one
    line for each (playlist line first, video line directly above the URL).
    '''
    if fetch_json is None:
        fetch_json = get_api_json

    with open(filename, 'r', encoding='utf-8') as f:
        lines = f.read().splitlines()

    entries = _find_untitled_entries(lines)
    # dict.fromkeys de-duplicates while keeping first-seen order.
    vid_ids = list(dict.fromkeys(vid for _, vid, _ in entries if vid))
    list_ids = list(dict.fromkeys(lst for _, _, lst in entries if lst))

    vid_titles = {}
    if not vid_ids:
        logging.info('No new videos')
    else:
        logging.info('New videos: ' + ', '.join(vid_ids))
        vid_titles = _fetch_titles('videos', 'Videos', vid_ids, _describe_video, fetch_json)
        logging.info(vid_titles)

    list_titles = {}
    if not list_ids:
        logging.info('No new playlists')
    else:
        logging.info('New playlists: ' + ', '.join(list_ids))
        list_titles = _fetch_titles('playlists', 'Lists', list_ids, _describe_playlist, fetch_json)
        logging.info(list_titles)

    inserted = 0
    # entries run from the last line to the first, so inserting at index i
    # never shifts an index that is still to be processed.
    for i, vid_id, list_id in entries:
        if vid_titles.get(vid_id):
            lines.insert(i, f'#EXTINF:123, {vid_titles[vid_id]}')
            inserted += 1
        if list_titles.get(list_id):
            lines.insert(i, f'#EXTINF:123, {list_titles[list_id]}')
            inserted += 1

    if not inserted:
        logging.info(f'No new content in {filename}, not writing anything.')
        return

    with open(filename, 'w', encoding='utf-8') as f:
        f.write('\n'.join(lines) + '\n')


def main():
    '''Title the configured playlist, then ask mpv to reload it.'''
    logging.basicConfig(filename=log_filename, format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
    try:
        process_playlist(filename)
    except Exception as e:
        logging.exception('Error processing: {}'.format(e))
    try:
        logging.info('Reloading playlist on mpv')
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.connect(mpv_socket)
            # NOTE: the path is sent unquoted, so it must not contain whitespace.
            sock.sendall(f'playlist-clear; loadlist {filename} append\n'.encode('utf-8'))
        logging.info('mpv playlist reloaded!')
    except Exception as e:
        logging.error('Error reloading playlist: {}'.format(e))


if __name__ == '__main__':
    main()