Skip to content

Commit

Permalink
Replace deprecated code
Browse files Browse the repository at this point in the history
Replace some deprecated code.

Some where left as-is as there were possible breaking changes that need to be further investigated. Those include protobuf libs and bson.

---

Pull Request resolved: doublecloud/transfer#182
commit_hash:ac8eb363dee6e18bccd1b91e43039626922bbfe1
  • Loading branch information
kamushadenes authored and robot-piglet committed Jan 29, 2025
1 parent cf2963c commit d5e9e39
Show file tree
Hide file tree
Showing 25 changed files with 52 additions and 78 deletions.
4 changes: 2 additions & 2 deletions internal/config/nirvana.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package config
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
)

type JobContext struct {
Expand Down Expand Up @@ -35,7 +35,7 @@ type JobContext struct {
}

func TryParseNirvanaConfig() (*Config, error) {
dat, err := ioutil.ReadFile("job_context.json")
dat, err := os.ReadFile("job_context.json")
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions internal/metrics/pidstat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"math"
"os"
"os/exec"
"path"
"runtime"
Expand Down Expand Up @@ -78,7 +78,7 @@ func stat(pid int, statType string) (*SysInfo, error) {
var clkTck float64 = 100
var pageSize float64 = 4096

uptimeFileBytes, _ := ioutil.ReadFile(path.Join("/proc", "uptime"))
uptimeFileBytes, _ := os.ReadFile(path.Join("/proc", "uptime"))
uptime := parseFloat(strings.Split(string(uptimeFileBytes), " ")[0])

clkTckStdout, err := exec.Command("getconf", "CLK_TCK").Output()
Expand All @@ -91,7 +91,7 @@ func stat(pid int, statType string) (*SysInfo, error) {
pageSize = parseFloat(formatStdOut(pageSizeStdout, 0)[0])
}

procStatFileBytes, _ := ioutil.ReadFile(path.Join("/proc", strconv.Itoa(pid), "stat"))
procStatFileBytes, _ := os.ReadFile(path.Join("/proc", strconv.Itoa(pid), "stat"))
splitAfter := strings.SplitAfter(string(procStatFileBytes), ")")

if len(splitAfter) == 0 || len(splitAfter) == 1 {
Expand Down
6 changes: 3 additions & 3 deletions pkg/providers/airbyte/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"os"
"os/exec"
"sort"
Expand Down Expand Up @@ -166,7 +166,7 @@ func (a *Storage) LoadTable(ctx context.Context, table abstract.TableDescription
if err := a.storeState(table.ID(), currentState); err != nil {
return xerrors.Errorf("unable to store incremental state: %w", err)
}
data, err := ioutil.ReadAll(stderr)
data, err := io.ReadAll(stderr)
if err != nil {
return xerrors.Errorf("%s stderr read all failed: %w", table.ID().String(), err)
}
Expand Down Expand Up @@ -323,7 +323,7 @@ func (a *Storage) writeFile(fileName, fileData string) error {
fullPath := fmt.Sprintf("%v/%v", a.config.DataDir(), fileName)
a.logger.Debugf("%s -> \n%s", fileName, fileData)
defer a.logger.Infof("file(%s) %s written", format.SizeInt(len(fileData)), fullPath)
return ioutil.WriteFile(
return os.WriteFile(
fullPath,
[]byte(fileData),
0664,
Expand Down
4 changes: 2 additions & 2 deletions pkg/providers/elastic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package elastic

import (
"fmt"
"io/ioutil"
"io"
"net"
"reflect"
"unsafe"
Expand Down Expand Up @@ -126,7 +126,7 @@ func getResponseBody(res *esapi.Response, err error) ([]byte, error) {
}
defer res.Body.Close()

body, err := ioutil.ReadAll(res.Body)
body, err := io.ReadAll(res.Body)
if err != nil {
return nil, xerrors.Errorf("failed to read response body: %w", err)
}
Expand Down
1 change: 0 additions & 1 deletion pkg/providers/eventhub/eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ const (
var namespace, hub, consumerGroup string

func init() {
rand.Seed(time.Now().UnixNano())
namespace = os.Getenv("EVENTHUB_NAMESPACE")
hub = os.Getenv("EVENTHUB_NAME")
consumerGroup = os.Getenv("EVENTHUB_CONSUMER_GROUP")
Expand Down
6 changes: 2 additions & 4 deletions pkg/providers/mongo/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/abstract"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.ytsaurus.tech/library/go/core/log"
Expand Down Expand Up @@ -140,7 +139,6 @@ func (f *keyBatcher) PushKeyChangeEvent(keyEvent *keyChangeEvent) error {
defer f.batchMutex.Unlock()
return f.flush() // f.batchMutex taken
}()

if err != nil {
return xerrors.Errorf("Couldn't flush batch: %w", err)
}
Expand Down Expand Up @@ -219,7 +217,7 @@ func (f *keyBatcher) putInBatch(chEvent *keyChangeEvent) error {
}

if oplogEvent, exists := f.batch[kd]; exists {
if primitive.CompareTimestamp(oplogEvent.keyEvent.ClusterTime, event.ClusterTime) <= 0 {
if oplogEvent.keyEvent.ClusterTime.Compare(event.ClusterTime) <= 0 {
// refresh key time to newer time and register collapse
oplogEvent.keyEvent.OperationType = event.OperationType
oplogEvent.keyEvent.ClusterTime = event.ClusterTime
Expand Down Expand Up @@ -317,7 +315,7 @@ func (f *keyBatcher) pushUnorderedChangeEvents(changeEventSlice []*changeEvent)
sort.Slice(changeEventSlice, func(i, j int) bool {
tsi := changeEventSlice[i].event.ClusterTime
tsj := changeEventSlice[j].event.ClusterTime
return primitive.CompareTimestamp(tsi, tsj) > 0
return tsi.Compare(tsj) > 0
})
// serially provide events to pusher
for _, ce := range changeEventSlice {
Expand Down
13 changes: 5 additions & 8 deletions pkg/providers/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"os"
"strings"
"time"

Expand Down Expand Up @@ -142,17 +142,12 @@ func Connect(ctx context.Context, opts MongoConnectionOptions, lgr log.Logger) (
}

func newClient(ctx context.Context, connOpts *options.ClientOptions, lgr log.Logger) (*mongo.Client, error) {
client, err := mongo.NewClient(connOpts)
client, err := mongo.Connect(ctx, connOpts)

isWithPassword := bool(connOpts != nil && connOpts.Auth != nil && len(connOpts.Auth.Password) > 0)
if err != nil && isWithPassword && strings.Contains(err.Error(), connOpts.Auth.Password) {
err = xerrors.New(strings.ReplaceAll(err.Error(), connOpts.Auth.Password, "<secret>"))
}

if err != nil {
return nil, xerrors.Errorf("unable to create mongo client: %w", err)
}
if err := client.Connect(ctx); err != nil {
return nil, xerrors.Errorf("unable to connect mongo client: %w", err)
}

Expand Down Expand Up @@ -221,6 +216,7 @@ func getClusterInfo(endpoint MongoConnectionOptions) (hosts []string, sharded bo
}
return hosts, sharded, nil
}

func DriverConnectionSrvOptions(mongoConnectionOptions *MongoConnectionOptions) (*options.ClientOptions, error) {
if len(mongoConnectionOptions.Hosts) != 1 {
return nil, xerrors.Errorf("Cannot be empty or more than hosts in srv connection")
Expand Down Expand Up @@ -256,6 +252,7 @@ func DriverConnectionSrvOptions(mongoConnectionOptions *MongoConnectionOptions)
}
return clientOptions, nil
}

func DriverConnectionOptions(mongoConnectionOptions *MongoConnectionOptions) (*options.ClientOptions, error) {
opts := options.ClientOptions{}
hosts, sharded, err := getClusterInfo(*mongoConnectionOptions)
Expand Down Expand Up @@ -307,7 +304,7 @@ func newTLSConfig(caCert TrustedCACertificate) (*tls.Config, error) {
case CACertificatePEMFilePaths:
caFilePaths := downcasted
for _, cert := range caFilePaths {
pemFileContent, err := ioutil.ReadFile(string(cert))
pemFileContent, err := os.ReadFile(string(cert))
if err != nil {
return nil, xerrors.Errorf("Cannot read file %s: %w", cert, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/providers/mongo/local_oplog_rs_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (w localOplogRsWatcher) watchBatchFrom(ctx context.Context, ts primitive.Ti
w.logger.Infof("Begin watching batch from time %v", FromMongoTimestamp(ts))
// check that timestamp is still in oplog
from, to, err := GetLocalOplogInterval(ctx, w.client)
if primitive.CompareTimestamp(ts, from) < 0 {
if ts.Compare(from) < 0 {
return until, eventsRead, xerrors.Errorf("local.oplog.rs watcher is out of timeline. Requested timestamp %v, actual interval: [%v, %v]",
FromMongoTimestamp(ts),
FromMongoTimestamp(from),
Expand Down
17 changes: 8 additions & 9 deletions pkg/providers/s3/sink/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"bytes"
"compress/gzip"
"fmt"
"io/ioutil"
"io"
"os"
"sync"
"testing"
Expand All @@ -31,12 +31,12 @@ func canonFile(t *testing.T, client *s3.S3, bucket, file string) {
Key: aws.String(file),
})
require.NoError(t, err)
data, err := ioutil.ReadAll(obj.Body)
data, err := io.ReadAll(obj.Body)
require.NoError(t, err)
logger.Log.Infof("read data: %v", format.SizeInt(len(data)))
unzipped, err := gzip.NewReader(bytes.NewReader(data))
require.NoError(t, err)
unzippedData, err := ioutil.ReadAll(unzipped)
unzippedData, err := io.ReadAll(unzipped)
require.NoError(t, err)
logger.Log.Infof("unpack data: %v", format.SizeInt(len(unzippedData)))
logger.Log.Infof("%s content:\n%s", file, string(unzippedData))
Expand Down Expand Up @@ -175,13 +175,13 @@ func TestS3SinkerUploadTableGzip(t *testing.T) {
}))
}()
require.NoError(t, err)
data, err := ioutil.ReadAll(obj.Body)
data, err := io.ReadAll(obj.Body)
require.NoError(t, err)
logger.Log.Infof("read data: %v", format.SizeInt(len(data)))
require.True(t, len(data) > 0)
unzipped, err := gzip.NewReader(bytes.NewReader(data))
require.NoError(t, err)
unzippedData, err := ioutil.ReadAll(unzipped)
unzippedData, err := io.ReadAll(unzipped)
require.NoError(t, err)
logger.Log.Infof("unpack data: %v", format.SizeInt(len(unzippedData)))
require.Len(t, unzippedData, 7111120)
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestJsonReplication(t *testing.T) {
})
require.NoError(t, err)

data, err := ioutil.ReadAll(obj.Body)
data, err := io.ReadAll(obj.Body)
require.NoError(t, err)
require.Equal(t, string(data), tc.expectedResult)
})
Expand Down Expand Up @@ -299,13 +299,13 @@ func TestRawReplication(t *testing.T) {
defer require.NoError(t, sinker.Push([]abstract.ChangeItem{
{Kind: abstract.DropTableKind, CommitTime: uint64(time.Now().UnixNano()), Table: "test_table"},
}))
data, err := ioutil.ReadAll(obj.Body)
data, err := io.ReadAll(obj.Body)
require.NoError(t, err)
logger.Log.Infof("read data: %v", format.SizeInt(len(data)))
require.True(t, len(data) > 0)
unzipped, err := gzip.NewReader(bytes.NewReader(data))
require.NoError(t, err)
unzippedData, err := ioutil.ReadAll(unzipped)
unzippedData, err := io.ReadAll(unzipped)
require.NoError(t, err)
logger.Log.Infof("unpack data: %v", format.SizeInt(len(unzippedData)))
require.Len(t, unzippedData, 21890)
Expand Down Expand Up @@ -463,7 +463,6 @@ func TestParquetReadAfterWrite(t *testing.T) {

require.NoError(t, sinker.Push(round1))
require.NoError(t, sinker.Close())

}

func TestRawReplicationHugeFiles(t *testing.T) {
Expand Down
7 changes: 3 additions & 4 deletions pkg/providers/ydb/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,6 @@ func (s *sinker) insert(tablePath ydbPath, batch []abstract.ChangeItem) error {
}
return nil
})

if err != nil {
return xerrors.Errorf("unable to insert with legacy writer:\n %w", err)
}
Expand Down Expand Up @@ -1022,15 +1021,15 @@ func (s *sinker) ydbVal(dataType, originalType string, val interface{}) (types.V
case schema.TypeBytes:
switch v := val.(type) {
case string:
return types.StringValue([]byte(v)), false, nil
return types.BytesValue([]byte(v)), false, nil
case []uint8:
return types.StringValue(v), false, nil
return types.BytesValue(v), false, nil
default:
r, err := json.Marshal(val)
if err != nil {
return nil, false, xerrors.Errorf("unable to json marshal: %w", err)
}
return types.StringValue(r), false, nil
return types.BytesValue(r), false, nil
}
case schema.TypeString:
switch v := val.(type) {
Expand Down
3 changes: 0 additions & 3 deletions pkg/providers/yt/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package yt
import (
"context"
"io"
"math/rand"
"os"
"time"

"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
Expand Down Expand Up @@ -78,7 +76,6 @@ func uploadExe(exePrefix, exePath string) error {
}
defer client.Stop()

rand.Seed(time.Now().UnixNano())
exeVersion = exePrefix + randutil.GenerateAlphanumericString(8)
ExePath = DataplaneExecutablePath("", exeVersion)
if _, err := client.CreateNode(context.Background(), ExePath, yt.NodeFile, &yt.CreateNodeOptions{Recursive: true}); err != nil {
Expand Down
9 changes: 4 additions & 5 deletions pkg/providers/yt/merge/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package merge

import (
"context"
"math/rand"
"testing"
"time"

"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
Expand Down Expand Up @@ -43,7 +41,6 @@ func testYT(t *testing.T, testName string, test func(t *testing.T, ctx context.C
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rand.Seed(time.Now().UnixNano())
testDir := randutil.GenerateAlphanumericString(10)

path := yt2.SafeChild("//home/cdc/test", testName, testDir)
Expand Down Expand Up @@ -93,7 +90,8 @@ func TestMergeBasic(t *testing.T) {
{Key: "7"},
{Key: "8"},
{Key: "9"},
}}, false)
},
}, false)
nodes, err := yt2.ListNodesWithAttrs(ctx, client, path, "", false)
require.NoError(t, err)
require.Equal(t, 1, len(nodes))
Expand Down Expand Up @@ -136,7 +134,8 @@ func TestMergeBasicCleanup(t *testing.T) {
{Key: "7"},
{Key: "8"},
{Key: "9"},
}}, false)
},
}, false)
nodes, err := yt2.ListNodesWithAttrs(ctx, client, path, "", false)
require.NoError(t, err)
require.Equal(t, 1, len(nodes))
Expand Down
3 changes: 0 additions & 3 deletions pkg/providers/yt/tests/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package yt
import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
Expand Down Expand Up @@ -89,7 +87,6 @@ func TestMountUnmount(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rand.Seed(time.Now().UnixNano())
testDir := randutil.GenerateAlphanumericString(10)

path := ypath.Path("//home/cdc/test/mount_unmount").Child(testDir)
Expand Down
6 changes: 2 additions & 4 deletions pkg/randutil/randutil.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package randutil

import (
"math/rand"
"crypto/rand"
)

var (
AlphanumericValues string
)
var AlphanumericValues string

func init() {
var alphanumericValues []byte
Expand Down
Loading

0 comments on commit d5e9e39

Please sign in to comment.