#!/usr/bin/python
import logging
# Send all log output to a file; DEBUG messages are suppressed at this level.
logging.basicConfig(
    filename='/home/luke/ytlog.txt',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
import re
import urllib . request
import json
import socket
# Key sent as the `key` query parameter of every YouTube Data API request.
yt_api_key = 'get your own lol'
# M3U playlist that is read, annotated with #EXTINF titles and written back.
playlist_filename = '/home/luke/mpv-playlist.m3u'
# Unix socket path used to talk to the running mpv instance.
mpv_socket = '/home/luke/socket-mpv'
SI_large_prefixes = {  # Extend if necessary
    3: 'k',
    6: 'M',
    9: 'G',
    12: 'T',
    15: 'P',
    18: 'E',
}


def shorten_number(number, max_width, engineering=False, include_letter=True, include_minus=True):
    '''
    Return a nice shortened string of a number using SI prefixes. Large numbers only for now (no negative exponents).
    The number is treated as a string of digits instead of taking a numeric approach, so digits are truncated, not rounded.

    number: anything whose str() is an optionally '-'-prefixed run of digits
    max_width: target width of the result; values below 3 are raised to 3
    engineering: use the SI prefix as the decimal symbol to save a character. (e.g. 4k20 instead of 4.20k)
    include_letter: include letters and decimal points in the max width
    include_minus: include a leading minus sign in the max width

    Returns the original string when it already fits or is below 1000 (no prefix applies).
    Numbers beyond the largest known prefix are expressed in that largest prefix.
    The result can exceed max_width when the integer part alone does not fit.
    '''
    max_width = max(max_width, 3)
    number = str(number)
    neg = number.startswith('-')  # startswith() is also safe on an empty string
    num = number[1:] if neg else number
    if neg and include_minus:
        max_width -= 1
    width = len(num)
    if width <= max_width:
        return number
    if include_letter:  # Make room for the prefix letter (and the decimal point)
        max_width -= 1 if engineering else 2
        max_width = max(max_width, 1)
    # Largest multiple of 3 below the digit count, clamped to the biggest prefix we know.
    unit = min(((width - 1) // 3) * 3, max(SI_large_prefixes))
    if unit not in SI_large_prefixes:
        # Fewer than four digits: there is no prefix to apply, so nothing can be shortened.
        return number
    prefix = SI_large_prefixes[unit]
    dec_point = width - unit
    if engineering:
        output = num[:dec_point] + prefix + num[dec_point:max_width]
    elif dec_point < max_width:
        output = num[:dec_point] + '.' + num[dec_point:max_width] + prefix
    else:
        output = num[:dec_point] + prefix
    if neg:
        output = '-' + output
    return output
def shorten_period(string, max_terms=2, collapse_weeks=True):
    '''
    Take an ISO 8601 period string, return something human readable.
    Lowercase the time component while leaving the date component uppercase.

    string: period such as 'P1DT2H3M4S'; the time part after 'T' is optional
    max_terms: keep at most this many leading components
    collapse_weeks: fold a week count into the day count (7 days per week)

    Returns the kept components joined by single spaces (e.g. '1D 2h').
    Raises ValueError if the string does not start with 'P'.
    '''
    if not string.startswith('P'):
        raise ValueError('Given string is not an ISO 8601 period string')
    # M can be Month or Minute depending on location, so split the time component off first.
    # partition() tolerates date-only periods such as 'P0D', which have no 'T' at all.
    datestr, _, timestr = string[1:].partition('T')
    date_components = re.findall(r'(\d+[YMWD])', datestr)
    time_components = re.findall(r'(\d+[hms])', timestr.lower())
    if collapse_weeks:
        new_date = []
        weeks = 0
        saw_days = False
        for d in date_components:
            if d[-1] == 'W':
                weeks = int(d[:-1])
            elif d[-1] == 'D':
                saw_days = True
                new_date.append('{}D'.format(int(d[:-1]) + (7 * weeks)))
            else:
                new_date.append(d)
        if weeks and not saw_days:
            # No day term to fold the weeks into: emit them as days instead of dropping them.
            new_date.append('{}D'.format(7 * weeks))
        date_components = new_date
    components = date_components + time_components
    return ' '.join(components[:max_terms])
# Value of a `v=` / `list=` query parameter (the lookbehind keeps the key out of the match).
re_yt_video_id = re.compile(r'(?<=[\?\&]v=)([a-zA-Z0-9_-]+)')
re_yt_playlist_id = re.compile(r'(?<=[\?\&]list=)([a-zA-Z0-9_-]+)')
# Leading run of id characters in a youtu.be path; stops at '/', '?', '#', ...
_re_yt_short_id = re.compile(r'[a-zA-Z0-9_-]+')
# Hosts whose video id lives in the `v=` query parameter.
_yt_watch_domains = ('youtube.com', 'm.youtube.com', 'music.youtube.com')
_yt_short_domain = 'youtu.be'


def get_yt_video_id(url: str):
    '''
    Extract the video id from a YouTube URL that has already had its scheme and 'www.' removed.

    Returns the id string, or None when the host is not a recognised YouTube
    host or no id is present. The host comparison is case-insensitive.
    '''
    domain, _, params = url.partition('/')
    domain = domain.lower()
    if domain == _yt_short_domain:
        # youtu.be/<id>[?...]; take only the id characters so 'ID/' or 'ID#t=5' still work.
        short_id = _re_yt_short_id.match(params)
        return short_id[0] if short_id else None
    if domain in _yt_watch_domains:
        if vid_id := re_yt_video_id.search(params):
            return vid_id[0]
    return None


def get_yt_video_and_list_id(url: str):
    '''
    Return (video_id, playlist_id) for a URL; either element is None when absent.

    An optional http:// or https:// scheme and a leading 'www.' are removed
    (case-insensitively) first. Playlist ids are only taken from recognised
    YouTube hosts, so a 'list=' parameter on an unrelated site is ignored.
    '''
    lowered = url.lower()
    for scheme in ('http://', 'https://'):
        if lowered.startswith(scheme):
            url = url[len(scheme):]
            break
    if url.lower().startswith('www.'):
        url = url[4:]
    vid_id = get_yt_video_id(url)
    domain = url.partition('/')[0].lower()
    if domain == _yt_short_domain or domain in _yt_watch_domains:
        if list_id := re_yt_playlist_id.search(url):
            return vid_id, list_id[0]
    return vid_id, None
# Generic TLDs recognised in bare (scheme-less) URLs, in addition to any two-letter TLD.
TLDs = [
    'com', 'biz', 'edu', 'gov', 'int', 'mil', 'moe', 'net', 'org', 'xxx',
    'aero', 'asia', 'coop', 'info', 'jobs', 'name', 'museum', 'travel',
]
# Matches either something starting with http://, https:// or a literal "www.",
# or a whitespace-free token containing ".<tld>". Case-insensitive.
url_regex = re.compile(
    r'((https?://|www\.)\S+)|(\S+\.([a-z][a-z]|{})\S*)'.format('|'.join(TLDs)),
    re.IGNORECASE,
)
# url_prefix = re.compile(r'(https?://www\.)|(https?://|www\.)', re.IGNORECASE)
def get_api_json(api_url: str, timeout: float = 30.0):
    '''
    Fetch api_url and return its body parsed as JSON, or None on any failure.

    api_url: URL to open (it is written to the debug log, query string included)
    timeout: seconds to wait on the connection before giving up

    Failures (bad URL, network error, timeout, invalid JSON) are logged and
    reported as None rather than raised. KeyboardInterrupt and SystemExit are
    allowed to propagate.
    '''
    logging.debug("api url: %s", api_url)
    try:
        # The context manager closes the response even if reading or parsing fails.
        with urllib.request.urlopen(api_url, timeout=timeout) as response:
            return json.loads(response.read().decode('utf-8'))
    except Exception as e:
        logging.error('url_finder error - json load fail: {}'.format(e))
        return None
def _iter_unlabelled_urls(lines):
    '''Yield (index, url) for each URL line with no #EXTINF line directly above it, last line first.'''
    for i in range(len(lines) - 1, -1, -1):
        line = lines[i]
        if line.startswith('#EXTINF:'):
            continue
        # Index 0 has no predecessor; lines[-1] would wrap around to the last line.
        if i > 0 and lines[i - 1].startswith('#EXTINF:'):
            continue
        if match := url_regex.match(line.strip()):
            yield i, match[0]


def _fetch_yt_items(resource: str, ids, label: str, extra_query: str = ''):
    '''Query YouTube Data API v3 `resource` for `ids` in batches of 50 and return the combined items.'''
    ids = list(ids)
    items = []
    for start in range(0, len(ids), 50):
        batch = ','.join(ids[start:start + 50])
        response = get_api_json(
            f'https://www.googleapis.com/youtube/v3/{resource}?id={batch}'
            f'&key={yt_api_key}&part=snippet,contentDetails{extra_query}'
        )
        if not response:
            continue  # get_api_json has already logged the failure
        if 'items' not in response:
            logging.info('%s Response has no items!', label)
            continue
        items.extend(response['items'])
    return items


def process_playlist(filename: str):
    '''
    Add an #EXTINF title line above every YouTube URL in the playlist that lacks one.

    Reads `filename`, collects the video and playlist ids of unlabelled URLs,
    looks their titles up through the YouTube Data API, inserts
    '#EXTINF:123,<title>' lines above the matching URLs and writes the file
    back. The file is left untouched when no title could be inserted.
    '''
    # surrogateescape lets bytes that are not valid UTF-8 survive the round trip unchanged.
    with open(filename, 'r', encoding='utf-8', errors='surrogateescape') as f:
        lines = f.read().splitlines()

    # Dicts (not sets) so each id can later be mapped to its display title.
    vid_ids = {}
    list_ids = {}
    for _, url in _iter_unlabelled_urls(lines):
        vid_id, list_id = get_yt_video_and_list_id(url)
        if vid_id:
            vid_ids[vid_id] = None
        if list_id:
            list_ids[list_id] = None

    if not vid_ids:
        logging.info('No new videos')
    else:
        logging.info('New videos: %s', ', '.join(vid_ids))
        for item in _fetch_yt_items('videos', vid_ids, 'Videos'):
            try:
                snippet = item['snippet']
                duration = shorten_period(item['contentDetails']['duration'])
                vid_ids[item['id']] = f"{snippet['channelTitle']} - {snippet['title']} ({duration})"
            except (KeyError, ValueError) as e:
                # One odd item must not stop the remaining ones from being labelled.
                logging.error('Skipping malformed video item: %s', e)
        logging.info(vid_ids)

    if not list_ids:
        logging.info('No new playlists')
    else:
        logging.info('New playlists: %s', ', '.join(list_ids))
        # playlists.list defaults to 5 results per page, so ask for a full batch.
        for item in _fetch_yt_items('playlists', list_ids, 'Lists', '&maxResults=50'):
            try:
                snippet = item['snippet']
                count = item['contentDetails']['itemCount']
                list_ids[item['id']] = f"{snippet['channelTitle']} - {snippet['title']} ({count} videos)"
            except KeyError as e:
                logging.error('Skipping malformed playlist item: %s', e)
        logging.info(list_ids)

    if not vid_ids and not list_ids:
        logging.info(f'No new content in {filename}, not writing anything.')
        return

    inserted = False
    # Indices come in descending order, so inserting at i never shifts an index still to be visited.
    for i, url in list(_iter_unlabelled_urls(lines)):
        vid_id, list_id = get_yt_video_and_list_id(url)
        if vid_ids.get(vid_id):
            lines.insert(i, f'#EXTINF:123,{vid_ids[vid_id]}')
            inserted = True
        if list_ids.get(list_id):
            lines.insert(i, f'#EXTINF:123,{list_ids[list_id]}')
            inserted = True

    if not inserted:
        logging.info(f'No titles could be added to {filename}, not writing anything.')
        return
    with open(filename, 'w', encoding='utf-8', errors='surrogateescape') as f:
        # End with a newline so a later append to the file starts on its own line.
        f.write('\n'.join(lines) + '\n')
def _mpv_get_property(sock, name: str, request_id: int = 1):
    '''
    Ask mpv for property `name` over its JSON IPC socket and return the value.

    The socket also carries event messages and a reply may arrive in several
    pieces, so input is read line by line until the message that echoes
    `request_id` shows up. Raises ValueError when mpv reports an error and
    ConnectionError when the socket closes first.
    '''
    request = json.dumps({'command': ['get_property', name], 'request_id': request_id})
    sock.sendall(request.encode() + b'\n')
    buffer = b''
    while True:
        chunk = sock.recv(65536)
        if not chunk:
            raise ConnectionError('mpv closed the IPC socket before replying')
        buffer += chunk
        # Everything before the final newline is complete; the remainder stays buffered.
        *complete, buffer = buffer.split(b'\n')
        for raw in complete:
            if not raw.strip():
                continue
            message = json.loads(raw)
            logging.debug(message)
            if 'event' in message or message.get('request_id') != request_id:
                continue  # unrelated event or reply
            if message['error'] == 'success':
                return message['data']
            raise ValueError(f'Response for get_property {name} was {message["error"]}')


def _dedupe_current_entry(sock):
    '''If the playing entry (index 0) also appears later in the playlist, drop the copy and move the playing entry there.'''
    full_playlist = _mpv_get_property(sock, 'playlist')
    if not full_playlist:
        return
    # Should check each one for "current":true in other circumstances, but we
    # cleared playlist ahead of time so we know we are on index 0
    current_filename = full_playlist[0]['filename']
    for i, entry in enumerate(full_playlist[1:], 1):
        if entry['filename'] == current_filename:
            logging.debug(f'Found matching filename at playlist index {i}')
            sock.sendall(f'playlist-remove {i}; playlist-move 0 {i}\n'.encode())
            break


def main():
    '''Annotate the playlist file, then tell the running mpv to reload it.'''
    try:
        process_playlist(playlist_filename)
    except Exception as e:
        # A failed annotation must not prevent the reload below.
        logging.error('Error processing: {}'.format(e))
    try:
        logging.info('Reloading playlist on mpv')
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.settimeout(5)  # never hang forever on an unresponsive mpv
            sock.connect(mpv_socket)
            sock.sendall(f'playlist-clear; loadlist "{playlist_filename}" append\n'.encode())
            try:
                logging.debug('Searching playlist for current video to see if it should replace a later entry')
                _dedupe_current_entry(sock)
            except Exception as e:
                logging.error('Error searching playlist: {}'.format(e))
            sock.sendall(b'show-text ${playlist}\n')  # Shows playlist OSD for visual feedback
            logging.info('mpv playlist reloaded!')
    except Exception as e:
        logging.error('Error reloading playlist: {}'.format(e))


if __name__ == '__main__':
    main()