Skip to content

Commit

Permalink
Add redis to video-service
Browse files Browse the repository at this point in the history
  • Loading branch information
reeered committed Sep 11, 2023
1 parent f1109f4 commit c6030b0
Show file tree
Hide file tree
Showing 7 changed files with 436 additions and 26 deletions.
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ module toktik
go 1.20

require (
github.com/alicebob/miniredis/v2 v2.30.5
github.com/cloudwego/fastpb v0.0.4
github.com/cloudwego/hertz v0.6.6
github.com/cloudwego/kitex v0.6.2
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/fsnotify/fsnotify v1.6.0
github.com/glebarez/sqlite v1.9.0
github.com/go-redis/redis/v7 v7.4.1
github.com/golang/mock v1.6.0
github.com/hashicorp/consul/api v1.20.0
github.com/hertz-contrib/cors v0.0.0-20230423034624-2bc83a8400f0
Expand All @@ -26,6 +28,7 @@ require (
)

require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/apache/thrift v0.13.0 // indirect
github.com/armon/go-metrics v0.4.0 // indirect
github.com/aws/aws-sdk-go v1.38.20 // indirect
Expand Down Expand Up @@ -76,6 +79,8 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nyaruka/phonenumbers v1.0.55 // indirect
github.com/oleiade/lane v1.0.1 // indirect
github.com/onsi/ginkgo v1.14.2 // indirect
github.com/onsi/gomega v1.10.4 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
Expand All @@ -89,6 +94,7 @@ require (
github.com/tidwall/pretty v1.2.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/u2takey/go-utils v0.3.1 // indirect
github.com/yuin/gopher-lua v1.1.0 // indirect
golang.org/x/arch v0.2.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect
Expand Down
5 changes: 4 additions & 1 deletion internal/video/etc/config.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,7 @@ RabbitMQ:
Password: "Aa112211"
Queue: "publish"
Exchange: "default"
Routing_Key: "publish"
Routing_Key: "publish"
Redis:
Addr: "localhost:6379"
Password: "123456"
4 changes: 3 additions & 1 deletion internal/video/pkg/ctx/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"toktik/pkg/config"
"toktik/pkg/db"
"toktik/pkg/rabbitmq"
"toktik/pkg/redis"
)

// ServiceContext contains the components required by the service.
Expand All @@ -30,6 +31,7 @@ type ServiceContext struct {
// NewServiceContext initialize the components and returns a new ServiceContext instance.
func NewServiceContext() *ServiceContext {
db.Init()
rdb := redis.NewRedisClient(redis.VideoBucket)
minioClient, err := minio.New(
config.Conf.GetString(config.KEY_MINIO_ENDPOINT),
config.Conf.GetString(config.KEY_MINIO_ACCESS_KEY),
Expand Down Expand Up @@ -58,7 +60,7 @@ func NewServiceContext() *ServiceContext {
}

return &ServiceContext{
VideoService: video.NewVideoService(db.Instance),
VideoService: video.NewVideoService(db.Instance, rdb),
MinioClient: minioClient,
UserClient: user.MustNewClient("user", client.WithResolver(r), client.WithRPCTimeout(time.Second*3)),
FavoriteClient: favorite.MustNewClient("favorite", client.WithResolver(r), client.WithRPCTimeout(time.Second*3)),
Expand Down
176 changes: 162 additions & 14 deletions internal/video/pkg/video/video.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package video

import (
"errors"
"fmt"
"strconv"
"time"

"github.com/go-redis/redis/v7"
"gorm.io/gorm"

"toktik/pkg/db/model"
Expand All @@ -11,43 +15,175 @@ import (
type Video = model.Video

type VideoService struct {
dbInstance func() *gorm.DB
dbInstance func() *gorm.DB
redisInstance func() *redis.Client
}

func NewVideoService(db func() *gorm.DB) *VideoService {
const (
KeyId = "Id"
KeyUserId = "UserId"
KeyTitle = "Title"
KeyPlayUrl = "PlayUrl"
KeyCoverUrl = "CoverUrl"
KeyExisted = "Existed"
)

const ExpiredTime = 60 * 60 * 24

func NewVideoService(db func() *gorm.DB, rdb func() *redis.Client) *VideoService {
return &VideoService{
dbInstance: db,
dbInstance: db,
redisInstance: rdb,
}
}

func (s *VideoService) CreateVideo(userId int64, title, playUrl, coverUrl string) error {
db := s.dbInstance()
rdb := s.redisInstance()

return db.Create(&Video{
video := &Video{
UserId: userId,
Title: title,
PlayUrl: playUrl,
CoverUrl: coverUrl,
}).Error
}

if err := db.Create(video).Error; err != nil {
return err
}

key := strconv.FormatInt(video.Id, 10)
err := rdb.HSet(key, map[string]interface{}{
KeyId: video.Id,
KeyUserId: video.UserId,
KeyTitle: video.Title,
KeyPlayUrl: video.PlayUrl,
KeyCoverUrl: video.CoverUrl,
KeyExisted: true,
}).Err()
if err != nil {
return err
}
rdb.Expire(key, ExpiredTime*time.Second)

// 将视频ID添加到Set中
setKey := fmt.Sprintf("user_videos:%d", userId)

_, err = rdb.SAdd(setKey, video.Id).Result()
if err != nil {
return err
}
rdb.Expire(setKey, ExpiredTime*time.Second)
return nil
}

func (s *VideoService) ListVideoByUserId(userId int64) ([]*Video, error) {
db := s.dbInstance()
rdb := s.redisInstance()

videos := make([]*Video, 0)
if err := db.Where("user_id = ?", userId).Find(&videos).Error; err != nil {
setKey := fmt.Sprintf("user_videos:%d", userId)
videoIds, err := rdb.SMembers(setKey).Result()
if err != nil {
return nil, err
}

// 未缓存,从数据库中获取
if len(videoIds) == 0 {
db := s.dbInstance()

var videos []*Video
if err := db.Where("user_id = ?", userId).Order("created_at desc").Find(&videos).Error; err != nil {
return nil, err
}

// 如果数据库中没有数据,将0添加到缓存中
if len(videos) == 0 {
_, err := rdb.SAdd(setKey, 0).Result()
if err != nil {
return nil, err
}
rdb.Expire(setKey, ExpiredTime*time.Second)
return []*Video{}, nil
}

// 将视频ID添加到Set中
for _, video := range videos {
_, err := rdb.SAdd(setKey, video.Id).Result()
if err != nil {
return nil, err
}
}
rdb.Expire(setKey, ExpiredTime*time.Second)

return videos, nil
}

// 缓存了但为空,直接返回
if len(videoIds) == 1 && videoIds[0] == "0" {
return []*Video{}, nil
}

// 命中缓存,从缓存中获取
videos := make([]*Video, 0, len(videoIds))
for _, key := range videoIds {
videoData, err := rdb.HGetAll(key).Result()
if err != nil {
return nil, err
}

// 解析videoData
video := ParseVideoInfo(videoData)
videos = append(videos, video)
}

return videos, nil
}

func (s *VideoService) GetVideoByIds(videoIds []int64) ([]*Video, error) {
db := s.dbInstance()

videos := make([]*Video, 0)
if err := db.Where("id IN ?", videoIds).Find(&videos).Error; err != nil {
return nil, err
rdb := s.redisInstance()

videos := make([]*Video, 0, len(videoIds))
for _, videoId := range videoIds {
videoData, err := rdb.HGetAll(strconv.FormatInt(videoId, 10)).Result()
if err != nil {
return nil, err
} else if len(videoData) == 0 {
// 未命中缓存,从数据库中获取
db := s.dbInstance()
video := &Video{}
if err := db.Where("id = ?", videoId).First(video).Error; err != nil {
// 如果未找到记录,则在缓存中标识视频不存在
if errors.Is(err, gorm.ErrRecordNotFound) {
key := strconv.FormatInt(videoId, 10)
err = rdb.HSet(key, map[string]interface{}{
KeyId: video.Id,
KeyUserId: video.UserId,
KeyTitle: video.Title,
KeyPlayUrl: video.PlayUrl,
KeyCoverUrl: video.CoverUrl,
KeyExisted: false,
}).Err()
if err != nil {
return nil, err
}
rdb.Expire(key, ExpiredTime*time.Second)
return nil, fmt.Errorf("video not found: %d", videoId)
}
// internal error
return nil, err
} else {
videos = append(videos, video)
}
} else {
// 判断视频是否存在
existed := videoData[KeyExisted]
if existed != "1" {
return nil, fmt.Errorf("video not found: %d", videoId)
}

// 解析videoData
video := ParseVideoInfo(videoData)
videos = append(videos, video)
}
}

return videos, nil
Expand All @@ -73,7 +209,7 @@ func (s *VideoService) GetFeed(latestTime int64) ([]*Video, error) {
if latestTime == 0 {
timeValue = time.Now()
} else {
timeValue = time.Unix(latestTime / time.Microsecond.Nanoseconds(), 0)
timeValue = time.Unix(latestTime/time.Microsecond.Nanoseconds(), 0)
}

if err := db.Where("created_at < ?", timeValue).Order("created_at desc").Limit(30).Find(&videos).Error; err != nil {
Expand All @@ -82,3 +218,15 @@ func (s *VideoService) GetFeed(latestTime int64) ([]*Video, error) {

return videos, nil
}

func ParseVideoInfo(videoData map[string]string) *Video {
Id, _ := strconv.ParseInt(videoData[KeyId], 10, 64)
UserId, _ := strconv.ParseInt(videoData[KeyUserId], 10, 64)
return &Video{
Id: Id,
UserId: UserId,
Title: videoData[KeyTitle],
PlayUrl: videoData[KeyPlayUrl],
CoverUrl: videoData[KeyCoverUrl],
}
}
Loading

0 comments on commit c6030b0

Please sign in to comment.