Oozie + Datastage + misc improvements #27

Merged 9 commits on Jan 14, 2025
4 changes: 3 additions & 1 deletion orbiter_translations/automic/xml_demo.py
@@ -1,5 +1,7 @@
"""
## Demo `translation_ruleset` Example
Demo Automic XML Translation to Airflow DAGs

Contact Astronomer @ https://astronomer.io/contact for access to our full translation.

```pycon
>>> translation_ruleset.test(input_value='''<?xml version="1.0" encoding="ISO-8859-15"?>
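If you want to poke at the demo ruleset locally, here is a minimal, hypothetical smoke test (the file path is an assumption, not part of this PR; `.test(input_value=...)` and `.dags` are as used in the doctests):

```python
# Hypothetical local smoke test for the Automic demo ruleset (not part of this PR).
# Assumes an Automic XML export saved at ./export.xml.
from pathlib import Path

from orbiter_translations.automic.xml_demo import translation_ruleset

project = translation_ruleset.test(input_value=Path("export.xml").read_text())
for dag_id, dag in project.dags.items():  # .dags maps dag_id -> OrbiterDAG
    print(dag_id, dag.file_path)
```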
10 changes: 8 additions & 2 deletions orbiter_translations/autosys/jil_demo.py
@@ -1,4 +1,7 @@
r"""Demonstration of translating AutoSys JIL files into Airflow DAGs.

Contact Astronomer @ https://astronomer.io/contact for access to our full translation.

```pycon
>>> translation_ruleset.test('''
... insert_job: foo.job
@@ -12,7 +15,7 @@
from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator
from pendulum import DateTime, Timezone
with DAG(dag_id='foo_job', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, default_args={'owner': '[email protected]'}):
with DAG(dag_id='foo_job', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, default_args={'owner': '[email protected]'}, doc_md=...):
foo_job_task = SSHOperator(task_id='foo_job', ssh_conn_id='bar', command='"C:\\ldhe\\cxl\\TidalDB\\startApp.cmd" "arg1" "arg2" "arg3"', doc='Foo Job')

```
@@ -121,7 +124,7 @@ def basic_dag_rule(val: dict) -> OrbiterDAG | None:
... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
from airflow import DAG
from pendulum import DateTime, Timezone
with DAG(dag_id='foo_job', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, default_args={'owner': '[email protected]'}):
with DAG(dag_id='foo_job', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, default_args={'owner': '[email protected]'}, doc_md=...):

```
"""
@@ -137,6 +140,9 @@ def basic_dag_rule(val: dict) -> OrbiterDAG | None:
return OrbiterDAG(
dag_id=dag_id,
file_path=dag_id + ".py",
doc_md="**Created via [Orbiter](https://astronomer.github.io/orbiter) w/ Demo Translation Ruleset**.\n"
"Contact Astronomer @ [[email protected]](mailto:[email protected]) "
"or at [astronomer.io/contact](https://www.astronomer.io/contact/) for more!",
**default_args,
)

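Reviewers can see the new `doc_md` on a translated JIL job with a sketch like this (the JIL attributes below are illustrative stand-ins, not the doctest's full input):

```python
# Illustrative only: a minimal JIL snippet in the shape the doctest uses.
from orbiter_translations.autosys.jil_demo import translation_ruleset

project = translation_ruleset.test("""
insert_job: foo.job
machine: bar
command: echo hello
owner: [email protected]
""")
print(project.dags["foo_job"])  # rendered DAG now includes doc_md=...
```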
6 changes: 4 additions & 2 deletions orbiter_translations/control_m/xml_demo.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
"""
## Demo `translation_ruleset` Example
Demo Control-M XML Translation Ruleset

Contact Astronomer @ https://astronomer.io/contact for access to our full translation.

```pycon
>>> translation_ruleset.test({
... "DEFTABLE": {
@@ -25,7 +28,6 @@

```
""" # noqa: E501

from __future__ import annotations

from pathlib import Path
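Note that the Control-M demo feeds an already-parsed mapping into `translation_ruleset.test` rather than an XML string; a shape-only sketch (a real run needs the nested entries the doctest shows):

```python
# Shape-only sketch, not a working payload: the doctest nests real job entries
# under "DEFTABLE".
from orbiter_translations.control_m.xml_demo import translation_ruleset

project = translation_ruleset.test({"DEFTABLE": {...}})  # fill with doctest content
```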
3 changes: 3 additions & 0 deletions orbiter_translations/dag_factory/yaml_base.py
@@ -1,3 +1,6 @@
"""Full DAG Factory YAML Translation Ruleset.

Convert DAG Factory YAML into full Airflow DAGs"""
from __future__ import annotations

import inspect
257 changes: 198 additions & 59 deletions orbiter_translations/data_stage/xml_demo.py
@@ -1,17 +1,108 @@
"""Demo translation ruleset for DataStage XML files to Airflow DAGs

Contact Astronomer @ https://astronomer.io/contact for access to our full translation.

```pycon
>>> translation_ruleset.test(input_value='''<?xml version="1.0" encoding="UTF-8"?>
... <DSExport>
... <Job Identifier="DataStage_Job" DateModified="2020-11-27" TimeModified="05.07.33">
... <Record Identifier="V253S0" Type="CustomStage" Readonly="0">
... <Property Name="Name">SELECT_TABLE</Property>
... <Collection Name="Properties" Type="CustomProperty">
... <SubRecord>
... <Property Name="Name">XMLProperties</Property>
... <Property Name="Value" PreFormatted="1">&lt;?xml version=&apos;1.0&apos;
... encoding=&apos;UTF-16&apos;?&gt;&lt;Properties
... version=&apos;1.1&apos;&gt;&lt;Common&gt;&lt;Context
... type=&apos;int&apos;&gt;1&lt;/Context&gt;&lt;Variant
... type=&apos;string&apos;&gt;1.0&lt;/Variant&gt;&lt;DescriptorVersion
... type=&apos;string&apos;&gt;1.0&lt;/DescriptorVersion&gt;&lt;PartitionType
... type=&apos;int&apos;&gt;-1&lt;/PartitionType&gt;&lt;RCP
... type=&apos;int&apos;&gt;0&lt;/RCP&gt;&lt;/Common&gt;&lt;Connection&gt;&lt;URL
... modified=&apos;1&apos;
... type=&apos;string&apos;&gt;&lt;![CDATA[jdbc:snowflake://xyz.us-east-1.snowflakecomputing.com/?&amp;warehouse=#XYZ_DB.$snowflake_wh#&amp;db=#DB.$schema#]]&gt;&lt;/URL&gt;&lt;Username
... modified=&apos;1&apos;
... type=&apos;string&apos;&gt;&lt;![CDATA[#DB.$snowflake_userid#]]&gt;&lt;/Username&gt;&lt;Password
... modified=&apos;1&apos;
... type=&apos;string&apos;&gt;&lt;![CDATA[#DB.$snowflake_passwd#]]&gt;&lt;/Password&gt;&lt;Attributes
... modified=&apos;1&apos;
... type=&apos;string&apos;&gt;&lt;![CDATA[]]&gt;&lt;/Attributes&gt;&lt;/Connection&gt;&lt;Usage&gt;&lt;ReadMode
... type=&apos;int&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadMode&gt;&lt;GenerateSQL
... modified=&apos;1&apos;
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/GenerateSQL&gt;&lt;EnableQuotedIDs
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/EnableQuotedIDs&gt;&lt;SQL&gt;&lt;SelectStatement
... collapsed=&apos;1&apos; modified=&apos;1&apos;
... type=&apos;string&apos;&gt;&lt;![CDATA[
... Select 1 as Dummy from db.schema.table Limit 1;]]&gt;&lt;ReadFromFileSelect
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadFromFileSelect&gt;&lt;/SelectStatement&gt;&lt;EnablePartitionedReads
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/EnablePartitionedReads&gt;&lt;/SQL&gt;&lt;Transaction&gt;&lt;RecordCount
... type=&apos;int&apos;&gt;&lt;![CDATA[2000]]&gt;&lt;/RecordCount&gt;&lt;IsolationLevel
... type=&apos;int&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/IsolationLevel&gt;&lt;AutocommitMode
... modified=&apos;1&apos;
... type=&apos;int&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/AutocommitMode&gt;&lt;EndOfWave
... type=&apos;int&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/EndOfWave&gt;&lt;BeginEnd
... collapsed=&apos;1&apos;
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/BeginEnd&gt;&lt;/Transaction&gt;&lt;Session&gt;&lt;ArraySize
... type=&apos;int&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/ArraySize&gt;&lt;FetchSize
... type=&apos;int&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/FetchSize&gt;&lt;ReportSchemaMismatch
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReportSchemaMismatch&gt;&lt;DefaultLengthForColumns
... type=&apos;int&apos;&gt;&lt;![CDATA[200]]&gt;&lt;/DefaultLengthForColumns&gt;&lt;DefaultLengthForLongColumns
... type=&apos;int&apos;&gt;&lt;![CDATA[20000]]&gt;&lt;/DefaultLengthForLongColumns&gt;&lt;CharacterSetForNonUnicodeColumns
... collapsed=&apos;1&apos;
... type=&apos;int&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/CharacterSetForNonUnicodeColumns&gt;&lt;KeepConductorConnectionAlive
... type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/KeepConductorConnectionAlive&gt;&lt;/Session&gt;&lt;BeforeAfter
... modified=&apos;1&apos; type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;BeforeSQL
... collapsed=&apos;1&apos; modified=&apos;1&apos;
... type=&apos;string&apos;&gt;&lt;![CDATA[]]&gt;&lt;ReadFromFileBeforeSQL
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadFromFileBeforeSQL&gt;&lt;FailOnError
... type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/FailOnError&gt;&lt;/BeforeSQL&gt;&lt;AfterSQL
... collapsed=&apos;1&apos; modified=&apos;1&apos;
... type=&apos;string&apos;&gt;&lt;![CDATA[
... SELECT * FROM db.schema.table;
... ]]&gt;&lt;ReadFromFileAfterSQL
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadFromFileAfterSQL&gt;&lt;FailOnError
... type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/FailOnError&gt;&lt;/AfterSQL&gt;&lt;BeforeSQLNode
... type=&apos;string&apos;&gt;&lt;![CDATA[]]&gt;&lt;ReadFromFileBeforeSQLNode
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadFromFileBeforeSQLNode&gt;&lt;FailOnError
... type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/FailOnError&gt;&lt;/BeforeSQLNode&gt;&lt;AfterSQLNode
... type=&apos;string&apos;&gt;&lt;![CDATA[]]&gt;&lt;ReadFromFileAfterSQLNode
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadFromFileAfterSQLNode&gt;&lt;FailOnError
... type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/FailOnError&gt;&lt;/AfterSQLNode&gt;&lt;/BeforeAfter&gt;&lt;Java&gt;&lt;ConnectorClasspath
... type=&apos;string&apos;&gt;&lt;![CDATA[$(DSHOME)/../DSComponents/bin/ccjdbc.jar;$(DSHOME)]]&gt;&lt;/ConnectorClasspath&gt;&lt;HeapSize
... modified=&apos;1&apos;
... type=&apos;int&apos;&gt;&lt;![CDATA[1024]]&gt;&lt;/HeapSize&gt;&lt;ConnectorOtherOptions
... type=&apos;string&apos;&gt;&lt;![CDATA[-Dcom.ibm.is.cc.options=noisfjars]]&gt;&lt;/ConnectorOtherOptions&gt;&lt;/Java&gt;&lt;LimitRows
... collapsed=&apos;1&apos;
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/LimitRows&gt;&lt;/Usage&gt;&lt;/Properties
... &gt;</Property>
... </SubRecord>
... </Collection>
... </Record>
... </Job>
... </DSExport>''').dags['data_stage_job'] # doctest: +ELLIPSIS
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from pendulum import DateTime, Timezone
with DAG(dag_id='data_stage_job', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, doc_md=...):
select_table_task = SQLExecuteQueryOperator(task_id='select_table', conn_id='DB', sql=['Select 1 as Dummy from db.schema.table Limit 1;', 'SELECT * FROM db.schema.table;'])

"""
from __future__ import annotations

import json
from itertools import pairwise

import inflection
import jq
from loguru import logger

from orbiter.file_types import FileTypeXML
from orbiter.objects import conn_id
from orbiter.objects.dag import OrbiterDAG
from orbiter.objects.operators.empty import OrbiterEmptyOperator
from orbiter.objects.operators.sql import OrbiterSQLExecuteQueryOperator
from orbiter.objects.task import OrbiterOperator, OrbiterTaskDependency
from orbiter.objects.task_group import OrbiterTaskGroup
from orbiter.rules import (
dag_filter_rule,
dag_rule,
@@ -32,25 +123,51 @@


@dag_filter_rule
def basic_dag_filter(val: dict) -> list | None:
"""Filter input down to a list of dictionaries that can be processed by the `@dag_rules`"""
return val["DSExport"][0]["Job"]
def basic_dag_filter(val: dict) -> list[dict] | None:
"""Get `Job` objects from within a parent `DSExport` object

```pycon
>>> basic_dag_filter({"DSExport": [{"Job": [{'@Identifier': 'foo'}, {'@Identifier': 'bar'}]}]})
[{'@Identifier': 'foo'}, {'@Identifier': 'bar'}]

```
"""
if ds_export := val.get("DSExport"):
        return [job for export in ds_export for job in (export.get("Job") or [])]
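    # Review note (illustrative, not part of this PR): exports without a "Job" key
    # are skipped, and input without "DSExport" falls through to an implicit None,
    # so the rule declines to match instead of raising (basic_dag_filter({}) is None).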


@dag_rule
def basic_dag_rule(val: dict) -> OrbiterDAG | None:
"""Translate input into an `OrbiterDAG`"""
try:
dag_id = val["@Identifier"]
dag_id = inflection.underscore(dag_id)
return OrbiterDAG(dag_id=dag_id, file_path=f"{dag_id}.py")
except Exception:
return None
"""Translate input into an `OrbiterDAG`, using the `Identifier` as the DAG ID

```pycon
>>> basic_dag_rule({"@Identifier": "demo.extract_sample_currency_data"}) # doctest: +ELLIPSIS
from airflow import DAG
from pendulum import DateTime, Timezone
with DAG(dag_id='demo.extract_sample_currency_data', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, doc_md=...):

```
"""
if dag_id := val.get("@Identifier"):
return OrbiterDAG(
dag_id=dag_id,
file_path=f"{dag_id}.py",
doc_md="**Created via [Orbiter](https://astronomer.github.io/orbiter) w/ Demo Translation Ruleset**.\n"
"Contact Astronomer @ [[email protected]](mailto:[email protected]) "
"or at [astronomer.io/contact](https://www.astronomer.io/contact/) for more!",
)
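    # Review note (illustrative): records missing "@Identifier" likewise return an
    # implicit None here, so the rule simply declines and other @dag_rules can try.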


@task_filter_rule
def basic_task_filter(val: dict) -> list | None:
"""Filter input down to a list of dictionaries that can be processed by the `@task_rules`"""
"""Filter input down to a list of dictionaries with `@Type=CustomStage`

```pycon
>>> basic_task_filter({"Record": [{"@Type": "CustomStage"}, {"@Type": "SomethingElse"}]})
[{'@Type': 'CustomStage'}]

```
"""
if isinstance(val, dict):
val = json.loads(json.dumps(val, default=str)) # pre-serialize values, for JQ
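        # (Review aside, not part of this PR: the dumps/loads round-trip uses
        #  default=str to coerce any non-JSON-native values into strings,
        #  since jq can only consume plain JSON types.)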
try:
@@ -64,67 +181,91 @@ def basic_task_filter(val: dict) -> list | None:
return None


@task_rule(priority=2)
def basic_task_rule(val: dict) -> OrbiterOperator | OrbiterTaskGroup | None:
"""Translate input into an Operator (e.g. `OrbiterBashOperator`). will be applied first, with a higher priority"""
if "task_id" in val:
return OrbiterEmptyOperator(task_id=val["task_id"])
else:
return None


def task_common_args(val: dict) -> dict:
"""
Common mappings for all tasks
"""
task_id: str = (
jq.compile(""".Property[] | select(.["@Name"] == "Name") | .["#text"]""")
.input_value(val)
.first()
)
task_id = inflection.underscore(task_id)
try:
task_id: str = (
jq.compile(""".Property[] | select(.["@Name"] == "Name") | .["#text"]""")
.input_value(val)
.first()
)
except ValueError:
task_id = "UNKNOWN"
params = {"task_id": task_id}
return params


def extract_sql_statements(root):
sql_statements = {}
sql_tags = ["SelectStatement", "BeforeSQL", "AfterSQL"]
@task_rule(priority=2)
def _cannot_map_rule(val: dict) -> OrbiterOperator | OrbiterTaskGroup | None:
"""Translate input into an Operator (e.g. `OrbiterBashOperator`). will be applied first, with a higher priority"""
return OrbiterEmptyOperator(**task_common_args(val))


def extract_sql_statements(root: dict) -> list[str]:
"""Find SQL Statements deeply nested

for tag in sql_tags:
elements = root.findall(f".//{tag}")
for elem in elements:
if elem.text:
sql_text = elem.text.strip()
sql_statements[tag] = sql_text
return sql_statements
Looks for text of `SelectStatement`, `BeforeSQL`, and `AfterSQL` tags

```pycon
>>> extract_sql_statements({
... "BeforeSQL": [{"#text": ""}],
... "SelectStatement": [{"#text": "SELECT 1 as Dummy from db.schema.table Limit 1;"}],
... "AfterSQL": [{"#text": "SELECT * FROM db.schema.table;"}]
... })
['SELECT 1 as Dummy from db.schema.table Limit 1;', 'SELECT * FROM db.schema.table;']
>>> extract_sql_statements({"a": {"b": {"c": {"d": {"SelectStatement": [{"#text": "SELECT 1;"}]}}}}})
['SELECT 1;']

```
"""
    if root:
        if statements := [
            sql.strip()
            for tag in ["BeforeSQL", "SelectStatement", "AfterSQL"]
            for sql in jq.all(f"""recurse | select(.{tag}?) | .{tag}[]["#text"]""", root)
            if sql and sql.strip()
        ]:
            return statements
    raise ValueError("No SQL Statements found")
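# Review note (illustrative, not in this PR): jq's `recurse` visits every node,
# which is what lets the second doctest find a SelectStatement several levels deep:
#   jq.all('recurse | select(.SelectStatement?) | .SelectStatement[]["#text"]',
#          {"a": {"SelectStatement": [{"#text": "SELECT 1;"}]}})  # -> ['SELECT 1;']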

@task_rule(priority=2)
def sql_command_rule(val: dict) -> OrbiterSQLExecuteQueryOperator | None:
    """Create a SQL Operator with one or more SQL Statements

```pycon
>>> sql_command_rule({
... 'Property': [{'@Name': 'Name', '#text': 'SELECT_TABLE'}],
... '@Identifier': 'V253S0',
... "Collection": [{"SubRecord": [{"Property": [{
... "@PreFormatted": "1",
... "#text": {"Properties": [{
... "Usage": [{"SQL": [{"SelectStatement": [{"#text": "SELECT 1;"}]}]}],
... }]}
... }]}]}]
... }) # doctest: +ELLIPSIS
select_table_task = SQLExecuteQueryOperator(task_id='select_table', conn_id='DB', sql='SELECT 1;')

```
""" # noqa: E501
try:
sql: str = (
jq.compile(
""".Collection[] | .SubRecord[] | .Property[] | select(.["@PreFormatted"] == "1") | .["#text"] """
)
.input_value(val)
.first()
)
root = ElementTree.fromstring(sql.encode("utf-16"))
sql_statements = extract_sql_statements(root)
sql = " ".join(sql_statements.values())
if sql:
if sql := jq.first(
""".Collection[] | .SubRecord[] | .Property[] | select(.["@PreFormatted"] == "1") | .["#text"]""",
val
):
return OrbiterSQLExecuteQueryOperator(
sql=sql,
**conn_id(conn_id="snowflake_default", conn_type="snowflake"),
sql=stmt[0] if len(stmt := extract_sql_statements(sql)) == 1 else stmt,
**conn_id(conn_id="DB"),
**task_common_args(val),
)
except StopIteration:
pass
    except (StopIteration, ValueError) as e:
        logger.warning(f"No SQL found in {val}: {e}")
return None
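# Review note (illustrative): the walrus in the `sql=` kwarg keeps a single
# statement as a plain string ('SELECT 1;' in the doctest above) and passes a
# list through when there are several, as in the module-level doctest.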


Expand All @@ -149,9 +290,7 @@ def basic_task_dependency_rule(val: OrbiterDAG) -> list | None:
dag_filter_ruleset=DAGFilterRuleset(ruleset=[basic_dag_filter]),
dag_ruleset=DAGRuleset(ruleset=[basic_dag_rule]),
task_filter_ruleset=TaskFilterRuleset(ruleset=[basic_task_filter]),
task_ruleset=TaskRuleset(
ruleset=[sql_command_rule, basic_task_rule, cannot_map_rule]
),
task_ruleset=TaskRuleset(ruleset=[sql_command_rule, _cannot_map_rule, cannot_map_rule]),
task_dependency_ruleset=TaskDependencyRuleset(ruleset=[basic_task_dependency_rule]),
post_processing_ruleset=PostProcessingRuleset(ruleset=[]),
)
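Finally, an end-to-end sketch for the whole DataStage demo (the file path is hypothetical; `.dags` keying matches the module doctest above):

```python
# Hypothetical end-to-end run of the DataStage demo ruleset (not part of this PR).
from pathlib import Path

from orbiter_translations.data_stage.xml_demo import translation_ruleset

project = translation_ruleset.test(input_value=Path("DSExport.xml").read_text())
print(project.dags["data_stage_job"])  # renders the DAG file, as in the doctest
```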