"""
This module provides a Jones matrix framework for radio interferometric
measurement equations.
"""
import copy
import numpy.ma as ma
import numpy as np
import matplotlib.pyplot as plt
from casacore.measures import measures
from casacore.quanta import quantity
from .conversion_utils import sph2crt, crt2sph, convertBasis, \
getSph2CartTransf, getSph2CartTransfArr, \
IAU_pol_basis, shiftmat2back, IAUtoC09, \
sphmeshgrid, dc_hrz2vrt
[docs]class Jones(object):
"""This is the base class for Jones algebra. It contains the Jones matrix
itself and a basis w.r.t. which the Jones matrix is given.
The basis is such that:
::
self.jonesbasis = array([[r_hat], [phi_hat], [theta_hat]]).
"""
_ecef_frame = 'ITRF'
_eci_frame = 'J2000'
_topo_frame = 'STN'
def __init__(self):
pass
[docs] def op(self, jonesobjright):
"""Operate this Jones on to the Jones passed in the argument."""
self.jonesr = jonesobjright.getValue()
self.jonesrbasis_from = jonesobjright.get_basis()
self.refframe_r = jonesobjright.get_refframe()
self.iaucmp = jonesobjright.iaucmp
self.computeJonesRes()
return self
[docs] def getValue(self):
"""Return value of the Jones matrix"""
return self.jones
[docs] def get_basis(self):
"""Return basis of the Jones matrix"""
return self.jonesbasis
[docs] def get_refframe(self):
"""Return the reference frame of the Jones matrix."""
return self.refframe
[docs] def computeJonesRes(self):
pass
[docs] def sph2lud3_basis(self, jonesbasis_sph, alignment=None):
"""Convert sph basis to Ludwig3 frame with an optional rotation
alignment."""
# The jonesbasis for the antennas is taken to be the Ludwig3 def.
# with r,u,v basis expressed wrt the station frame
r_refframe = jonesbasis_sph[..., 0]
if alignment is not None:
r = np.tensordot(r_refframe, alignment, axes=([-1, 1]))
else:
r = r_refframe
(az, el) = crt2sph(r.T)
lugwig3rot = np.zeros((3, 3, len(az)))
lugwig3rot[0, 0, :] = 1.
lugwig3rot[1:, 1:, :] = np.array([[np.cos(az), np.sin(az)],
[-np.sin(az), np.cos(az)]])
lugwig3rot = np.moveaxis(lugwig3rot, -1, 0)
jonesbasis_lud3 = np.matmul(jonesbasis_sph, lugwig3rot)
# ang_u = np.rad2deg(
# np.arctan2(jonesbasis_lud3[:,1,1], jonesbasis_lud3[:,0,1]))
# print ang_u
return jonesbasis_lud3
[docs] def convert2iaucmp(self):
if not self.iaucmp:
self.jones = np.matmul(self.jones, IAUtoC09[1:, 1:])
self.iaucmp = True
[docs]class JonesChain(object):
jonesproducts = []
def __init__(self):
self.jonesproducts = []
[docs]class PJones(Jones):
"""This is a P-Jones or parallactic Jones. This has a temporal dependence
given by the epoch of observation."""
def __init__(self, obsTimespy, ITRF2stnrot, do_parallactic_rot=True):
super(PJones, self).__init__()
obsTimes_lst = []
for obsTimepy in obsTimespy:
obsTimes_lst.append(quantity(obsTimepy.isoformat()).get_value())
obsTimes_me = quantity(obsTimes_lst, 'd')
self.obsTimes = obsTimes_me.get_value()
self.obsTimeUnit = obsTimes_me.get_unit()
self.ITRF2stnrot = ITRF2stnrot
self.do_parallactic_rot = do_parallactic_rot
[docs] def computeJonesRes(self):
if type(self.obsTimes) is float:
self.computeJonesRes_overfield()
else:
self.computeJonesRes_overtime()
[docs] def computeJonesRes_overtime(self):
"""Compute and apply the P-Jones matrix. The structure is:
::
jones[time, sphcomp, skycomp] =
Pjones[time, sphcomp, comp]*jonesr[comp, skycomp]
The P-Jones matrix is computed as follows: consider a direction
vector d. Let jonesrbasis be the column concatenation of the 3
spherical basis vectors corresponding to d in the J2000 reference
frame, so
::
jonesrbasis = [[r_J2000],[phi_J2000],[theta_J2000]].T
where `r_J2000` is along the direction d and theta, phi are the
remaining two spherical basis vectors. Let `jonesbasis` be the basis
vectors corresponding to the same direction d but in the STN reference
frame, so
::
jonesbasis = [[r_STN],[phi_STN],[theta_STN]].T
where `r_STN` is along the direction d and theta, phi are the remaining
two spherical basis vectors in the spherical system associated with the
STN.
The algorithm takes `r_J2000` from component 0 of the `jonesrbasis` and
converts it to STN (i.e. finds `r_STN`) using casacore measures module,
along with the other 2 J2000 basis vectors. These converted vectors are
called `jonesrbasis_to`. With `r_STN`, it also computes the
corresponding `jonesbasis`. A vector in the cartesian J2000 ref sys
converted to STN must be equal to the same vector expressed in the
cartesian STN ref sys via a conversion from spherical, so
::
jonesbasis * V_STN^sph = jonesrbasis_to * V_J2000^sph
which implies that we can convert directly from spherical J2000 to the
spherical STN like this
::
V_STN^sph = (jonesbasis.H * jonesrbasis_to) * V_J2000^sph
where the matrix in parentheses is the P-Jones matrix.
The P-Jones matrix is then applied to the operand Jones matrix.
"""
nrOfTimes = len(self.obsTimes)
pjones = np.zeros((nrOfTimes, 2, 2))
me = measures()
me.doframe(measures().position(self._ecef_frame, '0m', '0m', '0m'))
self.jonesbasis = np.zeros((nrOfTimes, 3, 3))
if self.refframe_r == self._eci_frame:
convert2irf = self._ecef_frame
jonesrbasis_from = self.jonesrbasis_from
jr_refframe = self.refframe_r
else:
convert2irf = self._eci_frame
jonesrbasis_from = np.matmul(self.ITRF2stnrot.T,
self.jonesrbasis_from)
jr_refframe = self._ecef_frame
for ti in range(0, nrOfTimes):
# Set current time in reference frame
timEpoch = me.epoch('UTC', quantity(self.obsTimes[ti],
self.obsTimeUnit))
me.doframe(timEpoch)
jonesrbasis_to = np.asmatrix(convertBasis(me, jonesrbasis_from,
jr_refframe,
convert2irf))
if convert2irf == self._ecef_frame:
jonesrbasis_to = np.matmul(self.ITRF2stnrot, jonesrbasis_to)
jonesbasisMat = getSph2CartTransf(jonesrbasis_to[:, 0])
if self.do_parallactic_rot:
pjones[ti, :, :] = jonesbasisMat[:, 1:].H \
* jonesrbasis_to[:, 1:]
else:
pjones[ti, :, :] = np.asmatrix(np.identity(2))
self.jonesbasis[ti, :, :] = jonesbasisMat
if convert2irf == self._ecef_frame:
self.refframe = 'STN' # Final Ref frame is station
else:
self.refframe = self._eci_frame
self.jones = np.matmul(pjones, self.jonesr)
self.thisjones = pjones
[docs] def computeJonesRes_overfield(self):
"""Compute the PJones over field of directions for one frequency.
"""
pjones = np.zeros(self.jonesrbasis_from.shape[0:-2]+(2, 2))
me = measures()
me.doframe(measures().position(self._ecef_frame, '0m', '0m', '0m'))
self.jonesbasis = np.zeros(self.jonesrbasis_from.shape)
if self.refframe_r == self._eci_frame:
convert2irf = self._ecef_frame
jonesrbasis_from = self.jonesrbasis_from
jr_refframe = self.refframe_r
else:
convert2irf = self._eci_frame
jonesrbasis_from = np.matmul(self.ITRF2stnrot.T,
self.jonesrbasis_from)
jr_refframe = self._ecef_frame
timEpoch = me.epoch('UTC', quantity(self.obsTimes, self.obsTimeUnit))
me.doframe(timEpoch)
for idxi in range(self.jonesrbasis_from.shape[0]):
for idxj in range(self.jonesrbasis_from.shape[1]):
jonesrbasis_to = np.asmatrix(convertBasis(
me,
jonesrbasis_from[idxi, idxj, :, :],
jr_refframe, convert2irf))
if convert2irf == self._ecef_frame:
jonesrbasis_to = np.matmul(self.ITRF2stnrot,
jonesrbasis_to)
jonesbasisMat = getSph2CartTransf(jonesrbasis_to[..., 0])
pjones[idxi, idxj, :, :] = jonesbasisMat[:, 1:].H \
* jonesrbasis_to[:, 1:]
self.jonesbasis[idxi, idxj, :, :] = jonesbasisMat
if convert2irf == self._ecef_frame:
self.refframe = 'STN' # Final Ref frame is station
else:
self.refframe = self._eci_frame
self.jones = np.matmul(pjones, self.jonesr)
self.thisjones = pjones
[docs]class DualPolFieldPointSrc(Jones):
"""This is a mock Jones point source. It does not model a real source. It's
purpose is for testing. It can be seen as a source that first transmits in
one polarization and then in another, then 2 transmissions given in the 2
columns.
It may have a spectral dimension. The src_dir should be a tuple with
(az, el, ref)."""
def __init__(self, src_dir, dualPolField=np.identity(2), iaucmp=True):
(src_az, src_el, src_ref) = src_dir
dualPolField3d = np.asmatrix(np.identity(3))
dualPolField3d[1:, 1:] = np.asmatrix(dualPolField)
if iaucmp:
jones = np.matmul(IAUtoC09, dualPolField3d)[1:, 1:]
self.iaucmp = True
else:
jones = dualPolField3d[1:, 1:]
self.iaucmp = False
self.jones = np.asarray(jones)
self.jonesbasis = np.asarray(IAU_pol_basis(src_az, src_el))
self.refframe = src_ref
[docs]class DualPolFieldRegion(Jones):
"""This is a Jones unit flux density field."""
def __init__(self, refframe='J2000', dualPolField=np.identity(2),
iaucmp=True, lmgrid=None):
if not lmgrid:
azimsh, elemsh = sphmeshgrid()
lmn = sph2crt(azimsh, elemsh)
else:
nn = dc_hrz2vrt(*lmgrid)
lmn = np.array(lmgrid+(nn,))
azimsh, elemsh = crt2sph(lmn)
self.azmsh = azimsh
self.elmsh = elemsh
dualPolField3d = np.asmatrix(np.identity(3))
dualPolField3d[1:, 1:] = np.asmatrix(dualPolField)
if iaucmp:
jones = np.matmul(IAUtoC09, dualPolField3d)[1:, 1:]
self.iaucmp = True
else:
jones = dualPolField3d[1:, 1:]
self.iaucmp = False
self.jones = np.broadcast_to(jones,
elemsh.shape+dualPolField.shape)
self.jonesbasis = shiftmat2back(getSph2CartTransfArr(lmn))
self.refframe = refframe
[docs]class EJones(Jones):
"""This is the antenna or feed Jones. It is given by a set of complex gain
patterns for each frequency and polarization channel."""
def __init__(self, dualPolElem, position, stnRot, freqSel=None):
self.position = position
self.stnRot = stnRot
self.dualPolElem = dualPolElem
if freqSel is None:
self.freqChan = self.dualPolElem.getfreqs()
else:
self.freqChan = freqSel
self.refframe = 'STN'
[docs] def computeJonesRes(self):
"""Compute the Jones that results from applying the E-Jones to the
right.
The structure of the `jonesrbasis` is ``[timeIdx, sphIdx, skycompIdx]``.
"""
idxshape = self.jonesrbasis_from.shape[0:-2]
jonesrbasis = np.reshape(self.jonesrbasis_from, (-1, 3, 3))
(az_from, el_from) = crt2sph(jonesrbasis[..., 0].squeeze().T)
theta_phi_view = (np.pi/2-el_from.flatten(), az_from.flatten())
ejones = self.dualPolElem.getJonesAlong(self.freqChan, theta_phi_view)
# AntPat has column order theta_hat, phi_hat
# while C09 has phi_hat, vartheta_hat (where vartheta_hat=-theta_hat).
# So flip order and change sign of theta_hat.
ejones = ejones[..., ::-1]
ejones[..., 1] = -ejones[..., 1]
self.jonesbasis = self.jonesrbasis_from # Basis does not change
# This is the actual MEq multiplication:
if ejones.ndim > 3:
frqdimsz = (ejones.shape[0],)
else:
frqdimsz = ()
self.jones = np.reshape(
np.matmul(ejones, np.reshape(self.jonesr, (-1, 2, 2))),
frqdimsz+idxshape+(2, 2)
)
self.thisjones = np.reshape(ejones, frqdimsz+idxshape+(2, 2))
[docs] def getPosRot(self, position):
"""Compute the nominal transformation from the geodetic position to
ITRF. (Not implemented yet)"""
return np.identity(2)
[docs]class DualPolFieldSink(Jones):
[docs] def computeJonesRes(self):
self.jones = self.jonesr
self.refframe = self.refframe_r
[docs]def inverse(jonesobj):
"""Return a Jones object that is the inverse of `jonesobj`."""
inv_jones = copy.copy(jonesobj)
jmat = jonesobj.getValue()
inv_jones.jones = np.linalg.inv(jmat)
# Swap basis between left and right:
inv_jones.jonesbasis = jonesobj.jonesrbasis_from
inv_jones.jonesrbasis_from = jonesobj.jonesbasis
jframe = jonesobj.get_refframe()
jrframe = jonesobj.refframe_r
inv_jones.refframe = jrframe
inv_jones.refframe_r = jframe
return inv_jones
[docs]def fix_imaginary_directions(jonesobj, fill=np.identity(2)):
"""Replace jones matrices with imaginary directions in a Jones object.
When specifying 2D Cartesian direction cosines, it is possible that the
corresponding direction is not physical, e.g. when l,m = 1,1. In such cases,
the Jones radius basis will have an imaginary vertical component. This function
will find such 'directions' and replace the corresponding Jones matrix will the
fill matrix specified by the `fill` argument.
"""
idxs = np.where(np.imag(jonesobj.jonesbasis[..., 0, 2]))
jonesobj.jones[idxs[0], idxs[1], ...] = fill
[docs]def plotJonesField(jonesfld, jbasis, refframe, rep='abs-Jones',
mask_belowhorizon=True):
"""Plot a Jones field."""
def belowhorizon(z):
"""Return masked z values that are below the horizon.
Below the horizon means either than z is negative or
the z has a nonzero imaginary part.
"""
imagz_ma = ma.getmaskarray(ma.masked_not_equal(z.imag, 0.))
negz_ma = ma.getmaskarray(ma.masked_less(z, .0))
belowhrz = ma.mask_or(imagz_ma, negz_ma)
return belowhrz
if rep == 'abs-Jones':
restitle = 'Beam Jones on sky'
res00 = np.abs(jonesfld[:, :, 0, 0])
res00 = ma.masked_invalid(res00)
res00lbl = r'|J_{p\phi}|'
res01 = np.abs(jonesfld[:, :, 0, 1])
res01 = ma.masked_invalid(res01)
res01lbl = r'|J_{p\theta}|'
res10 = np.abs(jonesfld[:, :, 1, 0])
res10 = ma.masked_invalid(res10)
res10lbl = r'|J_{q\phi}|'
res11 = np.abs(jonesfld[:, :, 1, 1])
res11 = ma.masked_invalid(res11)
res11lbl = r'|J_{q\theta}|'
elif rep == 'Stokes':
corrmat = np.matmul(jonesfld, np.swapaxes(jonesfld.conj(), -2, -1))
S0 = np.real(corrmat[..., 0, 0]+corrmat[..., 1, 1])
SQ = np.real(corrmat[..., 0, 0]-corrmat[..., 1, 1])
SU = np.real(corrmat[..., 0, 1]+corrmat[..., 1, 0])
SV = np.imag(corrmat[..., 0, 1]-corrmat[..., 1, 0])
restitle = 'Antenna Stokes on sky'
res00 = S0
res00lbl = 'I'
res01 = SQ/S0
res01lbl = 'q'
res10 = SU/S0
res10lbl = 'u'
res11 = SV/S0
res11lbl = 'v'
else:
raise Exception("Unknown Jones representation {}.".format(rep))
if refframe == 'STN':
# Directions in Cartesian station crds
x = jbasis[..., 0, 0]
y = jbasis[..., 1, 0]
z = jbasis[..., 2, 0]
if mask_belowhorizon:
belowhrz = belowhorizon(z)
res00 = ma.MaskedArray(res00, mask=belowhrz)
res01 = ma.MaskedArray(res01, mask=belowhrz)
res10 = ma.MaskedArray(res10, mask=belowhrz)
res11 = ma.MaskedArray(res11, mask=belowhrz)
xlabel = 'STN X'
ylabel = 'STN Y'
elif refframe == 'J2000':
r = np.moveaxis(np.squeeze(jbasis[..., :, 0]), -1, 0)
az, el = crt2sph(r, branchcut_neg_x=False)
x = np.rad2deg(az)
y = np.rad2deg(el)
xlabel = 'RA'
ylabel = 'DEC'
fig = plt.figure()
fig.suptitle(restitle)
ax = plt.subplot(221, polar=False)
plt.pcolormesh(x, y, res00) # , vmin=0., vmax=2.0)
plt.colorbar()
ax.set_title(res00lbl)
plt.ylabel(ylabel)
ax = plt.subplot(222, polar=False)
plt.pcolormesh(x, y, res01)
plt.colorbar()
ax.set_title(res01lbl)
ax = plt.subplot(223, polar=False)
plt.pcolormesh(x, y, res10)
plt.colorbar()
ax.set_title(res10lbl)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
ax = plt.subplot(224, polar=False)
plt.pcolormesh(x, y, res11, vmin=np.nanmin(res11), vmax=np.nanmax(res11))
plt.colorbar()
ax.set_title(res11lbl)
plt.xlabel(xlabel)
plt.show()