Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use stellar-core replay-in-memory catchup mode #23

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/astrologer
vendor
dist

stellar-core/buckets
31 changes: 19 additions & 12 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,28 +1,35 @@
FROM golang:alpine AS build

RUN apk add --no-cache git
FROM golang:stretch AS build

RUN mkdir -p $GOPATH/src/github.com/astroband/astrologer
WORKDIR $GOPATH/src/github.com/astroband/astrologer

ADD . .
COPY . .

RUN GO111MODULE=on CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -tags netgo -ldflags '-w'

RUN GO111MODULE=on go build
#===========================

# ===============================================================================================
FROM stellar/stellar-core:13.2.0-1297-b5dda51e AS stellar-core

FROM alpine:latest
#===========================

FROM stellar/base

ENV DATABASE_URL=postgres://localhost/core?sslmode=disable
ENV ES_URL=http://localhost:9200
ENV INGEST_GAP=-50

WORKDIR /root

COPY --from=build /go/src/github.com/astroband/astrologer/astrologer .
RUN chmod +x ./astrologer
COPY dependencies.sh entry.sh ./

COPY entry.sh /entry.sh
RUN ["chmod", "+x", "./dependencies.sh"]
RUN ./dependencies.sh

COPY --from=stellar-core /usr/local/bin/stellar-core /usr/local/bin/

COPY --from=build /go/src/github.com/astroband/astrologer/astrologer .
RUN ["chmod", "+x", "./astrologer"]

ENTRYPOINT ["/entry.sh"]
CMD /root/astrologer ingest -- $INGEST_GAP
ENTRYPOINT ["./entry.sh"]
CMD ./astrologer ingest -- $INGEST_GAP
195 changes: 99 additions & 96 deletions commands/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,150 +2,134 @@ package commands

import (
"bytes"
"log"
"math/rand"
"time"
log "github.com/sirupsen/logrus"

progressbar "github.com/schollz/progressbar/v2"

"github.com/astroband/astrologer/config"
"github.com/astroband/astrologer/db"
"github.com/astroband/astrologer/es"
)

var (
bar *progressbar.ProgressBar
lb "github.com/stellar/go/exp/ingest/ledgerbackend"
)

// ExportCommandConfig represents configuration options for `export` CLI command
type ExportCommandConfig struct {
Start config.NumberWithSign
Count int
RetryCount int
DryRun bool
BatchSize int
Start int
Count int
RetryCount int
DryRun bool
BatchSize int
NetworkPassphrase string
}

// ExportCommand represents the `export` CLI command
type ExportCommand struct {
ES es.Adapter
DB db.Adapter
Config ExportCommandConfig

firstLedger int
lastLedger int
firstLedger uint32
lastLedger uint32
}

// Execute starts the export process
func (cmd *ExportCommand) Execute() {
cmd.firstLedger, cmd.lastLedger = cmd.getRange()
total := cmd.Config.Count

total := cmd.DB.LedgerHeaderRowCount(cmd.firstLedger, cmd.lastLedger)
cmd.firstLedger = uint32(cmd.Config.Start)
cmd.lastLedger = cmd.firstLedger + uint32(cmd.Config.Count) - 1

if total == 0 {
log.Fatal("Nothing to export within given range!", cmd.firstLedger, cmd.lastLedger)
}

log.Println("Exporting ledgers from", cmd.firstLedger, "to", cmd.lastLedger, "total", total)
log.Infof("Exporting ledgers from %d to %d. Total: %d ledgers\n", cmd.firstLedger, cmd.lastLedger, total)
log.Infof("Will insert %d batches %d ledgers each\n", cmd.blockCount(total), cmd.Config.BatchSize)

createBar(total)
ledgerBackend, err := lb.NewCaptive(
"stellar-core",
"",
cmd.Config.NetworkPassphrase,
getHistoryURLs(cmd.Config.NetworkPassphrase),
)

for i := 0; i < cmd.blockCount(total); i++ {
i := i
pool.Submit(func() { cmd.exportBlock(i) })
if err != nil {
log.Fatal("error creating captive core backend", err)
}

pool.StopWait()
finishBar()
}
log.Info("Preparing range...")
err = ledgerBackend.PrepareRange(lb.BoundedRange(cmd.firstLedger, cmd.lastLedger))

func (cmd *ExportCommand) exportBlock(i int) {
var b bytes.Buffer

rows := cmd.DB.LedgerHeaderRowFetchBatch(i, cmd.firstLedger, cmd.Config.BatchSize)
if err != nil {
log.Fatal("Failed preparing range", err)
}

for n := 0; n < len(rows); n++ {
txs := cmd.DB.TxHistoryRowForSeq(rows[n].LedgerSeq)
fees := cmd.DB.TxFeeHistoryRowsForRows(txs)
var batchBuffer bytes.Buffer

err := es.SerializeLedger(rows[n], txs, fees, &b)
for ledgerSeq := cmd.firstLedger; ledgerSeq <= cmd.lastLedger; ledgerSeq++ {
_, meta, err := ledgerBackend.GetLedger(ledgerSeq)

if err != nil {
log.Fatalf("Failed to ingest ledger %d: %v\n", rows[n].LedgerSeq, err)
// FIXME skip instead of failing
log.Fatal(err)
}

if !*config.Verbose {
bar.Add(1)
}
}
es.SerializeLedgerFromHistory(cmd.Config.NetworkPassphrase, meta, &batchBuffer)

if *config.Verbose {
log.Println(b.String())
}
if (ledgerSeq-cmd.firstLedger+1)%uint32(cmd.Config.BatchSize) == 0 || ledgerSeq == cmd.lastLedger {
payload := batchBuffer.String()
pool.Submit(func() {
log.Infof("Gonna bulk insert %d bytes\n", len(payload))
err := cmd.ES.BulkInsert(payload)

if err != nil {
log.Fatal("Cannot bulk insert", err)
} else {
log.Printf("Batch successfully inserted\n")
}
})

if !cmd.Config.DryRun {
cmd.ES.IndexWithRetries(&b, cmd.Config.RetryCount)
batchBuffer.Reset()
}
}
}

func (cmd *ExportCommand) index(b *bytes.Buffer, retry int) {
indexed := cmd.ES.BulkInsert(b)
// for i := 0; i < cmd.blockCount(total); i++ {
// var b bytes.Buffer
// ledgerCounter := 0
// batchNum := i + 1

if !indexed {
if retry > cmd.Config.RetryCount {
log.Fatal("Retries for bulk failed, aborting")
}
// for meta := range channel {
// seq := int(meta.V0.LedgerHeader.Header.LedgerSeq)

delay := time.Duration((rand.Intn(10) + 5))
time.Sleep(delay * time.Second)
// if seq < cmd.firstLedger || seq > cmd.lastLedger {
// continue
// }

cmd.index(b, retry+1)
}
}
// ledgerCounter += 1

// Parses range of export command
func (cmd *ExportCommand) getRange() (first int, last int) {
firstLedger := cmd.DB.LedgerHeaderFirstRow()
lastLedger := cmd.DB.LedgerHeaderLastRow()
// log.Println(seq)

if cmd.Config.Start.Explicit {
if cmd.Config.Start.Value < 0 {
first = lastLedger.LedgerSeq + cmd.Config.Start.Value + 1
} else if config.Start.Value > 0 {
first = firstLedger.LedgerSeq + cmd.Config.Start.Value
}
} else if cmd.Config.Start.Value != 0 {
first = cmd.Config.Start.Value
} else {
first = firstLedger.LedgerSeq
}
// es.SerializeLedgerFromHistory(meta, &b)

if cmd.Config.Count == 0 {
last = lastLedger.LedgerSeq
} else {
last = first + cmd.Config.Count - 1
}
// log.Printf("Ledger %d of %d in batch %d\n", ledgerCounter, cmd.Config.BatchSize, batchNum)

return first, last
}
// if ledgerCounter == cmd.Config.BatchSize {
// break
// }
// }

func createBar(count int) {
bar = progressbar.NewOptions(
count,
progressbar.OptionEnableColorCodes(false),
progressbar.OptionShowCount(),
progressbar.OptionThrottle(500*time.Millisecond),
progressbar.OptionSetRenderBlankState(true),
progressbar.OptionSetWidth(100),
)
// if cmd.Config.DryRun {
// continue
// }

bar.RenderBlank()
}
// pool.Submit(func() {
// log.Printf("Gonna bulk insert %d bytes\n", b.Len())
// err := cmd.ES.BulkInsert(b)

func finishBar() {
if !*config.Verbose {
bar.Finish()
}
// if err != nil {
// log.Fatal("Cannot bulk insert", err)
// } else {
// log.Printf("Batch %d successfully inserted\n", batchNum)
// }
// })
// }

pool.StopWait()
}

func (cmd *ExportCommand) blockCount(count int) (blocks int) {
Expand All @@ -157,3 +141,22 @@ func (cmd *ExportCommand) blockCount(count int) (blocks int) {

return blocks
}

func getHistoryURLs(networkPassphrase string) []string {
switch networkPassphrase {
case "Public Global Stellar Network ; September 2015":
return []string{
"https://history.stellar.org/prd/core-live/core_live_001",
"https://history.stellar.org/prd/core-live/core_live_002",
"https://history.stellar.org/prd/core-live/core_live_003",
}
case "Test SDF Network ; September 2015":
return []string{
"http://history.stellar.org/prd/core-testnet/core_testnet_001",
"http://history.stellar.org/prd/core-testnet/core_testnet_002",
"http://history.stellar.org/prd/core-testnet/core_testnet_003",
}
default:
return []string{}
}
}
6 changes: 3 additions & 3 deletions config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ var (
Default("25").
Int()

// Start ledger to start with
Start = NumberWithSignParse(exportCommand.Arg("start", "Ledger to start indexing, +100 means offset 100 from the first"))
Start = exportCommand.Arg("start", "Ledger to start indexing, +100 means offset 100 from the first").Default("0").Int()

// Count ledgers
Count = exportCommand.Arg("count", "Count of ledgers to ingest, should be aliquout batch size").Default("0").Int()
Count = exportCommand.Arg("count", "Count of ledgers to ingest, should be aliquout batch size").Default("0").Int()
Network = exportCommand.Flag("network", "Stellar network to use").Default("testnet").Enum("public", "test")

// StartIngest ledger to start with ingesting
StartIngest = ingestCommand.Arg("start", "Ledger to start ingesting").Int()
Expand Down
28 changes: 0 additions & 28 deletions db/main.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,13 @@
package db

import (
"bytes"
"log"
"net/url"
"unicode/utf8"

"github.com/jmoiron/sqlx"
_ "github.com/lib/pq" // Postgres driver
)

// Copy paste from Horizon
func utf8Scrub(in string) string {

// First check validity using the stdlib, returning if the string is already
// valid
if utf8.ValidString(in) {
return in
}

left := []byte(in)
var result bytes.Buffer

for len(left) > 0 {
r, n := utf8.DecodeRune(left)

_, err := result.WriteRune(r)
if err != nil {
panic(err)
}

left = left[n:]
}

return result.String()
}

// Adapter defines the interface to work with ledger database
type Adapter interface {
LedgerHeaderRowCount(first int, last int) int
Expand Down
Loading