Skip to content

Commit

Permalink
Merge branch 'zhoushuguang:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
cpf2021-gif authored Nov 4, 2023
2 parents 0820fb9 + 4fa9a51 commit 144c557
Show file tree
Hide file tree
Showing 12 changed files with 295 additions and 16 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,11 @@ https://www.bilibili.com/video/BV1oB4y1f7Tr/
#### 文档
https://pwmzlkcu3p.feishu.cn/docx/Si1Cd4EGxoZXkJxGenzcFttOnsh
#### 视频
https://www.bilibili.com/video/BV1je411R7iy/

### 第十一课
#### 文档
https://pwmzlkcu3p.feishu.cn/docx/NYyHdpSzhoB8Zkxdb6NcpUGynH4
#### 视频


23 changes: 20 additions & 3 deletions application/article/mq/etc/article.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,30 @@ ArticleKqConsumerConf:
Name: article-cache-kq-consumer
Brokers:
- 127.0.1:9092
Group: group-article
Topic: topic-article
Group: group-article-event
Topic: topic-article-event
Offset: last
Consumers: 1
Processors: 1
Datasource: root:Zsg123456@tcp(127.0.0.1:3306)/beyond_article?parseTime=true
BizRedis:
Host: 127.0.0.1:6379
Pass:
Type: node
Type: node
Es:
Addresses:
- http://localhost:9200/
Username: elastic
Password: HLmVfh-0Pr*YX5x4JSFl
Prometheus:
Host: 0.0.0.0
Port: 9101
Path: /metrics
Telemetry:
Endpoint: http://127.0.0.1:14268/api/traces
UserRPC:
Etcd:
Hosts:
- 127.0.0.1:2379
Key: user.rpc
NonBlock: true
11 changes: 11 additions & 0 deletions application/article/mq/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,23 @@ package config

import (
"github.com/zeromicro/go-queue/kq"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/zrpc"
)

type Config struct {
service.ServiceConf

KqConsumerConf kq.KqConf
ArticleKqConsumerConf kq.KqConf
Datasource string
BizRedis redis.RedisConf
// es config
Es struct {
Addresses []string
Username string
Password string
}
UserRPC zrpc.RpcClientConf
}
89 changes: 78 additions & 11 deletions application/article/mq/internal/logic/articlelogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"

"beyond/application/article/mq/internal/svc"
"beyond/application/article/mq/internal/types"
"beyond/application/user/rpc/user"

"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/zeromicro/go-zero/core/logx"
)

Expand All @@ -36,50 +39,114 @@ func (l *ArticleLogic) Consume(_, val string) error {
return err
}

return l.articleOperate(l.ctx, msg)
return l.articleOperate(msg)
}

func (l *ArticleLogic) articleOperate(ctx context.Context, msg *types.CanalArticleMsg) error {
func (l *ArticleLogic) articleOperate(msg *types.CanalArticleMsg) error {
if len(msg.Data) == 0 {
return nil
}

var esData []*types.ArticleEsMsg
for _, d := range msg.Data {
status, _ := strconv.Atoi(d.Status)
likNum, _ := strconv.ParseInt(d.LikeNum, 10, 64)
articleId, _ := strconv.ParseInt(d.ID, 10, 64)
authorId, _ := strconv.ParseInt(d.AuthorId, 10, 64)

t, err := time.ParseInLocation("2006-01-02 15:04:05", d.PublishTime, time.Local)
publishTimeKey := articlesKey(d.AuthorId, 0)
likeNumKey := articlesKey(d.AuthorId, 1)

switch status {
case types.ArticleStatusVisible:
b, _ := l.svcCtx.BizRedis.ExistsCtx(ctx, publishTimeKey)
b, _ := l.svcCtx.BizRedis.ExistsCtx(l.ctx, publishTimeKey)
if b {
_, err = l.svcCtx.BizRedis.ZaddCtx(ctx, publishTimeKey, t.Unix(), d.ID)
_, err = l.svcCtx.BizRedis.ZaddCtx(l.ctx, publishTimeKey, t.Unix(), d.ID)
if err != nil {
l.Logger.Errorf("ZaddCtx key: %s req: %v error: %v", publishTimeKey, d, err)
}
}
b, _ = l.svcCtx.BizRedis.ExistsCtx(ctx, likeNumKey)
b, _ = l.svcCtx.BizRedis.ExistsCtx(l.ctx, likeNumKey)
if b {
_, err = l.svcCtx.BizRedis.ZaddCtx(ctx, likeNumKey, likNum, d.ID)
_, err = l.svcCtx.BizRedis.ZaddCtx(l.ctx, likeNumKey, likNum, d.ID)
if err != nil {
l.Logger.Errorf("ZaddCtx key: %s req: %v error: %v", likeNumKey, d, err)
}
}
case types.ArticleStatusUserDelete:
_, err = l.svcCtx.BizRedis.ZremCtx(ctx, publishTimeKey, d.ID)
_, err = l.svcCtx.BizRedis.ZremCtx(l.ctx, publishTimeKey, d.ID)
if err != nil {
logx.Errorf("ZremCtx key: %s req: %v error: %v", publishTimeKey, d, err)
l.Logger.Errorf("ZremCtx key: %s req: %v error: %v", publishTimeKey, d, err)
}
_, err = l.svcCtx.BizRedis.ZremCtx(ctx, likeNumKey, d.ID)
_, err = l.svcCtx.BizRedis.ZremCtx(l.ctx, likeNumKey, d.ID)
if err != nil {
logx.Errorf("ZremCtx key: %s req: %v error: %v", likeNumKey, d, err)
l.Logger.Errorf("ZremCtx key: %s req: %v error: %v", likeNumKey, d, err)
}
}

u, err := l.svcCtx.UserRPC.FindById(l.ctx, &user.FindByIdRequest{
UserId: authorId,
})
if err != nil {
l.Logger.Errorf("FindById userId: %d error: %v", authorId, err)
return err
}

esData = append(esData, &types.ArticleEsMsg{
ArticleId: articleId,
AuthorId: authorId,
AuthorName: u.Username,
Title: d.Title,
Content: d.Content,
Description: d.Description,
Status: status,
LikeNum: likNum,
})
}
err := l.BatchUpSertToEs(l.ctx, esData)
if err != nil {
l.Logger.Errorf("BatchUpSertToEs data: %v error: %v", esData, err)
}

return err
}

func (l *ArticleLogic) BatchUpSertToEs(ctx context.Context, data []*types.ArticleEsMsg) error {
if len(data) == 0 {
return nil
}

bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: l.svcCtx.Es.Client,
Index: "article-index",
})
if err != nil {
return err
}

for _, d := range data {
v, err := json.Marshal(d)
if err != nil {
return err
}

payload := fmt.Sprintf(`{"doc":%s,"doc_as_upsert":true}`, string(v))
err = bi.Add(ctx, esutil.BulkIndexerItem{
Action: "update",
DocumentID: fmt.Sprintf("%d", d.ArticleId),
Body: strings.NewReader(payload),
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem) {
},
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, err error) {
},
})
if err != nil {
return err
}
}

return nil
return bi.Close(ctx)
}

func articlesKey(uid string, sortType int32) string {
Expand Down
11 changes: 11 additions & 0 deletions application/article/mq/internal/svc/servicecontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package svc
import (
"beyond/application/article/mq/internal/config"
"beyond/application/article/mq/internal/model"
"beyond/application/user/rpc/user"
"beyond/pkg/es"
"github.com/zeromicro/go-zero/zrpc"

"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/stores/sqlx"
Expand All @@ -12,6 +15,8 @@ type ServiceContext struct {
Config config.Config
ArticleModel model.ArticleModel
BizRedis *redis.Redis
UserRPC user.User
Es *es.Es
}

func NewServiceContext(c config.Config) *ServiceContext {
Expand All @@ -29,5 +34,11 @@ func NewServiceContext(c config.Config) *ServiceContext {
Config: c,
ArticleModel: model.NewArticleModel(conn),
BizRedis: rds,
UserRPC: user.NewUser(zrpc.MustNewClient(c.UserRPC)),
Es: es.MustNewEs(&es.Config{
Addresses: c.Es.Addresses,
Username: c.Es.Username,
Password: c.Es.Password,
}),
}
}
31 changes: 30 additions & 1 deletion application/article/mq/internal/types/article.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,38 @@ type CanalLikeMsg struct {
type CanalArticleMsg struct {
Data []struct {
ID string `json:"id"`
Title string `json:"title"`
Content string `json:"content"`
Description string `json:"description"`
AuthorId string `json:"author_id"`
Status string `json:"status"`
PublishTime string `json:"publish_time"`
CommentNum string `json:"comment_num"`
LikeNum string `json:"like_num"`
CollectNum string `json:"collect_num"`
ViewNum string `json:"view_num"`
ShareNum string `json:"share_num"`
TagIds string `json:"tag_ids"`
PublishTime string `json:"publish_time"`
CreateTime string `json:"create_time"`
UpdateTime string `json:"update_time"`
}
}

type ArticleEsMsg struct {
ArticleId int64 `json:"article_id"`
Title string `json:"title"`
Content string `json:"content"`
Description string `json:"description"`
AuthorId int64 `json:"author_id"`
AuthorName string `json:"author_name"`
Status int `json:"status"`
CommentNum int64 `json:"comment_num"`
LikeNum int64 `json:"like_num"`
CollectNum int64 `json:"collect_num"`
ViewNum int64 `json:"view_num"`
ShareNum int64 `json:"share_num"`
TagIds []int64 `json:"tag_ids"`
PublishTime string `json:"publish_time"`
CreateTime string `json:"create_time"`
UpdateTime string `json:"update_time"`
}
4 changes: 4 additions & 0 deletions application/article/mq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ func main() {

var c config.Config
conf.MustLoad(*configFile, &c)
err := c.ServiceConf.SetUp()
if err != nil {
panic(err)
}

logx.DisableStat()
svcCtx := svc.NewServiceContext(c)
Expand Down
5 changes: 4 additions & 1 deletion db/article.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ CREATE TABLE `article` (


insert into article(title, content, author_id, like_num, publish_time) values ('文章标题1', '文章内容1', 1, 1, '2023-10-04 17:01:01');
insert into article(title, content, author_id, like_num, publish_time) values ('文章标题2', '文章内容2', 1, 10, '2023-10-04 15:01:01');
insert into article(title, content, author_id, like_num, publish_time) values ('文章标题2', '文章内容2', 1, 10, '2023-10-04 15:01:01');



2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect
github.com/elastic/go-elasticsearch/v8 v8.10.1 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,10 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo=
github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
github.com/elastic/go-elasticsearch/v8 v8.10.1 h1:JJ3i2DimYTsJcUoEGbg6tNB0eehTNdid9c5kTR1TGuI=
github.com/elastic/go-elasticsearch/v8 v8.10.1/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE=
Expand Down
Loading

0 comments on commit 144c557

Please sign in to comment.