# Singular values may be decimal (no prefix), or any of the prefixes python accepts normally (0x for hex, 0b for binary, 0o for octal) # Additionally, hexadecimals may be prefixed with '#' or '$', or suffixed with 'h', e.g. 0x10 #10 $10 10h are all parsed as 16 # For nested IDs, the format is ONLY like IP addresses: # decimal with '.' separator, e.g. 16.127.1 # hexadecimal with ':' separator, e.g. 10:7f:1 # Nested IDs do not support prefixes. def try_int(v): try: if v[0] in '#$': # Maybe % too? return int(v[1:], 16) if v[-1] == 'h': return int(v[:-1], 16) return int(v, 0) except: if v == '': return None return v def get_max_number_width(container, fmt: str = 'd') -> int: return len(f'{len(container)-1:{fmt}}') def get_number_zero_pad(container, fmt: str = 'd') -> str: max_digits = len(f'{len(container)-1:{fmt}}') # Could instead call get_max_number_width return f'0{max_digits}{fmt}' def encode_nested_ids(values: list[int], max_digits: list[int] = None, fmt: str = 'd') -> str: delimiter = ':' if fmt in 'Xx' else '.' if max_digits: return delimiter.join([f'{value:0{digits}{fmt}}' for value, digits in zip(values, max_digits)]) else: return delimiter.join([f'{value:{fmt}}' for value in values]) def decode_nested_ids(string: str) -> list[int]: hex = ':' in string delimiter = ':' if hex else '.' return [int(i, 16 if hex else 10) for i in string.split(delimiter)] def flatten_keys(container: dict | list, prefix: str = '') -> dict: output = {} def flatten_item(k: str, v): if isinstance(v, dict) or isinstance(v, list): flat = flatten_keys(v, f'{prefix}{k}.') for k2, v2 in flat.items(): output[k2] = v2 else: output[f'{prefix}{k}'] = v if isinstance(container, list): fmt = get_number_zero_pad(container, 'd') # Zero pad all of the indices to the same decimal string length as the final one for k, v in enumerate(container): flatten_item(f'{k:{fmt}}', v) elif isinstance(container, dict): for k, v in container.items(): flatten_item(k, v) else: raise ValueError(f'flatten_keys is undefined for container type "{container}"') return output def flatten_table(table: list, id_fmt: str = 'x') -> list: if len(table) < 1: return table # Empty if isinstance(table[0], dict): # A simple table return [flatten_keys(d) for d in table] if isinstance(table[0], list): # Nested lists are bad when expanded as columns, so we'll expand flattened_table = [] def flatten_list(data, ids: list[int], id_max_digits: list[int]) -> None: if isinstance(data, list): max_digits = id_max_digits + [get_max_number_width(data, id_fmt)] for id, sub in enumerate(data): flatten_list(sub, ids + [id], max_digits) else: entry = {'ID': encode_nested_ids(ids, id_max_digits, id_fmt)} entry.update(flatten_keys(data)) flattened_table.append(entry) flatten_list(table, [], []) return flattened_table else: raise NotImplementedError(table[0]) def unflatten_keys(d: dict) -> dict: output = {} for k, v in d.items(): keysplit = k.split('.') target_dict = output for prefix in keysplit[:-1]: if prefix not in target_dict: target_dict[prefix] = {} target_dict = target_dict[prefix] target_dict[k] = v return output def unflatten_table(headers: list[str], entries: list): if 'ID' not in headers: return entries # This could be an array of an array of an array of an... id0 = entries[0]['ID'] if isinstance(id0, int) or ('.' not in id0 and ':' not in id0): return entries # Treat this as a nested array table = {tuple(decode_nested_ids(entry['ID'])): entry for entry in entries} output = [] def unflatten_arrays(id_split: tuple[int], cur_array: list, value): i, *remainder = id_split if len(remainder) > 0: while len(cur_array) <= i: # Make sure our array has the index we're about to jump into cur_array.append([]) unflatten_arrays(remainder, cur_array[i], value) else: while len(cur_array) <= i: # Make sure our array has the index we're about to overwrite cur_array.append(None) cur_array[i] = value for id_split in sorted(table.keys()): unflatten_arrays(id_split, output, table[id_split]) return output def dump_tsv(filename, table, id_column=True) -> None: table_flat = flatten_table(table) with open(filename, 'w') as file: headers = list(table_flat[0].keys()) if id_column and 'ID' not in headers: # Some flattened tables build their own ID column! # See how long the hex representation of the last number will be, so we can zero-pad the rest to match. fmt = get_number_zero_pad(table_flat, 'X') file.write('\t'.join(['ID'] + headers) + '\n') for i, entry in enumerate(table_flat): file.write('\t'.join([f'0x{i:{fmt}}'] + [str(entry[key]) for key in headers]) + '\n') else: file.write('\t'.join(headers) + '\n') for i, entry in enumerate(table_flat): file.write('\t'.join([str(entry[key]) for key in headers]) + '\n') def load_tsv(filename: str, unflatten: bool = True) -> list: with open(filename, 'r') as file: lines = file.read().rstrip().split('\n') if len(lines) < 2: return [] headers = lines[0].split('\t') if not unflatten: return [{key: try_int(value) for key, value in zip(headers, line.split('\t'))} for line in lines[1:]] # Simple line-by-line unflatten entries = [] for line in lines[1:]: entry = {key: try_int(value) for key, value in zip(headers, line.split('\t'))} entries.append(unflatten_keys(entry)) return unflatten_table(headers, entries) def deep_update(old_container: list|dict, new_container: list|dict, ignore_holes: bool = True): # Recurse through old_container, overwriting all values changed by new_container. # Values of None in new_container will be treated as holes. if isinstance(old_container, list): for i, new in enumerate(new_container): if ignore_holes and new is None: # Allow holes in the table for values we don't care about overwriting continue if isinstance(new, dict): id = new.get('ID') if not isinstance(id, int): id = i deep_update(old_container[id], new) elif isinstance(new, list): deep_update(old_container[i], new) else: old_container[i] = value elif isinstance(old_container, dict): for key, value in new_container.items(): if key == 'ID': continue if ignore_holes and value is None: # Allow holes in the table for values we don't care about overwriting continue if isinstance(value, dict) or isinstance(value, list): deep_update(old_container[key], value) else: old_container[key] = value else: raise ValueError