diff --git a/.github/workflows/ci-test.yml b/.github/workflows/ci-test.yml index 0ef91de0..760bbfac 100644 --- a/.github/workflows/ci-test.yml +++ b/.github/workflows/ci-test.yml @@ -73,7 +73,7 @@ jobs: fail-fast: false max-parallel: 1 matrix: - go_version: ['1.11', '1.12', '1.13', '1.14', '1.15', '1.16', '1.17', '1.18', '1.19', '1.20', '1.21', '1.22'] + go_version: ['1.11', '1.12', '1.13', '1.14', '1.15', '1.16', '1.17', '1.18', '1.19', '1.20', '1.21', '1.22', 'stable'] runs-on: ubuntu-latest permissions: actions: read @@ -91,14 +91,14 @@ jobs: go-version: ${{ matrix.go_version }} - name: Format run: | - if [ "${{ matrix.go_version }}" = "1.22" ]; then + if [ "${{ matrix.go_version }}" = "stable" ]; then if [ "$(gofmt -s -l . | wc -l)" -gt 0 ]; then exit 1 fi fi - name: Golint run: | - if [ "${{ matrix.go_version }}" = "1.22" ]; then + if [ "${{ matrix.go_version }}" = "stable" ]; then set -e go install honnef.co/go/tools/cmd/staticcheck@latest make staticcheck @@ -106,9 +106,6 @@ jobs: - name: Run unit cases run: | set -e - if [ "${{ matrix.go_version }}" = "1.11" ] || [ "${{ matrix.go_version }}" = "1.12" ] || [ "${{ matrix.go_version }}" = "1.13" ] || [ "${{ matrix.go_version }}" = "1.14" ] || [ "${{ matrix.go_version }}" = "1.15" ]; then - go get modernc.org/fileutil@v1.0.0 - fi make unittest env: GO111MODULE: 'on' diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d2e1301..9f023d88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ # Changelog +## 7.24.0 +* 新增 + * 支持工作流模版 + * 在 [storage](github.com/qiniu/go-sdk/v7/storage) 包中支持上传加速 +* 优化 + * 超时连接重试机制 + * 域名解析请求机制 + * 域名解析缓存过期机制 + ## 7.23.0 * 新增 * 支持上传加速 diff --git a/Makefile b/Makefile index 9c6bdab1..ee18a8f6 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,11 @@ test: - go test -tags='unit integration' -failfast -v -timeout 350m -coverprofile=coverage.txt `go list ./... | egrep -v 'examples|sms'` + go test -tags='unit integration' -failfast -count=1 -v -timeout 350m -coverprofile=coverage.txt `go list ./... | egrep -v 'examples|sms'` unittest: - go test -tags=unit -failfast -v -coverprofile=coverage.txt `go list ./... | egrep -v 'examples|sms'` + go test -tags=unit -failfast -count=1 -v -coverprofile=coverage.txt `go list ./... | egrep -v 'examples|sms'` integrationtest: - go test -tags=integration -failfast -parallel 1 -v -coverprofile=coverage.txt `go list ./... | egrep -v 'examples|sms'` + go test -tags=integration -failfast -count=1 -parallel 1 -v -coverprofile=coverage.txt `go list ./... | egrep -v 'examples|sms'` staticcheck: staticcheck `go list ./... | egrep -v 'examples|sms'` diff --git a/README.md b/README.md index 1995a825..b5c204b8 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ github.com/qiniu/go-sdk 在您的项目中的 `go.mod` 文件内添加这行代码 ``` -require github.com/qiniu/go-sdk/v7 v7.23.0 +require github.com/qiniu/go-sdk/v7 v7.24.0 ``` 并且在项目中使用 `"github.com/qiniu/go-sdk/v7"` 引用 Qiniu Go SDK。 diff --git a/api-specs b/api-specs index 02461c7d..2f1f9934 160000 --- a/api-specs +++ b/api-specs @@ -1 +1 @@ -Subproject commit 02461c7d1eef0e76837a14b842c11a4a0d08a4c0 +Subproject commit 2f1f993428521f1065067e6bc0ad148a477f5dc8 diff --git a/audit/apis/query_log/api.go b/audit/apis/query_log/api.go index e2ae5762..bf5c05a4 100644 --- a/audit/apis/query_log/api.go +++ b/audit/apis/query_log/api.go @@ -208,6 +208,7 @@ func (j *LogInfo) validate() error { // 返回的审计日志列表 type LogInfos = []LogInfo +type QueryLogResp = Response type jsonResponse struct { NextMark string `json:"next_mark,omitempty"` // 用于请求下一页检索的结果 AuditLogInfos LogInfos `json:"audit_log_infos"` // 日志集合 diff --git a/client/dialer.go b/client/dialer.go index 328ff026..56c88295 100644 --- a/client/dialer.go +++ b/client/dialer.go @@ -4,6 +4,8 @@ import ( "context" "net" "time" + + "github.com/qiniu/go-sdk/v7/internal/dialer" ) type ( @@ -31,20 +33,7 @@ func defaultDialFunc(ctx context.Context, network string, address string) (net.C keepAliveInterval = 15 * time.Second } if resolved, ok := ctx.Value(resolverContextKey{}).(resolverContextValue); ok && len(resolved.ips) > 0 && resolved.domain == host { - dialTimeout = dialTimeout / time.Duration(len(resolved.ips)) - if dialTimeout < 3*time.Second { - dialTimeout = 3 * time.Second - } - dialer := net.Dialer{Timeout: dialTimeout, KeepAlive: keepAliveInterval} - for _, ip := range resolved.ips { - newAddr := ip.String() - if port != "" { - newAddr = net.JoinHostPort(newAddr, port) - } - if conn, err := dialer.DialContext(ctx, network, newAddr); err == nil { - return conn, nil - } - } + return dialer.DialContext(ctx, network, resolved.ips, port, dialer.DialOptions{Timeout: dialTimeout, KeepAlive: keepAliveInterval}) } return (&net.Dialer{Timeout: dialTimeout, KeepAlive: keepAliveInterval}).DialContext(ctx, network, address) } diff --git a/conf/conf.go b/conf/conf.go index 2f90b7ce..d8a39c1d 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -4,7 +4,7 @@ import ( "github.com/qiniu/go-sdk/v7/internal/env" ) -const Version = "7.23.0" +const Version = "7.24.0" const ( CONTENT_TYPE_JSON = "application/json" diff --git a/go.mod b/go.mod index 05741f50..c16cc750 100644 --- a/go.mod +++ b/go.mod @@ -6,26 +6,22 @@ require ( github.com/BurntSushi/toml v1.3.2 github.com/alex-ant/gomath v0.0.0-20160516115720-89013a210a82 github.com/dave/jennifer v1.6.1 - github.com/elastic/go-sysinfo v1.0.2 // indirect - github.com/gammazero/toposort v0.1.1 // indirect + github.com/elastic/go-sysinfo v1.0.2 + github.com/gammazero/toposort v0.1.1 github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-playground/validator/v10 v10.7.0 github.com/gofrs/flock v0.8.1 - github.com/gorilla/mux v1.8.1 // indirect + github.com/gorilla/mux v1.8.1 github.com/iancoleman/strcase v0.3.0 - github.com/jessevdk/go-flags v1.4.0 // indirect + github.com/jessevdk/go-flags v1.4.0 github.com/kr/pretty v0.3.0 // indirect github.com/leodido/go-urn v1.2.1 // indirect - github.com/matishsiao/goInfo v0.0.0-20210923090445-da2e3fa8d45f github.com/qiniu/dyn v1.3.0 github.com/rogpeppe/go-internal v1.8.0 // indirect github.com/stretchr/testify v1.6.1 - github.com/yuin/goldmark v1.4.13 // indirect - golang.org/x/mod v0.6.0-dev // indirect golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 - golang.org/x/sys v0.0.0-20190425145619-16072639606e // indirect golang.org/x/text v0.3.7 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b - modernc.org/fileutil v1.3.0 + modernc.org/fileutil v1.0.0 ) diff --git a/go.sum b/go.sum index 30b3d6c4..fd7a7680 100644 --- a/go.sum +++ b/go.sum @@ -7,8 +7,6 @@ github.com/dave/jennifer v1.6.1 h1:T4T/67t6RAA5AIV6+NP8Uk/BIsXgDoqEowgycdQQLuk= github.com/dave/jennifer v1.6.1/go.mod h1:nXbxhEmQfOZhWml3D1cDK5M1FLnMSozpbFN/m3RmGZc= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/elastic/go-sysinfo v1.0.2 h1:Wq1bOgnSz7Obl7DbMjbn0tzx1bE5G8Cfy3MVFa6C1Cc= github.com/elastic/go-sysinfo v1.0.2/go.mod h1:O/D5m1VpYLwGjCYzEt63g3Z1uO3jXfwyzzjiW90t8cY= github.com/elastic/go-windows v1.0.0 h1:qLURgZFkkrYyTTkvYpsZIgf83AUsdIHfvlJaqaZ7aSY= @@ -25,18 +23,16 @@ github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/j github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= github.com/go-playground/validator/v10 v10.7.0 h1:gLi5ajTBBheLNt0ctewgq7eolXoDALQd5/y90Hh9ZgM= github.com/go-playground/validator/v10 v10.7.0/go.mod h1:xm76BBt941f7yWdGnI2DVPFFg1UK3YY04qifoXU3lOk= -github.com/go-playground/validator/v10 v10.8.0 h1:1kAa0fCrnpv+QYdkdcRzrRM7AyYs5o8+jZdJCz9xj6k= -github.com/go-playground/validator/v10 v10.8.0/go.mod h1:9JhgTzTaE31GZDpH/HSvHiRJrJ3iKAgqqH0Bl/Ocjdk= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI= github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= -github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8l6qbCUTSiRLG/iKnW3K3/QfPPuSsBt4= -github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak= github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8l6qbCUTSiRLG/iKnW3K3/QfPPuSsBt4= +github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -48,8 +44,6 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= -github.com/matishsiao/goInfo v0.0.0-20210923090445-da2e3fa8d45f h1:B0OD7nYl2FPQEVrw8g2uyc1lGEzNbvrKh7fspGZcbvY= -github.com/matishsiao/goInfo v0.0.0-20210923090445-da2e3fa8d45f/go.mod h1:aEt7p9Rvh67BYApmZwNDPpgircTO2kgdmDUoF/1QmwA= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -61,7 +55,6 @@ github.com/qiniu/dyn v1.3.0 h1:s+xPTeV0H8yikgM4ZMBc7Rrefam8UNI3asBlkaOQg5o= github.com/qiniu/dyn v1.3.0/go.mod h1:E8oERcm8TtwJiZvkQPbcAh0RL8jO1G0VXJMW3FAWdkk= github.com/qiniu/x v1.10.5 h1:7V/CYWEmo9axJULvrJN6sMYh2FdY+esN5h8jwDkA4b0= github.com/qiniu/x v1.10.5/go.mod h1:03Ni9tj+N2h2aKnAz+6N0Xfl8FwMEDRC2PAlxekASDs= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= @@ -70,56 +63,23 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU= -golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= -golang.org/x/mod v0.6.0-dev/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190425145619-16072639606e h1:4ktJgTV34+N3qOZUc5fAaG3Pb11qzMm3PkAoTAgUZ2I= golang.org/x/sys v0.0.0-20190425145619-16072639606e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= -golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= @@ -132,6 +92,5 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= howett.net/plist v0.0.0-20181124034731-591f970eefbb h1:jhnBjNi9UFpfpl8YZhA9CrOqpnJdvzuiHsl/dnxl11M= howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCUl2OEE+rDiIIJAIdR4m7MiMcm0= -modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE= -modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ= -modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/fileutil v1.0.0 h1:Z1AFLZwl6BO8A5NldQg/xTSjGLetp+1Ubvl4alfGx8w= +modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8= diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 80a7f2bd..46a26928 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -20,6 +20,7 @@ import ( type ( CacheValue interface { IsEqual(CacheValue) bool + ShouldRefresh() bool IsValid() bool } @@ -106,10 +107,11 @@ func NewPersistentCache( type GetResult uint8 const ( - GetResultFromCache GetResult = 0 - GetResultFromFallback GetResult = 1 - GetResultFromInvalidCache GetResult = 2 - NoResultGot GetResult = 3 + GetResultFromCache GetResult = 0 + GetResultFromCacheAndRefreshAsync GetResult = 1 + GetResultFromFallback GetResult = 2 + GetResultFromInvalidCache GetResult = 3 + NoResultGot GetResult = 4 ) func (cache *Cache) Get(key string, fallback func() (CacheValue, error)) (CacheValue, GetResult) { @@ -122,7 +124,12 @@ func (cache *Cache) Get(key string, fallback func() (CacheValue, error)) (CacheV }() if ok && value.Value.IsValid() { - return value.Value, GetResultFromCache + if value.Value.ShouldRefresh() { + cache.doFallbackAsync(key, fallback) + return value.Value, GetResultFromCacheAndRefreshAsync + } else { + return value.Value, GetResultFromCache + } } newValue, err := cache.doFallback(key, fallback) @@ -145,22 +152,32 @@ func (cache *Cache) doFallback(key string, fallback func() (CacheValue, error)) return newValue.(CacheValue), nil } +func (cache *Cache) doFallbackAsync(key string, fallback func() (CacheValue, error)) { + go func() { + newValue, err := cache.doFallback(key, fallback) + if err == nil { + cache.Set(key, newValue) + } + }() +} + func (cache *Cache) Set(key string, value CacheValue) { cache.set(key, value, true) } func (cache *Cache) set(key string, value CacheValue, willFlushAsync bool) { - if value.IsValid() { - cache.checkType(value) + cache.checkType(value) + if !value.IsValid() { + return + } - now := time.Now() - cache.cacheMapMutex.Lock() - cache.cacheMap[key] = cacheValue{Value: value, CreatedAt: now} - cache.cacheMapMutex.Unlock() + now := time.Now() + cache.cacheMapMutex.Lock() + cache.cacheMap[key] = cacheValue{Value: value, CreatedAt: now} + cache.cacheMapMutex.Unlock() - if willFlushAsync { - go cache.flush() - } + if willFlushAsync { + go cache.flush() } } diff --git a/internal/cache/cache_test.go b/internal/cache/cache_test.go index e1757c8b..03ec4e4a 100644 --- a/internal/cache/cache_test.go +++ b/internal/cache/cache_test.go @@ -13,8 +13,13 @@ import ( ) type integerCacheValue struct { - Value int `json:"value"` - ExpiredAt time.Time `json:"expired_at"` + Value int `json:"value"` + RefreshAfter time.Time `json:"refresh_after"` + ExpiredAt time.Time `json:"expired_at"` +} + +func (v integerCacheValue) ShouldRefresh() bool { + return time.Now().After(v.RefreshAfter) } func (v integerCacheValue) IsValid() bool { @@ -32,7 +37,11 @@ func (left integerCacheValue) IsEqual(rightV CacheValue) bool { func TestCache(t *testing.T) { cache := NewCache(200 * time.Millisecond) if value, result := cache.Get("key_1", func() (CacheValue, error) { - return integerCacheValue{Value: 1, ExpiredAt: time.Now().Add(100 * time.Millisecond)}, nil + return integerCacheValue{ + Value: 1, + RefreshAfter: time.Now().Add(50 * time.Millisecond), + ExpiredAt: time.Now().Add(100 * time.Millisecond), + }, nil }); result != GetResultFromFallback { t.Fatalf("unexpected result: %v", result) } else if v := value.(integerCacheValue).Value; v != 1 { @@ -48,12 +57,25 @@ func TestCache(t *testing.T) { t.Fatalf("unexpected cache value: %v", v) } - time.Sleep(150 * time.Millisecond) + time.Sleep(50 * time.Millisecond) + if value, result := cache.Get("key_1", func() (CacheValue, error) { + return integerCacheValue{ + Value: 2, + RefreshAfter: time.Now().Add(50 * time.Millisecond), + ExpiredAt: time.Now().Add(100 * time.Millisecond), + }, nil + }); result != GetResultFromCacheAndRefreshAsync { + t.Fatalf("unexpected result: %v", result) + } else if v := value.(integerCacheValue).Value; v != 1 { + t.Fatalf("unexpected cache value: %v", v) + } + + time.Sleep(250 * time.Millisecond) if value, result := cache.Get("key_1", func() (CacheValue, error) { return nil, errors.New("test error") }); result != GetResultFromInvalidCache { t.Fatalf("unexpected result: %v", result) - } else if v := value.(integerCacheValue).Value; v != 1 { + } else if v := value.(integerCacheValue).Value; v != 2 { t.Fatalf("unexpected cache value: %v", v) } time.Sleep(150 * time.Millisecond) @@ -65,18 +87,18 @@ func TestCache(t *testing.T) { } if value, result := cache.Get("key_2", func() (CacheValue, error) { - return integerCacheValue{Value: 2, ExpiredAt: time.Now().Add(-100 * time.Millisecond)}, nil + return integerCacheValue{Value: 3, ExpiredAt: time.Now().Add(-100 * time.Millisecond)}, nil }); result != GetResultFromFallback { t.Fatalf("unexpected result: %v", result) - } else if v := value.(integerCacheValue).Value; v != 2 { + } else if v := value.(integerCacheValue).Value; v != 3 { t.Fatalf("unexpected cache value: %v", v) } if value, result := cache.Get("key_3", func() (CacheValue, error) { - return integerCacheValue{Value: 3, ExpiredAt: time.Now().Add(100 * time.Millisecond)}, nil + return integerCacheValue{Value: 4, ExpiredAt: time.Now().Add(100 * time.Millisecond)}, nil }); result != GetResultFromFallback { t.Fatal("unexpected ok") - } else if v := value.(integerCacheValue).Value; v != 3 { + } else if v := value.(integerCacheValue).Value; v != 4 { t.Fatalf("unexpected cache value: %v", v) } cache.Delete("key_3") @@ -122,7 +144,11 @@ func TestCachePersist(t *testing.T) { t.Fatal(err) } if value, result := cache.Get("key_1", func() (CacheValue, error) { - return integerCacheValue{Value: 1, ExpiredAt: time.Now().Add(200 * time.Millisecond)}, nil + return integerCacheValue{ + Value: 1, + RefreshAfter: time.Now().Add(100 * time.Millisecond), + ExpiredAt: time.Now().Add(200 * time.Millisecond), + }, nil }); result != GetResultFromFallback { t.Fatal("unexpected ok") } else if v := value.(integerCacheValue).Value; v != 1 { diff --git a/internal/clientv2/interceptor_retry_simple.go b/internal/clientv2/interceptor_retry_simple.go index c3e9d444..fa019ccf 100644 --- a/internal/clientv2/interceptor_retry_simple.go +++ b/internal/clientv2/interceptor_retry_simple.go @@ -99,13 +99,15 @@ func (interceptor *simpleRetryInterceptor) Intercept(req *http.Request, handler interceptor.config.init() hostname := req.URL.Hostname() - resolvedIPs := interceptor.resolve(req, hostname) + resolvedIPs := interceptor.resolve(req, hostname, false) + resolvedIPsMap := make(map[string]struct{}, len(resolvedIPs)) + addIPsToMap(resolvedIPsMap, resolvedIPs) // 可能会被重试多次 for i := 0; ; i++ { // Clone 防止后面 Handler 处理对 req 有污染 reqBefore := cloneReq(req) - req, chosenIPs = interceptor.choose(req, resolvedIPs, hostname) + req, chosenIPs = interceptor.ensureChoose(req, hostname, resolvedIPs, resolvedIPsMap) resp, err = interceptor.callHandler(req, &retrier.RetrierOptions{Attempts: i}, handler) if err == nil && resp.StatusCode/100 >= 4 { @@ -144,6 +146,24 @@ func (interceptor *simpleRetryInterceptor) Intercept(req *http.Request, handler return resp, err } +func (interceptor *simpleRetryInterceptor) ensureChoose(req *http.Request, hostname string, resolvedIPs []net.IP, resolvedIPsMap map[string]struct{}) (*http.Request, []net.IP) { + var chosenIPs []net.IP +beforeChoose: + req, chosenIPs = interceptor.choose(req, resolvedIPs, hostname, true) + for len(chosenIPs) == 0 { + newResolvedIPs := interceptor.resolve(req, hostname, true) + if len(newResolvedIPs) > 0 && !isAllIPsInMap(resolvedIPsMap, newResolvedIPs) { // 但凡能拿到新的 IP 就尝试新的 IP + addIPsToMap(resolvedIPsMap, newResolvedIPs) + resolvedIPs = mergeIPs(resolvedIPs, newResolvedIPs) + goto beforeChoose + } else { // 如果拿不到新的 IP,只能从已有的 IP 集合里选择 + req, chosenIPs = interceptor.choose(req, resolvedIPs, hostname, false) + break + } + } + return req, chosenIPs +} + func (interceptor *simpleRetryInterceptor) callHandler(req *http.Request, options *retrier.RetrierOptions, handler Handler) (resp *http.Response, err error) { if interceptor.config.BeforeRequest != nil { interceptor.config.BeforeRequest(req, options) @@ -171,8 +191,9 @@ func (interceptor *simpleRetryInterceptor) resolver() resolver.Resolver { return r } -func (interceptor *simpleRetryInterceptor) resolve(req *http.Request, hostname string) []net.IP { +func (interceptor *simpleRetryInterceptor) resolve(req *http.Request, hostname string, bypassCache bool) []net.IP { var ( + ctx context.Context ips []net.IP err error ) @@ -180,7 +201,11 @@ func (interceptor *simpleRetryInterceptor) resolve(req *http.Request, hostname s if interceptor.config.BeforeResolve != nil { interceptor.config.BeforeResolve(req) } - if ips, err = r.Resolve(req.Context(), hostname); err == nil { + ctx = req.Context() + if bypassCache { + ctx = resolver.WithByPassResolverCache(ctx) + } + if ips, err = r.Resolve(ctx, hostname); err == nil { if interceptor.config.AfterResolve != nil { interceptor.config.AfterResolve(req, ips) } @@ -207,22 +232,26 @@ func (interceptor *simpleRetryInterceptor) chooser() chooser.Chooser { return cs } -func (interceptor *simpleRetryInterceptor) choose(req *http.Request, ips []net.IP, hostname string) (*http.Request, []net.IP) { +func (interceptor *simpleRetryInterceptor) choose(req *http.Request, ips []net.IP, hostname string, failFast bool) (*http.Request, []net.IP) { if len(ips) > 0 { - ips = interceptor.chooser().Choose(req.Context(), ips, &chooser.ChooseOptions{Domain: hostname}) - req = req.WithContext(clientv1.WithResolvedIPs(req.Context(), hostname, ips)) + ips = interceptor.chooser().Choose(req.Context(), ips, &chooser.ChooseOptions{Domain: hostname, FailFast: failFast}) + if len(ips) > 0 { + req = req.WithContext(clientv1.WithResolvedIPs(req.Context(), hostname, ips)) + } } return req, ips } func (interceptor *simpleRetryInterceptor) feedbackGood(req *http.Request, hostname string, ips []net.IP) { if len(ips) > 0 { + interceptor.resolver().FeedbackGood(req.Context(), hostname, ips) interceptor.chooser().FeedbackGood(req.Context(), ips, &chooser.FeedbackOptions{Domain: hostname}) } } func (interceptor *simpleRetryInterceptor) feedbackBad(req *http.Request, hostname string, ips []net.IP) { if len(ips) > 0 { + interceptor.resolver().FeedbackBad(req.Context(), hostname, ips) interceptor.chooser().FeedbackBad(req.Context(), ips, &chooser.FeedbackOptions{Domain: hostname}) } } @@ -252,3 +281,33 @@ func bufferResponse(resp *http.Response) error { func defaultRetryInterval() time.Duration { return time.Duration(50+rand.Int()%50) * time.Millisecond } + +func addIPsToMap(m map[string]struct{}, ips []net.IP) { + for _, ip := range ips { + m[string(ip.To16())] = struct{}{} + } +} + +func isAllIPsInMap(m map[string]struct{}, ips []net.IP) bool { + for _, ip := range ips { + if _, ok := m[string(ip.To16())]; !ok { + return false + } + } + return true +} + +func mergeIPs(ips1, ips2 []net.IP) []net.IP { + newIPs := make([]net.IP, 0, len(ips1)+len(ips2)) + newIPs = append(newIPs, ips1...) +loop: + for _, ip2 := range ips2 { + for _, ip1 := range ips1 { + if ip1.Equal(ip2) { + continue loop + } + } + newIPs = append(newIPs, ip2) + } + return newIPs +} diff --git a/internal/dialer/dialer.go b/internal/dialer/dialer.go new file mode 100644 index 00000000..3ee90fde --- /dev/null +++ b/internal/dialer/dialer.go @@ -0,0 +1,125 @@ +package dialer + +import ( + "context" + "errors" + "net" + "sync" + "time" +) + +type ( + DialOptions struct { + Timeout time.Duration + KeepAlive time.Duration + } + + eitherConnOrError struct { + conn net.Conn + err error + } + + dialerErrs struct { + errs []error + } +) + +func DialContext(ctx context.Context, network string, ips []net.IP, port string, dialOptions DialOptions) (net.Conn, error) { + var wg sync.WaitGroup + resultsChan := make(chan eitherConnOrError, len(ips)) + cancels := make([]context.CancelFunc, 0, len(ips)) + err := &dialerErrs{errs: make([]error, 0, len(ips))} + interval := dialOptions.Timeout / time.Duration(len(ips)) + + defer func() { + for _, cancel := range cancels { + cancel() + } + wg.Wait() + close(resultsChan) + }() + + if len(ips) > 0 { + ip := ips[0] + ips = ips[1:] + newCtx, newCancel := context.WithCancel(ctx) + cancels = append(cancels, newCancel) + wg.Add(1) + dialContextAsync(newCtx, &wg, network, ip, port, DialOptions{Timeout: dialOptions.Timeout, KeepAlive: dialOptions.KeepAlive}, resultsChan) + } else { + return nil, errors.New("no ip could be dialed") + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if len(ips) > 0 { + ip := ips[0] + ips = ips[1:] + newCtx, newCancel := context.WithCancel(ctx) + cancels = append(cancels, newCancel) + wg.Add(1) + dialContextAsync(newCtx, &wg, network, ip, port, DialOptions{Timeout: dialOptions.Timeout - interval*time.Duration(len(cancels)-1), KeepAlive: dialOptions.KeepAlive}, resultsChan) + } else { + return nil, err + } + case connOrErr := <-resultsChan: + if connOrErr.err != nil { + err.errs = append(err.errs, connOrErr.err) + } else if connOrErr.conn != nil { + return connOrErr.conn, nil + } + } + } +} + +func dialContextSync(ctx context.Context, network string, ip net.IP, port string, dialOptions DialOptions) (net.Conn, error) { + dialer := net.Dialer{Timeout: dialOptions.Timeout, KeepAlive: dialOptions.KeepAlive} + newAddr := ip.String() + if port != "" { + newAddr = net.JoinHostPort(newAddr, port) + } + return dialer.DialContext(ctx, network, newAddr) +} + +func dialContextAsync(ctx context.Context, wg *sync.WaitGroup, network string, ip net.IP, port string, dialOptions DialOptions, c chan<- eitherConnOrError) { + go func() { + defer wg.Done() + conn, err := dialContextSync(ctx, network, ip, port, dialOptions) + if err != nil { + c <- eitherConnOrError{err: err} + } else { + c <- eitherConnOrError{conn: conn} + } + }() +} + +func (e *dialerErrs) Error() string { + if len(e.errs) > 0 { + return e.errs[0].Error() + } else { + return context.DeadlineExceeded.Error() + } +} + +func (e *dialerErrs) Unwrap() error { + if len(e.errs) > 0 { + return e.errs[0] + } else { + return context.DeadlineExceeded + } +} + +func (e *dialerErrs) Timeout() bool { + if len(e.errs) > 0 { + if te, ok := e.errs[0].(interface{ Timeout() bool }); ok { + return te.Timeout() + } else { + return false + } + } else { + return true + } +} diff --git a/internal/dialer/dialer_test.go b/internal/dialer/dialer_test.go new file mode 100644 index 00000000..efb96dcb --- /dev/null +++ b/internal/dialer/dialer_test.go @@ -0,0 +1,48 @@ +//go:build unit +// +build unit + +package dialer_test + +import ( + "context" + "net" + "os" + "testing" + "time" + + "github.com/qiniu/go-sdk/v7/internal/dialer" +) + +func TestDialContext(t *testing.T) { + listener, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 9901}) + if err != nil { + t.Fatal(err) + } + defer listener.Close() + + now := time.Now() + conn, err := dialer.DialContext(context.Background(), "tcp", []net.IP{net.IPv4(8, 8, 8, 8), net.IPv4(8, 8, 4, 4)}, "9901", dialer.DialOptions{Timeout: 3 * time.Second, KeepAlive: time.Second}) + if err != nil { + if !os.IsTimeout(err) { + t.Fatal("Unexpected error", err) + } + } else { + conn.Close() + t.Fatalf("Returning connection is unexpected") + } + + if time.Since(now) < 3*time.Second || time.Since(now) > 4*time.Second { + t.Fatalf("Unexpected time elapsed") + } + + now = time.Now() + conn, err = dialer.DialContext(context.Background(), "tcp", []net.IP{net.IPv4(8, 8, 8, 8), net.IPv4(8, 8, 4, 4), net.IPv4(127, 0, 0, 1)}, "9901", dialer.DialOptions{Timeout: 3 * time.Second, KeepAlive: time.Second}) + if err != nil { + t.Fatal("Unexpected error", err) + } + conn.Close() + + if time.Since(now) < 2*time.Second || time.Since(now) > 3*time.Second { + t.Fatalf("Unexpected time elapsed") + } +} diff --git a/media/apis/api_pfop.go b/media/apis/api_pfop.go index 60cc7301..d70a2927 100644 --- a/media/apis/api_pfop.go +++ b/media/apis/api_pfop.go @@ -33,8 +33,6 @@ func (form *innerPfopRequest) build() (url.Values, error) { } if form.Fops != "" { formValues.Set("fops", form.Fops) - } else { - return nil, errors.MissingRequiredFieldError{Name: "Fops"} } if form.NotifyUrl != "" { formValues.Set("notifyURL", form.NotifyUrl) @@ -48,6 +46,9 @@ func (form *innerPfopRequest) build() (url.Values, error) { if form.Pipeline != "" { formValues.Set("pipeline", form.Pipeline) } + if form.WorkflowTemplateId != "" { + formValues.Set("workflowTemplateID", form.WorkflowTemplateId) + } return formValues, nil } diff --git a/media/apis/pfop/api.go b/media/apis/pfop/api.go index 3cc9aae6..f5ae8b13 100644 --- a/media/apis/pfop/api.go +++ b/media/apis/pfop/api.go @@ -11,14 +11,15 @@ import ( // 调用 API 所用的请求 type Request struct { - Credentials credentials.CredentialsProvider // 鉴权参数,用于生成鉴权凭证,如果为空,则使用 HTTPClientOptions 中的 CredentialsProvider - BucketName string // 空间名称 - ObjectName string // 对象名称 - Fops string // 数据处理命令列表,以 `;` 分隔,可以指定多个数据处理命令 - NotifyUrl string // 处理结果通知接收 URL - Force int64 // 强制执行数据处理,设为 `1`,则可强制执行数据处理并覆盖原结果 - Type int64 // 任务类型,支持 `0` 表示普通任务,`1` 表示闲时任务 - Pipeline string // 对列名,仅适用于普通任务 + Credentials credentials.CredentialsProvider // 鉴权参数,用于生成鉴权凭证,如果为空,则使用 HTTPClientOptions 中的 CredentialsProvider + BucketName string // 空间名称 + ObjectName string // 对象名称 + Fops string // 数据处理命令列表,以 `;` 分隔,可以指定多个数据处理命令 + NotifyUrl string // 处理结果通知接收 URL + Force int64 // 强制执行数据处理,设为 `1`,则可强制执行数据处理并覆盖原结果 + Type int64 // 任务类型,支持 `0` 表示普通任务,`1` 表示闲时任务 + Pipeline string // 对列名,仅适用于普通任务 + WorkflowTemplateId string // 工作流模板 ID } // 获取 API 所用的响应 diff --git a/media/apis/prefop/api.go b/media/apis/prefop/api.go index 25144eee..a65142ba 100644 --- a/media/apis/prefop/api.go +++ b/media/apis/prefop/api.go @@ -23,6 +23,7 @@ type Response struct { ObjectName string // 源对象名称 BucketName string // 源空间名称 Pipeline string // 云处理操作的处理队列 + TaskFrom string // 如果没有,则表示通过 `api+fops` 命令提交的任务,否则遵循规则 `: `,其中 `` 当前可选 `workflow` 或 `trigger` RequestId string // 云处理请求的请求 ID Type int64 // 任务类型,支持 `0` 表示普通任务,`1` 表示闲时任务 CreatedAt string // 任务创建时间 @@ -102,7 +103,8 @@ type jsonResponse struct { Description string `json:"desc"` // 与状态码相对应的详细描述 ObjectName string `json:"inputKey"` // 源对象名称 BucketName string `json:"inputBucket"` // 源空间名称 - Pipeline string `json:"pipeline"` // 云处理操作的处理队列 + Pipeline string `json:"pipeline,omitempty"` // 云处理操作的处理队列 + TaskFrom string `json:"taskFrom,omitempty"` // 如果没有,则表示通过 `api+fops` 命令提交的任务,否则遵循规则 `: `,其中 `` 当前可选 `workflow` 或 `trigger` RequestId string `json:"reqid"` // 云处理请求的请求 ID Type int64 `json:"type,omitempty"` // 任务类型,支持 `0` 表示普通任务,`1` 表示闲时任务 CreatedAt string `json:"creationDate,omitempty"` // 任务创建时间 @@ -113,7 +115,7 @@ func (j *Response) MarshalJSON() ([]byte, error) { if err := j.validate(); err != nil { return nil, err } - return json.Marshal(&jsonResponse{PersistentId: j.PersistentId, Code: j.Code, Description: j.Description, ObjectName: j.ObjectName, BucketName: j.BucketName, Pipeline: j.Pipeline, RequestId: j.RequestId, Type: j.Type, CreatedAt: j.CreatedAt, Items: j.Items}) + return json.Marshal(&jsonResponse{PersistentId: j.PersistentId, Code: j.Code, Description: j.Description, ObjectName: j.ObjectName, BucketName: j.BucketName, Pipeline: j.Pipeline, TaskFrom: j.TaskFrom, RequestId: j.RequestId, Type: j.Type, CreatedAt: j.CreatedAt, Items: j.Items}) } func (j *Response) UnmarshalJSON(data []byte) error { var nj jsonResponse @@ -126,6 +128,7 @@ func (j *Response) UnmarshalJSON(data []byte) error { j.ObjectName = nj.ObjectName j.BucketName = nj.BucketName j.Pipeline = nj.Pipeline + j.TaskFrom = nj.TaskFrom j.RequestId = nj.RequestId j.Type = nj.Type j.CreatedAt = nj.CreatedAt @@ -148,9 +151,6 @@ func (j *Response) validate() error { if j.BucketName == "" { return errors.MissingRequiredFieldError{Name: "BucketName"} } - if j.Pipeline == "" { - return errors.MissingRequiredFieldError{Name: "Pipeline"} - } if j.RequestId == "" { return errors.MissingRequiredFieldError{Name: "RequestId"} } diff --git a/storage/config.go b/storage/config.go index eb5a2d1b..b992412b 100644 --- a/storage/config.go +++ b/storage/config.go @@ -11,9 +11,10 @@ type Config struct { // 如果设置的Host本身是以http://开头的,又设置了该字段为true,那么优先使用该字段,使用https协议 // 同理如果该字段为false, 但是设置的host以https开头,那么使用http协议通信 - UseHTTPS bool //是否使用https域名 - UseCdnDomains bool //是否使用cdn加速域名 - CentralRsHost string //中心机房的RsHost,用于list bucket + UseHTTPS bool //是否使用https域名 + UseCdnDomains bool //是否使用cdn加速域名 + AccelerateUploading bool //是否启用上传加速 + CentralRsHost string //中心机房的RsHost,用于list bucket // 兼容保留 RsHost string diff --git a/storage/form_upload.go b/storage/form_upload.go index 9174b79c..70a24a20 100644 --- a/storage/form_upload.go +++ b/storage/form_upload.go @@ -80,6 +80,7 @@ func NewFormUploaderEx(cfg *Config, clt *client.Client) *FormUploader { opts := http_client.Options{ BasicHTTPClient: clt.Client, UseInsecureProtocol: !cfg.UseHTTPS, + AccelerateUploading: cfg.AccelerateUploading, HostRetryConfig: &clientv2.RetryConfig{}, } if region := cfg.GetRegion(); region != nil { diff --git a/storage/pfop.go b/storage/pfop.go index 0cb14479..97a04545 100644 --- a/storage/pfop.go +++ b/storage/pfop.go @@ -71,6 +71,7 @@ type PrefopRet struct { InputBucket string `json:"inputBucket,omitempty"` InputKey string `json:"inputKey,omitempty"` Pipeline string `json:"pipeline,omitempty"` + TaskFrom string `json:"taskFrom,omitempty"` Reqid string `json:"reqid,omitempty"` CreatedAt time.Time `json:"creationDate,omitempty"` Items []FopResult @@ -87,6 +88,9 @@ func (r *PrefopRet) String() string { if r.Pipeline != "" { strData += fmt.Sprintf("Pipeline: %s\n", r.Pipeline) } + if r.TaskFrom != "" { + strData += fmt.Sprintf("TaskFrom: %s\n", r.TaskFrom) + } if r.Type != 0 { strData += fmt.Sprintf("Type: %d\n", r.Type) } @@ -125,13 +129,14 @@ func (r *PrefopRet) String() string { } type PfopRequest struct { - BucketName string // 空间名称 - ObjectName string // 对象名称 - Fops string // 数据处理命令列表,以 `;` 分隔,可以指定多个数据处理命令,与 `workflowTemplateID` 二选一 - NotifyUrl string // 处理结果通知接收 URL - Force int64 // 强制执行数据处理,设为 `1`,则可强制执行数据处理并覆盖原结果 - Type int64 // 任务类型,支持 `0` 表示普通任务,`1` 表示闲时任务 - Pipeline string // 对列名,仅适用于普通任务 + BucketName string // 空间名称 + ObjectName string // 对象名称 + Fops string // 数据处理命令列表,以 `;` 分隔,可以指定多个数据处理命令,与 `workflowTemplateID` 二选一 + NotifyUrl string // 处理结果通知接收 URL + Force int64 // 强制执行数据处理,设为 `1`,则可强制执行数据处理并覆盖原结果 + Type int64 // 任务类型,支持 `0` 表示普通任务,`1` 表示闲时任务 + Pipeline string // 对列名,仅适用于普通任务 + WorkflowTemplateID string // 工作流模板 ID } // FopResult 云处理操作列表,包含每个云处理操作的状态信息 @@ -176,13 +181,14 @@ func (m *OperationManager) Pfop(bucket, key, fops, pipeline, notifyURL string, // Pfop 持久化数据处理 v2 func (m *OperationManager) PfopV2(ctx context.Context, pfopRequest *PfopRequest) (*PfopRet, error) { response, err := m.apiClient.Pfop(context.Background(), &apis.PfopRequest{ - BucketName: pfopRequest.BucketName, - ObjectName: pfopRequest.ObjectName, - Fops: pfopRequest.Fops, - NotifyUrl: pfopRequest.NotifyUrl, - Force: pfopRequest.Force, - Type: pfopRequest.Type, - Pipeline: pfopRequest.Pipeline, + BucketName: pfopRequest.BucketName, + ObjectName: pfopRequest.ObjectName, + Fops: pfopRequest.Fops, + NotifyUrl: pfopRequest.NotifyUrl, + Force: pfopRequest.Force, + Type: pfopRequest.Type, + Pipeline: pfopRequest.Pipeline, + WorkflowTemplateId: pfopRequest.WorkflowTemplateID, }, nil) if err != nil { return nil, err @@ -210,6 +216,7 @@ func (m *OperationManager) Prefop(persistentID string) (PrefopRet, error) { InputBucket: response.BucketName, InputKey: response.ObjectName, Pipeline: response.Pipeline, + TaskFrom: response.TaskFrom, Reqid: response.RequestId, CreatedAt: createdAt, Items: make([]FopResult, 0, len(response.Items)), diff --git a/storage/region_uc_v2.go b/storage/region_uc_v2.go index be89a624..86f87e1c 100644 --- a/storage/region_uc_v2.go +++ b/storage/region_uc_v2.go @@ -172,6 +172,9 @@ type UCApiOptions struct { // 是否使用 HTTPS 协议 UseHttps bool + // 是否加速上传 + AccelerateUploading bool + // 单域名重试次数 RetryMax int @@ -231,6 +234,7 @@ func (options *UCApiOptions) getApiStorageClient() *apis.Storage { return apis.NewStorage(&http_client.Options{ Interceptors: []clientv2.Interceptor{}, UseInsecureProtocol: !options.UseHttps, + AccelerateUploading: options.AccelerateUploading, Resolver: options.Resolver, Chooser: options.Chooser, HostRetryConfig: &clientv2.RetryConfig{RetryMax: options.RetryMax, Retrier: options.Retrier}, @@ -276,7 +280,7 @@ func getRegionByV2(ak, bucket string, options UCApiOptions) (*Region, error) { }() } - regionCacheKey := makeRegionCacheKey(ak, bucket, options.Hosts) + regionCacheKey := makeRegionCacheKey(ak, bucket, options.Hosts, options.AccelerateUploading) // check from cache if v, ok := regionV2Cache.Load(regionCacheKey); ok && time.Now().Before(v.(regionV2CacheValue).Deadline) { return v.(regionV2CacheValue).Region, nil @@ -306,9 +310,13 @@ func getRegionByV2(ak, bucket string, options UCApiOptions) (*Region, error) { return newRegion.(*Region), err } -func makeRegionCacheKey(ak, bucket string, ucHosts []string) string { +func makeRegionCacheKey(ak, bucket string, ucHosts []string, accelerateUploading bool) string { hostStrings := fmt.Sprintf("%v", ucHosts) - return fmt.Sprintf("%s:%s:%x", ak, bucket, md5.Sum([]byte(hostStrings))) + s := fmt.Sprintf("%s:%s:%x", ak, bucket, md5.Sum([]byte(hostStrings))) + if accelerateUploading { + s += ":1" + } + return s } func _getRegionByV2WithoutCache(ak, bucket string, options UCApiOptions) (*Region, int64, error) { @@ -323,11 +331,40 @@ func _getRegionByV2WithoutCache(ak, bucket string, options UCApiOptions) (*Regio if err != nil { return nil, 0, err } + var srcUpHosts, cdnUpHosts []string var rsHost, rsfHost, apiHost, ioVipHost, ioSrcHost string - srcUpHosts := append(response.UpDomains.SourceUpDomains.MainSourceUpDomains, response.UpDomains.SourceUpDomains.BackupSourceUpDomains...) - srcUpHosts = append(srcUpHosts, response.UpDomains.OldSourceDomains.OldMainSourceUpDomains...) - cdnUpHosts := append(response.UpDomains.AcceleratedUpDomains.MainAcceleratedUpDomains, response.UpDomains.AcceleratedUpDomains.BackupAcceleratedUpDomains...) - cdnUpHosts = append(cdnUpHosts, response.UpDomains.OldAcceleratedDomains.OldMainAcceleratedUpDomains...) + if options.AccelerateUploading && len(response.UpDomains.AcceleratedUpDomains.MainAcceleratedUpDomains) > 0 { + srcUpHosts = make([]string, 0, + len(response.UpDomains.AcceleratedUpDomains.MainAcceleratedUpDomains)+ + len(response.UpDomains.AcceleratedUpDomains.BackupAcceleratedUpDomains)+ + len(response.UpDomains.OldAcceleratedDomains.OldMainAcceleratedUpDomains)+ + len(response.UpDomains.SourceUpDomains.MainSourceUpDomains)+ + len(response.UpDomains.SourceUpDomains.BackupSourceUpDomains)+ + len(response.UpDomains.OldSourceDomains.OldMainSourceUpDomains)) + srcUpHosts = append(srcUpHosts, response.UpDomains.AcceleratedUpDomains.MainAcceleratedUpDomains...) + srcUpHosts = append(srcUpHosts, response.UpDomains.AcceleratedUpDomains.BackupAcceleratedUpDomains...) + srcUpHosts = append(srcUpHosts, response.UpDomains.OldAcceleratedDomains.OldMainAcceleratedUpDomains...) + srcUpHosts = append(srcUpHosts, response.UpDomains.SourceUpDomains.MainSourceUpDomains...) + srcUpHosts = append(srcUpHosts, response.UpDomains.SourceUpDomains.BackupSourceUpDomains...) + srcUpHosts = append(srcUpHosts, response.UpDomains.OldSourceDomains.OldMainSourceUpDomains...) + cdnUpHosts = make([]string, 0, len(response.UpDomains.AcceleratedUpDomains.MainAcceleratedUpDomains)) + cdnUpHosts = append(cdnUpHosts, response.UpDomains.AcceleratedUpDomains.MainAcceleratedUpDomains...) + } else { + srcUpHosts = make([]string, 0, + len(response.UpDomains.SourceUpDomains.MainSourceUpDomains)+ + len(response.UpDomains.SourceUpDomains.BackupSourceUpDomains)+ + len(response.UpDomains.OldSourceDomains.OldMainSourceUpDomains)) + srcUpHosts = append(srcUpHosts, response.UpDomains.SourceUpDomains.MainSourceUpDomains...) + srcUpHosts = append(srcUpHosts, response.UpDomains.SourceUpDomains.BackupSourceUpDomains...) + srcUpHosts = append(srcUpHosts, response.UpDomains.OldSourceDomains.OldMainSourceUpDomains...) + cdnUpHosts = make([]string, 0, + len(response.UpDomains.AcceleratedUpDomains.MainAcceleratedUpDomains)+ + len(response.UpDomains.AcceleratedUpDomains.BackupAcceleratedUpDomains)+ + len(response.UpDomains.OldAcceleratedDomains.OldMainAcceleratedUpDomains)) + cdnUpHosts = append(cdnUpHosts, response.UpDomains.AcceleratedUpDomains.MainAcceleratedUpDomains...) + cdnUpHosts = append(cdnUpHosts, response.UpDomains.AcceleratedUpDomains.BackupAcceleratedUpDomains...) + cdnUpHosts = append(cdnUpHosts, response.UpDomains.OldAcceleratedDomains.OldMainAcceleratedUpDomains...) + } if len(response.RsDomains.AcceleratedRsDomains.MainAcceleratedRsDomains) > 0 { rsHost = response.RsDomains.AcceleratedRsDomains.MainAcceleratedRsDomains[0] } diff --git a/storage/region_uc_v4.go b/storage/region_uc_v4.go index d68a6057..0a497973 100644 --- a/storage/region_uc_v4.go +++ b/storage/region_uc_v4.go @@ -110,7 +110,7 @@ func getRegionByV4(ak, bucket string, options UCApiOptions) (*RegionGroup, error }() } - regionCacheKey := makeRegionCacheKey(ak, bucket, options.Hosts) + regionCacheKey := makeRegionCacheKey(ak, bucket, options.Hosts, options.AccelerateUploading) // check from cache if v, ok := regionV4Cache.Load(regionCacheKey); ok && time.Now().Before(v.(regionV4CacheValue).Deadline) { cacheValue, _ := v.(regionV4CacheValue) @@ -145,7 +145,12 @@ func getRegionByV4(ak, bucket string, options UCApiOptions) (*RegionGroup, error func _getRegionByV4WithoutCache(ak, bucket string, options UCApiOptions) ([]*Region, int64, error) { toRegion := func(r *query_bucket_v4.BucketQueryHost) *Region { var rsHost, rsfHost, apiHost, ioVipHost, ioSrcHost string - upDomains := append(r.UpDomains.PreferedUpDomains, r.UpDomains.AlternativeUpDomains...) + upDomains := make([]string, 0, len(r.UpDomains.AcceleratedUpDomains)+len(r.UpDomains.PreferedUpDomains)+len(r.UpDomains.AlternativeUpDomains)) + if options.AccelerateUploading && len(r.UpDomains.AcceleratedUpDomains) > 0 { + upDomains = append(upDomains, r.UpDomains.AcceleratedUpDomains...) + } + upDomains = append(upDomains, r.UpDomains.PreferedUpDomains...) + upDomains = append(upDomains, r.UpDomains.AlternativeUpDomains...) if len(r.RsDomains.PreferedRsDomains) > 0 { rsHost = r.RsDomains.PreferedRsDomains[0] } diff --git a/storage/resume_uploader.go b/storage/resume_uploader.go index 69cf8593..6f12abef 100644 --- a/storage/resume_uploader.go +++ b/storage/resume_uploader.go @@ -44,6 +44,7 @@ func NewResumeUploaderEx(cfg *Config, clt *client.Client) *ResumeUploader { opts := http_client.Options{ BasicHTTPClient: clt.Client, UseInsecureProtocol: !cfg.UseHTTPS, + AccelerateUploading: cfg.AccelerateUploading, HostRetryConfig: &clientv2.RetryConfig{}, } if region := cfg.GetRegion(); region != nil { @@ -259,6 +260,7 @@ func newResumeUploaderImpl(resumeUploader *ResumeUploader, bucket, key string, h opts := http_client.Options{ BasicHTTPClient: resumeUploader.Client.Client, UseInsecureProtocol: !resumeUploader.Cfg.UseHTTPS, + AccelerateUploading: resumeUploader.Cfg.AccelerateUploading, HostRetryConfig: &clientv2.RetryConfig{}, } if region := resumeUploader.Cfg.GetRegion(); region != nil { diff --git a/storage/resume_uploader_v2.go b/storage/resume_uploader_v2.go index b0dec2f1..59971f98 100644 --- a/storage/resume_uploader_v2.go +++ b/storage/resume_uploader_v2.go @@ -45,6 +45,7 @@ func NewResumeUploaderV2Ex(cfg *Config, clt *client.Client) *ResumeUploaderV2 { opts := http_client.Options{ BasicHTTPClient: clt.Client, UseInsecureProtocol: !cfg.UseHTTPS, + AccelerateUploading: cfg.AccelerateUploading, HostRetryConfig: &clientv2.RetryConfig{}, } if region := cfg.GetRegion(); region != nil { @@ -245,6 +246,7 @@ func newResumeUploaderV2Impl(resumeUploader *ResumeUploaderV2, bucket, key strin opts := http_client.Options{ BasicHTTPClient: resumeUploader.Client.Client, UseInsecureProtocol: !resumeUploader.Cfg.UseHTTPS, + AccelerateUploading: resumeUploader.Cfg.AccelerateUploading, HostRetryConfig: &clientv2.RetryConfig{}, } if region := resumeUploader.Cfg.GetRegion(); region != nil { diff --git a/storage/token.go b/storage/token.go index 2c047dfe..fb6a56fc 100644 --- a/storage/token.go +++ b/storage/token.go @@ -78,6 +78,9 @@ type PutPolicy struct { // 任务类型。0: 普通任务,1: 闲时任务 PersistentType int64 `json:"persistentType,omitempty"` + // 工作流模版 ID + PersistentWorkflowTemplateID string `json:"persistentWorkflowTemplateID,omitempty"` + // saveKey 的优先级设置。为 true 时,saveKey不能为空,会忽略客户端指定的key,强制使用saveKey进行文件命名。参数不设置时, // 默认值为false ForceSaveKey bool `json:"forceSaveKey,omitempty"` // diff --git a/storage/upload_manager.go b/storage/upload_manager.go index 634b5f02..bd036a99 100644 --- a/storage/upload_manager.go +++ b/storage/upload_manager.go @@ -27,9 +27,10 @@ const ( // UploadConfig 为 UploadManager 提供配置信息 type UploadConfig struct { - UseHTTPS bool - UseCdnDomains bool - Regions *RegionGroup + UseHTTPS bool + UseCdnDomains bool + AccelerateUploading bool + Regions *RegionGroup } // NewUploadConfig 创建默认的 UploadConfig 对象 @@ -434,31 +435,34 @@ func (manager *UploadManager) getRecoverRegion(key *string, upToken string, resu func (manager *UploadManager) getFormUploader(region *Region) *FormUploader { return NewFormUploaderEx(&Config{ - Zone: region, - Region: region, - UseHTTPS: manager.cfg.UseHTTPS, - UseCdnDomains: manager.cfg.UseCdnDomains, - CentralRsHost: "", + Zone: region, + Region: region, + UseHTTPS: manager.cfg.UseHTTPS, + UseCdnDomains: manager.cfg.UseCdnDomains, + AccelerateUploading: manager.cfg.AccelerateUploading, + CentralRsHost: "", }, manager.client) } func (manager *UploadManager) getResumeV1Uploader(region *Region) *ResumeUploader { return NewResumeUploaderEx(&Config{ - Zone: region, - Region: region, - UseHTTPS: manager.cfg.UseHTTPS, - UseCdnDomains: manager.cfg.UseCdnDomains, - CentralRsHost: "", + Zone: region, + Region: region, + UseHTTPS: manager.cfg.UseHTTPS, + UseCdnDomains: manager.cfg.UseCdnDomains, + AccelerateUploading: manager.cfg.AccelerateUploading, + CentralRsHost: "", }, manager.client) } func (manager *UploadManager) getResumeV2Uploader(region *Region) *ResumeUploaderV2 { return NewResumeUploaderV2Ex(&Config{ - Zone: region, - Region: region, - UseHTTPS: manager.cfg.UseHTTPS, - UseCdnDomains: manager.cfg.UseCdnDomains, - CentralRsHost: "", + Zone: region, + Region: region, + UseHTTPS: manager.cfg.UseHTTPS, + UseCdnDomains: manager.cfg.UseCdnDomains, + AccelerateUploading: manager.cfg.AccelerateUploading, + CentralRsHost: "", }, manager.client) } @@ -468,9 +472,10 @@ func (manager *UploadManager) getRegionGroupWithUploadToken(upToken string, extr return nil, err } return getRegionGroupWithOptions(ak, bucket, UCApiOptions{ - UseHttps: manager.cfg.UseHTTPS, - RetryMax: extra.TryTimes, - HostFreezeDuration: extra.HostFreezeDuration, + UseHttps: manager.cfg.UseHTTPS, + AccelerateUploading: manager.cfg.AccelerateUploading, + RetryMax: extra.TryTimes, + HostFreezeDuration: extra.HostFreezeDuration, }) } diff --git a/storage/uploader_base.go b/storage/uploader_base.go index 6dc0c345..be9f76fe 100644 --- a/storage/uploader_base.go +++ b/storage/uploader_base.go @@ -13,8 +13,9 @@ func getUpHost(config *Config, retryMax int, hostFreezeDuration time.Duration, a region := config.GetRegion() if region == nil { if region, err = GetRegionWithOptions(ak, bucket, UCApiOptions{ - RetryMax: retryMax, - HostFreezeDuration: hostFreezeDuration, + RetryMax: retryMax, + HostFreezeDuration: hostFreezeDuration, + AccelerateUploading: config.AccelerateUploading, }); err != nil { return "", err } @@ -24,7 +25,7 @@ func getUpHost(config *Config, retryMax int, hostFreezeDuration time.Duration, a return "", errors.New("can't get region with bucket:" + bucket) } - if config.UseCdnDomains { + if config.UseCdnDomains || config.AccelerateUploading { if len(region.CdnUpHosts) == 0 { return "", errors.New("can't get region cdn host with bucket:" + bucket) } @@ -46,8 +47,9 @@ func getUpHostProvider(config *Config, retryMax int, hostFreezeDuration time.Dur var err error if region == nil { if region, err = GetRegionWithOptions(ak, bucket, UCApiOptions{ - RetryMax: retryMax, - HostFreezeDuration: hostFreezeDuration, + RetryMax: retryMax, + HostFreezeDuration: hostFreezeDuration, + AccelerateUploading: config.AccelerateUploading, }); err != nil { return nil, err } diff --git a/storagev2/apis/query_bucket_v2/api.go b/storagev2/apis/query_bucket_v2/api.go index a15f9c05..bd1387f3 100644 --- a/storagev2/apis/query_bucket_v2/api.go +++ b/storagev2/apis/query_bucket_v2/api.go @@ -26,6 +26,47 @@ type Response struct { ApiDomains ApiDomains // API 域名 } +// 空间级别的主加速上传域名列表 +type MainBucketAcceleratedUpDomains = []string + +// 空间级别的备用加速上传域名列表 +type BackupBucketAcceleratedUpDomains = []string + +// 空间级别的加速上传域名 +type BucketAcceleratedUpDomains struct { + MainAcceleratedUpDomains MainBucketAcceleratedUpDomains // 空间级别的主加速上传域名列表 + BackupAcceleratedUpDomains BackupBucketAcceleratedUpDomains // 空间级别的备用加速上传域名列表 +} +type jsonBucketAcceleratedUpDomains struct { + MainAcceleratedUpDomains MainBucketAcceleratedUpDomains `json:"main"` // 空间级别的主加速上传域名列表 + BackupAcceleratedUpDomains BackupBucketAcceleratedUpDomains `json:"backup"` // 空间级别的备用加速上传域名列表 +} + +func (j *BucketAcceleratedUpDomains) MarshalJSON() ([]byte, error) { + if err := j.validate(); err != nil { + return nil, err + } + return json.Marshal(&jsonBucketAcceleratedUpDomains{MainAcceleratedUpDomains: j.MainAcceleratedUpDomains, BackupAcceleratedUpDomains: j.BackupAcceleratedUpDomains}) +} +func (j *BucketAcceleratedUpDomains) UnmarshalJSON(data []byte) error { + var nj jsonBucketAcceleratedUpDomains + if err := json.Unmarshal(data, &nj); err != nil { + return err + } + j.MainAcceleratedUpDomains = nj.MainAcceleratedUpDomains + j.BackupAcceleratedUpDomains = nj.BackupAcceleratedUpDomains + return nil +} +func (j *BucketAcceleratedUpDomains) validate() error { + if len(j.MainAcceleratedUpDomains) == 0 { + return errors.MissingRequiredFieldError{Name: "MainAcceleratedUpDomains"} + } + if len(j.BackupAcceleratedUpDomains) == 0 { + return errors.MissingRequiredFieldError{Name: "BackupAcceleratedUpDomains"} + } + return nil +} + // 主加速上传域名列表 type MainAcceleratedUpDomains = []string @@ -192,29 +233,32 @@ func (j *OldSourceDomains) validate() error { // 上传域名 type UpDomains struct { - AcceleratedUpDomains AcceleratedUpDomains // 加速上传域名 - SourceUpDomains SourceUpDomains // 源站上传域名 - OldAcceleratedDomains OldAcceleratedUpDomains // 已经过时的加速上传域名 - OldSourceDomains OldSourceUpDomains // 已经过时的源站上传域名 + BucketAcceleratedUpDomains BucketAcceleratedUpDomains // 空间级别的加速上传域名 + AcceleratedUpDomains AcceleratedUpDomains // 加速上传域名 + SourceUpDomains SourceUpDomains // 源站上传域名 + OldAcceleratedDomains OldAcceleratedUpDomains // 已经过时的加速上传域名 + OldSourceDomains OldSourceUpDomains // 已经过时的源站上传域名 } type jsonUpDomains struct { - AcceleratedUpDomains AcceleratedUpDomains `json:"acc"` // 加速上传域名 - SourceUpDomains SourceUpDomains `json:"src"` // 源站上传域名 - OldAcceleratedDomains OldAcceleratedUpDomains `json:"old_acc"` // 已经过时的加速上传域名 - OldSourceDomains OldSourceUpDomains `json:"old_src"` // 已经过时的源站上传域名 + BucketAcceleratedUpDomains BucketAcceleratedUpDomains `json:"bucket_acc"` // 空间级别的加速上传域名 + AcceleratedUpDomains AcceleratedUpDomains `json:"acc"` // 加速上传域名 + SourceUpDomains SourceUpDomains `json:"src"` // 源站上传域名 + OldAcceleratedDomains OldAcceleratedUpDomains `json:"old_acc"` // 已经过时的加速上传域名 + OldSourceDomains OldSourceUpDomains `json:"old_src"` // 已经过时的源站上传域名 } func (j *UpDomains) MarshalJSON() ([]byte, error) { if err := j.validate(); err != nil { return nil, err } - return json.Marshal(&jsonUpDomains{AcceleratedUpDomains: j.AcceleratedUpDomains, SourceUpDomains: j.SourceUpDomains, OldAcceleratedDomains: j.OldAcceleratedDomains, OldSourceDomains: j.OldSourceDomains}) + return json.Marshal(&jsonUpDomains{BucketAcceleratedUpDomains: j.BucketAcceleratedUpDomains, AcceleratedUpDomains: j.AcceleratedUpDomains, SourceUpDomains: j.SourceUpDomains, OldAcceleratedDomains: j.OldAcceleratedDomains, OldSourceDomains: j.OldSourceDomains}) } func (j *UpDomains) UnmarshalJSON(data []byte) error { var nj jsonUpDomains if err := json.Unmarshal(data, &nj); err != nil { return err } + j.BucketAcceleratedUpDomains = nj.BucketAcceleratedUpDomains j.AcceleratedUpDomains = nj.AcceleratedUpDomains j.SourceUpDomains = nj.SourceUpDomains j.OldAcceleratedDomains = nj.OldAcceleratedDomains @@ -222,6 +266,9 @@ func (j *UpDomains) UnmarshalJSON(data []byte) error { return nil } func (j *UpDomains) validate() error { + if err := j.BucketAcceleratedUpDomains.validate(); err != nil { + return err + } if err := j.AcceleratedUpDomains.validate(); err != nil { return err } diff --git a/storagev2/apis/query_bucket_v4/api.go b/storagev2/apis/query_bucket_v4/api.go index 58783c9f..0df5d012 100644 --- a/storagev2/apis/query_bucket_v4/api.go +++ b/storagev2/apis/query_bucket_v4/api.go @@ -19,6 +19,9 @@ type Response struct { Hosts BucketQueryHosts // 存储空间服务域名 } +// 加速上传域名列表 +type AcceleratedUpDomains = []string + // 主上传域名列表 type PreferedUpDomains = []string @@ -27,30 +30,36 @@ type AlternativeUpDomains = []string // 上传域名 type UpDomains struct { + AcceleratedUpDomains AcceleratedUpDomains // 加速上传域名列表 PreferedUpDomains PreferedUpDomains // 主上传域名列表 AlternativeUpDomains AlternativeUpDomains // 备选上传域名列表 } type jsonUpDomains struct { - PreferedUpDomains PreferedUpDomains `json:"domains"` // 主上传域名列表 - AlternativeUpDomains AlternativeUpDomains `json:"old"` // 备选上传域名列表 + AcceleratedUpDomains AcceleratedUpDomains `json:"acc_domains"` // 加速上传域名列表 + PreferedUpDomains PreferedUpDomains `json:"domains"` // 主上传域名列表 + AlternativeUpDomains AlternativeUpDomains `json:"old"` // 备选上传域名列表 } func (j *UpDomains) MarshalJSON() ([]byte, error) { if err := j.validate(); err != nil { return nil, err } - return json.Marshal(&jsonUpDomains{PreferedUpDomains: j.PreferedUpDomains, AlternativeUpDomains: j.AlternativeUpDomains}) + return json.Marshal(&jsonUpDomains{AcceleratedUpDomains: j.AcceleratedUpDomains, PreferedUpDomains: j.PreferedUpDomains, AlternativeUpDomains: j.AlternativeUpDomains}) } func (j *UpDomains) UnmarshalJSON(data []byte) error { var nj jsonUpDomains if err := json.Unmarshal(data, &nj); err != nil { return err } + j.AcceleratedUpDomains = nj.AcceleratedUpDomains j.PreferedUpDomains = nj.PreferedUpDomains j.AlternativeUpDomains = nj.AlternativeUpDomains return nil } func (j *UpDomains) validate() error { + if len(j.AcceleratedUpDomains) == 0 { + return errors.MissingRequiredFieldError{Name: "AcceleratedUpDomains"} + } if len(j.PreferedUpDomains) == 0 { return errors.MissingRequiredFieldError{Name: "PreferedUpDomains"} } diff --git a/storagev2/apis/verify_share/api.go b/storagev2/apis/verify_share/api.go index 966430b8..0f5f11b6 100644 --- a/storagev2/apis/verify_share/api.go +++ b/storagev2/apis/verify_share/api.go @@ -16,6 +16,7 @@ type Request struct { Credentials credentials.CredentialsProvider // 鉴权参数,用于生成鉴权凭证,如果为空,则使用 HTTPClientOptions 中的 CredentialsProvider ExtractCode string // 提取码 } +type VerifyShareParam = Request type jsonRequest struct { ExtractCode string `json:"extract_code"` // 提取码 } diff --git a/storagev2/chooser/chooser.go b/storagev2/chooser/chooser.go index 0672ba66..59f0efa3 100644 --- a/storagev2/chooser/chooser.go +++ b/storagev2/chooser/chooser.go @@ -10,6 +10,9 @@ type ( ChooseOptions struct { // Domain 是待选择的域名 Domain string + + // 如果找不到合适的域名就直接返回空 + FailFast bool } // FeedbackOptions 反馈的选项 diff --git a/storagev2/chooser/ip_chooser.go b/storagev2/chooser/ip_chooser.go index 04dace35..465780ae 100644 --- a/storagev2/chooser/ip_chooser.go +++ b/storagev2/chooser/ip_chooser.go @@ -71,7 +71,7 @@ func (chooser *ipChooser) Choose(_ context.Context, ips []net.IP, options *Choos chosenIPs = append(chosenIPs, ip) } } - if len(chosenIPs) > 0 { + if len(chosenIPs) > 0 || options.FailFast { return chosenIPs } diff --git a/storagev2/chooser/subnet_chooser.go b/storagev2/chooser/subnet_chooser.go index 033998e1..3e9f14a6 100644 --- a/storagev2/chooser/subnet_chooser.go +++ b/storagev2/chooser/subnet_chooser.go @@ -61,7 +61,7 @@ func (chooser *subnetChooser) Choose(_ context.Context, ips []net.IP, options *C chosenIPs = append(chosenIPs, ip) } } - if len(chosenIPs) > 0 { + if len(chosenIPs) > 0 || options.FailFast { return chosenIPs } diff --git a/storagev2/downloader/urls_provider.go b/storagev2/downloader/urls_provider.go index 60ff7e11..42a8d049 100644 --- a/storagev2/downloader/urls_provider.go +++ b/storagev2/downloader/urls_provider.go @@ -48,10 +48,11 @@ type ( } domainsQueryURLsProvider struct { - storage *apis.Storage - cache *cache.Cache - credentials credentials.CredentialsProvider - cacheTTL time.Duration + storage *apis.Storage + cache *cache.Cache + credentials credentials.CredentialsProvider + cacheTTL time.Duration + cacheRefreshAfter time.Duration } combinedDownloadURLsProviders struct { @@ -91,11 +92,15 @@ type ( // 缓存有效周期(默认:3600s) CacheTTL time.Duration + + // 缓存刷新时间(默认:1800s) + CacheRefreshAfter time.Duration } domainCacheValue struct { - Domains []string `json:"domains"` - ExpiredAt time.Time `json:"expired_at"` + Domains []string `json:"domains"` + RefreshAfter time.Time `json:"refresh_after"` + ExpiredAt time.Time `json:"expired_at"` } signingCacheValue struct { @@ -186,7 +191,11 @@ func (scv signingCacheValue) IsEqual(cv cache.CacheValue) bool { } func (scv signingCacheValue) IsValid() bool { - return scv.expiredAt.After(time.Now()) + return time.Now().Before(scv.expiredAt) +} + +func (scv signingCacheValue) ShouldRefresh() bool { + return false } // 创建静态域名下载 URL 生成器 @@ -305,13 +314,17 @@ func NewDomainsQueryURLsProvider(options *DomainsQueryURLsProviderOptions) (Down if cacheTTL == time.Duration(0) { cacheTTL = time.Hour } + cacheRefreshAfter := options.CacheRefreshAfter + if cacheRefreshAfter == time.Duration(0) { + cacheRefreshAfter = time.Hour / 2 + } persistentCache, err := getPersistentCache(persistentFilePath, compactInterval, persistentDuration) if err != nil { return nil, err } storage := apis.NewStorage(&options.Options) - return &domainsQueryURLsProvider{storage, persistentCache, creds, cacheTTL}, nil + return &domainsQueryURLsProvider{storage, persistentCache, creds, cacheTTL, cacheRefreshAfter}, nil } func (g *domainsQueryURLsProvider) GetURLsIter(ctx context.Context, objectName string, options *GenerateOptions) (URLsIter, error) { @@ -336,7 +349,12 @@ func (g *domainsQueryURLsProvider) GetURLsIter(ctx context.Context, objectName s if err != nil { return nil, err } else { - return &domainCacheValue{Domains: response.Domains, ExpiredAt: time.Now().Add(g.cacheTTL)}, nil + now := time.Now() + return &domainCacheValue{ + Domains: response.Domains, + RefreshAfter: now.Add(g.cacheRefreshAfter), + ExpiredAt: now.Add(g.cacheTTL), + }, nil } }) if status == cache.NoResultGot { @@ -365,6 +383,10 @@ func (left *domainCacheValue) IsValid() bool { return time.Now().Before(left.ExpiredAt) } +func (left *domainCacheValue) ShouldRefresh() bool { + return time.Now().After(left.RefreshAfter) +} + func getPersistentCache(persistentFilePath string, compactInterval, persistentDuration time.Duration) (*cache.Cache, error) { var ( persistentCache *cache.Cache diff --git a/storagev2/internal/api-specs/query_bucket_v2.yml b/storagev2/internal/api-specs/query_bucket_v2.yml index 39374404..e86ed7bc 100644 --- a/storagev2/internal/api-specs/query_bucket_v2.yml +++ b/storagev2/internal/api-specs/query_bucket_v2.yml @@ -37,6 +37,30 @@ response: name: UpDomains documentation: 上传域名 fields: + - field_name: bucket_accelerated_up_domains + key: bucket_acc + documentation: 空间级别的加速上传域名 + type: + struct: + name: BucketAcceleratedUpDomains + documentation: 空间级别的加速上传域名 + fields: + - field_name: main_accelerated_up_domains + key: main + documentation: 空间级别的主加速上传域名列表 + type: + array: + name: MainBucketAcceleratedUpDomains + documentation: 空间级别的主加速上传域名列表 + type: string + - field_name: backup_accelerated_up_domains + key: backup + documentation: 空间级别的备用加速上传域名列表 + type: + array: + name: BackupBucketAcceleratedUpDomains + documentation: 空间级别的备用加速上传域名列表 + type: string - field_name: accelerated_up_domains key: acc documentation: 加速上传域名 diff --git a/storagev2/internal/api-specs/query_bucket_v4.yml b/storagev2/internal/api-specs/query_bucket_v4.yml index f6c234b9..b05b3a11 100644 --- a/storagev2/internal/api-specs/query_bucket_v4.yml +++ b/storagev2/internal/api-specs/query_bucket_v4.yml @@ -49,6 +49,14 @@ response: name: UpDomains documentation: 上传域名 fields: + - field_name: accelerated_up_domains + key: acc_domains + documentation: 加速上传域名列表 + type: + array: + name: AcceleratedUpDomains + documentation: 加速上传域名列表 + type: string - field_name: prefered_up_domains key: domains documentation: 主上传域名列表 diff --git a/storagev2/region/all.go b/storagev2/region/all.go index c2851afd..b59ab35d 100644 --- a/storagev2/region/all.go +++ b/storagev2/region/all.go @@ -200,9 +200,11 @@ func (response *regionsResponse) toCacheValue() *v4QueryCacheValue { minTtl = host.Ttl } } + now := time.Now() return &v4QueryCacheValue{ - Regions: regions, - ExpiredAt: time.Now().Add(time.Duration(minTtl) * time.Second), + Regions: regions, + RefreshAfter: now.Add(time.Duration(minTtl/2) * time.Second), + ExpiredAt: now.Add(time.Duration(minTtl) * time.Second), } } diff --git a/storagev2/region/query.go b/storagev2/region/query.go index 05c7d165..d24caa03 100644 --- a/storagev2/region/query.go +++ b/storagev2/region/query.go @@ -109,8 +109,9 @@ type ( } v4QueryCacheValue struct { - Regions []*Region `json:"regions"` - ExpiredAt time.Time `json:"expired_at"` + Regions []*Region `json:"regions"` + RefreshAfter time.Time `json:"refresh_after"` + ExpiredAt time.Time `json:"expired_at"` } v4QueryServiceHosts struct { @@ -278,6 +279,10 @@ func (left *v4QueryCacheValue) IsValid() bool { return time.Now().Before(left.ExpiredAt) } +func (left *v4QueryCacheValue) ShouldRefresh() bool { + return time.Now().After(left.RefreshAfter) +} + func (response *v4QueryResponse) toCacheValue(accelerateUploading bool) *v4QueryCacheValue { var ( minTtl = int64(math.MaxInt64) @@ -289,9 +294,11 @@ func (response *v4QueryResponse) toCacheValue(accelerateUploading bool) *v4Query minTtl = host.Ttl } } + now := time.Now() return &v4QueryCacheValue{ - Regions: regions, - ExpiredAt: time.Now().Add(time.Duration(minTtl) * time.Second), + Regions: regions, + RefreshAfter: now.Add(time.Duration(minTtl) * time.Second / 2), + ExpiredAt: now.Add(time.Duration(minTtl) * time.Second), } } diff --git a/storagev2/resolver/resolver.go b/storagev2/resolver/resolver.go index 58a249aa..fe94a399 100644 --- a/storagev2/resolver/resolver.go +++ b/storagev2/resolver/resolver.go @@ -21,6 +21,12 @@ type ( Resolver interface { // Resolve 解析域名的 IP 地址 Resolve(context.Context, string) ([]net.IP, error) + + // FeedbackGood 反馈一批 IP 地址请求成功 + FeedbackGood(context.Context, string, []net.IP) + + // FeedbackBad 反馈一批 IP 地址请求失败 + FeedbackBad(context.Context, string, []net.IP) } defaultResolver struct{} @@ -38,6 +44,9 @@ func (resolver customizedResolver) Resolve(ctx context.Context, host string) ([] return resolver.resolveFn(ctx, host) } +func (resolver customizedResolver) FeedbackGood(context.Context, string, []net.IP) {} +func (resolver customizedResolver) FeedbackBad(context.Context, string, []net.IP) {} + // NewDefaultResolver 创建默认的域名解析器 func NewDefaultResolver() Resolver { return &defaultResolver{} @@ -55,11 +64,15 @@ func (resolver *defaultResolver) Resolve(ctx context.Context, host string) ([]ne return ips, nil } +func (resolver defaultResolver) FeedbackGood(context.Context, string, []net.IP) {} +func (resolver defaultResolver) FeedbackBad(context.Context, string, []net.IP) {} + type ( cacheResolver struct { - resolver Resolver - cache *cache.Cache - cacheLifetime time.Duration + resolver Resolver + cache *cache.Cache + cacheLifetime time.Duration + cacheRefreshAfter time.Duration } // CacheResolverConfig 缓存域名解析器选项 @@ -75,12 +88,21 @@ type ( // 缓存有效期(默认:120s) CacheLifetime time.Duration + + // 缓存刷新时间(默认:80s) + CacheRefreshAfter time.Duration } - resolverCacheValue struct { - IPs []net.IP `json:"ips"` + resolverCacheValueIP struct { + IP net.IP `json:"ip"` ExpiredAt time.Time `json:"expired_at"` } + + resolverCacheValue struct { + IPs []resolverCacheValueIP `json:"ips"` + RefreshAfter time.Time `json:"refresh_after"` + ExpiredAt time.Time `json:"expired_at"` + } ) const cacheFileName = "resolver_01.cache.json" @@ -112,6 +134,10 @@ func NewCacheResolver(resolver Resolver, opts *CacheResolverConfig) (Resolver, e if cacheLifetime == time.Duration(0) { cacheLifetime = 120 * time.Second } + cacheRefreshAfter := opts.CacheRefreshAfter + if cacheRefreshAfter == time.Duration(0) { + cacheRefreshAfter = 80 * time.Second + } if resolver == nil { resolver = staticDefaultResolver } @@ -121,9 +147,10 @@ func NewCacheResolver(resolver Resolver, opts *CacheResolverConfig) (Resolver, e return nil, err } return &cacheResolver{ - cache: persistentCache, - resolver: resolver, - cacheLifetime: cacheLifetime, + cache: persistentCache, + resolver: resolver, + cacheLifetime: cacheLifetime, + cacheRefreshAfter: cacheRefreshAfter, }, nil } @@ -158,32 +185,105 @@ func getPersistentCache(persistentFilePath string, compactInterval, persistentDu return persistentCache, nil } +func (resolver *cacheResolver) resolve(ctx context.Context, host string) (*resolverCacheValue, error) { + if ips, err := resolver.resolver.Resolve(ctx, host); err != nil { + return nil, err + } else { + now := time.Now() + cacheValueIPs := make([]resolverCacheValueIP, len(ips)) + for i, ip := range ips { + cacheValueIPs[i] = resolverCacheValueIP{IP: ip, ExpiredAt: now.Add(resolver.cacheLifetime)} + } + return &resolverCacheValue{ + IPs: cacheValueIPs, + RefreshAfter: now.Add(resolver.cacheRefreshAfter), + ExpiredAt: now.Add(resolver.cacheLifetime), + }, nil + } +} + func (resolver *cacheResolver) Resolve(ctx context.Context, host string) ([]net.IP, error) { - lip, err := resolver.localIp(host) + lip, err := resolver.localIp() if err != nil { return nil, err } - cacheValue, status := resolver.cache.Get(lip+":"+host, func() (cache.CacheValue, error) { - var ips []net.IP - if ips, err = resolver.resolver.Resolve(ctx, host); err != nil { + cacheKey := lip + ":" + host + var rcv *resolverCacheValue + if shouldByPassResolveCache(ctx) { + if rcv, err = resolver.resolve(ctx, host); err != nil { + return nil, err + } + } else { + cacheValue, status := resolver.cache.Get(cacheKey, func() (cache.CacheValue, error) { + var cacheValue cache.CacheValue + cacheValue, err = resolver.resolve(ctx, host) + return cacheValue, err + }) + if status == cache.NoResultGot || status == cache.GetResultFromInvalidCache { return nil, err - } else { - return &resolverCacheValue{IPs: ips, ExpiredAt: time.Now().Add(resolver.cacheLifetime)}, nil } + rcv = cacheValue.(*resolverCacheValue) + } + now := time.Now() + ips := make([]net.IP, 0, len(rcv.IPs)) + for _, cacheValueIP := range rcv.IPs { + if cacheValueIP.ExpiredAt.After(now) { + ips = append(ips, cacheValueIP.IP) + } + } + if len(ips) < len(rcv.IPs) { + newCacheValue := &resolverCacheValue{ + IPs: make([]resolverCacheValueIP, 0, len(rcv.IPs)), + RefreshAfter: rcv.RefreshAfter, + ExpiredAt: rcv.ExpiredAt, + } + for _, cacheValueIP := range rcv.IPs { + if cacheValueIP.ExpiredAt.After(now) { + newCacheValue.IPs = append(newCacheValue.IPs, cacheValueIP) + } + } + resolver.cache.Set(cacheKey, newCacheValue) + } + + return ips, nil +} + +func (resolver cacheResolver) FeedbackGood(ctx context.Context, host string, ips []net.IP) { + lip, err := resolver.localIp() + if err != nil { + return + } + cacheKey := lip + ":" + host + cacheValue, status := resolver.cache.Get(cacheKey, func() (cache.CacheValue, error) { + return nil, context.Canceled }) - if status == cache.NoResultGot { - return nil, err + if status == cache.GetResultFromCache || status == cache.GetResultFromCacheAndRefreshAsync { + rcv := cacheValue.(*resolverCacheValue) + now := time.Now() + anyIPLiveLonger := false + for i := range rcv.IPs { + if isIPContains(ips, rcv.IPs[i].IP) { + rcv.IPs[i].ExpiredAt = now.Add(resolver.cacheLifetime) + anyIPLiveLonger = true + } + } + if anyIPLiveLonger { + rcv.RefreshAfter = now.Add(resolver.cacheRefreshAfter) + rcv.ExpiredAt = now.Add(resolver.cacheLifetime) + resolver.cache.Set(cacheKey, rcv) + } } - return cacheValue.(*resolverCacheValue).IPs, nil } +func (resolver cacheResolver) FeedbackBad(context.Context, string, []net.IP) {} + func (left *resolverCacheValue) IsEqual(rightValue cache.CacheValue) bool { if right, ok := rightValue.(*resolverCacheValue); ok { if len(left.IPs) != len(right.IPs) { return false } for idx := range left.IPs { - if !left.IPs[idx].Equal(right.IPs[idx]) { + if !left.IPs[idx].IP.Equal(right.IPs[idx].IP) || !left.IPs[idx].ExpiredAt.Equal(right.IPs[idx].ExpiredAt) { return false } } @@ -196,8 +296,12 @@ func (left *resolverCacheValue) IsValid() bool { return time.Now().Before(left.ExpiredAt) } -func (*cacheResolver) localIp(host string) (string, error) { - conn, err := net.Dial("udp", host+":80") +func (left *resolverCacheValue) ShouldRefresh() bool { + return time.Now().After(left.RefreshAfter) +} + +func (*cacheResolver) localIp() (string, error) { + conn, err := net.Dial("udp", "223.5.5.5:80") if err != nil { return "", err } @@ -206,6 +310,21 @@ func (*cacheResolver) localIp(host string) (string, error) { return conn.LocalAddr().(*net.UDPAddr).IP.String(), nil } +type ( + byPassResolverCacheContextKey struct{} + byPassResolverCacheContextValue struct{} +) + +// WithByPassResolverCache 设置 Context 绕过 Resolver 内部缓存 +func WithByPassResolverCache(ctx context.Context) context.Context { + return context.WithValue(ctx, byPassResolverCacheContextKey{}, byPassResolverCacheContextValue{}) +} + +func shouldByPassResolveCache(ctx context.Context) bool { + _, ok := ctx.Value(byPassResolverCacheContextKey{}).(byPassResolverCacheContextValue) + return ok +} + func calcPersistentCacheCrc64(persistentFilePath string, compactInterval, persistentDuration time.Duration) uint64 { bytes := make([]byte, 0, 1024) bytes = strconv.AppendInt(bytes, int64(compactInterval), 36) @@ -214,3 +333,12 @@ func calcPersistentCacheCrc64(persistentFilePath string, compactInterval, persis bytes = append(bytes, byte(0)) return crc64.Checksum(bytes, crc64.MakeTable(crc64.ISO)) } + +func isIPContains(t []net.IP, i net.IP) bool { + for _, tt := range t { + if tt.Equal(i) { + return true + } + } + return false +} diff --git a/storagev2/resolver/resolver_test.go b/storagev2/resolver/resolver_test.go index 9d2baa74..cc10b3e6 100644 --- a/storagev2/resolver/resolver_test.go +++ b/storagev2/resolver/resolver_test.go @@ -9,6 +9,7 @@ import ( "net" "os" "testing" + "time" "github.com/qiniu/go-sdk/v7/storagev2/resolver" ) @@ -32,6 +33,10 @@ func (mr *mockResolver) Resolve(ctx context.Context, host string) ([]net.IP, err return mr.m[host], nil } +func (mr *mockResolver) FeedbackGood(context.Context, string, []net.IP) {} + +func (mr *mockResolver) FeedbackBad(context.Context, string, []net.IP) {} + func TestCacheResolver(t *testing.T) { tmpFile, err := ioutil.TempFile("", "") if err != nil { @@ -40,9 +45,11 @@ func TestCacheResolver(t *testing.T) { defer os.Remove(tmpFile.Name()) defer tmpFile.Close() - mr := &mockResolver{m: map[string][]net.IP{"upload.qiniup.com": {net.IPv4(1, 1, 1, 1)}}, c: make(map[string]int)} + mr := &mockResolver{m: map[string][]net.IP{"upload.qiniup.com": {net.IPv4(1, 1, 1, 1), net.IPv4(1, 1, 2, 2)}}, c: make(map[string]int)} resolver, err := resolver.NewCacheResolver(mr, &resolver.CacheResolverConfig{ PersistentFilePath: tmpFile.Name(), + CacheRefreshAfter: 3 * time.Second, + CacheLifetime: 2 * time.Second, }) if err != nil { @@ -53,11 +60,25 @@ func TestCacheResolver(t *testing.T) { if err != nil { t.Fatal(err) } - if len(ips) != 1 || !ips[0].Equal(net.IPv4(1, 1, 1, 1)) { + if len(ips) != 2 || !ips[0].Equal(net.IPv4(1, 1, 1, 1)) || !ips[1].Equal(net.IPv4(1, 1, 2, 2)) { t.Fatal("Unexpected ips") } } if mr.c["upload.qiniup.com"] != 1 { t.Fatal("Unexpected cache") } + + time.Sleep(1000 * time.Millisecond) + resolver.FeedbackGood(context.Background(), "upload.qiniup.com", []net.IP{net.IPv4(1, 1, 1, 1)}) + time.Sleep(1500 * time.Millisecond) + ips, err := resolver.Resolve(context.Background(), "upload.qiniup.com") + if err != nil { + t.Fatal(err) + } + if len(ips) != 1 || !ips[0].Equal(net.IPv4(1, 1, 1, 1)) { + t.Fatal("Unexpected ips") + } + if mr.c["upload.qiniup.com"] != 1 { + t.Fatal("Unexpected cache") + } } diff --git a/storagev2/uptoken/putpolicy.go b/storagev2/uptoken/putpolicy.go index bf86d83e..7ee2a002 100644 --- a/storagev2/uptoken/putpolicy.go +++ b/storagev2/uptoken/putpolicy.go @@ -12,28 +12,29 @@ import ( type PutPolicy map[string]interface{} const ( - putPolicyKeyScope = "scope" - putPolicyKeyDeadline = "deadline" - putPolicyKeyIsPrefixalScope = "isPrefixalScope" - putPolicyKeyInsertOnly = "insertOnly" - putPolicyKeyEndUser = "endUser" - putPolicyKeyReturnUrl = "returnUrl" - putPolicyKeyReturnBody = "returnBody" - putPolicyKeyCallbackUrl = "callbackUrl" - putPolicyKeyCallbackHost = "callbackHost" - putPolicyKeyCallbackBody = "callbackBody" - putPolicyKeyCallbackBodyType = "callbackBodyType" - putPolicyKeyPersistentOps = "persistentOps" - putPolicyKeyPersistentNotifyUrl = "persistentNotifyUrl" - putPolicyKeyPersistentPipeline = "persistentPipeline" - putPolicyKeyPersistentType = "persistentType" - putPolicyKeyForceSaveKey = "forceSaveKey" - putPolicyKeySaveKey = "saveKey" - putPolicyKeyFsizeMin = "fsizeMin" - putPolicyKeyFsizeLimit = "fsizeLimit" - putPolicyKeyDetectMime = "detectMime" - putPolicyKeyMimeLimit = "mimeLimit" - putPolicyKeyFileType = "fileType" + putPolicyKeyScope = "scope" + putPolicyKeyDeadline = "deadline" + putPolicyKeyIsPrefixalScope = "isPrefixalScope" + putPolicyKeyInsertOnly = "insertOnly" + putPolicyKeyEndUser = "endUser" + putPolicyKeyReturnUrl = "returnUrl" + putPolicyKeyReturnBody = "returnBody" + putPolicyKeyCallbackUrl = "callbackUrl" + putPolicyKeyCallbackHost = "callbackHost" + putPolicyKeyCallbackBody = "callbackBody" + putPolicyKeyCallbackBodyType = "callbackBodyType" + putPolicyKeyPersistentOps = "persistentOps" + putPolicyKeyPersistentNotifyUrl = "persistentNotifyUrl" + putPolicyKeyPersistentPipeline = "persistentPipeline" + putPolicyKeyPersistentType = "persistentType" + putPolicyKeyPersistentWorkflowTemplateID = "persistentWorkflowTemplateID" + putPolicyKeyForceSaveKey = "forceSaveKey" + putPolicyKeySaveKey = "saveKey" + putPolicyKeyFsizeMin = "fsizeMin" + putPolicyKeyFsizeLimit = "fsizeLimit" + putPolicyKeyDetectMime = "detectMime" + putPolicyKeyMimeLimit = "mimeLimit" + putPolicyKeyFileType = "fileType" ) var ( @@ -249,6 +250,17 @@ func (putPolicy PutPolicy) SetPersistentType(value int64) PutPolicy { return putPolicy } +// GetPersistentWorkflowTemplateID 获取工作流模版 ID +func (putPolicy PutPolicy) GetPersistentWorkflowTemplateID() (string, bool) { + return putPolicy.getString(putPolicyKeyPersistentWorkflowTemplateID) +} + +// SetPersistentWorkflowTemplateID 指定工作流模版 ID +func (putPolicy PutPolicy) SetPersistentWorkflowTemplateID(value string) PutPolicy { + _ = putPolicy.Set(putPolicyKeyPersistentWorkflowTemplateID, value) + return putPolicy +} + // GetForceSaveKey 获取 saveKey 的优先级设置 func (putPolicy PutPolicy) GetForceSaveKey() (bool, bool) { return putPolicy.getBool(putPolicyKeyForceSaveKey)