chore(deps): update minor updates #20

Merged
merged 3 commits on Nov 19, 2024
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -26,4 +26,4 @@ jobs:
# renovate: datasource=conda depName=conda-forge/python
python-version: "3.12.4"
# renovate: datasource=pypi depName=ruff
ruff-version: "0.4.10"
ruff-version: "0.7.4"
2 changes: 1 addition & 1 deletion extcats/CatalogQuery.py
@@ -721,7 +721,7 @@ def test_queries(
for pp in tqdm.tqdm(points):
buff = qfunc(pp[0], pp[1], rs_arcsec, method, **qfunc_args)
if (
-            (type(buff) == tuple and buff == (None, None))
+            (isinstance(buff, tuple) and buff == (None, None))
or (buff is None)
or (not buff)
):
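
Worth noting on the CatalogQuery.py change: recent ruff releases flag direct type comparisons (rule E721), which is presumably what prompted rewriting type(buff) == tuple as isinstance(buff, tuple). The two checks differ only for subclasses, as this minimal sketch with a hypothetical tuple subclass shows:

# Hypothetical subclass, purely for illustration.
class Point(tuple):
    pass

buff = Point((None, None))
print(type(buff) == tuple)      # False: the exact type check rejects subclasses
print(isinstance(buff, tuple))  # True: isinstance accepts subclasses
print(buff == (None, None))     # True: the equality test is unaffected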
80 changes: 45 additions & 35 deletions notebooks/example_ingest_multiproc.ipynb
@@ -30,77 +30,87 @@
"from healpy import ang2pix\n",
"\n",
"import importlib\n",
"\n",
"importlib.reload(CatalogPusher)\n",
"\n",
"# build the pusher object and point it to the raw files.\n",
"ps1p = CatalogPusher.CatalogPusher(\n",
" catalog_name = 'ps1_test', # short name of the catalog\n",
" data_source = '../testdata/PS1DR1_test/', # where to find the data (other options are possible)\n",
" file_type = '*.csv.gz' # filter files (there is col definition file in data_source)\n",
" )\n",
" catalog_name=\"ps1_test\", # short name of the catalog\n",
" data_source=\"../testdata/PS1DR1_test/\", # where to find the data (other options are possible)\n",
" file_type=\"*.csv.gz\", # filter files (there is col definition file in data_source)\n",
")\n",
"\n",
"# define the reader for the raw files (import column names from file.)\n",
"headfile = '../testdata/PS1DR1_test/column_headings.csv'\n",
"with open(headfile, 'r') as header:\n",
" catcols=[c.strip() for c in header.readline().split(',')]\n",
"headfile = \"../testdata/PS1DR1_test/column_headings.csv\"\n",
"with open(headfile, \"r\") as header:\n",
" catcols = [c.strip() for c in header.readline().split(\",\")]\n",
"\n",
"# skimm out some columns\n",
"bad = ['projectionID', 'skyCellID']\n",
"usecols = [c for c in catcols if (not c in bad) or ('gNpt' in c)]\n",
"bad = [\"projectionID\", \"skyCellID\"]\n",
"usecols = [c for c in catcols if (c not in bad) or (\"gNpt\" in c)]\n",
"\n",
"# specify some data types to save up on the storage\n",
"# See https://outerspace.stsci.edu/display/PANSTARRS/PS1+MeanObject+table+fields\n",
"types = {}\n",
"for c in usecols:\n",
" types[c] = np.float16\n",
" if c == 'objID':\n",
" if c == \"objID\":\n",
" types[c] = np.int32\n",
" if 'Flags' in c:\n",
" if \"Flags\" in c:\n",
" types[c] = np.int16\n",
" if ('ra' in c) or ('dec' in c):\n",
" if (\"ra\" in c) or (\"dec\" in c):\n",
" types[c] = np.float32\n",
"\n",
"ps1p.assign_file_reader(\n",
" reader_func = pd.read_csv, # callable to use to read the raw_files. \n",
" read_chunks = True, # weather or not the reader process each file into smaller chunks.\n",
" names=catcols, # All other arguments are passed directly to this function.\n",
" reader_func=pd.read_csv, # callable to use to read the raw_files.\n",
" read_chunks=True, # weather or not the reader process each file into smaller chunks.\n",
" names=catcols, # All other arguments are passed directly to this function.\n",
" usecols=usecols,\n",
" dtype = types,\n",
" na_values = -999,\n",
" dtype=types,\n",
" na_values=-999,\n",
" chunksize=50000,\n",
" engine='c')\n",
" engine=\"c\",\n",
")\n",
"\n",
"# define modifier. This time the healpix grid is finer (an orer 16 corresponds to 3\")\n",
"hp_nside16=2**16\n",
"hp_nside16 = 2**16\n",
"\n",
"\n",
"def ps1_modifier(srcdict):\n",
" srcdict['_id'] = srcdict.pop('objID')\n",
" srcdict['hpxid_16']=int(\n",
" ang2pix(hp_nside16, srcdict['raMean'], srcdict['decMean'], lonlat = True, nest = True))\n",
" srcdict[\"_id\"] = srcdict.pop(\"objID\")\n",
" srcdict[\"hpxid_16\"] = int(\n",
" ang2pix(\n",
" hp_nside16, srcdict[\"raMean\"], srcdict[\"decMean\"], lonlat=True, nest=True\n",
" )\n",
" )\n",
" return srcdict\n",
"\n",
"\n",
"ps1p.assign_dict_modifier(ps1_modifier)\n",
"\n",
"# wrap up the file pushing function so that we can \n",
"# wrap up the file pushing function so that we can\n",
"# use multiprocessing to speed up the catalog ingestion\n",
"\n",
"\n",
"def pushfiles(filerange):\n",
" # push stuff\n",
" ps1p.push_to_db(\n",
" coll_name = 'srcs',\n",
" index_on = ['hpxid_16'],\n",
" filerange = filerange,\n",
" overwrite_coll = False,\n",
" dry = False, \n",
" fillna_val = None)\n",
" coll_name=\"srcs\",\n",
" index_on=[\"hpxid_16\"],\n",
" filerange=filerange,\n",
" overwrite_coll=False,\n",
" dry=False,\n",
" fillna_val=None,\n",
" )\n",
" # add metadata to direct queries\n",
" ps1p.healpix_meta(\n",
" healpix_id_key = 'hpxid_16', \n",
" order = 16, is_indexed = True, nest = True)\n",
" ps1p.healpix_meta(healpix_id_key=\"hpxid_16\", order=16, is_indexed=True, nest=True)\n",
"\n",
"\n",
"# each job will run on a subgroup of all the files\n",
"file_groups = ps1p.file_groups(group_size=1)\n",
"with concurrent.futures.ProcessPoolExecutor(max_workers = 2) as executor:\n",
" executor.map(pushfiles, file_groups) \n",
"print (\"done! Enjoy your PS1_test database.\")"
"with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:\n",
" executor.map(pushfiles, file_groups)\n",
"print(\"done! Enjoy your PS1_test database.\")"
]
},
{
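
The bulk of this notebook diff is black-style reformatting; the ingestion pattern itself is unchanged: split the raw catalog files into groups and push each group to the database from its own process. A self-contained sketch of that fan-out, with push_files as a stand-in for the notebook's pushfiles helper:

import concurrent.futures

def push_files(filerange):
    # Stand-in for pushfiles(): a real worker would call
    # ps1p.push_to_db(filerange=filerange, ...) on its group of files.
    print("ingesting file group:", filerange)

if __name__ == "__main__":
    # e.g. ps1p.file_groups(group_size=1) yields one group per raw file
    file_groups = [[0], [1], [2], [3]]
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        # map() spreads the groups over the workers; list() forces
        # iteration so exceptions raised in workers surface here.
        list(executor.map(push_files, file_groups))
    print("done")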
137 changes: 73 additions & 64 deletions notebooks/insert_allWISE.ipynb
@@ -26,7 +26,7 @@
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": null,
"metadata": {},
"outputs": [
{
@@ -78,27 +78,26 @@
],
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"from healpy import ang2pix\n",
"from extcats import CatalogPusher\n",
"\n",
"# build the pusher object and point it to the raw files.\n",
"wisep = CatalogPusher.CatalogPusher(\n",
" catalog_name = 'wise',\n",
" data_source = '../testdata/AllWISE/',\n",
" file_type = \".bz2\")\n",
" catalog_name=\"wise\", data_source=\"../testdata/AllWISE/\", file_type=\".bz2\"\n",
")\n",
"\n",
"\n",
"# read column names and types from schema file\n",
"schema_file = \"../testdata/AllWISE/wise-allwise-cat-schema.txt\"\n",
"names, types = [], {}\n",
"with open(schema_file) as schema:\n",
" for l in schema:\n",
" if \"#\" in l or (not l.strip()):\n",
" for line in schema:\n",
" if \"#\" in line or (not line.strip()):\n",
" continue\n",
" name, dtype = zip(\n",
" [p.strip() for p in l.strip().split(\" \") if not p in [\"\"]])\n",
" name, dtype = zip([p.strip() for p in line.strip().split(\" \") if p not in [\"\"]])\n",
" name, dtype = name[0], dtype[0]\n",
" #print (name, dtype)\n",
" # print (name, dtype)\n",
" names.append(name)\n",
" # convert the data type\n",
" if \"char\" in dtype:\n",
@@ -114,60 +113,65 @@
" elif dtype == \"int8\":\n",
" types[name] = np.int8\n",
" else:\n",
" print(\"unknown data type: %s\"%dtype)\n",
" print(\"unknown data type: %s\" % dtype)\n",
"\n",
"# select the columns you want to use.\n",
"use_cols = []\n",
"select = [\"Basic Position and Identification Information\", \n",
" \"Primary Photometric Information\", \n",
" \"Measurement Quality and Source Reliability Information\",\n",
" \"2MASS PSC Association Information\"]\n",
"select = [\n",
" \"Basic Position and Identification Information\",\n",
" \"Primary Photometric Information\",\n",
" \"Measurement Quality and Source Reliability Information\",\n",
" \"2MASS PSC Association Information\",\n",
"]\n",
"with open(schema_file) as schema:\n",
" blocks = schema.read().split(\"#\")\n",
" for block in blocks:\n",
" if any([k in block for k in select]):\n",
" for l in block.split(\"\\n\")[1:]:\n",
" if \"#\" in l or (not l.strip()):\n",
" for line in block.split(\"\\n\")[1:]:\n",
" if \"#\" in line or (not line.strip()):\n",
" continue\n",
" name, dtype = zip(\n",
" [p.strip() for p in l.strip().split(\" \") if not p in [\"\"]])\n",
" [p.strip() for p in line.strip().split(\" \") if p not in [\"\"]]\n",
" )\n",
" use_cols.append(name[0])\n",
"print(\"we will be using %d columns out of %d\"%(len(use_cols), len(names)))\n",
"print(\"we will be using %d columns out of %d\" % (len(use_cols), len(names)))\n",
"\n",
"# now assign the reader to the catalog pusher object\n",
"import pandas as pd\n",
"\n",
"wisep.assign_file_reader(\n",
" reader_func = pd.read_csv, \n",
" read_chunks = True,\n",
" names = names,\n",
" usecols = lambda x : x in use_cols,\n",
" #dtype = types, #this mess up with NaN values\n",
" chunksize=5000,\n",
" header=None,\n",
" engine='c',\n",
" sep='|',\n",
" na_values = 'nnnn')\n",
" reader_func=pd.read_csv,\n",
" read_chunks=True,\n",
" names=names,\n",
" usecols=lambda x: x in use_cols,\n",
" # dtype = types, #this mess up with NaN values\n",
" chunksize=5000,\n",
" header=None,\n",
" engine=\"c\",\n",
" sep=\"|\",\n",
" na_values=\"nnnn\",\n",
")\n",
"\n",
"\n",
"# define the dictionary modifier that will act on the single entries\n",
"def modifier(srcdict):\n",
" srcdict['hpxid_16'] = int(\n",
" ang2pix(2**16, srcdict['ra'], srcdict['dec'], lonlat = True, nest = True))\n",
" #srcdict['_id'] = srcdict.pop('source_id') doesn't work, seems it is not unique\n",
" srcdict[\"hpxid_16\"] = int(\n",
" ang2pix(2**16, srcdict[\"ra\"], srcdict[\"dec\"], lonlat=True, nest=True)\n",
" )\n",
" # srcdict['_id'] = srcdict.pop('source_id') doesn't work, seems it is not unique\n",
" return srcdict\n",
"\n",
"\n",
"wisep.assign_dict_modifier(modifier)\n",
"\n",
"\n",
"# finally push it in the databse\n",
"wisep.push_to_db(\n",
" coll_name = 'srcs', \n",
" index_on = \"hpxid_16\",\n",
" overwrite_coll = True, \n",
" append_to_coll = False)\n",
" coll_name=\"srcs\", index_on=\"hpxid_16\", overwrite_coll=True, append_to_coll=False\n",
")\n",
"\n",
"\n",
"# if needed print extensive info on database\n",
"#wisep.info()"
"# wisep.info()"
]
},
{
@@ -210,53 +214,57 @@
}
],
"source": [
"# now test the database for query performances. We use \n",
"# now test the database for query performances. We use\n",
"# a sample of randomly distributed points on a sphere\n",
"# as targets. \n",
"# as targets.\n",
"\n",
"# define the funtion to test coordinate based queries:\n",
"from healpy import ang2pix, get_all_neighbours\n",
"from healpy import get_all_neighbours\n",
"from astropy.table import Table\n",
"from astropy.coordinates import SkyCoord\n",
"\n",
"return_fields = ['designation', 'ra', 'dec']\n",
"return_fields = [\"designation\", \"ra\", \"dec\"]\n",
"project = {}\n",
"for field in return_fields: project[field] = 1\n",
"print (project)\n",
"for field in return_fields:\n",
" project[field] = 1\n",
"print(project)\n",
"\n",
"\n",
"hp_order, rs_arcsec = 16, 30.0\n",
"\n",
"\n",
"hp_order, rs_arcsec = 16, 30.\n",
"def test_query(ra, dec, coll):\n",
" \"\"\"query collection for points within rs of target ra, dec.\n",
" The results as returned as an astropy Table.\"\"\"\n",
" \n",
" # find the index of the target pixel and its neighbours \n",
" target_pix = int( ang2pix(2**hp_order, ra, dec, nest = True, lonlat = True) )\n",
" neighbs = get_all_neighbours(2**hp_order, ra, dec, nest = True, lonlat = True)\n",
"\n",
" # find the index of the target pixel and its neighbours\n",
" target_pix = int(ang2pix(2**hp_order, ra, dec, nest=True, lonlat=True))\n",
" neighbs = get_all_neighbours(2**hp_order, ra, dec, nest=True, lonlat=True)\n",
"\n",
" # remove non-existing neigbours (in case of E/W/N/S) and add center pixel\n",
" pix_group = [int(pix_id) for pix_id in neighbs if pix_id != -1] + [target_pix]\n",
" \n",
"\n",
" # query the database for sources in these pixels\n",
" qfilter = { 'hpxid_%d'%hp_order: { '$in': pix_group } }\n",
" qfilter = {\"hpxid_%d\" % hp_order: {\"$in\": pix_group}}\n",
" qresults = [o for o in coll.find(qfilter)]\n",
" if len(qresults)==0:\n",
" if len(qresults) == 0:\n",
" return None\n",
" \n",
"\n",
" # then use astropy to find the closest match\n",
" tab = Table(qresults)\n",
" target = SkyCoord(ra, dec, unit = 'deg')\n",
" matches_pos = SkyCoord(tab['ra'], tab['dec'], unit = 'deg')\n",
" target = SkyCoord(ra, dec, unit=\"deg\")\n",
" matches_pos = SkyCoord(tab[\"ra\"], tab[\"dec\"], unit=\"deg\")\n",
" d2t = target.separation(matches_pos).arcsecond\n",
" match_id = np.argmin(d2t)\n",
"\n",
" # if it's too far away don't use it\n",
" if d2t[match_id]>rs_arcsec:\n",
" if d2t[match_id] > rs_arcsec:\n",
" return None\n",
" return tab[match_id]\n",
"\n",
"\n",
"# run the test\n",
"wisep.run_test(test_query, npoints = 10000)\n"
"wisep.run_test(test_query, npoints=10000)"
]
},
{
@@ -274,7 +282,7 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": null,
"metadata": {},
"outputs": [
{
Expand All @@ -290,13 +298,14 @@
}
],
"source": [
"mqp.healpix_meta(healpix_id_key = 'hpxid_16', order = 16, is_indexed = True, nest = True)\n",
"mqp.coord_meta(ra = 'ra', dec = 'dec')\n",
"mqp.science_meta(\n",
" contact = 'C. Norris', \n",
" email = '[email protected]', \n",
" description = 'allWISE infrared catalog',\n",
" reference = 'http://wise2.ipac.caltech.edu/docs/release/allwise/')"
"wisep.healpix_meta(healpix_id_key=\"hpxid_16\", order=16, is_indexed=True, nest=True)\n",
"wisep.coord_meta(ra=\"ra\", dec=\"dec\")\n",
"wisep.science_meta(\n",
" contact=\"C. Norris\",\n",
" email=\"[email protected]\",\n",
" description=\"allWISE infrared catalog\",\n",
" reference=\"http://wise2.ipac.caltech.edu/docs/release/allwise/\",\n",
")"
]
},
{
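
Besides the formatting pass, this diff fixes the final metadata cell to call wisep rather than the leftover mqp object. The test_query function above implements a healpix cone search: look up the target pixel and its neighbours, query the collection with an $in filter on those pixel ids, then cut the candidates on true angular separation. A minimal standalone sketch of that pattern, with a hypothetical target and candidate (the Mongo query itself is elided):

from astropy.coordinates import SkyCoord
from healpy import ang2pix, get_all_neighbours

hp_order, rs_arcsec = 16, 30.0
ra, dec = 10.0, -30.0  # hypothetical target position, in degrees

# Target pixel plus its (up to 8) neighbours; healpy returns -1 for
# neighbours that do not exist at the grid edges.
target_pix = int(ang2pix(2**hp_order, ra, dec, nest=True, lonlat=True))
neighbs = get_all_neighbours(2**hp_order, ra, dec, nest=True, lonlat=True)
pix_group = [int(p) for p in neighbs if p != -1] + [target_pix]

# pix_group feeds the query filter {"hpxid_16": {"$in": pix_group}};
# the candidates it returns are then cut on true angular distance.
target = SkyCoord(ra, dec, unit="deg")
candidate = SkyCoord(10.001, -30.001, unit="deg")  # hypothetical neighbour
if target.separation(candidate).arcsecond <= rs_arcsec:
    print("match within", rs_arcsec, "arcsec")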