Skip to content

Commit

Permalink
add new termination conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
pmurley committed Feb 25, 2019
1 parent db8831c commit cf0a757
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 47 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
SOFTWARE.
7 changes: 6 additions & 1 deletion build_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func BuildCompressedTaskSet(cmd *cobra.Command, args []string) (CompressedMIDATa
if err != nil {
return t, err
}
*t.Completion.TimeAfterLoad, err = cmd.Flags().GetInt("time-after-load")
if err != nil {
return t, err
}
*t.Completion.CompletionCondition, err = cmd.Flags().GetString("completion")
if err != nil {
return t, err
Expand Down Expand Up @@ -162,8 +166,9 @@ func InitializeCompressedTaskSet() CompressedMIDATaskSet {
Extensions: new([]string),
},
Completion: &CompletionSettings{
Timeout: new(int),
CompletionCondition: new(string),
Timeout: new(int),
TimeAfterLoad: new(int),
},
Data: &DataSettings{
AllResources: new(bool),
Expand Down
5 changes: 5 additions & 0 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func BuildCommands() *cobra.Command {
// Completion settings
completionCondition string
timeout int
timeAfterLoad int

// Output settings
resultsOutputPath string // Results from task path
Expand Down Expand Up @@ -67,6 +68,8 @@ func BuildCommands() *cobra.Command {
"Completion condition for tasks (CompleteOnTimeoutOnly, CompleteOnLoadEvent, CompleteOnTimeoutAfterLoad")
cmdBuild.Flags().IntVarP(&timeout, "timeout", "t", DefaultTimeout,
"Timeout (in seconds) after which the browser will close and the task will complete")
cmdBuild.Flags().IntVarP(&timeAfterLoad, "time-after-load", "", DefaultTimeAfterLoad,
"Time after load event to remain on page (overridden by timeout if reached first)")

cmdBuild.Flags().StringVarP(&resultsOutputPath, "results-output-path", "r", DefaultOutputPath,
"Path (local or remote) to store results in. A new directory will be created inside this one for each task.")
Expand Down Expand Up @@ -114,6 +117,8 @@ to crawl, using default parameters where not specified`,
"Completion condition for tasks (CompleteOnTimeoutOnly, CompleteOnLoadEvent, CompleteOnTimeoutAfterLoad")
cmdGo.Flags().IntVarP(&timeout, "timeout", "t", DefaultTimeout,
"Timeout (in seconds) after which the browser will close and the task will complete")
cmdGo.Flags().IntVarP(&timeAfterLoad, "time-after-load", "", DefaultTimeAfterLoad,
"Time after load event to remain on page (overridden by timeout if reached first)")

cmdGo.Flags().StringVarP(&resultsOutputPath, "results-output-path", "r", DefaultOutputPath,
"Path (local or remote) to store results in. A new directory will be created inside this one for each task.")
Expand Down
113 changes: 81 additions & 32 deletions crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"github.com/chromedp/cdproto/debugger"
"github.com/chromedp/cdproto/network"
"github.com/phayes/freeport"
Expand Down Expand Up @@ -53,15 +54,16 @@ func CrawlerInstance(sanitizedTaskChan <-chan SanitizedMIDATask, rawResultChan c
func ProcessSanitizedTask(st SanitizedMIDATask) (RawMIDAResult, error) {

rawResult := RawMIDAResult{
Requests: make(map[string][]network.EventRequestWillBeSent),
Responses: make(map[string][]network.EventResponseReceived),
Scripts: make(map[string]debugger.EventScriptParsed),
Requests: make(map[string][]network.EventRequestWillBeSent),
Responses: make(map[string][]network.EventResponseReceived),
Scripts: make(map[string]debugger.EventScriptParsed),
SanitizedTask: st,
}
var rawResultLock sync.Mutex // Should be used any time this object is updated

var requestMapLock sync.Mutex
var responseMapLock sync.Mutex
var scriptsMapLock sync.Mutex
navChan := make(chan error, 1)
timeoutChan := time.After(time.Duration(st.Timeout) * time.Second)
loadEventChan := make(chan bool, 5)

rawResultLock.Lock()
rawResult.Stats.Timing.BeginCrawl = time.Now()
Expand Down Expand Up @@ -171,6 +173,16 @@ func ProcessSanitizedTask(st SanitizedMIDATask) (RawMIDAResult, error) {
Log.Warn("Duplicate load event")
}
rawResultLock.Unlock()

var sendLoadEvent bool
rawResultLock.Lock()
if rawResult.SanitizedTask.CCond == CompleteOnTimeoutAfterLoad || rawResult.SanitizedTask.CCond == CompleteOnLoadEvent {
sendLoadEvent = true
}
rawResultLock.Unlock()
if sendLoadEvent {
loadEventChan <- true
}
}))
if err != nil {
Log.Fatal(err)
Expand Down Expand Up @@ -200,19 +212,19 @@ func ProcessSanitizedTask(st SanitizedMIDATask) (RawMIDAResult, error) {

err = c.Run(cxt, chromedp.CallbackFunc("Network.requestWillBeSent", func(param interface{}, handler *chromedp.TargetHandler) {
data := param.(*network.EventRequestWillBeSent)
requestMapLock.Lock()
rawResultLock.Lock()
rawResult.Requests[data.RequestID.String()] = append(rawResult.Requests[data.RequestID.String()], *data)
requestMapLock.Unlock()
rawResultLock.Unlock()
}))
if err != nil {
Log.Fatal(err)
}

err = c.Run(cxt, chromedp.CallbackFunc("Network.responseReceived", func(param interface{}, handler *chromedp.TargetHandler) {
data := param.(*network.EventResponseReceived)
responseMapLock.Lock()
rawResultLock.Lock()
rawResult.Responses[data.RequestID.String()] = append(rawResult.Responses[data.RequestID.String()], *data)
responseMapLock.Unlock()
rawResultLock.Unlock()
}))
if err != nil {
Log.Fatal(err)
Expand Down Expand Up @@ -250,9 +262,9 @@ func ProcessSanitizedTask(st SanitizedMIDATask) (RawMIDAResult, error) {

err = c.Run(cxt, chromedp.CallbackFunc("Debugger.scriptParsed", func(param interface{}, handler *chromedp.TargetHandler) {
data := param.(*debugger.EventScriptParsed)
scriptsMapLock.Lock()
rawResultLock.Lock()
rawResult.Scripts[data.ScriptID.String()] = *data
scriptsMapLock.Unlock()
rawResultLock.Unlock()
if st.AllScripts {
source, err := debugger.GetScriptSource(data.ScriptID).Do(cxt, handler)
if err != nil && err.Error() != "context canceled" {
Expand All @@ -273,44 +285,81 @@ func ProcessSanitizedTask(st SanitizedMIDATask) (RawMIDAResult, error) {
Log.Fatal(err)
}

// Navigate to specified URL, timing out if no connection to the site
// can be made
navChan := make(chan error, 1)
// Below is the MIDA navigation logic. Since MIDA offers several different
// termination conditions (terminate on timeout, terminate on load event, etc.),
// this logic gets a little complex.
go func() {
navChan <- c.Run(cxt, chromedp.Navigate(st.Url))
}()
select {
case err = <-navChan:
Log.Debug("Connection Established")
rawResult.Stats.Timing.ConnectionEstablished = time.Now()
case <-time.After(DefaultNavTimeout * time.Second):
Log.Warn("Navigation timeout")
// TODO: Handle navigation errors, build a corresponding result, etc.
// This usually happens because we successfully resolved DNS,
// but we could not connect to the server
err = errors.New("nav timeout during connection to site")
case <-timeoutChan:
// Timeout is set shorter than DefaultNavTimeout, so we are just done
err = errors.New("full timeout during connection to site")
}
if err != nil {
if err.Error() == "net::ERR_NAME_NOT_RESOLVED" {
Log.Warn("DNS did not resolve")
} else if err.Error() == "net::ERR_INVALID_HTTP_RESPONSE" {
Log.Warn("Received invalid HTTP response")
} else {
Log.Warn("Unknown navigation error: ", err.Error())
// We failed to connect to the site. Shut things down.
rawResultLock.Lock()
rawResult.SanitizedTask.TaskFailed = true
rawResult.SanitizedTask.FailureCode = err.Error()
rawResultLock.Unlock()

err = c.Shutdown(cxt)
if err != nil {
Log.Fatal("Browser Shutdown Failed: ", err)
}
}

// Wait for specified termination condition. This logic is dependent on
// the completion condition specified in the task.
err = c.Run(cxt, chromedp.Sleep(time.Duration(st.Timeout)*time.Second))
if err != nil {
Log.Fatal(err)
rawResultLock.Lock()
rawResult.Stats.Timing.BrowserClose = time.Now()
rawResult.Stats.Timing.EndCrawl = time.Now()
rawResultLock.Unlock()

return rawResult, nil

} else {
// We successfully connected to the site. At this point, we WILL gather results.
// Wait for our termination condition.
select {
// This will only arrive if we are using a completion condition that requires load events
case <-loadEventChan:
var ccond CompletionCondition
var timeAfterLoad int
rawResultLock.Lock()
ccond = rawResult.SanitizedTask.CCond
timeAfterLoad = rawResult.SanitizedTask.TimeAfterLoad
rawResultLock.Unlock()
if ccond == CompleteOnTimeoutAfterLoad {
select {
case <-timeoutChan:
// We did not make it to the TimeAfterLoad. Too Bad.
case <-time.After(time.Duration(timeAfterLoad) * time.Second):
// We made it to TimeAfterLoad. Nothing else to wait on.
}
} else if ccond == CompleteOnLoadEvent {
// Do nothing here -- The load event happened already, we are done
} else if ccond == CompleteOnTimeoutOnly {
Log.Error("Unexpectedly received load event through channel on TimeoutOnly crawl")
}
case <-timeoutChan:
// Overall timeout, shut down now
}
}

// Clean up
err = c.Shutdown(cxt)
if err != nil {
Log.Fatal("Client Shutdown:", err)
Log.Fatal("Browser Shutdown Failed: ", err)
}
rawResult.Stats.Timing.BrowserClose = time.Now()

rawResultLock.Lock()
rawResult.Stats.Timing.BrowserClose = time.Now()
rawResult.Stats.Timing.EndCrawl = time.Now()
rawResultLock.Unlock()

return rawResult, nil

Expand Down
3 changes: 2 additions & 1 deletion default.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
DefaultLinuxChromePath = "/usr/bin/google-chrome-stable"
DefaultLinuxChromiumPath = "/usr/bin/chromium-browser"
DefaultBrowserLogFileName = "browser.log"
DefaultProtocolPrefix = "http://"

// Output Parameters
DefaultOutputPath = "results"
Expand All @@ -49,7 +50,7 @@ const (
DefaultGroupID = "default"

// Task completion
DefaultProtocolPrefix = "http://"
DefaultTimeAfterLoad = 0
DefaultTimeout = 5 // Default time (in seconds) to remain on a page before exiting browser
DefaultCompletionCondition = CompleteOnTimeoutOnly

Expand Down
3 changes: 3 additions & 0 deletions result.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"github.com/chromedp/cdproto/network"
)

// Resource holds the metadata for a single resource. It may contain multiple
// requests and multiple responses, so each is stored as a slice. Usually
// (but not always) both slices have a length of 1.
type Resource struct {
Requests []network.EventRequestWillBeSent `json:"requests"`
Responses []network.EventResponseReceived `json:"responses"`
Expand Down
13 changes: 10 additions & 3 deletions sanitize.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,22 @@ func SanitizeTask(t MIDATask) (SanitizedMIDATask, error) {
if t.Completion.Timeout == nil && st.CCond != CompleteOnLoadEvent {
Log.Debug("No timeout value given in task. Setting to default value of ", DefaultTimeout)
st.Timeout = DefaultTimeout
} else if t.Completion.Timeout != nil && st.CCond == CompleteOnLoadEvent {
Log.Warn("Task timeout value ignored due to CompleteOnLoadEvent")
st.Timeout = 0
} else if *t.Completion.Timeout < 0 {
return st, errors.New("invalid negative value for task timeout")
} else {
st.Timeout = *t.Completion.Timeout
}

if t.Completion.TimeAfterLoad == nil {
if st.CCond == CompleteOnTimeoutAfterLoad {
return st, errors.New("TimeoutAfterLoad specified but no value given")
}
} else if *t.Completion.TimeAfterLoad < 0 {
return st, errors.New("invalid value for TimeoutAfterLoad")
} else {
st.TimeAfterLoad = *t.Completion.TimeAfterLoad
}

///// END SANITIZE TASK COMPLETION SETTINGS /////
///// BEGIN SANITIZE BROWSER PARAMETERS /////

Expand Down
3 changes: 1 addition & 2 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func StoreResults(finalResultChan <-chan FinalMIDAResult, monitoringChan chan<-
if r.SanitizedTask.CurrentAttempt >= r.SanitizedTask.MaxAttempts {
// We are abandoning trying this task. Too bad.
Log.Error("Task failed after ", r.SanitizedTask.MaxAttempts, " attempts.")

} else {
// "Squash" task results and put the task back at the beginning of the pipeline
Log.Debug("Retrying task...")
Expand All @@ -62,7 +61,7 @@ func StoreResults(finalResultChan <-chan FinalMIDAResult, monitoringChan chan<-
r.Stats.Timing.EndStorage = time.Now()

// Send stats to Prometheus
if viper.GetBool("EnableMonitoring") {
if viper.GetBool("monitoring") {
r.Stats.Timing.EndStorage = time.Now()
monitoringChan <- r.Stats
}
Expand Down
17 changes: 10 additions & 7 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type BrowserSettings struct {
// CompletionSettings holds the per-task options that decide when a crawl of
// a page is considered finished. The fields are pointers; a nil field means
// the task did not supply a value for that setting.
type CompletionSettings struct {
// Name of the completion condition (CompleteOnTimeoutOnly,
// CompleteOnLoadEvent or CompleteOnTimeoutAfterLoad).
CompletionCondition *string `json:"completion_condition"`
// Timeout, in seconds, after which the browser closes and the task completes.
Timeout *int `json:"timeout"`
// Time, in seconds, to remain on the page after the load event; the
// overall timeout takes precedence if it is reached first.
TimeAfterLoad *int `json:"time_after_load"`
}

type DataSettings struct {
Expand Down Expand Up @@ -73,8 +74,9 @@ type SanitizedMIDATask struct {
BrowserFlags []runner.CommandLineOption

// Completion Settings
CCond CompletionCondition
Timeout int
CCond CompletionCondition
Timeout int
TimeAfterLoad int

// Data settings
AllResources bool
Expand All @@ -89,10 +91,11 @@ type SanitizedMIDATask struct {
RandomIdentifier string // Randomly generated task identifier

// Parameters for retrying a task if it fails to complete
MaxAttempts int
CurrentAttempt int
TaskFailed bool // Nothing else should be done on the task once this flag is set
FailureCode string // Should be appended whenever a task is set to fail
MaxAttempts int
CurrentAttempt int
TaskFailed bool // Nothing else should be done on the task once this flag is set
FailureCode string // Should be appended whenever a task is set to fail
PastFailureCodes []string
}

// Reads in a single task or task list from a byte array
Expand Down Expand Up @@ -172,7 +175,6 @@ func TaskIntake(rtc chan<- MIDATask, cmd *cobra.Command, args []string) {
Log.Fatal(err)
}

// Put raw tasks in the channel
for _, rt := range rawTasks {
rtc <- rt
}
Expand All @@ -181,6 +183,7 @@ func TaskIntake(rtc chan<- MIDATask, cmd *cobra.Command, args []string) {
if err != nil {
Log.Fatal(err)
}

rawTasks := ExpandCompressedTaskSet(compressedTaskSet)
for _, rt := range rawTasks {
rtc <- rt
Expand Down

0 comments on commit cf0a757

Please sign in to comment.