Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated fractions #42

Merged
merged 3 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 74 additions & 24 deletions pypahdb/decomposer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from mpl_toolkits.axes_grid1.inset_locator import inset_axes

import pypahdb
from pypahdb.decomposer_base import DecomposerBase
from pypahdb.decomposer_base import DecomposerBase, SMALL_SIZE, MEDIUM_SIZE


class Decomposer(DecomposerBase):
Expand All @@ -36,6 +36,18 @@ def __init__(self, spectrum):
spectrum (specutils.Spectrum1D): The data to fit/decompose.
"""
DecomposerBase.__init__(self, spectrum)
self._cation_neutral_ratio = None

def _get_cation_neutral_ratio(self):
"""
Calculate the cation neutral ratio if it is not already calculated and return it.
"""
if self._cation_neutral_ratio is None:
self._cation_neutral_ratio = self.charge_fractions['cation']
nonzero = np.nonzero(self.charge_fractions['neutral'])
self._cation_neutral_ratio[nonzero] /= self.charge_fractions['neutral'][nonzero]

return self._cation_neutral_ratio

def save_pdf(self, filename, header="", domaps=True, doplots=True):
"""Save a PDF summary of the fit results.
Expand Down Expand Up @@ -107,10 +119,35 @@ def save_pdf(self, filename, header="", domaps=True, doplots=True):
wcs = WCS(hdr)
else:
wcs = None
fig = self.plot_map(self.ionized_fraction, "ionized fraction", wcs=wcs)

fig = self.plot_map(self.cation_neutral_ratio.value, "n$_{{cation}}$/n$_{{neutral}}$", wcs=wcs)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(self.nc, "average PAH size (N$_{{C}}$)", wcs=wcs)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(self.charge_fractions['neutral'], "PAH neutral fraction", wcs=wcs)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(self.charge_fractions['cation'], "PAH cation fraction", wcs=wcs)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(self.large_fraction, "large fraction", wcs=wcs)
fig = self.plot_map(self.charge_fractions['anion'], "PAH anion fraction", wcs=wcs)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(self.size_fractions['large'],
f"large PAH fraction (N$_{{C}}$ > {MEDIUM_SIZE})",
wcs=wcs)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(self.size_fractions['medium'],
f"medium PAH fraction ({SMALL_SIZE} < N$_{{C}}$ ≤ {MEDIUM_SIZE})",
wcs=wcs)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(self.size_fractions['small'],
f"small PAH fraction (N$_{{C}}$ ≤ {SMALL_SIZE})",
wcs=wcs)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(self.error, "error", wcs=wcs)
Expand Down Expand Up @@ -187,23 +224,29 @@ def _fits_to_disk(hdr, filename):
for line in comments.split("\n"):
for chunk in [line[i: i + 72] for i in range(0, len(line), 72)]:
hdr["COMMENT"] = chunk
hdr["COMMENT"] = "1st data plane contains the PAH ionization " "fraction."
hdr["COMMENT"] = "2nd data plane contains the PAH large fraction."
hdr["COMMENT"] = "3rd data plane contains the error."

# Write results to FITS-file.
hdu = fits.PrimaryHDU(
np.stack(
(
self.ionized_fraction.value,
self.large_fraction.value,
self.error.value,
),
axis=0,
),
header=hdr,
)
hdu.writeto(filename, overwrite=True, output_verify="fix")
hdr["COMMENT"] = "1st extension has the PAH neutral fraction."
hdr["COMMENT"] = "2nd extension has the PAH cation fraction."
hdr["COMMENT"] = "3rd extension has the PAH anion fraction."
hdr["COMMENT"] = "4th extension has the PAH large fraction."
hdr["COMMENT"] = "5th extension has the PAH medium fraction."
hdr["COMMENT"] = "6th extension has the PAH small fraction."
hdr["COMMENT"] = "7th extension has the error."
hdr["COMMENT"] = "8th extension has the cation to neutral PAH ratio."
hdr["COMMENT"] = "9th extension has the average Nc."

# Write results to FITS-file with multiple extension.
primary_hdu = fits.PrimaryHDU(header=hdr)
hdulist = fits.HDUList([primary_hdu])

for key, value in self.charge_fractions.items():
hdulist.append(fits.ImageHDU(data=value.value, name=key.upper()))
for key, value in self.size_fractions.items():
hdulist.append(fits.ImageHDU(data=value.value, name=key.upper()))
hdulist.append(fits.ImageHDU(data=self.error.value, name="ERROR"))
hdulist.append(fits.ImageHDU(data=self.nc.value, name="NC"))
hdulist.append(fits.ImageHDU(data=self.cation_neutral_ratio.value, name="CATION_NEUTRAL_RATIO"))

hdulist.writeto(filename, overwrite=True, output_verify="fix")

return

Expand Down Expand Up @@ -450,12 +493,15 @@ def plot_fit(self, i=0, j=0):
)
ax2.plot(abscissa, model, color="tab:red", lw=1.5)
ax2.plot(
abscissa, self.size["large"][:, i, j], label="large", lw=1, color="tab:green"
abscissa, self.size["large"][:, i, j], label="large", lw=1, color="tab:orange"
)
ax2.plot(
abscissa, self.size["medium"][:, i, j], label="medium", lw=1, color="tab:green"
)
ax2.plot(
abscissa, self.size["small"][:, i, j], label="small", lw=1, color="tab:blue"
)
size_str = "$f_{large}$=%3.1f" % (self.large_fraction[i][j])
size_str = "$f_{large}$=%3.1f" % (self.size_fractions['large'][i][j])
ax2.text(0.025, 0.88, size_str, ha="left", va="center", transform=ax2.transAxes)
ax2.set_ylabel(f'{self.spectrum.meta["colnames"][1]} [{self.spectrum.flux.unit}]')

Expand Down Expand Up @@ -483,8 +529,9 @@ def plot_fit(self, i=0, j=0):
ax3.plot(
abscissa, charge["cation"][:, i, j], label="cation", lw=1, color="tab:purple"
)
ion_str = "$f_{ionized}$=%3.1f" % (self.ionized_fraction[i][j])
ax3.text(0.025, 0.88, ion_str, ha="left", va="center", transform=ax3.transAxes)
cnr_str = ("$n_{cation}/n_{neutral}$=%3.1f" %
(self.charge_fractions['cation'][i][j]/self.charge_fractions['neutral'][i][j]))
ax3.text(0.025, 0.88, cnr_str, ha="left", va="center", transform=ax3.transAxes)
ax3.set_xlabel(f'{self.spectrum.meta["colnames"][0]} [{self.spectrum.spectral_axis.unit}]')
ax3.set_ylabel(f'{self.spectrum.meta["colnames"][1]} [{self.spectrum.flux.unit}]')

Expand All @@ -499,3 +546,6 @@ def plot_fit(self, i=0, j=0):
fig.set_layout_engine("constrained")

return fig

# Make a cation to neutral ratio property.
cation_neutral_ratio = property(_get_cation_neutral_ratio)
148 changes: 107 additions & 41 deletions pypahdb/decomposer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
from scipy import optimize
from specutils import Spectrum1D

SMALL_SIZE = 50
MEDIUM_SIZE = 70


def _decomposer_anion(w, m=None, p=None):
"""Do the anion decomposition in multiprocessing."""
Expand All @@ -42,12 +45,17 @@ def _decomposer_cation(w, m=None, p=None):

def _decomposer_large(w, m=None, p=None):
"""Do the actual large decomposition in multiprocessing."""
return m.dot(w * (p > 40).astype(float))
return m.dot(w * (p > MEDIUM_SIZE).astype(float))


def _decomposer_medium(w, m=None, p=None):
"""Do the actual large decomposition in multiprocessing."""
return m.dot(w * ((p > SMALL_SIZE) & (p <= MEDIUM_SIZE)).astype(float))


def _decomposer_small(w, m=None, p=None):
"""Do the small decomposition in multiprocessing."""
return m.dot(w * (p <= 40).astype(float))
return m.dot(w * (p <= 50).astype(float))


def _decomposer_fit(w, m=None):
Expand Down Expand Up @@ -83,10 +91,11 @@ def __init__(self, spectrum):
self._mask = None
self._yfit = None
self._yerror = None
self._ionized_fraction = None
self._large_fraction = None
self._charge_fractions = None
self._size_fractions = None
self._charge = None
self._size = None
self._nc = None

# Check if spectrum is a Spectrum1D.
if not isinstance(spectrum, Spectrum1D):
Expand Down Expand Up @@ -268,56 +277,104 @@ def _error(self):

return self._yerror

def _get_ionized_fraction(self):
"""Return the ionized fraction.
def _get_charge_fractions(self):
"""Return the charge fraction.

Returns:
self._ionized_fraction (quantity.Quantity): Ionized fraction from
fit.
self._charge_fraction (dict): Fraction of neutral,
cation and anion PAHs from fit.
"""

# Lazy Instantiation.
if self._ionized_fraction is None:
if self._charge_fractions is None:
# Compute ionized fraction.
charge_matrix = self._precomputed["properties"]["charge"]
ions = (charge_matrix > 0).astype(float)[:, None, None]
self._ionized_fraction = np.sum(self._weights * ions, axis=0)
self._ionized_fraction *= u.dimensionless_unscaled
neutrals = (charge_matrix == 0).astype(float)[:, None, None]
cations = (charge_matrix > 0).astype(float)[:, None, None]
anions = (charge_matrix < 0).astype(float)[:, None, None]
neutral_fraction = np.sum(self._weights * neutrals, axis=0)
cation_fraction = np.sum(self._weights * cations, axis=0)
anion_fraction = np.sum(self._weights * anions, axis=0)
neutral_fraction *= u.dimensionless_unscaled
cation_fraction *= u.dimensionless_unscaled
anion_fraction *= u.dimensionless_unscaled

charge_sum = neutral_fraction + cation_fraction + anion_fraction
nonzero = np.nonzero(charge_sum)

neutral_fraction[nonzero] /= charge_sum[nonzero]
cation_fraction[nonzero] /= charge_sum[nonzero]
anion_fraction[nonzero] /= charge_sum[nonzero]

# Make dictionary of charge fractions.
self._charge_fractions = {
"neutral": neutral_fraction,
"cation": cation_fraction,
"anion": anion_fraction,
}

nonzero = np.nonzero(self._ionized_fraction)
return self._charge_fractions

# Update ionized fraction.
neutrals = (charge_matrix == 0).astype(float)[:, None, None]
neutrals_wt = (np.sum(self._weights * neutrals, axis=0))[nonzero]
self._ionized_fraction[nonzero] /= (
self._ionized_fraction[nonzero] + neutrals_wt
)
def _get_average_nc(self):
"""Return the average number of carbon atoms.

Returns:
self._nc (quantity.Quantity): Average number of carbon atoms
"""

# Lazy Instantiation.
if self._nc is None:
size_array = self._precomputed["properties"]["size"]
size = size_array.astype(float)[:, None, None]
self._nc = np.sum(self._weights * size, axis=0)
nc_sum = np.sum(self._weights, axis=0)
nonzero = np.nonzero(nc_sum)
self._nc[nonzero] / nc_sum[nonzero]
self._nc *= u.dimensionless_unscaled

return self._ionized_fraction
return self._nc

def _get_large_fraction(self):
"""Return the large fraction.
def _get_size_fractions(self):
"""Return the size fraction.

Returns:
self._large_fraction (quantity.Quantity): Large fraction from fit.
self._size_fractions (dict): Size fractions from fit.
"""

# Lazy Instantiation.
if self._large_fraction is None:
if self._size_fractions is None:
# Compute large fraction.
size_matrix = self._precomputed["properties"]["size"]
large = (size_matrix > 40).astype(float)[:, None, None]
self._large_fraction = np.sum(self._weights * large, axis=0)
self._large_fraction *= u.dimensionless_unscaled

nonzero = np.nonzero(self._large_fraction)

# Update the large fraction.
small = (size_matrix <= 40).astype(float)[:, None, None]
small_wt = (np.sum(self._weights * small, axis=0))[nonzero]
self._large_fraction[nonzero] /= self._large_fraction[nonzero] + small_wt
large = (size_matrix > MEDIUM_SIZE).astype(float)[:, None, None]
large_fraction = np.sum(self._weights * large, axis=0)
large_fraction *= u.dimensionless_unscaled

# Compute medium fraction between 50 and 70.
medium = ((size_matrix > SMALL_SIZE) & (size_matrix <= MEDIUM_SIZE)).astype(float)[:, None, None]
medium_fraction = np.sum(self._weights * medium, axis=0)
medium_fraction *= u.dimensionless_unscaled

# Compute small fraction between 20 and 50.
small = (size_matrix <= SMALL_SIZE).astype(float)[:, None, None]
small_fraction = (np.sum(self._weights * small, axis=0))
small_fraction *= u.dimensionless_unscaled

size_sum = large_fraction + medium_fraction + small_fraction
nonzero = np.nonzero(size_sum)

# Update the size fractions.
large_fraction[nonzero] /= size_sum[nonzero]
medium_fraction[nonzero] /= size_sum[nonzero]
small_fraction[nonzero] /= size_sum[nonzero]

# Make dictionary of size fractions.
self._size_fractions = {
"large": large_fraction,
"medium": medium_fraction,
"small": small_fraction,
}

return self._large_fraction
return self._size_fractions

def _get_charge(self):
"""Return the spectral charge breakdown from fit.
Expand Down Expand Up @@ -388,7 +445,7 @@ def _get_size(self):

Returns:
self._size (dictionary): Dictionary with keys
'large', 'small'.
'large', 'medium', 'small'.
"""

# TODO: Should self._size be a Spectrum1D-object?
Expand All @@ -400,6 +457,11 @@ def _get_size(self):
m=self._matrix,
p=self._precomputed["properties"]["size"],
)
decomposer_medium = partial(
_decomposer_medium,
m=self._matrix,
p=self._precomputed["properties"]["size"],
)
decomposer_small = partial(
_decomposer_small,
m=self._matrix,
Expand All @@ -416,6 +478,7 @@ def _get_size(self):
ordinate = self.spectrum.flux.T
mappings = {
"small": decomposer_small,
"medium": decomposer_medium,
"large": decomposer_large,
}

Expand Down Expand Up @@ -447,14 +510,17 @@ def _get_size(self):
# Make error a property.
error = property(_error)

# Make ionized_fraction a property.
ionized_fraction = property(_get_ionized_fraction)
# Make charge fractions a property.
charge_fractions = property(_get_charge_fractions)

# Make large_fraction a property.
large_fraction = property(_get_large_fraction)
# Make size fractions a property.
size_fractions = property(_get_size_fractions)

# Make charge a property.
charge = property(_get_charge)

# Make size a property for.
# Make size a property.
size = property(_get_size)

# Make nc a property.
nc = property(_get_average_nc)
Loading