Skip to content

Commit

Permalink
use astropy to get the sky offset and separation from tel poiting and…
Browse files Browse the repository at this point in the history
… source
  • Loading branch information
TjarkMiener committed Jul 3, 2024
1 parent 97fdc55 commit 6175704
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 59 deletions.
26 changes: 18 additions & 8 deletions dl1_data_handler/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dl1_data_handler.processor import DL1DataProcessor

import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.table import (
Table,
join, # let us merge tables horizontally
Expand Down Expand Up @@ -129,7 +130,7 @@ def __init__(

# Telescope pointings
self.telescope_pointings = {}
self.fix_pointing_alt, self.fix_pointing_az = None, None
self.fix_pointing = None
if self.process_type == "Observation":
for tel_id in self.tel_ids:
with lock:
Expand All @@ -147,17 +148,26 @@ def __init__(

# Only fix telescope pointings valid for MCs!
# No divergent pointing implemented!
self.fix_pointing_alt = self.telescope_pointings[f"tel_{tel_id:03d}"][
fix_pointing_alt = self.telescope_pointings[f"tel_{tel_id:03d}"][
"telescope_pointing_altitude"
][0]
self.fix_pointing_az = self.telescope_pointings[f"tel_{tel_id:03d}"][
]
fix_pointing_az = self.telescope_pointings[f"tel_{tel_id:03d}"][
"telescope_pointing_azimuth"
][0]
# Set the telescope pointing to the delta Alt/Az tranform
]
# Reading the pointing for the first obs_id assuming fix tel pointing
fix_pointing_az = fix_pointing_az[0] * fix_pointing_az.unit
fix_pointing_alt = fix_pointing_alt[0] * fix_pointing_alt.unit
self.fix_pointing = SkyCoord(
fix_pointing_az.to_value(u.deg),
fix_pointing_alt.to_value(u.deg),
frame="altaz",
unit="deg",
)
# Set the telescope pointing of the SkyOffsetSeparation tranform to the fix pointing
if transforms is not None:
for transform in transforms:
if transform.name == "deltaAltAz":
transform.set_pointing(self.fix_pointing_alt, self.fix_pointing_az)
if transform.name == "SkyOffsetSeparation":
transform.set_pointing(self.fix_pointing)

# AI-based trigger system
self.trigger_settings = trigger_settings
Expand Down
97 changes: 46 additions & 51 deletions dl1_data_handler/transforms.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import astropy.units as u
from astropy.coordinates import SkyCoord
import numpy as np
import itertools
from .processor import Transform


class ShowerPrimaryID(Transform):
def __init__(
self, name="true_shower_primary_id", particle_id_col_name="true_shower_primary_id"
):
def __init__(self):
super().__init__()
self.particle_id_col_name = particle_id_col_name
self.particle_id_col_name = "true_shower_primary_id"
self.shower_primary_id_to_class = {
0: 1,
101: 0,
Expand All @@ -22,7 +22,7 @@ def __init__(
255: "hadron",
}

self.name = name
self.name = "true_shower_primary_id"
self.dtype = np.dtype("int8")

def describe(self, description):
Expand Down Expand Up @@ -57,56 +57,40 @@ def transform(self, example):
return example


class MCEnergy(Transform):
def __init__(self, name="energy", energy_col_name="true_energy", unit="log(TeV)"):
class LogEnergy(Transform):
def __init__(self):
super().__init__()
self.name = name
self.energy_col_name = energy_col_name
self.name = "energy"
self.shape = 1
self.dtype = np.dtype("float32")
self.unit = unit
self.unit = "log(TeV)"

def describe(self, description):
self.description = [
{**des, "name": self.name, "dtype": self.dtype, "unit": self.unit}
if des["name"] == self.energy_col_name
if des["name"] == "true_energy"
else des
for des in description
]
return self.description

def __call__(self, example):
for i, (val, des) in enumerate(zip(example, self.description)):
if des["base_name"] == self.energy_col_name:
example[i] = (
np.array([np.log10(val)])
if self.unit == "log(TeV)"
else np.array([val])
)
if des["base_name"] == "true_energy":
example[i] = np.log10(val)
return example


class DeltaAltAz(Transform):
def __init__(
self,
base_name="direction",
alt_col_name="true_alt",
az_col_name="true_az",
deg2rad=True,
north_pointing_correction=True,
):
class SkyOffsetSeparation(Transform):
def __init__(self, transform_to_rad=False):
super().__init__()

self.name = "deltaAltAz"
self.base_name = base_name
self.alt_col_name = alt_col_name
self.az_col_name = az_col_name
self.deg2rad = deg2rad
self.north_pointing_correction = north_pointing_correction
self.shape = 2
self.name = "SkyOffsetSeparation"
self.base_name = "direction"
self.shape = 3
self.dtype = np.dtype("float32")
self.unit = "rad"
self.pointing_alt, self.pointing_az = None, None
self.unit = u.rad if transform_to_rad else u.deg
self.fix_pointing = None

def describe(self, description):
self.description = description
Expand All @@ -117,36 +101,47 @@ def describe(self, description):
"base_name": self.base_name,
"shape": self.shape,
"dtype": self.dtype,
"unit": self.unit,
"unit": str(self.unit),
}
)
return self.description

def set_pointing(self, pointing_alt, pointing_az):
self.pointing_alt = pointing_alt
self.pointing_az = pointing_az
return
def set_pointing(self, fix_pointing):
self.fix_pointing = fix_pointing

def __call__(self, example):
for i, (val, des) in enumerate(
itertools.zip_longest(example, self.description)
):
if des["base_name"] == self.alt_col_name:
alt = np.radians(example[i]) if self.deg2rad else example[i]
alt -= self.pointing_alt
elif des["base_name"] == self.az_col_name:
az = np.radians(example[i]) if self.deg2rad else example[i]
if self.north_pointing_correction and az > 3*np.pi/2:
az -= 2 * np.pi
az -= self.pointing_az
if des["base_name"] == "true_alt":
alt = example[i]
elif des["base_name"] == "true_az":
az = example[i]
elif des["base_name"] == self.base_name:
example.append(np.array([alt, az]))
true_direction = SkyCoord(
az * u.deg,
alt * u.deg,
frame="altaz",
unit="deg",
)
sky_offset = self.fix_pointing.spherical_offsets_to(true_direction)
angular_separation = self.fix_pointing.separation(true_direction)
example.append(
np.array(
[
sky_offset[0].to_value(self.unit),
sky_offset[1].to_value(self.unit),
angular_separation.to_value(self.unit),
]
)
)
return example


class CoreXY(Transform):
def __init__(self, name="impact"):
def __init__(self):
super().__init__()
self.name = name
self.name = "impact"
self.shape = 2
self.dtype = np.dtype("float32")
self.unit = "km"
Expand Down

0 comments on commit 6175704

Please sign in to comment.