
[Bug]: Cross-language JDBC (MSSQL) - incorrect negative Integer type conversion #34089

Open

mataralhawiti opened this issue Feb 26, 2025 · 4 comments


mataralhawiti commented Feb 26, 2025

What happened?

We use the JDBC cross-language transform to read data from MSSQL into BigQuery, and we noticed that negative integers are being converted incorrectly.
For example: an INT column in the source with value (-1) comes through as (4294967295).
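For reference, 4294967295 is exactly what you get when the 32-bit two's-complement bit pattern of -1 is reinterpreted as an unsigned 32-bit value, so this looks like a sign-handling problem somewhere in the cross-language conversion. A minimal illustration (plain Python, nothing Beam-specific):

import struct

# -1 stored as a signed 32-bit INT is the bit pattern 0xFFFFFFFF;
# reading those same four bytes back as unsigned gives 4294967295.
packed = struct.pack("<i", -1)         # bytes of signed 32-bit -1
print(struct.unpack("<I", packed)[0])  # 4294967295
print((-1) & 0xFFFFFFFF)               # 4294967295, same reinterpretation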

Here's a snippet from our code:

import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.typehints.schemas import LogicalType
from apache_beam.typehints.schemas import MillisInstant
from apache_beam.options.pipeline_options import PipelineOptions
import logging

"""
table1
CREATE TABLE Customers (
    quanitity int,
    LastName varchar(255)
);

INSERT INTO Customers (quanitity, LastName) VALUES (44, 'Tom');
INSERT INTO Customers (quanitity, LastName) VALUES (-1, 'Tom');

"""


class LogResults(beam.DoFn):
    """Just log the results"""

    def process(self, element):
        logging.info("elment.logger - : %s", element)
        yield element


def row_to_dict(row):
    as_dict = row._asdict()
    return as_dict


def run(argv=None, save_main_session=True):
    # Start the pipeline
    pipeline_args = ""
    pipeline_options = PipelineOptions(pipeline_args, pipeline_type_check=True)
    LogicalType.register_logical_type(MillisInstant)
    with beam.Pipeline(options=pipeline_options) as p:
        p | "full-Read" >> ReadFromJdbc(
            query="select quanitity, LastName  from Customers",
            table_name=f"xxxxxx",
            driver_class_name="com.microsoft.sqlserver.jdbc.SQLServerDriver",
            jdbc_url="jdbc:sqlserver://{0};databaseName={1}".format("xx", "xx"),
            username="username",
            password="password",
            classpath=["gs://xxxxx/mssql-jdbc-12.6.2.jre11.jar"],
        )
        | "row to map" >> beam.Map(row_to_dict)
        | "log result" >> beam.ParDo(LogResults())


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.DEBUG)
    run()

Here's the output:

element.logger - : {'quanitity': 44, 'LastName': 'Tom'}
element.logger - : {'quanitity': 4294967295, 'LastName': 'Tom'}

Environment:
Apache Beam 2.63.0
Google Cloud Dataflow

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@mataralhawiti mataralhawiti changed the title [Bug]: Cross-language JDBC negative Integer type conversion [Bug]: Cross-language JDBC - negative Integer type conversion incorrectly Feb 26, 2025
@mataralhawiti mataralhawiti changed the title [Bug]: Cross-language JDBC - negative Integer type conversion incorrectly [Bug]: Cross-language JDBC (MSSQL) - incorrect negative Integer type conversion Feb 26, 2025
mataralhawiti (Author) commented

Update

I was able to get the correct value when I cast the column to BIGINT:
query="select cast(quanitity as BIGINT), LastName from Customers",

but I still don't understand why the signed integer (-1) is not converted correctly without the cast.
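If changing the query is not an option, a post-read fixup on the Python side should also mask the symptom until the underlying conversion is fixed. This is only a sketch; it assumes the affected column is a signed 32-bit INT and reuses the column name from the snippet above:

INT32_MAX = 2**31 - 1

def fix_int32(value):
    # Map an unsigned 32-bit reinterpretation back into the signed range.
    if value is not None and value > INT32_MAX:
        return value - 2**32
    return value

assert fix_int32(4294967295) == -1
assert fix_int32(44) == 44

# Dropped into the existing pipeline right after "row to map", e.g.:
#     | "fix quanitity" >> beam.Map(lambda row: {**row, "quanitity": fix_int32(row["quanitity"])})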

liferoad (Contributor) commented

Can you try to define the coder like https://github.com/GoogleCloudPlatform/dataflow-cookbook/blob/main/Python/jdbc/read_jdbc.py#L84?

Brijeshthummar02 commented

@mataralhawiti @liferoad how exactly can this be tackled?

mataralhawiti (Author) commented

> Can you try to define the coder like https://github.com/GoogleCloudPlatform/dataflow-cookbook/blob/main/Python/jdbc/read_jdbc.py#L84?

Thanks @liferoad, I tried defining a row coder as you suggested, but it's still the same issue.

class ExampleRow(NamedTuple):
    quanitity: int
    LastName: str
..
..

    | "Map to ExampleRow" >> beam.Map(lambda element: ExampleRow(element[0], element[1]))
    | "log result" >> beam.ParDo(LogResults())

and my worker logs show:

element.logger - : ExampleRow(quanitity=4294967290, LastName='xxx')
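For reference, the coder-registration step from that cookbook example looks roughly like the following (a sketch based on the linked read_jdbc.py, reusing the ExampleRow type above):

from typing import NamedTuple

import apache_beam as beam


class ExampleRow(NamedTuple):
    quanitity: int
    LastName: str


# Register a schema-aware RowCoder for the NamedTuple, as in the cookbook example.
beam.coders.registry.register_coder(ExampleRow, beam.coders.RowCoder)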
