Skip to content

Commit

Permalink
fix(processor/sql): updated where condition for int64 values
Browse files Browse the repository at this point in the history
  • Loading branch information
le-vlad committed Dec 22, 2023
1 parent d4e8db3 commit 48b5325
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 249 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ require (
github.com/charmbracelet/log v0.3.1
github.com/cloudquery/plugin-sdk/v4 v4.16.1
github.com/go-playground/validator/v10 v10.14.0
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.1
github.com/gorilla/websocket v1.5.0
github.com/jackc/pgx/v5 v5.4.3
github.com/prometheus/client_golang v1.11.1
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/reiver/go-cast v0.0.0-20220716221226-5360a56d42f4
github.com/sashabaranov/go-openai v1.17.9
github.com/spf13/cobra v1.6.1
github.com/twmb/franz-go v1.15.2
Expand All @@ -44,7 +46,6 @@ require (
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
Expand All @@ -71,6 +72,7 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/reiver/go-fck v0.0.1 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
Expand Down
232 changes: 4 additions & 228 deletions go.sum

Large diffs are not rendered by default.

151 changes: 151 additions & 0 deletions internal/processors/sql/comparable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package sqlproc

// Yes, I know this is a bad idea;
// I'll think about it a bit later
func compareValues(a, b interface{}, op string) bool {
switch v := a.(type) {
case int:
switch op {
case ">":
return v > b.(int)
case "<":
return v < b.(int)
case ">=":
return v >= b.(int)
case "<=":
return v <= b.(int)
}
case int8:
switch op {
case ">":
return v > b.(int8)
case "<":
return v < b.(int8)
case ">=":
return v >= b.(int8)
case "<=":
return v <= b.(int8)
}
case int16:
switch op {
case ">":
return v > b.(int16)
case "<":
return v < b.(int16)
case ">=":
return v >= b.(int16)
case "<=":
return v <= b.(int16)
}
case int32:
switch op {
case ">":
return v > b.(int32)
case "<":
return v < b.(int32)
case ">=":
return v >= b.(int32)
case "<=":
return v <= b.(int32)
}
case int64:
switch op {
case ">":
return v > b.(int64)
case "<":
return v < b.(int64)
case ">=":
return v >= b.(int64)
case "<=":
return v <= b.(int64)
}
case uint:
switch op {
case ">":
return v > b.(uint)
case "<":
return v < b.(uint)
case ">=":
return v >= b.(uint)
case "<=":
return v <= b.(uint)
}
case uint8:
switch op {
case ">":
return v > b.(uint8)
case "<":
return v < b.(uint8)
case ">=":
return v >= b.(uint8)
case "<=":
return v <= b.(uint8)
}
case uint16:
switch op {
case ">":
return v > b.(uint16)
case "<":
return v < b.(uint16)
case ">=":
return v >= b.(uint16)
case "<=":
return v <= b.(uint16)
}
case uint32:
switch op {
case ">":
return v > b.(uint32)
case "<":
return v < b.(uint32)
case ">=":
return v >= b.(uint32)
case "<=":
return v <= b.(uint32)
}
case uint64:
switch op {
case ">":
return v > b.(uint64)
case "<":
return v < b.(uint64)
case ">=":
return v >= b.(uint64)
case "<=":
return v <= b.(uint64)
}
case float32:
switch op {
case ">":
return v > b.(float32)
case "<":
return v < b.(float32)
case ">=":
return v >= b.(float32)
case "<=":
return v <= b.(float32)
}
case float64:
switch op {
case ">":
return v > b.(float64)
case "<":
return v < b.(float64)
case ">=":
return v >= b.(float64)
case "<=":
return v <= b.(float64)
}
case string:
switch op {
case "=":
return v != b.(string)
case "!=":
return v == b.(string)
}
default:
return false
}

return false
}
49 changes: 33 additions & 16 deletions internal/processors/sql/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sqlproc

import (
"context"
"encoding/binary"
"errors"
"fmt"
_ "github.com/apache/arrow/go/v14/arrow"
Expand All @@ -11,7 +12,9 @@ import (
"github.com/usedatabrew/blink/internal/message"
"github.com/usedatabrew/blink/internal/schema"
"github.com/usedatabrew/blink/internal/stream_context"
"math"
"slices"
"strconv"
"strings"
)

Expand All @@ -26,10 +29,10 @@ type Plugin struct {
whereExist bool
whereLeft string
whereOp string
whereRight string
whereRight interface{}
}

func NewSqlTransformlugin(appctx *stream_context.Context, config Config) (*Plugin, error) {
func NewSqlTransformPlugin(appctx *stream_context.Context, config Config) (*Plugin, error) {
return &Plugin{
config: config,
ctx: appctx,
Expand All @@ -50,17 +53,8 @@ func (p *Plugin) Process(context context.Context, msg *message.Message) (*messag

if p.whereExist {
columnValue := msg.GetValue(p.whereLeft)
// TODO:: Add more types to do filtering
// This is so damn awful;; have to come up with something more convenient
switch p.whereOp {
case "=":
if columnValue != p.whereRight {
return nil, nil
}
case "!=":
if columnValue == p.whereRight {
return nil, nil
}
if !compareValues(columnValue, p.whereRight, p.whereOp) {
return nil, nil
}
}

Expand Down Expand Up @@ -91,7 +85,6 @@ func (p *Plugin) EvolveSchema(streamSchema *schema.StreamSchemaObj) error {
}
}

fmt.Println("Stream to process", streamToProcess)
if streamToProcess == nil {
return errors.New("select from undefined stream")
}
Expand Down Expand Up @@ -146,8 +139,18 @@ func (p *Plugin) EvolveSchema(streamSchema *schema.StreamSchemaObj) error {
return errors.New(fmt.Sprintf("Column %s doesnt exist in current stream", whereColumn))
}

rightVal := string(stmt.(*sqlparser.Select).Where.Expr.(*sqlparser.ComparisonExpr).Right.(*sqlparser.SQLVal).Val)
p.whereRight = rightVal
rightValType := stmt.(*sqlparser.Select).Where.Expr.(*sqlparser.ComparisonExpr).Right.(*sqlparser.SQLVal).Type
rightVal := stmt.(*sqlparser.Select).Where.Expr.(*sqlparser.ComparisonExpr).Right.(*sqlparser.SQLVal).Val
switch rightValType {
case sqlparser.StrVal:
p.whereRight = string(rightVal)
case sqlparser.FloatVal:
p.whereRight = Float64FromBytes(rightVal)
case sqlparser.IntVal:
p.whereRight = Int64FromBytes(rightVal)
default:
panic("unhandled default case")
}
}

if len(p.columnsToDropFromSchema) > 0 {
Expand All @@ -156,3 +159,17 @@ func (p *Plugin) EvolveSchema(streamSchema *schema.StreamSchemaObj) error {

return nil
}

func Float64FromBytes(bytes []byte) float64 {
bits := binary.LittleEndian.Uint64(bytes)
float := math.Float64frombits(bits)
return float
}

func Int64FromBytes(bytes []byte) int64 {
i, err := strconv.ParseInt(string(bytes), 10, 64)
if err != nil {
panic(err)
}
return i
}
6 changes: 3 additions & 3 deletions internal/processors/sql/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestPlugin_EvolveSchema(t *testing.T) {
},
})

plugin, err := NewSqlTransformlugin(stream_context.CreateContext(), Config{
plugin, err := NewSqlTransformPlugin(stream_context.CreateContext(), Config{
Query: "SELECT destination from stream.flights",
})
if err != nil {
Expand All @@ -51,7 +51,7 @@ func TestPlugin_EvolveSchema(t *testing.T) {
t.Fatal("Schema should contain only one column")
}

plugin, err = NewSqlTransformlugin(stream_context.CreateContext(), Config{
plugin, err = NewSqlTransformPlugin(stream_context.CreateContext(), Config{
Query: "SELECT destination from stream.flightas",
})

Expand Down Expand Up @@ -104,7 +104,7 @@ func TestPlugin_Process(t *testing.T) {
mess.SetStream("test")
mess.SetEvent("insert")

plugin, err := NewSqlTransformlugin(stream_context.CreateContext(), Config{
plugin, err := NewSqlTransformPlugin(stream_context.CreateContext(), Config{
Query: "SELECT id, user from stream.test WHERE id = 123",
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion public/stream/processor_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (p *ProcessorWrapper) LoadDriver(driver processors.ProcessorDriver, cfg int
if err != nil {
panic("can read driver config")
}
return sqlproc.NewSqlTransformlugin(p.ctx, driverConfig)
return sqlproc.NewSqlTransformPlugin(p.ctx, driverConfig)
case processors.HttpProcessor:
driverConfig, err := ReadDriverConfig[http.Config](cfg, http.Config{})
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions public/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func (s *Stream) Start() error {
switch i.(type) {
case *message.Message:
inMessage := i.(*message.Message)
if inMessage == nil {
return nil, nil
}

err := s.sinks[0].Write(inMessage)
if err != nil {
s.ctx.Logger.WithPrefix("sink").Errorf("failed to write to sink %v", err)
Expand Down

0 comments on commit 48b5325

Please sign in to comment.