import logging
# Root logger writes to a fixed log file at INFO level, so logging.debug()
# calls elsewhere in this file are suppressed unless the level is lowered.
logging.basicConfig(filename='/home/luke/ytlog.txt', format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
import re
import urllib.request
import json
import socket
# Key interpolated into the googleapis.com request URLs built in
# process_playlist; the value here is a placeholder.
yt_api_key = 'get your own lol'
# Playlist file that the __main__ section asks mpv to (re)load.
playlist_filename = '/home/luke/mpv-playlist.m3u'
# NOTE(review): presumably the path of mpv's IPC Unix socket - confirm where it is connected.
mpv_socket = '/home/luke/socket-mpv'
# Maps a power-of-ten exponent (multiples of 3) to its SI prefix letter.
# Extend if necessary.
SI_large_prefixes = {
    3: 'k',
    6: 'M',
    9: 'G',
    12: 'T',
    15: 'P',
    18: 'E',
}
def shorten_number(number, max_width, engineering=False, include_letter=True, include_minus=True):
    """Return a nice shortened string of a number using SI prefixes.

    Large numbers only for now (no negative exponents). The number is
    treated as a string of digits instead of taking a numeric approach, so
    excess digits are truncated, not rounded.

    Args:
        number: Value to shorten; converted with ``str()``.
        max_width: Target width in characters (values below 3 are raised to 3).
        engineering: Use the SI prefix as the decimal symbol to save a
            character (e.g. ``4k20`` instead of ``4.20k``).
        include_letter: Count the prefix letter and decimal point against
            ``max_width``.
        include_minus: Count a leading minus sign against ``max_width``.

    Returns:
        ``str(number)`` unchanged when its digits already fit, otherwise the
        shortened form with an SI prefix from ``SI_large_prefixes``.
    """
    max_width = max(max_width, 3)
    number = str(number)

    neg = number.startswith('-')
    num = number[1:] if neg else number
    if neg and include_minus:
        max_width -= 1

    width = len(num)
    if width <= max_width:
        return number

    if include_letter:
        # Make room: engineering form needs only the letter, the plain form
        # needs the letter plus a decimal point.
        max_width -= 1 if engineering else 2
    max_width = max(max_width, 1)

    # Largest multiple of three below the digit count, capped at the biggest
    # prefix available so very long numbers keep extra leading digits
    # instead of failing on a missing table entry.
    unit = min(((width - 1) // 3) * 3, max(SI_large_prefixes))
    dec_point = width - unit
    prefix = SI_large_prefixes[unit]

    if engineering:
        output = num[:dec_point] + prefix + num[dec_point:max_width]
    elif dec_point < max_width:
        output = num[:dec_point] + '.' + num[dec_point:max_width] + prefix
    else:
        # No room left for fractional digits.
        output = num[:dec_point] + prefix

    if neg:
        output = '-' + output
    return output
def shorten_period(string, max_terms=2, collapse_weeks=True):
    """Take an ISO 8601 period string, return something human readable.

    The time component is lowercased while the date component stays
    uppercase, so months (``M``) and minutes (``m``) remain distinguishable.

    Args:
        string: Period such as ``PT4M13S`` or ``P1DT2H``.
        max_terms: Maximum number of leading terms to keep.
        collapse_weeks: Fold a weeks term into the days term (7 days each).

    Returns:
        The first ``max_terms`` terms concatenated, e.g. ``'4m13s'``.

    Raises:
        ValueError: If ``string`` does not start with ``P``.
    """
    if not string.startswith('P'):
        raise ValueError('Given string is not an ISO 8601 period string')

    # M can be Month or Minute depending on location, so split the time
    # component off first. partition() also copes with date-only periods
    # that contain no 'T' at all.
    datestr, _, timestr = string[1:].partition('T')
    date_components = re.findall(r'(\d+[YMWD])', datestr)
    time_components = re.findall(r'(\d+[hms])', timestr.lower())

    if collapse_weeks:
        # NOTE(review): the folding arithmetic was reconstructed as
        # days + 7 * weeks - confirm this matches the intended output.
        collapsed = []
        week_days = 0
        saw_days = False
        for term in date_components:
            unit = term[-1]
            if unit == 'W':
                week_days = 7 * int(term[:-1])
            elif unit == 'D':
                collapsed.append(f'{int(term[:-1]) + week_days}D')
                saw_days = True
            else:
                collapsed.append(term)
        if week_days and not saw_days:
            # Weeks without a days term would otherwise vanish entirely.
            collapsed.append(f'{week_days}D')
        date_components = collapsed

    components = date_components + time_components
    return ''.join(components[:max_terms])
# Both patterns capture a run of ID characters (letters, digits, '_' and '-')
# that immediately follows the named query parameter; the lookbehind requires
# the parameter to be introduced by '?' or '&'.
re_yt_video_id = re.compile(r'(?<=[\?\&]v=)([a-zA-Z0-9_-]+)')
re_yt_playlist_id = re.compile(r'(?<=[\?\&]list=)([a-zA-Z0-9_-]+)')
def get_yt_video_id(url: str):
    """Extract the YouTube video ID from a URL without scheme or ``www.``.

    Args:
        url: Something like ``youtu.be/<id>?t=5`` or
            ``youtube.com/watch?v=<id>``.

    Returns:
        The video ID, or ``None`` when the host is not a recognised YouTube
        host or no ID is present.
    """
    domain, _, params = url.partition('/')
    # Host names are case-insensitive.
    domain = domain.lower()
    if domain == 'youtu.be':
        # Short links carry the ID as the path; an empty path means no ID.
        return params.partition('?')[0] or None
    if domain in ('youtube.com', 'm.youtube.com'):
        if vid_id := re_yt_video_id.search(params):
            return vid_id[0]
    return None
def get_yt_video_and_list_id(url: str):
    """Return ``(video_id, playlist_id)`` found in a URL.

    A leading ``http://`` or ``https://`` and then a leading ``www.`` are
    stripped (case-insensitively) before the video ID lookup. Either element
    of the result is ``None`` when the corresponding ID is not found.
    """
    lowered = url.lower()
    for scheme in ('http://', 'https://'):
        if lowered.startswith(scheme):
            url = url[len(scheme):]
            break
    if url.lower().startswith('www.'):
        url = url[len('www.'):]

    vid_id = get_yt_video_id(url)
    if list_id := re_yt_playlist_id.search(url):
        return vid_id, list_id[0]
    return vid_id, None
# Top-level domains (beyond any two-letter one) that mark a bare host name as a URL.
TLDs = ['com', 'biz', 'edu', 'gov', 'int', 'mil', 'moe', 'net', 'org', 'xxx', 'aero', 'asia', 'coop', 'info', 'jobs', 'name', 'museum', 'travel',]
# Matches either something starting with http://, https:// or a literal
# "www." prefix, or any token containing ".<two letters>" / ".<known TLD>".
url_regex = re.compile(r'((https?://|www\.)\S+)|(\S+\.([a-z][a-z]|{})\S*)'.format('|'.join(TLDs)), re.IGNORECASE)
# url_prefix = re.compile(r'(https?://www\.)|(https?://|www\.)', re.IGNORECASE)
def get_api_json(api_url: str):
    """Fetch ``api_url`` and return the decoded JSON body.

    Best-effort: any failure while fetching or parsing is logged and
    reported as ``None`` rather than raised.

    Args:
        api_url: Full request URL. It is logged at debug level as-is,
            including any query parameters.

    Returns:
        The parsed JSON value, or ``None`` on error.
    """
    logging.debug("api url:%s", api_url)
    try:
        # The context manager closes the response even if parsing fails.
        with urllib.request.urlopen(api_url) as response:
            return json.loads(response.read().decode('utf-8'))
    except Exception as e:
        # Exception rather than BaseException, so Ctrl-C and SystemExit
        # still propagate.
        logging.error('url_finder error - json load fail: {}'.format(e))
        return None
def _untitled_entries(lines):
    """List ``(index, video_id, playlist_id)`` for URL lines lacking a title.

    A line counts as untitled when neither it nor the line before it starts
    with ``#EXTINF:``. Indices are returned highest first so callers can
    insert at them without shifting the entries still to be handled.
    """
    entries = []
    # NOTE(review): the scan stops before index 1, so the first two lines of
    # the file are never examined - confirm that this is intended.
    for i in range(len(lines) - 1, 1, -1):
        if lines[i - 1].startswith('#EXTINF:') or lines[i].startswith('#EXTINF:'):
            continue
        if match := url_regex.match(lines[i].strip()):
            vid_id, list_id = get_yt_video_and_list_id(match[0])
            entries.append((i, vid_id, list_id))
    return entries


def process_playlist(filename: str):
    """Add ``#EXTINF`` title lines to untitled YouTube entries of a playlist.

    Reads ``filename``, collects video and playlist IDs from URL lines that
    have no ``#EXTINF:`` line, looks their titles up through the YouTube
    Data API and rewrites the file with an ``#EXTINF:123, <title>`` line
    inserted before each entry whose title was found. Nothing is written
    when no untitled YouTube entry exists.

    Args:
        filename: Path of the playlist file to update in place.
    """
    with open(filename, 'r', encoding='utf-8') as f:
        lines = f.read().splitlines()

    entries = _untitled_entries(lines)
    # Dicts keyed by ID; the value is the title text once known, else None.
    vid_ids = {vid_id: None for _, vid_id, _ in entries if vid_id}
    list_ids = {list_id: None for _, _, list_id in entries if list_id}
    new_content = False

    if not vid_ids:
        logging.info('No new videos')
    else:
        logging.info('New videos: ' + ', '.join(vid_ids))
        new_content = True
        ids = ','.join(vid_ids)
        if vids_response := get_api_json(f'https://www.googleapis.com/youtube/v3/videos?id={ids}&key={yt_api_key}&part=snippet,contentDetails'):
            if 'items' not in vids_response:
                logging.info('Videos Response has no items!')
            for item in vids_response.get('items', []):
                channelTitle = item['snippet']['channelTitle']
                title = item['snippet']['title']
                duration = shorten_period(item['contentDetails']['duration'])
                vid_ids[item['id']] = f'{channelTitle} - {title} ({duration})'

    if not list_ids:
        logging.info('No new playlists')
    else:
        logging.info('New playlists: ' + ', '.join(list_ids))
        new_content = True
        ids = ','.join(list_ids)
        if lists_response := get_api_json(f'https://www.googleapis.com/youtube/v3/playlists?id={ids}&key={yt_api_key}&part=snippet,contentDetails'):
            if 'items' not in lists_response:
                logging.info('Lists Response has no items!')
            for item in lists_response.get('items', []):
                channelTitle = item['snippet']['channelTitle']
                title = item['snippet']['title']
                count = item['contentDetails']['itemCount']
                list_ids[item['id']] = f'{channelTitle} - {title} ({count} videos)'

    if not new_content:
        logging.info(f'No new content in {filename}, not writing anything.')
        return

    # Entries are ordered from the end of the file backwards, so inserting at
    # an index never moves an entry that is still waiting to be processed.
    for i, vid_id, list_id in entries:
        if vid_ids.get(vid_id):
            lines.insert(i, f'#EXTINF:123, {vid_ids[vid_id]}')
        # NOTE(review): a URL carrying both a video and a playlist ID gets two
        # #EXTINF lines (playlist title first) - confirm that this is wanted.
        if list_ids.get(list_id):
            lines.insert(i, f'#EXTINF:123, {list_ids[list_id]}')

    if lines:
        with open(filename, 'w', encoding='utf-8') as f:
            # NOTE(review): output format reconstructed as one entry per line
            # with a trailing newline - confirm.
            f.write('\n'.join(lines) + '\n')
if __name__ == '__main__':
    # Step 1: add titles to the playlist file. A failure here is logged but
    # does not stop mpv from being told to reload.
    try:
        process_playlist(playlist_filename)
    except Exception as e:
        logging.error('Error processing: {}'.format(e))

    # Step 2: make mpv reload the playlist over its IPC socket.
    try:
        logging.info('Reloading playlist on mpv')
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.connect(mpv_socket)
            sock.sendall(f'playlist-clear; loadlist "{playlist_filename}" append\n'.encode())

            # Step 3 (best effort): playlist-clear leaves the current entry
            # in place, so the reloaded list may start with a duplicate.
            try:
                logging.debug('Checking current video to see if it should be removed from start of playlist')
                with sock.makefile('r', encoding='utf-8') as reader:

                    def get_property(name: str):
                        """Ask mpv for a property and return its value."""
                        sock.sendall(f'{{ "command": ["get_property", "{name}"] }}\n'.encode())
                        # Read one JSON message per line; anything without an
                        # "error" field is not a command reply and is skipped.
                        for raw in reader:
                            response_json = json.loads(raw)
                            if 'error' not in response_json:
                                continue
                            if response_json['error'] == 'success':
                                return response_json['data']
                            raise ValueError(f"Response for get_property {name} was {response_json['error']}")
                        raise ValueError(f'No response for get_property {name}')

                    current_filename = get_property('playlist/0/filename')  # 'path' also works
                    next_filename = get_property('playlist/1/filename')
                    if current_filename == next_filename:
                        logging.info('Next video is same as this one, deduplicating')
                        sock.sendall(b'playlist-remove 1\n')
                    # NOTE(review): assumed to run after every reload rather
                    # than only when a duplicate was removed - confirm.
                    sock.sendall(b'show-text ${playlist}\n')
            except Exception as e:
                logging.error('Error deduping head of playlist: {}'.format(e))
        logging.info('mpv playlist reloaded!')
    except Exception as e:
        logging.error('Error reloading playlist: {}'.format(e))