Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TT-13139] Request times out in some cases when sending input via http inputs #6592

Conversation

buraksezer
Copy link
Contributor

@buraksezer buraksezer commented Sep 27, 2024

User description

PR for https://tyktech.atlassian.net/browse/TT-13139

  • defaultStreamManager has removed, I used Gorilla's muxer directly instead of creating a default stream manager at the beginning
  • I managed to remove some code because defaultStreamManager is gone.
  • ProcessRequest method has simplified and split into different methods for readability.
  • Tyk Streams not creating a new Bento stream if you publish messages via the HTTP endpoint. It iterates over the existing streams and hands over the request to Bento.
  • Added two new integration tests

PR Type

Enhancement, Tests


Description

  • Removed the defaultStreamManager and refactored the streaming middleware to use Gorilla's muxer directly, simplifying the codebase.
  • Enhanced the ProcessRequest method by splitting it into smaller, more manageable functions for better readability and maintainability.
  • Introduced new methods such as registerHandlers, inputHttpServerPublishHandler, and subscriptionHandler to improve HTTP request handling.
  • Added integration tests to verify the functionality of HTTP server input handling, ensuring that messages are correctly received by WebSocket clients.

Changes walkthrough 📝

Relevant files
Enhancement
mw_streaming.go
Refactor streaming middleware and enhance HTTP handling   

gateway/mw_streaming.go

  • Removed the defaultStreamManager and used Gorilla's muxer directly.
  • Simplified and refactored ProcessRequest method.
  • Added new methods registerHandlers, inputHttpServerPublishHandler, and
    subscriptionHandler.
  • Introduced dummyResponseWriter for handling HTTP responses.
  • +191/-82
    Tests
    mw_streaming_test.go
    Add integration tests for HTTP server input handling         

    gateway/mw_streaming_test.go

  • Added integration tests for HTTP server input handling.
  • Introduced new test templates for HTTP server input.
  • Verified WebSocket message reception for single and multiple clients.
  • +119/-1 

    💡 PR-Agent usage: Comment /help "your question" on any pull request to receive relevant information

    Copy link
    Contributor

    PR Reviewer Guide 🔍

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 PR contains tests
    🔒 No security concerns identified
    ⚡ Key issues to review

    Error Handling
    The error handling in inputHttpServerPublishHandler does not properly handle the case where copyBody fails. It should return an HTTP error response immediately instead of continuing to iterate over the streams.

    Concurrency Concerns
    The streamManagers.Range function in inputHttpServerPublishHandler iterates over stream managers and modifies requests concurrently. This could lead to race conditions or inconsistent states if not handled properly.

    Resource Leak
    In inputHttpServerPublishHandler, the body of the request is cloned but not closed in some paths, potentially leading to resource leaks.

    Missing Error Handling
    The function getRouteMatch in StreamingMiddleware does not handle the case where muxer.Match might return an error. This could lead to unhandled errors during route matching.

    Copy link
    Contributor

    github-actions bot commented Sep 27, 2024

    API Changes

    --- prev.txt	2024-10-01 14:03:30.211762176 +0000
    +++ current.txt	2024-10-01 14:03:24.067722220 +0000
    @@ -7563,6 +7563,11 @@
     	ErrOAuthClientDeleted               = "oauth.client_deleted"
     )
     const (
    +
    +	// ExtensionTykStreaming is the oas extension for tyk streaming
    +	ExtensionTykStreaming = "x-tyk-streaming"
    +)
    +const (
     	ResetQuota              string = "resetQuota"
     	CertificateRemoved      string = "CertificateRemoved"
     	CertificateAdded        string = "CertificateAdded"
    @@ -7614,10 +7619,6 @@
         The name for event handlers as defined in the API Definition JSON/BSON
         format
     
    -const (
    -	// ExtensionTykStreaming is the oas extension for tyk streaming
    -	ExtensionTykStreaming = "x-tyk-streaming"
    -)
     const ListDetailed = "detailed"
     const LoopScheme = "tyk"
     const OIDPREFIX = "openid"

    Copy link
    Contributor

    PR Code Suggestions ✨

    Explore these optional code suggestions:

    CategorySuggestion                                                                                                                                    Score
    Possible bug
    Handle missing keys in streamConfig to prevent runtime errors

    Consider handling the case where streamConfig does not contain an 'input' or
    'http_server' key to avoid runtime panics.

    gateway/mw_streaming.go [210-218]

     if componentMap, ok := streamConfig["input"].(map[string]interface{}); ok {
         if httpServerConfig, ok := componentMap["http_server"].(map[string]interface{}); ok {
             if val, ok := httpServerConfig["path"].(string); ok {
                 return val
             }
         }
     }
    +return defaultHTTPServerInputPath // Handle missing keys gracefully
    Suggestion importance[1-10]: 8

    Why: Handling cases where streamConfig lacks expected keys prevents runtime panics, which is crucial for maintaining application stability. This suggestion addresses a potential bug and improves code robustness.

    8
    Add error handling for route matching failures to improve robustness

    Implement error handling for the router.Match method to manage cases where the route
    matching fails.

    gateway/mw_streaming.go [339-340]

     if !s.router.Match(strippedPathRequest, routeMatch) {
    -    return nil, http.StatusOK
    +    return fmt.Errorf("failed to match route"), http.StatusInternalServerError
     }
    Suggestion importance[1-10]: 8

    Why: Implementing error handling for route matching failures enhances the robustness of the code by ensuring that unexpected conditions are managed gracefully, preventing potential silent failures.

    8
    Enhance the reliability of request body copying by handling errors and nil bodies

    Use a more robust method for copying the request body to handle potential errors and
    ensure the body is not empty.

    gateway/mw_streaming.go [361-363]

     body, err = copyBody(r.Body, true)
    -if err != nil {
    -    return false // break
    +if err != nil || body == nil {
    +    return false // break, handle nil body or error
     }
    Suggestion importance[1-10]: 7

    Why: Improving the method for copying the request body to handle errors and nil bodies increases the reliability of the code, preventing potential issues during request processing. This suggestion addresses a possible bug and enhances code robustness.

    7
    Security
    Add validation for the httpServerInputPath to enhance security and reliability

    Ensure that the httpServerInputPath is properly validated before usage to prevent
    potential security risks or runtime errors.

    gateway/mw_streaming.go [308]

    -httpServerInputPath: findHTTPServerInputPath(config),
    +httpServerInputPath: validateHTTPServerInputPath(findHTTPServerInputPath(config)),
    Suggestion importance[1-10]: 7

    Why: Adding validation for httpServerInputPath can prevent potential security risks or runtime errors, enhancing the robustness and security of the code. This suggestion is relevant and improves the reliability of the system.

    7

    💡 Need additional feedback ? start a PR chat

    Copy link
    Member

    @buger buger left a comment

    Choose a reason for hiding this comment

    The reason will be displayed to describe this comment to others. Learn more.

    @buraksezer if there is no defaultStreamManager how it would support background jobs? You have to create default streamManager if theare no http_server entries in any case. E.g. you have a input as kafka, and output as webhook.

    Also you can't depend on findHTTPServerInputPath, you should use GetHTTPPaths, because findHTTPServerInputPath does not consider broker. Also it check only for input but it should work with output too.

    @buraksezer
    Copy link
    Contributor Author

    @buraksezer if there is no defaultStreamManager how it would support background jobs? You have to create default streamManager if theare no http_server entries in any case. E.g. you have a input as kafka, and output as webhook.

    I tried to fix this by calling s.createStreamManager(nil) in Init method. Could you please review it again?

    Also you can't depend on findHTTPServerInputPath, you should use GetHTTPPaths, because findHTTPServerInputPath does not consider broker. Also it check only for input but it should work with output too.

    I understood the issue but I'm not sure how to extract path from the return value of GetHTTPPaths. I thought I only need the configuration variable named path of the input section.

    Here it is: https://warpstreamlabs.github.io/bento/docs/components/inputs/http_server/#path

    @buraksezer
    Copy link
    Contributor Author

    @buger

    Also you can't depend on findHTTPServerInputPath, you should use GetHTTPPaths, because findHTTPServerInputPath does not consider broker. Also it check only for input but it should work with output too.

    I would like to extend the previous comment.inputHttpServerPublishHandler only intended in the case of HTTP Server is configured as input. So findHTTPServerInputPath only tries to find this configuration element: https://warpstreamlabs.github.io/bento/docs/components/inputs/http_server/#path

    Copy link

    sonarcloud bot commented Oct 1, 2024

    Quality Gate Failed Quality Gate failed

    Failed conditions
    C Reliability Rating on New Code (required ≥ A)

    See analysis details on SonarCloud

    Catch issues before they fail your Quality Gate with our IDE extension SonarLint

    @buraksezer buraksezer closed this Oct 3, 2024
    buraksezer added a commit that referenced this pull request Oct 9, 2024
    …p inputs (#6601)
    
    ### **User description**
    <details open>
    <summary><a href="https://tyktech.atlassian.net/browse/TT-13139"
    title="TT-13139" target="_blank">TT-13139</a></summary>
      <br />
      <table>
        <tr>
          <th>Summary</th>
    <td>Request times out in some cases when sending input via http
    inputs</td>
        </tr>
        <tr>
          <th>Type</th>
          <td>
    <img alt="Bug"
    src="https://tyktech.atlassian.net/rest/api/2/universal_avatar/view/type/issuetype/avatar/10303?size=medium"
    />
            Bug
          </td>
        </tr>
        <tr>
          <th>Status</th>
          <td>In Dev</td>
        </tr>
        <tr>
          <th>Points</th>
          <td>N/A</td>
        </tr>
        <tr>
          <th>Labels</th>
          <td>-</td>
        </tr>
      </table>
    </details>
    <!--
      do not remove this marker as it will break jira-lint's functionality.
      added_by_jira_lint
    -->
    
    ---
    
    Cherry-picked stream caching feature from this branch:
    #6538
    
    Two new integration tests have been added to test `input http -> output
    http` scenario. See this issue for the details:
    https://tyktech.atlassian.net/browse/TT-13139
    
    Closing the previous one:
    #6592
    
    
    ___
    
    ### **PR Type**
    Enhancement, Tests
    
    
    ___
    
    ### **Description**
    - Implemented stream caching and garbage collection in the
    `StreamingMiddleware` to manage inactive streams and improve
    performance.
    - Added new fields and methods to handle stream activity and caching
    efficiently.
    - Introduced a garbage collection routine to periodically clean up
    inactive stream managers.
    - Added integration tests for single and multiple client scenarios,
    focusing on HTTP server input and WebSocket output.
    - Verified message distribution and handling in the new tests to ensure
    correct functionality.
    
    
    ___
    
    
    
    ### **Changes walkthrough** 📝
    <table><thead><tr><th></th><th align="left">Relevant
    files</th></tr></thead><tbody><tr><td><strong>Enhancement</strong></td><td><table>
    <tr>
      <td>
        <details>
    <summary><strong>mw_streaming.go</strong><dd><code>Implement stream
    caching and garbage collection in
    StreamingMiddleware</code></dd></summary>
    <hr>
    
    gateway/mw_streaming.go
    
    <li>Introduced stream caching and garbage collection for inactive
    streams.<br> <li> Added new fields to manage stream activity and
    cache.<br> <li> Implemented a garbage collection routine for stream
    managers.<br> <li> Updated stream manager creation to utilize caching.
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6601/files#diff-6f565750150d990575c808f1ca8f38483160dc6edf05f1534cd0bedb27c2e6c8">+98/-20</a>&nbsp;
    </td>
    
    </tr>                    
    </table></td></tr><tr><td><strong>Tests</strong></td><td><table>
    <tr>
      <td>
        <details>
    <summary><strong>mw_streaming_test.go</strong><dd><code>Add integration
    tests for HTTP server streaming scenarios</code></dd></summary>
    <hr>
    
    gateway/mw_streaming_test.go
    
    <li>Added tests for single and multiple client streaming scenarios.<br>
    <li> Implemented test for HTTP server input and WebSocket output.<br>
    <li> Verified message distribution and handling in tests.
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6601/files#diff-a0d1bd0196a741537a3c850e340225c8993e49d709c838af0f1b48b9893af1da">+137/-0</a>&nbsp;
    </td>
    
    </tr>                    
    </table></td></tr></tr></tbody></table>
    
    ___
    
    > 💡 **PR-Agent usage**: Comment `/help "your question"` on any pull
    request to receive relevant information
    
    ---------
    
    Co-authored-by: Leonid Bugaev <[email protected]>
    titpetric pushed a commit that referenced this pull request Oct 9, 2024
    …p inputs (#6601)
    
    ### **User description**
    <details open>
    <summary><a href="https://tyktech.atlassian.net/browse/TT-13139"
    title="TT-13139" target="_blank">TT-13139</a></summary>
      <br />
      <table>
        <tr>
          <th>Summary</th>
    <td>Request times out in some cases when sending input via http
    inputs</td>
        </tr>
        <tr>
          <th>Type</th>
          <td>
    <img alt="Bug"
    src="https://tyktech.atlassian.net/rest/api/2/universal_avatar/view/type/issuetype/avatar/10303?size=medium"
    />
            Bug
          </td>
        </tr>
        <tr>
          <th>Status</th>
          <td>In Dev</td>
        </tr>
        <tr>
          <th>Points</th>
          <td>N/A</td>
        </tr>
        <tr>
          <th>Labels</th>
          <td>-</td>
        </tr>
      </table>
    </details>
    <!--
      do not remove this marker as it will break jira-lint's functionality.
      added_by_jira_lint
    -->
    
    ---
    
    Cherry-picked stream caching feature from this branch:
    #6538
    
    Two new integration tests have been added to test `input http -> output
    http` scenario. See this issue for the details:
    https://tyktech.atlassian.net/browse/TT-13139
    
    Closing the previous one:
    #6592
    
    
    ___
    
    ### **PR Type**
    Enhancement, Tests
    
    
    ___
    
    ### **Description**
    - Implemented stream caching and garbage collection in the
    `StreamingMiddleware` to manage inactive streams and improve
    performance.
    - Added new fields and methods to handle stream activity and caching
    efficiently.
    - Introduced a garbage collection routine to periodically clean up
    inactive stream managers.
    - Added integration tests for single and multiple client scenarios,
    focusing on HTTP server input and WebSocket output.
    - Verified message distribution and handling in the new tests to ensure
    correct functionality.
    
    
    ___
    
    
    
    ### **Changes walkthrough** 📝
    <table><thead><tr><th></th><th align="left">Relevant
    files</th></tr></thead><tbody><tr><td><strong>Enhancement</strong></td><td><table>
    <tr>
      <td>
        <details>
    <summary><strong>mw_streaming.go</strong><dd><code>Implement stream
    caching and garbage collection in
    StreamingMiddleware</code></dd></summary>
    <hr>
    
    gateway/mw_streaming.go
    
    <li>Introduced stream caching and garbage collection for inactive
    streams.<br> <li> Added new fields to manage stream activity and
    cache.<br> <li> Implemented a garbage collection routine for stream
    managers.<br> <li> Updated stream manager creation to utilize caching.
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6601/files#diff-6f565750150d990575c808f1ca8f38483160dc6edf05f1534cd0bedb27c2e6c8">+98/-20</a>&nbsp;
    </td>
    
    </tr>                    
    </table></td></tr><tr><td><strong>Tests</strong></td><td><table>
    <tr>
      <td>
        <details>
    <summary><strong>mw_streaming_test.go</strong><dd><code>Add integration
    tests for HTTP server streaming scenarios</code></dd></summary>
    <hr>
    
    gateway/mw_streaming_test.go
    
    <li>Added tests for single and multiple client streaming scenarios.<br>
    <li> Implemented test for HTTP server input and WebSocket output.<br>
    <li> Verified message distribution and handling in tests.
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6601/files#diff-a0d1bd0196a741537a3c850e340225c8993e49d709c838af0f1b48b9893af1da">+137/-0</a>&nbsp;
    </td>
    
    </tr>                    
    </table></td></tr></tr></tbody></table>
    
    ___
    
    > 💡 **PR-Agent usage**: Comment `/help "your question"` on any pull
    request to receive relevant information
    
    ---------
    
    Co-authored-by: Leonid Bugaev <[email protected]>
    Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
    Projects
    None yet
    Development

    Successfully merging this pull request may close these issues.

    3 participants