Skip to content

Commit

Permalink
PLIN-4034 Lookups for deletes on upserts (#101)
Browse files Browse the repository at this point in the history
* Add delete queries before inserts and updates

* Add tests for additional lookups

* Modify test wrappers for delete statements

---------

Co-authored-by: Fabian Melendez <[email protected]>
  • Loading branch information
mlndz28 and Fabian Melendez authored Mar 16, 2023
1 parent f06df5e commit 9f0a2e7
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 13 deletions.
41 changes: 38 additions & 3 deletions picard.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,36 @@ func (p PersistenceORM) upsert(data interface{}, deleteFilters interface{}) erro
dataValue := reflect.ValueOf(data)
dataCount := dataValue.Len()
var changeSets []*dbchange.ChangeSet

// the delete queries should be executed first because of posible UNIQUE restrictions on the DB

deleteLookupResults, _, err := p.checkForExisting(data, tableMetadata, nil, true)
if err != nil {
return err
}

deletes := []dbchange.Change{}

for _, value := range deleteLookupResults {
deletes = append(deletes, dbchange.Change{
Changes: value.(map[string]interface{}),
Type: dbchange.Delete,
})
}

if len(deletes) > 0 {
for i := 0; i < len(deletes); i += p.batchSize {
end := i + p.batchSize
if end > len(deletes) {
end = len(deletes)
}
err = p.upsertBatch(&dbchange.ChangeSet{Deletes: deletes[i:end]}, tableMetadata)
if err != nil {
return err
}
}
}

if dataCount > 0 {
for i := 0; i < dataCount; i += p.batchSize {
end := i + p.batchSize
Expand Down Expand Up @@ -420,6 +450,7 @@ func (p PersistenceORM) checkForExisting(
data interface{},
tableMetadata *tags.TableMetadata,
foreignKey *tags.ForeignKey,
nonExisting bool, // negates the query to check which items shouldn't exist anymore in the DB
) (
map[string]interface{},
[]tags.Lookup,
Expand Down Expand Up @@ -457,7 +488,11 @@ func (p PersistenceORM) checkForExisting(
}
}
}
query = query.Where(strings.Join(wheres, " || '"+separator+"' || ")+" = ANY(?)", pq.Array(lookupObjectKeys))
if nonExisting {
query = query.Where("NOT "+strings.Join(wheres, " || '"+separator+"' || ")+" = ANY(?)", pq.Array(lookupObjectKeys))
} else {
query = query.Where(strings.Join(wheres, " || '"+separator+"' || ")+" = ANY(?)", pq.Array(lookupObjectKeys))
}
}

if multitenancyKeyColumnName != "" {
Expand Down Expand Up @@ -790,14 +825,14 @@ func (p PersistenceORM) generateChanges(
foreignKeys := tableMetadata.GetForeignKeys()
insertsHavePrimaryKey := false
primaryKeyColumnName := tableMetadata.GetPrimaryKeyColumnName()
lookupResults, lookups, err := p.checkForExisting(data, tableMetadata, nil)
lookupResults, lookups, err := p.checkForExisting(data, tableMetadata, nil, false)
if err != nil {
return nil, err
}

for index := range foreignKeys {
foreignKey := &foreignKeys[index]
foreignResults, foreignLookupsUsed, err := p.checkForExisting(data, foreignKey.TableMetadata, foreignKey)
foreignResults, foreignLookupsUsed, err := p.checkForExisting(data, foreignKey.TableMetadata, foreignKey, false)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 9f0a2e7

Please sign in to comment.