213 lines
6.8 KiB
Python
213 lines
6.8 KiB
Python
|
#!/usr/bin/python
|
||
|
import logging
|
||
|
# Log to a fixed per-user file at INFO level; all status/error reporting below goes through this.
logging.basicConfig(filename='/home/luke/ytlog.txt', format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
|
||
|
import re
|
||
|
import urllib.request
|
||
|
import json
|
||
|
import socket
|
||
|
yt_api_key = 'get your own lol'  # YouTube Data API v3 key (placeholder — supply your own)
filename = '/home/luke/mpv-playlist.m3u'  # m3u playlist this script annotates in place
mpv_socket = '/home/luke/socket-mpv'  # unix socket used to tell mpv to reload the playlist
|
||
|
|
||
|
SI_large_prefixes = {  # Extend if necessary
    3: 'k',
    6: 'M',
    9: 'G',
    12: 'T',
    15: 'P',
    18: 'E',
}


def shorten_number(number, max_width, engineering=False, include_letter=True, include_minus=True):
    '''
    Return a compact string form of a number using SI prefixes.

    The number is handled purely as a digit string; only large magnitudes
    are supported (no negative exponents).

    engineering: put the SI prefix where the decimal point would go,
        saving one character (e.g. 4k20 instead of 4.20k).
    include_letter: count the prefix letter (and decimal point) against
        max_width.
    include_minus: count a leading minus sign against max_width.
    '''
    max_width = max(max_width, 3)  # anything narrower is unusable
    text = str(number)
    neg = text.startswith('-')
    digits = text[1:] if neg else text
    if neg and include_minus:
        max_width -= 1
    # Short enough already: hand back the original (sign included).
    if len(digits) <= max_width:
        return text

    if include_letter:
        # Reserve room for the prefix letter, plus the '.' in normal mode.
        max_width -= 1 if engineering else 2
        max_width = max(max_width, 1)

    # Largest power-of-1000 exponent at or below the number's magnitude.
    exponent = ((len(digits) - 1) // 3) * 3
    split_at = len(digits) - exponent
    prefix = SI_large_prefixes[exponent]

    if engineering:
        shortened = digits[:split_at] + prefix + digits[split_at:max_width]
    elif split_at < max_width:
        shortened = digits[:split_at] + '.' + digits[split_at:max_width] + prefix
    else:
        shortened = digits[:split_at] + prefix
    return '-' + shortened if neg else shortened
|
||
|
|
||
|
def shorten_period(string, max_terms=2, collapse_weeks=True):
    '''
    Take an ISO 8601 period string, return something human readable.

    Lowercase the time component while leaving the date component uppercase,
    so month 'M' and minute 'm' stay distinguishable.

    max_terms: keep at most this many leading components (e.g. '1D2h').
    collapse_weeks: fold weeks into the day count ('P1W2D' -> '9D').

    Raises ValueError if the string does not start with 'P'.
    '''
    if string[0] != 'P':
        raise ValueError('Given string is not an ISO 8601 period string')
    # M can be Month or Minute depending on location, so split the time
    # component off first. partition() (unlike split) also handles date-only
    # periods such as 'P3W' that contain no 'T' separator at all.
    datestr, _, timestr = string[1:].partition('T')
    date_components = re.findall(r'(\d+[YMWD])', datestr)
    time_components = re.findall(r'(\d+[hms])', timestr.lower())

    if collapse_weeks:
        new_date = []
        weeks = 0
        saw_days = False
        for d in date_components:
            if d[-1] == 'W':
                weeks = int(d[:-1])
            elif d[-1] == 'D':
                saw_days = True
                new_date.append('{}D'.format(int(d[:-1]) + 7 * weeks))
            else:
                new_date.append(d)
        # A weeks-only period ('P2W') has no day term to fold into; emit the
        # converted days rather than silently dropping the weeks.
        if weeks and not saw_days:
            new_date.append('{}D'.format(7 * weeks))
        date_components = new_date

    components = date_components + time_components
    return ''.join(components[:max_terms])
|
||
|
|
||
|
# Pull ids out of the query string: "?v=..."/"&v=..." and "?list=..."/"&list=...".
re_yt_video_id = re.compile(r'(?<=[\?\&]v=)([a-zA-Z0-9_-]+)')
re_yt_playlist_id = re.compile(r'(?<=[\?\&]list=)([a-zA-Z0-9_-]+)')


def get_yt_video_id(url: str):
    '''Return the video id from a scheme-less YouTube URL, or None.'''
    domain, _, rest = url.partition('/')
    if domain == 'youtu.be':
        # Short links carry the id as the path itself, before any query.
        vid, _, _ = rest.partition('?')
        return vid
    if domain in ('youtube.com', 'm.youtube.com'):
        match = re_yt_video_id.search(rest)
        if match:
            return match[0]
    return None
|
||
|
|
||
|
def get_yt_video_and_list_id(url: str):
    '''
    Strip any scheme and "www." from *url*, then return a
    (video_id, playlist_id) tuple; either element may be None.
    '''
    for prefix in ('http://', 'https://'):
        if url.startswith(prefix):
            url = url[len(prefix):]
            break
    if url.startswith('www.'):
        url = url[4:]

    vid_id = get_yt_video_id(url)
    match = re_yt_playlist_id.search(url)
    return vid_id, (match[0] if match else None)
|
||
|
|
||
|
# Generic/sponsored TLDs used to spot bare URLs ("example.museum/...") that
# lack a scheme or "www." prefix; two-letter country codes are matched by a
# separate alternation in the regex below. ('musem' typo fixed to 'museum'.)
TLDs = ['com', 'biz', 'edu', 'gov', 'int', 'mil', 'moe', 'net', 'org', 'xxx', 'aero', 'asia', 'coop', 'info', 'jobs', 'name', 'museum', 'travel',]
# Match (a) anything with an explicit scheme or "www." prefix, or (b) a bare
# hostname ending in a two-letter ccTLD or one of the TLDs above.
# The dot in "www\." is escaped so e.g. "wwwx..." no longer counts as a URL.
url_regex = re.compile(r'((https?://|www\.)\S+)|(\S+\.([a-z][a-z]|{})\S*)'.format('|'.join(TLDs)), re.IGNORECASE)
# url_prefix = re.compile(r'(https?://www\.)|(https?://|www\.)', re.IGNORECASE)
|
||
|
|
||
|
def get_api_json(api_url: str):
    '''
    Fetch *api_url* and return the decoded JSON object.

    Returns None on any network or decode failure; errors are logged rather
    than raised so callers can degrade gracefully.
    '''
    logging.debug("api url:%s", api_url)
    try:
        # Context manager closes the HTTP response even on error paths
        # (the original leaked the response object).
        with urllib.request.urlopen(api_url) as req:
            return json.loads(req.read().decode('utf-8'))
    # Narrowed from BaseException: don't swallow KeyboardInterrupt/SystemExit.
    except Exception as e:
        logging.error('url_finder error - json load fail: %s', e)
        return None
|
||
|
|
||
|
|
||
|
def process_playlist(filename: str):
    '''
    Annotate YouTube URLs in the m3u playlist at *filename* with #EXTINF titles.

    Pass 1 collects video/playlist ids from URL lines that do not already have
    an #EXTINF line directly above them. Those ids are resolved to display
    strings via the YouTube Data API v3. Pass 2 re-walks the playlist,
    inserts one #EXTINF line above each such URL, and rewrites the file in
    place. If no new URLs were found the file is left untouched.
    '''
    # dicts double as ordered, de-duplicated sets: key = id, value = the
    # display string once resolved (None until then).
    vid_ids = {}
    list_ids = {}
    new_content = False

    with open(filename, 'r') as f:
        lines = f.read().splitlines()
    # Pass 1: walk bottom-up. The range stops at i == 2, so lines 0 and 1
    # (the m3u header area) are never examined.
    # NOTE(review): a URL sitting on line 1 would be missed — confirm intended.
    for i in range(len(lines)-1, 1, -1):
        if lines[i-1].startswith('#EXTINF:'):
            # Previous line is already a title annotation: nothing to fetch.
            continue
        line = lines[i]
        if line.startswith('#EXTINF:'):
            continue
        if match := url_regex.match(line.strip()):
            url = match[0]
            vid_id, list_id = get_yt_video_and_list_id(url)
            if vid_id:
                vid_ids[vid_id] = None
            if list_id:
                list_ids[list_id] = None

    if not vid_ids:
        logging.info('No new videos')
    else:
        logging.info('New videos: ' + ', '.join(vid_ids.keys()))
        new_content = True
        # One batched API call resolves every new video id at once.
        if vids_response := get_api_json(f'https://www.googleapis.com/youtube/v3/videos?id={",".join(vid_ids.keys())}&key={yt_api_key}&part=snippet,contentDetails'):
            if 'items' not in vids_response:
                logging.info('Videos Response has no items!')
            else:
                for item in vids_response['items']:
                    channelTitle = item['snippet']['channelTitle']
                    title = item['snippet']['title']
                    duration = shorten_period(item['contentDetails']['duration'])
                    vid_ids[item['id']] = f'{channelTitle} - {title} ({duration})'
        logging.info(vid_ids)

    if not list_ids:
        logging.info('No new playlists')
    else:
        logging.info('New playlists: ' + ', '.join(list_ids.keys()))
        new_content = True
        # Same batched lookup for playlists; labels show a video count
        # instead of a duration.
        if lists_response := get_api_json(f'https://www.googleapis.com/youtube/v3/playlists?id={",".join(list_ids.keys())}&key={yt_api_key}&part=snippet,contentDetails'):
            if 'items' not in lists_response:
                logging.info('Lists Response has no items!')
            else:
                for item in lists_response['items']:
                    # logging.info(item)
                    channelTitle = item['snippet']['channelTitle']
                    title = item['snippet']['title']
                    count = item['contentDetails']['itemCount']
                    list_ids[item['id']] = f'{channelTitle} - {title} ({count} videos)'
        logging.info(list_ids)

    if not new_content:
        # NOTE(review): f-string has no placeholder — '(unknown)' looks like a
        # mangled '{filename}'; confirm against the original source.
        logging.info(f'No new content in (unknown), not writing anything.')
        return

    # Pass 2: same bottom-up walk. Inserting at index i while counting down
    # only shifts elements at or above i, so the yet-to-visit lower indices
    # are unaffected.
    for i in range(len(lines)-1, 1, -1):
        if lines[i-1].startswith('#EXTINF:'):
            continue
        line = lines[i]
        if line.startswith('#EXTINF:'):
            continue
        if match := url_regex.match(line.strip()):
            url = match[0]
            vid_id, list_id = get_yt_video_and_list_id(url)
            # Only annotate ids whose API lookup actually produced a title.
            # NOTE(review): '123' appears to be a dummy duration — confirm
            # mpv ignores the #EXTINF duration field.
            if vid_id in vid_ids and vid_ids[vid_id]:
                lines.insert(i, f'#EXTINF:123, {vid_ids[vid_id]}')
            if list_id in list_ids and list_ids[list_id]:
                lines.insert(i, f'#EXTINF:123, {list_ids[list_id]}')
    if lines:
        with open(filename, 'w') as f:
            f.write('\n'.join(lines))
|
||
|
|
||
|
if __name__ == '__main__':
    try:
        process_playlist(filename)
    # Narrowed from BaseException so Ctrl-C / SystemExit still terminate.
    except Exception as e:
        logging.error('Error processing: {}'.format(e))
    # Always ask mpv to reload, even if processing failed part-way.
    try:
        logging.info('Reloading playlist on mpv')
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.connect(mpv_socket)
            # Reuse the module-level playlist path instead of duplicating the
            # literal (keeps the two in sync if the path ever changes).
            sock.sendall('playlist-clear; loadlist {} append\n'.format(filename).encode())
            logging.info('mpv playlist reloaded!')
    except Exception as e:
        logging.error('Error reloading playlist: {}'.format(e))
|