#!/usr/bin/python
'''
Annotate YouTube links in an mpv M3U playlist with human readable titles.

Every YouTube video/playlist link in the playlist file that is not already
preceded by an ``#EXTINF:`` line gets one inserted above it, built from the
YouTube Data API (channel, title and duration / video count).  When run as a
script, mpv is then asked over its IPC socket to reload the playlist.
'''
import json
import logging
import re
import socket
import urllib.parse
import urllib.request
from typing import Dict, Iterable, List, Optional, Tuple

LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
try:
    logging.basicConfig(filename='/home/luke/ytlog.txt', format=LOG_FORMAT, level=logging.INFO)
except OSError:
    # The log file cannot be opened (missing directory, permissions, ...).
    # Log to stderr instead of refusing to start at all.
    logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)

yt_api_key = 'get your own lol'
playlist_filename = '/home/luke/mpv-playlist.m3u'
mpv_socket = '/home/luke/socket-mpv'

SI_large_prefixes = {  # Extend if necessary
    3: 'k',
    6: 'M',
    9: 'G',
    12: 'T',
    15: 'P',
    18: 'E',
    21: 'Z',
    24: 'Y',
}


def shorten_number(number, max_width, engineering=False, include_letter=True, include_minus=True):
    '''
    Return a nice shortened string of a number using SI prefixes.

    Large numbers only for now (no negative exponents).  The number is
    handled as a string of digits rather than numerically, so the result is
    truncated, not rounded.

    number: the value to shorten; converted with str().
    max_width: target width in characters (values below 3 are treated as 3).
        The result can still be wider when the digits in front of the prefix
        alone do not fit.
    engineering: use the SI prefix as the decimal symbol to save a character
        (e.g. 4k20 instead of 4.20k).
    include_letter: count the prefix letter and decimal point towards
        max_width.
    include_minus: count a leading minus sign towards max_width.

    Returns the shortened string, or str(number) unchanged when it already
    fits or has too few digits to take a prefix.
    '''
    max_width = max(max_width, 3)
    number = str(number)
    neg = number.startswith('-')
    num = number[1:] if neg else number
    if neg and include_minus:
        max_width -= 1

    width = len(num)
    if width <= max_width:
        return number

    if include_letter:
        # Make room for the prefix letter (and the decimal point, unless the
        # letter doubles as the decimal point).
        max_width -= 1 if engineering else 2
    max_width = max(max_width, 1)

    # Largest multiple of three below the digit count, capped at the biggest
    # prefix we know so huge numbers do not raise KeyError.
    unit = min(((width - 1) // 3) * 3, max(SI_large_prefixes))
    if unit == 0:
        # Fewer than four digits: there is no prefix that could shorten it.
        return number
    prefix = SI_large_prefixes[unit]
    dec_point = width - unit

    if engineering:
        output = num[:dec_point] + prefix + num[dec_point:max_width]
    elif dec_point < max_width:
        output = num[:dec_point] + '.' + num[dec_point:max_width] + prefix
    else:
        output = num[:dec_point] + prefix

    if neg:
        output = '-' + output
    return output


def shorten_period(string, max_terms=2, collapse_weeks=True):
    '''
    Take an ISO 8601 period string, return something human readable.

    Lowercase the time component while leaving the date component uppercase,
    e.g. 'P1DT2H3M' -> '1D2h'.

    string: the period, e.g. 'PT4M13S'.  A date-only period such as 'P0D'
        (no 'T' part) is accepted.
    max_terms: keep at most this many leading terms.
    collapse_weeks: fold a weeks term into the days term (1W2D -> 9D).

    Raises ValueError when the string does not start with 'P'.
    '''
    if not string.startswith('P'):
        raise ValueError('Given string is not an ISO 8601 period string')

    # M can be Month or Minute depending on location, so split the time
    # component off first.  partition() copes with a missing 'T'.
    datestr, _, timestr = string[1:].partition('T')
    date_components = re.findall(r'(\d+[YMWD])', datestr)
    time_components = re.findall(r'(\d+[hms])', timestr.lower())

    if collapse_weeks:
        collapsed = []
        week_days = 0
        saw_days = False
        for term in date_components:
            designator = term[-1]
            if designator == 'W':
                week_days = 7 * int(term[:-1])
            elif designator == 'D':
                collapsed.append('{}D'.format(int(term[:-1]) + week_days))
                saw_days = True
            else:
                collapsed.append(term)
        if week_days and not saw_days:
            # Weeks without a days term must not vanish from the output.
            collapsed.append('{}D'.format(week_days))
        date_components = collapsed

    components = date_components + time_components
    return ''.join(components[:max_terms])


re_yt_video_id = re.compile(r'(?<=[\?\&]v=)([a-zA-Z0-9_-]+)')
re_yt_playlist_id = re.compile(r'(?<=[\?\&]list=)([a-zA-Z0-9_-]+)')
re_yt_shorts_id = re.compile(r'shorts/([a-zA-Z0-9_-]+)')

# Hosts (scheme and leading 'www.' already removed) that carry the video id
# in the query string or in a /shorts/ path.
YT_DOMAINS = ('youtube.com', 'm.youtube.com', 'music.youtube.com')
# Short-link host that carries the video id as the path.
YT_SHORT_DOMAIN = 'youtu.be'


def get_yt_video_id(url: str) -> Optional[str]:
    '''
    Return the YouTube video id found in url, or None.

    url must already have its scheme and any leading 'www.' removed, e.g.
    'youtube.com/watch?v=ID' or 'youtu.be/ID?t=3'.  The host is compared
    case-insensitively.
    '''
    domain, _, params = url.partition('/')
    domain = domain.lower()
    if domain == YT_SHORT_DOMAIN:
        # Short links carry the id as the path; an empty path is "no id".
        return params.partition('?')[0] or None
    if domain in YT_DOMAINS:
        if vid_id := re_yt_video_id.search(params):
            return vid_id[0]
        if shorts_id := re_yt_shorts_id.match(params):
            return shorts_id[1]
    return None


def get_yt_video_and_list_id(url: str) -> Tuple[Optional[str], Optional[str]]:
    '''
    Return (video_id, playlist_id) for a YouTube URL.

    The scheme and a leading 'www.' are optional and matched
    case-insensitively.  Either element is None when it is absent; both are
    None for URLs that do not point at a YouTube host.
    '''
    lowered = url.lower()
    for scheme in ('http://', 'https://'):
        if lowered.startswith(scheme):
            url = url[len(scheme):]
            break
    if url.lower().startswith('www.'):
        url = url[4:]

    vid_id = get_yt_video_id(url)

    # Only trust a list= parameter on YouTube hosts; other sites use the same
    # parameter name for unrelated things.
    domain = url.partition('/')[0].lower()
    if domain == YT_SHORT_DOMAIN or domain in YT_DOMAINS:
        if list_id := re_yt_playlist_id.search(url):
            return vid_id, list_id[0]
    return vid_id, None


TLDs = ['com', 'biz', 'edu', 'gov', 'int', 'mil', 'moe', 'net', 'org', 'xxx', 'aero', 'asia', 'coop', 'info', 'jobs', 'name', 'museum', 'travel',]
# Either something starting with a scheme / 'www.', or anything containing a
# dot followed by a two-letter or well-known TLD.
url_regex = re.compile(r'((https?://|www\.)\S+)|(\S+\.([a-z][a-z]|{})\S*)'.format('|'.join(TLDs)), re.IGNORECASE)
# url_prefix = re.compile(r'(https?://www\.)|(https?://|www\.)', re.IGNORECASE)


def get_api_json(api_url: str, timeout: float = 10.0):
    '''
    Fetch api_url and return the decoded JSON document.

    timeout: seconds to wait on the network before giving up.

    Returns None (after logging the error) when the request or the JSON
    decoding fails.  KeyboardInterrupt / SystemExit are not swallowed.
    '''
    logging.debug("api url:%s", api_url)
    try:
        with urllib.request.urlopen(api_url, timeout=timeout) as req:
            return json.loads(req.read().decode('utf-8'))
    except Exception as e:
        logging.error('url_finder error - json load fail: {}'.format(e))
        return None


YT_API_BASE = 'https://www.googleapis.com/youtube/v3/'
# NOTE(review): ids are sent in batches; 50 per request is the commonly cited
# cap for the YouTube Data API list endpoints - confirm against the API docs.
YT_API_MAX_IDS = 50


def _fetch_yt_items(resource: str, label: str, ids: Iterable[str]) -> List[dict]:
    '''Return the 'items' the YouTube API reports for ids on resource (e.g. 'videos').'''
    pending = list(ids)
    items = []
    for start in range(0, len(pending), YT_API_MAX_IDS):
        query = urllib.parse.urlencode({
            'id': ','.join(pending[start:start + YT_API_MAX_IDS]),
            'key': yt_api_key,
            'part': 'snippet,contentDetails',
        }, safe=',')
        response = get_api_json(f'{YT_API_BASE}{resource}?{query}')
        if not response:
            continue
        if 'items' not in response:
            logging.info(f'{label} Response has no items!')
            continue
        items.extend(response['items'])
    return items


def _find_unannotated_links(lines: List[str]) -> List[Tuple[int, Optional[str], Optional[str]]]:
    '''Return (line index, video id, playlist id) per YouTube link lacking an #EXTINF line, last line first.'''
    hits = []
    for i in range(len(lines) - 1, -1, -1):
        line = lines[i]
        if line.startswith('#EXTINF:'):
            continue
        # Already annotated on the line above.  Guard i > 0 so the first line
        # is not compared against the last one via index -1.
        if i > 0 and lines[i - 1].startswith('#EXTINF:'):
            continue
        match = url_regex.match(line.strip())
        if not match:
            continue
        vid_id, list_id = get_yt_video_and_list_id(match[0])
        if vid_id or list_id:
            hits.append((i, vid_id, list_id))
    return hits


def process_playlist(filename: str):
    '''
    Insert '#EXTINF:' title lines above un-annotated YouTube links in filename.

    Titles are looked up through the YouTube Data API.  The file is only
    rewritten when at least one title line was actually inserted; links whose
    lookup failed are left untouched so a later run can retry them.

    Raises OSError if the playlist cannot be read or written, and KeyError /
    ValueError if an API item is missing a field this function reads.
    '''
    # surrogateescape round-trips bytes that are not valid UTF-8 unchanged.
    with open(filename, 'r', encoding='utf-8', errors='surrogateescape') as f:
        lines = f.read().splitlines()

    links = _find_unannotated_links(lines)
    vid_ids: Dict[str, Optional[str]] = {vid: None for _, vid, _ in links if vid}
    list_ids: Dict[str, Optional[str]] = {lst: None for _, _, lst in links if lst}

    if not vid_ids:
        logging.info('No new videos')
    else:
        logging.info('New videos: ' + ', '.join(vid_ids))
        for item in _fetch_yt_items('videos', 'Videos', vid_ids):
            channel_title = item['snippet']['channelTitle']
            title = item['snippet']['title']
            duration = shorten_period(item['contentDetails']['duration'])
            vid_ids[item['id']] = f'{channel_title} - {title} ({duration})'
        logging.info(vid_ids)

    if not list_ids:
        logging.info('No new playlists')
    else:
        logging.info('New playlists: ' + ', '.join(list_ids))
        for item in _fetch_yt_items('playlists', 'Lists', list_ids):
            channel_title = item['snippet']['channelTitle']
            title = item['snippet']['title']
            count = item['contentDetails']['itemCount']
            list_ids[item['id']] = f'{channel_title} - {title} ({count} videos)'
        logging.info(list_ids)

    if not vid_ids and not list_ids:
        logging.info(f'No new content in {filename}, not writing anything.')
        return

    # links is ordered last line first, so inserting at index i never shifts
    # an index that is still to be handled.
    changed = False
    for i, vid_id, list_id in links:
        for label in (vid_ids.get(vid_id), list_ids.get(list_id)):
            if label:
                lines.insert(i, f'#EXTINF:123, {label}')
                changed = True

    if not changed:
        logging.info(f'No titles resolved for {filename}, not writing anything.')
        return

    with open(filename, 'w', encoding='utf-8', errors='surrogateescape') as f:
        f.write('\n'.join(lines) + '\n')


def reload_mpv_playlist():
    '''Tell the mpv instance listening on mpv_socket to reload playlist_filename.'''
    logging.info('Reloading playlist on mpv')
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.connect(mpv_socket)
        sock.sendall(f'playlist-clear; loadlist "{playlist_filename}" append\n'.encode())
    logging.info('mpv playlist reloaded!')


def main():
    '''Annotate the playlist, then reload it in mpv; failures are logged, not raised.'''
    try:
        process_playlist(playlist_filename)
    except Exception as e:
        # logging.exception keeps the traceback in the log file.
        logging.exception('Error processing: {}'.format(e))

    # Reload even if annotating failed: the playlist file may still have been
    # edited by hand since mpv last read it.
    try:
        reload_mpv_playlist()
    except Exception as e:
        logging.exception('Error reloading playlist: {}'.format(e))


if __name__ == '__main__':
    main()