Use Spark 3.5.4 for tests
dolfinus committed Jan 10, 2025
1 parent d4aa99a commit debf890
Showing 39 changed files with 75 additions and 75 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/data/clickhouse/matrix.yml
@@ -11,7 +11,7 @@ min: &min
max: &max
clickhouse-image: clickhouse/clickhouse-server
clickhouse-version: 24.11-alpine
-spark-version: 3.5.3
+spark-version: 3.5.4
pydantic-version: 2
python-version: '3.12'
java-version: 20
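The ``min``/``max`` entries in these matrix files are YAML anchors, reused elsewhere in the same file (not shown in this hunk) via ``*min``/``*max`` aliases, which is why bumping the single ``spark-version`` value under ``max`` updates every job that references it. A minimal sketch of how such anchors expand on load (Python with PyYAML; the version values and the ``matrix`` layout are illustrative only, not the repository's actual workflow input):

    import textwrap

    import yaml  # PyYAML

    doc = textwrap.dedent(
        """
        min: &min
          spark-version: 3.2.4
        max: &max
          spark-version: 3.5.4

        matrix:
          full: [*min, *max]
        """
    )

    # aliases are resolved at load time, so every *max reference picks up the new version
    data = yaml.safe_load(doc)
    print(data["matrix"]["full"])
    # [{'spark-version': '3.2.4'}, {'spark-version': '3.5.4'}]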
2 changes: 1 addition & 1 deletion .github/workflows/data/core/matrix.yml
@@ -7,7 +7,7 @@ min: &min
os: ubuntu-latest

max: &max
-spark-version: 3.5.3
+spark-version: 3.5.4
pydantic-version: 2
python-version: '3.12'
java-version: 20
2 changes: 1 addition & 1 deletion .github/workflows/data/hdfs/matrix.yml
@@ -8,7 +8,7 @@ min: &min

max: &max
hadoop-version: hadoop3-hdfs
-spark-version: 3.5.3
+spark-version: 3.5.4
pydantic-version: 2
python-version: '3.12'
java-version: 20
2 changes: 1 addition & 1 deletion .github/workflows/data/hive/matrix.yml
@@ -6,7 +6,7 @@ min: &min
os: ubuntu-latest

max: &max
-spark-version: 3.5.3
+spark-version: 3.5.4
pydantic-version: 2
python-version: '3.12'
java-version: 20
2 changes: 1 addition & 1 deletion .github/workflows/data/kafka/matrix.yml
@@ -12,7 +12,7 @@ min: &min
max: &max
kafka-version: 3.9.0
pydantic-version: 2
-spark-version: 3.5.3
+spark-version: 3.5.4
python-version: '3.12'
java-version: 20
os: ubuntu-latest
2 changes: 1 addition & 1 deletion .github/workflows/data/local-fs/matrix.yml
@@ -20,7 +20,7 @@ min_excel: &min_excel
os: ubuntu-latest

max: &max
-spark-version: 3.5.3
+spark-version: 3.5.4
pydantic-version: 2
python-version: '3.12'
java-version: 20
2 changes: 1 addition & 1 deletion .github/workflows/data/mongodb/matrix.yml
@@ -9,7 +9,7 @@ min: &min

max: &max
mongodb-version: 8.0.3
-spark-version: 3.5.3
+spark-version: 3.5.4
pydantic-version: 2
python-version: '3.12'
java-version: 20
2 changes: 1 addition & 1 deletion .github/workflows/data/mssql/matrix.yml
@@ -8,7 +8,7 @@ min: &min

max: &max
mssql-version: 2022-latest
-spark-version: 3.5.3
+spark-version: 3.5.4
pydantic-version: 2
python-version: '3.12'
java-version: 20
2 changes: 1 addition & 1 deletion .github/workflows/data/mysql/matrix.yml
@@ -10,7 +10,7 @@ min: &min

max: &max
mysql-version: 9.1.0
-spark-version: 3.5.3
+spark-version: 3.5.4
pydantic-version: 2
python-version: '3.12'
java-version: 20
2 changes: 1 addition & 1 deletion .github/workflows/data/oracle/matrix.yml
@@ -12,7 +12,7 @@ max: &max
oracle-image: gvenzl/oracle-free
oracle-version: 23.5-slim-faststart
db-name: FREEPDB1
-spark-version: 3.5.3
+spark-version: 3.5.4
pydantic-version: 2
python-version: '3.12'
java-version: 20
2 changes: 1 addition & 1 deletion .github/workflows/data/postgres/matrix.yml
@@ -9,7 +9,7 @@ min: &min

max: &max
postgres-version: 17.2-alpine
-spark-version: 3.5.3
+spark-version: 3.5.4
pydantic-version: 2
python-version: '3.12'
java-version: 20
2 changes: 1 addition & 1 deletion .github/workflows/data/s3/matrix.yml
@@ -10,7 +10,7 @@ min: &min

max: &max
minio-version: 2024.11.7
-spark-version: 3.5.3
+spark-version: 3.5.4
pydantic-version: 2
python-version: '3.12'
java-version: 20
2 changes: 1 addition & 1 deletion .github/workflows/data/teradata/matrix.yml
@@ -1,5 +1,5 @@
max: &max
-spark-version: 3.5.3
+spark-version: 3.5.4
pydantic-version: 2
python-version: '3.12'
java-version: 20
2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
@@ -71,7 +71,7 @@ Create virtualenv and install dependencies:
-r requirements/tests/postgres.txt \
-r requirements/tests/oracle.txt \
-r requirements/tests/pydantic-2.txt \
--r requirements/tests/spark-3.5.3.txt
+-r requirements/tests/spark-3.5.4.txt
# TODO: remove after https://github.com/zqmillet/sphinx-plantuml/pull/4
pip install sphinx-plantuml --no-deps
6 changes: 3 additions & 3 deletions README.rst
@@ -195,7 +195,7 @@ Compatibility matrix
+--------------------------------------------------------------+-------------+-------------+-------+
| `3.4.x <https://spark.apache.org/docs/3.4.3/#downloading>`_ | 3.7 - 3.12 | 8u362 - 20 | 2.12 |
+--------------------------------------------------------------+-------------+-------------+-------+
-| `3.5.x <https://spark.apache.org/docs/3.5.3/#downloading>`_ | 3.8 - 3.12 | 8u371 - 20 | 2.12 |
+| `3.5.x <https://spark.apache.org/docs/3.5.4/#downloading>`_ | 3.8 - 3.12 | 8u371 - 20 | 2.12 |
+--------------------------------------------------------------+-------------+-------------+-------+

.. _pyspark-install:
@@ -210,7 +210,7 @@ or install PySpark explicitly:

.. code:: bash
-pip install onetl pyspark==3.5.3 # install a specific PySpark version
+pip install onetl pyspark==3.5.4 # install a specific PySpark version
or inject PySpark to ``sys.path`` in some other way BEFORE creating a class instance.
**Otherwise connection object cannot be created.**
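For the ``sys.path`` route mentioned just above, one minimal sketch — assuming the optional ``findspark`` helper package is installed and a Spark distribution is unpacked under ``/opt/spark`` (both are assumptions for illustration, not onETL requirements):

    import findspark

    # prepends the pyspark and py4j sources bundled with the Spark distribution to sys.path
    findspark.init("/opt/spark")

    import pyspark

    print(pyspark.__version__)  # should match the unpacked distribution, e.g. 3.5.4

    # only after pyspark is importable, import onETL connection classes and create instances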
@@ -551,7 +551,7 @@ Read files directly from S3 path, convert them to dataframe, transform it and th
setup_logging()
# Initialize new SparkSession with Hadoop AWS libraries and Postgres driver loaded
-maven_packages = SparkS3.get_packages(spark_version="3.5.3") + Postgres.get_packages()
+maven_packages = SparkS3.get_packages(spark_version="3.5.4") + Postgres.get_packages()
spark = (
SparkSession.builder.appName("spark_app_onetl_demo")
.config("spark.jars.packages", ",".join(maven_packages))
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -9,7 +9,7 @@ services:
context: .
target: base
args:
-SPARK_VERSION: 3.5.3
+SPARK_VERSION: 3.5.4
env_file: .env.docker
volumes:
- ./:/app/
2 changes: 1 addition & 1 deletion docker/Dockerfile
@@ -44,7 +44,7 @@ ENV PATH=${ONETL_USER_HOME}/.local/bin:${PATH}
COPY --chown=onetl:onetl ./run_tests.sh ./pytest_runner.sh ./combine_coverage.sh /app/
RUN chmod +x /app/run_tests.sh /app/pytest_runner.sh /app/combine_coverage.sh

-ARG SPARK_VERSION=3.5.3
+ARG SPARK_VERSION=3.5.4
# Spark is heavy, and version change is quite rare
COPY --chown=onetl:onetl ./requirements/tests/spark-${SPARK_VERSION}.txt /app/requirements/tests/
RUN pip install -r /app/requirements/tests/spark-${SPARK_VERSION}.txt
6 changes: 3 additions & 3 deletions docs/connection/db_connection/clickhouse/types.rst
@@ -5,7 +5,7 @@ Clickhouse <-> Spark type mapping

.. note::

-The results below are valid for Spark 3.5.3, and may differ on other Spark versions.
+The results below are valid for Spark 3.5.4, and may differ on other Spark versions.

.. note::

@@ -116,8 +116,8 @@ References
Here you can find source code with type conversions:

* `Clickhouse -> JDBC <https://github.com/ClickHouse/clickhouse-java/blob/0.3.2/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/JdbcTypeMapping.java#L39-L176>`_
-* `JDBC -> Spark <https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L307>`_
-* `Spark -> JDBC <https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L141-L164>`_
+* `JDBC -> Spark <https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L307>`_
+* `Spark -> JDBC <https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L141-L164>`_
* `JDBC -> Clickhouse <https://github.com/ClickHouse/clickhouse-java/blob/0.3.2/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/JdbcTypeMapping.java#L185-L311>`_

Supported types
2 changes: 1 addition & 1 deletion docs/connection/db_connection/mongodb/types.rst
@@ -5,7 +5,7 @@ MongoDB <-> Spark type mapping

.. note::

-The results below are valid for Spark 3.5.3, and may differ on other Spark versions.
+The results below are valid for Spark 3.5.4, and may differ on other Spark versions.

Type detection & casting
------------------------
6 changes: 3 additions & 3 deletions docs/connection/db_connection/mssql/types.rst
@@ -5,7 +5,7 @@ MSSQL <-> Spark type mapping

.. note::

-The results below are valid for Spark 3.5.3, and may differ on other Spark versions.
+The results below are valid for Spark 3.5.4, and may differ on other Spark versions.

Type detection & casting
------------------------

@@ -105,8 +105,8 @@ References
Here you can find source code with type conversions:

* `MSSQL -> JDBC <https://github.com/microsoft/mssql-jdbc/blob/v12.2.0/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerResultSetMetaData.java#L117-L170>`_
-* `JDBC -> Spark <https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala#L117-L134>`_
-* `Spark -> JDBC <https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala#L136-L145>`_
+* `JDBC -> Spark <https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala#L135-L152>`_
+* `Spark -> JDBC <https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala#L154-L163>`_
* `JDBC -> MSSQL <https://github.com/microsoft/mssql-jdbc/blob/v12.2.0/src/main/java/com/microsoft/sqlserver/jdbc/DataTypes.java#L625-L676>`_

Supported types
6 changes: 3 additions & 3 deletions docs/connection/db_connection/mysql/types.rst
@@ -5,7 +5,7 @@ MySQL <-> Spark type mapping

.. note::

-The results below are valid for Spark 3.5.3, and may differ on other Spark versions.
+The results below are valid for Spark 3.5.4, and may differ on other Spark versions.

Type detection & casting
------------------------

@@ -101,8 +101,8 @@ References
Here you can find source code with type conversions:

* `MySQL -> JDBC <https://github.com/mysql/mysql-connector-j/blob/8.0.33/src/main/core-api/java/com/mysql/cj/MysqlType.java#L44-L623>`_
-* `JDBC -> Spark <https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala#L104-L132>`_
-* `Spark -> JDBC <https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala#L204-L211>`_
+* `JDBC -> Spark <https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala#L104-L132>`_
+* `Spark -> JDBC <https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala#L204-L211>`_
* `JDBC -> MySQL <https://github.com/mysql/mysql-connector-j/blob/8.0.33/src/main/core-api/java/com/mysql/cj/MysqlType.java#L625-L867>`_

Supported types
6 changes: 3 additions & 3 deletions docs/connection/db_connection/oracle/types.rst
@@ -5,7 +5,7 @@ Oracle <-> Spark type mapping

.. note::

-The results below are valid for Spark 3.5.3, and may differ on other Spark versions.
+The results below are valid for Spark 3.5.4, and may differ on other Spark versions.

Type detection & casting
------------------------

@@ -105,8 +105,8 @@ See `List of Oracle types <https://docs.oracle.com/en/database/oracle/oracle-dat

Here you can find source code with type conversions:

-* `JDBC -> Spark <https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala#L83-L109>`_
-* `Spark -> JDBC <https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala#L111-L123>`_
+* `JDBC -> Spark <https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala#L83-L109>`_
+* `Spark -> JDBC <https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala#L111-L123>`_

Numeric types
~~~~~~~~~~~~~
6 changes: 3 additions & 3 deletions docs/connection/db_connection/postgres/types.rst
@@ -5,7 +5,7 @@ Postgres <-> Spark type mapping

.. note::

-The results below are valid for Spark 3.5.3, and may differ on other Spark versions.
+The results below are valid for Spark 3.5.4, and may differ on other Spark versions.

Type detection & casting
------------------------

@@ -113,8 +113,8 @@ See `List of Postgres types <https://www.postgresql.org/docs/current/datatype.ht
Here you can find source code with type conversions:

* `Postgres <-> JDBC <https://github.com/pgjdbc/pgjdbc/blob/REL42.6.0/pgjdbc/src/main/java/org/postgresql/jdbc/TypeInfoCache.java#L78-L112>`_
-* `JDBC -> Spark <https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala#L50-L106>`_
-* `Spark -> JDBC <https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala#L116-L130>`_
+* `JDBC -> Spark <https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala#L52-L108>`_
+* `Spark -> JDBC <https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala#L118-L132>`_

Numeric types
~~~~~~~~~~~~~
2 changes: 1 addition & 1 deletion onetl/_metrics/extract.py
@@ -70,7 +70,7 @@ def extract_metrics_from_execution(execution: SparkListenerExecution) -> SparkCo
disk_spilled_bytes += stage.metrics.disk_spilled_bytes
result_size_bytes += stage.metrics.result_size_bytes

-# https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L467-L473
+# https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L467-L473
input_file_count = (
_get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_FILES_READ)
or _get_int(execution.metrics, SparkSQLMetricNames.STATIC_NUMBER_OF_FILES_READ)
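``_get_int`` is called here but its definition is not part of this diff. A plausible sketch of such a helper, stated purely as an assumption about its behaviour (look the metric up by name, coerce it to ``int``, return ``None`` when it is absent or non-numeric):

    from typing import Optional


    def _get_int(metrics: dict, key) -> Optional[int]:
        # Spark reports SQL metric values as strings, e.g. {"number of files read": "42"}.
        # Accept either a plain string key or a str-based Enum member (fall back to .value).
        raw = metrics.get(key, metrics.get(getattr(key, "value", key)))
        if raw is None:
            return None
        try:
            return int(raw)
        except (TypeError, ValueError):
            return None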
2 changes: 1 addition & 1 deletion onetl/_metrics/listener/base.py
@@ -16,7 +16,7 @@
class BaseSparkListener:
"""Base no-op SparkListener implementation.
-See `SparkListener <https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListener.html>`_ interface.
+See `SparkListener <https://spark.apache.org/docs/3.5.4/api/java/org/apache/spark/scheduler/SparkListener.html>`_ interface.
"""

spark: SparkSession
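Only the docstring link changes here, but for orientation: subclasses of such a no-op base typically override just the callbacks they need, using the camelCase method names of the JVM ``SparkListener`` interface (``onJobStart``, ``onJobEnd``, ``onStageCompleted``, ...). A minimal sketch, assuming the base class takes care of registering itself with the JVM (how onETL wires that up is not shown in this diff):

    import logging

    from onetl._metrics.listener.base import BaseSparkListener  # module path taken from the file path above

    logger = logging.getLogger(__name__)


    class LoggingListener(BaseSparkListener):
        """Example subclass: logs every Spark job started while the listener is attached."""

        def onJobStart(self, event):  # callback name mirrors the JVM SparkListener interface
            logger.info("Spark job %s started", event.jobId())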
12 changes: 6 additions & 6 deletions onetl/_metrics/listener/execution.py
@@ -22,18 +22,18 @@ class SparkSQLMetricNames(str, Enum): # noqa: WPS338
# Metric names passed to SQLMetrics.createMetric(...)
# But only those we're interested in.

-# https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L231
+# https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L231
NUMBER_OF_PARTITIONS_READ = "number of partitions read"

-# https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L225-L227
+# https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L225-L227
NUMBER_OF_FILES_READ = "number of files read"
SIZE_OF_FILES_READ = "size of files read"

-# https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L225-L227
+# https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L225-L227
STATIC_NUMBER_OF_FILES_READ = "static number of files read"
STATIC_SIZE_OF_FILES_READ = "static size of files read"

-# https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala#L241-L246
+# https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala#L241-L246
NUMBER_OF_DYNAMIC_PART = "number of dynamic part"
NUMBER_OF_WRITTEN_FILES = "number of written files"

@@ -66,11 +66,11 @@ def jobs(self) -> list[SparkListenerJob]:
return result

def on_execution_start(self, event):
-# https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L44-L58
+# https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L44-L58
self.status = SparkListenerExecutionStatus.STARTED

def on_execution_end(self, event):
-# https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L61-L83
+# https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L61-L83
for job in self._jobs.values():
if job.status == SparkListenerJobStatus.FAILED:
self.status = SparkListenerExecutionStatus.FAILED
4 changes: 2 additions & 2 deletions onetl/_metrics/listener/job.py
@@ -38,8 +38,8 @@ def stages(self) -> list[SparkListenerStage]:

@classmethod
def create(cls, event):
-# https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerJobSubmitted.html
-# https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerJobCompleted.html
+# https://spark.apache.org/docs/3.5.4/api/java/org/apache/spark/scheduler/SparkListenerJobSubmitted.html
+# https://spark.apache.org/docs/3.5.4/api/java/org/apache/spark/scheduler/SparkListenerJobCompleted.html
result = cls(
id=event.jobId(),
description=event.properties().get("spark.job.description"),
2 changes: 1 addition & 1 deletion onetl/_metrics/listener/listener.py
@@ -75,7 +75,7 @@ def onExecutionEnd(self, event):

# Get execution metrics from SQLAppStatusStore,
# as SparkListenerSQLExecutionEnd event does not provide them:
-# https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+# https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
session_status_store = self.spark._jsparkSession.sharedState().statusStore() # noqa: WPS437
raw_execution = session_status_store.execution(execution.id).get()
metrics = raw_execution.metrics()
6 changes: 3 additions & 3 deletions onetl/_metrics/listener/stage.py
@@ -21,7 +21,7 @@ def __str__(self):

@dataclass
class SparkListenerStage:
-# https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/StageInfo.html
+# https://spark.apache.org/docs/3.5.4/api/java/org/apache/spark/scheduler/StageInfo.html
id: int
status: SparkListenerStageStatus = SparkListenerStageStatus.PENDING
metrics: SparkListenerTaskMetrics = field(default_factory=SparkListenerTaskMetrics, repr=False, init=False)

@@ -39,11 +39,11 @@ def create(cls, stage_info):
return cls(id=stage_info.stageId())

def on_stage_start(self, event):
-# https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerStageSubmitted.html
+# https://spark.apache.org/docs/3.5.4/api/java/org/apache/spark/scheduler/SparkListenerStageSubmitted.html
self.status = SparkListenerStageStatus.ACTIVE

def on_stage_end(self, event):
-# https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerStageCompleted.html
+# https://spark.apache.org/docs/3.5.4/api/java/org/apache/spark/scheduler/SparkListenerStageCompleted.html
stage_info = event.stageInfo()
if stage_info.failureReason().isDefined():
self.status = SparkListenerStageStatus.FAILED
6 changes: 3 additions & 3 deletions onetl/_metrics/listener/task.py
@@ -83,14 +83,14 @@ class SparkListenerTask:

@classmethod
def create(cls, task_info):
-# https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/TaskInfo.html
+# https://spark.apache.org/docs/3.5.4/api/java/org/apache/spark/scheduler/TaskInfo.html
return cls(id=task_info.taskId())

def on_task_start(self, event):
-# https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerTaskStart.html
+# https://spark.apache.org/docs/3.5.4/api/java/org/apache/spark/scheduler/SparkListenerTaskStart.html
self.status = SparkListenerTaskStatus(event.taskInfo().status())

def on_task_end(self, event):
-# https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerTaskEnd.html
+# https://spark.apache.org/docs/3.5.4/api/java/org/apache/spark/scheduler/SparkListenerTaskEnd.html
self.status = SparkListenerTaskStatus(event.taskInfo().status())
self.metrics = SparkListenerTaskMetrics.create(event.taskMetrics())
