Skip to content

Commit

Permalink
IG-19426: fix client.write() when first column is all null (#617)
Browse files Browse the repository at this point in the history
* Fix typo

* Use first index for dataframe length, not first column.

* More resilient solution.

* Add integration test for IG-19426.

* impi...
  • Loading branch information
gtopper authored Sep 30, 2021
1 parent e3b59ab commit c380cf3
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 4 deletions.
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (api *API) Write(request *frames.WriteRequest, in chan frames.Frame) (int,
return nFrames, nRows, errors.Wrap(err, msg)
}
} else {
api.logger.DebugWith("write request with zero rows", "frames", nFrames, "requst", request)
api.logger.DebugWith("write request with zero rows", "frames", nFrames, "request", request)
}

ingestDuration := time.Since(ingestStartTime)
Expand Down
2 changes: 2 additions & 0 deletions column.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func (c *colImpl) Len() int {
return len(c.msg.Times)
case pb.DType_BOOLEAN:
return len(c.msg.Bools)
case pb.DType_NULL:
return -1
}

// TODO: panic?
Expand Down
11 changes: 8 additions & 3 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,15 @@ func (fr *frameImpl) Labels() map[string]interface{} {

// Len is the number of rows
func (fr *frameImpl) Len() int {
if len(fr.columns) > 0 {
return fr.columns[0].Len()
// Use length of first index of length > 0, otherwise the first such column.
for _, slice := range [][]Column{fr.indices, fr.columns} {
for _, column := range slice {
len := column.Len()
if len >= 0 {
return len
}
}
}

return 0
}

Expand Down
99 changes: 99 additions & 0 deletions test/kv_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,105 @@ func (kvSuite *KvTestSuite) TestNullValuesWrite() {
kvSuite.Require().NoError(iter.Err(), "error querying items got")
}

// IG-19426
func (kvSuite *KvTestSuite) TestNullColumnWrite() {
table := fmt.Sprintf("kv_test_nulls%d", time.Now().UnixNano())

index := []string{"mike", "joe", "jim"}
indexCol := &pb.Column{
Kind: pb.Column_SLICE,
Name: "idx",
Dtype: pb.DType_STRING,
Strings: index,
}

columns := []*pb.Column{
{
Kind: pb.Column_SLICE,
Name: "n1",
Dtype: pb.DType_NULL,
},
{
Kind: pb.Column_SLICE,
Name: "n2",
Dtype: pb.DType_INTEGER,
Ints: []int64{1, 2, 3},
},
}

nullValues := initializeNullColumns(len(index))
nullValues[0].NullColumns["n1"] = true
nullValues[1].NullColumns["n1"] = true
nullValues[2].NullColumns["n1"] = true

frame := frames.NewFrameFromProto(&pb.Frame{
Columns: columns,
Indices: []*pb.Column{indexCol},
NullValues: nullValues,
})

wreq := &frames.WriteRequest{
Backend: kvSuite.backendName,
Table: table,
}

appender, err := kvSuite.client.Write(wreq)
kvSuite.Require().NoError(err)

err = appender.Add(frame)
kvSuite.Require().NoError(err)

err = appender.WaitForComplete(3 * time.Second)
kvSuite.Require().NoError(err)

input := v3io.GetItemsInput{AttributeNames: []string{"__name", "n1", "n2"}}

iter, err := v3ioutils.NewAsyncItemsCursor(
kvSuite.v3ioContainer, &input, 1,
nil, kvSuite.internalLogger,
0, []string{table + "/"},
"", "")

var numRows int
for iter.Next() {
currentRow := iter.GetItem()

key, _ := currentRow.GetFieldString("__name")

if key != ".#schema" {
numRows++
}

switch key {
case ".#schema":
continue
case "mike":
kvSuite.Require().Nil(currentRow.GetField("n1"),
"item %v - key n1 supposed to be null but got %v", key, currentRow.GetField("n1"))

kvSuite.Require().NotNil(currentRow.GetField("n2"),
"item %v - key n2 supposed to be null but got %v", key, currentRow.GetField("n2"))
case "joe":
kvSuite.Require().Nil(currentRow.GetField("n1"),
"item %v - key n1 supposed to be null but got %v", key, currentRow.GetField("n1"))

kvSuite.Require().NotNil(currentRow.GetField("n2"),
"item %v - key n2 supposed to be null but got %v", key, currentRow.GetField("n2"))
case "jim":
kvSuite.Require().Nil(currentRow.GetField("n1"),
"item %v - key n1 supposed to be null but got %v", key, currentRow.GetField("n1"))
kvSuite.Require().NotNil(currentRow.GetField("n2"),
"item %v - key n2 supposed to be null but got %v", key, currentRow.GetField("n2"))
default:
kvSuite.T().Fatalf("got an unexpected key '%v'", key)
}
}

kvSuite.Require().Equal(3, numRows)

kvSuite.Require().NoError(iter.Err(), "error querying items got")
}

func (kvSuite *KvTestSuite) TestNullValuesRead() {
table := fmt.Sprintf("kv_test_nulls_read%d", time.Now().UnixNano())

Expand Down

0 comments on commit c380cf3

Please sign in to comment.