
"""
A selection of useful functions used by the module.
"""

import warnings
import re
import os
import functools
import numpy as np
import requests
import tarfile
from bs4 import BeautifulSoup

from appdirs import user_cache_dir
from astropy.table import Table
from astropy.coordinates import SkyCoord, Angle
import astropy.units as aunits
from pandas import DataFrame

from .config import (
    ATNF_BASE_URL,
    ADS_URL,
    ATNF_TARBALL,
    ATNF_VERSION_TARBALL,
    MSP_URL,
    PSR_ALL,
    PSR_ALL_PARS,
    GLITCH_URL,
    GC_URL,
)


#: set cache location
CACHEDIR = user_cache_dir(appname="psrqpy", appauthor=False)

#: set ATNF tarball cache file name
DEFAULTCACHENAME = os.path.basename(ATNF_TARBALL)


def get_catalogue(
    path_to_db=None, cache=True, update=False, pandas=False, version="latest"
):
    """
    This function will attempt to download and cache the entire ATNF Pulsar
    Catalogue database `tarball
    <http://www.atnf.csiro.au/people/pulsar/psrcat/downloads/psrcat_pkg.tar.gz>`_,
    or read in the database file from a provided path. The database will be
    converted into an :class:`astropy.table.Table` or
    :class:`pandas.DataFrame`. This was originally based on the method in the
    `ATNF.ipynb
    <https://github.com/astrophysically/ATNF-Pulsar-Cat/blob/master/ATNF.ipynb>`_
    notebook by Joshua Tan (`@astrophysically
    <https://github.com/astrophysically/>`_).

    Args:
        path_to_db (str): if the path to a local version of the database file
            is given then that will be read in rather than attempting to
            download the file (defaults to None).
        cache (bool): cache the downloaded ATNF Pulsar Catalogue file.
            Defaults to True. This is ignored if `path_to_db` is given. The
            cache file path uses the specification in the `appdirs
            <https://github.com/ActiveState/appdirs>`_ package to set the
            location.
        update (bool): if True the ATNF Pulsar Catalogue will be re-downloaded
            and cached if there has been a change compared to the currently
            cached version. This is ignored if `path_to_db` is given.
        pandas (bool): if True the catalogue will be returned as a
            :class:`pandas.DataFrame` rather than the default of an
            :class:`~astropy.table.Table`.
        version (str): the version string (without the leading "v") of the
            ATNF catalogue version to download. This defaults to "latest" to
            get the most up-to-date version.

    Returns:
        :class:`~astropy.table.Table` or :class:`~pandas.DataFrame`: a table
        containing the entire catalogue.
    """
    if path_to_db is None:
        # remove any cached file if requested
        if update:
            if check_update():
                os.remove(os.path.join(CACHEDIR, DEFAULTCACHENAME))

        if version == "latest":
            atnftarball = ATNF_TARBALL
        else:
            atnftarball = ATNF_VERSION_TARBALL.format(version)

        # get the tarball
        try:
            dbtarfile = download_atnf_tarball(
                atnftarball, usecache=cache, version=version
            )
        except RuntimeError as e:
            raise IOError(
                f"Problem accessing ATNF catalogue tarball: {atnftarball}\n{e}"
            )

        try:
            # open tarball
            pulsargz = tarfile.open(dbtarfile, mode="r:gz")

            # extract the database file
            dbfile = pulsargz.extractfile("psrcat_tar/psrcat.db")
        except IOError:
            raise IOError("Problem extracting the database file")
    else:
        try:
            dbfile = open(path_to_db, "r")
        except IOError:
            raise IOError("Error loading given database file")

    breakstring = "@"  # break between each pulsar
    commentstring = "#"  # specifies line is a comment

    # create list of dictionaries - one for each pulsar
    psrlist = [{}]

    version = None  # catalogue version

    # loop through lines in dbfile
    for line in dbfile.readlines():
        if isinstance(line, bytes):
            line = line.decode()

        if line.startswith(commentstring):
            if version is None and "CATALOGUE" in line:
                # get catalogue version (should be in first comment string)
                version = line.split()[-1].strip()
            continue

        # split on whitespace
        dataline = line.split()

        if dataline[0][0] == breakstring:
            # the first break comes at the end of the first object, and so forth
            psrlist.append({})  # new object!
            continue

        try:
            psrlist[-1][dataline[0]] = float(dataline[1])
        except ValueError:
            psrlist[-1][dataline[0]] = dataline[1]

        if len(dataline) > 2:
            # check whether the 3rd value is a float (so it's an error value) or not
            try:
                float(dataline[2])
                isfloat = True
            except ValueError:
                isfloat = False

            if isfloat:
                # error values are last-digit errors, so convert to actual
                # errors by finding the number of decimal places after the
                # '.' in the value string
                val = dataline[1].split(":")[-1]  # account for RA and DEC strings

                try:
                    float(val)
                except ValueError:
                    raise ValueError("Value with error is not convertible to a float")

                if dataline[2][0] == "-" or "." in dataline[2]:
                    # negative errors or those with decimal points are absolute values
                    scalefac = 1.0
                else:
                    # split on exponent
                    valsplit = re.split("e|E|d|D", val)

                    scalefac = 1.0
                    if len(valsplit) == 2:
                        scalefac = 10 ** (-int(valsplit[1]))

                    dpidx = valsplit[0].find(".")  # find position of decimal point
                    if dpidx != -1:  # a point is found
                        scalefac *= 10 ** (len(valsplit[0]) - dpidx - 1)

                # add error column if required
                psrlist[-1][dataline[0] + "_ERR"] = (
                    float(dataline[2]) / scalefac
                )  # error entry
            else:
                # add reference column if required
                psrlist[-1][dataline[0] + "_REF"] = dataline[2]  # reference entry

            if len(dataline) > 3:
                # the last entry must(!) be a reference
                psrlist[-1][dataline[0] + "_REF"] = dataline[3]  # reference entry

    dbfile.close()

    # close tar file
    if not path_to_db:
        pulsargz.close()

    del psrlist[-1]  # the final breakstring comes at the end of the file

    # add RA and DEC in degrees and JNAME/BNAME
    for i, psr in enumerate(list(psrlist)):
        # add 'JNAME', 'BNAME' and 'NAME'
        if "PSRJ" in psr.keys():
            psrlist[i]["JNAME"] = psr["PSRJ"]

            if "PSRJ_REF" in psr.keys():
                psrlist[i]["JNAME_REF"] = psr["PSRJ_REF"]

        if "PSRB" in psr.keys():
            psrlist[i]["BNAME"] = psr["PSRB"]

            if "PSRB_REF" in psr.keys():
                psrlist[i]["BNAME_REF"] = psr["PSRB_REF"]

        if "PSRB" in psr.keys() and "PSRJ" in psr.keys():
            psrlist[i]["NAME"] = psr["PSRB"]

            if "PSRB_REF" in psr.keys():
                psrlist[i]["NAME_REF"] = psr["PSRB_REF"]
        else:
            psrlist[i]["NAME"] = psr["PSRJ"]

            if "PSRJ_REF" in psr.keys():
                psrlist[i]["NAME_REF"] = psr["PSRJ_REF"]

        if "RAJ" in psr.keys() and "DECJ" in psr.keys():
            # check if the string can be converted to a float (there are a few
            # cases where the position is just a decimal value)
            try:
                rad = float(psr["RAJ"])
                ras = Angle(rad * aunits.hourangle)
                psr["RAJ"] = ras.to_string(sep=":", pad=True)
            except ValueError:
                pass

            try:
                decd = float(psr["DECJ"])
                decs = Angle(decd * aunits.deg)
                psr["DECJ"] = decs.to_string(sep=":", pad=True, alwayssign=True)
            except ValueError:
                pass

            try:
                coord = SkyCoord(
                    psr["RAJ"], psr["DECJ"], unit=(aunits.hourangle, aunits.deg)
                )
                psrlist[i]["RAJD"] = coord.ra.deg  # right ascension in degrees
                psrlist[i]["DECJD"] = coord.dec.deg  # declination in degrees
            except Exception as e:
                warnings.warn(
                    "Error converting RAJ/DECJ strings to degrees for {}: {}".format(
                        psrlist[i]["NAME"], e
                    )
                )
                psrlist[i]["RAJD"] = np.nan
                psrlist[i]["DECJD"] = np.nan

            # set errors on positions in degrees
            if "RAJ_ERR" in psr.keys():
                # get the units for the error
                rajscale = 1.0 / (60.0 ** (len(psr["RAJ"].split(":")) - 1))
                psrlist[i]["RAJD_ERR"] = (
                    (psr["RAJ_ERR"] * rajscale * aunits.hourangle).to("deg").value
                )

            if "DECJ_ERR" in psr.keys():
                # get the units for the error
                nvals = len(psr["DECJ"].split(":"))
                if nvals == 1:
                    decunit = aunits.deg
                elif nvals == 2:
                    decunit = aunits.arcmin
                else:
                    decunit = aunits.arcsec
                psrlist[i]["DECJD_ERR"] = (psr["DECJ_ERR"] * decunit).to("deg").value

        # always include a value for POSEPOCH if PEPOCH is present
        # this matches the behaviour of the psrcat v1.66 CLI (definePosEpoch.c:202-213)
        if "POSEPOCH" not in psr.keys() and "PEPOCH" in psr.keys():
            psrlist[i]["POSEPOCH"] = psrlist[i]["PEPOCH"]

            if "PEPOCH_REF" in psr.keys():
                psrlist[i]["POSEPOCH_REF"] = psrlist[i]["PEPOCH_REF"]

        # derive the 'DATE' parameter from the 'POSEPOCH' reference
        # implementation based upon the psrcat v1.66 CLI (definePosEpoch.c:214-242)
        if "POSEPOCH_REF" in psr.keys():
            try:
                refyear = int(
                    re.search(r"(\d{2})(?!.*\d)", psrlist[i]["POSEPOCH_REF"]).group(0)
                )
                if refyear > 65:
                    refyear += 1900
                else:
                    refyear += 2000
                psrlist[i]["DATE"] = refyear
            except (AttributeError, TypeError):
                pass

    # convert to a pandas DataFrame - this will fill in empty spaces
    dftable = DataFrame(psrlist)

    if pandas:
        # return pandas DataFrame
        dftable.version = version
        return dftable

    # convert into an astropy table
    psrtable = Table.from_pandas(dftable)

    # add units if known
    for key in PSR_ALL_PARS:
        if key in psrtable.colnames:
            if PSR_ALL[key]["units"]:
                psrtable.columns[key].unit = PSR_ALL[key]["units"]

                if PSR_ALL[key]["err"] and key + "_ERR" in psrtable.colnames:
                    psrtable.columns[key + "_ERR"].unit = PSR_ALL[key]["units"]

    # add metadata
    if not path_to_db:
        if version is not None:
            psrtable.meta["version"] = version
        else:
            psrtable.meta["version"] = None
            warnings.warn("No version number found in the database file", UserWarning)

        psrtable.meta["ATNF Pulsar Catalogue"] = ATNF_BASE_URL

    if path_to_db:
        psrtable.meta["Database file"] = path_to_db

    return psrtable
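

# Example usage (an illustrative sketch, not part of the module API; it
# assumes network access to the ATNF server, or a previously cached tarball):
#
#     >>> from psrqpy.utils import get_catalogue
#     >>> psrs = get_catalogue()           # astropy Table of the full catalogue
#     >>> psrs.meta["version"]             # version string parsed from psrcat.db
#     >>> df = get_catalogue(pandas=True)  # the same data as a pandas DataFrame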


def check_update():
    """
    Check if the ATNF Pulsar Catalogue has been updated compared to the
    version in the cache.

    Returns:
        bool: True if the cache can be updated.
    """
    from astropy.utils.data import compute_hash

    if not os.path.isfile(os.path.join(CACHEDIR, DEFAULTCACHENAME)):
        # can update cache as file is not cached yet
        return True

    # get the cached file name
    cachefile = download_atnf_tarball(ATNF_TARBALL, usecache=True)

    # download a new version of the file and check the hash
    tmpcache = download_atnf_tarball(ATNF_TARBALL, usecache=False)

    curhash = compute_hash(cachefile)
    tmphash = compute_hash(tmpcache)

    if curhash == tmphash:
        # no update needed
        return False
    else:
        # an update can be obtained
        return True


def download_atnf_tarball(url, usecache=True, version="latest"):
    """
    Download and cache the ATNF catalogue database.

    Args:
        url (str): the URL of the ATNF Pulsar Catalogue tarball.
        usecache (bool): if True try and get the cached version.
        version (str): the version string of the catalogue to download
            (defaults to "latest").
    """
    if version == "latest":
        cachefile = os.path.join(CACHEDIR, DEFAULTCACHENAME)
    else:
        cachefile = os.path.join(
            CACHEDIR, os.path.basename(ATNF_VERSION_TARBALL.format(version))
        )

    if usecache and os.path.isfile(cachefile):
        # return cached file
        return cachefile

    try:
        # use a local name that does not shadow the tarfile module
        req = requests.get(url)
        req.raise_for_status()
    except Exception as e:
        raise RuntimeError("Error downloading pulsar catalogue: {}".format(str(e)))

    if not os.path.exists(CACHEDIR):
        try:
            os.makedirs(CACHEDIR)
        except OSError:
            if not os.path.exists(CACHEDIR):
                raise
    elif not os.path.isdir(CACHEDIR):
        raise OSError(f"Query cache directory {CACHEDIR} is not a directory")

    # write out file to cache
    with open(cachefile, "wb") as fp:
        fp.write(req.content)

    return cachefile
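

# Example usage (illustrative; ATNF_TARBALL is the URL configured in
# psrqpy.config and the download requires network access):
#
#     >>> from psrqpy.utils import download_atnf_tarball
#     >>> from psrqpy.config import ATNF_TARBALL
#     >>> path = download_atnf_tarball(ATNF_TARBALL)                  # cached copy if present
#     >>> path = download_atnf_tarball(ATNF_TARBALL, usecache=False)  # force a re-download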


def get_glitch_catalogue(psr=None):
    """
    Return a :class:`~astropy.table.Table` containing the `Jodrell Bank pulsar
    glitch catalogue <http://www.jb.man.ac.uk/pulsar/glitches/gTable.html>`_.
    If using data from the glitch catalogue then please cite `Espinoza et al.
    (2011) <http://adsabs.harvard.edu/abs/2011MNRAS.414.1679E>`_ and the URL
    `<http://www.jb.man.ac.uk/pulsar/glitches.html>`_.

    The output table will contain the following columns:

    * `NAME`: the pulsar's common name
    * `JNAME`: the pulsar name based on J2000 coordinates
    * `Glitch number`: the number of the glitch for a particular pulsar in
      chronological order
    * `MJD`: the time of the glitch in Modified Julian Days
    * `MJD_ERR`: the uncertainty on the glitch time in days
    * `DeltaF/F`: the fractional frequency change
    * `DeltaF/F_ERR`: the uncertainty on the fractional frequency change
    * `DeltaF1/F1`: the fractional frequency derivative change
    * `DeltaF1/F1_ERR`: the uncertainty on the fractional frequency derivative
      change
    * `Reference`: the glitch publication reference

    Args:
        psr (str): if a pulsar name is given then only the glitches for that
            pulsar are returned, otherwise all glitches are returned.

    Returns:
        :class:`~astropy.table.Table`: a table containing the entire glitch
        catalogue.

    Example:
        An example of using this to extract the glitches for the Crab Pulsar
        would be:

        >>> import psrqpy
        >>> gtable = psrqpy.get_glitch_catalogue(psr='J0534+2200')
        >>> print("There have been {} glitches observed from the Crab pulsar".format(len(gtable)))
        27
    """
    # get webpage
    try:
        gt = requests.get(GLITCH_URL)
    except Exception as e:
        raise RuntimeError("Error downloading glitch catalogue: {}".format(str(e)))

    if gt.status_code != 200:
        warnings.warn("Could not query the glitch catalogue.", UserWarning)
        return None

    # parse HTML
    try:
        soup = BeautifulSoup(gt.content, "html.parser")
    except Exception as e:
        warnings.warn(
            "Could not parse the glitch catalogue: {}".format(str(e)), UserWarning
        )
        return None

    # get table rows
    rows = soup.table.find_all("tr")

    # set the table headings
    tabledict = dict()
    tabledict["NAME"] = []
    tabledict["JNAME"] = []
    tabledict["Glitch number"] = []
    tabledict["MJD"] = []
    tabledict["MJD_ERR"] = []
    tabledict["DeltaF/F"] = []
    tabledict["DeltaF/F_ERR"] = []
    tabledict["DeltaF1/F1"] = []
    tabledict["DeltaF1/F1_ERR"] = []
    tabledict["Reference"] = []

    # loop through rows: rows with glitches have their first column as an index
    for row in rows:
        tds = row.find_all("td")

        if tds[0].contents[0].string is None:
            continue

        try:
            tabledict["NAME"].append(tds[1].contents[0].string)
            jname = (
                "J" + tds[2].contents[0].string
                if "J" != tds[2].contents[0].string[0]
                else tds[2].contents[0].string
            )
            tabledict["JNAME"].append(jname)
            tabledict["Glitch number"].append(int(tds[3].contents[0].string))

            for j, pname in enumerate(
                [
                    "MJD",
                    "MJD_ERR",
                    "DeltaF/F",
                    "DeltaF/F_ERR",
                    "DeltaF1/F1",
                    "DeltaF1/F1_ERR",
                ]
            ):
                try:
                    val = float(tds[4 + j].contents[0].string)
                except (ValueError, TypeError):
                    val = np.nan

                tabledict[pname].append(val)

            # get reference link if present
            try:
                ref = tds[10].contents[0].a.attrs["href"]
            except AttributeError:
                ref = tds[10].contents[0].string

            tabledict["Reference"].append(ref)
        except RuntimeError:
            warnings.warn("Problem parsing glitch table", UserWarning)
            return None

    # convert to an astropy table
    table = Table(tabledict)
    table.columns["MJD"].unit = aunits.Unit("d")  # add units of days to glitch time
    table.columns["MJD_ERR"].unit = aunits.Unit("d")

    # correct scaling of parameters
    table["DeltaF/F"] *= 1e-9
    table["DeltaF/F_ERR"] *= 1e-9
    table["DeltaF1/F1"] *= 1e-3
    table["DeltaF1/F1_ERR"] *= 1e-3

    if psr is None:
        return table
    else:
        if psr not in table["NAME"] and psr not in table["JNAME"]:
            warnings.warn(
                "Pulsar '{}' not found in glitch catalogue".format(psr), UserWarning
            )
            return None
        else:
            if psr in table["NAME"]:
                return table[table["NAME"] == psr]
            else:
                return table[table["JNAME"] == psr]
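

# A further illustrative sketch (assumes the Jodrell Bank page is reachable):
# selecting only the large glitches once the columns have been rescaled to
# absolute fractional changes:
#
#     >>> gtable = get_glitch_catalogue()
#     >>> big = gtable[gtable["DeltaF/F"] > 1e-6]  # glitches with ΔF/F > 1e-6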


def get_gc_catalogue():
    """
    Download and parse Paolo Freire's table of pulsars in globular clusters
    from http://www.naic.edu/~pfreire/GCpsr.txt. This will be returned as a
    :class:`astropy.table.Table`, but with some additional methods to extract
    information for individual clusters. If the webpage returns an error then
    ``None`` is returned.

    The additional methods of the returned :class:`~astropy.table.Table` are:

    * ``clusters``: by default this returns a list of the common names of the
      globular clusters in the table, or using the ``ngc=True`` argument will
      return the NGC names of all clusters.
    * ``cluster_pulsars``: returns a sub-table only containing pulsars within
      a supplied globular cluster name.
    """
    # get the webpage
    try:
        gct = requests.get(GC_URL)
    except Exception as e:
        raise RuntimeError(
            "Error downloading the globular cluster pulsar table: {}".format(str(e))
        )

    if gct.status_code != 200:
        warnings.warn(
            "Could not query the globular cluster pulsar table.", UserWarning
        )
        return None

    # convert from bytes to utf-8
    tabledata = gct.content.decode("utf-8")

    names = [
        "Cluster",
        "NGC",
        "Pulsar",
        "Offset",
        "Offset error",
        "Period",
        "dP/dt",
        "dP/dt error",
        "DM",
        "DM error",
        "Pb",
        "x",
        "x error",
        "e",
        "e error",
        "m2",
    ]

    # create astropy table
    gctable = Table(
        names=names,
        dtype=[str, str, str] + [float] * 13,
        units=[
            None,
            None,
            None,
            aunits.arcmin,
            aunits.arcmin,
            1e-3 * aunits.s,
            1e-20 * aunits.s / aunits.s,
            1e-20 * aunits.s / aunits.s,
            aunits.cm**-3 * aunits.pc,
            aunits.cm**-3 * aunits.pc,
            aunits.d,
            aunits.s,
            aunits.s,
            None,
            None,
            aunits.Msun,
        ],
        masked=True,
    )

    # loop over data and add in rows
    for line in tabledata.split("\n"):
        line = line.strip()

        if len(line) == 0:
            continue

        if line[0] == "#":
            # comment line
            continue

        # make sure any minus signs "−" are replaced with "-"
        linevalues = re.sub("−", "-", line).split()

        # check if line is a pulsar
        ispulsar = False
        if linevalues[0][0] in ["J", "B"]:
            try:
                if linevalues[0][5] in ["+", "-"]:
                    ispulsar = True
            except IndexError:
                pass

        # get globular cluster name
        if not ispulsar:
            # check if there's an NGC name
            ngc = re.search(r"NGC\s\d+", line)
            if ngc is not None:
                ngc = ngc.group()

            # common name
            gcname = line.split("(")[0].strip()
            continue

        psrentry = {key: None for key in names}
        psrentry["Cluster"] = gcname
        psrentry["NGC"] = str(ngc)
        psrentry["Pulsar"] = linevalues[0]
        psrentry["Period"] = float(linevalues[2]) if linevalues[2] != "*" else np.nan

        def parse_value_error(value):
            """
            Parse a string value and extract the error if given.

            Args:
                value (str): A string containing a value with an associated
                    last-digit error in parentheses, e.g., "-1.2324(5)"

            Returns:
                tuple: A tuple containing the value and the error as floats,
                    or NaN for the error if it is not present
            """
            if "(" in value:
                error = float(value[value.find("(") + 1 : value.find(")")])
                exponent = value.find("(") - value.find(".") - 1

                if "*10-15" in value:
                    # scale to units of 10^-20
                    scale = 1e-15 / 1e-20
                else:
                    scale = 1.0

                val = float(value[: value.find("(")]) * scale
                error *= 10**-exponent * scale
            else:
                if "*10-15" in value:
                    # scale to units of 10^-20 (remove the exponent suffix
                    # rather than using rstrip, which strips characters)
                    scale = 1e-15 / 1e-20
                    value = value.replace("*10-15", "")
                else:
                    scale = 1.0

                # remove any inequalities
                val = (
                    float(value.strip(">").strip("<")) * scale
                    if value != "*" and value != "i"
                    else np.nan
                )
                error = np.nan

            return val, error

        # Offset
        val, error = parse_value_error(linevalues[1])
        psrentry["Offset"] = val
        psrentry["Offset error"] = error

        # dP/dt
        val, error = parse_value_error(linevalues[3])
        psrentry["dP/dt"] = val
        psrentry["dP/dt error"] = error

        # DM
        val, error = parse_value_error(linevalues[4])
        psrentry["DM"] = val
        psrentry["DM error"] = error

        # binary parameters
        val, _ = parse_value_error(linevalues[5])
        psrentry["Pb"] = val

        val, error = parse_value_error(linevalues[6])
        psrentry["x"] = val
        psrentry["x error"] = error

        val, error = parse_value_error(linevalues[7])
        psrentry["e"] = val
        psrentry["e error"] = error

        # due to an entry without proper spacing (J1953+1846A) one line gives
        # an index error for m2
        try:
            val, _ = parse_value_error(linevalues[8])
            psrentry["m2"] = val
        except IndexError:
            psrentry["m2"] = np.nan

        mask = {}
        for key, value in psrentry.items():
            mask[key] = value is None or value == "None" or value is np.nan

        gctable.add_row(psrentry, mask=mask)

    class GCTable(Table):
        def __init__(self, data=None, **kwargs):
            super().__init__(data=data, **kwargs)

        def clusters(self, ngc=False):
            """
            Return a list of the globular cluster names.

            Args:
                ngc (bool): Set to True to return the NGC names rather than
                    the common names (some of which may be NGC names anyway!)
            """
            if ngc:
                return [val[0] for val in self.group_by("NGC").groups.keys.as_array()]
            else:
                return [
                    val[0] for val in self.group_by("Cluster").groups.keys.as_array()
                ]

        def cluster_pulsars(self, cluster):
            """
            Return a table just containing the pulsars in a requested globular
            cluster. If the cluster is not found then None is returned.

            Args:
                cluster (str): The name of the globular cluster for which to
                    return the pulsars.

            Returns:
                :class:`~astropy.table.Table`: a table of the pulsars in the
                    given cluster.
            """
            clustercolumns = ["Cluster", "NGC"]

            # check if name is just an integer value, which could be an NGC id
            try:
                gcnum = int(cluster)
            except ValueError:
                gcnum = None

            if gcnum is not None:
                cluster = "NGC {}".format(gcnum)

            # check for cluster name in the two columns containing name info
            gctable = None
            for cc in clustercolumns:
                # remove spaces from cluster names, so, e.g., 47Tuc will work
                # as well as 47 Tuc
                cluster = cluster.replace(" ", "")
                clusters = [
                    gc.replace(" ", "") if isinstance(gc, str) else ""
                    for gc in self[cc]
                ]

                if cluster in clusters:
                    # boolean mask of rows matching the requested cluster
                    mask = np.array([gc == cluster for gc in clusters])
                    gctable = self[mask]
                    break

            return gctable

    # return the table
    return GCTable(gctable)
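

# Example usage of the extra methods on the returned table (illustrative;
# assumes the GCpsr.txt page is reachable):
#
#     >>> gctable = get_gc_catalogue()
#     >>> gctable.clusters()                       # common cluster names
#     >>> tuc = gctable.cluster_pulsars("47 Tuc")  # "47Tuc" also works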


def get_msp_catalogue():
    """
    Download and parse Dunc Lorimer's `catalogue
    <http://astro.phys.wvu.edu/GalacticMSPs/GalacticMSPs.txt>`_ of galactic
    millisecond pulsars. This makes use of code from the now deleted `galmsps
    <https://github.com/astrogewgaw/galmsps>`_ repository that scraped the
    database into JSON every month, contributed by `@astrogewgaw
    <https://github.com/astrogewgaw>`_ before deletion via `PR #88
    <https://github.com/mattpitkin/psrqpy/pull/88>`_.

    Returns:
        :class:`~astropy.table.Table`: a table of the MSPs.
    """
    # get the webpage
    try:
        mspt = requests.get(MSP_URL)
    except Exception as e:
        raise RuntimeError("Error downloading the MSP table: {}".format(str(e)))

    if mspt.status_code != 200:
        warnings.warn("Could not query the MSP table.", UserWarning)
        return None

    numeric = (
        lambda _: float(_)
        if re.search(re.compile(r"^[-+]?[0-9]*[.]?[0-9]+([eE][-+]?[0-9]+)?$"), _)
        else None
    )

    # parse the text file into a dict
    tabledata = {
        str(i + 1): {
            key: conv(value)  # type: ignore
            for (key, conv), value in zip(
                [
                    ("NAME", str),
                    ("P0", numeric),
                    ("DM", numeric),
                    ("GL", numeric),
                    ("GB", numeric),
                    ("PB", numeric),
                    ("A1", numeric),
                    ("DISCOVERY YEAR", int),
                    ("NOTES", str),
                ],
                values,
            )
        }
        for i, values in enumerate(
            [
                [_ for _ in re.split(r"\s+", _) if _]
                for _ in re.split(
                    r"\n+",
                    re.sub(
                        re.compile(r"^([^#]*)[#](.*)$", re.MULTILINE),
                        "",
                        mspt.content.decode(),
                    ).strip(),
                )
            ]
        )
    }

    # column names
    colnames = [
        "NAME",
        "P0",  # period
        "DM",  # dispersion measure
        "GL",  # galactic longitude
        "GB",  # galactic latitude
        "PB",  # binary period
        "A1",  # projected semi-major axis
        "DISCOVERY YEAR",  # discovery year
        "NOTES",
    ]

    # units for the columns
    units = {
        "NAME": None,
        "P0": aunits.s * 1e-3,
        "DM": aunits.cm**-3 * aunits.pc,
        "GL": aunits.deg,
        "GB": aunits.deg,
        "PB": aunits.d,
        "A1": aunits.s,
        "DISCOVERY YEAR": aunits.year,
        "NOTES": None,
    }

    # create dictionary in form usable by astropy Table
    tabledict = {
        colnames[i]: [item[colnames[i]] for item in tabledata.values()]
        for i in range(len(colnames))
    }

    # convert into astropy Table
    msptable = Table(
        data=tabledict,
        units=units,
        dtype=[str, float, float, float, float, float, float, int, str],
        masked=True,
    )

    return msptable
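

# Example usage (illustrative; assumes the GalacticMSPs.txt page is
# reachable). The PB column is masked where no binary period is listed:
#
#     >>> msps = get_msp_catalogue()
#     >>> binaries = msps[~msps["PB"].mask]  # MSPs with a measured binary period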


def check_old_references(func):
    @functools.wraps(func)
    def wrapper_check_old_references(*args, **kwargs):
        # check version
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if kwargs.get("useads", False):
                from ads.exceptions import APIResponseError

                if type(e) is APIResponseError:
                    raise e

            warnings.warn(
                "The way references are stored in the ATNF catalogue has "
                "changed and 'get_references' will no longer parse the old "
                "style references. Please update your cached version of the "
                "ATNF pulsar catalogue with:\n\n"
                ">>> from psrqpy import QueryATNF\n"
                ">>> QueryATNF(checkupdate=True)\n\n"
                "and update any cached references with:\n\n"
                ">>> from psrqpy import get_references\n"
                ">>> refs = get_references(updaterefcache=True)"
            )

            # get the correct number of expected outputs
            output = [None]
            if kwargs.get("useads", False):
                output.append(None)
            elif len(args) > 0:
                if args[0]:
                    output.append(None)

            if kwargs.get("bibtex", False):
                output.append(None)
            elif len(args) == 4:
                if args[3]:
                    output.append(None)

            if kwargs.get("showfails", False):
                output.append(None)
            elif len(args) == 5:
                if args[4]:
                    output.append(None)

            return tuple(output)

    return wrapper_check_old_references


@check_old_references
def get_references(
    useads=False,
    cache=True,
    updaterefcache=False,
    bibtex=False,
    showfails=False,
    version="latest",
):
    """
    Return a dictionary of paper `references
    <http://www.atnf.csiro.au/research/pulsar/psrcat/psrcat_ref.html>`_ in the
    ATNF catalogue. The keys are the ref strings given in the ATNF catalogue.

    Note:
        The way that the ATNF references are stored has changed, so if you
        downloaded the catalogue with a version of psrqpy before v1.0.8 you
        may need to run this function with ``updaterefcache=True`` to allow
        references to work. You may also want to update the ATNF catalogue
        tarball with:

        >>> import psrqpy
        >>> psrqpy.QueryATNF(checkupdate=True)

    Args:
        useads (bool): boolean to set whether to use the Python :mod:`ads`
            module to get the NASA ADS URL for the references.
        cache (bool): use cached, or cache, the references bundled with the
            catalogue tarball.
        updaterefcache (bool): update the cached references.
        bibtex (bool): if using ADS return the bibtex for the reference along
            with the ADS URL.
        showfails (bool): if outputting NASA ADS references set this flag to
            True to output the reference tags of references that fail to be
            found (mainly for debugging purposes).
        version (str): the version string (without the leading "v") of the
            ATNF catalogue version to download. This defaults to "latest" to
            get the most up-to-date version.

    Returns:
        dict: a dictionary of references.
    """
    import json

    if version == "latest":
        atnftarball = ATNF_TARBALL
    else:
        atnftarball = ATNF_VERSION_TARBALL.format(version)

    # get the tarball
    try:
        dbtarfile = download_atnf_tarball(
            atnftarball, usecache=not updaterefcache, version=version
        )
    except IOError:
        raise IOError("Problem accessing ATNF catalogue tarball")

    try:
        # open tarball
        pulsargz = tarfile.open(dbtarfile, mode="r:gz")

        # extract the references
        reffile = pulsargz.extractfile("psrcat_tar/psrcat_ref")
    except IOError:
        raise IOError("Problem extracting the database file")

    refdic = {
        line.split()[0]: " ".join(line.split()[2:])
        for line in reffile.read().decode("utf-8").strip().split("***")
        if len(line) > 0
    }

    reffile.close()
    pulsargz.close()  # close tar file

    # if not requiring ADS references just return the current dictionary
    if not useads:
        return refdic
    else:
        try:
            import ads
            from ads.exceptions import APIResponseError
        except ImportError:
            warnings.warn(
                "Could not import ADS module, so no ADS information "
                "will be included",
                UserWarning,
            )
            return refdic, None

    # try getting cached references
    if not cache:
        adsrefs = {}
    else:
        cachefile = os.path.join(CACHEDIR, "ads_cache")

        # check if a cached ADS refs list exists
        if os.path.exists(cachefile) and not updaterefcache:
            try:
                fp = open(cachefile, "r")
            except IOError:
                warnings.warn(
                    "Could not load ADS URL cache for references", UserWarning
                )
                return refdic, None

            cachedrefs = json.load(fp)
            fp.close()

            adsrefs = None
            adsbibtex = None
            failures = None

            if "urls" in cachedrefs:
                adsrefs = cachedrefs["urls"]

            if bibtex and "bibtex" in cachedrefs:
                adsbibtex = cachedrefs["bibtex"]

            if showfails and "failures" in cachedrefs:
                failures = cachedrefs["failures"]

            if bibtex:
                if failures is None:
                    return refdic, adsrefs, adsbibtex
                else:
                    return refdic, adsrefs, adsbibtex, failures
            else:
                if failures is None:
                    return refdic, adsrefs
                else:
                    return refdic, adsrefs, failures
        else:
            adsrefs = {}

    # loop over references
    j = 0
    bibcodes = {}
    failures = []
    for reftag in refdic:
        j = j + 1
        refstring = refdic[reftag]

        # check if IAU Circular or PhD thesis
        iaucirc = True if "IAU Circ" in refstring else False
        thesis = True if "PhD thesis" in refstring else False
        sepauthors = ""

        # check for arXiv identifier
        arxivid = None
        if "arXiv:" in refstring or "ArXiv:" in refstring:
            for searchterm in [
                r"[Aa]rXiv:[0-9]{4}.[0-9]*",
                r"[Aa]rXiv:astro-ph/[0-9]{7}",
            ]:
                match = re.search(searchterm, refstring)
                if match is not None:
                    arxivid = match.group().lower()
                    break
        else:
            if iaucirc:
                # get the circular number (the value after IAU Circ. No.)
                spl = re.split(r"([0-9]{4})", refstring)
                noidx = 1
                for val in spl:
                    if "IAU Circ" in val:
                        break
                    noidx += 1
                volume = spl[noidx]
            else:
                # do splitting on the year (allows between 1000-2999)
                spl = re.split(r"([1-2][0-9]{3})", refstring)

                if len(spl) < 2:
                    # no authors + year, so ignore!
                    failures.append(reftag)
                    continue

                year = spl[1] if len(spl[1]) == 4 else None
                try:
                    int(year)
                except (ValueError, TypeError):
                    # "year" is not an integer
                    failures.append(reftag)
                    continue

                # get the authors (remove line breaks/extra spaces and the
                # final full stop)
                authors = spl[0].strip().strip(".")

                # remove " Jr." from any author names (as it causes issues!)
                authors = authors.replace(" Jr.", "")

                # replace ampersands/and with ".," for separation
                authors = authors.replace(" &", ".,").replace(" and", ".,")

                # separate out authors
                sepauthors = [
                    auth.lstrip()
                    for auth in authors.split(".,")
                    if len(auth.strip()) > 0 and "et al" not in auth
                ]

                # remove any "'s for umlauts in author names
                sepauthors = [a.replace(r'"', "") for a in sepauthors]

                if len(sepauthors) == 0:
                    # no authors were parsed
                    failures.append(reftag)
                    continue

            if not thesis and not iaucirc:
                volume = None
                page = None

                if len(spl) > 2:
                    # join the remaining values and split on ","
                    extrainfo = [
                        info
                        for info in ("".join(spl[2:])).lstrip(".").split(",")
                        if len(info.strip()) > 0
                    ]

                    # get the journal volume (assumed to be second from last)
                    try:
                        # in case the volume contains an issue number in
                        # brackets perform a split
                        volume = int(extrainfo[-2].strip().split("(")[0])
                    except (IndexError, TypeError, ValueError):
                        # could not get the volume
                        pass

                    # get the page if given (assumed to be the last value)
                    try:
                        testpage = re.sub(
                            r"[\+\-\.]", "", extrainfo[-1].strip().split("-")[0]
                        )
                        if not testpage.startswith("eaao"):
                            # Science Advances page string
                            if (
                                testpage[0].upper() in ["L", "A", "E"]
                                or testpage[0:4] == ""
                            ):
                                # e.g. for ApJL, A&A, PASA
                                _ = int(testpage[1:])
                            elif testpage[-1].upper() == "P":
                                # e.g., for early MNRAS
                                _ = int(testpage[:-1])
                            else:
                                _ = int(testpage)
                            page = testpage
                    except (IndexError, TypeError, ValueError):
                        # could not get the page
                        pass

                if volume is None or page is None:
                    failures.append(reftag)
                    continue

        # generate the query string
        if arxivid is None:
            if not thesis:
                if iaucirc:
                    myquery = 'bibstem:"IAUC" volume:"{}"'.format(volume)
                else:
                    # default query without authors
                    myquery = "year:{} AND volume:{} AND page:{}".format(
                        year, volume, page
                    )

                    # add author if given
                    if len(sepauthors) > 0:
                        # check if authors have spaces in their last names (a
                        # few cases due to the formatting of some accented
                        # names); if so try the next author...
                        for k, thisauthor in enumerate(sepauthors):
                            if len(thisauthor.split(",")[0].split()) == 1:
                                myquery += ' AND author:"{}{}"'.format(
                                    "^" if k == 0 else "", thisauthor
                                )
                                break
            else:
                myquery = 'year: {} AND author:"^{}" AND bibstem:"PhDT"'.format(
                    year, sepauthors[0]
                )
        else:
            myquery = arxivid

        try:
            article = ads.SearchQuery(q=myquery)
        except APIResponseError:
            failures.append(reftag)
            warnings.warn(
                "Could not get reference information, so no ADS "
                "information for {} will be included".format(reftag),
                UserWarning,
            )
            continue

        for paper in article:
            bibcodes[reftag] = paper.bibcode
            adsrefs[reftag] = ADS_URL.format(bibcodes[reftag])

        # check if a paper bibcode was found
        if reftag not in bibcodes:
            failures.append(reftag)

    if bibtex:
        # use ExportQuery to get the bibtex
        expquery = ads.ExportQuery(list(bibcodes.values())).execute().split("\n\n")

        adsbibtex = {}
        for reftag in bibcodes:
            for equery in expquery:
                if bibcodes[reftag] in equery:
                    adsbibtex[reftag] = equery
                    break

    if cache:
        # output adsrefs to the cache file
        try:
            with open(cachefile, "w") as fp:
                cachedic = {}
                cachedic["urls"] = adsrefs
                if bibtex:
                    cachedic["bibtex"] = adsbibtex
                if showfails:
                    cachedic["failures"] = failures
                json.dump(cachedic, fp, indent=2)
        except IOError:
            raise IOError("Could not output the ADS references to a file")

    if bibtex:
        if showfails:
            return refdic, adsrefs, adsbibtex, failures
        else:
            return refdic, adsrefs, adsbibtex
    else:
        if showfails:
            return refdic, adsrefs, failures
        else:
            return refdic, adsrefs
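

# Example usage (illustrative; the useads/bibtex options additionally require
# the `ads` package and a configured ADS API token):
#
#     >>> refs = get_references()                   # {reftag: reference string}
#     >>> refs, urls = get_references(useads=True)  # also {reftag: ADS URL}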


# string of logical expressions for use in regex parser
LOGEXPRS = (
    r"(\bAND\b"  # logical AND
    r"|\band\b"  # logical AND
    r"|\&\&"  # logical AND
    r"|\bOR\b"  # logical OR
    r"|\bor\b"  # logical OR
    r"|\|\|"  # logical OR
    r"|!="  # not equal to
    r"|=="  # equal to
    r"|<="  # less than or equal to
    r"|>="  # greater than or equal to
    r"|<"  # less than
    r"|>"  # greater than
    r"|\("  # left opening bracket
    r"|\)"  # right closing bracket
    r"|\bNOT\b"  # logical NOT
    r"|\bnot\b"  # logical NOT
    r"|!"  # logical NOT
    r"|~"  # logical NOT
    r"|\bASSOC\b"  # pulsar association
    r"|\bassoc\b"  # pulsar association
    r"|\bTYPE\b"  # pulsar type
    r"|\btype\b"  # pulsar type
    r"|\bBINCOMP\b"  # pulsar binary companion type
    r"|\bbincomp\b"  # pulsar binary companion type
    r"|\bSURVEY\b"  # pulsar observation survey
    r"|\bsurvey\b"  # pulsar observation survey
    r"|\bDISCOVERY\b"  # pulsar discovery survey
    r"|\bdiscovery\b"  # pulsar discovery survey
    r"|\bEXIST\b"  # pulsar parameter exists in the catalogue
    r"|\bexist\b"  # pulsar parameter exists in the catalogue
    r"|\bERROR\b"  # condition on parameter error
    r"|\berror\b)"  # condition on parameter error
)
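

# An illustrative sketch of how LOGEXPRS tokenises a condition string before
# the tokens are mapped to a pandas query by condition() below:
#
#     >>> reg = re.compile(LOGEXPRS)
#     >>> [t.strip() for t in reg.split("F0 > 100 && exist(PB)") if t.strip()]
#     ['F0', '>', '100', '&&', 'exist', '(', 'PB', ')']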


def condition(table, expression, exactMatch=False):
    """
    Apply a logical expression to a table of values.

    Args:
        table (:class:`astropy.table.Table` or :class:`pandas.DataFrame`): a
            table of pulsar data
        expression (str, :class:`~numpy.ndarray`): a string containing a set
            of logical conditions with respect to pulsar parameter names (also
            containing `conditions
            <http://www.atnf.csiro.au/research/pulsar/psrcat/psrcat_help.html?type=normal#condition>`_
            allowed when accessing the ATNF Pulsar Catalogue), or a boolean
            array of the same length as the table.
        exactMatch (bool): set to True to exactly match some string comparison
            expressions, e.g., if asking for `'ASSOC(SNR)'` and `exactMatch`
            is True then only pulsars with an association that is just
            `'SNR'` will be returned, whereas if it is False then there could
            be multiple associations including `'SNR'`.

    Returns:
        :class:`astropy.table.Table` or :class:`pandas.DataFrame`: the table
        of values conforming to the input condition. Depending on the type of
        input table the returned table will either be a
        :class:`astropy.table.Table` or :class:`pandas.DataFrame`.

    Example:
        Some examples are:

        1. finding all pulsars with frequencies greater than 100 Hz

        >>> newtable = condition(psrtable, 'F0 > 100')

        2. finding all pulsars with frequencies greater than 50 Hz and
        period derivatives less than 1e-15 s/s.

        >>> newtable = condition(psrtable, '(F0 > 50) & (P1 < 1e-15)')

        3. finding all pulsars in binary systems

        >>> newtable = condition(psrtable, 'TYPE(BINARY)')

        4. parsing a boolean array equivalent to the first example

        >>> newtable = condition(psrtable, psrtable['F0'] > 100)
    """
    if expression is None:
        return table

    # check if expression is just a boolean array
    if isinstance(expression, np.ndarray):
        if expression.dtype != bool:
            raise TypeError("Numpy array must be a boolean array")
        elif len(expression) != len(table):
            raise Exception("Boolean array and table must be the same length")
        else:
            return table[expression]
    else:
        if not isinstance(expression, str):
            raise TypeError("Expression must be a boolean array or a string")
        else:
            if len(expression) == 0:
                return table

    # parse the expression string and split into tokens
    reg = re.compile(LOGEXPRS)
    tokens = reg.split(expression)
    tokens = [t.strip() for t in tokens if t.strip() != ""]

    if isinstance(table, Table):
        # convert astropy table to pandas DataFrame
        tab = table.to_pandas()
    elif not isinstance(table, DataFrame):
        raise TypeError("Table must be a pandas DataFrame or astropy Table")
    else:
        tab = table

    matchTypes = ["ASSOC", "TYPE", "BINCOMP", "EXIST", "ERROR", "SURVEY", "DISCOVERY"]

    # parse through tokens and replace as required
    ntokens = len(tokens)
    newtokens = []
    i = 0
    while i < ntokens:
        if tokens[i] in [r"&&", r"AND", r"and"]:
            # replace synonyms for '&' or 'and'
            newtokens.append(r"&")
        elif tokens[i] in [r"||", r"OR", r"or"]:
            # replace synonyms for '|' or 'or'
            newtokens.append(r"|")
        elif tokens[i] in [r"!", r"NOT", r"not"]:
            # replace synonyms for '~'
            newtokens.append(r"~")
        elif tokens[i].upper() in matchTypes:
            if ntokens < i + 3:
                warnings.warn(
                    "A '{}' must be followed by a '(NAME)': "
                    "ignoring in query".format(tokens[i].upper()),
                    UserWarning,
                )
            elif tokens[i + 1] != "(" or tokens[i + 3] != ")":
                warnings.warn(
                    "A '{}' must be followed by a '(NAME)': "
                    "ignoring in query".format(tokens[i].upper()),
                    UserWarning,
                )
            else:
                if tokens[i].upper() == "ASSOC":
                    if "ASSOC" not in tab.columns:
                        warnings.warn(
                            "'ASSOC' parameter not in table: ignoring in query",
                            UserWarning,
                        )
                    elif exactMatch:
                        newtokens.append(
                            r'(ASSOC == "{}")'.format(tokens[i + 2].upper())
                        )
                    else:
                        assoc = np.array(
                            [tokens[i + 2] in str(a) for a in table["ASSOC"]]
                        )
                        newtokens.append(r"(@assoc)")
                        i += 1
                elif tokens[i].upper() == "TYPE":
                    if tokens[i + 2].upper() == "BINARY":
                        if "BINARY" not in tab.keys():
                            warnings.warn(
                                "'BINARY' parameter not in table: "
                                "ignoring in query",
                                UserWarning,
                            )
                        else:
                            binary = ~tab["BINARY"].isna()
                            newtokens.append(r"(@binary)")
                            i += 1
                    else:
                        if "TYPE" not in tab.keys():
                            warnings.warn(
                                "'TYPE' parameter not in table: ignoring in query",
                                UserWarning,
                            )
                        elif exactMatch:
                            newtokens.append(
                                r'(TYPE == "{}")'.format(tokens[i + 2].upper())
                            )
                        else:
                            ttype = np.array(
                                [tokens[i + 2] in str(a) for a in table["TYPE"]]
                            )
                            newtokens.append(r"(@ttype)")
                            i += 1
                elif tokens[i].upper() == "BINCOMP":
                    if "BINCOMP" not in tab.columns:
                        warnings.warn(
                            "'BINCOMP' parameter not in table: ignoring in query",
                            UserWarning,
                        )
                    elif exactMatch:
                        newtokens.append(
                            r'(BINCOMP == "{}")'.format(tokens[i + 2].upper())
                        )
                    else:
                        bincomp = np.array(
                            [tokens[i + 2] in str(a) for a in table["BINCOMP"]]
                        )
                        newtokens.append(r"(@bincomp)")
                        i += 1
                elif tokens[i].upper() == "SURVEY":
                    if "SURVEY" not in tab.columns:
                        warnings.warn(
                            "'SURVEY' parameter not in table: ignoring in query",
                            UserWarning,
                        )
                    elif exactMatch:
                        newtokens.append(
                            r'(SURVEY == "{}")'.format(tokens[i + 2].upper())
                        )
                    else:
                        survey = np.array(
                            [tokens[i + 2] in str(a) for a in table["SURVEY"]]
                        )
                        newtokens.append(r"(@survey)")
                        i += 1
                elif tokens[i].upper() == "DISCOVERY":
                    if "SURVEY" not in tab.columns:
                        warnings.warn(
                            "'SURVEY' parameter not in table: ignoring in query",
                            UserWarning,
                        )
                    else:
                        if exactMatch:
                            discovery = np.array(
                                [
                                    tokens[i + 2] == str(a).split(",")[0]
                                    for a in table["SURVEY"]
                                ]
                            )
                        else:
                            discovery = np.array(
                                [
                                    tokens[i + 2] in str(a).split(",")[0]
                                    for a in table["SURVEY"]
                                ]
                            )
                        newtokens.append(r"(@discovery)")
                        i += 1
                elif tokens[i].upper() == "EXIST":
                    if tokens[i + 2].upper() not in tab.columns:
                        warnings.warn(
                            "'{}' does not exist for any pulsar".format(tokens[i + 2]),
                            UserWarning,
                        )

                        # create an empty DataFrame
                        tab = DataFrame(columns=tab.columns)
                        break
                    else:
                        exists = ~tab[tokens[i + 2].upper()].isna()
                        newtokens.append(r"(@exists)")
                        i += 1
                elif tokens[i].upper() == "ERROR":
                    if tokens[i + 2].upper() + "_ERR" not in tab.columns:
                        warnings.warn(
                            "Error value for '{}' not present: "
                            "ignoring in query".format(tokens[i + 2]),
                            UserWarning,
                        )
                    else:
                        newtokens.append(r"{}_ERR".format(tokens[i + 2].upper()))
                        i += 2
        else:
            newtokens.append(tokens[i].upper())

        i += 1

    # evaluate the expression
    try:
        newtab = tab.query("".join(newtokens))
    except RuntimeError:
        raise RuntimeError("Could not parse the query")

    if isinstance(table, Table):
        # convert back to an astropy table
        newtab = Table.from_pandas(newtab)

        # re-add any units/types
        for key in table.colnames:
            newtab.columns[key].unit = table.columns[key].unit
            newtab[key] = newtab[key].astype(table[key].dtype)

    return newtab
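

# A further illustrative sketch (psrtable stands for any table returned by
# get_catalogue): combining a numerical condition with an association match,
# as in the docstring examples above:
#
#     >>> newtable = condition(psrtable, "F0 > 100 && ASSOC(GC)")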


def characteristic_age(period, pdot, braking_idx=3.0):
    """
    Function defining the characteristic age of a pulsar. Returns the
    characteristic age in years using

    .. math::

       \\tau = \\frac{P}{\\dot{P}(n-1)}

    NaNs are returned for any negative period derivatives, or NaN input
    values.

    Args:
        period (float, array_like): the pulsar period in seconds
        pdot (float, array_like): the pulsar period derivative
        braking_idx (float): the pulsar braking index (defaults to :math:`n=3`)

    Returns:
        float: the characteristic age in years
    """
    # convert period and pdot to numpy arrays
    periodarr = np.array(period).flatten()
    pdotarr = np.array(pdot).flatten()

    assert periodarr.dtype == float, "Periods must be floats"
    assert pdotarr.dtype == float, "Period derivatives must be floats"
    assert len(periodarr) == len(
        pdotarr
    ), "Period and derivative arrays must be equal lengths"
    assert braking_idx > 1.0, "Braking index must be greater than 1"

    # check everything is positive, otherwise return NaN
    age = np.full(len(periodarr), np.nan)
    with np.errstate(invalid="ignore"):
        idx = np.isfinite(pdotarr) & np.isfinite(periodarr) & (pdotarr > 0.0)
        age[idx] = (periodarr[idx] / (pdotarr[idx] * (braking_idx - 1.0))) / (
            365.25 * 86400.0
        )

    # if the period and period derivative were just floats return a value
    # rather than an array
    if isinstance(pdot, float) and isinstance(period, float):
        return age[0]

    return age
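

# Example usage (illustrative): approximate Crab pulsar values give the
# conventional spin-down age of roughly 1.3 kyr:
#
#     >>> characteristic_age(0.0334, 4.2e-13)  # P in s, Pdot dimensionless
#     # ≈ 1260 (years)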


def age_pdot(period, tau=1e6, braking_idx=3.0):
    """
    Function returning the period derivative for a pulsar with a given period
    and characteristic age, using

    .. math::

       \\dot{P} = \\frac{P}{\\tau(n - 1)}

    Args:
        period (list, :class:`numpy.ndarray`): the pulsar period in seconds
        tau (float): the characteristic age in years
        braking_idx (float): the pulsar braking index (defaults to :math:`n=3`)

    Returns:
        :class:`numpy.ndarray`: an array of period derivatives.
    """
    periods = period
    if not isinstance(periods, np.ndarray):
        periods = np.array(periods)

    taus = tau * 365.25 * 86400.0  # tau in seconds
    pdots = periods / (taus * (braking_idx - 1.0))
    pdots[pdots < 0] = np.nan  # set any negative values to NaN

    return pdots


def B_field(period, pdot):
    """
    Function defining the polar magnetic field strength at the surface of the
    pulsar in gauss (Equation 5.12 of Lyne & Graham-Smith, Pulsar Astronomy,
    2nd edition) with

    .. math::

       B = 3.2\\!\\times\\!10^{19} \\sqrt{P\\dot{P}}

    NaNs are returned for any negative period derivatives, or NaN input
    values.

    Args:
        period (float): a pulsar period (s)
        pdot (float): a period derivative

    Returns:
        float: the magnetic field strength in gauss.
    """
    # convert period and pdot to numpy arrays
    periodarr = np.array(period).flatten()
    pdotarr = np.array(pdot).flatten()

    assert periodarr.dtype == float, "Periods must be floats"
    assert pdotarr.dtype == float, "Period derivatives must be floats"
    assert len(periodarr) == len(
        pdotarr
    ), "Period and derivative arrays must be equal lengths"

    # check pdot is positive, otherwise return NaN
    bfield = np.full(len(periodarr), np.nan)
    with np.errstate(invalid="ignore"):
        idx = np.isfinite(pdotarr) & np.isfinite(periodarr) & (pdotarr > 0.0)
        bfield[idx] = 3.2e19 * np.sqrt(periodarr[idx] * pdotarr[idx])

    # if the period and period derivative were just floats return a value
    # rather than an array
    if isinstance(pdot, float) and isinstance(period, float):
        return bfield[0]

    return bfield
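

# Example usage (illustrative): a canonical P = 1 s, Pdot = 1e-15 pulsar:
#
#     >>> B_field(1.0, 1e-15)
#     # ≈ 1.0e12 (gauss)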


def B_field_pdot(period, Bfield=1e10):
    """
    Function to get the period derivative from a given pulsar period and
    magnetic field strength using

    .. math::

       \\dot{P} = \\frac{1}{P}\\left( \\frac{B}{3.2\\!\\times\\!10^{19}}
       \\right)^2

    Args:
        period (list, :class:`~numpy.ndarray`): a list of period values
        Bfield (float): the polar magnetic field strength (defaults to
            :math:`10^{10}` G)

    Returns:
        :class:`numpy.ndarray`: an array of period derivatives
    """
    periods = period
    if not isinstance(periods, np.ndarray):
        periods = np.array(periods)

    pdots = (Bfield / 3.2e19) ** 2 / periods
    pdots[pdots < 0] = np.nan  # set any negative values to NaN

    return pdots


def pdot_to_fdot(pdot, period=None, frequency=None):
    """
    Convert period derivatives to frequency derivatives. Either periods or
    frequencies must be supplied.

    .. math::

       \\dot{f} = -\\frac{\\dot{P}}{P^2}

    Args:
        pdot (list, :class:`~numpy.ndarray`): a list of period derivative
            values.
        period (list, :class:`~numpy.ndarray`): a list of periods (seconds)
        frequency (list, :class:`~numpy.ndarray`): a list of frequencies (Hz)

    Returns:
        :class:`numpy.ndarray`: an array of frequency derivatives.
    """
    if period is None and frequency is None:
        raise ValueError("Either periods or frequencies must be provided")

    # convert to numpy arrays
    if period is not None:
        periodarr = np.array(period).flatten()
        pdotarr = np.array(pdot).flatten()
    else:
        periodarr = 1.0 / np.array(frequency).flatten()
        pdotarr = np.array(pdot).flatten()

    assert periodarr.dtype == float, "Periods must be floats"
    assert pdotarr.dtype == float, "Period derivatives must be floats"
    assert len(periodarr) == len(
        pdotarr
    ), "Period and period derivative arrays must be equal lengths"

    fdot = np.full(len(periodarr), np.nan)
    with np.errstate(invalid="ignore"):
        idx = np.isfinite(pdotarr) & np.isfinite(periodarr)
        fdot[idx] = -pdotarr[idx] / periodarr[idx] ** 2

    # if the period derivative was just a float return a value rather than an
    # array
    if isinstance(pdot, float):
        return fdot[0]

    return fdot
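

# Example usage (illustrative): for P = 0.5 s, fdot = -Pdot/P**2:
#
#     >>> pdot_to_fdot(1e-15, period=0.5)
#     # -4e-15 (Hz/s)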


def fdot_to_pdot(fdot, period=None, frequency=None):
    """
    Convert frequency derivatives to period derivatives. Either periods or
    frequencies must be supplied.

    .. math::

       \\dot{P} = -\\frac{\\dot{f}}{f^2}

    Args:
        fdot (list, :class:`~numpy.ndarray`): a list of frequency derivative
            values.
        period (list, :class:`~numpy.ndarray`): a list of periods (seconds)
        frequency (list, :class:`~numpy.ndarray`): a list of frequencies (Hz)

    Returns:
        :class:`numpy.ndarray`: an array of period derivatives.
    """
    if period is None and frequency is None:
        raise ValueError("Either periods or frequencies must be provided")

    # convert to numpy arrays
    if period is not None:
        frequencyarr = 1.0 / np.array(period).flatten()
        fdotarr = np.array(fdot).flatten()
    else:
        frequencyarr = np.array(frequency).flatten()
        fdotarr = np.array(fdot).flatten()

    assert frequencyarr.dtype == float, "Frequencies must be floats"
    assert fdotarr.dtype == float, "Frequency derivatives must be floats"
    assert len(frequencyarr) == len(
        fdotarr
    ), "Frequency and frequency derivative arrays must be equal lengths"

    pdot = np.full(len(frequencyarr), np.nan)
    with np.errstate(invalid="ignore"):
        idx = np.isfinite(fdotarr) & np.isfinite(frequencyarr)
        pdot[idx] = -fdotarr[idx] / frequencyarr[idx] ** 2

    # if the frequency derivative was just a float return a value rather than
    # an array
    if isinstance(fdot, float):
        return pdot[0]

    return pdot


def gw_h0_spindown_limit(frequency, fdot, distance, Izz=1e38):
    """
    The gravitational-wave amplitude at Earth assuming all rotational energy
    loss is radiated via emission from an :math:`l=m=2` mass quadrupole mode,
    using (see Equation A7 of [LVC]_)

    .. math::

       h_0^{\\text{sd}} = \\frac{1}{d}\\left(\\frac{5}{2}
       \\frac{G I_{zz}}{c^3} \\frac{|\\dot{f}|}{f}\\right)^{1/2}

    Args:
        frequency (list, :class:`~numpy.ndarray`): a list of frequencies (Hz).
        fdot (list, :class:`~numpy.ndarray`): a list of frequency derivatives
            (Hz/s).
        distance (list, :class:`~numpy.ndarray`): a list of distances (kpc).
        Izz (float): the moment of inertia about the principal axis (defaults
            to 1e38 kg m\\ :sup:`2`).

    Returns:
        :class:`numpy.ndarray`: an array of spin-down limits.

    .. [LVC] Abbott et al, *ApJ*, **879**, 28 (2019), `arXiv:1902.08507
       <https://arxiv.org/abs/1902.08507>`_
    """
    from astropy.constants import G, c

    # convert values to numpy arrays
    frequencyarr = np.array(frequency).flatten()
    fdotarr = np.array(fdot).flatten()
    distancearr = (np.array(distance).flatten() * aunits.pc * 1e3).to("m").value

    assert frequencyarr.dtype == float, "Frequencies must be floats"
    assert fdotarr.dtype == float, "Frequency derivatives must be floats"
    assert len(frequencyarr) == len(fdotarr) and len(frequencyarr) == len(
        distancearr
    ), "Input arrays must be equal lengths"

    h0ul = np.full(len(frequencyarr), np.nan)

    # only use negative fdots
    with np.errstate(invalid="ignore"):
        idx = (
            np.isfinite(fdotarr)
            & np.isfinite(frequencyarr)
            & np.isfinite(distancearr)
            & (fdotarr < 0.0)
        )
        h0ul[idx] = (
            np.sqrt(
                (5.0 * G.value * Izz * np.fabs(fdotarr[idx]))
                / (2.0 * c.value**3 * frequencyarr[idx])
            )
            / distancearr[idx]
        )

    if isinstance(frequency, float):
        return h0ul[0]

    return h0ul
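

# Example usage (illustrative): rough Crab pulsar numbers (the
# gravitational-wave frequency is twice the rotation frequency) recover the
# well-known spin-down limit of order 1e-24:
#
#     >>> gw_h0_spindown_limit(59.2, -7.4e-10, 2.0)
#     # ≈ 1.4e-24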


def gw_luminosity(h0, frequency, distance):
    """
    The gravitational-wave luminosity as derived from a given
    gravitational-wave amplitude measured at Earth, assuming all rotational
    energy loss is radiated via emission from an :math:`l=m=2` mass quadrupole
    mode, using (see Equation A5 of [LVC]_)

    .. math::

       \\dot{E}_{\\text{gw}} = \\frac{8\\pi^2c^3}{5G} f^2 h_0^2 d^2

    Args:
        h0 (list, :class:`~numpy.ndarray`): a list of gravitational-wave
            amplitudes.
        frequency (list, :class:`~numpy.ndarray`): a list of frequencies (Hz).
        distance (list, :class:`~numpy.ndarray`): a list of distances (kpc).

    Returns:
        :class:`numpy.ndarray`: an array of luminosities.
    """
    from astropy.constants import G, c

    # convert values to numpy arrays
    frequencyarr = np.array(frequency).flatten()
    h0arr = np.array(h0).flatten()
    distancearr = (np.array(distance).flatten() * aunits.pc * 1e3).to("m").value

    assert frequencyarr.dtype == float, "Frequencies must be floats"
    assert h0arr.dtype == float, "h0 must be floats"
    assert len(frequencyarr) == len(h0arr) and len(frequencyarr) == len(
        distancearr
    ), "Input arrays must be equal lengths"

    edot = np.full(len(frequencyarr), np.nan)
    with np.errstate(invalid="ignore"):
        idx = np.isfinite(h0arr) & np.isfinite(frequencyarr) & np.isfinite(distancearr)
        edot[idx] = (
            8.0
            * np.pi**2
            * c.value**3
            * (frequencyarr[idx] * h0arr[idx] * distancearr[idx]) ** 2
            / (5.0 * G.value)
        )

    if isinstance(frequency, float):
        return edot[0]

    return edot


def h0_to_q22(h0, frequency, distance):
    """
    Convert a gravitational-wave amplitude :math:`h_0` at Earth to an
    equivalent :math:`l=m=2` mass quadrupole :math:`Q_{22}` of the pulsar
    using (see Equations A3 and A4 of [LVC]_):

    .. math::

       Q_{22} = h_0 \\left(\\frac{c^4 d}{16\\pi^2 G f^2} \\right)
       \\sqrt{\\frac{15}{8\\pi}}

    Args:
        h0 (list, :class:`~numpy.ndarray`): a list of gravitational-wave
            amplitudes.
        frequency (list, :class:`~numpy.ndarray`): a list of frequencies (Hz).
        distance (list, :class:`~numpy.ndarray`): a list of distances (kpc).

    Returns:
        :class:`numpy.ndarray`: an array of :math:`Q_{22}` values.
    """
    from astropy.constants import G, c

    # convert values to numpy arrays
    frequencyarr = np.array(frequency).flatten()
    h0arr = np.array(h0).flatten()
    distancearr = (np.array(distance).flatten() * aunits.pc * 1e3).to("m").value

    assert frequencyarr.dtype == float, "Frequencies must be floats"
    assert h0arr.dtype == float, "h0 must be floats"
    assert len(frequencyarr) == len(h0arr) and len(frequencyarr) == len(
        distancearr
    ), "Input arrays must be equal lengths"

    q22 = np.full(len(frequencyarr), np.nan)
    with np.errstate(invalid="ignore"):
        idx = np.isfinite(h0arr) & np.isfinite(frequencyarr) & np.isfinite(distancearr)
        q22[idx] = (
            h0arr[idx]
            * np.sqrt(15.0 / (8.0 * np.pi))
            * (
                c.value**4
                * distancearr[idx]
                / (16.0 * np.pi**2 * G.value * frequencyarr[idx] ** 2)
            )
        )

    if isinstance(frequency, float):
        return q22[0]

    return q22


def q22_to_ellipticity(q22, Izz=1e38):
    """
    Convert a :math:`l=m=2` mass quadrupole :math:`Q_{22}` to an equivalent
    fiducial ellipticity :math:`\\varepsilon` of the pulsar using (see
    Equation A3 of [LVC]_):

    .. math::

       \\varepsilon = \\frac{Q_{22}}{I_{zz}}\\sqrt{\\frac{8\\pi}{15}}

    Args:
        q22 (list, :class:`~numpy.ndarray`): a list of mass quadrupole values
            (kg m\\ :sup:`2`).
        Izz (float): the moment of inertia about the principal axis (defaults
            to 1e38 kg m\\ :sup:`2`).

    Returns:
        :class:`numpy.ndarray`: an array of :math:`\\varepsilon` values.
    """
    q22arr = np.array(q22).flatten()

    ellipticity = np.full(len(q22arr), np.nan)
    with np.errstate(invalid="ignore"):
        idx = np.isfinite(q22arr)
        ellipticity[idx] = q22arr[idx] * np.sqrt(8.0 * np.pi / 15.0) / Izz

    if isinstance(q22, float):
        return ellipticity[0]

    return ellipticity
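

# Example usage (illustrative): for the fiducial Izz the two quantities are
# simply proportional:
#
#     >>> q22_to_ellipticity(1e33)
#     # ≈ 1.3e-5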


def ellipticity_to_q22(ellipticity, Izz=1e38):
    """
    Convert a fiducial ellipticity :math:`\\varepsilon` of the pulsar to the
    equivalent :math:`l=m=2` mass quadrupole :math:`Q_{22}` using (see
    Equation A3 of [LVC]_):

    .. math::

       Q_{22} = \\varepsilon I_{zz} \\sqrt{\\frac{15}{8\\pi}}

    Args:
        ellipticity (list, :class:`~numpy.ndarray`): a list of ellipticity
            values.
        Izz (float): the moment of inertia about the principal axis (defaults
            to 1e38 kg m\\ :sup:`2`).

    Returns:
        :class:`numpy.ndarray`: an array of :math:`Q_{22}` values.
    """
    ellipticityarr = np.array(ellipticity).flatten()

    q22 = np.full(len(ellipticityarr), np.nan)
    with np.errstate(invalid="ignore"):
        idx = np.isfinite(ellipticityarr)
        q22[idx] = ellipticityarr[idx] * np.sqrt(15.0 / (8.0 * np.pi)) * Izz

    if isinstance(ellipticity, float):
        return q22[0]

    return q22


def h0_to_ellipticity(h0, frequency, distance, Izz=1e38):
    """
    Convert a gravitational-wave amplitude :math:`h_0` at Earth to an
    equivalent fiducial ellipticity :math:`\\varepsilon` of the pulsar using
    (see Equation A4 of [LVC]_):

    .. math::

       \\varepsilon = h_0 \\left(\\frac{c^4 d}{16\\pi^2 G f^2} \\right)
       \\sqrt{\\frac{15}{8\\pi}}

    Args:
        h0 (list, :class:`~numpy.ndarray`): a list of gravitational-wave
            amplitudes.
        frequency (list, :class:`~numpy.ndarray`): a list of frequencies (Hz).
        distance (list, :class:`~numpy.ndarray`): a list of distances (kpc).
        Izz (float): the moment of inertia about the principal axis (defaults
            to 1e38 kg m\\ :sup:`2`).

    Returns:
        :class:`numpy.ndarray`: an array of :math:`\\varepsilon` values.
    """
    # use q22
    q22 = h0_to_q22(h0, frequency, distance)
    ellipticity = q22_to_ellipticity(q22)

    return ellipticity


def death_line(logP, linemodel="Ip", rho6=1.0):
    """
    The pulsar death line. Returns the base-10 logarithm of the period
    derivative for the given values of the period.

    Args:
        logP (list, :class:`~numpy.ndarray`): the base-10 log values of
            period.
        linemodel (str): a string with one of the model names given in the
            note below. Defaults to ``'Ip'``.
        rho6 (float): the value of the :math:`\\rho_6` parameter from [ZHM]_.
            Defaults to 1, which is equivalent to :math:`10^6` cm.

    Returns:
        :class:`numpy.ndarray`: a vector of period derivative values

    .. note::

        The death line models can be:

        * 'I' - Equation 3 of [ZHM]_
        * 'Ip' - Equation 4 of [ZHM]_
        * 'II' - Equation 5 of [ZHM]_
        * 'IIp' - Equation 6 of [ZHM]_
        * 'III' - Equation 8 of [ZHM]_
        * 'IIIp' - Equation 9 of [ZHM]_
        * 'IV' - Equation 10 of [ZHM]_
        * 'IVp' - Equation 11 of [ZHM]_

    .. [ZHM] Zhang, Harding & Muslimov, *ApJ*, **531**, L135-L138 (2000),
       `arXiv:astro-ph/0001341 <https://arxiv.org/abs/astro-ph/0001341>`_
    """
    gradvals = {
        "I": (11.0 / 4.0),
        "Ip": (9.0 / 4.0),
        "II": (2.0 / 11.0),
        "IIp": -(2.0 / 11.0),
        "III": (5.0 / 2.0),
        "IIIp": 2.0,
        "IV": -(3.0 / 11.0),
        "IVp": -(7.0 / 11.0),
    }
    intercept = {
        "I": 14.62,
        "Ip": 16.58,
        "II": 13.07,
        "IIp": 14.50,
        "III": 14.56,
        "IIIp": 16.52,
        "IV": 15.36,
        "IVp": 16.79,
    }
    rho = {
        "I": 0.0,
        "Ip": 1.0,
        "II": 0.0,
        "IIp": (8.0 / 11.0),
        "III": 0.0,
        "IIIp": 1.0,
        "IV": 0.0,
        "IVp": (8.0 / 11.0),
    }

    lp = logP
    if not isinstance(lp, np.ndarray):
        lp = np.array(lp)

    return (
        lp * gradvals[linemodel]
        - intercept[linemodel]
        + rho[linemodel] * np.log10(rho6)
    )
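

# Example usage (illustrative): log10(Pdot) on the 'Ip' death line for a
# one-second pulsar (log10 P = 0) is just the negative intercept:
#
#     >>> death_line([0.0])
#     array([-16.58])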


def label_line(ax, line, label, color="k", fs=14, frachoffset=0.1):
    """
    Add an annotation to the given line with appropriate placement and
    rotation.

    Based on code from `"How to rotate matplotlib annotation to match a
    line?" <http://stackoverflow.com/a/18800233/230468>`_ and `this
    <https://stackoverflow.com/a/38414616/1862861>`_ answer.

    Args:
        ax (:class:`matplotlib.axes.Axes`): Axes on which the label should be
            added.
        line (:class:`matplotlib.lines.Line2D`): Line which is being labeled.
        label (str): Text which should be drawn as the label.
        color (str): a color string for the label text. Defaults to ``'k'``
        fs (int): the font size for the label text. Defaults to 14.
        frachoffset (float): a number between 0 and 1 giving the fractional
            offset of the label text along the x-axis. Defaults to 0.1, i.e.,
            10%.

    Returns:
        :class:`matplotlib.text.Text`: an object containing the label
        information
    """
    xdata, ydata = line.get_data()
    x1 = xdata[0]
    x2 = xdata[-1]
    y1 = ydata[0]
    y2 = ydata[-1]

    # use the fractional horizontal offset frachoffset to set the x position
    # of the label
    if frachoffset >= 0 and frachoffset <= 1:
        if ax.get_xscale() == "log":
            xx = np.log10(x1) + frachoffset * (np.log10(x2) - np.log10(x1))
        else:
            xx = x1 + frachoffset * (x2 - x1)
    else:
        raise ValueError("frachoffset must be between 0 and 1")

    if ax.get_xscale() == "log" and ax.get_yscale() == "log":
        yy = np.interp(xx, np.log10(xdata), np.log10(ydata))
        xx = 10**xx
        yy = 10**yy
    elif ax.get_xscale() == "log" and ax.get_yscale() != "log":
        yy = np.interp(xx, np.log10(xdata), ydata)
        xx = 10**xx
    else:
        yy = np.interp(xx, xdata, ydata)

    ylim = ax.get_ylim()
    xytext = (0, 10)
    text = ax.annotate(
        label,
        xy=(xx, yy),
        xytext=xytext,
        textcoords="offset points",
        size=fs,
        color=color,
        zorder=1,
        horizontalalignment="left",
        verticalalignment="center_baseline",
    )

    sp1 = ax.transData.transform_point((x1, y1))
    sp2 = ax.transData.transform_point((x2, y2))

    rise = sp2[1] - sp1[1]
    run = sp2[0] - sp1[0]

    slope_degrees = np.degrees(np.arctan2(rise, run))
    text.set_rotation_mode("anchor")
    text.set_rotation(slope_degrees)
    ax.set_ylim(ylim)

    return text
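

# Example usage (an illustrative sketch on a log-log, P-Pdot style figure):
#
#     >>> import matplotlib.pyplot as plt
#     >>> fig, ax = plt.subplots()
#     >>> (line,) = ax.plot([0.001, 10.0], [1e-20, 1e-12])
#     >>> ax.set_xscale("log")
#     >>> ax.set_yscale("log")
#     >>> txt = label_line(ax, line, "death line", frachoffset=0.3)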