From 4a8175fbe647e5f41ca6cca87d81ea264abc4748 Mon Sep 17 00:00:00 2001 From: keremeyuboglu <32223948+keremeyuboglu@users.noreply.github.com> Date: Tue, 13 Aug 2024 23:02:14 +0300 Subject: [PATCH] Calendly Integration Workflow Added (#35) * Added calendly workflow functions * Bugfix for Worklfow's Offset * We weren't updating workflow's offset in database layer * Also removed duplicate sleep, since we do it in the layer above * Made changes for calendly workflow integration tests * Added an extra check to prevent the deletion of a topic if it's being used by a workflow * Corrected LeadDocketRequest struct's name --- integrand-py/tests/test_integrand.py | 89 ++++++++---- .../calendly_sync/message.json | 67 +++++++++ .../calendly_sync/message_output.json | 7 + persistence/workflow.go | 21 ++- services/topicService.go | 20 +-- services/workflow.go | 130 ++++++++++++++++-- 6 files changed, 286 insertions(+), 48 deletions(-) create mode 100644 integrand-py/tests/workflow_functions/calendly_sync/message.json create mode 100644 integrand-py/tests/workflow_functions/calendly_sync/message_output.json diff --git a/integrand-py/tests/test_integrand.py b/integrand-py/tests/test_integrand.py index 11abc7b..d549c42 100644 --- a/integrand-py/tests/test_integrand.py +++ b/integrand-py/tests/test_integrand.py @@ -1,4 +1,5 @@ # This is the integration test suite for LucidMQ +from requests import HTTPError from Integrand import Integrand import pytest import string @@ -27,6 +28,10 @@ def clean_up_topics(): topics = response['data'] for topic in topics: integrand.DeleteTopic(topic['topicName']) + +@pytest.fixture(params=["ld_ld_sync", "calendly_sync"]) +def workflow_function_name(request): + return request.param @pytest.mark.usefixtures("clean_up_topics") class TestConnectorAPI: @@ -139,6 +144,39 @@ def test_delete_topic(self): integrand.CreateTopic(topicName) response = integrand.DeleteTopic(topicName) assert response['status'] == 'success' + + def test_delete_topic_used_by_workflow_raises_exception(self, workflow_function_name): + topicName = get_random_string(5) + integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY) + connectorId = get_random_string(5) + integrand.CreateTopic(topicName) + integrand.CreateConnector(connectorId, topicName) + response = integrand.CreateWorkflow(topicName, workflow_function_name, "http://example.com") + workflowId = response['data']['id'] + try: + integrand.DeleteTopic(topicName) + except HTTPError as e: + print(e.response.status_code) + assert e.response.status_code == 500 + # Cleanup + integrand.DeleteConnector(connectorId) + integrand.DeleteWorkflow(workflowId) + integrand.DeleteTopic(topicName) + + def test_delete_topic_used_by_connector_raises_exception(self): + topicName = get_random_string(5) + integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY) + integrand.CreateTopic(topicName) + connectorId = get_random_string(5) + integrand.CreateConnector(connectorId, topicName) + try: + integrand.DeleteTopic(topicName) + except HTTPError as e: + print(e.response.status_code) + assert e.response.status_code == 500 + # Cleanup + integrand.DeleteConnector(connectorId) + integrand.DeleteTopic(topicName) class TestMessages(): def test_send_message(self): @@ -177,13 +215,8 @@ def test_send_multiple_messages(self): integrand.DeleteConnector(id) integrand.DeleteTopic(topicName) - -class TestsWorkflow(): - @pytest.fixture(params=["ld_ld_sync"]) - def functionName(self, request): - return request.param - - def test_create_workflow(self, functionName): +class TestsWorkflow(): + def test_create_workflow(self, workflow_function_name): integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY) connectorId = get_random_string(5) topicName = get_random_string(5) @@ -192,18 +225,18 @@ def test_create_workflow(self, functionName): connectorAPIKey = response['data']['securityKey'] sinkURL = rf"{INTEGRAND_URL}/api/v1/connector/f/{connectorId}?apikey={connectorAPIKey}" - response = integrand.CreateWorkflow(topicName, functionName, sinkURL) + response = integrand.CreateWorkflow(topicName, workflow_function_name, sinkURL) assert response['status'] == 'success' assert response['data']['topicName'] == topicName - assert response['data']['functionName'] == functionName + assert response['data']['functionName'] == workflow_function_name assert response['data']['enabled'] == True id = response['data']['id'] # Cleanup integrand.DeleteConnector(connectorId) - integrand.DeleteTopic(topicName) integrand.DeleteWorkflow(id) + integrand.DeleteTopic(topicName) - def test_delete_workflow(self, functionName): + def test_delete_workflow(self, workflow_function_name): integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY) connectorId = get_random_string(5) topicName = get_random_string(5) @@ -212,7 +245,7 @@ def test_delete_workflow(self, functionName): connectorAPIKey = response['data']['securityKey'] sinkURL = rf"{INTEGRAND_URL}/api/v1/connector/f/{connectorId}?apikey={connectorAPIKey}" - response = integrand.CreateWorkflow(topicName, functionName, sinkURL) + response = integrand.CreateWorkflow(topicName, workflow_function_name, sinkURL) id = response['data']['id'] response = integrand.DeleteWorkflow(id) @@ -221,7 +254,7 @@ def test_delete_workflow(self, functionName): integrand.DeleteConnector(connectorId) integrand.DeleteTopic(topicName) - def test_update_workflow(self, functionName): + def test_update_workflow(self, workflow_function_name): integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY) connectorId = get_random_string(5) topicName = get_random_string(5) @@ -230,17 +263,17 @@ def test_update_workflow(self, functionName): connectorAPIKey = response['data']['securityKey'] sinkURL = rf"{INTEGRAND_URL}/api/v1/connector/f/{connectorId}?apikey={connectorAPIKey}" - response = integrand.CreateWorkflow(topicName, functionName, sinkURL) + response = integrand.CreateWorkflow(topicName, workflow_function_name, sinkURL) id = response['data']['id'] integrand.UpdateWorkflow(response['data']['id']) assert response['status'] == 'success' # Cleanup integrand.DeleteConnector(connectorId) - integrand.DeleteTopic(topicName) integrand.DeleteWorkflow(id) + integrand.DeleteTopic(topicName) - def test_get_workflow(self, functionName): + def test_get_workflow(self, workflow_function_name): integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY) connectorId = get_random_string(5) topicName = get_random_string(5) @@ -249,20 +282,20 @@ def test_get_workflow(self, functionName): connectorAPIKey = response['data']['securityKey'] sinkURL = rf"{INTEGRAND_URL}/api/v1/connector/f/{connectorId}?apikey={connectorAPIKey}" - response = integrand.CreateWorkflow(topicName, functionName, sinkURL) + response = integrand.CreateWorkflow(topicName, workflow_function_name, sinkURL) id = response['data']['id'] response = integrand.GetWorkflow(response['data']['id']) assert response['status'] == 'success' assert response['data']['topicName'] == topicName - assert response['data']['functionName'] == functionName + assert response['data']['functionName'] == workflow_function_name assert response['data']['enabled'] == True # Cleanup integrand.DeleteConnector(connectorId) - integrand.DeleteTopic(topicName) integrand.DeleteWorkflow(id) + integrand.DeleteTopic(topicName) - def test_get_workflows(self, functionName): + def test_get_workflows(self, workflow_function_name): integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY) connectorId = get_random_string(5) topicName = get_random_string(5) @@ -275,20 +308,20 @@ def test_get_workflows(self, functionName): assert response['status'] == 'success' assert response['data'] == [] - response = integrand.CreateWorkflow(topicName, functionName, sinkURL) + response = integrand.CreateWorkflow(topicName, workflow_function_name, sinkURL) id = response['data']['id'] response = integrand.GetWorkflows() assert response['status'] == 'success' assert response['data'][0]['topicName'] == topicName - assert response['data'][0]['functionName'] == functionName + assert response['data'][0]['functionName'] == workflow_function_name assert response['data'][0]['enabled'] == True # Cleanup integrand.DeleteConnector(connectorId) - integrand.DeleteTopic(topicName) integrand.DeleteWorkflow(id) + integrand.DeleteTopic(topicName) - def test_workflow_send_message_one_end_to_another(self, functionName): + def test_workflow_send_message_one_end_to_another(self, workflow_function_name): integrand = Integrand(INTEGRAND_URL, INTEGRAND_API_KEY) sourceConnectorId = get_random_string(5) sinkConnectorId = get_random_string(5) @@ -301,14 +334,14 @@ def test_workflow_send_message_one_end_to_another(self, functionName): sinkConnectorAPIKey = response['data']['securityKey'] sinkURL = rf"{INTEGRAND_URL}/api/v1/connector/f/{sinkConnectorId}?apikey={sinkConnectorAPIKey}" - response = integrand.CreateWorkflow(sourceTopicName, functionName, sinkURL) + response = integrand.CreateWorkflow(sourceTopicName, workflow_function_name, sinkURL) assert response['status'] == 'success' assert response['data']['topicName'] == sourceTopicName - assert response['data']['functionName'] == functionName + assert response['data']['functionName'] == workflow_function_name assert response['data']['enabled'] == True id = response['data']['id'] - with open(rf'tests/workflow_functions/{functionName}/message.json', 'r') as file: + with open(rf'tests/workflow_functions/{workflow_function_name}/message.json', 'r') as file: data = json.load(file) integrand.EndpointRequest(sourceConnectorId, sourceConnectorAPIKey, data) @@ -323,7 +356,7 @@ def test_workflow_send_message_one_end_to_another(self, functionName): limit = 1 sinkTopicEvents = integrand.GetEventsFromTopic(sinkTopicName, offset, limit) if sinkTopicEvents['data'] != None: - with open(rf'tests/workflow_functions/{functionName}/message_output.json', 'r') as file: + with open(rf'tests/workflow_functions/{workflow_function_name}/message_output.json', 'r') as file: data = json.load(file) assert sinkTopicEvents['data'][offset] == data break diff --git a/integrand-py/tests/workflow_functions/calendly_sync/message.json b/integrand-py/tests/workflow_functions/calendly_sync/message.json new file mode 100644 index 0000000..93831fa --- /dev/null +++ b/integrand-py/tests/workflow_functions/calendly_sync/message.json @@ -0,0 +1,67 @@ +{ + "created_at": "2024-08-05T21:58:18.000000Z", + "created_by": "https://api.calendly.com/users/test", + "event": "invitee.created", + "payload": { + "cancel_url": "https://calendly.com/cancellations/another-test", + "created_at": "2024-08-05T21:58:17.901807Z", + "email": "client_email@gmail.com", + "event": "https://api.calendly.com/scheduled_events/test-event", + "first_name": null, + "invitee_scheduled_by": null, + "last_name": null, + "name": "Client Name", + "new_invitee": null, + "no_show": null, + "old_invitee": null, + "payment": null, + "questions_and_answers": [], + "reconfirmation": null, + "reschedule_url": "https://calendly.com/reschedulings/another-test", + "rescheduled": false, + "routing_form_submission": null, + "scheduled_event": { + "created_at": "2024-08-05T21:58:17.881846Z", + "end_time": "2024-08-16T15:45:00.000000Z", + "event_guests": [], + "event_memberships": [ + { + "user": "https://api.calendly.com/users/test", + "user_email": "info@testcompany.com", + "user_name": "Test Company" + } + ], + "event_type": "https://api.calendly.com/event_types/test-type", + "invitees_counter": { + "active": 1, + "limit": 1, + "total": 1 + }, + "location": { + "location": "+1 322-999-1323", + "type": "outbound_call" + }, + "meeting_notes_html": null, + "meeting_notes_plain": null, + "name": "15 Minute Consultation", + "start_time": "2024-08-16T15:30:00.000000Z", + "status": "active", + "updated_at": "2024-08-05T21:58:17.881846Z", + "uri": "https://api.calendly.com/scheduled_events/test-event" + }, + "scheduling_method": null, + "status": "active", + "text_reminder_number": null, + "timezone": "America/Los_Angeles", + "tracking": { + "salesforce_uuid": null, + "utm_campaign": null, + "utm_content": null, + "utm_medium": null, + "utm_source": null, + "utm_term": null + }, + "updated_at": "2024-08-05T21:58:17.901807Z", + "uri": "https://api.calendly.com/scheduled_events/test-event/invitees/another-test" + } +} \ No newline at end of file diff --git a/integrand-py/tests/workflow_functions/calendly_sync/message_output.json b/integrand-py/tests/workflow_functions/calendly_sync/message_output.json new file mode 100644 index 0000000..bf6be13 --- /dev/null +++ b/integrand-py/tests/workflow_functions/calendly_sync/message_output.json @@ -0,0 +1,7 @@ +{ + "Email": "client_email@gmail.com", + "First": "Client", + "Last": "Name", + "Phone": "+1 322-999-1323", + "Summary": "" + } \ No newline at end of file diff --git a/persistence/workflow.go b/persistence/workflow.go index 9ac4381..3512993 100644 --- a/persistence/workflow.go +++ b/persistence/workflow.go @@ -151,5 +151,24 @@ func (dstore *Datastore) UpdateWorkflow(id int) (Workflow, error) { if err != nil { return workflow, err } - return workflow, err + return workflow, nil +} + +func (dstore *Datastore) SetOffsetOfWorkflow(id int, offset int) (Workflow, error) { + updateQuery := ` + UPDATE workflows + SET offset = ?, last_modified = CURRENT_TIMESTAMP + WHERE id = ? + RETURNING id, topic_name, offset, function_name, enabled, sink_url, last_modified; + ` + + dstore.RWMutex.Lock() + row := dstore.db.QueryRow(updateQuery, offset, id) + dstore.RWMutex.Unlock() + var workflow Workflow + err := row.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified) + if err != nil { + return workflow, err + } + return workflow, nil } diff --git a/services/topicService.go b/services/topicService.go index 9300104..eeecba6 100644 --- a/services/topicService.go +++ b/services/topicService.go @@ -35,23 +35,27 @@ func CreateEventStream(topicName string) (persistence.TopicDetails, error) { } func DeleteEventStream(topicName string, userId int) error { - //Check if topic is being used... + // Check if the topic is being used by any endpoint endpoints, err := GetEndpoints(userId) if err != nil { return err } - topicIsBeingUsed := false for _, endpoint := range endpoints { if endpoint.TopicName == topicName { - topicIsBeingUsed = true - break + return errors.New("topic is being used by an endpoint") } } - - if topicIsBeingUsed { - return errors.New("topic is being used") + // Check if the topic is being used by any workflow + workflows, err := GetWorkflows() + if err != nil { + return err } - + for _, workflow := range workflows { + if workflow.TopicName == topicName { + return errors.New("topic is being used by a workflow") + } + } + // Delete the topic if it is not being used return persistence.BROKER.DeleteTopic(topicName) } diff --git a/services/workflow.go b/services/workflow.go index 215d503..bf4cf7e 100644 --- a/services/workflow.go +++ b/services/workflow.go @@ -8,6 +8,7 @@ import ( "log" "log/slog" "net/http" + "strings" "sync" "time" ) @@ -19,7 +20,8 @@ const MAX_BACKOFF int = 10 func init() { // Register all of our functions persistence.FUNC_MAP = map[string]interface{}{ - "ld_ld_sync": ld_ld_sync, + "ld_ld_sync": ld_ld_sync, + "calendly_sync": calendly_sync, } } @@ -40,28 +42,33 @@ func Workflower() error { func processWorkflow(wg *sync.WaitGroup, workflow persistence.Workflow) { defer wg.Done() - sleep_time := SLEEP_TIME + currentOffset := workflow.Offset for { - bytes, err := persistence.BROKER.ConsumeMessage(workflow.TopicName, workflow.Offset) + bytes, err := persistence.BROKER.ConsumeMessage(workflow.TopicName, currentOffset) if err != nil { if err.Error() == "offset out of bounds" { - // This error is returned when we're given an offset thats ahead of the commitlog + // This error is returned when we're given an offset thats ahead of the commitlog, so we can return for next cycle to begin slog.Debug(err.Error()) - time.Sleep(time.Duration(sleep_time) * time.Second) - continue + break } else if err.Error() == "offset does not exist" { // This error is returned when we look for an offset and it does not exist becuase it can't be avaliable in the commitlog slog.Warn(err.Error()) - time.Sleep(time.Duration(sleep_time) * time.Second) - return // Exit the function, to be re-checked in the next cycle + break // Exit the function, to be re-checked in the next cycle } else { slog.Error(err.Error()) - return // Something's wrong + break // Something's wrong } } workflow.Call(bytes, workflow.SinkURL) - workflow.Offset++ - sleep_time = SLEEP_TIME + currentOffset++ + } + // We set offset here in case we create a new workflow with lots of messages in topic which would send redundant requests to update the offset + if currentOffset != workflow.Offset { + _, err := persistence.DATASTORE.SetOffsetOfWorkflow(workflow.Id, currentOffset) + if err != nil { + // This is a critical error. If we cannot set workflow's offset properly, our workflows will be out of sync forever + slog.Error(err.Error()) + } } } @@ -140,6 +147,107 @@ func sendLeadToClf(jsonBody map[string]interface{}, sinkURL string) error { return nil } +type CalendlyEventBody struct { + Event string `json:"event"` + Payload Payload `json:"payload"` +} + +type Payload struct { + FirstName *string `json:"first_name"` + LastName *string `json:"last_name"` + Name string `json:"name"` + Email string `json:"email"` + ScheduledEvent struct { + Location struct { + Location *string `json:"location"` + Type *string `json:"type"` + } `json:"location"` + } `json:"scheduled_event"` +} + +type LeadDocketRequestBody struct { + First string `json:"First"` + Last string `json:"Last"` + Phone string `json:"Phone"` + Email string `json:"Email"` + Summary string `json:"Summary"` +} + +func calendly_sync(bytes []byte, sinkURL string) error { + // Unmarshal the JSON byte array into the map + var calendlyJson CalendlyEventBody + err := json.Unmarshal(bytes, &calendlyJson) + if err != nil { + slog.Error(err.Error()) + return err + } + + if calendlyJson.Event == "invitee.created" { + err := sendCalendlyAppointment(calendlyJson, sinkURL) + if err != nil { + slog.Error("Error occurred while sending calendly appointment to CLF", "error", err) + return err + } + } + + return nil +} + +func sendCalendlyAppointment(calendlyJson CalendlyEventBody, sinkURL string) error { + payload := calendlyJson.Payload + var request LeadDocketRequestBody + + if payload.FirstName != nil { + request.First = *payload.FirstName + request.Last = *payload.LastName + } else { + nameParts := strings.Split(strings.TrimSpace(payload.Name), " ") + request.First = nameParts[0] + if len(nameParts) > 1 { + request.Last = nameParts[1] + } else { + request.Last = "" + } + } + + if *payload.ScheduledEvent.Location.Type == "outbound_call" { + request.Phone = *payload.ScheduledEvent.Location.Location + } + + request.Email = payload.Email + + jsonBodyBytes, err := json.Marshal(request) + if err != nil { + slog.Error(err.Error()) + return err + } + + resp, err := http.Post(sinkURL, "application/json", bytes.NewBuffer(jsonBodyBytes)) + if err != nil { + slog.Error(err.Error()) + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + slog.Error("HTTP request failed", + "status_code", resp.StatusCode, + "status_text", http.StatusText(resp.StatusCode), + ) + return errors.New("HTTP request failed with status code: " + http.StatusText(resp.StatusCode)) + } + + var responseBody map[string]interface{} + if err := json.NewDecoder(resp.Body).Decode(&responseBody); err != nil { + slog.Error(err.Error()) + return err + } + + log.Printf("Status Code: %d", resp.StatusCode) + log.Printf("Response Body: %v", responseBody) + return nil +} + // Should move to utils later func GetOrDefaultString(m map[string]interface{}, key string, defaultStr string) string {