Skip to content

Commit

Permalink
Do not check schema compatibility for same hashed schemas
Browse files Browse the repository at this point in the history
To speedup inserts
commit_hash:b2de3895cbe08fa9087e3893ce9c5a5ad280afa0
  • Loading branch information
laskoviymishka committed Jan 30, 2025
1 parent 716aef6 commit e646c61
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 7 deletions.
1 change: 1 addition & 0 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -1864,6 +1864,7 @@
"pkg/providers/yt/recipe/yt_helpers.go":"transfer_manager/go/pkg/providers/yt/recipe/yt_helpers.go",
"pkg/providers/yt/reference/canondata/result.json":"transfer_manager/go/pkg/providers/yt/reference/canondata/result.json",
"pkg/providers/yt/reference/reference_test.go":"transfer_manager/go/pkg/providers/yt/reference/reference_test.go",
"pkg/providers/yt/sink/bechmarks/sorted_table_bench_test.go":"transfer_manager/go/pkg/providers/yt/sink/bechmarks/sorted_table_bench_test.go",
"pkg/providers/yt/sink/change_item_view.go":"transfer_manager/go/pkg/providers/yt/sink/change_item_view.go",
"pkg/providers/yt/sink/common.go":"transfer_manager/go/pkg/providers/yt/sink/common.go",
"pkg/providers/yt/sink/common_test.go":"transfer_manager/go/pkg/providers/yt/sink/common_test.go",
Expand Down
12 changes: 12 additions & 0 deletions pkg/abstract/changeitem/change_item_collapse.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,25 @@ func compareColumns(old, new []string) (bool, map[string]int, map[string]int, []
return false, oldM, newM, total
}

func InsertsOnly(input []ChangeItem) bool {
for _, r := range input {
if r.Kind != InsertKind && r.Kind != SynchronizeKind {
return false
}
}
return true
}

// Collapse collapses (possible) multiple items in the input into a single (or none) items in the output.
// Currently, it preserves the order of items in the result.
// It should only be applied by sinks which support PRIMARY KEYs. For them the order of items is considered to be of no importance.
func Collapse(input []ChangeItem) []ChangeItem {
if len(input) < 2 {
return input
}
if InsertsOnly(input) {
return input
}

// to preserve the order
idxToHashK := map[int]string{}
Expand Down
12 changes: 12 additions & 0 deletions pkg/abstract/changeitem/table_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ func (s *TableSchema) Hash() (string, error) {
return s.hash, nil
}

func (s *TableSchema) Equal(o *TableSchema) bool {
sh, err := s.Hash()
if err != nil {
return false
}
oh, err := o.Hash()
if err != nil {
return false
}
return sh == oh
}

func NewTableSchema(columns []ColSchema) *TableSchema {
return &TableSchema{
columns: columns,
Expand Down
185 changes: 185 additions & 0 deletions pkg/providers/yt/sink/bechmarks/sorted_table_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package bechmarks

import (
"context"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/internal/metrics"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/abstract"
client2 "github.com/doublecloud/transfer/pkg/abstract/coordinator"
yt2 "github.com/doublecloud/transfer/pkg/providers/yt"
"github.com/doublecloud/transfer/pkg/providers/yt/sink"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"go.ytsaurus.tech/yt/go/schema"
"go.ytsaurus.tech/yt/go/ypath"
"go.ytsaurus.tech/yt/go/yt"
)

type overrideable interface {
OverrideClient(client yt.Client)
}

type fakeYTTX struct {
yt.TabletTx
}

func (fakeYTTX) InsertRows(
ctx context.Context,
path ypath.Path,
rows []any,
options *yt.InsertRowsOptions,
) (err error) {
return nil
}

func (fakeYTTX) Abort() error {
return nil
}

func (fakeYTTX) Commit() error {
return nil
}

type fakeYT struct {
yt.Client
cols []schema.Column
}

func (fakeYT) NodeExists(
ctx context.Context,
path ypath.YPath,
options *yt.NodeExistsOptions,
) (ok bool, err error) {
return true, nil
}

func (fakeYT) BeginTabletTx(ctx context.Context, options *yt.StartTabletTxOptions) (tx yt.TabletTx, err error) {
return &fakeYTTX{}, nil
}

func (f fakeYT) GetNode(
ctx context.Context,
path ypath.YPath,
result any,
options *yt.GetNodeOptions,
) (err error) {
resPtr, ok := result.(*struct {
Schema schema.Schema `yson:"schema"`
TabletState string `yson:"expected_tablet_state"`
})
if !ok {
return xerrors.Errorf("result must be a pointer to the expected struct")
}

resPtr.TabletState = yt.TabletMounted
resPtr.Schema = schema.Schema{
Strict: aws.Bool(true),
UniqueKeys: true,
Columns: f.cols,
}

return nil
}

func BenchmarkSinkWrite(b *testing.B) {
scenario := func(b *testing.B, table abstract.Sinker, size int, ci abstract.ChangeItem) {
var data []abstract.ChangeItem
for range size {
data = append(data, ci)
}
err := table.Push(data)
b.SetBytes(int64(ci.Size.Values) * int64(size))
require.NoError(b, err)
}

b.Run("simple", func(b *testing.B) {
schema_ := abstract.NewTableSchema([]abstract.ColSchema{
{
DataType: "double",
ColumnName: "test",
PrimaryKey: true,
},
{
DataType: "datetime",
ColumnName: "_timestamp",
PrimaryKey: true,
},
})
row := abstract.ChangeItem{
TableSchema: schema_,
Table: "test",
Kind: "insert",
ColumnNames: []string{"test", "_timestamp"},
ColumnValues: []interface{}{3.99, time.Now()},
}
b.Run("dt_hack", func(b *testing.B) {
cfg := yt2.NewYtDestinationV1(yt2.YtDestination{
CellBundle: "default",
PrimaryMedium: "default",
DisableDatetimeHack: false,
CanAlter: true,
})
cfg.WithDefaults()
table, err := sink.NewSinker(cfg, "some_uniq_transfer_id", 0, logger.LoggerWithLevel(zapcore.WarnLevel), metrics.NewRegistry(), client2.NewFakeClient(), nil)
require.NoError(b, err)
if o, ok := table.(overrideable); ok {
o.OverrideClient(&fakeYT{cols: []schema.Column{{
Name: "test",
Type: "double",
SortOrder: "ascending",
}, {
Name: "_timestamp",
Type: "int64",
SortOrder: "ascending",
}, {
Name: "__dummy",
Type: "any",
}}})
}
b.Run("10_000", func(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
scenario(b, table, 10_000, row)
}
b.ReportAllocs()
})
})
b.Run("no_dt_hack", func(b *testing.B) {
cfg := yt2.NewYtDestinationV1(yt2.YtDestination{
CellBundle: "default",
PrimaryMedium: "default",
DisableDatetimeHack: true,
CanAlter: true,
})
cfg.WithDefaults()
table, err := sink.NewSinker(cfg, "some_uniq_transfer_id", 0, logger.LoggerWithLevel(zapcore.WarnLevel), metrics.NewRegistry(), client2.NewFakeClient(), nil)
require.NoError(b, err)
if o, ok := table.(overrideable); ok {
o.OverrideClient(&fakeYT{cols: []schema.Column{{
Name: "test",
Type: "double",
SortOrder: "ascending",
}, {
Name: "_timestamp",
Type: "datetime",
SortOrder: "ascending",
}, {
Name: "__dummy",
Type: "any",
}}})
}
b.Run("10_000", func(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
scenario(b, table, 10_000, row)
}
b.ReportAllocs()
})
})
})
}
28 changes: 24 additions & 4 deletions pkg/providers/yt/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/library/go/core/xerrors/multierr"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/abstract/changeitem"
"github.com/doublecloud/transfer/pkg/abstract/coordinator"
"github.com/doublecloud/transfer/pkg/abstract/model"
"github.com/doublecloud/transfer/pkg/maplock"
Expand Down Expand Up @@ -286,6 +287,9 @@ func (s *sinker) checkPrimaryKeyChanges(input []abstract.ChangeItem) error {
return nil
}

if changeitem.InsertsOnly(input) {
return nil
}
start := time.Now()
for i := range input {
item := &input[i]
Expand Down Expand Up @@ -525,12 +529,19 @@ func (s *sinker) pushOneBatch(table string, batch []abstract.ChangeItem) error {
return xerrors.Errorf("Check table (%v) error: %w", table, err)
}

// YT have a-b-a problem with PKey update, this would split such changes in sub-batches without PKey updates.
for _, subslice := range abstract.SplitUpdatedPKeys(batch) {
if err := s.pushSlice(subslice, table); err != nil {
if changeitem.InsertsOnly(batch) {
if err := s.pushSlice(batch, table); err != nil {
return xerrors.Errorf("unable to upload batch: %w", err)
}
s.logger.Infof("Upload %v changes delay %v", len(subslice), time.Since(start))
s.logger.Infof("Upload %v changes delay %v", len(batch), time.Since(start))
} else {
// YT have a-b-a problem with PKey update, this would split such changes in sub-batches without PKey updates.
for _, subslice := range abstract.SplitUpdatedPKeys(batch) {
if err := s.pushSlice(subslice, table); err != nil {
return xerrors.Errorf("unable to upload batch: %w", err)
}
s.logger.Infof("Upload %v changes delay %v", len(subslice), time.Since(start))
}
}
return nil
}
Expand Down Expand Up @@ -848,6 +859,10 @@ func (s *sinker) newStaticTable(schema []abstract.ColSchema, table string) (Gene
return NewSingleStaticTable(s.ytClient, s.dir, table, schema, s.config, s.jobIndex, s.transferID, s.config.CleanupMode(), s.metrics, s.logger, s.pathToBinary)
}

func (s *sinker) OverrideClient(client yt.Client) {
s.ytClient = client
}

func NewSinker(
cfg yt2.YtDestinationModel,
transferID string,
Expand Down Expand Up @@ -923,6 +938,7 @@ func newSinker(cfg yt2.YtDestinationModel, transferID string, jobIndex int, lgr

func NewGenericTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColSchema, cfg yt2.YtDestinationModel, metrics *stats.SinkerStats, logger log.Logger) (GenericTable, error) {
logger.Info("create generic table", log.Any("name", path), log.Any("schema", schema))
originalSchema := schema
if !cfg.DisableDatetimeHack() {
schema = hackTimestamps(schema)
logger.Warn("nasty hack that replace datetime -> int64", log.Any("name", path), log.Any("schema", schema))
Expand Down Expand Up @@ -960,6 +976,10 @@ func NewGenericTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColS
if err != nil {
return nil, xerrors.Errorf("cannot create sorted table: %w", err)
}
if !cfg.DisableDatetimeHack() {
// this hack force agly code, if hack is enabled we rebuild EVERY change item schema, which is very costly
sortedTable.tableSchema = abstract.NewTableSchema(originalSchema)
}
return sortedTable, nil
}

Expand Down
18 changes: 17 additions & 1 deletion pkg/providers/yt/sink/sorted_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/abstract/changeitem"
yt2 "github.com/doublecloud/transfer/pkg/providers/yt"
"github.com/doublecloud/transfer/pkg/stats"
"go.ytsaurus.tech/library/go/core/log"
Expand All @@ -29,6 +30,7 @@ type SortedTable struct {
archiveSpawned bool
config yt2.YtDestinationModel
sem *semaphore.Weighted
tableSchema *changeitem.TableSchema
}

func (t *SortedTable) Init() error {
Expand Down Expand Up @@ -104,6 +106,9 @@ func (t *SortedTable) prepareDataRows(input []abstract.ChangeItem, commitTime ui
var dataBatch ytDataBatch
dataBatch.insertOptions.Update = &upd
dataBatch.deleteRows = t.makeDataRowDeleter(commitTime)
if changeitem.InsertsOnly(input) {
dataBatch.toInsert = make([]any, 0, len(input))
}

for _, item := range input {
if item.Kind == abstract.UpdateKind {
Expand Down Expand Up @@ -248,6 +253,9 @@ func (t *SortedTable) Write(input []abstract.ChangeItem) error {
}

for _, item := range input {
if t.tableSchema.Equal(item.TableSchema) {
continue
}
schemaCompatible, err := t.ensureSchema(item.TableSchema.Columns())
if err != nil {
return xerrors.Errorf("Table %s: %w", t.path.String(), err)
Expand Down Expand Up @@ -442,7 +450,14 @@ func (t *SortedTable) spawnArchive(ctx context.Context) error {
return nil
}

func NewSortedTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColSchema, cfg yt2.YtDestinationModel, metrics *stats.SinkerStats, logger log.Logger) (GenericTable, error) {
func NewSortedTable(
ytClient yt.Client,
path ypath.Path,
schema []abstract.ColSchema,
cfg yt2.YtDestinationModel,
metrics *stats.SinkerStats,
logger log.Logger,
) (*SortedTable, error) {
t := SortedTable{
ytClient: ytClient,
path: path,
Expand All @@ -453,6 +468,7 @@ func NewSortedTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColSc
archiveSpawned: false,
config: cfg,
sem: semaphore.NewWeighted(10),
tableSchema: changeitem.NewTableSchema(schema),
}

if err := t.Init(); err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/providers/yt/sink/table_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ func (t *tableColumns) hasKey(name columnName) bool {
if !ok {
return false
}
col := &t.columns[columnPos]
return col.PrimaryKey
return t.columns[columnPos].PrimaryKey
}

func newTableColumns(columns []abstract.ColSchema) tableColumns {
Expand Down

0 comments on commit e646c61

Please sign in to comment.