Skip to content

Commit

Permalink
Calendly Integration Workflow Added (#35)
Browse files Browse the repository at this point in the history
* Added calendly workflow functions

* Bugfix for Worklfow's Offset

* We weren't updating workflow's offset in database layer
* Also removed duplicate sleep, since we do it in the layer above

* Made changes for calendly workflow integration tests

* Added an extra check to prevent the deletion of a topic if it's being used by a workflow

* Corrected LeadDocketRequest struct's name
  • Loading branch information
keremeyuboglu authored Aug 13, 2024
1 parent 17fbc37 commit 4a8175f
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 48 deletions.
89 changes: 61 additions & 28 deletions integrand-py/tests/test_integrand.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# This is the integration test suite for LucidMQ
from requests import HTTPError
from Integrand import Integrand
import pytest
import string
Expand Down Expand Up @@ -27,6 +28,10 @@ def clean_up_topics():
topics = response['data']
for topic in topics:
integrand.DeleteTopic(topic['topicName'])

@pytest.fixture(params=["ld_ld_sync", "calendly_sync"])
def workflow_function_name(request):
return request.param

@pytest.mark.usefixtures("clean_up_topics")
class TestConnectorAPI:
Expand Down Expand Up @@ -139,6 +144,39 @@ def test_delete_topic(self):
integrand.CreateTopic(topicName)
response = integrand.DeleteTopic(topicName)
assert response['status'] == 'success'

def test_delete_topic_used_by_workflow_raises_exception(self, workflow_function_name):
topicName = get_random_string(5)
integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY)
connectorId = get_random_string(5)
integrand.CreateTopic(topicName)
integrand.CreateConnector(connectorId, topicName)
response = integrand.CreateWorkflow(topicName, workflow_function_name, "http://example.com")
workflowId = response['data']['id']
try:
integrand.DeleteTopic(topicName)
except HTTPError as e:
print(e.response.status_code)
assert e.response.status_code == 500
# Cleanup
integrand.DeleteConnector(connectorId)
integrand.DeleteWorkflow(workflowId)
integrand.DeleteTopic(topicName)

def test_delete_topic_used_by_connector_raises_exception(self):
topicName = get_random_string(5)
integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY)
integrand.CreateTopic(topicName)
connectorId = get_random_string(5)
integrand.CreateConnector(connectorId, topicName)
try:
integrand.DeleteTopic(topicName)
except HTTPError as e:
print(e.response.status_code)
assert e.response.status_code == 500
# Cleanup
integrand.DeleteConnector(connectorId)
integrand.DeleteTopic(topicName)

class TestMessages():
def test_send_message(self):
Expand Down Expand Up @@ -177,13 +215,8 @@ def test_send_multiple_messages(self):
integrand.DeleteConnector(id)
integrand.DeleteTopic(topicName)


class TestsWorkflow():
@pytest.fixture(params=["ld_ld_sync"])
def functionName(self, request):
return request.param

def test_create_workflow(self, functionName):
class TestsWorkflow():
def test_create_workflow(self, workflow_function_name):
integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY)
connectorId = get_random_string(5)
topicName = get_random_string(5)
Expand All @@ -192,18 +225,18 @@ def test_create_workflow(self, functionName):
connectorAPIKey = response['data']['securityKey']
sinkURL = rf"{INTEGRAND_URL}/api/v1/connector/f/{connectorId}?apikey={connectorAPIKey}"

response = integrand.CreateWorkflow(topicName, functionName, sinkURL)
response = integrand.CreateWorkflow(topicName, workflow_function_name, sinkURL)
assert response['status'] == 'success'
assert response['data']['topicName'] == topicName
assert response['data']['functionName'] == functionName
assert response['data']['functionName'] == workflow_function_name
assert response['data']['enabled'] == True
id = response['data']['id']
# Cleanup
integrand.DeleteConnector(connectorId)
integrand.DeleteTopic(topicName)
integrand.DeleteWorkflow(id)
integrand.DeleteTopic(topicName)

def test_delete_workflow(self, functionName):
def test_delete_workflow(self, workflow_function_name):
integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY)
connectorId = get_random_string(5)
topicName = get_random_string(5)
Expand All @@ -212,7 +245,7 @@ def test_delete_workflow(self, functionName):
connectorAPIKey = response['data']['securityKey']
sinkURL = rf"{INTEGRAND_URL}/api/v1/connector/f/{connectorId}?apikey={connectorAPIKey}"

response = integrand.CreateWorkflow(topicName, functionName, sinkURL)
response = integrand.CreateWorkflow(topicName, workflow_function_name, sinkURL)
id = response['data']['id']

response = integrand.DeleteWorkflow(id)
Expand All @@ -221,7 +254,7 @@ def test_delete_workflow(self, functionName):
integrand.DeleteConnector(connectorId)
integrand.DeleteTopic(topicName)

def test_update_workflow(self, functionName):
def test_update_workflow(self, workflow_function_name):
integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY)
connectorId = get_random_string(5)
topicName = get_random_string(5)
Expand All @@ -230,17 +263,17 @@ def test_update_workflow(self, functionName):
connectorAPIKey = response['data']['securityKey']
sinkURL = rf"{INTEGRAND_URL}/api/v1/connector/f/{connectorId}?apikey={connectorAPIKey}"

response = integrand.CreateWorkflow(topicName, functionName, sinkURL)
response = integrand.CreateWorkflow(topicName, workflow_function_name, sinkURL)
id = response['data']['id']
integrand.UpdateWorkflow(response['data']['id'])

assert response['status'] == 'success'
# Cleanup
integrand.DeleteConnector(connectorId)
integrand.DeleteTopic(topicName)
integrand.DeleteWorkflow(id)
integrand.DeleteTopic(topicName)

def test_get_workflow(self, functionName):
def test_get_workflow(self, workflow_function_name):
integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY)
connectorId = get_random_string(5)
topicName = get_random_string(5)
Expand All @@ -249,20 +282,20 @@ def test_get_workflow(self, functionName):
connectorAPIKey = response['data']['securityKey']
sinkURL = rf"{INTEGRAND_URL}/api/v1/connector/f/{connectorId}?apikey={connectorAPIKey}"

response = integrand.CreateWorkflow(topicName, functionName, sinkURL)
response = integrand.CreateWorkflow(topicName, workflow_function_name, sinkURL)
id = response['data']['id']

response = integrand.GetWorkflow(response['data']['id'])
assert response['status'] == 'success'
assert response['data']['topicName'] == topicName
assert response['data']['functionName'] == functionName
assert response['data']['functionName'] == workflow_function_name
assert response['data']['enabled'] == True
# Cleanup
integrand.DeleteConnector(connectorId)
integrand.DeleteTopic(topicName)
integrand.DeleteWorkflow(id)
integrand.DeleteTopic(topicName)

def test_get_workflows(self, functionName):
def test_get_workflows(self, workflow_function_name):
integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY)
connectorId = get_random_string(5)
topicName = get_random_string(5)
Expand All @@ -275,20 +308,20 @@ def test_get_workflows(self, functionName):
assert response['status'] == 'success'
assert response['data'] == []

response = integrand.CreateWorkflow(topicName, functionName, sinkURL)
response = integrand.CreateWorkflow(topicName, workflow_function_name, sinkURL)
id = response['data']['id']

response = integrand.GetWorkflows()
assert response['status'] == 'success'
assert response['data'][0]['topicName'] == topicName
assert response['data'][0]['functionName'] == functionName
assert response['data'][0]['functionName'] == workflow_function_name
assert response['data'][0]['enabled'] == True
# Cleanup
integrand.DeleteConnector(connectorId)
integrand.DeleteTopic(topicName)
integrand.DeleteWorkflow(id)
integrand.DeleteTopic(topicName)

def test_workflow_send_message_one_end_to_another(self, functionName):
def test_workflow_send_message_one_end_to_another(self, workflow_function_name):
integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY)
sourceConnectorId = get_random_string(5)
sinkConnectorId = get_random_string(5)
Expand All @@ -301,14 +334,14 @@ def test_workflow_send_message_one_end_to_another(self, functionName):
sinkConnectorAPIKey = response['data']['securityKey']
sinkURL = rf"{INTEGRAND_URL}/api/v1/connector/f/{sinkConnectorId}?apikey={sinkConnectorAPIKey}"

response = integrand.CreateWorkflow(sourceTopicName, functionName, sinkURL)
response = integrand.CreateWorkflow(sourceTopicName, workflow_function_name, sinkURL)
assert response['status'] == 'success'
assert response['data']['topicName'] == sourceTopicName
assert response['data']['functionName'] == functionName
assert response['data']['functionName'] == workflow_function_name
assert response['data']['enabled'] == True
id = response['data']['id']

with open(rf'tests/workflow_functions/{functionName}/message.json', 'r') as file:
with open(rf'tests/workflow_functions/{workflow_function_name}/message.json', 'r') as file:
data = json.load(file)

integrand.EndpointRequest(sourceConnectorId, sourceConnectorAPIKey, data)
Expand All @@ -323,7 +356,7 @@ def test_workflow_send_message_one_end_to_another(self, functionName):
limit = 1
sinkTopicEvents = integrand.GetEventsFromTopic(sinkTopicName, offset, limit)
if sinkTopicEvents['data'] != None:
with open(rf'tests/workflow_functions/{functionName}/message_output.json', 'r') as file:
with open(rf'tests/workflow_functions/{workflow_function_name}/message_output.json', 'r') as file:
data = json.load(file)
assert sinkTopicEvents['data'][offset] == data
break
Expand Down
67 changes: 67 additions & 0 deletions integrand-py/tests/workflow_functions/calendly_sync/message.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{
"created_at": "2024-08-05T21:58:18.000000Z",
"created_by": "https://api.calendly.com/users/test",
"event": "invitee.created",
"payload": {
"cancel_url": "https://calendly.com/cancellations/another-test",
"created_at": "2024-08-05T21:58:17.901807Z",
"email": "[email protected]",
"event": "https://api.calendly.com/scheduled_events/test-event",
"first_name": null,
"invitee_scheduled_by": null,
"last_name": null,
"name": "Client Name",
"new_invitee": null,
"no_show": null,
"old_invitee": null,
"payment": null,
"questions_and_answers": [],
"reconfirmation": null,
"reschedule_url": "https://calendly.com/reschedulings/another-test",
"rescheduled": false,
"routing_form_submission": null,
"scheduled_event": {
"created_at": "2024-08-05T21:58:17.881846Z",
"end_time": "2024-08-16T15:45:00.000000Z",
"event_guests": [],
"event_memberships": [
{
"user": "https://api.calendly.com/users/test",
"user_email": "[email protected]",
"user_name": "Test Company"
}
],
"event_type": "https://api.calendly.com/event_types/test-type",
"invitees_counter": {
"active": 1,
"limit": 1,
"total": 1
},
"location": {
"location": "+1 322-999-1323",
"type": "outbound_call"
},
"meeting_notes_html": null,
"meeting_notes_plain": null,
"name": "15 Minute Consultation",
"start_time": "2024-08-16T15:30:00.000000Z",
"status": "active",
"updated_at": "2024-08-05T21:58:17.881846Z",
"uri": "https://api.calendly.com/scheduled_events/test-event"
},
"scheduling_method": null,
"status": "active",
"text_reminder_number": null,
"timezone": "America/Los_Angeles",
"tracking": {
"salesforce_uuid": null,
"utm_campaign": null,
"utm_content": null,
"utm_medium": null,
"utm_source": null,
"utm_term": null
},
"updated_at": "2024-08-05T21:58:17.901807Z",
"uri": "https://api.calendly.com/scheduled_events/test-event/invitees/another-test"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"Email": "[email protected]",
"First": "Client",
"Last": "Name",
"Phone": "+1 322-999-1323",
"Summary": ""
}
21 changes: 20 additions & 1 deletion persistence/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,5 +151,24 @@ func (dstore *Datastore) UpdateWorkflow(id int) (Workflow, error) {
if err != nil {
return workflow, err
}
return workflow, err
return workflow, nil
}

func (dstore *Datastore) SetOffsetOfWorkflow(id int, offset int) (Workflow, error) {
updateQuery := `
UPDATE workflows
SET offset = ?, last_modified = CURRENT_TIMESTAMP
WHERE id = ?
RETURNING id, topic_name, offset, function_name, enabled, sink_url, last_modified;
`

dstore.RWMutex.Lock()
row := dstore.db.QueryRow(updateQuery, offset, id)
dstore.RWMutex.Unlock()
var workflow Workflow
err := row.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified)
if err != nil {
return workflow, err
}
return workflow, nil
}
20 changes: 12 additions & 8 deletions services/topicService.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,27 @@ func CreateEventStream(topicName string) (persistence.TopicDetails, error) {
}

func DeleteEventStream(topicName string, userId int) error {
//Check if topic is being used...
// Check if the topic is being used by any endpoint
endpoints, err := GetEndpoints(userId)
if err != nil {
return err
}
topicIsBeingUsed := false
for _, endpoint := range endpoints {
if endpoint.TopicName == topicName {
topicIsBeingUsed = true
break
return errors.New("topic is being used by an endpoint")
}
}

if topicIsBeingUsed {
return errors.New("topic is being used")
// Check if the topic is being used by any workflow
workflows, err := GetWorkflows()
if err != nil {
return err
}

for _, workflow := range workflows {
if workflow.TopicName == topicName {
return errors.New("topic is being used by a workflow")
}
}
// Delete the topic if it is not being used
return persistence.BROKER.DeleteTopic(topicName)
}

Expand Down
Loading

0 comments on commit 4a8175f

Please sign in to comment.