From 031eea66d0c6d17e1fac680186d6609ecb578c9f Mon Sep 17 00:00:00 2001 From: Henrique Goncalves Date: Wed, 29 Jan 2025 15:05:14 +0300 Subject: [PATCH] Remove unused code I purposefully left unused struct methods, and also unused code inside `library/go` untouched. --- Pull Request resolved: https://github.com/doublecloud/transfer/pull/183 Co-authored-by: tserakhau commit_hash:fb8a197f5b14768ec7070cf1ea2a5ee938097e48 --- .mapping.json | 1 - internal/logger/common.go | 23 ----- .../model/endpoint_rotator_config_test.go | 20 ++-- pkg/csv/error.go | 1 - pkg/debezium/typeutil/helpers.go | 92 +++++++++++-------- pkg/parsers/tests/generic_parser_test.go | 26 +----- pkg/parsers/utils.go | 6 +- pkg/predicate/token.go | 8 -- pkg/providers/clickhouse/schema/engine.go | 26 ------ pkg/providers/delta/provider.go | 2 - pkg/providers/greenplum/model_gp_source.go | 3 - pkg/providers/kinesis/source.go | 17 ---- pkg/providers/mongo/provider.go | 8 -- pkg/providers/ydb/decimal/parse.go | 3 - pkg/providers/ydb/utils.go | 16 +--- pkg/providers/yt/lfstaging/close_gaps.go | 16 ---- pkg/providers/yt/lfstaging/lock.go | 40 -------- pkg/providers/yt/sink/static_table.go | 9 -- pkg/transformer/registry/filter_rows/util.go | 22 +---- pkg/worker/tasks/add_tables.go | 11 --- recipe/mongo/pkg/config/config.go | 7 -- .../kafka2ch/replication_mv/check_db_test.go | 5 +- tests/e2e/mysql2yt/alters/check_db_test.go | 9 -- tests/e2e/mysql2yt/date_time/check_db_test.go | 1 - .../some_parts/partitioned_tables_test.go | 16 ++-- tests/e2e/pg2pg/replication/check_db_test.go | 2 - .../cdc_partial_activate/check_db_test.go | 21 +++-- 27 files changed, 91 insertions(+), 320 deletions(-) delete mode 100644 pkg/providers/yt/lfstaging/lock.go diff --git a/.mapping.json b/.mapping.json index 79693c30..9e68d518 100644 --- a/.mapping.json +++ b/.mapping.json @@ -1824,7 +1824,6 @@ "pkg/providers/yt/lfstaging/close_gaps_test.go":"transfer_manager/go/pkg/providers/yt/lfstaging/close_gaps_test.go", "pkg/providers/yt/lfstaging/intermediate_writer.go":"transfer_manager/go/pkg/providers/yt/lfstaging/intermediate_writer.go", "pkg/providers/yt/lfstaging/intermediate_writer_test.go":"transfer_manager/go/pkg/providers/yt/lfstaging/intermediate_writer_test.go", - "pkg/providers/yt/lfstaging/lock.go":"transfer_manager/go/pkg/providers/yt/lfstaging/lock.go", "pkg/providers/yt/lfstaging/logbroker_metadata.go":"transfer_manager/go/pkg/providers/yt/lfstaging/logbroker_metadata.go", "pkg/providers/yt/lfstaging/logbroker_metadata_test.go":"transfer_manager/go/pkg/providers/yt/lfstaging/logbroker_metadata_test.go", "pkg/providers/yt/lfstaging/rows.go":"transfer_manager/go/pkg/providers/yt/lfstaging/rows.go", diff --git a/internal/logger/common.go b/internal/logger/common.go index e1d4a0ec..839c8cce 100644 --- a/internal/logger/common.go +++ b/internal/logger/common.go @@ -59,26 +59,3 @@ func copySlice(src []byte) []byte { copy(dst, src) return dst } - -func copyBytes(source []byte) []byte { - dup := make([]byte, len(source)) - copy(dup, source) - return dup -} - -func newConsoleLogger() log.Logger { - levels := getEnvLogLevels() - if os.Getenv("CONSOLE_LOG_LEVEL") != "" { - levels = parseLevel(os.Getenv("CONSOLE_LOG_LEVEL")) - } - - levelEnabler := levelEnablerFactory(levels.Zap) - - encoderConfig := zap.CLIConfig(levels.Log).EncoderConfig - encoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder - - encoder := zapcore.NewConsoleEncoder(encoderConfig) - writeSyncer := zapcore.AddSync(os.Stdout) - - return newLogger(zapcore.NewCore(encoder, writeSyncer, levelEnabler)) -} diff --git a/pkg/abstract/model/endpoint_rotator_config_test.go b/pkg/abstract/model/endpoint_rotator_config_test.go index a4c5b295..e35ce960 100644 --- a/pkg/abstract/model/endpoint_rotator_config_test.go +++ b/pkg/abstract/model/endpoint_rotator_config_test.go @@ -35,11 +35,6 @@ var monthList = []time.Month{ time.June, time.July, time.August, time.September, time.October, time.November, time.December, } -var ( - monthDayCount = []int{31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31} - monthDayCountLeap = []int{31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31} -) - // scenario tests func scenarioTesting(t *testing.T) { // this tests based on real user scenarios @@ -90,6 +85,7 @@ func scenarioTestingDTSUPPORT693(t *testing.T) { timestamp = timestamp.Add(time.Hour) require.Equal(t, rotator.KeepPartCount, len(rotationTables), "Check that there is always window of six tables") } + } // tests @@ -179,18 +175,18 @@ func getMonthPartitionedTestHeavy(t *testing.T) { require.Equal(t, monthList[i-(i%partSize)], rc.getMonthPartitioned(month)) } } + } func offsetDateTest(t *testing.T) { t.Setenv("TZ", "Europe/Moscow") // this test is timezone aware t.Run("Hours", offsetDateTestHours) t.Run("Days", offsetDateTestDays) - // t.Run("MonthHeavy", offsetDateTestMonthHeavy) // TODO(@kry127) temporary switched off + //t.Run("MonthHeavy", offsetDateTestMonthHeavy) // TODO(@kry127) temporary switched off t.Run("NilReceiver", offsetDateTestNilReceiver) } func offsetDateTestHours(t *testing.T) { - t.Parallel() rcHours := RotatorConfig{KeepPartCount: 0, PartType: RotatorPartHour, PartSize: 1, TimeColumn: ""} rcHoursTimestamp := time.Now() @@ -209,7 +205,6 @@ func offsetDateTestHours(t *testing.T) { } func offsetDateTestDays(t *testing.T) { - t.Parallel() rcDays := RotatorConfig{KeepPartCount: 0, PartType: RotatorPartDay, PartSize: 1, TimeColumn: ""} rcDaysTimestamp := time.Now() @@ -249,6 +244,11 @@ func isLeap(year int) bool { } func countDaysForYearAcc(year, month, offset int, acc int64) int64 { + var ( + monthDayCount = []int{31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31} + monthDayCountLeap = []int{31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31} + ) + switch { case offset > 0: dayDiff := 0 @@ -288,11 +288,9 @@ func countDaysForYear(year, month, offset int) int64 { return countDaysForYearAcc(year, month, offset, 0) } -func offsetDateTestMonthHeavy(t *testing.T) { - t.Parallel() +func TestOffsetDateTestMonthHeavy(t *testing.T) { checkYear := func(t *testing.T, year, partSize int) { t.Helper() - t.Parallel() rcMonths := RotatorConfig{KeepPartCount: 0, PartType: RotatorPartMonth, PartSize: partSize, TimeColumn: ""} nowTimestamp := time.Now() for offset := 1; offset < 15; offset++ { diff --git a/pkg/csv/error.go b/pkg/csv/error.go index 800da08c..322276b3 100644 --- a/pkg/csv/error.go +++ b/pkg/csv/error.go @@ -4,7 +4,6 @@ import "github.com/doublecloud/transfer/library/go/core/xerrors" var ( errInvalidDelimiter = xerrors.NewSentinel("csv: invalid delimiter") - errInvalidEscape = xerrors.NewSentinel("csv: escape char used outside of quoted text") errDoubleQuotesDisabled = xerrors.NewSentinel("csv: found double quotes while double quote feature disabled") errQuotingDisabled = xerrors.NewSentinel("csv: found quote char while feature disabled") ) diff --git a/pkg/debezium/typeutil/helpers.go b/pkg/debezium/typeutil/helpers.go index 6322110c..5917c493 100644 --- a/pkg/debezium/typeutil/helpers.go +++ b/pkg/debezium/typeutil/helpers.go @@ -21,10 +21,11 @@ import ( // it's fixed in higher version of debezium (in 1.1 this bug is present, in 1.8 absent) // so, actual function - changeItemsBitsToDebeziumHonest -func changeItemsBitsToDebeziumWA(bits string) string { - bufSize := imitateDebeziumBufSize(len(bits)) - return changeItemsBitsStringToDebezium(bits, bufSize) -} +/* + func changeItemsBitsToDebeziumWA(bits string) string { + bufSize := imitateDebeziumBufSize(len(bits)) + return changeItemsBitsStringToDebezium(bits, bufSize) + } func imitateDebeziumBufSize(bitsCount int) int { if bitsCount < 16 { @@ -37,6 +38,7 @@ func imitateDebeziumBufSize(bitsCount int) int { return int(math.Ceil(float64(bitsCount) / 8)) // honest count } } +*/ //--------------------------------------------------------------------------------------------------------------------- @@ -116,10 +118,12 @@ func GetTimeDivider(originalTypeWithoutProvider string) (int, error) { } } -var reTimeWithoutTZ = regexp.MustCompile(`^time\((\d)\) without time zone`) -var reTimestampWithoutTZ = regexp.MustCompile(`^timestamp\((\d)\) without time zone`) -var reMysqlTime = regexp.MustCompile(`^mysql:timestamp\((\d)\)`) -var reMysqlDatetime = regexp.MustCompile(`^mysql:datetime\((\d)\)`) +var ( + reTimeWithoutTZ = regexp.MustCompile(`^time\((\d)\) without time zone`) + reTimestampWithoutTZ = regexp.MustCompile(`^timestamp\((\d)\) without time zone`) + reMysqlTime = regexp.MustCompile(`^mysql:timestamp\((\d)\)`) + reMysqlDatetime = regexp.MustCompile(`^mysql:datetime\((\d)\)`) +) func GetTimePrecision(colTypeStr string) int { precision := -1 @@ -447,10 +451,12 @@ func DecimalToDebeziumPrimitives(decimal string, connectorParameters map[string] } } -var pgTimestampLayout0 = "2006-01-02T15:04:05Z" -var pgTimestampLayout1 = "2006-01-02 15:04:05Z" -var pgTimestampLayout2 = "2006-01-02T15:04:05-07:00" -var pgTimestampLayout3 = "2006-01-02 15:04:05-07" +var ( + pgTimestampLayout0 = "2006-01-02T15:04:05Z" + pgTimestampLayout1 = "2006-01-02 15:04:05Z" + pgTimestampLayout2 = "2006-01-02T15:04:05-07:00" + pgTimestampLayout3 = "2006-01-02 15:04:05-07" +) func ParsePgDateTimeWithTimezone(in string) (time.Time, error) { var result time.Time @@ -664,10 +670,12 @@ func ParsePgDateTimeWithTimezone2(l, r string) (time.Time, time.Time, error) { return lTime, rTime, nil } -var regexHour = *regexp.MustCompile(`(\d+):(\d+):(\d+)`) -var regexYear = *regexp.MustCompile(`(\d+) years?`) -var regexMonth = *regexp.MustCompile(`(\d+) (?:mon|months?)`) -var regexDay = *regexp.MustCompile(`(\d+) days?`) +var ( + regexHour = *regexp.MustCompile(`(\d+):(\d+):(\d+)`) + regexYear = *regexp.MustCompile(`(\d+) years?`) + regexMonth = *regexp.MustCompile(`(\d+) (?:mon|months?)`) + regexDay = *regexp.MustCompile(`(\d+) days?`) +) func ExtractPostgresIntervalArray(interval string) ([]string, error) { result := make([]string, 7) @@ -706,13 +714,15 @@ func ExtractPostgresIntervalArray(interval string) ([]string, error) { return result, nil } -var timeWithoutTZ0 = "15:04:05" -var timeWithoutTZ1 = "15:04:05.0" -var timeWithoutTZ2 = "15:04:05.00" -var timeWithoutTZ3 = "15:04:05.000" -var timeWithoutTZ4 = "15:04:05.0000" -var timeWithoutTZ5 = "15:04:05.00000" -var timeWithoutTZ6 = "15:04:05.000000" +var ( + timeWithoutTZ0 = "15:04:05" + timeWithoutTZ1 = "15:04:05.0" + timeWithoutTZ2 = "15:04:05.00" + timeWithoutTZ3 = "15:04:05.000" + timeWithoutTZ4 = "15:04:05.0000" + timeWithoutTZ5 = "15:04:05.00000" + timeWithoutTZ6 = "15:04:05.000000" +) func ParseTimeWithoutTZ(timeStr string) (time.Time, error) { var layout string @@ -742,13 +752,15 @@ func ParseTimeWithoutTZ(timeStr string) (time.Time, error) { return timeVal, nil } -var timestampWithoutTZ0 = "2006-01-02T15:04:05Z" -var timestampWithoutTZ1 = "2006-01-02T15:04:05.0Z" -var timestampWithoutTZ2 = "2006-01-02T15:04:05.00Z" -var timestampWithoutTZ3 = "2006-01-02T15:04:05.000Z" -var timestampWithoutTZ4 = "2006-01-02T15:04:05.0000Z" -var timestampWithoutTZ5 = "2006-01-02T15:04:05.00000Z" -var timestampWithoutTZ6 = "2006-01-02T15:04:05.000000Z" +var ( + timestampWithoutTZ0 = "2006-01-02T15:04:05Z" + timestampWithoutTZ1 = "2006-01-02T15:04:05.0Z" + timestampWithoutTZ2 = "2006-01-02T15:04:05.00Z" + timestampWithoutTZ3 = "2006-01-02T15:04:05.000Z" + timestampWithoutTZ4 = "2006-01-02T15:04:05.0000Z" + timestampWithoutTZ5 = "2006-01-02T15:04:05.00000Z" + timestampWithoutTZ6 = "2006-01-02T15:04:05.000000Z" +) func ParseTimestamp(timeStr string) (time.Time, error) { var layout string @@ -832,12 +844,14 @@ func BufToChangeItemsBits(in []byte) string { return result } -var yearMS = int64(31557600000000) -var monthMS = int64(2629800000000) -var dayMS = int64(3600 * 24 * 1000 * 1000) -var hourMS = int64(3600 * 1000 * 1000) -var minuteMS = int64(60 * 1000 * 1000) -var secondMS = int64(1000 * 1000) +var ( + yearMS = int64(31557600000000) + monthMS = int64(2629800000000) + dayMS = int64(3600 * 24 * 1000 * 1000) + hourMS = int64(3600 * 1000 * 1000) + minuteMS = int64(60 * 1000 * 1000) + secondMS = int64(1000 * 1000) +) func EmitPostgresInterval(val int64) string { y := val / yearMS @@ -1106,8 +1120,10 @@ func OriginalTypeWithoutProvider(originalType string) string { return originalType[index+1:] } -var pgTimeWithoutTimeZoneParam = *regexp.MustCompile(`pg:time\((\d)\) without time zone`) -var pgNumeric = *regexp.MustCompile(`pg:numeric\(\d+,\d+\)`) +var ( + pgTimeWithoutTimeZoneParam = *regexp.MustCompile(`pg:time\((\d)\) without time zone`) + pgNumeric = *regexp.MustCompile(`pg:numeric\(\d+,\d+\)`) +) func PgTimeWithoutTimeZonePrecision(originalType string) int { if originalType == "pg:time without time zone" { diff --git a/pkg/parsers/tests/generic_parser_test.go b/pkg/parsers/tests/generic_parser_test.go index 703e581f..24d73f11 100644 --- a/pkg/parsers/tests/generic_parser_test.go +++ b/pkg/parsers/tests/generic_parser_test.go @@ -140,24 +140,6 @@ func TestParser_TableSplitterSpecChar(t *testing.T) { require.Equal(t, test6TableName[:len(test6TableExpectedPrefix)], test6TableExpectedPrefix) } -func changeItemFactoryForMetrica(tableName string, appVersionName string, receiveTimestamp int64, apiKey int32, deviceID interface{}) *abstract.ChangeItem { - table1Schema := abstract.NewTableSchema(abstract.TableColumns{ - abstract.MakeTypedColSchema("AppVersionName", string(schema.TypeString), true), - abstract.MakeTypedColSchema("ReceiveTimestamp", string(schema.TypeInt64), false), - abstract.MakeTypedColSchema("APIKey", string(schema.TypeInt32), false), - abstract.MakeTypedColSchema("DeviceID", string(schema.TypeAny), false), - }) - item1 := abstract.ChangeItem{ - Kind: "Insert", - Schema: "db", - Table: tableName, - ColumnNames: []string{"AppVersionName", "ReceiveTimestamp", "APIKey", "DeviceID"}, - ColumnValues: []interface{}{appVersionName, receiveTimestamp, apiKey, deviceID}, - TableSchema: table1Schema, - } - return &item1 -} - func testTableSplitterOnChangeItem( t *testing.T, originalTableName string, @@ -224,7 +206,6 @@ func TestGenericParser_DoSensitive(t *testing.T) { fieldsWithSecretErasure++ } } - } logger.Log.Debug("Example of secret erasure", log.Any("res", res)) require.Greater(t, fieldsWithSecretErasure, 0, "there should be at least one erasured secret") @@ -245,7 +226,6 @@ func TestGenericParser_DoSensitiveDisabled(t *testing.T) { fieldsWithSecretErasure++ } } - } logger.Log.Debug("Example of keeping secrets unerasured to increase performance", log.Any("res", res)) require.Equal(t, 0, fieldsWithSecretErasure, "secrets should not be processed in order to increase performance") @@ -440,9 +420,9 @@ func TestGenericParser_Parse_vs_Do(t *testing.T) { return } require.NoError(t, err) - //genericParserImpl := GetGenericParserImpl(parser) + // genericParserImpl := GetGenericParserImpl(parser) res := parser.Do(samples.Data[k], *new(abstract.Partition)) - //batch := genericParserImpl.Parse(samples.Data[k], *new(abstract.Partition)) + // batch := genericParserImpl.Parse(samples.Data[k], *new(abstract.Partition)) var changes []abstract.ChangeItem //for batch.Next() { // ev, err := batch.Event() @@ -458,7 +438,7 @@ func TestGenericParser_Parse_vs_Do(t *testing.T) { abstract.Dump(res) t.Logf("new parser: %v len", len(changes)) abstract.Dump(changes) - //require.Equal(t, len(res), len(changes)) + // require.Equal(t, len(res), len(changes)) }) } } diff --git a/pkg/parsers/utils.go b/pkg/parsers/utils.go index 7cebbe64..b7572e3f 100644 --- a/pkg/parsers/utils.go +++ b/pkg/parsers/utils.go @@ -19,10 +19,8 @@ import ( ) type schema struct { - Path ypath.Path - Fields []abstract.ColSchema `json:"fields"` - revision int - dead chan bool + Path ypath.Path + Fields []abstract.ColSchema `json:"fields"` } func adjustType(s string) string { diff --git a/pkg/predicate/token.go b/pkg/predicate/token.go index 1bb96fde..4b1fcccd 100644 --- a/pkg/predicate/token.go +++ b/pkg/predicate/token.go @@ -80,11 +80,3 @@ func (tok Token) Precedence() int { // isOperator returns true for operator tokens. func (tok Token) isOperator() bool { return tok > operatorBegin && tok < operatorEnd } - -// tokstr returns a literal if provided, otherwise returns the token string. -func tokstr(tok Token, lit string) string { - if lit != "" { - return lit - } - return tok.String() -} diff --git a/pkg/providers/clickhouse/schema/engine.go b/pkg/providers/clickhouse/schema/engine.go index 248578b3..3fdba718 100644 --- a/pkg/providers/clickhouse/schema/engine.go +++ b/pkg/providers/clickhouse/schema/engine.go @@ -165,32 +165,6 @@ func ParseMergeTreeFamilyEngine(sql string) (*MergeTreeFamilyEngine, string, err return GetEngine(engineStr) } -type substr struct { - startIdx int - endIdx int -} - -func grepEngineFull(sql string) (substr, error) { - startIdx := strings.Index(sql, "ENGINE = ") - if startIdx == -1 { - return substr{}, xerrors.Errorf("Cannot find engine definition in '%v'", sql) - } - startIdx += len("ENGINE = ") - - endIdx := -1 - if endIdx = TryFindNextStatement(sql, startIdx); endIdx == -1 { - if endIdx = strings.Index(sql[startIdx:], ")"); endIdx == -1 { - if endIdx = strings.Index(sql[startIdx:], " "); endIdx == -1 { - return substr{}, xerrors.Errorf("Cannot find engine definition in '%v'", sql) - } - } else { - endIdx += 1 // need include closing parenthesis - } - endIdx += startIdx - } - return substr{startIdx: startIdx, endIdx: endIdx}, nil -} - func TryFindNextStatement(sql string, from int) int { possibleStatements := []string{ " ORDER BY", diff --git a/pkg/providers/delta/provider.go b/pkg/providers/delta/provider.go index 9420b899..c7d8ab1e 100644 --- a/pkg/providers/delta/provider.go +++ b/pkg/providers/delta/provider.go @@ -6,7 +6,6 @@ import ( "github.com/doublecloud/transfer/library/go/core/metrics" "github.com/doublecloud/transfer/library/go/core/xerrors" "github.com/doublecloud/transfer/pkg/abstract" - "github.com/doublecloud/transfer/pkg/abstract/coordinator" "github.com/doublecloud/transfer/pkg/abstract/model" "github.com/doublecloud/transfer/pkg/providers" "go.ytsaurus.tech/library/go/core/log" @@ -32,7 +31,6 @@ var ( type Provider struct { logger log.Logger registry metrics.Registry - cp coordinator.Coordinator transfer *model.Transfer } diff --git a/pkg/providers/greenplum/model_gp_source.go b/pkg/providers/greenplum/model_gp_source.go index b84c8ee0..9188c37b 100644 --- a/pkg/providers/greenplum/model_gp_source.go +++ b/pkg/providers/greenplum/model_gp_source.go @@ -39,9 +39,6 @@ type GpSourceAdvancedProps struct { // EnforceConsistency enables *enforcement* of consistent snapshot. When it is not set, the user is responsible for snapshot consistency EnforceConsistency bool - batchLimitRows int // deprecated: is not used anymore - batchLimitSize model.BytesSize // deprecated: is not used anymore - ServiceSchema string // AllowCoordinatorTxFailure disables coordinator TX monitoring (liveness monitor) and enables the transfer to finish snapshot successfully even if the coordinator TX fails diff --git a/pkg/providers/kinesis/source.go b/pkg/providers/kinesis/source.go index b4f7f686..17b1ed22 100644 --- a/pkg/providers/kinesis/source.go +++ b/pkg/providers/kinesis/source.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "hash/fnv" - "math/big" "strconv" "strings" "sync" @@ -216,21 +215,6 @@ const ( SequenceMask = (1 << 4) - 1 ) -// parseSeqNo try to extract lsn from seq-no -// the sequenceNumber in Kinesis streams is only guaranteed to be unique within each shard (partition key is what determines the shard). -// but we know that for certain version (186 bit length) we can extract sequence mask -// this code is extracted from here: https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/SequenceNumberValidator.java#L39 -func parseSeqNo(id string) int64 { - bigint, ok := new(big.Int).SetString(id, 10) - if !ok { - return hash(id) - } - if bigint.BitLen() != ExpectedBitLength { - return hash(id) - } - return bigint.Rsh(bigint, SequenceMask).Int64() -} - func hash(id string) int64 { algorithm := fnv.New64a() _, _ = algorithm.Write([]byte(id)) @@ -321,5 +305,4 @@ func NewSource( consumer: c, lastError: nil, }, nil - } diff --git a/pkg/providers/mongo/provider.go b/pkg/providers/mongo/provider.go index 78e9f6d6..7c287736 100644 --- a/pkg/providers/mongo/provider.go +++ b/pkg/providers/mongo/provider.go @@ -44,14 +44,6 @@ const ( ClusterTimeCollName = "__dt_cluster_time" ) -func isSystemTable(tableName string) bool { - switch tableName { - case SystemDatabase, ClusterTimeCollName: - return true - } - return false -} - const ProviderType = abstract.ProviderType("mongo") // To verify providers contract implementation diff --git a/pkg/providers/ydb/decimal/parse.go b/pkg/providers/ydb/decimal/parse.go index 5dc215de..ffc6fe60 100644 --- a/pkg/providers/ydb/decimal/parse.go +++ b/pkg/providers/ydb/decimal/parse.go @@ -3,11 +3,8 @@ package decimal import ( "fmt" "math/big" - "math/bits" ) -const wordSize = bits.UintSize / 8 - var ( ten = big.NewInt(10) zero = big.NewInt(0) diff --git a/pkg/providers/ydb/utils.go b/pkg/providers/ydb/utils.go index e23f50e8..0eb06e63 100644 --- a/pkg/providers/ydb/utils.go +++ b/pkg/providers/ydb/utils.go @@ -29,9 +29,9 @@ func filterYdbTableColumns(filter []YdbColumnsFilter, description options.Descri if err != nil { return nil, xerrors.Errorf("unable to compile regexp: %s: %w", filterRule.ColumnNamesRegexp, err) } - var filteredColumns = make([]options.Column, 0) + filteredColumns := make([]options.Column, 0) for _, column := range description.Columns { - var hasMatch = columnsToFilterRegExp.MatchString(column.Name) + hasMatch := columnsToFilterRegExp.MatchString(column.Name) if (filterRule.Type == YdbColumnsWhiteList && hasMatch) || (filterRule.Type == YdbColumnsBlackList && !hasMatch) { filteredColumns = append(filteredColumns, column) @@ -51,18 +51,6 @@ func filterYdbTableColumns(filter []YdbColumnsFilter, description options.Descri return description.Columns, nil } -func flatten(batch [][]abstract.ChangeItem) []abstract.ChangeItem { - sumSize := 0 - for _, currArr := range batch { - sumSize += len(currArr) - } - result := make([]abstract.ChangeItem, 0, sumSize) - for _, currArr := range batch { - result = append(result, currArr...) - } - return result -} - func tableSchema(ctx context.Context, db *ydb.Driver, database string, tableID abstract.TableID) (*abstract.TableSchema, error) { tablePath := path.Join(database, tableID.Namespace, tableID.Name) desc, err := describeTable(ctx, db, tablePath) diff --git a/pkg/providers/yt/lfstaging/close_gaps.go b/pkg/providers/yt/lfstaging/close_gaps.go index 0e2e3be8..b0c30c4d 100644 --- a/pkg/providers/yt/lfstaging/close_gaps.go +++ b/pkg/providers/yt/lfstaging/close_gaps.go @@ -1,28 +1,12 @@ package lfstaging import ( - "strconv" - "strings" "time" "go.ytsaurus.tech/yt/go/yt" "golang.org/x/xerrors" ) -func getTableTimestamp(node ytNode) (int64, error) { - parts := strings.Split(node.Name, "-") - if len(parts) != 2 { - return 0, xerrors.Errorf("Invalid node name '%v'", node.Name) - } - - ts, err := strconv.ParseInt(parts[0], 10, 64) - if err != nil { - return 0, xerrors.Errorf("Cannot convert value '%v' to integer: %w", parts[0], err) - } else { - return ts, nil - } -} - func closeGaps( tx yt.Tx, config *sinkConfig, diff --git a/pkg/providers/yt/lfstaging/lock.go b/pkg/providers/yt/lfstaging/lock.go deleted file mode 100644 index b4f56b77..00000000 --- a/pkg/providers/yt/lfstaging/lock.go +++ /dev/null @@ -1,40 +0,0 @@ -package lfstaging - -import ( - "context" - "time" - - "go.ytsaurus.tech/yt/go/ypath" - "go.ytsaurus.tech/yt/go/yt" - "go.ytsaurus.tech/yt/go/ytlock" - "golang.org/x/xerrors" -) - -type lock struct { - path ypath.Path - yc yt.Client -} - -func newLock(yc yt.Client, path ypath.Path) *lock { - return &lock{ - path: path, - yc: yc, - } -} - -func (l *lock) WithLock(fn func() error) error { - lock := ytlock.NewLock(l.yc, l.path) - _, err := lock.Acquire(context.Background()) - if err != nil { - time.Sleep(10 * time.Minute) - return xerrors.Errorf("Cannot acquire lock: %w", err) - } - if err := fn(); err != nil { - return err - } - err = lock.Release(context.Background()) - if err != nil { - return xerrors.Errorf("Cannot release lock: %w", err) - } - return nil -} diff --git a/pkg/providers/yt/sink/static_table.go b/pkg/providers/yt/sink/static_table.go index b070ab20..fe927874 100644 --- a/pkg/providers/yt/sink/static_table.go +++ b/pkg/providers/yt/sink/static_table.go @@ -197,15 +197,6 @@ func staticYTSchema(item abstract.ChangeItem) []schema.Column { return result } -func findCorrespondingIndex(cols []abstract.ColSchema, name string) int { - for i, colSchema := range cols { - if colSchema.ColumnName == name { - return i - } - } - return -1 -} - func getNameFromTableID(tID abstract.TableID) string { if tID.Namespace == "public" || len(tID.Namespace) == 0 { return tID.Name diff --git a/pkg/transformer/registry/filter_rows/util.go b/pkg/transformer/registry/filter_rows/util.go index 3c96ebc1..c6a769e7 100644 --- a/pkg/transformer/registry/filter_rows/util.go +++ b/pkg/transformer/registry/filter_rows/util.go @@ -10,9 +10,7 @@ import ( "github.com/doublecloud/transfer/pkg/util/set" ) -var ( - errIntOverflow = xerrors.Errorf("Provided value overflows int64") -) +var errIntOverflow = xerrors.Errorf("Provided value overflows int64") func stringToTime(str string) (time.Time, bool) { layouts := []string{ @@ -80,24 +78,6 @@ func toInt64E(i interface{}) (int64, error) { } } -func trimZeroDecimal(s string) string { - // It is copy-paste of https://github.com/spf13/cast/blob/master/caste.go trimZeroDecimal function. - var foundZero bool - for i := len(s); i > 0; i-- { - switch s[i-1] { - case '.': - if foundZero { - return s[:i-1] - } - case '0': - foundZero = true - default: - return s - } - } - return s -} - func valuesListToSet(valList filter.Value) (*set.Set[interface{}], error) { values := make([]interface{}, 0) if valList.IsIntList() { diff --git a/pkg/worker/tasks/add_tables.go b/pkg/worker/tasks/add_tables.go index 51d82c07..807cab30 100644 --- a/pkg/worker/tasks/add_tables.go +++ b/pkg/worker/tasks/add_tables.go @@ -2,7 +2,6 @@ package tasks import ( "context" - "encoding/json" "sort" "github.com/doublecloud/transfer/internal/logger" @@ -143,13 +142,3 @@ func setSourceTables(source model.Source, tableSet map[string]bool) { src.DBTables = result } } - -func prepareSourceParamsToStore(source model.Source) string { - var params string - switch src := source.(type) { - case *postgres.PgSource: - j, _ := json.Marshal(src) - params = string(j) - } - return params -} diff --git a/recipe/mongo/pkg/config/config.go b/recipe/mongo/pkg/config/config.go index 31d4df40..ddc2a441 100644 --- a/recipe/mongo/pkg/config/config.go +++ b/recipe/mongo/pkg/config/config.go @@ -184,10 +184,3 @@ func GetKey(original interface{}, path []string) (interface{}, bool) { } return GetKey(value, path[1:]) } - -func defaultizePathValue(original interface{}, path []string, value interface{}) interface{} { - if _, ok := GetKey(original, path); ok { - return OverridePathValue(original, path, value) - } - return original -} diff --git a/tests/e2e/kafka2ch/replication_mv/check_db_test.go b/tests/e2e/kafka2ch/replication_mv/check_db_test.go index 4e011d07..bcb67ae3 100644 --- a/tests/e2e/kafka2ch/replication_mv/check_db_test.go +++ b/tests/e2e/kafka2ch/replication_mv/check_db_test.go @@ -22,9 +22,8 @@ var ( kafkaTopic = "topic1" source = *kafkasink.MustSourceRecipe() - chDatabase = "public" - target = *chrecipe.MustTarget(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(chDatabase)) - targetAsSource = *chrecipe.MustSource(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(chDatabase)) + chDatabase = "public" + target = *chrecipe.MustTarget(chrecipe.WithInitDir("dump/ch"), chrecipe.WithDatabase(chDatabase)) timestampToUse = time.Date(2024, 03, 19, 0, 0, 0, 0, time.Local) ) diff --git a/tests/e2e/mysql2yt/alters/check_db_test.go b/tests/e2e/mysql2yt/alters/check_db_test.go index 431b2501..f84a85c2 100644 --- a/tests/e2e/mysql2yt/alters/check_db_test.go +++ b/tests/e2e/mysql2yt/alters/check_db_test.go @@ -27,15 +27,6 @@ import ( var ( Source = *helpers.WithMysqlInclude(helpers.RecipeMysqlSource(), []string{"__test_a", "__test_b", "__test_c", "__test_d"}) Target = yt_helpers.RecipeYtTarget("//home/cdc/test/mysql2yt_e2e_alters") - - db = os.Getenv("RECIPE_MYSQL_SOURCE_DATABASE") - - tableNames = []abstract.TableDescription{ - {Name: fmt.Sprintf("%s___test_a", db)}, - {Name: fmt.Sprintf("%s___test_b", db)}, - {Name: fmt.Sprintf("%s___test_c", db)}, - {Name: fmt.Sprintf("%s___test_d", db)}, - } ) func init() { diff --git a/tests/e2e/mysql2yt/date_time/check_db_test.go b/tests/e2e/mysql2yt/date_time/check_db_test.go index ceb1ce65..0ec97f01 100644 --- a/tests/e2e/mysql2yt/date_time/check_db_test.go +++ b/tests/e2e/mysql2yt/date_time/check_db_test.go @@ -33,7 +33,6 @@ const ( var ( source = *helpers.WithMysqlInclude(helpers.RecipeMysqlSource(), []string{tableName}) - tablePath = ypath.Path(fmt.Sprintf("//home/cdc/test/mysql2yt/date_time/%s_%s", source.Database, tableName)) targetCluster = os.Getenv("YT_PROXY") ) diff --git a/tests/e2e/pg2pg/partitioned_tables/some_parts/partitioned_tables_test.go b/tests/e2e/pg2pg/partitioned_tables/some_parts/partitioned_tables_test.go index 47e4001e..45697745 100644 --- a/tests/e2e/pg2pg/partitioned_tables/some_parts/partitioned_tables_test.go +++ b/tests/e2e/pg2pg/partitioned_tables/some_parts/partitioned_tables_test.go @@ -3,7 +3,6 @@ package replication import ( "context" "os" - "strconv" "testing" "time" @@ -29,8 +28,7 @@ var ( ), pgrecipe.WithEdit(func(pg *postgres.PgSource) { pg.UseFakePrimaryKey = true })) - dstPort, _ = strconv.Atoi(os.Getenv("DB0_PG_LOCAL_PORT")) - Target = *pgrecipe.RecipeTarget(pgrecipe.WithPrefix("DB0_")) + Target = *pgrecipe.RecipeTarget(pgrecipe.WithPrefix("DB0_")) ) func init() { @@ -178,24 +176,24 @@ func Load(t *testing.T) { compareParams := helpers.NewCompareStorageParams() compareParams.TableFilter = func(tables abstract.TableMap) []abstract.TableDescription { return []abstract.TableDescription{ - abstract.TableDescription{ + { Name: "measurement_inherited", Schema: "public", }, - abstract.TableDescription{ + { Name: "measurement_inherited_y2006m02", Schema: "public", }, - abstract.TableDescription{ + { Name: "measurement_inherited_y2006m04", Schema: "public", }, - //skip measurement_declarative because of turned UseFakePrimaryKey option on (limitation of outdated 10.5 PG version) - abstract.TableDescription{ + // skip measurement_declarative because of turned UseFakePrimaryKey option on (limitation of outdated 10.5 PG version) + { Name: "measurement_declarative_y2006m02", Schema: "public", }, - abstract.TableDescription{ + { Name: "measurement_declarative_y2006m04", Schema: "public", }, diff --git a/tests/e2e/pg2pg/replication/check_db_test.go b/tests/e2e/pg2pg/replication/check_db_test.go index 8a3c5c45..18f96c5b 100644 --- a/tests/e2e/pg2pg/replication/check_db_test.go +++ b/tests/e2e/pg2pg/replication/check_db_test.go @@ -3,7 +3,6 @@ package replication import ( "context" "os" - "strconv" "testing" "time" @@ -21,7 +20,6 @@ import ( var ( TransferType = abstract.TransferTypeSnapshotAndIncrement Source = *pgrecipe.RecipeSource(pgrecipe.WithInitDir("dump"), pgrecipe.WithPrefix("")) - dstPort, _ = strconv.Atoi(os.Getenv("DB0_PG_LOCAL_PORT")) Target = *pgrecipe.RecipeTarget(pgrecipe.WithPrefix("DB0_")) ) diff --git a/tests/e2e/pg2yt/cdc_partial_activate/check_db_test.go b/tests/e2e/pg2yt/cdc_partial_activate/check_db_test.go index 3b53b50d..67cbe6f7 100644 --- a/tests/e2e/pg2yt/cdc_partial_activate/check_db_test.go +++ b/tests/e2e/pg2yt/cdc_partial_activate/check_db_test.go @@ -18,13 +18,14 @@ import ( ) var ( - srcPort = helpers.GetIntFromEnv("PG_LOCAL_PORT") - Source = *pgrecipe.RecipeSource(pgrecipe.WithPrefix(""), pgrecipe.WithInitDir("dump"), pgrecipe.WithDBTables("public.__test")) - Target = yt_helpers.RecipeYtTarget("//home/cdc/test/pg2yt_e2e") + Source = *pgrecipe.RecipeSource(pgrecipe.WithPrefix(""), pgrecipe.WithInitDir("dump"), pgrecipe.WithDBTables("public.__test")) + Target = yt_helpers.RecipeYtTarget("//home/cdc/test/pg2yt_e2e") ) -const CursorField = "id" -const CursorValue = "5" +const ( + CursorField = "id" + CursorValue = "5" +) func init() { _ = os.Setenv("YC", "1") // to not go to vanga @@ -78,11 +79,11 @@ func Load(t *testing.T) { ] } `)) - //start cdc + // start cdc worker := helpers.Activate(t, transfer) require.NotNil(t, worker, "Transfer is not activated") - //check snapshot loaded + // check snapshot loaded conn, err := pgcommon.MakeConnPoolFromSrc(&Source, logger.Log) require.NoError(t, err) @@ -93,12 +94,12 @@ func Load(t *testing.T) { require.NoError(t, helpers.WaitDestinationEqualRowsCount("public", "__test", storage, 60*time.Second, expectedYtRows), "Wrong row number after first snapshot round!") - //add some data to pg + // add some data to pg expectedYtRows = addSomeDataAndGetExpectedCount(t, conn) require.NoError(t, helpers.WaitDestinationEqualRowsCount("public", "__test", helpers.GetSampleableStorageByModel(t, Target.LegacyModel()), 60*time.Second, expectedYtRows)) worker.Close(t) - //read data from target + // read data from target require.NoError(t, storage.LoadTable(context.Background(), abstract.TableDescription{ Name: "__test", Schema: "", @@ -144,7 +145,7 @@ func addSomeDataAndGetExpectedCount(t *testing.T, conn *pgxpool.Pool) uint64 { extraItems += 3 _, err = conn.Exec(context.Background(), "delete from __test where str='rrr' or str='eee';") require.NoError(t, err) - extraItems += 2 //item before deletion + deleted event + extraItems += 2 // item before deletion + deleted event return currentDBRows + extraItems }