From de795f6136869e134f71aedfb3cb59afd06b747b Mon Sep 17 00:00:00 2001 From: Joshua Shields <54691495+jvshields@users.noreply.github.com> Date: Thu, 2 Nov 2023 15:00:22 -0400 Subject: [PATCH] Add vald to atom data (#380) * Add VALD data parser * changename and fix ion_charge * clean up unusued params * add vald tests * add vald to output, as well as helper functions to create atomic line list * fix test by not stripping molecules * change for loop to list comprehension * improve documetation * further improve docstring coverage * further small docstring improvements * apply sourcery * add js and Es to linelist * change vald_linelist to generic linelist * add test for stripping moleculess * add test for linelists * handle case of linelist requested when not stripping molecules * test linelist when not stripping molecules --------- Co-authored-by: Aarya Chaumal --- carsus/io/output/base.py | 976 +++++++++++++++++++++-------------- carsus/io/tests/test_vald.py | 71 ++- carsus/io/vald/vald.py | 125 ++++- 3 files changed, 771 insertions(+), 401 deletions(-) diff --git a/carsus/io/output/base.py b/carsus/io/output/base.py index 1d844d63f..5f82a918a 100644 --- a/carsus/io/output/base.py +++ b/carsus/io/output/base.py @@ -9,9 +9,12 @@ from scipy import interpolate from carsus.model import MEDIUM_AIR, MEDIUM_VACUUM -from carsus.util import (convert_atomic_number2symbol, - convert_wavelength_air2vacuum, hash_pandas_object, - serialize_pandas_object) +from carsus.util import ( + convert_atomic_number2symbol, + convert_wavelength_air2vacuum, + hash_pandas_object, + serialize_pandas_object, +) # Wavelengths above this value are given in air GFALL_AIR_THRESHOLD = 2000 * u.AA @@ -40,24 +43,33 @@ class TARDISAtomData: """ - def __init__(self, - atomic_weights, - ionization_energies, - gfall_reader, - zeta_data, - chianti_reader=None, - cmfgen_reader=None, - nndc_reader=None, - levels_lines_param={"levels_metastable_loggf_threshold": -3, - "lines_loggf_threshold": -3}, - collisions_param={"temperatures": np.arange(2000, 50000, 2000)} - ): - + def __init__( + self, + atomic_weights, + ionization_energies, + gfall_reader, + zeta_data, + chianti_reader=None, + cmfgen_reader=None, + nndc_reader=None, + vald_reader=None, + levels_lines_param={ + "levels_metastable_loggf_threshold": -3, + "lines_loggf_threshold": -3, + }, + collisions_param={"temperatures": np.arange(2000, 50000, 2000)}, + ): self.atomic_weights = atomic_weights - if (cmfgen_reader is not None) and hasattr(cmfgen_reader, 'ionization_energies'): + if (cmfgen_reader is not None) and hasattr( + cmfgen_reader, "ionization_energies" + ): combined_ionization_energies = copy.deepcopy(ionization_energies) - combined_ionization_energies.base = cmfgen_reader.ionization_energies.combine_first(ionization_energies.base) + combined_ionization_energies.base = ( + cmfgen_reader.ionization_energies.combine_first( + ionization_energies.base + ) + ) self.ionization_energies = combined_ionization_energies else: self.ionization_energies = ionization_energies @@ -67,6 +79,7 @@ def __init__(self, self.chianti_reader = chianti_reader self.cmfgen_reader = cmfgen_reader self.nndc_reader = nndc_reader + self.vald_reader = vald_reader self.levels_lines_param = levels_lines_param self.collisions_param = collisions_param @@ -87,7 +100,12 @@ def __init__(self, if hasattr(cmfgen_reader, "collisions"): collisions = cmfgen_reader.collisions.copy() collisions.index = collisions.index.rename( - ['atomic_number', 'ion_number', 'level_number_lower', 'level_number_upper'] + [ + "atomic_number", + "ion_number", + "level_number_lower", + "level_number_upper", + ] ) self.collisions = collisions @@ -95,24 +113,26 @@ def __init__(self, elif hasattr(chianti_reader, "collisions"): self.collisions = self.create_collisions(**collisions_param) - self.collisions_metadata = pd.Series({ - "temperatures": collisions_param["temperatures"], - "dataset": ["chianti"], - "info": None - }) + self.collisions_metadata = pd.Series( + { + "temperatures": collisions_param["temperatures"], + "dataset": ["chianti"], + "info": None, + } + ) else: logger.warning("No source of collisions was selected.") - if (cmfgen_reader is not None) and hasattr(cmfgen_reader, 'cross_sections'): + if (cmfgen_reader is not None) and hasattr(cmfgen_reader, "cross_sections"): self.cross_sections = self.create_cross_sections() - logger.info('Finished.') + logger.info("Finished.") @staticmethod def solve_priorities(levels): - """ - Returns a list of unique species per data source. - + """ + Returns a list of unique species per data source. + Notes ----- @@ -126,54 +146,53 @@ def solve_priorities(levels): 5 : CMFGEN """ - levels = levels.set_index(['atomic_number', 'ion_number']) + levels = levels.set_index(["atomic_number", "ion_number"]) levels = levels.sort_index() # To supress warnings lvl_list = [] for ion in levels.index.unique(): - max_priority = levels.loc[ion, 'priority'].max() - lvl = levels.loc[ion][ levels.loc[ion, 'priority'] == max_priority ] + max_priority = levels.loc[ion, "priority"].max() + lvl = levels.loc[ion][levels.loc[ion, "priority"] == max_priority] lvl_list.append(lvl) levels_uq = pd.concat(lvl_list, sort=True) - gfall_ions = levels_uq[ levels_uq['ds_id'] == 2 ].index.unique() - chianti_ions = levels_uq[ levels_uq['ds_id'] == 4 ].index.unique() - cmfgen_ions = levels_uq[ levels_uq['ds_id'] == 5 ].index.unique() + gfall_ions = levels_uq[levels_uq["ds_id"] == 2].index.unique() + chianti_ions = levels_uq[levels_uq["ds_id"] == 4].index.unique() + cmfgen_ions = levels_uq[levels_uq["ds_id"] == 5].index.unique() - assert set(gfall_ions).intersection(set(chianti_ions))\ - .intersection(set(cmfgen_ions)) == set([]) + assert set(gfall_ions).intersection(set(chianti_ions)).intersection( + set(cmfgen_ions) + ) == set([]) return gfall_ions, chianti_ions, cmfgen_ions @staticmethod def get_lvl_index2id(df, levels_all): """ - Matches level indexes with level IDs for a given DataFrame. - + Matches level indexes with level IDs for a given DataFrame. + """ # TODO: re-write this method without a for loop ion = df.index.unique() - lvl_index2id = levels_all.set_index( - ['atomic_number', 'ion_number']).loc[ion] + lvl_index2id = levels_all.set_index(["atomic_number", "ion_number"]).loc[ion] lvl_index2id = lvl_index2id.reset_index() lower_level_id = [] upper_level_id = [] - + df = df.reset_index() for row in df.itertuples(): - llid = row.level_index_lower ulid = row.level_index_upper - upper = lvl_index2id.at[ulid, 'level_id'] - lower = lvl_index2id.at[llid, 'level_id'] + upper = lvl_index2id.at[ulid, "level_id"] + lower = lvl_index2id.at[llid, "level_id"] lower_level_id.append(lower) upper_level_id.append(upper) - df['lower_level_id'] = pd.Series(lower_level_id) - df['upper_level_id'] = pd.Series(upper_level_id) + df["lower_level_id"] = pd.Series(lower_level_id) + df["upper_level_id"] = pd.Series(upper_level_id) return df @@ -190,21 +209,28 @@ def _create_artificial_fully_ionized(levels): (-1, atomic_number, atomic_number, 0, 0.0, 1, True) ) - levels_columns = ["level_id", "atomic_number", - "ion_number", "level_number", - "energy", "g", "metastable"] + levels_columns = [ + "level_id", + "atomic_number", + "ion_number", + "level_number", + "energy", + "g", + "metastable", + ] fully_ionized_levels_dtypes = [ - (key, levels.dtypes[key]) for key in levels_columns] + (key, levels.dtypes[key]) for key in levels_columns + ] fully_ionized_levels = np.array( - fully_ionized_levels, dtype=fully_ionized_levels_dtypes) + fully_ionized_levels, dtype=fully_ionized_levels_dtypes + ) return pd.DataFrame(data=fully_ionized_levels) @staticmethod - def _create_metastable_flags(levels, lines, - levels_metastable_loggf_threshold=-3): + def _create_metastable_flags(levels, lines, levels_metastable_loggf_threshold=-3): """ Returns metastable flag column for the `levels` DataFrame. @@ -221,8 +247,7 @@ def _create_metastable_flags(levels, lines, """ # Filter lines on the loggf threshold value - metastable_lines = lines.loc[lines["loggf"] - > levels_metastable_loggf_threshold] + metastable_lines = lines.loc[lines["loggf"] > levels_metastable_loggf_threshold] # Count the remaining strong transitions metastable_lines_grouped = metastable_lines.groupby("upper_level_id") @@ -249,21 +274,30 @@ def _create_einstein_coeff(lines): Transition lines dataframe. """ - einstein_coeff = (4 * np.pi ** 2 * const.e.gauss.value ** - 2) / (const.m_e.cgs.value * const.c.cgs.value) - - lines['B_lu'] = einstein_coeff * lines['f_lu'] / \ - (const.h.cgs.value * lines['nu']) - - lines['B_ul'] = einstein_coeff * lines['f_ul'] / \ - (const.h.cgs.value * lines['nu']) - - lines['A_ul'] = 2 * einstein_coeff * lines['nu'] ** 2 / \ - const.c.cgs.value ** 2 * lines['f_ul'] + einstein_coeff = (4 * np.pi**2 * const.e.gauss.value**2) / ( + const.m_e.cgs.value * const.c.cgs.value + ) + + lines["B_lu"] = ( + einstein_coeff * lines["f_lu"] / (const.h.cgs.value * lines["nu"]) + ) + + lines["B_ul"] = ( + einstein_coeff * lines["f_ul"] / (const.h.cgs.value * lines["nu"]) + ) + + lines["A_ul"] = ( + 2 + * einstein_coeff + * lines["nu"] ** 2 + / const.c.cgs.value**2 + * lines["f_ul"] + ) @staticmethod - def _calculate_collisional_strength(row, temperatures, - kb_ev, c_ul_temperature_cols): + def _calculate_collisional_strength( + row, temperatures, kb_ev, c_ul_temperature_cols + ): """ Function to calculation upsilon from Burgess & Tully 1992 (TType 1 - 4; Eq. 23 - 38). @@ -276,7 +310,7 @@ def _calculate_collisional_strength(row, temperatures, g_u = row["g_u"] ttype = row["ttype"] - if ttype > 5: + if ttype > 5: ttype -= 5 kt = kb_ev * temperatures @@ -304,14 +338,14 @@ def _calculate_collisional_strength(row, temperatures, upsilon = y_func * np.log(kt / delta_e + c) elif ttype == 5: - raise ValueError('Not sure what to do with ttype=5') + raise ValueError("Not sure what to do with ttype=5") #### 1992A&A...254..436B Equation 20 & 22 ##### - collisional_ul_factor = 8.63e-6 * upsilon / (g_u * temperatures**.5) + collisional_ul_factor = 8.63e-6 * upsilon / (g_u * temperatures**0.5) return pd.Series(data=collisional_ul_factor, index=c_ul_temperature_cols) def _get_all_levels_data(self): - """ + """ The resulting DataFrame contains stacked energy levels from GFALL, Chianti (optional), CMFGEN (optional) and NIST ground levels. Only one source of levels is kept based on priorities. @@ -326,90 +360,102 @@ def _get_all_levels_data(self): """ - logger.info('Ingesting energy levels.') + logger.info("Ingesting energy levels.") gf_levels = self.gfall_reader.levels - gf_levels['ds_id'] = 2 + gf_levels["ds_id"] = 2 if self.chianti_reader is not None: ch_levels = self.chianti_reader.levels - ch_levels['ds_id'] = 4 + ch_levels["ds_id"] = 4 else: ch_levels = pd.DataFrame(columns=gf_levels.columns) if self.cmfgen_reader is not None: cf_levels = self.cmfgen_reader.levels - cf_levels['ds_id'] = 5 + cf_levels["ds_id"] = 5 else: cf_levels = pd.DataFrame(columns=gf_levels.columns) levels = pd.concat([gf_levels, ch_levels, cf_levels], sort=True) - levels['g'] = 2*levels['j'] + 1 - levels['g'] = levels['g'].astype(np.int) - levels = levels.drop(columns=['j', 'label', 'method']) + levels["g"] = 2 * levels["j"] + 1 + levels["g"] = levels["g"].astype(np.int) + levels = levels.drop(columns=["j", "label", "method"]) levels = levels.reset_index() - levels = levels.rename(columns={'ion_charge': 'ion_number'}) - levels = levels[['atomic_number', 'ion_number', 'g', 'energy', - 'ds_id', 'priority']] - levels['energy'] = u.Quantity(levels['energy'], 'cm-1').to( - 'eV', equivalencies=u.spectral()).value - + levels = levels.rename(columns={"ion_charge": "ion_number"}) + levels = levels[ + ["atomic_number", "ion_number", "g", "energy", "ds_id", "priority"] + ] + levels["energy"] = ( + u.Quantity(levels["energy"], "cm-1") + .to("eV", equivalencies=u.spectral()) + .value + ) + # Solve priorities and set attributes for later use. - self.gfall_ions, self.chianti_ions, self.cmfgen_ions = self.solve_priorities(levels) + self.gfall_ions, self.chianti_ions, self.cmfgen_ions = self.solve_priorities( + levels + ) + + to_string = lambda x: [ + f"{convert_atomic_number2symbol(ion[0])} {ion[1]}" for ion in sorted(x) + ] - to_string = lambda x: [f"{convert_atomic_number2symbol(ion[0])} {ion[1]}" \ - for ion in sorted(x)] + gfall_str = ", ".join(to_string(self.gfall_ions)) + logger.info(f"GFALL selected species: {gfall_str}.") - gfall_str = ', '.join(to_string(self.gfall_ions)) - logger.info(f'GFALL selected species: {gfall_str}.') - if len(self.chianti_ions) > 0: - chianti_str = ', '.join(to_string(self.chianti_ions)) - logger.info(f'Chianti selected species: {chianti_str}.') - - if len(self.cmfgen_ions) > 0: - cmfgen_str = ', '.join(to_string(self.cmfgen_ions)) - logger.info(f'CMFGEN selected species: {cmfgen_str}.') + chianti_str = ", ".join(to_string(self.chianti_ions)) + logger.info(f"Chianti selected species: {chianti_str}.") + + if len(self.cmfgen_ions) > 0: + cmfgen_str = ", ".join(to_string(self.cmfgen_ions)) + logger.info(f"CMFGEN selected species: {cmfgen_str}.") # Concatenate ground levels from NIST ground_levels = self.ionization_energies.get_ground_levels() - ground_levels = ground_levels.rename(columns={'ion_charge': 'ion_number'}) - ground_levels['ds_id'] = 1 + ground_levels = ground_levels.rename(columns={"ion_charge": "ion_number"}) + ground_levels["ds_id"] = 1 levels = pd.concat([ground_levels, levels], sort=True) - levels['level_id'] = range(1, len(levels)+1) - levels = levels.set_index('level_id') + levels["level_id"] = range(1, len(levels) + 1) + levels = levels.set_index("level_id") # The following code should only remove the duplicated # ground levels. Other duplicated levels should be re- # moved at the reader stage. - mask = (levels['energy'] == 0.) & (levels[['atomic_number', - 'ion_number', 'energy', 'g']].duplicated(keep='last')) + mask = (levels["energy"] == 0.0) & ( + levels[["atomic_number", "ion_number", "energy", "g"]].duplicated( + keep="last" + ) + ) levels = levels[~mask] # Filter levels by priority for ion in self.chianti_ions: - mask = (levels['ds_id'] != 4) & ( - levels['atomic_number'] == ion[0]) & ( - levels['ion_number'] == ion[1]) + mask = ( + (levels["ds_id"] != 4) + & (levels["atomic_number"] == ion[0]) + & (levels["ion_number"] == ion[1]) + ) levels = levels.drop(levels[mask].index) for ion in self.cmfgen_ions: - mask = (levels['ds_id'] != 5) & ( - levels['atomic_number'] == ion[0]) & ( - levels['ion_number'] == ion[1]) + mask = ( + (levels["ds_id"] != 5) + & (levels["atomic_number"] == ion[0]) + & (levels["ion_number"] == ion[1]) + ) levels = levels.drop(levels[mask].index) - levels = levels[['atomic_number', 'ion_number', 'g', 'energy', - 'ds_id']] + levels = levels[["atomic_number", "ion_number", "g", "energy", "ds_id"]] levels = levels.reset_index() return levels - def _get_all_lines_data(self): - """ - The resulting DataFrame contains stacked transition lines for + """ + The resulting DataFrame contains stacked transition lines for GFALL, Chianti (optional) and CMFGEN (optional). Returns @@ -422,79 +468,93 @@ def _get_all_lines_data(self): """ - logger.info('Ingesting transition lines.') + logger.info("Ingesting transition lines.") gf_lines = self.gfall_reader.lines - gf_lines['ds_id'] = 2 + gf_lines["ds_id"] = 2 if self.chianti_reader is not None: ch_lines = self.chianti_reader.lines - ch_lines['ds_id'] = 4 + ch_lines["ds_id"] = 4 else: ch_lines = pd.DataFrame(columns=gf_lines.columns) if self.cmfgen_reader is not None: cf_lines = self.cmfgen_reader.lines - cf_lines['ds_id'] = 5 + cf_lines["ds_id"] = 5 else: cf_lines = pd.DataFrame(columns=gf_lines.columns) lines = pd.concat([gf_lines, ch_lines, cf_lines], sort=True) lines = lines.reset_index() - lines = lines.rename(columns={'ion_charge': 'ion_number'}) - lines['line_id'] = range(1, len(lines)+1) + lines = lines.rename(columns={"ion_charge": "ion_number"}) + lines["line_id"] = range(1, len(lines) + 1) # Filter lines by priority for ion in self.chianti_ions: - mask = (lines['ds_id'] != 4) & ( - lines['atomic_number'] == ion[0]) & ( - lines['ion_number'] == ion[1]) + mask = ( + (lines["ds_id"] != 4) + & (lines["atomic_number"] == ion[0]) + & (lines["ion_number"] == ion[1]) + ) lines = lines.drop(lines[mask].index) for ion in self.cmfgen_ions: - mask = (lines['ds_id'] != 5) & ( - lines['atomic_number'] == ion[0]) & ( - lines['ion_number'] == ion[1]) + mask = ( + (lines["ds_id"] != 5) + & (lines["atomic_number"] == ion[0]) + & (lines["ion_number"] == ion[1]) + ) lines = lines.drop(lines[mask].index) - lines = lines.set_index(['atomic_number', 'ion_number']) + lines = lines.set_index(["atomic_number", "ion_number"]) lines = lines.sort_index() # To supress warnings - ions = set(self.gfall_ions).union(set(self.chianti_ions))\ - .union((set(self.cmfgen_ions))) - - logger.info('Matching levels and lines.') - lns_list = [ self.get_lvl_index2id(lines.loc[ion], self.levels_all) - for ion in ions] + ions = ( + set(self.gfall_ions) + .union(set(self.chianti_ions)) + .union((set(self.cmfgen_ions))) + ) + + logger.info("Matching levels and lines.") + lns_list = [ + self.get_lvl_index2id(lines.loc[ion], self.levels_all) for ion in ions + ] lines = pd.concat(lns_list, sort=True) - lines = lines.set_index('line_id').sort_index() + lines = lines.set_index("line_id").sort_index() - lines['loggf'] = np.log10(lines['gf']) - lines = lines.drop(columns=['energy_upper', 'j_upper', 'energy_lower', - 'j_lower', 'level_index_lower', - 'level_index_upper']) + lines["loggf"] = np.log10(lines["gf"]) + lines = lines.drop( + columns=[ + "energy_upper", + "j_upper", + "energy_lower", + "j_lower", + "level_index_lower", + "level_index_upper", + ] + ) - lines['wavelength'] = u.Quantity(lines['wavelength'], 'nm').to( - 'AA').value + lines["wavelength"] = u.Quantity(lines["wavelength"], "nm").to("AA").value - lines.loc[lines['wavelength'] <= - GFALL_AIR_THRESHOLD, 'medium'] = MEDIUM_VACUUM + lines.loc[lines["wavelength"] <= GFALL_AIR_THRESHOLD, "medium"] = MEDIUM_VACUUM - lines.loc[lines['wavelength'] > - GFALL_AIR_THRESHOLD, 'medium'] = MEDIUM_AIR + lines.loc[lines["wavelength"] > GFALL_AIR_THRESHOLD, "medium"] = MEDIUM_AIR # Chianti wavelengths are already given in vacuum - gfall_mask = lines['ds_id'] == 2 - air_mask = lines['medium'] == MEDIUM_AIR - lines.loc[air_mask & gfall_mask, - 'wavelength'] = convert_wavelength_air2vacuum( - lines.loc[air_mask, 'wavelength']) + gfall_mask = lines["ds_id"] == 2 + air_mask = lines["medium"] == MEDIUM_AIR + lines.loc[air_mask & gfall_mask, "wavelength"] = convert_wavelength_air2vacuum( + lines.loc[air_mask, "wavelength"] + ) - lines = lines[['lower_level_id', 'upper_level_id', - 'wavelength', 'gf', 'loggf', 'ds_id']] + lines = lines[ + ["lower_level_id", "upper_level_id", "wavelength", "gf", "loggf", "ds_id"] + ] return lines - def create_levels_lines(self, lines_loggf_threshold=-3, - levels_metastable_loggf_threshold=-3): + def create_levels_lines( + self, lines_loggf_threshold=-3, levels_metastable_loggf_threshold=-3 + ): """ Generates the definitive `lines` and `levels` DataFrames by adding new columns and making some calculations. @@ -510,28 +570,33 @@ def create_levels_lines(self, lines_loggf_threshold=-3, """ ionization_energies = self.ionization_energies.base.reset_index() - ionization_energies = ionization_energies.rename(columns={'ion_charge': 'ion_number'}) + ionization_energies = ionization_energies.rename( + columns={"ion_charge": "ion_number"} + ) # Culling autoionization levels - levels_w_ionization_energies = pd.merge(self.levels_all, - ionization_energies, - how='left', - on=["atomic_number", - "ion_number"]) - - mask = levels_w_ionization_energies["energy"] < \ - levels_w_ionization_energies["ionization_energy"] + levels_w_ionization_energies = pd.merge( + self.levels_all, + ionization_energies, + how="left", + on=["atomic_number", "ion_number"], + ) + + mask = ( + levels_w_ionization_energies["energy"] + < levels_w_ionization_energies["ionization_energy"] + ) levels = levels_w_ionization_energies[mask].copy() - levels = levels.set_index('level_id').sort_values( - by=['atomic_number', 'ion_number']) - levels = levels.drop(columns='ionization_energy') + levels = levels.set_index("level_id").sort_values( + by=["atomic_number", "ion_number"] + ) + levels = levels.drop(columns="ionization_energy") # Clean lines - lines = self.lines_all.join(pd.DataFrame(index=levels.index), - on="lower_level_id", how="inner").\ - join(pd.DataFrame(index=levels.index), - on="upper_level_id", how="inner") + lines = self.lines_all.join( + pd.DataFrame(index=levels.index), on="lower_level_id", how="inner" + ).join(pd.DataFrame(index=levels.index), on="upper_level_id", how="inner") # Culling lines with low gf values lines = lines.loc[lines["loggf"] > lines_loggf_threshold] @@ -539,39 +604,46 @@ def create_levels_lines(self, lines_loggf_threshold=-3, # Do not clean levels that don't exist in lines # Create the metastable flags for levels - levels["metastable"] = \ - self._create_metastable_flags(levels, self.lines_all, - levels_metastable_loggf_threshold) + levels["metastable"] = self._create_metastable_flags( + levels, self.lines_all, levels_metastable_loggf_threshold + ) # Create levels numbers - levels = levels.sort_values( - ["atomic_number", "ion_number", "energy", "g"]) + levels = levels.sort_values(["atomic_number", "ion_number", "energy", "g"]) - levels["level_number"] = levels.groupby(['atomic_number', - 'ion_number'])['energy']. \ - transform(lambda x: np.arange(len(x))).values + levels["level_number"] = ( + levels.groupby(["atomic_number", "ion_number"])["energy"] + .transform(lambda x: np.arange(len(x))) + .values + ) levels["level_number"] = levels["level_number"].astype(np.int) - levels = levels[['atomic_number', 'ion_number', 'g', 'energy', - 'metastable', 'level_number', 'ds_id']] + levels = levels[ + [ + "atomic_number", + "ion_number", + "g", + "energy", + "metastable", + "level_number", + "ds_id", + ] + ] # Join atomic_number, ion_number, level_number_lower, # level_number_upper on lines lower_levels = levels.rename( - columns={ - "level_number": "level_number_lower", - "g": "g_l"} + columns={"level_number": "level_number_lower", "g": "g_l"} ).loc[:, ["atomic_number", "ion_number", "level_number_lower", "g_l"]] upper_levels = levels.rename( - columns={ - "level_number": "level_number_upper", - "g": "g_u"} + columns={"level_number": "level_number_upper", "g": "g_u"} ).loc[:, ["level_number_upper", "g_u"]] lines = lines.join(lower_levels, on="lower_level_id").join( - upper_levels, on="upper_level_id") + upper_levels, on="upper_level_id" + ) # Calculate absorption oscillator strength f_lu and emission # oscillator strength f_ul @@ -579,8 +651,7 @@ def create_levels_lines(self, lines_loggf_threshold=-3, lines["f_ul"] = lines["gf"] / lines["g_u"] # Calculate frequency - lines['nu'] = u.Quantity( - lines['wavelength'], 'AA').to('Hz', u.spectral()) + lines["nu"] = u.Quantity(lines["wavelength"], "AA").to("Hz", u.spectral()) # Create Einstein coefficients self._create_einstein_coeff(lines) @@ -591,12 +662,9 @@ def create_levels_lines(self, lines_loggf_threshold=-3, levels = levels.reset_index() # Create and append artificial levels for fully ionized ions - artificial_fully_ionized_levels = \ - self._create_artificial_fully_ionized(levels) - levels = levels.append( - artificial_fully_ionized_levels, ignore_index=True) - levels = levels.sort_values( - ["atomic_number", "ion_number", "level_number"]) + artificial_fully_ionized_levels = self._create_artificial_fully_ionized(levels) + levels = levels.append(artificial_fully_ionized_levels, ignore_index=True) + levels = levels.sort_values(["atomic_number", "ion_number", "level_number"]) return levels, lines @@ -615,73 +683,111 @@ def create_collisions(self, temperatures=np.arange(2000, 50000, 2000)): """ - logger.info('Ingesting collisional strengths.') + logger.info("Ingesting collisional strengths.") ch_collisions = self.chianti_reader.collisions - ch_collisions['ds_id'] = 4 + ch_collisions["ds_id"] = 4 # Not really needed because we have only one source of collisions collisions = pd.concat([ch_collisions], sort=True) ions = self.chianti_ions collisions = collisions.reset_index() - collisions = collisions.rename(columns={'ion_charge': 'ion_number'}) - collisions = collisions.set_index(['atomic_number', 'ion_number']) + collisions = collisions.rename(columns={"ion_charge": "ion_number"}) + collisions = collisions.set_index(["atomic_number", "ion_number"]) - logger.info('Matching collisions and levels.') - col_list = [ self.get_lvl_index2id(collisions.loc[ion], self.levels_all) - for ion in ions] + logger.info("Matching collisions and levels.") + col_list = [ + self.get_lvl_index2id(collisions.loc[ion], self.levels_all) for ion in ions + ] collisions = pd.concat(col_list, sort=True) - collisions = collisions.sort_values(by=['lower_level_id', 'upper_level_id']) + collisions = collisions.sort_values(by=["lower_level_id", "upper_level_id"]) # `e_col_id` number starts after the last line id start = self.lines_all.index[-1] + 1 - collisions['e_col_id'] = range(start, start + len(collisions)) + collisions["e_col_id"] = range(start, start + len(collisions)) # Exclude artificially created levels from levels levels = self.levels.loc[self.levels["level_id"] != -1].set_index("level_id") # Join atomic_number, ion_number, level_number_lower, level_number_upper - collisions = collisions.set_index(['atomic_number', 'ion_number']) - lower_levels = levels.rename(columns={"level_number": "level_number_lower", - "g": "g_l", "energy": "energy_lower"}). \ - loc[:, ["atomic_number", "ion_number", "level_number_lower", - "g_l", "energy_lower"]] + collisions = collisions.set_index(["atomic_number", "ion_number"]) + lower_levels = levels.rename( + columns={ + "level_number": "level_number_lower", + "g": "g_l", + "energy": "energy_lower", + } + ).loc[ + :, + [ + "atomic_number", + "ion_number", + "level_number_lower", + "g_l", + "energy_lower", + ], + ] - upper_levels = levels.rename(columns={"level_number": "level_number_upper", - "g": "g_u", "energy": "energy_upper"}). \ - loc[:, ["level_number_upper", "g_u", "energy_upper"]] + upper_levels = levels.rename( + columns={ + "level_number": "level_number_upper", + "g": "g_u", + "energy": "energy_upper", + } + ).loc[:, ["level_number_upper", "g_u", "energy_upper"]] collisions = collisions.join(lower_levels, on="lower_level_id").join( - upper_levels, on="upper_level_id") + upper_levels, on="upper_level_id" + ) # Calculate delta_e - kb_ev = const.k_B.cgs.to('eV / K').value - collisions["delta_e"] = (collisions["energy_upper"] - collisions["energy_lower"])/kb_ev + kb_ev = const.k_B.cgs.to("eV / K").value + collisions["delta_e"] = ( + collisions["energy_upper"] - collisions["energy_lower"] + ) / kb_ev # Calculate g_ratio collisions["g_ratio"] = collisions["g_l"] / collisions["g_u"] # Derive columns for collisional strengths - c_ul_temperature_cols = ['t{:06d}'.format(t) for t in temperatures] - - collisions = collisions.rename(columns={'temperatures': 'btemp', - 'collision_strengths': 'bscups'}) - - collisions = collisions[['e_col_id', 'lower_level_id', - 'upper_level_id', 'ds_id', - 'btemp', 'bscups', 'ttype', 'cups', - 'gf', 'atomic_number', 'ion_number', - 'level_number_lower', 'g_l', - 'energy_lower', 'level_number_upper', - 'g_u', 'energy_upper', 'delta_e', - 'g_ratio']] - - collisional_ul_factors = collisions.apply(self._calculate_collisional_strength, - axis=1, args=(temperatures, kb_ev, - c_ul_temperature_cols)) + c_ul_temperature_cols = ["t{:06d}".format(t) for t in temperatures] + + collisions = collisions.rename( + columns={"temperatures": "btemp", "collision_strengths": "bscups"} + ) + + collisions = collisions[ + [ + "e_col_id", + "lower_level_id", + "upper_level_id", + "ds_id", + "btemp", + "bscups", + "ttype", + "cups", + "gf", + "atomic_number", + "ion_number", + "level_number_lower", + "g_l", + "energy_lower", + "level_number_upper", + "g_u", + "energy_upper", + "delta_e", + "g_ratio", + ] + ] + + collisional_ul_factors = collisions.apply( + self._calculate_collisional_strength, + axis=1, + args=(temperatures, kb_ev, c_ul_temperature_cols), + ) collisions = pd.concat([collisions, collisional_ul_factors], axis=1) - collisions = collisions.set_index('e_col_id') + collisions = collisions.set_index("e_col_id") return collisions @@ -695,42 +801,49 @@ def create_cross_sections(self): """ - logger.info('Ingesting photoionization cross-sections.') + logger.info("Ingesting photoionization cross-sections.") cross_sections = self.cmfgen_reader.cross_sections.reset_index() - logger.info('Matching levels and cross sections.') - cross_sections = cross_sections.rename(columns={'ion_charge': 'ion_number'}) - cross_sections = cross_sections.set_index(['atomic_number', 'ion_number']) + logger.info("Matching levels and cross sections.") + cross_sections = cross_sections.rename(columns={"ion_charge": "ion_number"}) + cross_sections = cross_sections.set_index(["atomic_number", "ion_number"]) - cross_sections['level_index_lower'] = cross_sections['level_index'].values - cross_sections['level_index_upper'] = cross_sections['level_index'].values - phixs_list = [ self.get_lvl_index2id(cross_sections.loc[ion], self.levels_all) - for ion in self.cmfgen_ions ] + cross_sections["level_index_lower"] = cross_sections["level_index"].values + cross_sections["level_index_upper"] = cross_sections["level_index"].values + phixs_list = [ + self.get_lvl_index2id(cross_sections.loc[ion], self.levels_all) + for ion in self.cmfgen_ions + ] cross_sections = pd.concat(phixs_list, sort=True) - cross_sections = cross_sections.sort_values(by=['lower_level_id', 'upper_level_id']) - cross_sections['level_id'] = cross_sections['lower_level_id'] + cross_sections = cross_sections.sort_values( + by=["lower_level_id", "upper_level_id"] + ) + cross_sections["level_id"] = cross_sections["lower_level_id"] # `x_sect_id` number starts after the last `line_id`, just a convention start = self.lines_all.index[-1] + 1 - cross_sections['x_sect_id'] = range(start, start + len(cross_sections)) + cross_sections["x_sect_id"] = range(start, start + len(cross_sections)) # Exclude artificially created levels from levels - levels = self.levels.loc[self.levels['level_id'] != -1].set_index('level_id') - level_number = levels.loc[:, ['level_number']] - cross_sections = cross_sections.join(level_number, on='level_id') + levels = self.levels.loc[self.levels["level_id"] != -1].set_index("level_id") + level_number = levels.loc[:, ["level_number"]] + cross_sections = cross_sections.join(level_number, on="level_id") # Levels are already cleaned, just drop the NaN's after join cross_sections = cross_sections.dropna() - cross_sections['energy'] = u.Quantity(cross_sections['energy'], 'Ry').to('Hz', equivalencies=u.spectral()) - cross_sections['sigma'] = u.Quantity(cross_sections['sigma'], 'Mbarn').to('cm2') - cross_sections['level_number'] = cross_sections['level_number'].astype('int') - cross_sections = cross_sections.rename(columns={'energy':'nu', 'sigma':'x_sect'}) + cross_sections["energy"] = u.Quantity(cross_sections["energy"], "Ry").to( + "Hz", equivalencies=u.spectral() + ) + cross_sections["sigma"] = u.Quantity(cross_sections["sigma"], "Mbarn").to("cm2") + cross_sections["level_number"] = cross_sections["level_number"].astype("int") + cross_sections = cross_sections.rename( + columns={"energy": "nu", "sigma": "x_sect"} + ) return cross_sections - def create_macro_atom(self): """ Create a DataFrame containing macro atom data. @@ -744,81 +857,115 @@ def create_macro_atom(self): Refer to the docs: https://tardis-sn.github.io/tardis/physics/setup/plasma/macroatom.html """ - + # Exclude artificially created levels from levels - levels = self.levels.loc[self.levels["level_id"] - != -1].set_index("level_id") + levels = self.levels.loc[self.levels["level_id"] != -1].set_index("level_id") - lvl_energy_lower = levels.rename( - columns={"energy": "energy_lower"}).loc[:, ["energy_lower"]] + lvl_energy_lower = levels.rename(columns={"energy": "energy_lower"}).loc[ + :, ["energy_lower"] + ] - lvl_energy_upper = levels.rename( - columns={"energy": "energy_upper"}).loc[:, ["energy_upper"]] + lvl_energy_upper = levels.rename(columns={"energy": "energy_upper"}).loc[ + :, ["energy_upper"] + ] lines = self.lines.set_index("line_id") lines = lines.join(lvl_energy_lower, on="lower_level_id").join( - lvl_energy_upper, on="upper_level_id") + lvl_energy_upper, on="upper_level_id" + ) macro_atom = list() - macro_atom_dtype = [("atomic_number", np.int), ("ion_number", np.int), - ("source_level_number", - np.int), ("target_level_number", np.int), - ("transition_line_id", - np.int), ("transition_type", np.int), - ("transition_probability", np.float)] + macro_atom_dtype = [ + ("atomic_number", np.int), + ("ion_number", np.int), + ("source_level_number", np.int), + ("target_level_number", np.int), + ("transition_line_id", np.int), + ("transition_type", np.int), + ("transition_probability", np.float), + ] for line_id, row in lines.iterrows(): atomic_number, ion_number = row["atomic_number"], row["ion_number"] - level_number_lower, level_number_upper = \ - row["level_number_lower"], row["level_number_upper"] + level_number_lower, level_number_upper = ( + row["level_number_lower"], + row["level_number_upper"], + ) nu = row["nu"] f_ul, f_lu = row["f_ul"], row["f_lu"] e_lower, e_upper = row["energy_lower"], row["energy_upper"] transition_probabilities_dict = dict() - - transition_probabilities_dict[P_EMISSION_DOWN] = 2 * \ - nu**2 * f_ul / const.c.cgs.value**2 * (e_upper - e_lower) - transition_probabilities_dict[P_INTERNAL_DOWN] = 2 * \ - nu**2 * f_ul / const.c.cgs.value**2 * e_lower + transition_probabilities_dict[P_EMISSION_DOWN] = ( + 2 * nu**2 * f_ul / const.c.cgs.value**2 * (e_upper - e_lower) + ) + + transition_probabilities_dict[P_INTERNAL_DOWN] = ( + 2 * nu**2 * f_ul / const.c.cgs.value**2 * e_lower + ) - transition_probabilities_dict[P_INTERNAL_UP] = f_lu * \ - e_lower / (const.h.cgs.value * nu) + transition_probabilities_dict[P_INTERNAL_UP] = ( + f_lu * e_lower / (const.h.cgs.value * nu) + ) - macro_atom.append((atomic_number, ion_number, level_number_upper, - level_number_lower, line_id, P_EMISSION_DOWN, - transition_probabilities_dict[P_EMISSION_DOWN])) + macro_atom.append( + ( + atomic_number, + ion_number, + level_number_upper, + level_number_lower, + line_id, + P_EMISSION_DOWN, + transition_probabilities_dict[P_EMISSION_DOWN], + ) + ) - macro_atom.append((atomic_number, ion_number, level_number_upper, - level_number_lower, line_id, P_INTERNAL_DOWN, - transition_probabilities_dict[P_INTERNAL_DOWN])) + macro_atom.append( + ( + atomic_number, + ion_number, + level_number_upper, + level_number_lower, + line_id, + P_INTERNAL_DOWN, + transition_probabilities_dict[P_INTERNAL_DOWN], + ) + ) - macro_atom.append((atomic_number, ion_number, level_number_lower, - level_number_upper, line_id, P_INTERNAL_UP, - transition_probabilities_dict[P_INTERNAL_UP])) + macro_atom.append( + ( + atomic_number, + ion_number, + level_number_lower, + level_number_upper, + line_id, + P_INTERNAL_UP, + transition_probabilities_dict[P_INTERNAL_UP], + ) + ) macro_atom = np.array(macro_atom, dtype=macro_atom_dtype) macro_atom = pd.DataFrame(macro_atom) macro_atom = macro_atom.sort_values( - ["atomic_number", "ion_number", "source_level_number"]) + ["atomic_number", "ion_number", "source_level_number"] + ) self.macro_atom = macro_atom def create_macro_atom_references(self): """ Create a DataFrame containing macro atom reference data. - + Returns ------- pandas.DataFrame """ macro_atom_references = self.levels.rename( - columns={"level_number": "source_level_number"}).\ - loc[:, ["atomic_number", "ion_number", - "source_level_number", "level_id"]] + columns={"level_number": "source_level_number"} + ).loc[:, ["atomic_number", "ion_number", "source_level_number", "level_id"]] count_down = self.lines.groupby("upper_level_id").size() count_down.name = "count_down" @@ -827,22 +974,26 @@ def create_macro_atom_references(self): count_up.name = "count_up" macro_atom_references = macro_atom_references.join( - count_down, on="level_id").join(count_up, on="level_id") + count_down, on="level_id" + ).join(count_up, on="level_id") macro_atom_references = macro_atom_references.drop("level_id", axis=1) macro_atom_references = macro_atom_references.fillna(0) - macro_atom_references["count_total"] = 2 * \ - macro_atom_references["count_down"] + \ - macro_atom_references["count_up"] + macro_atom_references["count_total"] = ( + 2 * macro_atom_references["count_down"] + macro_atom_references["count_up"] + ) - macro_atom_references["count_down"] = \ - macro_atom_references["count_down"].astype(np.int) + macro_atom_references["count_down"] = macro_atom_references[ + "count_down" + ].astype(np.int) - macro_atom_references["count_up"] = \ - macro_atom_references["count_up"].astype(np.int) + macro_atom_references["count_up"] = macro_atom_references["count_up"].astype( + np.int + ) - macro_atom_references["count_total"] = \ - macro_atom_references["count_total"].astype(np.int) + macro_atom_references["count_total"] = macro_atom_references[ + "count_total" + ].astype(np.int) self.macro_atom_references = macro_atom_references @@ -858,13 +1009,16 @@ def ionization_energies_prepared(self): """ ionization_energies_prepared = self.ionization_energies.base.copy() ionization_energies_prepared = ionization_energies_prepared.reset_index() - ionization_energies_prepared['ion_charge'] += 1 - ionization_energies_prepared = ionization_energies_prepared.rename(columns={'ion_charge': 'ion_number'}) - ionization_energies_prepared = ionization_energies_prepared.set_index(['atomic_number', 'ion_number']) + ionization_energies_prepared["ion_charge"] += 1 + ionization_energies_prepared = ionization_energies_prepared.rename( + columns={"ion_charge": "ion_number"} + ) + ionization_energies_prepared = ionization_energies_prepared.set_index( + ["atomic_number", "ion_number"] + ) return ionization_energies_prepared.squeeze() - @property def levels_prepared(self): """ @@ -876,12 +1030,21 @@ def levels_prepared(self): """ - levels_prepared = self.levels.loc[:, [ - "atomic_number", "ion_number", "level_number", - "energy", "g", "metastable"]].copy() + levels_prepared = self.levels.loc[ + :, + [ + "atomic_number", + "ion_number", + "level_number", + "energy", + "g", + "metastable", + ], + ].copy() levels_prepared = levels_prepared.set_index( - ["atomic_number", "ion_number", "level_number"]) + ["atomic_number", "ion_number", "level_number"] + ) return levels_prepared @@ -896,19 +1059,32 @@ def lines_prepared(self): """ - lines_prepared = self.lines.loc[:, [ - "line_id", "wavelength", "atomic_number", "ion_number", - "f_ul", "f_lu", "level_number_lower", "level_number_upper", - "nu", "B_lu", "B_ul", "A_ul"]].copy() + lines_prepared = self.lines.loc[ + :, + [ + "line_id", + "wavelength", + "atomic_number", + "ion_number", + "f_ul", + "f_lu", + "level_number_lower", + "level_number_upper", + "nu", + "B_lu", + "B_ul", + "A_ul", + ], + ].copy() # TODO: store units in metadata # wavelength[angstrom], nu[Hz], f_lu[1], f_ul[1], # B_ul[cm^3 s^-2 erg^-1], B_lu[cm^3 s^-2 erg^-1], # A_ul[1/s]. - lines_prepared = lines_prepared.set_index([ - "atomic_number", "ion_number", - "level_number_lower", "level_number_upper"]) + lines_prepared = lines_prepared.set_index( + ["atomic_number", "ion_number", "level_number_lower", "level_number_upper"] + ) return lines_prepared @@ -923,14 +1099,21 @@ def collisions_prepared(self): """ - collisions_index = ["atomic_number", - "ion_number", - "level_number_lower", - "level_number_upper"] + collisions_index = [ + "atomic_number", + "ion_number", + "level_number_lower", + "level_number_upper", + ] if "chianti" in self.collisions_metadata.dataset: - collisions_columns = collisions_index + ['g_ratio', 'delta_e'] + \ - sorted([col for col in self.collisions.columns if re.match('^t\d+$', col)]) + collisions_columns = ( + collisions_index + + ["g_ratio", "delta_e"] + + sorted( + [col for col in self.collisions.columns if re.match("^t\d+$", col)] + ) + ) elif "cmfgen" in self.collisions_metadata.dataset: collisions_columns = collisions_index + list(self.collisions.columns) @@ -938,7 +1121,9 @@ def collisions_prepared(self): else: raise ValueError("Unknown source of collisional data") - collisions_prepared = self.collisions.reset_index().loc[:, collisions_columns].copy() + collisions_prepared = ( + self.collisions.reset_index().loc[:, collisions_columns].copy() + ) collisions_prepared = collisions_prepared.set_index(collisions_index) return collisions_prepared @@ -953,8 +1138,10 @@ def cross_sections_prepared(self): pandas.DataFrame """ - cross_sections_prepared = self.cross_sections.set_index(['atomic_number', 'ion_number', 'level_number']) - cross_sections_prepared = cross_sections_prepared[['nu', 'x_sect']] + cross_sections_prepared = self.cross_sections.set_index( + ["atomic_number", "ion_number", "level_number"] + ) + cross_sections_prepared = cross_sections_prepared[["nu", "x_sect"]] return cross_sections_prepared @@ -962,25 +1149,33 @@ def cross_sections_prepared(self): def macro_atom_prepared(self): """ Prepare the DataFrame with macro atom data for TARDIS - + Returns ------- macro_atom_prepared : pandas.DataFrame - + Notes ----- Refer to the docs: https://tardis-sn.github.io/tardis/physics/setup/plasma/macroatom.html """ - macro_atom_prepared = self.macro_atom.loc[:, [ - "atomic_number", - "ion_number", "source_level_number", "target_level_number", - "transition_type", "transition_probability", - "transition_line_id"]].copy() - - macro_atom_prepared = macro_atom_prepared.rename(columns={ - "target_level_number": "destination_level_number"}) + macro_atom_prepared = self.macro_atom.loc[ + :, + [ + "atomic_number", + "ion_number", + "source_level_number", + "target_level_number", + "transition_type", + "transition_probability", + "transition_line_id", + ], + ].copy() + + macro_atom_prepared = macro_atom_prepared.rename( + columns={"target_level_number": "destination_level_number"} + ) macro_atom_prepared = macro_atom_prepared.reset_index(drop=True) @@ -996,19 +1191,28 @@ def macro_atom_references_prepared(self): pandas.DataFrame """ - macro_atom_references_prepared = self.macro_atom_references.loc[:, [ - "atomic_number", "ion_number", "source_level_number", "count_down", - "count_up", "count_total"]].copy() + macro_atom_references_prepared = self.macro_atom_references.loc[ + :, + [ + "atomic_number", + "ion_number", + "source_level_number", + "count_down", + "count_up", + "count_total", + ], + ].copy() macro_atom_references_prepared = macro_atom_references_prepared.set_index( - ['atomic_number', 'ion_number', 'source_level_number']) + ["atomic_number", "ion_number", "source_level_number"] + ) return macro_atom_references_prepared def to_hdf(self, fname): """ Dump `prepared` attributes into an HDF5 file. - + Parameters ---------- fname : path @@ -1024,34 +1228,35 @@ def to_hdf(self, fname): from carsus import FORMAT_VERSION - with pd.HDFStore(fname, 'w') as f: - f.put('/atom_data', self.atomic_weights.base) - f.put('/ionization_data', self.ionization_energies_prepared) - f.put('/zeta_data', self.zeta_data.base) - f.put('/levels_data', self.levels_prepared) - f.put('/lines_data', self.lines_prepared) - f.put('/macro_atom_data', self.macro_atom_prepared) - f.put('/macro_atom_references', - self.macro_atom_references_prepared) + with pd.HDFStore(fname, "w") as f: + f.put("/atom_data", self.atomic_weights.base) + f.put("/ionization_data", self.ionization_energies_prepared) + f.put("/zeta_data", self.zeta_data.base) + f.put("/levels_data", self.levels_prepared) + f.put("/lines_data", self.lines_prepared) + f.put("/macro_atom_data", self.macro_atom_prepared) + f.put("/macro_atom_references", self.macro_atom_references_prepared) + + if hasattr(self.nndc_reader, "decay_data"): + f.put("/nuclear_decay_rad", self.nndc_reader.decay_data) - if hasattr(self.nndc_reader, 'decay_data'): - f.put('/nuclear_decay_rad', self.nndc_reader.decay_data) + if hasattr(self.vald_reader, "linelist"): + f.put("/linelist", self.vald_reader.linelist) - if hasattr(self, 'collisions'): - f.put('/collisions_data', self.collisions_prepared) - f.put('/collisions_metadata', self.collisions_metadata) + if hasattr(self, "collisions"): + f.put("/collisions_data", self.collisions_prepared) + f.put("/collisions_metadata", self.collisions_metadata) - if hasattr(self, 'cross_sections'): - f.put('/photoionization_data', self.cross_sections_prepared) + if hasattr(self, "cross_sections"): + f.put("/photoionization_data", self.cross_sections_prepared) lines_metadata = pd.DataFrame( - data=[["format", "version", "1.0"]], - columns=["field", "key", "value"] + data=[["format", "version", "1.0"]], columns=["field", "key", "value"] ).set_index(["field", "key"]) - f.put('/lines_metadata', lines_metadata) - + f.put("/lines_metadata", lines_metadata) + meta = [] - meta.append(('format', 'version', FORMAT_VERSION)) + meta.append(("format", "version", FORMAT_VERSION)) total_checksum = hashlib.md5() for key in f.keys(): @@ -1060,40 +1265,41 @@ def to_hdf(self, fname): # save individual DataFrame/Series checksum checksum = hash_pandas_object(f[key]) - meta.append(('md5sum', key.lstrip('/'), checksum)) + meta.append(("md5sum", key.lstrip("/"), checksum)) # data sources versions - meta.append(('datasets', 'nist_weights', - self.atomic_weights.version)) + meta.append(("datasets", "nist_weights", self.atomic_weights.version)) - meta.append(('datasets', 'nist_spectra', - self.ionization_energies.version)) + meta.append(("datasets", "nist_spectra", self.ionization_energies.version)) - meta.append(('datasets', 'gfall', - self.gfall_reader.version)) + meta.append(("datasets", "gfall", self.gfall_reader.version)) - meta.append(('datasets', 'zeta', - self.zeta_data.version)) + meta.append(("datasets", "zeta", self.zeta_data.version)) if self.chianti_reader is not None: - meta.append(('datasets', 'chianti', - self.chianti_reader.version)) + meta.append(("datasets", "chianti", self.chianti_reader.version)) if self.cmfgen_reader is not None: - meta.append(('datasets', 'cmfgen', - self.cmfgen_reader.version)) + meta.append(("datasets", "cmfgen", self.cmfgen_reader.version)) # relevant package versions - meta.append(('software', 'python', platform.python_version())) - imports = ['carsus', 'astropy', 'numpy', 'pandas', 'pyarrow', - 'tables', 'ChiantiPy'] + meta.append(("software", "python", platform.python_version())) + imports = [ + "carsus", + "astropy", + "numpy", + "pandas", + "pyarrow", + "tables", + "ChiantiPy", + ] for package in imports: - meta.append(('software', package, - __import__(package).__version__)) + meta.append(("software", package, __import__(package).__version__)) - meta_df = pd.DataFrame.from_records(meta, columns=['field', 'key', - 'value'], index=['field', 'key']) + meta_df = pd.DataFrame.from_records( + meta, columns=["field", "key", "value"], index=["field", "key"] + ) uuid1 = uuid.uuid1().hex @@ -1102,13 +1308,13 @@ def to_hdf(self, fname): logger.info(f"MD5: {total_checksum.hexdigest()}") logger.info(f"UUID1: {uuid1}") - f.root._v_attrs['MD5'] = total_checksum.hexdigest() - f.root._v_attrs['UUID1'] = uuid1 - f.root._v_attrs['FORMAT_VERSION'] = FORMAT_VERSION + f.root._v_attrs["MD5"] = total_checksum.hexdigest() + f.root._v_attrs["UUID1"] = uuid1 + f.root._v_attrs["FORMAT_VERSION"] = FORMAT_VERSION - tz = pytz.timezone('UTC') + tz = pytz.timezone("UTC") date = datetime.now(tz).isoformat() - f.root._v_attrs['DATE'] = date + f.root._v_attrs["DATE"] = date self.meta = meta_df - f.put('/metadata', meta_df) + f.put("/metadata", meta_df) diff --git a/carsus/io/tests/test_vald.py b/carsus/io/tests/test_vald.py index 9774da6e1..d2d5be6ad 100644 --- a/carsus/io/tests/test_vald.py +++ b/carsus/io/tests/test_vald.py @@ -6,7 +6,12 @@ @pytest.fixture() def vald_rdr(vald_fname): - return VALDReader(fname=vald_fname) + return VALDReader(fname=vald_fname, strip_molecules=False) + + +@pytest.fixture() +def vald_rdr_stripped_molecules(vald_fname): + return VALDReader(fname=vald_fname, strip_molecules=True) @pytest.fixture() @@ -19,6 +24,21 @@ def vald(vald_rdr): return vald_rdr.vald +@pytest.fixture() +def vald_stripped_molecules(vald_rdr_stripped_molecules): + return vald_rdr_stripped_molecules.vald + + +@pytest.fixture() +def vald_linelist_stripped_molecules(vald_rdr_stripped_molecules): + return vald_rdr_stripped_molecules.linelist + + +@pytest.fixture() +def vald_linelist(vald_rdr): + return vald_rdr.linelist + + @pytest.mark.parametrize( "index, wl_air, log_gf, e_low, e_up", [ @@ -46,3 +66,52 @@ def test_vald_reader_vald(vald, index, wl_air, log_gf, e_low, e_up, ion_charge): [row["log_gf"], row["e_low"], row["e_up"], row["ion_charge"]], [log_gf, e_low, e_up, ion_charge], ) + + +def test_vald_reader_strip_molecules(vald, vald_stripped_molecules): + # The stripped molecules list should always be smaller or equal to the size of the non-stripped list + assert len(vald) >= len(vald_stripped_molecules) + # There is only a single non-molecule in the current test vald file + assert len(vald_stripped_molecules) == 1 + + +def test_vald_strip_molecules_linelist(vald_linelist_stripped_molecules): + assert all( + vald_linelist_stripped_molecules.columns + == [ + "atomic_number", + "ion_charge", + "wavelength", + "log_gf", + "e_low", + "e_up", + "j_lo", + "j_up", + "rad", + "stark", + "waals", + ] + ) + # Test to see if any values have become nan in new columns + assert ~vald_linelist_stripped_molecules.isna().values.any() + + +def test_vald_linelist(vald_linelist): + assert all( + vald_linelist.columns + == [ + "chemical", + "ion_charge", + "wavelength", + "log_gf", + "e_low", + "e_up", + "j_lo", + "j_up", + "rad", + "stark", + "waals", + ] + ) + # Test to see if any values have become nan in new columns + assert ~vald_linelist.isna().values.any() diff --git a/carsus/io/vald/vald.py b/carsus/io/vald/vald.py index 2c41fdf86..11db67a58 100644 --- a/carsus/io/vald/vald.py +++ b/carsus/io/vald/vald.py @@ -1,8 +1,15 @@ import re import logging import pandas as pd +import numpy as np from io import StringIO from carsus.io.util import read_from_buffer +from carsus.util.helpers import ( + convert_wavelength_air2vacuum, + ATOMIC_SYMBOLS_DATA, + convert_symbol2atomic_number, +) + VALD_URL = "https://media.githubusercontent.com/media/tardis-sn/carsus-db/master/vald/vald_sample.dat" @@ -11,11 +18,15 @@ class VALDReader(object): """ - Class for extracting lines and levels data from vald files + Class for extracting lines data from vald files Attributes ---------- - fname: path to vald.dat + fname: str + path to vald data file + strip_molecules: bool + Whether to remove molecules from the data. + Methods -------- @@ -40,48 +51,67 @@ class VALDReader(object): "waals", ] - def __init__(self, fname=None): + def __init__(self, fname=None, strip_molecules=True): """ Parameters ---------- fname: str Path to the vald file (http or local file). + strip_molecules: bool + Whether to remove molecules from the data. """ - if fname is None: - self.fname = VALD_URL - else: - self.fname = fname + self.fname = VALD_URL if fname is None else fname self._vald_raw = None self._vald = None + self._linelist = None + + self.strip_molecules = strip_molecules @property def vald_raw(self): + """ + Reads the raw data and returns raw vald data as a pandas DataFrame + """ if self._vald_raw is None: self._vald_raw, self.version = self.read_vald_raw() return self._vald_raw @property def vald(self): + """ + Processes the raw vald DataFrame + """ if self._vald is None: - self._vald = self.parse_vald() + self._vald = self.parse_vald(strip_molecules=self.strip_molecules) return self._vald + @property + def linelist(self): + """ + Prepares the linelist from the processed vald DataFrame + """ + if self._linelist is None: + self._linelist = self.extract_linelist(self.vald) + return self._linelist + def read_vald_raw(self, fname=None): """ - Reading in a normal vald.dat + Reads in a vald data file Parameters ---------- fname: ~str - path to vald.dat + path to vald data file + Returns ------- pandas.DataFrame - pandas Dataframe represenation of vald + pandas Dataframe representation of vald + str MD5 checksum @@ -107,18 +137,20 @@ def read_vald_raw(self, fname=None): return vald, checksum - def parse_vald(self, vald_raw=None): + def parse_vald(self, vald_raw=None, strip_molecules=True): """ - Parse raw vald DataFrame + Parses raw vald DataFrame + Parameters ---------- vald_raw: pandas.DataFrame + strip_molecules: bool + If True, remove molecules from vald Returns ------- pandas.DataFrame - a level DataFrame """ vald = vald_raw if vald_raw is not None else self.vald_raw.copy() @@ -126,17 +158,80 @@ def parse_vald(self, vald_raw=None): vald["elm_ion"] = vald["elm_ion"].str.replace("'", "") vald[["chemical", "ion_charge"]] = vald["elm_ion"].str.split(" ", expand=True) vald["ion_charge"] = vald["ion_charge"].astype(int) - 1 + vald["wavelength"] = convert_wavelength_air2vacuum(vald["wl_air"]) del vald["elm_ion"] + if strip_molecules: + vald = self._strip_molecules(vald) + vald.reset_index(drop=True, inplace=True) + + atom_nums = np.zeros(len(vald), dtype=int) + atom_nums = [ + convert_symbol2atomic_number(symbol) + for symbol in vald.chemical.to_list() + ] + vald["atomic_number"] = atom_nums + return vald + def _strip_molecules(self, vald): + """ + Removes molecules from a vald dataframe + """ + return vald[vald.chemical.isin(ATOMIC_SYMBOLS_DATA["symbol"])] + + def extract_linelist(self, vald): + """ + Parameters + ---------- + vald: pandas.DataFrame + + Returns + ------- + pandas.DataFrame + vald linelist containing only the following columns: + atomic_number or chemical, ion_charge, wavelength, log_gf, rad, stark, waals + """ + if self.strip_molecules: + return vald[ + [ + "atomic_number", + "ion_charge", + "wavelength", + "log_gf", + "e_low", + "e_up", + "j_lo", + "j_up", + "rad", + "stark", + "waals", + ] + ].copy() + else: + return vald[ + [ + "chemical", + "ion_charge", + "wavelength", + "log_gf", + "e_low", + "e_up", + "j_lo", + "j_up", + "rad", + "stark", + "waals", + ] + ].copy() + def to_hdf(self, fname): """ Parameters ---------- fname : path - Path to the HDF5 output file + Path to the HDF5 output file """ with pd.HDFStore(fname, "w") as f: f.put("/vald_raw", self.vald_raw)