Oozie + Datastage + misc improvements (#27)
- [datastage] fix/feat/doc: rm defusedXML dependency, add tests+docs, add more JQ

- [oozie] feat: update demo translation

- [oozie] fix: add airflow as o2a dep :(

- [datastage] fix: return sql as list, if multiple (sketched below)

- [datastage] fix: improve sql extraction

- [various] chore: standardize 'demo' msg, add/improve unit tests
fritz-astronomer authored Jan 14, 2025
1 parent 2e37762 commit 479be42
Showing 14 changed files with 1,013 additions and 281 deletions.
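The DataStage fix "return sql as list, if multiple" (sketched here, as promised above) leans on Airflow's `SQLExecuteQueryOperator`, whose `sql` parameter accepts either a single statement string or a list of statements. A minimal sketch of the convention; the helper name is ours, not from the diff:

```python
def normalize_sql(statements: list[str]) -> str | list[str]:
    """Collapse a one-element list to a bare string, else keep the list."""
    return statements[0] if len(statements) == 1 else statements

assert normalize_sql(["SELECT 1;"]) == "SELECT 1;"
assert normalize_sql(["SELECT 1;", "SELECT 2;"]) == ["SELECT 1;", "SELECT 2;"]
```

The same one-vs-many logic appears inline in `sql_command_rule` below, via a walrus expression.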
4 changes: 3 additions & 1 deletion orbiter_translations/automic/xml_demo.py
@@ -1,5 +1,7 @@
"""
## Demo `translation_ruleset` Example
Demo Automic XML Translation to Airflow DAGs
Contact Astronomer @ https://astronomer.io/contact for access to our full translation.
```pycon
>>> translation_ruleset.test(input_value='''<?xml version="1.0" encoding="ISO-8859-15"?>
10 changes: 8 additions & 2 deletions orbiter_translations/autosys/jil_demo.py
@@ -1,4 +1,7 @@
r"""Demonstration of translating AutoSys JIL files into Airflow DAGs.
Contact Astronomer @ https://astronomer.io/contact for access to our full translation.
```pycon
>>> translation_ruleset.test('''
... insert_job: foo.job
@@ -12,7 +15,7 @@
from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator
from pendulum import DateTime, Timezone
-with DAG(dag_id='foo_job', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, default_args={'owner': '[email protected]'}):
+with DAG(dag_id='foo_job', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, default_args={'owner': '[email protected]'}, doc_md=...):
foo_job_task = SSHOperator(task_id='foo_job', ssh_conn_id='bar', command='"C:\\ldhe\\cxl\\TidalDB\\startApp.cmd" "arg1" "arg2" "arg3"', doc='Foo Job')
```
@@ -121,7 +124,7 @@ def basic_dag_rule(val: dict) -> OrbiterDAG | None:
... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
from airflow import DAG
from pendulum import DateTime, Timezone
-with DAG(dag_id='foo_job', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, default_args={'owner': '[email protected]'}):
+with DAG(dag_id='foo_job', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, default_args={'owner': '[email protected]'}, doc_md=...):
```
"""
@@ -137,6 +140,9 @@ def basic_dag_rule(val: dict) -> OrbiterDAG | None:
    return OrbiterDAG(
        dag_id=dag_id,
        file_path=dag_id + ".py",
+        doc_md="**Created via [Orbiter](https://astronomer.github.io/orbiter) w/ Demo Translation Ruleset**.\n"
+        "Contact Astronomer @ [[email protected]](mailto:[email protected]) "
+        "or at [astronomer.io/contact](https://www.astronomer.io/contact/) for more!",
        **default_args,
    )

6 changes: 4 additions & 2 deletions orbiter_translations/control_m/xml_demo.py
@@ -1,5 +1,8 @@
"""
## Demo `translation_ruleset` Example
Demo Control-M XML Translation Ruleset
Contact Astronomer @ https://astronomer.io/contact for access to our full translation.
```pycon
>>> translation_ruleset.test({
... "DEFTABLE": {
@@ -25,7 +28,6 @@
```
""" # noqa: E501

from __future__ import annotations

from pathlib import Path
3 changes: 3 additions & 0 deletions orbiter_translations/dag_factory/yaml_base.py
@@ -1,3 +1,6 @@
"""Full DAG Factory YAML Translation Ruleset.
Convert DAG Factory YAML into full Airflow DAGs"""
from __future__ import annotations

import inspect
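For orientation, a hypothetical fragment of the DAG Factory-style YAML such a ruleset consumes (schema per the upstream dag-factory project; this exact example is not part of the diff):

```python
import yaml  # PyYAML, assumed available

# A made-up, minimal DAG Factory document: one DAG with one Bash task.
doc = yaml.safe_load("""
example_dag:
  default_args:
    owner: example_owner
  schedule_interval: '@daily'
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 1'
""")
assert list(doc) == ["example_dag"]
```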
257 changes: 198 additions & 59 deletions orbiter_translations/data_stage/xml_demo.py
@@ -1,17 +1,108 @@
"""Demo translation ruleset for DataStage XML files to Airflow DAGs
Contact Astronomer @ https://astronomer.io/contact for access to our full translation
```pycon
>>> translation_ruleset.test(input_value='''<?xml version="1.0" encoding="UTF-8"?>
... <DSExport>
... <Job Identifier="DataStage_Job" DateModified="2020-11-27" TimeModified="05.07.33">
... <Record Identifier="V253S0" Type="CustomStage" Readonly="0">
... <Property Name="Name">SELECT_TABLE</Property>
... <Collection Name="Properties" Type="CustomProperty">
... <SubRecord>
... <Property Name="Name">XMLProperties</Property>
... <Property Name="Value" PreFormatted="1">&lt;?xml version=&apos;1.0&apos;
... encoding=&apos;UTF-16&apos;?&gt;&lt;Properties
... version=&apos;1.1&apos;&gt;&lt;Common&gt;&lt;Context
... type=&apos;int&apos;&gt;1&lt;/Context&gt;&lt;Variant
... type=&apos;string&apos;&gt;1.0&lt;/Variant&gt;&lt;DescriptorVersion
... type=&apos;string&apos;&gt;1.0&lt;/DescriptorVersion&gt;&lt;PartitionType
... type=&apos;int&apos;&gt;-1&lt;/PartitionType&gt;&lt;RCP
... type=&apos;int&apos;&gt;0&lt;/RCP&gt;&lt;/Common&gt;&lt;Connection&gt;&lt;URL
... modified=&apos;1&apos;
... type=&apos;string&apos;&gt;&lt;![CDATA[jdbc:snowflake://xyz.us-east-1.snowflakecomputing.com/?&amp;warehouse=#XYZ_DB.$snowflake_wh#&amp;db=#DB.$schema#]]&gt;&lt;/URL&gt;&lt;Username
... modified=&apos;1&apos;
... type=&apos;string&apos;&gt;&lt;![CDATA[#DB.$snowflake_userid#]]&gt;&lt;/Username&gt;&lt;Password
... modified=&apos;1&apos;
... type=&apos;string&apos;&gt;&lt;![CDATA[#DB.$snowflake_passwd#]]&gt;&lt;/Password&gt;&lt;Attributes
... modified=&apos;1&apos;
... type=&apos;string&apos;&gt;&lt;![CDATA[]]&gt;&lt;/Attributes&gt;&lt;/Connection&gt;&lt;Usage&gt;&lt;ReadMode
... type=&apos;int&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadMode&gt;&lt;GenerateSQL
... modified=&apos;1&apos;
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/GenerateSQL&gt;&lt;EnableQuotedIDs
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/EnableQuotedIDs&gt;&lt;SQL&gt;&lt;SelectStatement
... collapsed=&apos;1&apos; modified=&apos;1&apos;
... type=&apos;string&apos;&gt;&lt;![CDATA[
... Select 1 as Dummy from db.schema.table Limit 1;]]&gt;&lt;ReadFromFileSelect
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadFromFileSelect&gt;&lt;/SelectStatement&gt;&lt;EnablePartitionedReads
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/EnablePartitionedReads&gt;&lt;/SQL&gt;&lt;Transaction&gt;&lt;RecordCount
... type=&apos;int&apos;&gt;&lt;![CDATA[2000]]&gt;&lt;/RecordCount&gt;&lt;IsolationLevel
... type=&apos;int&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/IsolationLevel&gt;&lt;AutocommitMode
... modified=&apos;1&apos;
... type=&apos;int&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/AutocommitMode&gt;&lt;EndOfWave
... type=&apos;int&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/EndOfWave&gt;&lt;BeginEnd
... collapsed=&apos;1&apos;
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/BeginEnd&gt;&lt;/Transaction&gt;&lt;Session&gt;&lt;ArraySize
... type=&apos;int&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/ArraySize&gt;&lt;FetchSize
... type=&apos;int&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/FetchSize&gt;&lt;ReportSchemaMismatch
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReportSchemaMismatch&gt;&lt;DefaultLengthForColumns
... type=&apos;int&apos;&gt;&lt;![CDATA[200]]&gt;&lt;/DefaultLengthForColumns&gt;&lt;DefaultLengthForLongColumns
... type=&apos;int&apos;&gt;&lt;![CDATA[20000]]&gt;&lt;/DefaultLengthForLongColumns&gt;&lt;CharacterSetForNonUnicodeColumns
... collapsed=&apos;1&apos;
... type=&apos;int&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/CharacterSetForNonUnicodeColumns&gt;&lt;KeepConductorConnectionAlive
... type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/KeepConductorConnectionAlive&gt;&lt;/Session&gt;&lt;BeforeAfter
... modified=&apos;1&apos; type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;BeforeSQL
... collapsed=&apos;1&apos; modified=&apos;1&apos;
... type=&apos;string&apos;&gt;&lt;![CDATA[]]&gt;&lt;ReadFromFileBeforeSQL
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadFromFileBeforeSQL&gt;&lt;FailOnError
... type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/FailOnError&gt;&lt;/BeforeSQL&gt;&lt;AfterSQL
... collapsed=&apos;1&apos; modified=&apos;1&apos;
... type=&apos;string&apos;&gt;&lt;![CDATA[
... SELECT * FROM db.schema.table;
... ]]&gt;&lt;ReadFromFileAfterSQL
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadFromFileAfterSQL&gt;&lt;FailOnError
... type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/FailOnError&gt;&lt;/AfterSQL&gt;&lt;BeforeSQLNode
... type=&apos;string&apos;&gt;&lt;![CDATA[]]&gt;&lt;ReadFromFileBeforeSQLNode
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadFromFileBeforeSQLNode&gt;&lt;FailOnError
... type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/FailOnError&gt;&lt;/BeforeSQLNode&gt;&lt;AfterSQLNode
... type=&apos;string&apos;&gt;&lt;![CDATA[]]&gt;&lt;ReadFromFileAfterSQLNode
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadFromFileAfterSQLNode&gt;&lt;FailOnError
... type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/FailOnError&gt;&lt;/AfterSQLNode&gt;&lt;/BeforeAfter&gt;&lt;Java&gt;&lt;ConnectorClasspath
... type=&apos;string&apos;&gt;&lt;![CDATA[$(DSHOME)/../DSComponents/bin/ccjdbc.jar;$(DSHOME)]]&gt;&lt;/ConnectorClasspath&gt;&lt;HeapSize
... modified=&apos;1&apos;
... type=&apos;int&apos;&gt;&lt;![CDATA[1024]]&gt;&lt;/HeapSize&gt;&lt;ConnectorOtherOptions
... type=&apos;string&apos;&gt;&lt;![CDATA[-Dcom.ibm.is.cc.options=noisfjars]]&gt;&lt;/ConnectorOtherOptions&gt;&lt;/Java&gt;&lt;LimitRows
... collapsed=&apos;1&apos;
... type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/LimitRows&gt;&lt;/Usage&gt;&lt;/Properties
... &gt;</Property>
... </SubRecord>
... </Collection>
... </Record>
... </Job>
... </DSExport>''').dags['data_stage_job'] # doctest: +ELLIPSIS
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from pendulum import DateTime, Timezone
with DAG(dag_id='data_stage_job', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, doc_md=...):
select_table_task = SQLExecuteQueryOperator(task_id='select_table', conn_id='DB', sql=['Select 1 as Dummy from db.schema.table Limit 1;', 'SELECT * FROM db.schema.table;'])
"""
from __future__ import annotations

-from itertools import pairwise
-from defusedxml import ElementTree
-import inflection
+import json
+from itertools import pairwise
+
+from loguru import logger

import jq
from orbiter.file_types import FileTypeXML
from orbiter.objects import conn_id
from orbiter.objects.dag import OrbiterDAG
from orbiter.objects.operators.empty import OrbiterEmptyOperator
from orbiter.objects.operators.sql import OrbiterSQLExecuteQueryOperator
from orbiter.objects.task import OrbiterOperator
-from orbiter.objects.task_group import OrbiterTaskGroup
+from orbiter.objects.task import OrbiterTaskDependency
from orbiter.objects.task_group import OrbiterTaskGroup
from orbiter.rules import (
dag_filter_rule,
dag_rule,
@@ -32,25 +123,51 @@


@dag_filter_rule
-def basic_dag_filter(val: dict) -> list | None:
-    """Filter input down to a list of dictionaries that can be processed by the `@dag_rules`"""
-    return val["DSExport"][0]["Job"]
+def basic_dag_filter(val: dict) -> list[dict] | None:
+    """Get `Job` objects from within a parent `DSExport` object
+
+    ```pycon
+    >>> basic_dag_filter({"DSExport": [{"Job": [{'@Identifier': 'foo'}, {'@Identifier': 'bar'}]}]})
+    [{'@Identifier': 'foo'}, {'@Identifier': 'bar'}]
+
+    ```
+    """
+    if ds_export := val.get("DSExport"):
+        return [job for export in ds_export for job in export.get("Job") or []]


@dag_rule
-def basic_dag_rule(val: dict) -> OrbiterDAG | None:
-    """Translate input into an `OrbiterDAG`"""
-    try:
-        dag_id = val["@Identifier"]
-        dag_id = inflection.underscore(dag_id)
-        return OrbiterDAG(dag_id=dag_id, file_path=f"{dag_id}.py")
-    except Exception:
-        return None
+def basic_dag_rule(val: dict) -> OrbiterDAG | None:
+    """Translate input into an `OrbiterDAG`, using the `Identifier` as the DAG ID
+
+    ```pycon
+    >>> basic_dag_rule({"@Identifier": "demo.extract_sample_currency_data"}) # doctest: +ELLIPSIS
+    from airflow import DAG
+    from pendulum import DateTime, Timezone
+    with DAG(dag_id='demo.extract_sample_currency_data', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, doc_md=...):
+
+    ```
+    """
+    if dag_id := val.get("@Identifier"):
+        return OrbiterDAG(
+            dag_id=dag_id,
+            file_path=f"{dag_id}.py",
+            doc_md="**Created via [Orbiter](https://astronomer.github.io/orbiter) w/ Demo Translation Ruleset**.\n"
+            "Contact Astronomer @ [[email protected]](mailto:[email protected]) "
+            "or at [astronomer.io/contact](https://www.astronomer.io/contact/) for more!",
+        )


@task_filter_rule
def basic_task_filter(val: dict) -> list | None:
"""Filter input down to a list of dictionaries that can be processed by the `@task_rules`"""
"""Filter input down to a list of dictionaries with `@Type=CustomStage`
```pycon
>>> basic_task_filter({"Record": [{"@Type": "CustomStage"}, {"@Type": "SomethingElse"}]})
[{'@Type': 'CustomStage'}]
```
"""
if isinstance(val, dict):
val = json.loads(json.dumps(val, default=str)) # pre-serialize values, for JQ
try:
@@ -64,67 +181,91 @@ def basic_task_filter(val: dict) -> list | None:
    return None
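A note on the `pre-serialize values, for JQ` line above: the `jq` bindings only accept JSON-native types, while parsed XML can carry values like `datetime`; round-tripping through `json` with `default=str` coerces those to strings first. A small sketch of the idea (the record is made up):

```python
import json
from datetime import datetime

record = {"Record": [{"@DateModified": datetime(2020, 11, 27)}]}
# default=str stringifies anything JSON can't represent natively;
# loading the dump back yields a structure jq can safely consume.
safe = json.loads(json.dumps(record, default=str))
assert safe == {"Record": [{"@DateModified": "2020-11-27 00:00:00"}]}
```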


-@task_rule(priority=2)
-def basic_task_rule(val: dict) -> OrbiterOperator | OrbiterTaskGroup | None:
-    """Translate input into an Operator (e.g. `OrbiterBashOperator`). will be applied first, with a higher priority"""
-    if "task_id" in val:
-        return OrbiterEmptyOperator(task_id=val["task_id"])
-    else:
-        return None
-
-
def task_common_args(val: dict) -> dict:
    """
    Common mappings for all tasks
    """
-    task_id: str = (
-        jq.compile(""".Property[] | select(.["@Name"] == "Name") | .["#text"]""")
-        .input_value(val)
-        .first()
-    )
-    task_id = inflection.underscore(task_id)
+    try:
+        task_id: str = (
+            jq.compile(""".Property[] | select(.["@Name"] == "Name") | .["#text"]""")
+            .input_value(val)
+            .first()
+        )
+    except ValueError:
+        task_id = "UNKNOWN"
    params = {"task_id": task_id}
    return params


-def extract_sql_statements(root):
-    sql_statements = {}
-    sql_tags = ["SelectStatement", "BeforeSQL", "AfterSQL"]
-    for tag in sql_tags:
-        elements = root.findall(f".//{tag}")
-        for elem in elements:
-            if elem.text:
-                sql_text = elem.text.strip()
-                sql_statements[tag] = sql_text
-    return sql_statements
+@task_rule(priority=2)
+def _cannot_map_rule(val: dict) -> OrbiterOperator | OrbiterTaskGroup | None:
+    """Fallback rule: translate anything unmatched into an `OrbiterEmptyOperator` (applied first, at a higher priority)"""
+    return OrbiterEmptyOperator(**task_common_args(val))
+
+
+def extract_sql_statements(root: dict) -> list[str]:
+    """Find SQL Statements, however deeply nested
+
+    Looks for text of `SelectStatement`, `BeforeSQL`, and `AfterSQL` tags
+
+    ```pycon
+    >>> extract_sql_statements({
+    ...     "BeforeSQL": [{"#text": ""}],
+    ...     "SelectStatement": [{"#text": "SELECT 1 as Dummy from db.schema.table Limit 1;"}],
+    ...     "AfterSQL": [{"#text": "SELECT * FROM db.schema.table;"}]
+    ... })
+    ['SELECT 1 as Dummy from db.schema.table Limit 1;', 'SELECT * FROM db.schema.table;']
+
+    >>> extract_sql_statements({"a": {"b": {"c": {"d": {"SelectStatement": [{"#text": "SELECT 1;"}]}}}}})
+    ['SELECT 1;']
+
+    ```
+    """
+    if root:
+        return [
+            sql.strip()
+            for tag in ["BeforeSQL", "SelectStatement", "AfterSQL"]
+            for sql in jq.all(f"""recurse | select(.{tag}?) | .{tag}[]["#text"]""", root)
+            if sql and sql.strip()
+        ] or None
+    raise ValueError("No SQL Statements found")
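Two JQ patterns do the heavy lifting here: a path query pulls the task name out of the `Property` list (`task_common_args`), and `recurse` finds SQL at any depth (`extract_sql_statements`). A standalone sketch of both, using the same `jq` PyPI package imported above; the inputs are invented:

```python
import jq

# Path query: take "#text" from the Property whose @Name is "Name"
record = {"Property": [{"@Name": "Name", "#text": "SELECT_TABLE"}]}
name = (
    jq.compile('.Property[] | select(.["@Name"] == "Name") | .["#text"]')
    .input_value(record)
    .first()
)
assert name == "SELECT_TABLE"

# Deep search: `recurse` visits every node; `select(.SelectStatement?)` keeps
# objects having that key (the `?` suppresses errors on non-object nodes)
doc = {"a": {"b": {"SelectStatement": [{"#text": "SELECT 1;"}]}}}
found = jq.all('recurse | select(.SelectStatement?) | .SelectStatement[]["#text"]', doc)
assert found == ["SELECT 1;"]
```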

@task_rule(priority=2)
def sql_command_rule(val) -> OrbiterSQLExecuteQueryOperator | None:
"""
For SQLQueryOperator.
Create a SQL Operator with one or more SQL Statements
```pycon
>>> sql_command_rule({
... 'Property': [{'@Name': 'Name', '#text': 'SELECT_TABLE'}],
... '@Identifier': 'V253S0',
... "Collection": [{"SubRecord": [{"Property": [{
... "@PreFormatted": "1",
... "#text": {"Properties": [{
... "Usage": [{"SQL": [{"SelectStatement": [{"#text": "SELECT 1;"}]}]}],
... }]}
... }]}]}]
... }) # doctest: +ELLIPSIS
select_table_task = SQLExecuteQueryOperator(task_id='select_table', conn_id='DB', sql='SELECT 1;')
```
""" # noqa: E501
    try:
-        sql: str = (
-            jq.compile(
-                """.Collection[] | .SubRecord[] | .Property[] | select(.["@PreFormatted"] == "1") | .["#text"] """
-            )
-            .input_value(val)
-            .first()
-        )
-        root = ElementTree.fromstring(sql.encode("utf-16"))
-        sql_statements = extract_sql_statements(root)
-        sql = " ".join(sql_statements.values())
-        if sql:
+        if sql := jq.first(
+            """.Collection[] | .SubRecord[] | .Property[] | select(.["@PreFormatted"] == "1") | .["#text"]""",
+            val
+        ):
            return OrbiterSQLExecuteQueryOperator(
-                sql=sql,
-                **conn_id(conn_id="snowflake_default", conn_type="snowflake"),
+                sql=stmt[0] if len(stmt := extract_sql_statements(sql)) == 1 else stmt,
+                **conn_id(conn_id="DB"),
                **task_common_args(val),
            )
-    except StopIteration:
-        pass
+    except (StopIteration, ValueError) as e:
+        logger.debug(f"[WARNING] No SQL found in {val}, {e}")
    return None


Expand All @@ -149,9 +290,7 @@ def basic_task_dependency_rule(val: OrbiterDAG) -> list | None:
    dag_filter_ruleset=DAGFilterRuleset(ruleset=[basic_dag_filter]),
    dag_ruleset=DAGRuleset(ruleset=[basic_dag_rule]),
    task_filter_ruleset=TaskFilterRuleset(ruleset=[basic_task_filter]),
-    task_ruleset=TaskRuleset(
-        ruleset=[sql_command_rule, basic_task_rule, cannot_map_rule]
-    ),
+    task_ruleset=TaskRuleset(ruleset=[sql_command_rule, _cannot_map_rule, cannot_map_rule]),
    task_dependency_ruleset=TaskDependencyRuleset(ruleset=[basic_task_dependency_rule]),
    post_processing_ruleset=PostProcessingRuleset(ruleset=[]),
)
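To try the assembled ruleset end to end, mirroring the doctests above, something like this should work (the job identifier is hypothetical, and key normalization may alter the final `dag_id`):

```python
from orbiter_translations.data_stage.xml_demo import translation_ruleset

project = translation_ruleset.test(input_value="""<?xml version="1.0" encoding="UTF-8"?>
<DSExport>
  <Job Identifier="My_Job"/>
</DSExport>""")
print(project.dags)  # mapping of dag_id -> rendered OrbiterDAG
```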