From c289fc69e52b7915ec135ae6f877ae405d6d851a Mon Sep 17 00:00:00 2001 From: Jack Lin Date: Tue, 5 Nov 2024 17:41:39 +0800 Subject: [PATCH] feat: support v2 backing image download file from sync service ref: longhorn/longhorn 6341 Signed-off-by: Jack Lin --- pkg/client/sync_client.go | 12 ++++--- pkg/datasource/service.go | 34 +++++++++++++++++--- pkg/manager/service.go | 2 +- pkg/sync/router.go | 2 +- pkg/sync/server_test.go | 18 +++++------ pkg/sync/service.go | 67 +++++++++++++++++++++++++++++++++------ pkg/sync/sync_file.go | 57 ++++++++++++++++++++++++++------- pkg/types/types.go | 3 ++ 8 files changed, 154 insertions(+), 41 deletions(-) diff --git a/pkg/client/sync_client.go b/pkg/client/sync_client.go index 5178e9ed..489ab788 100644 --- a/pkg/client/sync_client.go +++ b/pkg/client/sync_client.go @@ -186,7 +186,7 @@ func (client *SyncClient) Fetch(srcFilePath, dstFilePath, uuid, diskUUID, expect return nil } -func (client *SyncClient) DownloadFromURL(downloadURL, filePath, uuid, diskUUID, expectedChecksum string) error { +func (client *SyncClient) DownloadFromURL(downloadURL, filePath, uuid, diskUUID, expectedChecksum, dataEngine string) error { httpClient := &http.Client{Timeout: 0} requestURL := fmt.Sprintf("http://%s/v1/files", client.Remote) @@ -201,6 +201,7 @@ func (client *SyncClient) DownloadFromURL(downloadURL, filePath, uuid, diskUUID, q.Add("uuid", uuid) q.Add("disk-uuid", diskUUID) q.Add("expected-checksum", expectedChecksum) + q.Add("data-engine", dataEngine) req.URL.RawQuery = q.Encode() resp, err := httpClient.Do(req) @@ -220,7 +221,7 @@ func (client *SyncClient) DownloadFromURL(downloadURL, filePath, uuid, diskUUID, return nil } -func (client *SyncClient) CloneFromBackingImage(sourceBackingImage, sourceBackingImageUUID, encryption, filePath, uuid, diskUUID, expectedChecksum string, credential map[string]string) error { +func (client *SyncClient) CloneFromBackingImage(sourceBackingImage, sourceBackingImageUUID, encryption, filePath, uuid, 
diskUUID, expectedChecksum string, credential map[string]string, dataEngine string) error { httpClient := &http.Client{Timeout: 0} encodedCredential, err := json.Marshal(credential) if err != nil { @@ -242,6 +243,7 @@ func (client *SyncClient) CloneFromBackingImage(sourceBackingImage, sourceBackin q.Add("uuid", uuid) q.Add("disk-uuid", diskUUID) q.Add("expected-checksum", expectedChecksum) + q.Add("data-engine", dataEngine) req.URL.RawQuery = q.Encode() @@ -262,7 +264,7 @@ func (client *SyncClient) CloneFromBackingImage(sourceBackingImage, sourceBackin return nil } -func (client *SyncClient) RestoreFromBackupURL(backupURL, concurrentLimit, filePath, uuid, diskUUID, expectedChecksum string, credential map[string]string) error { +func (client *SyncClient) RestoreFromBackupURL(backupURL, concurrentLimit, filePath, uuid, diskUUID, expectedChecksum string, credential map[string]string, dataEngine string) error { httpClient := &http.Client{Timeout: 0} encodedCredential, err := json.Marshal(credential) if err != nil { @@ -283,6 +285,7 @@ func (client *SyncClient) RestoreFromBackupURL(backupURL, concurrentLimit, fileP q.Add("disk-uuid", diskUUID) q.Add("expected-checksum", expectedChecksum) q.Add("concurrent-limit", concurrentLimit) + q.Add("data-engine", dataEngine) req.URL.RawQuery = q.Encode() @@ -363,7 +366,7 @@ func (client *SyncClient) Upload(src, dst, uuid, diskUUID, expectedChecksum stri return nil } -func (client *SyncClient) Receive(filePath, uuid, diskUUID, expectedChecksum, fileType string, receiverPort int, size int64) error { +func (client *SyncClient) Receive(filePath, uuid, diskUUID, expectedChecksum, fileType string, receiverPort int, size int64, dataEngine string) error { httpClient := &http.Client{Timeout: 0} requestURL := fmt.Sprintf("http://%s/v1/files", client.Remote) @@ -380,6 +383,7 @@ func (client *SyncClient) Receive(filePath, uuid, diskUUID, expectedChecksum, fi q.Add("file-type", fileType) q.Add("port", strconv.Itoa(receiverPort)) q.Add("size", 
strconv.FormatInt(size, 10)) + q.Add("data-engine", dataEngine) req.URL.RawQuery = q.Encode() resp, err := httpClient.Do(req) diff --git a/pkg/datasource/service.go b/pkg/datasource/service.go index 78185d25..387e3537 100644 --- a/pkg/datasource/service.go +++ b/pkg/datasource/service.go @@ -200,6 +200,11 @@ func (s *Service) cloneFromBackingImage() (err error) { return fmt.Errorf("%v is not specified", types.DataSourceTypeCloneParameterBackingImageUUID) } + dataEngine := s.parameters[types.DataSourceTypeParameterDataEngine] + if dataEngine == "" { + dataEngine = types.DataEnginev1 + } + encryption := s.parameters[types.DataSourceTypeCloneParameterEncryption] if types.EncryptionType(encryption) != types.EncryptionTypeEncrypt && types.EncryptionType(encryption) != types.EncryptionTypeDecrypt && @@ -214,7 +219,7 @@ func (s *Service) cloneFromBackingImage() (err error) { } } - return s.syncClient.CloneFromBackingImage(sourceBackingImage, sourceBackingImageUUID, encryption, s.filePath, s.uuid, s.diskUUID, s.expectedChecksum, s.credential) + return s.syncClient.CloneFromBackingImage(sourceBackingImage, sourceBackingImageUUID, encryption, s.filePath, s.uuid, s.diskUUID, s.expectedChecksum, s.credential, dataEngine) } func (s *Service) restoreFromBackupURL() (err error) { @@ -227,7 +232,12 @@ func (s *Service) restoreFromBackupURL() (err error) { return fmt.Errorf("no %v for restore", types.DataSourceTypeRestoreParameterConcurrentLimit) } - return s.syncClient.RestoreFromBackupURL(backupURL, concurrentLimit, s.filePath, s.uuid, s.diskUUID, s.expectedChecksum, s.credential) + dataEngine := s.parameters[types.DataSourceTypeParameterDataEngine] + if dataEngine == "" { + dataEngine = types.DataEnginev1 + } + + return s.syncClient.RestoreFromBackupURL(backupURL, concurrentLimit, s.filePath, s.uuid, s.diskUUID, s.expectedChecksum, s.credential, dataEngine) } func (s *Service) downloadFromURL(parameters map[string]string) (err error) { @@ -235,8 +245,12 @@ func (s *Service) 
downloadFromURL(parameters map[string]string) (err error) { if url == "" { return fmt.Errorf("no URL for file downloading") } + dataEngine := parameters[types.DataSourceTypeParameterDataEngine] + if dataEngine == "" { + dataEngine = types.DataEnginev1 + } - return s.syncClient.DownloadFromURL(url, s.filePath, s.uuid, s.diskUUID, s.expectedChecksum) + return s.syncClient.DownloadFromURL(url, s.filePath, s.uuid, s.diskUUID, s.expectedChecksum, dataEngine) } func (s *Service) prepareForUpload() (err error) { @@ -262,6 +276,11 @@ func (s *Service) exportFromVolume(parameters map[string]string) error { } fileType := parameters[types.DataSourceTypeFileType] + dataEngine := parameters[types.DataSourceTypeParameterDataEngine] + if dataEngine == "" { + dataEngine = types.DataEnginev1 + } + var size int64 var err error if size, err = strconv.ParseInt(parameters[types.DataSourceTypeExportFromVolumeParameterVolumeSize], 10, 64); err != nil { @@ -283,7 +302,7 @@ func (s *Service) exportFromVolume(parameters map[string]string) error { } s.log.Infof("DataSource Service: export volume via %v", storageIP) - if err := s.syncClient.Receive(s.filePath, s.uuid, s.diskUUID, s.expectedChecksum, fileType, types.DefaultVolumeExportReceiverPort, size); err != nil { + if err := s.syncClient.Receive(s.filePath, s.uuid, s.diskUUID, s.expectedChecksum, fileType, types.DefaultVolumeExportReceiverPort, size, dataEngine); err != nil { return err } @@ -323,6 +342,13 @@ func (s *Service) Upload(writer http.ResponseWriter, request *http.Request) { q.Add("file-path", s.filePath) q.Add("uuid", s.uuid) q.Add("expected-checksum", s.expectedChecksum) + + dataEngine := s.parameters[types.DataSourceTypeParameterDataEngine] + if dataEngine == "" { + dataEngine = types.DataEnginev1 + } + q.Add("data-engine", dataEngine) + request.URL.RawQuery = q.Encode() s.log.Debugf("DataSource Service: forwarding upload request to sync server %v", request.URL.String()) diff --git a/pkg/manager/service.go 
b/pkg/manager/service.go index 71167328..2abd9f7e 100644 --- a/pkg/manager/service.go +++ b/pkg/manager/service.go @@ -341,7 +341,7 @@ func (m *Manager) Sync(ctx context.Context, req *rpc.SyncRequest) (resp *rpc.Bac }() biFilePath := types.GetBackingImageFilePath(m.diskPath, req.Spec.Name, req.Spec.Uuid) - if err := m.syncClient.Receive(biFilePath, req.Spec.Uuid, m.diskUUID, req.Spec.Checksum, "", int(port), req.Spec.Size); err != nil { + if err := m.syncClient.Receive(biFilePath, req.Spec.Uuid, m.diskUUID, req.Spec.Checksum, "", int(port), req.Spec.Size, types.DataEnginev1); err != nil { portReleaseChannel <- nil return nil, err } diff --git a/pkg/sync/router.go b/pkg/sync/router.go index 3174fe5d..96dc44b3 100644 --- a/pkg/sync/router.go +++ b/pkg/sync/router.go @@ -17,7 +17,7 @@ func NewRouter(service *Service) *mux.Router { router.HandleFunc("/v1/files/{id}", service.Delete).Methods("DELETE") router.HandleFunc("/v1/files/{id}", service.Forget).Methods("POST").Queries("action", "forget") router.HandleFunc("/v1/files/{id}", service.SendToPeer).Methods("POST").Queries("action", "sendToPeer") - router.HandleFunc("/v1/files/{id}/download", service.DownloadToDst).Methods("GET") + router.HandleFunc("/v1/files/{id}/download", service.DownloadToDst).Methods("GET", "HEAD") // Launch a new file router.HandleFunc("/v1/files", service.Fetch).Methods("POST").Queries("action", "fetch") diff --git a/pkg/sync/server_test.go b/pkg/sync/server_test.go index 94192944..43ef86cf 100644 --- a/pkg/sync/server_test.go +++ b/pkg/sync/server_test.go @@ -103,7 +103,7 @@ func (s *SyncTestSuite) BenchmarkMultipleDownload(c *C) { Remote: s.addr, } - err := cli.DownloadFromURL("http://test-download-from-url.io", curPath, curUUID, TestDiskUUID, "") + err := cli.DownloadFromURL("http://test-download-from-url.io", curPath, curUUID, TestDiskUUID, "", types.DataEnginev1) c.Assert(err, IsNil) _, err = getAndWaitFileState(cli, curPath, string(types.StateReady), 30) @@ -218,7 +218,7 @@ func (s 
*SyncTestSuite) BenchmarkOneReceiveAndMultiSendWithSendingLimit(c *C) { curUUID := TestSyncingFileUUID + "-dst-" + strconv.Itoa(i) curReceiverPort := TestSyncServiceReceivePort + i curReceiverAddress := fmt.Sprintf("localhost:%d", curReceiverPort) - err := cli.Receive(dstFilePath, curUUID, TestDiskUUID, checksum, types.SyncingFileTypeQcow2, curReceiverPort, int64(sizeInMB*MB)) + err := cli.Receive(dstFilePath, curUUID, TestDiskUUID, checksum, types.SyncingFileTypeQcow2, curReceiverPort, int64(sizeInMB*MB), types.DataEnginev1) c.Assert(err, IsNil) err = cli.Send(originalFilePath, curReceiverAddress) @@ -314,7 +314,7 @@ func (s *SyncTestSuite) BenchmarkMultiReceiveAndMultiSend(c *C) { Remote: s.addr, } - err := cli.Receive(dstFilePath, curUUID, TestDiskUUID, checksum, types.SyncingFileTypeQcow2, curReceiverPort, int64(sizeInMB*MB)) + err := cli.Receive(dstFilePath, curUUID, TestDiskUUID, checksum, types.SyncingFileTypeQcow2, curReceiverPort, int64(sizeInMB*MB), types.DataEnginev1) c.Assert(err, IsNil) err = cli.Send(srcFilePath, curReceiverAddress) c.Assert(err, IsNil) @@ -350,7 +350,7 @@ func (s *SyncTestSuite) TestTimeoutReceiveFromPeers(c *C) { } go func() { - err := cli.Receive(curPath, TestSyncingFileUUID, TestDiskUUID, "", types.SyncingFileTypeQcow2, TestSyncServiceReceivePort, MockFileSize) + err := cli.Receive(curPath, TestSyncingFileUUID, TestDiskUUID, "", types.SyncingFileTypeQcow2, TestSyncServiceReceivePort, MockFileSize, types.DataEnginev1) c.Assert(err, IsNil) }() @@ -522,7 +522,7 @@ func (s *SyncTestSuite) TestDuplicateCalls(c *C) { Remote: s.addr, } - err := cli.DownloadFromURL("http://test-download-from-url.io", curPath, TestSyncingFileUUID, TestDiskUUID, "") + err := cli.DownloadFromURL("http://test-download-from-url.io", curPath, TestSyncingFileUUID, TestDiskUUID, "", types.DataEnginev1) c.Assert(err, IsNil) _, err = getAndWaitFileState(cli, curPath, string(types.StateReady), 30) @@ -530,13 +530,13 @@ func (s *SyncTestSuite) TestDuplicateCalls(c 
*C) { // Duplicate file launching calls should error out: // "resp.StatusCode(500) != http.StatusOK(200), response body content: file /root/test-dir/sync-tests/sync-download-file-for-dup-calls already exists\n" - err = cli.DownloadFromURL("http://test-download-from-url.io", curPath, TestSyncingFileUUID, TestDiskUUID, "") + err = cli.DownloadFromURL("http://test-download-from-url.io", curPath, TestSyncingFileUUID, TestDiskUUID, "", types.DataEnginev1) c.Assert(err, ErrorMatches, `.*already exists[\s\S]*`) err = cli.Upload(curPath, curPath, TestSyncingFileUUID, TestDiskUUID, "") c.Assert(err, ErrorMatches, `.*already exists[\s\S]*`) - err = cli.Receive(curPath, TestDiskUUID, TestSyncingFileUUID, "", "", types.DefaultVolumeExportReceiverPort, MockFileSize) + err = cli.Receive(curPath, TestDiskUUID, TestSyncingFileUUID, "", "", types.DefaultVolumeExportReceiverPort, MockFileSize, types.DataEnginev1) c.Assert(err, ErrorMatches, `.*already exists[\s\S]*`) - err = cli.DownloadFromURL("http://test-download-from-url.io", curPath+"-non-existing", TestSyncingFileUUID, TestDiskUUID, "") + err = cli.DownloadFromURL("http://test-download-from-url.io", curPath+"-non-existing", TestSyncingFileUUID, TestDiskUUID, "", types.DataEnginev1) c.Assert(err, ErrorMatches, `.*already exists[\s\S]*`) // Duplicate delete or forget calls won't error out @@ -608,7 +608,7 @@ func (s *SyncTestSuite) TestReadyFileValidation(c *C) { Remote: s.addr, } - err := cli.DownloadFromURL("http://test-download-from-url.io", curPath, TestSyncingFileUUID, TestDiskUUID, "") + err := cli.DownloadFromURL("http://test-download-from-url.io", curPath, TestSyncingFileUUID, TestDiskUUID, "", types.DataEnginev1) c.Assert(err, IsNil) fInfo, err := getAndWaitFileState(cli, curPath, string(types.StateReady), 30) diff --git a/pkg/sync/service.go b/pkg/sync/service.go index 0e10de9a..7a88cb52 100644 --- a/pkg/sync/service.go +++ b/pkg/sync/service.go @@ -9,6 +9,7 @@ import ( "mime/multipart" "net/http" "net/url" + "os" 
"strconv" "strings" "sync" @@ -219,6 +220,9 @@ func (s *Service) DownloadToDst(writer http.ResponseWriter, request *http.Reques } }() + queryParams := request.URL.Query() + forV2Creation := queryParams.Get("forV2Creation") + var filePath string if filePath, err = url.QueryUnescape(mux.Vars(request)["id"]); err != nil { return @@ -232,6 +236,47 @@ func (s *Service) DownloadToDst(writer http.ResponseWriter, request *http.Reques return } + sf.lock.RLock() + if sf.state != types.StateReady { + sf.lock.RUnlock() + err = fmt.Errorf("cannot get the reader for a non-ready file, current state %v", sf.state) + return + } + sf.lock.RUnlock() + + // If it is for v2 creation, we don't compress the data and will temporarily transform the qcow2 to raw image + if forV2Creation == "true" { + stat, statErr := os.Stat(sf.filePath) + if statErr != nil { + err = errors.Wrapf(statErr, "failed to stat the download file %v", sf.filePath) + return + } + + src, openErr := os.Open(sf.filePath) + if openErr != nil { + err = errors.Wrapf(openErr, "failed to open the download file %v", sf.filePath) + return + } + defer src.Close() + + writer.Header().Set("Content-Length", strconv.FormatInt(stat.Size(), 10)) + writer.Header().Set("Content-Type", "application/octet-stream") + + if request.Method == http.MethodHead { + return + } + + if _, ioErr := io.Copy(writer, src); ioErr != nil { + err = ioErr + return + } + return + } + + // e.g. filePath=/data/backing-images/parrot-6846a0b2/backing + filePathSlices := strings.Split(sf.filePath, "/") + backingImageNameUUID := filePathSlices[len(filePathSlices)-2] + src, sfErr := sf.GetFileReader() if sfErr != nil { err = sfErr @@ -242,15 +287,10 @@ func (s *Service) DownloadToDst(writer http.ResponseWriter, request *http.Reques gzipWriter := gzip.NewWriter(writer) defer gzipWriter.Close() - // e.g. 
filePath=/data/backing-images/parrot-6846a0b2/backing - filePathSlices := strings.Split(sf.filePath, "/") - backingImageNameUUID := filePathSlices[len(filePathSlices)-2] - writer.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s.gz", strings.Split(backingImageNameUUID, "-")[0])) writer.Header().Set("Content-Type", "application/octet-stream") if _, ioErr := io.Copy(gzipWriter, src); ioErr != nil { err = ioErr - return } } @@ -365,6 +405,7 @@ func (s *Service) doDownloadFromURL(request *http.Request) (err error) { } diskUUID := queryParams.Get("disk-uuid") expectedChecksum := queryParams.Get("expected-checksum") + dataEngine := queryParams.Get("data-engine") sf, err := s.checkAndInitSyncFile(filePath, uuid, diskUUID, expectedChecksum, 0) if err != nil { @@ -379,7 +420,7 @@ func (s *Service) doDownloadFromURL(request *http.Request) (err error) { return } - if _, err := sf.DownloadFromURL(url); err != nil { + if _, err := sf.DownloadFromURL(url, dataEngine); err != nil { s.log.Errorf("Sync Service: failed to download sync file %v: %v", filePath, err) return } @@ -428,6 +469,7 @@ func (s *Service) doCloneFromBackingImage(request *http.Request) (err error) { } diskUUID := queryParams.Get("disk-uuid") expectedChecksum := queryParams.Get("expected-checksum") + dataEngine := queryParams.Get(types.DataSourceTypeParameterDataEngine) credential := map[string]string{} if err := json.NewDecoder(request.Body).Decode(&credential); err != nil { @@ -446,7 +488,7 @@ func (s *Service) doCloneFromBackingImage(request *http.Request) (err error) { return } - if _, err := sf.CloneToFileWithEncryption(sourceBackingImage, sourceBackingImageUUID, encryption, credential); err != nil { + if _, err := sf.CloneToFileWithEncryption(sourceBackingImage, sourceBackingImageUUID, encryption, credential, dataEngine); err != nil { s.log.Errorf("Sync Service: failed to clone sync file %v: %v", filePath, err) return } @@ -499,6 +541,8 @@ func (s *Service) 
doRestoreFromBackupURL(request *http.Request) (err error) { } } + dataEngine := queryParams.Get("data-engine") + sf, err := s.checkAndInitSyncFile(filePath, uuid, diskUUID, expectedChecksum, 0) if err != nil { return err @@ -510,7 +554,7 @@ func (s *Service) doRestoreFromBackupURL(request *http.Request) (err error) { return } - if err := sf.RestoreFromBackupURL(backupURL, credential, concurrentLimit); err != nil { + if err := sf.RestoreFromBackupURL(backupURL, credential, concurrentLimit, dataEngine); err != nil { s.log.Errorf("Sync Service: failed to download sync file %v: %v", filePath, err) return } @@ -555,6 +599,8 @@ func (s *Service) doUploadFromRequest(request *http.Request) (err error) { return fmt.Errorf("the uploaded file size %d should be a multiple of %d bytes since Longhorn uses directIO by default", size, types.DefaultSectorSize) } + dataEngine := queryParams.Get(types.DataSourceTypeParameterDataEngine) + sf, err := s.checkAndInitSyncFile(filePath, uuid, diskUUID, expectedChecksum, size) if err != nil { return err @@ -589,7 +635,7 @@ func (s *Service) doUploadFromRequest(request *http.Request) (err error) { return err } - if _, err := sf.IdleTimeoutCopyToFile(p); err != nil { + if _, err := sf.IdleTimeoutCopyToFile(p, dataEngine); err != nil { return err } @@ -638,6 +684,7 @@ func (s *Service) doReceiveFromPeer(request *http.Request) (err error) { if err != nil { return err } + dataEngine := queryParams.Get(types.DataSourceTypeParameterDataEngine) sf, err := s.checkAndInitSyncFile(filePath, uuid, diskUUID, expectedChecksum, size) if err != nil { @@ -652,7 +699,7 @@ func (s *Service) doReceiveFromPeer(request *http.Request) (err error) { return } - if err := sf.Receive(int(port), fileType); err != nil { + if err := sf.Receive(int(port), fileType, dataEngine); err != nil { s.log.Errorf("Sync Service: failed to receive sync file %v: %v", filePath, err) return } diff --git a/pkg/sync/sync_file.go b/pkg/sync/sync_file.go index 921f25a2..d3fd77d3 100644 --- 
a/pkg/sync/sync_file.go +++ b/pkg/sync/sync_file.go @@ -448,7 +448,7 @@ func (sf *SyncingFile) Fetch(srcFilePath string) (err error) { } defer func() { - if finalErr := sf.finishProcessing(err); finalErr != nil { + if finalErr := sf.finishProcessing(err, types.DataEnginev1); finalErr != nil { err = finalErr } }() @@ -484,7 +484,7 @@ func (sf *SyncingFile) Fetch(srcFilePath string) (err error) { return nil } -func (sf *SyncingFile) DownloadFromURL(url string) (written int64, err error) { +func (sf *SyncingFile) DownloadFromURL(url, dataEngine string) (written int64, err error) { sf.log.Infof("SyncingFile: start to download sync file from URL %v", url) needProcessing, err := sf.isProcessingRequired() @@ -496,7 +496,7 @@ func (sf *SyncingFile) DownloadFromURL(url string) (written int64, err error) { } defer func() { - if finalErr := sf.finishProcessing(err); finalErr != nil { + if finalErr := sf.finishProcessing(err, dataEngine); finalErr != nil { err = finalErr } }() @@ -513,7 +513,7 @@ func (sf *SyncingFile) DownloadFromURL(url string) (written int64, err error) { return sf.handler.DownloadFromURL(sf.ctx, url, sf.tmpFilePath, sf) } -func (sf *SyncingFile) RestoreFromBackupURL(backupURL string, credential map[string]string, concurrentLimit int) (err error) { +func (sf *SyncingFile) RestoreFromBackupURL(backupURL string, credential map[string]string, concurrentLimit int, dataEngine string) (err error) { sf.log.Infof("SyncingFile: start to restore sync file from backup URL %v", backupURL) needProcessing, err := sf.isProcessingRequired() @@ -525,7 +525,7 @@ func (sf *SyncingFile) RestoreFromBackupURL(backupURL string, credential map[str } defer func() { - if finalErr := sf.finishProcessing(err); finalErr != nil { + if finalErr := sf.finishProcessing(err, dataEngine); finalErr != nil { err = finalErr } }() @@ -588,7 +588,7 @@ func (sf *SyncingFile) waitForRestoreComplete() (err error) { // when doing encryption, it creates a loop device from the target backing file, 
setup the encrypted device from the loop device and then dump the data from the source file to the target encrypted device. // When doing decryption, it creates a loop device from the source backing file, setup the encrypted device from the loop device and then dump the data from the source encrypted device to the target file. // When doing ignore clone, it directly dumps the data from the source backing file to the target backing file. -func (sf *SyncingFile) CloneToFileWithEncryption(sourceBackingImage, sourceBackingImageUUID string, encryption types.EncryptionType, credential map[string]string) (copied int64, err error) { +func (sf *SyncingFile) CloneToFileWithEncryption(sourceBackingImage, sourceBackingImageUUID string, encryption types.EncryptionType, credential map[string]string, dataEngine string) (copied int64, err error) { sf.log.Infof("SyncingFile: start to clone the file") defer func() { @@ -605,7 +605,7 @@ func (sf *SyncingFile) CloneToFileWithEncryption(sourceBackingImage, sourceBacki return 0, nil } defer func() { - if finalErr := sf.finishProcessing(err); finalErr != nil { + if finalErr := sf.finishProcessing(err, dataEngine); finalErr != nil { err = finalErr } }() @@ -755,7 +755,7 @@ func (sf *SyncingFile) prepareCloneTargetFile(sourceFile string, encryption type return nil } -func (sf *SyncingFile) IdleTimeoutCopyToFile(src io.ReadCloser) (copied int64, err error) { +func (sf *SyncingFile) IdleTimeoutCopyToFile(src io.ReadCloser, dataEngine string) (copied int64, err error) { sf.log.Infof("SyncingFile: start to copy data to sync file") defer func() { @@ -782,7 +782,7 @@ func (sf *SyncingFile) IdleTimeoutCopyToFile(src io.ReadCloser) (copied int64, e } defer func() { - if finalErr := sf.finishProcessing(err); finalErr != nil { + if finalErr := sf.finishProcessing(err, dataEngine); finalErr != nil { err = finalErr } }() @@ -794,7 +794,7 @@ func (sf *SyncingFile) IdleTimeoutCopyToFile(src io.ReadCloser) (copied int64, e return nw, err } -func (sf 
*SyncingFile) Receive(port int, fileType string) (err error) { +func (sf *SyncingFile) Receive(port int, fileType, dataEngine string) (err error) { sf.log.Infof("SyncingFile: start to launch a receiver at port %v", port) needProcessing, err := sf.isProcessingRequired() @@ -806,7 +806,7 @@ func (sf *SyncingFile) Receive(port int, fileType string) (err error) { } defer func() { - if finalErr := sf.finishProcessing(err); finalErr != nil { + if finalErr := sf.finishProcessing(err, dataEngine); finalErr != nil { err = finalErr } }() @@ -871,7 +871,7 @@ func (sf *SyncingFile) Send(toAddress string, sender Sender) (err error) { return nil } -func (sf *SyncingFile) finishProcessing(err error) (finalErr error) { +func (sf *SyncingFile) finishProcessing(err error, dataEngine string) (finalErr error) { sf.lock.Lock() defer sf.lock.Unlock() @@ -906,6 +906,39 @@ func (sf *SyncingFile) finishProcessing(err error) (finalErr error) { } sf.modificationTime = stat.ModTime().UTC().String() + // If the file is qcow2, we need to convert it to raw for dumping the data to the spdk lvol + // This will only happen when preparing the first backing image in data source. 
+ if dataEngine == types.DataEnginev2 { + imgInfo, qemuErr := util.GetQemuImgInfo(sf.tmpFilePath) + if qemuErr != nil { + finalErr = errors.Wrapf(qemuErr, "failed to detect if file %v is qcow2", sf.tmpFilePath) + return + } + tmpRawFile := fmt.Sprintf("%v-raw.tmp", sf.tmpFilePath) + if imgInfo.Format == "qcow2" { + if convertErr := util.ConvertFromQcow2ToRaw(sf.tmpFilePath, tmpRawFile); convertErr != nil { + finalErr = errors.Wrapf(convertErr, "failed to create raw image from qcow2 image %v", sf.tmpFilePath) + return + } + if removeErr := os.RemoveAll(sf.tmpFilePath); removeErr != nil { + sf.log.Warnf("SyncingFile: failed to remove the qcow2 file %v after converting to raw file", sf.tmpFilePath) + } + if renameErr := os.Rename(tmpRawFile, sf.tmpFilePath); renameErr != nil { + finalErr = errors.Wrapf(renameErr, "failed to rename tmp raw file %v to file %v", tmpRawFile, sf.tmpFilePath) + return + } + } + + stat, statErr := os.Stat(sf.tmpFilePath) + if statErr != nil { + finalErr = errors.Wrapf(statErr, "failed to stat tmp file %v after converting from qcow2 to raw file", sf.tmpFilePath) + return + } + sf.size = stat.Size() + sf.processedSize = stat.Size() + sf.modificationTime = stat.ModTime().UTC().String() + } + // Check if there is an existing config file then try to load the checksum configFilePath := util.GetSyncingFileConfigFilePath(sf.filePath) config, confReadErr := util.ReadSyncingFileConfig(configFilePath) diff --git a/pkg/types/types.go b/pkg/types/types.go index 22e589bd..9d384ddf 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -70,6 +70,9 @@ const ( DataSourceTypeRestoreParameterBackupURL = "backup-url" DataSourceTypeRestoreParameterConcurrentLimit = "concurrent-limit" DataSourceTypeFileType = "file-type" + DataSourceTypeParameterDataEngine = "data-engine" + DataEnginev1 = "v1" + DataEnginev2 = "v2" DataSourceTypeExportFromVolumeParameterVolumeSize = "volume-size" DataSourceTypeExportFromVolumeParameterSnapshotName = "snapshot-name"