Source code for dbdicom.dbd

import os
import shutil
import json
from typing import Union
import zipfile
import re
from copy import deepcopy

from tqdm import tqdm
import numpy as np
import vreg
from pydicom.dataset import Dataset
import pydicom

import dbdicom.utils.arrays
import dbdicom.dataset as dbdataset
import dbdicom.database as dbdatabase
import dbdicom.register as register
import dbdicom.const as const
from dbdicom.utils.pydicom_dataset import (
    get_values, 
    set_values,
    set_value,
    )



class DataBaseDicom():
    """Manager for reading and writing a folder of DICOM files.

    The contents of the folder are tracked in ``self.register``. On
    construction the register is loaded from the JSON file returned by
    ``_register_file()`` if that file exists and can be parsed;
    otherwise it is rebuilt by scanning the folder with ``read()``.

    Args:
        path (str): path to the DICOM folder. The folder is created if
            it does not exist yet.
    """

    def __init__(self, path):

        if not os.path.exists(path):
            os.makedirs(path)
        self.path = path

        register_file = self._register_file()

        # No cached register on disk: build one by scanning the folder.
        if not os.path.exists(register_file):
            self.read()
            return

        try:
            with open(register_file, 'r') as f:
                self.register = json.load(f)
        except Exception:
            # The cached register cannot be read or parsed: discard
            # it and rebuild the register from the folder instead.
            os.remove(register_file)
            self.read()
[docs] def read(self): """Read the DICOM folder again """ self.register = dbdatabase.read(self.path) # For now ensure all series have just a single CIOD # Leaving this out for now until the issue occurs again. # self._split_series() return self
[docs] def delete(self, entity): """Delete a DICOM entity from the database Args: entity (list): entity to delete """ # delete datasets on disk removed = register.index(self.register, entity) for index in removed: file = os.path.join(self.path, index) if os.path.exists(file): os.remove(file) # drop the entity from the register register.remove(self.register, entity) # cleanup empty folders remove_empty_folders(entity[0]) return self
[docs] def close(self): """Close the DICOM folder This also saves changes in the header file to disk. """ file = self._register_file() with open(file, 'w') as f: json.dump(self.register, f, indent=4) return self
def _register_file(self): return os.path.join(self.path, 'dbtree.json')
[docs] def summary(self): """Return a summary of the contents of the database. Returns: dict: Nested dictionary with summary information on the database. """ return register.summary(self.register)
[docs] def print(self): """Print the contents of the DICOM folder """ register.print_tree(self.register) return self
[docs] def patients(self, name=None, contains=None, isin=None): """Return a list of patients in the DICOM folder. Args: name (str, optional): value of PatientName, to search for individuals with a given name. Defaults to None. contains (str, optional): substring of PatientName, to search for individuals based on part of their name. Defaults to None. isin (list, optional): List of PatientName values, to search for patients whose name is in the list. Defaults to None. Returns: list: list of patients fulfilling the criteria. """ return register.patients(self.register, self.path, name, contains, isin)
[docs] def studies(self, entity=None, desc=None, contains=None, isin=None): """Return a list of studies in the DICOM folder. Args: entity (str or list): path to a DICOM folder (to search in the whole folder), or a two-element list identifying a patient (to search studies of a given patient). desc (str, optional): value of StudyDescription, to search for studies with a given description. Defaults to None. contains (str, optional): substring of StudyDescription, to search for studies based on part of their description. Defaults to None. isin (list, optional): List of StudyDescription values, to search for studies whose description is in a list. Defaults to None. Returns: list: list of studies fulfilling the criteria. """ if entity == None: entity = self.path if isinstance(entity, str): studies = [] for patient in self.patients(): studies += self.studies(patient, desc, contains, isin) return studies elif len(entity)==1: studies = [] for patient in self.patients(): studies += self.studies(patient, desc, contains, isin) return studies else: return register.studies(self.register, entity, desc, contains, isin)
[docs] def series(self, entity=None, desc=None, contains=None, isin=None): """Return a list of series in the DICOM folder. Args: entity (str or list): path to a DICOM folder (to search in the whole folder), or a list identifying a patient or a study (to search series of a given patient or study). desc (str, optional): value of SeriesDescription, to search for series with a given description. Defaults to None. contains (str, optional): substring of SeriesDescription, to search for series based on part of their description. Defaults to None. isin (list, optional): List of SeriesDescription values, to search for series whose description is in a list. Defaults to None. Returns: list: list of series fulfilling the criteria. """ if entity == None: entity = self.path if isinstance(entity, str): series = [] for study in self.studies(entity): series += self.series(study, desc, contains, isin) return series elif len(entity)==1: series = [] for study in self.studies(entity): series += self.series(study, desc, contains, isin) return series elif len(entity)==2: series = [] for study in self.studies(entity): series += self.series(study, desc, contains, isin) return series else: # path = None (all series) or path = patient (all series in patient) return register.series(self.register, entity, desc, contains, isin)
[docs] def volume(self, entity:Union[list, str], dims:list=None, verbose=1) -> vreg.Volume3D: """Read volume. Args: entity (list, str): DICOM series to read dims (list, optional): Non-spatial dimensions of the volume. Defaults to None. verbose (bool, optional): If set to 1, shows progress bar. Defaults to 1. Returns: vreg.Volume3D: """ # if isinstance(entity, str): # path to folder # return [self.volume(s, dims) for s in self.series(entity)] # if len(entity) < 4: # folder, patient or study # return [self.volume(s, dims) for s in self.series(entity)] if dims is None: dims = [] elif isinstance(dims, str): dims = [dims] else: dims = list(dims) dims = ['SliceLocation'] + dims # Read dicom files values = [[] for _ in dims] volumes = [] files = register.files(self.register, entity) for f in tqdm(files, desc='Reading volume..', disable=(verbose==0)): ds = pydicom.dcmread(f) values_f = get_values(ds, dims) for d in range(len(dims)): values[d].append(values_f[d]) volumes.append(dbdataset.volume(ds)) # Format coordinates as mesh coords = [np.array(v) for v in values] coords, inds = dbdicom.utils.arrays.meshvals(coords) # Check that all slices have the same coordinates if len(dims) > 1: # Loop over all coordinates after slice location for c in coords[1:]: # Loop over all slice locations for k in range(1, c.shape[0]): # Coordinate c of slice k if not np.array_equal(c[k,...], c[0,...]): raise ValueError( "Cannot build a single volume. Not all slices " "have the same coordinates." ) # Build volumes vols = np.array(volumes) vols = vols[inds].reshape(coords[0].shape) # Infer spacing between slices from slice locations # Technically only necessary if SpacingBetweenSlices not set or incorrect vols = infer_slice_spacing(vols) # Join 2D volumes into 3D volumes try: vol = vreg.join(vols) except ValueError: # some vendors define the slice vector as -cross product # of row and column vector. Check if that solves the issue. 
for v in vols.reshape(-1): v.affine[:3,2] = -v.affine[:3,2] # Then try again vol = vreg.join(vols) if vol.ndim > 3: # Coordinates of slice 0 c0 = [c[0,...] for c in coords[1:]] vol.set_coords(c0) vol.set_dims(dims[1:]) return vol
[docs] def values(self, series:list, *attr, dims:list=None, verbose=1) -> Union[dict, tuple]: """Read the values of some attributes from a DICOM series Args: series (list): DICOM series to read. attr (tuple, optional): DICOM attributes to read. dims (list, optional): Non-spatial dimensions of the volume. Defaults to None. verbose (bool, optional): If set to 1, shows progress bar. Defaults to 1. Returns: tuple: arrays with values for the attributes. """ # if isinstance(series, str): # path to folder # return [self.values(s, attr, dims) for s in self.series(series)] # if len(series) < 4: # folder, patient or study # return [self.values(s, attr, dims) for s in self.series(series)] if dims is None: dims = ['InstanceNumber'] elif np.isscalar(dims): dims = [dims] else: dims = list(dims) # Read dicom files coord_values = [[] for _ in dims] attr_values = [[] for _ in attr] files = register.files(self.register, series) for f in tqdm(files, desc='Reading values..', disable=(verbose==0)): ds = pydicom.dcmread(f) coord_values_f = get_values(ds, dims) for d in range(len(dims)): coord_values[d].append(coord_values_f[d]) attr_values_f = get_values(ds, attr) for a in range(len(attr)): attr_values[a].append(attr_values_f[a]) # Format coordinates as mesh coords = [np.array(v) for v in coord_values] coords, inds = dbdicom.utils.arrays.meshvals(coords) # Sort values accordingly values = [np.array(v) for v in attr_values] values = [v[inds].reshape(coords[0].shape) for v in values] # Return values if len(values) == 1: return values[0] else: return tuple(values)
[docs] def write_volume( self, vol:Union[vreg.Volume3D, tuple], series:list, ref:list=None, ): """Write a vreg.Volume3D to a DICOM series Args: vol (vreg.Volume3D): Volume to write to the series. series (list): DICOM series to read ref (list): Reference series """ series_full_name = full_name(series) if series_full_name in self.series(): raise ValueError(f"Series {series_full_name[-1]} already exists in study {series_full_name[-2]}.") if isinstance(vol, tuple): vol = vreg.volume(vol[0], vol[1]) if ref is None: ds = dbdataset.new_dataset('MRImage') #ds = dbdataset.new_dataset('ParametricMap') else: if ref[0] == series[0]: ref_mgr = self else: ref_mgr = DataBaseDicom(ref[0]) files = register.files(ref_mgr.register, ref) ref_mgr.close() ds = pydicom.dcmread(files[0]) # Get the attributes of the destination series attr = self._series_attributes(series) n = self._max_instance_number(attr['SeriesInstanceUID']) if vol.ndim==3: slices = vol.split() for i, sl in tqdm(enumerate(slices), desc='Writing volume..'): dbdataset.set_volume(ds, sl) self._write_dataset(ds, attr, n + 1 + i) else: i=0 vols = vol.separate().reshape(-1) for vt in tqdm(vols, desc='Writing volume..'): slices = vt.split() for sl in slices: dbdataset.set_volume(ds, sl) sl_coords = [c.ravel()[0] for c in sl.coords] set_value(ds, sl.dims, sl_coords) self._write_dataset(ds, attr, n + 1 + i) i+=1 return self
[docs] def edit( self, series:list, new_values:dict, dims:list=None, verbose=1, ): """Edit attribute values in a new DICOM series Args: series (list): DICOM series to edit new_values (dict): dictionary with attribute: value pairs to write to the series dims (list, optional): Non-spatial dimensions of the volume. Defaults to None. verbose (bool, optional): If set to 1, shows progress bar. Defaults to 1. """ if dims is None: dims = ['InstanceNumber'] elif np.isscalar(dims): dims = [dims] else: dims = list(dims) # Check that all values have the correct nr of elements files = register.files(self.register, series) for a in new_values.values(): if np.isscalar(a): pass elif np.array(a).size != len(files): raise ValueError( f"Incorrect value lengths. All values need to have {len(files)} elements" ) # Read dicom files to sort them coord_values = [[] for _ in dims] for f in tqdm(files, desc='Sorting series..', disable=(verbose==0)): ds = pydicom.dcmread(f) coord_values_f = get_values(ds, dims) for d in range(len(dims)): coord_values[d].append(coord_values_f[d]) # Format coordinates as mesh coords = [np.array(v) for v in coord_values] coords, inds = dbdicom.utils.arrays.meshvals(coords) # Sort files accordingly files = np.array(files)[inds] # Now edit and write the files attr = self._series_attributes(series) n = self._max_instance_number(attr['SeriesInstanceUID']) # Drop existing attributes if they are edited attr = {a:attr[a] for a in attr if a not in new_values} # List instances to be edited to_drop = register.index(self.register, series) # Write the instances tags = list(new_values.keys()) for i, f in tqdm(enumerate(files), desc='Writing values..', disable=(verbose==0)): ds = pydicom.dcmread(f) values = [] for a in new_values.values(): if np.isscalar(a): values.append(a) else: values.append(np.array(a).reshape(-1)[i]) set_values(ds, tags, values) self._write_dataset(ds, attr, n + 1 + i) # Delete the originals files register.drop(self.register, to_drop) 
[os.remove(os.path.join(self.path, idx)) for idx in to_drop] return self
[docs] def to_nifti(self, series:list, file:str, dims=None, verbose=1): """Save a DICOM series in nifti format. Args: series (list): DICOM series to read file (str): file path of the nifti file. dims (list, optional): Non-spatial dimensions of the volume. Defaults to None. verbose (bool, optional): If set to 1, shows progress bar. Defaults to 1. """ vol = self.volume(series, dims, verbose) vreg.write_nifti(vol, file) return self
[docs] def from_nifti(self, file:str, series:list, ref:list=None): """Create a DICOM series from a nifti file. Args: file (str): file path of the nifti file. series (list): DICOM series to create ref (list): DICOM series to use as template. """ vol = vreg.read_nifti(file) self.write_volume(vol, series, ref) return self
[docs] def pixel_data(self, series:list, dims:list=None, coords=False, attr=None) -> np.ndarray: """Read the pixel data from a DICOM series Args: series (list or str): DICOM series to read. This can also be a path to a folder containing DICOM files, or a patient or study to read all series in that patient or study. In those cases a list is returned. dims (list, optional): Dimensions of the array. coords (bool): If set to True, the coordinates of the arrays are returned alongside the pixel data attr (list, optional): list of DICOM attributes that are read on the fly to avoid reading the data twice. Returns: numpy.ndarray or tuple: numpy array with pixel values, with at least 3 dimensions (x,y,z). If coords is set these are returned too as an array with coordinates of the slices according to dims. If include is provided the values are returned as a dictionary in the last return value. """ if isinstance(series, str): # path to folder return [self.pixel_data(s, dims, coords, attr) for s in self.series(series)] if len(series) < 4: # folder, patient or study return [self.pixel_data(s, dims, coords, attr) for s in self.series(series)] if dims is None: dims = ['InstanceNumber'] elif np.isscalar(dims): dims = [dims] else: dims = list(dims) # Ensure return_vals is a list if attr is None: params = [] elif np.isscalar(attr): params = [attr] else: params = list(attr) files = register.files(self.register, series) # Read dicom files coords_array = [] arrays = np.empty(len(files), dtype=dict) if attr is not None: values = np.empty(len(files), dtype=dict) for i, f in tqdm(enumerate(files), desc='Reading pixel data..'): ds = pydicom.dcmread(f) coords_array.append(get_values(ds, dims)) # save as dict so numpy does not stack as arrays arrays[i] = {'pixel_data': dbdataset.pixel_data(ds)} if attr is not None: values[i] = {'values': get_values(ds, params)} # Format as mesh coords_array = np.stack([v for v in coords_array], axis=-1) coords_array, inds = 
dbdicom.utils.arrays.meshvals(coords_array) arrays = arrays[inds].reshape(coords_array.shape[1:]) arrays = np.stack([a['pixel_data'] for a in arrays.reshape(-1)], axis=-1) arrays = arrays.reshape(arrays.shape[:2] + coords_array.shape[1:]) if attr is None: if coords: return arrays, coords_array else: return arrays # Return values as a dictionary values = values[inds].reshape(-1) values_dict = {} for p in range(len(params)): # Get the type from the first value vp0 = values[0]['values'][p] # Build an array of the right type vp = np.zeros(values.size, dtype=type(vp0)) # Populate the array with values for parameter p for i, v in enumerate(values): vp[i] = v['values'][p] # Reshape values for parameter p vp = vp.reshape(coords_array.shape[1:]) # Eneter in the dictionary values_dict[params[p]] = vp # If only one, return as value if len(params) == 1: values_return = values_dict[attr[0]] else: values_return = values_dict # problem if the values are a list. Needs an array with a prespeficied dtype # values = values[inds].reshape(coords_array.shape[1:]) # values = np.stack([a['values'] for a in values.reshape(-1)], axis=-1) # values = values.reshape((len(params), ) + coords_array.shape[1:]) if coords: return arrays, coords_array, values_return else: return arrays, values_return
[docs] def files(self, entity:list) -> list: """Read the files in a DICOM entity Args: entity (list or str): DICOM entity to read. This can be a path to a folder containing DICOM files, or a patient or study to read all series in that patient or study. Returns: list: list of valid dicom files. """ if isinstance(entity, str): # path to folder files = [] for s in self.series(entity): files += self.files(s) return files if len(entity) < 4: # folder, patient or study files = [] for s in self.series(entity): files += self.files(s) return files return register.files(self.register, entity)
[docs] def unique(self, pars:list, entity:list) -> dict: """Return a list of unique values for a DICOM entity Args: pars (list, str/tuple): attribute or attributes to return. entity (list): DICOM entity to search (Patient, Study or Series) Returns: dict: if a pars is a list, this returns a dictionary with unique values for each attribute. If pars is a scalar this returnes a list of values. """ if not isinstance(pars, list): single=True pars = [pars] else: single=False v = self._values(pars, entity) # Return a list with unique values for each attribute values = [] for a in range(v.shape[1]): va = v[:,a] # Remove None values va = va[[x is not None for x in va]] va = list(va) # Get unique values and sort va = [x for i, x in enumerate(va) if i==va.index(x)] try: va.sort() except: pass values.append(va) if single: return values[0] else: return {p: values[i] for i, p in enumerate(pars)}
[docs] def copy(self, from_entity, to_entity=None): """Copy a DICOM entity (patient, study or series) Args: from_entity (list): entity to copy to_entity (list, optional): entity after copying. If this is not provided, a copy will be made in the same study and returned Returns: entity: the copied entity. If th to_entity is provided, this is returned. """ if len(from_entity) == 4: if to_entity is None: to_entity = deepcopy(from_entity) if isinstance(to_entity[-1], tuple): to_entity[-1] = (to_entity[-1][0] + '_copy', 0) else: to_entity[-1] = (to_entity[-1] + '_copy', 0) while to_entity in self.series(): to_entity[-1][1] += 1 if len(to_entity) != 4: raise ValueError( f"Cannot copy series {from_entity} to series {to_entity}. " f"{to_entity} is not a series (needs 4 elements)." ) self._copy_series(from_entity, to_entity) return to_entity if len(from_entity) == 3: if to_entity is None: to_entity = deepcopy(from_entity) if isinstance(to_entity[-1], tuple): to_entity[-1] = (to_entity[-1][0] + '_copy', 0) else: to_entity[-1] = (to_entity[-1] + '_copy', 0) while to_entity in self.studies(): to_entity[-1][1] += 1 if len(to_entity) != 3: raise ValueError( f"Cannot copy study {from_entity} to study {to_entity}. " f"{to_entity} is not a study (needs 3 elements)." ) self._copy_study(from_entity, to_entity) return to_entity if len(from_entity) == 2: if to_entity is None: to_entity = deepcopy(from_entity) to_entity[-1] += '_copy' while to_entity in self.patients(): to_entity[-1] += '_copy' if len(to_entity) != 2: raise ValueError( f"Cannot copy patient {from_entity} to patient {to_entity}. " f"{to_entity} is not a patient (needs 2 elements)." ) self._copy_patient(from_entity, to_entity) return to_entity raise ValueError( f"Cannot copy {from_entity} to {to_entity}. " )
[docs] def move(self, from_entity, to_entity): """Move a DICOM entity Args: entity (list): entity to move """ self.copy(from_entity, to_entity) self.delete(from_entity) return self
[docs] def split_series(self, series:list, attr:Union[str, tuple], key=None) -> list: """ Split a series into multiple series Args: series (list): series to split. attr (str or tuple): dicom attribute to split the series by. key (function): split by by key(attr) Returns: list: list of two-element tuples, where the first element is is the value and the second element is the series corresponding to that value. """ # Find all values of the attr and list files per value all_files = register.files(self.register, series) files = [] values = [] for f in tqdm(all_files, desc=f'Reading {attr}'): ds = pydicom.dcmread(f) v = get_values(ds, attr) if key is not None: v = key(v) if v in values: index = values.index(v) files[index].append(f) else: values.append(v) files.append([f]) # Copy the files for each value (sorted) to new series split_series = [] for index, v in tqdm(enumerate(values), desc='Writing new series'): series_desc = series[-1] if isinstance(series, str) else series[-1][0] series_desc = clean_folder_name(f'{series_desc}_{attr}_{v}') series_v = series[:3] + [(series_desc, 0)] self._files_to_series(files[index], series_v) split_series.append((v, series_v)) return split_series
def _values(self, attributes:list, entity:list): # Create a np array v with values for each instance and attribute # if set(attributes) <= set(dbdatabase.COLUMNS): # index = register.index(self.register, entity) # v = self.register.loc[index, attributes].values # else: files = register.files(self.register, entity) v = np.empty((len(files), len(attributes)), dtype=object) for i, f in enumerate(files): ds = pydicom.dcmread(f) v[i,:] = get_values(ds, attributes) return v def _copy_patient(self, from_patient, to_patient): from_patient_studies = register.studies(self.register, from_patient) for from_study in tqdm(from_patient_studies, desc=f'Copying patient {from_patient[1:]}'): # Count the studies with the same description in the target patient study_desc = from_study[-1][0] if to_patient[0]==from_patient[0]: cnt = len(self.studies(to_patient, desc=study_desc)) else: mgr = DataBaseDicom(to_patient[0]) cnt = len(mgr.studies(to_patient, desc=study_desc)) mgr.close() # Ensure the copied studies end up in a separate study with the same description to_study = to_patient + [(study_desc, cnt)] self._copy_study(from_study, to_study) def _copy_study(self, from_study, to_study): from_study_series = register.series(self.register, from_study) for from_series in tqdm(from_study_series, desc=f'Copying study {from_study[1:]}'): # Count the series with the same description in the target study series_desc = from_series[-1][0] if to_study[0]==from_study[0]: cnt = len(self.series(to_study, desc=series_desc)) else: mgr = DataBaseDicom(to_study[0]) cnt = len(mgr.series(to_study, desc=series_desc)) mgr.close() # Ensure the copied series end up in a separate series with the same description to_series = to_study + [(series_desc, cnt)] self._copy_series(from_series, to_series) def _copy_series(self, from_series, to_series): # Get the files to be exported from_series_files = register.files(self.register, from_series) if to_series[0] == from_series[0]: # Copy in the same database 
self._files_to_series(from_series_files, to_series) else: # Copy to another database mgr = DataBaseDicom(to_series[0]) mgr._files_to_series(from_series_files, to_series) mgr.close() def _files_to_series(self, files, to_series): # Get the attributes of the destination series attr = self._series_attributes(to_series) n = self._max_instance_number(attr['SeriesInstanceUID']) # Copy the files to the new series for i, f in tqdm(enumerate(files), total=len(files), desc=f'Copying series {to_series[1:]}'): # Read dataset and assign new properties ds = pydicom.dcmread(f) self._write_dataset(ds, attr, n + 1 + i) def _max_study_id(self, patient_id): for pt in self.register: if pt['PatientID'] == patient_id: # Find the largest integer StudyID n = [] for st in pt['studies']: try: n.append(int(st['StudyID'])) except: pass if n == []: return 0 else: return int(np.amax(n)) return 0 def _max_series_number(self, study_uid): for pt in self.register: for st in pt['studies']: if st['StudyInstanceUID'] == study_uid: n = [sr['SeriesNumber'] for sr in st['series']] return int(np.amax(n)) return 0 def _max_instance_number(self, series_uid): for pt in self.register: for st in pt['studies']: for sr in st['series']: if sr['SeriesInstanceUID'] == series_uid: n = list(sr['instances'].keys()) return int(np.amax([int(i) for i in n])) return 0 # def _attributes(self, entity): # if len(entity)==4: # return self._series_attributes(entity) # if len(entity)==3: # return self._study_attributes(entity) # if len(entity)==2: # return self._patient_attributes(entity) def _patient_attributes(self, patient): try: # If the patient exists and has files, read from file files = register.files(self.register, patient) attr = const.PATIENT_MODULE ds = pydicom.dcmread(files[0]) vals = get_values(ds, attr) except: # If the patient does not exist, generate values if patient in self.patients(): raise ValueError( f"Cannot create patient with id {patient[1]}." f"The ID is already taken. Please provide a unique ID." 
) attr = ['PatientID', 'PatientName'] vals = [patient[1], 'Anonymous'] return {attr[i]:vals[i] for i in range(len(attr)) if vals[i] is not None} def _study_attributes(self, study): patient_attr = self._patient_attributes(study[:2]) try: # If the study exists and has files, read from file files = register.files(self.register, study) attr = const.STUDY_MODULE ds = pydicom.dcmread(files[0]) vals = get_values(ds, attr) except register.AmbiguousError as e: raise register.AmbiguousError(e) except: # If the study does not exist or is empty, generate values if study[:-1] not in self.patients(): study_id = 1 else: study_id = 1 + self._max_study_id(study[1]) attr = ['StudyInstanceUID', 'StudyDescription', 'StudyID'] study_uid = pydicom.uid.generate_uid() study_desc = study[-1] if isinstance(study[-1], str) else study[-1][0] #study_date = datetime.today().strftime('%Y%m%d') vals = [study_uid, study_desc, str(study_id)] return patient_attr | {attr[i]:vals[i] for i in range(len(attr)) if vals[i] is not None} def _series_attributes(self, series): study_attr = self._study_attributes(series[:3]) try: # If the series exists and has files, read from file files = register.files(self.register, series) attr = const.SERIES_MODULE ds = pydicom.dcmread(files[0]) vals = get_values(ds, attr) except register.AmbiguousError as e: raise register.AmbiguousError(e) except: # If the series does not exist or is empty, generate values try: study_uid = register.study_uid(self.register, series[:-1]) except: series_number = 1 else: series_number = 1 + self._max_series_number(study_uid) attr = ['SeriesInstanceUID', 'SeriesDescription', 'SeriesNumber'] series_uid = pydicom.uid.generate_uid() series_desc = series[-1] if isinstance(series[-1], str) else series[-1][0] vals = [series_uid, series_desc, int(series_number)] return study_attr | {attr[i]:vals[i] for i in range(len(attr)) if vals[i] is not None} def _write_dataset(self, ds:Dataset, attr:dict, instance_nr:int): # Set new attributes 
attr['SOPInstanceUID'] = pydicom.uid.generate_uid() attr['InstanceNumber'] = str(instance_nr) set_values(ds, list(attr.keys()), list(attr.values())) # Save results in a new file rel_dir = os.path.join( f"Patient__{attr['PatientID']}", f"Study__{attr['StudyID']}__{attr['StudyDescription']}", f"Series__{attr['SeriesNumber']}__{attr['SeriesDescription']}", ) os.makedirs(os.path.join(self.path, rel_dir), exist_ok=True) rel_path = os.path.join(rel_dir, pydicom.uid.generate_uid() + '.dcm') dbdataset.write(ds, os.path.join(self.path, rel_path)) # Add an entry in the register register.add_instance(self.register, attr, rel_path) def archive(self, archive_path): # TODO add flat=True option for zipping at patient level for pt in tqdm(self.register, desc='Archiving '): for st in pt['studies']: zip_dir = os.path.join( archive_path, f"Patient__{pt['PatientID']}", f"Study__{st['StudyID']}__{st['StudyDescription']}", ) os.makedirs(zip_dir, exist_ok=True) for sr in st['series']: zip_file = os.path.join( zip_dir, f"Series__{sr['SeriesNumber']}__{sr['SeriesDescription']}.zip", ) if os.path.exists(zip_file): continue try: with zipfile.ZipFile(zip_file, 'w') as zipf: for rel_path in sr['instances'].values(): file = os.path.join(self.path, rel_path) zipf.write(file, arcname=os.path.basename(file)) except Exception as e: raise RuntimeError( f"Error extracting series {sr['SeriesDescription']} " f"in study {st['StudyDescription']} of patient {pt['PatientID']}." )
def full_name(entity):
    """Return a study or series with (description, index) tuples.

    Studies (3 elements) and series (4 elements) can be identified by a
    plain description or by a (description, index) tuple. Plain
    descriptions are replaced by (description, 0). Other entities are
    returned unchanged.

    Args:
        entity (list): entity to convert.

    Returns:
        list: the entity in full-name form. The input is not modified.
    """
    if len(entity) == 3:
        # Study
        if isinstance(entity[-1], tuple):
            return entity
        study = deepcopy(entity)
        study[-1] = (study[-1], 0)
        return study

    if len(entity) == 4:
        # Series: first complete the study, then the series itself
        series = full_name(entity[:3]) + [entity[-1]]
        if isinstance(series[-1], tuple):
            return series
        series = deepcopy(series)
        series[-1] = (series[-1], 0)
        return series

    return entity


def clean_folder_name(name, replacement="", max_length=255):
    """Turn a string into a name that is safe to use for a folder.

    Args:
        name (str): name to clean.
        replacement (str, optional): string that replaces each invalid
            character. Defaults to "".
        max_length (int, optional): maximum number of characters in
            the result. Defaults to 255.

    Returns:
        str: the cleaned name, or "folder" if nothing is left.
    """
    # Strip leading/trailing whitespace
    name = name.strip()

    # Replace invalid characters (Windows, macOS, Linux-safe)
    illegal_chars = r'[<>:"/\\|?*\[\]\x00-\x1F\x7F]'
    name = re.sub(illegal_chars, replacement, name)

    # Reserved Windows names get a suffix. Only the part before the
    # first dot counts.
    reserved = {"CON", "PRN", "AUX", "NUL"}
    reserved.update(f"COM{i}" for i in range(1, 10))
    reserved.update(f"LPT{i}" for i in range(1, 10))
    base_name = name.upper().split(".")[0]
    if base_name in reserved:
        name = f"{name}_folder"

    # Truncate to max length
    return name[:max_length] or "folder"


def remove_empty_folders(path):
    """Remove all empty subfolders from a given directory.

    The directory tree is walked from the bottom up, so that child
    directories are removed before their parents are inspected. A
    parent that is left empty by this is removed in the same pass.

    The directory ``path`` itself is never removed, even if it ends up
    empty.

    Args:
        path (str): The absolute or relative path to the directory to
            scan.
    """
    # Walk the directory tree in a bottom-up manner (topdown=False)
    for dirpath, _dirnames, _filenames in os.walk(path, topdown=False):
        if dirpath == path:
            # Only subfolders are removed, not the folder itself
            continue
        try:
            # The lists returned by os.walk were built before the
            # subfolders were removed, so check on disk what is left.
            if os.listdir(dirpath):
                continue
            os.rmdir(dirpath)
        except OSError as e:
            # This might happen due to permissions issues
            print(f"Error removing {dirpath}: {e}")


def infer_slice_spacing(vols):
    """Set the slice spacing of 2D volumes from their positions.

    In case spacing between slices is not (correctly) encoded in DICOM
    it can be inferred from the slice locations. The third column of
    the affine of each volume is overwritten in place.

    Args:
        vols (numpy.ndarray): array of objects with a 4x4 ``affine``
            attribute. The first axis runs over the slices; remaining
            axes, if any, are treated as independent stacks.

    Raises:
        ValueError: if the spacing between slices is not the same
            within a stack, or differs between stacks.

    Returns:
        numpy.ndarray: the same volumes, in the original shape.
    """
    shape = vols.shape
    vols = vols.reshape((shape[0], -1))
    n_stacks = vols.shape[-1]
    slice_spacing = np.zeros(n_stacks)

    for d in range(n_stacks):
        stack = vols[:,d]

        # For single slice volumes there is nothing to do
        if stack.shape[0] == 1:
            continue

        # Get a normal slice vector from the first volume.
        mat = stack[0].affine[:3,:3]
        normal = mat[:,2]/np.linalg.norm(mat[:,2])

        # Get slice locations by projection on the normal, sort them
        # and take consecutive differences.
        slice_loc = np.sort([np.dot(v.affine[:3,3], normal) for v in stack])
        distances = np.diff(slice_loc)

        # Round to two decimals (10 micrometer, assuming positions are
        # in millimeter) and check if unique
        slice_spacing_d = np.unique(np.around(distances, 2))

        # Check if unique - otherwise this is not a volume
        if len(slice_spacing_d) > 1:
            raise ValueError(
                'Cannot build a volume - spacings between slices are not unique.'
            )
        slice_spacing_d = slice_spacing_d[0]

        # Set correct slice spacing in all volumes
        for v in stack:
            v.affine[:3,2] = normal * abs(slice_spacing_d)

        slice_spacing[d] = slice_spacing_d

    # Check slice_spacing is the same across dimensions
    if len(np.unique(slice_spacing)) > 1:
        raise ValueError(
            'Cannot build a volume - spacings between slices are not unique.'
        )

    return vols.reshape(shape)