[DOP-9787] Improve read strategies in DBReader #182
Merged
Conversation
Codecov Report

Attention:

Additional details and impacted files:

@@            Coverage Diff             @@
##           develop     #182      +/-   ##
===========================================
+ Coverage    94.08%   94.34%   +0.25%
===========================================
  Files          205      204       -1
  Lines         7744     7671      -73
  Branches      1400     1376      -24
===========================================
- Hits          7286     7237      -49
+ Misses         332      318      -14
+ Partials       126      116      -10
===========================================

☔ View full report in Codecov by Sentry.
maxim-lixakov approved these changes Dec 8, 2023
Change Summary
Previously, reading data with an incremental strategy worked like this:

1. Fetch the table schema with `SELECT * FROM table WHERE 1=0` (if `DBReader.columns` contains `*`).
2. Create a dataframe from the query `SELECT hwm.expression AS hwm.column, ...other table columns... FROM table WHERE prev_hwm.expression > prev_hwm.value`.
3. Determine the HWM column type from `df.schema[hwm.column].dataType`.
4. Calculate `df.select(min(hwm.column), max(hwm.column)).collect()` on the Spark side.
5. Use `max(hwm.column)` as the next HWM value.
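For illustration, here is a rough PySpark sketch of that old flow, assuming a JDBC-accessible Postgres table. The table, column names and connection options are made up for the example and are not taken from the PR:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

prev_hwm = "2023-12-01 00:00:00"  # value taken from the HWM store on a previous run

# Old flow: every matching row is loaded into Spark...
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://host:5432/db")  # illustrative connection options
    .option("user", "reader")
    .option("password", "secret")
    .option("query", f"SELECT * FROM public.orders WHERE updated_at > '{prev_hwm}'")
    .load()
)

# ...only to compute min/max of the HWM column on the Spark side.
bounds = df.select(F.min("updated_at"), F.max("updated_at")).collect()[0]
next_hwm = bounds[1]  # max(hwm.column) becomes the next HWM value
```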
This was far from ideal:

- Dataframe content (all rows, or just the changed ones) was loaded from the source into Spark only to get the min/max values of a specific column.
- Fetching the table schema and then substituting column names into the following query could cause errors. For example, if the source contains columns with mixed-case names, like `"MyColumn"` and `"My column"`, the column names were not escaped during query generation, producing queries the database cannot execute. Users therefore had to explicitly set a proper column list, wrapping the names with `"`.
- The dataframe was created from a query with a clause like `WHERE hwm.expression > prev_hwm.value`, not `WHERE hwm.expression > prev_hwm.value AND hwm.expression <= current_hwm.value`. So if new rows appeared in the source after the HWM value was determined, they could be read by DBReader on the first run, and then again on the next run, because they are still returned by the `WHERE hwm.expression > prev_hwm.value` query.
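The last point is easiest to see with a tiny simulation. This is a minimal, purely illustrative sketch in plain Python (no Spark or database involved); the values are made up:

```python
source = [1, 2, 3]   # HWM column values currently present in the source table
prev_hwm = 0         # HWM value saved by the previous run

# The next HWM value is determined first (max of everything above prev_hwm)...
next_hwm = max(v for v in source if v > prev_hwm)      # -> 3

# ...but a new row arrives before the dataframe is actually materialized.
source.append(4)

# Old behaviour: the read query has no upper bound, so the new row is read now...
run_1 = [v for v in source if v > prev_hwm]            # [1, 2, 3, 4]

# ...and again on the next run, because 4 is still above the saved HWM (3).
run_2 = [v for v in source if v > next_hwm]            # [4]  <- already returned by run_1

# With a bounded query the first run would stop at next_hwm instead:
run_1_bounded = [v for v in source if prev_hwm < v <= next_hwm]   # [1, 2, 3]
```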
Now it looks like this:

1. Detect the HWM expression type with `SELECT hwm.expression FROM table WHERE 1=0` and `df.schema[0]`.
2. Calculate min/max values in the database itself: `SELECT MIN(hwm.expression), MAX(hwm.expression) FROM table WHERE hwm.expression >= prev_hwm.value`.
3. Use `max(hwm.column)` as the next HWM value.
4. Create the dataframe from the query `SELECT * FROM table WHERE hwm.expression > prev_hwm.value AND hwm.expression <= current_hwm.value`, and return it to the user.
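As a rough PySpark sketch of these steps (again with made-up table/column names and connection options, not the actual onETL internals):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

jdbc_options = {
    "url": "jdbc:postgresql://host:5432/db",  # illustrative connection options
    "user": "reader",
    "password": "secret",
}
prev_hwm = "2023-12-01 00:00:00"

# Min/max of the HWM expression are now calculated by the database itself,
# so Spark receives a single row instead of the whole changed dataset.
bounds = (
    spark.read.format("jdbc")
    .options(**jdbc_options)
    .option(
        "query",
        "SELECT MIN(updated_at) AS min_hwm, MAX(updated_at) AS max_hwm "
        f"FROM public.orders WHERE updated_at >= '{prev_hwm}'",
    )
    .load()
    .collect()[0]
)
current_hwm = bounds["max_hwm"]

# The user's dataframe is bounded on both sides, so rows added after
# current_hwm was fixed cannot leak into this run or be read twice.
df = (
    spark.read.format("jdbc")
    .options(**jdbc_options)
    .option(
        "query",
        f"SELECT * FROM public.orders WHERE updated_at > '{prev_hwm}' "
        f"AND updated_at <= '{current_hwm}'",
    )
    .load()
)
```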
Improvements:

- `DBReader` does not fail on tables with mixed column naming.
- Breaking change: the HWM column is no longer implicitly added to the dataframe. If it was not just a column value but an expression that your code then accessed as a dataframe column, you should explicitly add the same expression to `DBReader.columns`.
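A hypothetical sketch of that migration. The connection, table, HWM name and expression are illustrative, and the exact `DBReader` argument names (including `hwm=DBReader.AutoDetectHWM(...)`) are assumptions; check the onETL documentation for your version:

```python
from pyspark.sql import SparkSession

from onetl.connection import Postgres
from onetl.db import DBReader

spark = SparkSession.builder.getOrCreate()

# Illustrative connection; host and credentials are placeholders.
postgres = Postgres(
    host="host",
    user="reader",
    password="secret",
    database="db",
    spark=spark,
)

reader = DBReader(
    connection=postgres,
    source="public.orders",
    columns=[
        "*",
        # The HWM expression is no longer added implicitly; if downstream code
        # reads this column from the dataframe, list it here explicitly.
        "CAST(updated_at AS DATE) AS updated_date",
    ],
    # Assumption: HWM configured via DBReader.hwm, as in recent onETL versions.
    hwm=DBReader.AutoDetectHWM(
        name="orders_updated_date_hwm",
        expression="CAST(updated_at AS DATE)",
    ),
)

df = reader.run()
updated_dates = df.select("updated_date")  # present only because it was listed in columns
```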
- Added 2 internal classes, `Edge` and `Window`, replacing the old `Statement` class. They represent the `column > edge` / `column >= edge` / `column <= edge` parts of a WHERE clause. The separate `start_from` / `end_at` arguments of `DBConnection` methods were merged into `window`, and an `apply_window` method was added to `DBDialect`, which converts these classes to a WHERE clause (replacing `condition_assembler`).
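The following is a rough sketch of the idea behind these classes, not the actual onETL definitions; the fields, the `is_set` helper and the rendering logic are assumptions made for the example:

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Edge:
    """One boundary of a window: a value plus whether the comparison is inclusive."""

    value: Optional[Any] = None
    including: bool = True

    def is_set(self) -> bool:
        return self.value is not None


@dataclass
class Window:
    """A pair of edges over a column/expression, i.e. the WHERE-clause window."""

    expression: str
    start_from: Edge
    stop_at: Edge


def apply_window(window: Window) -> str:
    """Render a Window as a WHERE-clause fragment (conceptually what DBDialect.apply_window does)."""
    parts = []
    if window.start_from.is_set():
        op = ">=" if window.start_from.including else ">"
        parts.append(f"{window.expression} {op} {window.start_from.value!r}")
    if window.stop_at.is_set():
        op = "<=" if window.stop_at.including else "<"
        parts.append(f"{window.expression} {op} {window.stop_at.value!r}")
    return " AND ".join(parts)


# Example: prints "id > 100 AND id <= 200"
print(apply_window(Window("id", Edge(100, including=False), Edge(200, including=True))))
```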
- Got rid of the `StrategyHelper` class and moved the corresponding logic into `DBReader` itself. Also updated the `Strategy` classes: the `current` / `next` properties now return an `Edge`, which simplifies the implementation and allows dropping `current_comparator` / `next_comparator`.
- Improved checks for different edge cases.
- `DBReader.hwm` has higher priority than attributes from the HWMStore. This may be changed in next PRs.

Related issue number
Checklist

- `docs/changelog/next_release/<pull request or issue id>.<change type>.rst` file added describing the change (see CONTRIBUTING.rst for details).