Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add redis to video-service #77

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ module toktik
go 1.20

require (
github.com/alicebob/miniredis/v2 v2.30.5
github.com/cloudwego/fastpb v0.0.4
github.com/cloudwego/hertz v0.6.6
github.com/cloudwego/kitex v0.6.2
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/fsnotify/fsnotify v1.6.0
github.com/glebarez/sqlite v1.9.0
github.com/go-redis/redis/v7 v7.4.1
github.com/golang/mock v1.6.0
github.com/hashicorp/consul/api v1.20.0
github.com/hertz-contrib/cors v0.0.0-20230423034624-2bc83a8400f0
Expand All @@ -26,6 +28,7 @@ require (
)

require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/apache/thrift v0.13.0 // indirect
github.com/armon/go-metrics v0.4.0 // indirect
github.com/aws/aws-sdk-go v1.38.20 // indirect
Expand Down Expand Up @@ -76,6 +79,8 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nyaruka/phonenumbers v1.0.55 // indirect
github.com/oleiade/lane v1.0.1 // indirect
github.com/onsi/ginkgo v1.14.2 // indirect
github.com/onsi/gomega v1.10.4 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
Expand All @@ -89,6 +94,7 @@ require (
github.com/tidwall/pretty v1.2.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/u2takey/go-utils v0.3.1 // indirect
github.com/yuin/gopher-lua v1.1.0 // indirect
golang.org/x/arch v0.2.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect
Expand Down
5 changes: 4 additions & 1 deletion internal/video/etc/config.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,7 @@ RabbitMQ:
Password: "Aa112211"
Queue: "publish"
Exchange: "default"
Routing_Key: "publish"
Routing_Key: "publish"
Redis:
Addr: "localhost:6379"
Password: "123456"
4 changes: 3 additions & 1 deletion internal/video/pkg/ctx/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"toktik/pkg/config"
"toktik/pkg/db"
"toktik/pkg/rabbitmq"
"toktik/pkg/redis"
)

// ServiceContext contains the components required by the service.
Expand All @@ -30,6 +31,7 @@ type ServiceContext struct {
// NewServiceContext initialize the components and returns a new ServiceContext instance.
func NewServiceContext() *ServiceContext {
db.Init()
redis.InitRedisClient()
minioClient, err := minio.New(
config.Conf.GetString(config.KEY_MINIO_ENDPOINT),
config.Conf.GetString(config.KEY_MINIO_ACCESS_KEY),
Expand Down Expand Up @@ -58,7 +60,7 @@ func NewServiceContext() *ServiceContext {
}

return &ServiceContext{
VideoService: video.NewVideoService(db.Instance),
VideoService: video.NewVideoService(db.Instance, redis.Instance),
MinioClient: minioClient,
UserClient: user.MustNewClient("user", client.WithResolver(r), client.WithRPCTimeout(time.Second*3)),
FavoriteClient: favorite.MustNewClient("favorite", client.WithResolver(r), client.WithRPCTimeout(time.Second*3)),
Expand Down
177 changes: 163 additions & 14 deletions internal/video/pkg/video/video.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package video

import (
"errors"
"fmt"
"math/rand"
"strconv"
"time"

goredis "github.com/go-redis/redis/v7"
"gorm.io/gorm"

"toktik/pkg/db/model"
Expand All @@ -11,43 +16,159 @@ import (
type Video = model.Video

type VideoService struct {
dbInstance func() *gorm.DB
dbInstance func() *gorm.DB
redisInstance func() *goredis.Client
}

func NewVideoService(db func() *gorm.DB) *VideoService {
const (
KeyId = "Id"
KeyUserId = "UserId"
KeyTitle = "Title"
KeyPlayUrl = "PlayUrl"
KeyCoverUrl = "CoverUrl"
KeyExisted = "Existed"
)

func NewVideoService(db func() *gorm.DB, rdb func() *goredis.Client) *VideoService {
return &VideoService{
dbInstance: db,
dbInstance: db,
redisInstance: rdb,
}
}

func (s *VideoService) CreateVideo(userId int64, title, playUrl, coverUrl string) error {
db := s.dbInstance()
rdb := s.redisInstance()

return db.Create(&Video{
video := &Video{
UserId: userId,
Title: title,
PlayUrl: playUrl,
CoverUrl: coverUrl,
}).Error
}

if err := db.Create(video).Error; err != nil {
return err
}

key := strconv.FormatInt(video.Id, 10)
err := rdb.HSet(key, FormatVideoInfo(video, true)).Err()
if err != nil {
return err
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个错误应该忽略

rdb.Expire(key, generateExpireTime())

// 将视频ID添加到Set中
setKey := fmt.Sprintf("user_videos:%d", userId)

_, err = rdb.SAdd(setKey, video.Id).Result()
if err != nil {
return err
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rdb.Expire(setKey, generateExpireTime())
return nil
}

func (s *VideoService) ListVideoByUserId(userId int64) ([]*Video, error) {
db := s.dbInstance()
rdb := s.redisInstance()

videos := make([]*Video, 0)
if err := db.Where("user_id = ?", userId).Find(&videos).Error; err != nil {
setKey := fmt.Sprintf("user_videos:%d", userId)
videoIds, err := rdb.SMembers(setKey).Result()
if err != nil {
return nil, err
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redis没有数据时,应该继续从数据库中拿取


// 未缓存,从数据库中获取
if len(videoIds) == 0 {
db := s.dbInstance()

var videos []*Video
if err := db.Where("user_id = ?", userId).Order("created_at desc").Find(&videos).Error; err != nil {
return nil, err
}

// 如果数据库中没有数据,将0添加到缓存中
if len(videos) == 0 {
_, err := rdb.SAdd(setKey, 0).Result()
if err != nil {
return nil, err
}
rdb.Expire(setKey, generateExpireTime())
return []*Video{}, nil
}

// 将视频ID添加到Set中
for _, video := range videos {
_, err := rdb.SAdd(setKey, video.Id).Result()
if err != nil {
return nil, err
}
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里可以一次add多个,videoIds...

rdb.Expire(setKey, generateExpireTime())

return videos, nil
}

// 缓存了但为空,直接返回
if len(videoIds) == 1 && videoIds[0] == "0" {
return []*Video{}, nil
}

// 命中缓存,从缓存中获取
videos := make([]*Video, 0, len(videoIds))
for _, key := range videoIds {
videoData, err := rdb.HGetAll(key).Result()
if err != nil {
return nil, err
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

获取失败从数据库拿


// 解析videoData
video := ParseVideoInfo(videoData)
videos = append(videos, video)
}

return videos, nil
}

func (s *VideoService) GetVideoByIds(videoIds []int64) ([]*Video, error) {
db := s.dbInstance()

videos := make([]*Video, 0)
if err := db.Where("id IN ?", videoIds).Find(&videos).Error; err != nil {
return nil, err
rdb := s.redisInstance()

videos := make([]*Video, 0, len(videoIds))
for _, videoId := range videoIds {
videoData, err := rdb.HGetAll(strconv.FormatInt(videoId, 10)).Result()
if err != nil {
return nil, err
} else if len(videoData) == 0 {
// 未命中缓存,从数据库中获取
db := s.dbInstance()
video := &Video{}
if err := db.Where("id = ?", videoId).First(video).Error; err != nil {
// 如果未找到记录,则在缓存中标识视频不存在
if errors.Is(err, gorm.ErrRecordNotFound) {
key := strconv.FormatInt(videoId, 10)
err = rdb.HSet(key, FormatVideoInfo(video, false)).Err()
if err != nil {
return nil, err
}
rdb.Expire(key, generateExpireTime())
return nil, fmt.Errorf("video not found: %d", videoId)
}
// internal error
return nil, err
} else {
videos = append(videos, video)
}
} else {
// 判断视频是否存在
existed := videoData[KeyExisted]
if existed != "1" {
return nil, fmt.Errorf("video not found: %d", videoId)
}

// 解析videoData
video := ParseVideoInfo(videoData)
videos = append(videos, video)
}
}

return videos, nil
Expand All @@ -73,7 +194,7 @@ func (s *VideoService) GetFeed(latestTime int64) ([]*Video, error) {
if latestTime == 0 {
timeValue = time.Now()
} else {
timeValue = time.Unix(latestTime / time.Microsecond.Nanoseconds(), 0)
timeValue = time.Unix(latestTime/time.Microsecond.Nanoseconds(), 0)
}

if err := db.Where("created_at < ?", timeValue).Order("created_at desc").Limit(30).Find(&videos).Error; err != nil {
Expand All @@ -82,3 +203,31 @@ func (s *VideoService) GetFeed(latestTime int64) ([]*Video, error) {

return videos, nil
}

func FormatVideoInfo(video *Video, existed bool) map[string]interface{} {
return map[string]interface{}{
KeyId: video.Id,
KeyUserId: video.UserId,
KeyTitle: video.Title,
KeyPlayUrl: video.PlayUrl,
KeyCoverUrl: video.CoverUrl,
KeyExisted: existed,
}
}

func ParseVideoInfo(videoData map[string]string) *Video {
id, _ := strconv.ParseInt(videoData[KeyId], 10, 64)
userId, _ := strconv.ParseInt(videoData[KeyUserId], 10, 64)
return &Video{
Id: id,
UserId: userId,
Title: videoData[KeyTitle],
PlayUrl: videoData[KeyPlayUrl],
CoverUrl: videoData[KeyCoverUrl],
}
}

func generateExpireTime() time.Duration {
// 引入随机数减轻缓存雪崩
return time.Duration(60*60*24+rand.Intn(3600)) * time.Second
}
Loading