Skip to content

Commit

Permalink
Fixed the broken workflow ui, updated noisy logs, created correct ids…
Browse files Browse the repository at this point in the history
… to work with javascript parsing... (#30)
  • Loading branch information
bdkiran authored Jul 31, 2024
1 parent 252417b commit 1eef609
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 44 deletions.
2 changes: 1 addition & 1 deletion commitlog/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (seg *segment) readAt(offset int) (returnBuff []byte, err error) {
var buff []byte
//Do we want to do this with the base offset??
if offset >= seg.nextOffset-seg.startingOffset {
slog.Error(fmt.Sprintf("offset given: %d, max offset: %d", offset, seg.nextOffset-seg.startingOffset))
slog.Debug(fmt.Sprintf("offset given: %d, max offset: %d", offset, seg.nextOffset-seg.startingOffset))
return nil, errors.New("offset out of bounds")
} else {
ent := seg.index.entries[offset]
Expand Down
2 changes: 1 addition & 1 deletion services/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var caseTypeMapping = map[string]int{
"Other": 2,
}

var Workflows []Workflow
var WORKFLOWS = make([]Workflow, 0)

type Workflow struct {
Id int `json:"id"`
Expand Down
33 changes: 19 additions & 14 deletions services/workflowService.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
func Workflower() error {
for {
workflowMu.Lock()
currentWorkflows := append([]Workflow(nil), Workflows...)
currentWorkflows := append([]Workflow(nil), WORKFLOWS...)
workflowMu.Unlock()

var wg sync.WaitGroup
Expand All @@ -44,16 +44,17 @@ func processWorkflow(wg *sync.WaitGroup, workflow Workflow) {
bytes, err := persistence.BROKER.ConsumeMessage(workflow.TopicName, workflow.Offset)
if err != nil {
if err.Error() == "offset out of bounds" {
slog.Warn(err.Error())
// This error is returned when we're given an offset thats ahead of the commitlog
slog.Debug(err.Error())
time.Sleep(time.Duration(sleep_time) * time.Second)
return // Exit the function, to be re-checked in the next cycle
continue
} else if err.Error() == "offset does not exist" {
// I think this means no message in given topic?
// This error is returned when we look for an offset and it does not exist becuase it can't be avaliable in the commitlog
slog.Warn(err.Error())
time.Sleep(time.Duration(sleep_time) * time.Second)
return // Exit the function, to be re-checked in the next cycle
} else {
slog.Warn(err.Error())
slog.Error(err.Error())
return // Something's wrong
}
}
Expand All @@ -66,15 +67,15 @@ func processWorkflow(wg *sync.WaitGroup, workflow Workflow) {
func GetWorkflows() ([]Workflow, error) {
workflowMu.Lock()
defer workflowMu.Unlock()
return Workflows, nil
return WORKFLOWS, nil
}

func DeleteWorkflow(id int) error {
workflowMu.Lock()
defer workflowMu.Unlock()
for i, workflow := range Workflows {
for i, workflow := range WORKFLOWS {
if workflow.Id == id {
Workflows = append(Workflows[:i], Workflows[i+1:]...)
WORKFLOWS = append(WORKFLOWS[:i], WORKFLOWS[i+1:]...)
return nil
}
}
Expand All @@ -84,10 +85,10 @@ func DeleteWorkflow(id int) error {
func UpdateWorkflow(id int) (*Workflow, error) {
workflowMu.Lock()
defer workflowMu.Unlock()
for i, workflow := range Workflows {
for i, workflow := range WORKFLOWS {
if workflow.Id == id {
Workflows[i].Enabled = !Workflows[i].Enabled
return &Workflows[i], nil
WORKFLOWS[i].Enabled = !WORKFLOWS[i].Enabled
return &WORKFLOWS[i], nil
}
}
return nil, errors.New("workflow not found")
Expand All @@ -96,7 +97,7 @@ func UpdateWorkflow(id int) (*Workflow, error) {
func GetWorkflow(id int) (*Workflow, error) {
workflowMu.Lock()
defer workflowMu.Unlock()
for _, workflow := range Workflows {
for _, workflow := range WORKFLOWS {
if workflow.Id == id {
return &workflow, nil
}
Expand All @@ -118,7 +119,7 @@ func CreateWorkflow(topicName string, functionName string, sinkURL string) (*Wor
}

newWorkflow := Workflow{
Id: rand.Int(),
Id: rangeIn(0, 100),
TopicName: topicName,
Offset: topic.OldestOffset,
FunctionName: functionName,
Expand All @@ -127,7 +128,11 @@ func CreateWorkflow(topicName string, functionName string, sinkURL string) (*Wor
}

workflowMu.Lock()
Workflows = append(Workflows, newWorkflow)
WORKFLOWS = append(WORKFLOWS, newWorkflow)
workflowMu.Unlock()
return &newWorkflow, nil
}

func rangeIn(low, hi int) int {
return low + rand.Intn(hi-low)
}
19 changes: 9 additions & 10 deletions test-request.txt
Original file line number Diff line number Diff line change
@@ -1,38 +1,38 @@

//TODO: To fix this one...
//Example of sending a message to an endpoint
curl --header "Content-Type: application/json" \
--request POST \
--data '{"username":"hello said","password":"this is a super long string where another thing is going to happen"}' \
'http://localhost:8000/api/v1/connector/f/my?apikey=ByAXhsNb
'http://localhost:8000/api/v1/connector/f/test?apikey=RcdouYvy'

// Connector API
// Get all Glue handler
//---------------- Endpoint API -------------------------
// Get all Endpoints
curl --header "Content-Type: application/json" \
--header "Authorization: Bearer 11111" \
--request GET \
http://localhost:8000/api/v1/connector

// Create glue handler
// Create an endpoint
curl --header "Content-Type: application/json" \
--header "Authorization: Bearer 11111" \
--request POST \
--data '{"id":"myEndpoint","topicName":"myTopic"}' \
http://localhost:8000/api/v1/connector

// Get Glue handler
// Get a single endpoint
curl --header "Content-Type: application/json" \
--header "Authorization: Bearer 11111" \
--request GET \
http://localhost:8000/api/v1/connector/id

// Delete Glue handler
// Delete an endpoint handler
curl --header "Content-Type: application/json" \
--header "Authorization: Bearer 11111" \
--request DELETE \
http://localhost:8000/api/v1/connector/id


// Event API
//---------------- Event API -------------------------
// Get all topics
curl --header "Content-Type: application/json" \
--header "Authorization: Bearer 11111" \
Expand Down Expand Up @@ -65,8 +65,7 @@ curl --header "Accept:text/event-stream" \
--request GET -N \
http://localhost:8000/api/v1/topic/here/consume


// Workflow API
//---------------- Workflow API -------------------------
// Get all workflows
curl --header "Content-Type: application/json" \
--header "Authorization: Bearer 11111" \
Expand Down
11 changes: 5 additions & 6 deletions web/static/webcomponents/pages/workflows/workflow/workflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ class WorkflowPage extends HTMLElement {
headers: {"Content-Type": "application/json",}
}).then(res => {
// Check response to see if it's bad
res.json().then((endpointResponseData) => {
console.log(endpointResponseData)
res.json().then((workflowResponseData) => {
window.location.replace("/workflows");
});
})
Expand All @@ -38,19 +37,19 @@ class WorkflowPage extends HTMLElement {
</li>
<li>
<p class="titler">Offset:</p>
<p>${workflow.Offset}</p>
<p>${workflow.offset}</p>
</li>
<li>
<p class="titler">Topic Name:</p>
<a class="link" href=${topic_link}>${endpoint.topicName}</a>
<a class="link" href=${topic_link}>${workflow.topicName}</a>
</li>
<li>
<p class="titler">Sink URL:</p>
<p>${workflow.SinkURL}</p>
<p>${workflow.sinkURL}</p>
</li>
<li>
<p class="titler">Enabled:</p>
<p>${workflow.Enabled}</p>
<p>${workflow.enabled}</p>
</li>
</ul>`
const div = fromHTML(workflow_markup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class WorkflowsHome extends HTMLElement {
return res.json();
}
throw new Error('Something went wrong');
}).then(glueResponseData => {
const element = this.generateEndpointCard(glueResponseData.data)
}).then(workflowResponse => {
const element = this.generateWorkflowCard(workflowResponse.data)
cardsContainer.prepend(element)
const successMessage = `<h1>Workflow Successfully Created</h1>`
const modal_element = fromHTML(successMessage);
Expand Down Expand Up @@ -65,7 +65,7 @@ class WorkflowsHome extends HTMLElement {
const modal_element = fromHTML(modalMarkup);
this.shawdow.append(modal_element)
const formComponent = this.shawdow.querySelector('#myForm');
formComponent.addEventListener('submit', this.newConnector.bind(this));
formComponent.addEventListener('submit', this.newWorkflow.bind(this));
};

generateWorkflowCard(workflow) {
Expand All @@ -74,13 +74,13 @@ class WorkflowsHome extends HTMLElement {
<h1><span class="titler">ID:</span> ${workflow.id}</h1>
<p><span class="titler">Topic Name:</span> ${workflow.topicName}</p>
<h2><span class="titler">Function Name:</span> ${workflow.functionName}</h2>
<a class="jobLink" href="${endpoint_link}"> View Endpoint Details </a>
<a class="jobLink" href="${workflow_link}"> View Workflow Details </a>
</div>`
const card_element = fromHTML(workflow_link);
const card_element = fromHTML(workflow_markup);
return card_element;
}

async generateEndpointsContainer () {
async generateWorkflowsContainer () {
const workflowResponse = await fetch('/api/v1/workflow');
const workflowResponseData = await workflowResponse.json();

Expand All @@ -100,8 +100,8 @@ class WorkflowsHome extends HTMLElement {
pageTitleElement.buttonFunction = this.newWorkflowAction.bind(this);
this.shawdow.appendChild(pageTitleElement)

const endpoint_card_container = await this.generateEndpointsContainer()
this.shawdow.appendChild(endpoint_card_container)
const workflow_card_container = await this.generateWorkflowsContainer()
this.shawdow.appendChild(workflow_card_container)
}
}

Expand Down
8 changes: 4 additions & 4 deletions web/workflowApi.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (wf *workflowAPI) getWorkflow(w http.ResponseWriter, r *http.Request) {
workflow, err := services.GetWorkflow(id)
if err != nil {
slog.Error(err.Error())
apiMessageResponse(w, http.StatusInternalServerError, "internal server error")
apiMessageResponse(w, http.StatusBadRequest, "incorrect request sent")
return
}
resJsonBytes, err := generateSuccessMessage(workflow)
Expand All @@ -99,13 +99,13 @@ func (wf *workflowAPI) createWorkflow(w http.ResponseWriter, r *http.Request) {
var createBody CreateWorkflowBody
if err := json.NewDecoder(r.Body).Decode(&createBody); err != nil {
slog.Error(err.Error())
apiMessageResponse(w, http.StatusInternalServerError, "internal server error")
apiMessageResponse(w, http.StatusBadRequest, "invalid body sent")
return
}
_, err := url.ParseRequestURI(createBody.SinkURL)
if err != nil {
slog.Error("invalid url")
apiMessageResponse(w, http.StatusInternalServerError, "internal server error")
apiMessageResponse(w, http.StatusBadRequest, "invalid sink url sent")
return
}
workflow, err := services.CreateWorkflow(createBody.TopicName, createBody.FunctionName, createBody.SinkURL)
Expand Down Expand Up @@ -163,7 +163,7 @@ func (wf *workflowAPI) deleteWorkflow(w http.ResponseWriter, r *http.Request) {
err = services.DeleteWorkflow(id)
if err != nil {
slog.Error(err.Error())
apiMessageResponse(w, http.StatusInternalServerError, "internal server error")
apiMessageResponse(w, http.StatusBadRequest, "internal server error")
return
}
c := map[string]interface{}{"success": "successfully deleted workflow"}
Expand Down

0 comments on commit 1eef609

Please sign in to comment.