| /* |
| * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com> |
| * SPDX-License-Identifier: Apache-2.0 |
| */ |
| |
| package badger |
| |
| import ( |
| "bytes" |
| "context" |
| "sort" |
| "sync" |
| "sync/atomic" |
| "time" |
| |
| humanize "github.com/dustin/go-humanize" |
| "google.golang.org/protobuf/proto" |
| |
| "github.com/dgraph-io/badger/v4/pb" |
| "github.com/dgraph-io/badger/v4/y" |
| "github.com/dgraph-io/ristretto/v2/z" |
| ) |
| |
| const batchSize = 16 << 20 // 16 MB |
| |
// maxStreamSize is the maximum allowed size of a stream batch. This is a soft limit,
// as a single list that is still over the limit will have to be sent as is, since it
// cannot be split further. This limit prevents the framework from creating batches
// so big that sending them causes issues (e.g., running into gRPC's max message size limit).
| var maxStreamSize = uint64(100 << 20) // 100MB |
| |
| // Stream provides a framework to concurrently iterate over a snapshot of Badger, pick up |
| // key-values, batch them up and call Send. Stream does concurrent iteration over many smaller key |
| // ranges. It does NOT send keys in lexicographical sorted order. To get keys in sorted |
| // order, use Iterator. |
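//
// A minimal usage sketch (the Send body and the prefix are illustrative, not part
// of this API):
//
//	stream := db.NewStream()
//	stream.Prefix = []byte("account:")
//	stream.Send = func(buf *z.Buffer) error {
//		list, err := BufferToKVList(buf)
//		if err != nil {
//			return err
//		}
//		for _, kv := range list.Kv {
//			_ = kv // Process each key-value, e.g. write it to a sink.
//		}
//		return nil
//	}
//	if err := stream.Orchestrate(context.Background()); err != nil {
//		// Handle the error; all Stream goroutines have already been stopped.
//	}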
| type Stream struct { |
	// Prefix restricts iteration to keys starting with this prefix. If set to nil (default),
	// Stream iterates over the entire DB.
| Prefix []byte |
| |
| // Number of goroutines to use for iterating over key ranges. Defaults to 8. |
| NumGo int |
| |
	// Badger produces log entries via Infof to indicate the progress of Stream. LogPrefix can
	// be used to differentiate them from other activities. Default is "Badger.Stream".
| LogPrefix string |
| |
| // ChooseKey is invoked each time a new key is encountered. Note that this is not called |
| // on every version of the value, only the first encountered version (i.e. the highest version |
| // of the value a key has). ChooseKey can be left nil to select all keys. |
| // |
| // Note: Calls to ChooseKey are concurrent. |
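	//
	// For example, to select only keys carrying a given suffix (the suffix is
	// illustrative):
	//
	//	st.ChooseKey = func(item *Item) bool {
	//		return bytes.HasSuffix(item.Key(), []byte("#latest"))
	//	}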
| ChooseKey func(item *Item) bool |
| |
	// MaxSize is the maximum allowed size of a stream batch. This is a soft limit,
	// as a single list that is still over the limit will have to be sent as is, since it
	// cannot be split further. This limit prevents the framework from creating batches
	// so big that sending them causes issues (e.g., running into gRPC's max message size limit).
	// If necessary, set MaxSize before the Stream starts; it is not safe to modify
	// concurrently while the Stream is running.
| MaxSize uint64 |
| |
	// KeyToList, similar to ChooseKey, is only invoked on the highest version of the value. It
	// is up to the caller to iterate over the versions and generate zero, one or more KVs. It
	// is expected that the user would advance the iterator to go through the versions of the
	// values. However, the user MUST immediately return from this function on the first encounter
	// with a mismatching key. See example usage in the ToList function. Can be left nil to use
	// the ToList function by default.
| // |
	// KeyToList has access to a z.Allocator via stream.Allocator(itr.ThreadId). This
	// allocator can be used to allocate KVs, decreasing the memory pressure on the Go GC. The
	// Stream framework takes care of releasing those resources after calling Send. AllocRef does
	// NOT need to be set in the returned KVList, as the Stream framework ignores that field,
	// instead using the allocator assigned to that thread id.
| // |
| // Note: Calls to KeyToList are concurrent. |
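	//
	// For example, a sketch that emits just the highest version of each key (compare
	// with the more complete ToList below):
	//
	//	st.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) {
	//		item := itr.Item()
	//		val, err := item.ValueCopy(nil)
	//		if err != nil {
	//			return nil, err
	//		}
	//		kv := &pb.KV{Key: item.KeyCopy(nil), Value: val, Version: item.Version()}
	//		return &pb.KVList{Kv: []*pb.KV{kv}}, nil
	//	}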
| KeyToList func(key []byte, itr *Iterator) (*pb.KVList, error) |
	// UseKeyToListWithThreadId indicates that KeyToListWithThreadId should be used
	// instead of KeyToList. This newer API exposes the stream's parallelism: each
	// threadId runs serially, so work that must not run concurrently can be keyed
	// off the threadId, whereas KeyToList leaves all concurrency handling to the
	// caller. Once a thread finishes, FinishThread is called with its threadId.
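	//
	// A hypothetical sketch of per-thread buffering with these hooks (perThread and
	// drain are illustrative helpers, not part of Badger):
	//
	//	st.UseKeyToListWithThreadId = true
	//	st.KeyToListWithThreadId = func(key []byte, itr *Iterator, threadId int) (*pb.KVList, error) {
	//		// Each threadId runs serially, so perThread[threadId] needs no locking.
	//		perThread[threadId] = append(perThread[threadId], itr.Item().KeyCopy(nil))
	//		return nil, nil // Nothing to emit yet; flush in FinishThread.
	//	}
	//	st.FinishThread = func(threadId int) (*pb.KVList, error) {
	//		return drain(perThread[threadId]), nil
	//	}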
| UseKeyToListWithThreadId bool |
| KeyToListWithThreadId func(key []byte, itr *Iterator, threadId int) (*pb.KVList, error) |
| FinishThread func(threadId int) (*pb.KVList, error) |
| |
	// This is the method where Stream sends the final output. All calls to Send are made by a
	// single goroutine, i.e., logic within the Send method can assume single-threaded execution.
| Send func(buf *z.Buffer) error |
| |
	// Read data above SinceTs. All keys with version <= SinceTs will be ignored.
| SinceTs uint64 |
| readTs uint64 |
| db *DB |
| rangeCh chan keyRange |
| kvChan chan *z.Buffer |
| nextStreamId atomic.Uint32 |
| doneMarkers bool |
| scanned atomic.Uint64 // used to estimate the ETA for data scan. |
| numProducers atomic.Int32 |
| } |
| |
// SendDoneMarkers, when set to true, makes Stream send out done markers on the stream.
// False by default.
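//
// A consumer-side sketch for detecting these markers inside Send (the handling
// logic is illustrative):
//
//	list, err := BufferToKVList(buf)
//	if err != nil {
//		return err
//	}
//	for _, kv := range list.Kv {
//		if kv.StreamDone {
//			// All keys for kv.StreamId have now been received.
//		}
//	}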
| func (st *Stream) SendDoneMarkers(done bool) { |
| st.doneMarkers = done |
| } |
| |
// ToList is the default implementation of KeyToList. It picks up all valid versions of the
// key, stopping once it encounters a deleted or expired version.
| func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) { |
| a := itr.Alloc |
| ka := a.Copy(key) |
| |
| list := &pb.KVList{} |
| for ; itr.Valid(); itr.Next() { |
| item := itr.Item() |
| if item.IsDeletedOrExpired() { |
| break |
| } |
| if !bytes.Equal(key, item.Key()) { |
| // Break out on the first encounter with another key. |
| break |
| } |
| |
| kv := y.NewKV(a) |
| kv.Key = ka |
| |
| if err := item.Value(func(val []byte) error { |
| kv.Value = a.Copy(val) |
| return nil |
| |
| }); err != nil { |
| return nil, err |
| } |
| kv.Version = item.Version() |
| kv.ExpiresAt = item.ExpiresAt() |
| kv.UserMeta = a.Copy([]byte{item.UserMeta()}) |
| |
| list.Kv = append(list.Kv, kv) |
| if st.db.opt.NumVersionsToKeep == 1 { |
| break |
| } |
| |
| if item.DiscardEarlierVersions() { |
| break |
| } |
| } |
| return list, nil |
| } |
| |
// produceRanges picks up key ranges covering the DB (or Prefix) and sends them to rangeCh.
// A keyRange is [start, end): including start, excluding end. Do ensure that the start and
// end byte slices are owned by the keyRange struct.
| func (st *Stream) produceRanges(ctx context.Context) { |
| ranges := st.db.Ranges(st.Prefix, st.NumGo) |
| y.AssertTrue(len(ranges) > 0) |
| y.AssertTrue(ranges[0].left == nil) |
| y.AssertTrue(ranges[len(ranges)-1].right == nil) |
| st.db.opt.Infof("Number of ranges found: %d\n", len(ranges)) |
| |
| // Sort in descending order of size. |
| sort.Slice(ranges, func(i, j int) bool { |
| return ranges[i].size > ranges[j].size |
| }) |
| for i, r := range ranges { |
| st.rangeCh <- *r |
| st.db.opt.Infof("Sent range %d for iteration: [%x, %x) of size: %s\n", |
| i, r.left, r.right, humanize.IBytes(uint64(r.size))) |
| } |
| close(st.rangeCh) |
| } |
| |
| // produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan. |
| func (st *Stream) produceKVs(ctx context.Context, threadId int) error { |
| st.numProducers.Add(1) |
| defer st.numProducers.Add(-1) |
| |
| var txn *Txn |
| if st.readTs > 0 { |
| txn = st.db.NewTransactionAt(st.readTs, false) |
| } else { |
| txn = st.db.NewTransaction(false) |
| } |
| defer txn.Discard() |
| |
	// produceKVs runs iterate serially, so we can define outList here.
| outList := z.NewBuffer(2*batchSize, "Stream.ProduceKVs") |
| defer func() { |
		// The outList variable gets reassigned below, so we must evaluate it inside this
		// closure. DO NOT use `defer outList.Release()` directly.
| _ = outList.Release() |
| }() |
| |
| iterate := func(kr keyRange) error { |
| iterOpts := DefaultIteratorOptions |
| iterOpts.AllVersions = true |
| iterOpts.Prefix = st.Prefix |
| iterOpts.PrefetchValues = true |
| iterOpts.SinceTs = st.SinceTs |
| itr := txn.NewIterator(iterOpts) |
| itr.ThreadId = threadId |
| defer itr.Close() |
| |
| itr.Alloc = z.NewAllocator(1<<20, "Stream.Iterate") |
| defer itr.Alloc.Release() |
| |
| // This unique stream id is used to identify all the keys from this iteration. |
| streamId := st.nextStreamId.Add(1) |
| var scanned int |
| |
| sendIt := func() error { |
| select { |
| case st.kvChan <- outList: |
| outList = z.NewBuffer(2*batchSize, "Stream.ProduceKVs") |
| st.scanned.Add(uint64(itr.scanned - scanned)) |
| scanned = itr.scanned |
| case <-ctx.Done(): |
| return ctx.Err() |
| } |
| return nil |
| } |
| |
| var prevKey []byte |
| for itr.Seek(kr.left); itr.Valid(); { |
			// itr.Valid would only return true for keys with the provided Prefix in iterOpts.
| item := itr.Item() |
| if bytes.Equal(item.Key(), prevKey) { |
| itr.Next() |
| continue |
| } |
| prevKey = append(prevKey[:0], item.Key()...) |
| |
| // Check if we reached the end of the key range. |
| if len(kr.right) > 0 && bytes.Compare(item.Key(), kr.right) >= 0 { |
| break |
| } |
| |
| // Check if we should pick this key. |
| if st.ChooseKey != nil && !st.ChooseKey(item) { |
| continue |
| } |
| |
| // Now convert to key value. |
| itr.Alloc.Reset() |
| var list *pb.KVList |
| var err error |
| if st.UseKeyToListWithThreadId { |
| list, err = st.KeyToListWithThreadId(item.KeyCopy(nil), itr, threadId) |
| } else { |
| list, err = st.KeyToList(item.KeyCopy(nil), itr) |
| } |
| if err != nil { |
| st.db.opt.Warningf("While reading key: %x, got error: %v", item.Key(), err) |
| continue |
| } |
| if list == nil || len(list.Kv) == 0 { |
| continue |
| } |
| for _, kv := range list.Kv { |
| kv.StreamId = streamId |
| KVToBuffer(kv, outList) |
| if outList.LenNoPadding() < batchSize { |
| continue |
| } |
| if err := sendIt(); err != nil { |
| return err |
| } |
| } |
| } |
| |
| if st.UseKeyToListWithThreadId { |
| if kvs, err := st.FinishThread(threadId); err != nil { |
| return err |
| } else { |
| for _, kv := range kvs.Kv { |
| kv.StreamId = streamId |
| KVToBuffer(kv, outList) |
| if outList.LenNoPadding() < batchSize { |
| continue |
| } |
| if err := sendIt(); err != nil { |
| return err |
| } |
| } |
| } |
| } |
| // Mark the stream as done. |
| if st.doneMarkers { |
| kv := &pb.KV{ |
| StreamId: streamId, |
| StreamDone: true, |
| } |
| KVToBuffer(kv, outList) |
| } |
| return sendIt() |
| } |
| |
| for { |
| select { |
| case kr, ok := <-st.rangeCh: |
| if !ok { |
| // Done with the keys. |
| return nil |
| } |
| if err := iterate(kr); err != nil { |
| return err |
| } |
| case <-ctx.Done(): |
| return ctx.Err() |
| } |
| } |
| } |
| |
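// streamKVs picks up filled buffers from kvChan, opportunistically merges them into
// batches of up to st.MaxSize, and hands each batch to Send. It also logs scan and
// send progress periodically.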
| func (st *Stream) streamKVs(ctx context.Context) error { |
| onDiskSize, uncompressedSize := st.db.EstimateSize(st.Prefix) |
	// The uncompressed size estimate has been observed to be within a 20% error margin,
	// so pad it by 20%.
| uncompressedSize = uint64(float64(uncompressedSize) * 1.2) |
| st.db.opt.Infof("%s Streaming about %s of uncompressed data (%s on disk)\n", |
| st.LogPrefix, humanize.IBytes(uncompressedSize), humanize.IBytes(onDiskSize)) |
| |
| tickerDur := 5 * time.Second |
| var bytesSent uint64 |
| t := time.NewTicker(tickerDur) |
| defer t.Stop() |
| now := time.Now() |
| |
| sendBatch := func(batch *z.Buffer) error { |
| defer func() { _ = batch.Release() }() |
| sz := uint64(batch.LenNoPadding()) |
| if sz == 0 { |
| return nil |
| } |
| bytesSent += sz |
| // st.db.opt.Infof("%s Sending batch of size: %s.\n", st.LogPrefix, humanize.IBytes(sz)) |
| if err := st.Send(batch); err != nil { |
| st.db.opt.Warningf("Error while sending: %v\n", err) |
| return err |
| } |
| return nil |
| } |
| |
| slurp := func(batch *z.Buffer) error { |
| loop: |
| for { |
			// Send the batch immediately if it already exceeds the maximum allowed size.
			// If the size of the batch exceeds st.MaxSize, break from the loop to
			// avoid creating a batch so big that limits (e.g., gRPC's max message size) are hit.
| if uint64(batch.LenNoPadding()) > st.MaxSize { |
| break loop |
| } |
| select { |
| case kvs, ok := <-st.kvChan: |
| if !ok { |
| break loop |
| } |
| y.AssertTrue(kvs != nil) |
| y.Check2(batch.Write(kvs.Bytes())) |
| y.Check(kvs.Release()) |
| |
| default: |
| break loop |
| } |
| } |
| return sendBatch(batch) |
| } // end of slurp. |
| |
| writeRate := y.NewRateMonitor(20) |
| scanRate := y.NewRateMonitor(20) |
| outer: |
| for { |
| var batch *z.Buffer |
| select { |
| case <-ctx.Done(): |
| return ctx.Err() |
| |
| case <-t.C: |
| // Instead of calculating speed over the entire lifetime, we average the speed over |
| // ticker duration. |
| writeRate.Capture(bytesSent) |
| scanned := st.scanned.Load() |
| scanRate.Capture(scanned) |
| numProducers := st.numProducers.Load() |
| |
| st.db.opt.Infof("%s [%s] Scan (%d): ~%s/%s at %s/sec. Sent: %s at %s/sec."+ |
| " jemalloc: %s\n", |
| st.LogPrefix, y.FixedDuration(time.Since(now)), numProducers, |
| y.IBytesToString(scanned, 1), humanize.IBytes(uncompressedSize), |
| humanize.IBytes(scanRate.Rate()), |
| y.IBytesToString(bytesSent, 1), humanize.IBytes(writeRate.Rate()), |
| humanize.IBytes(uint64(z.NumAllocBytes()))) |
| |
| case kvs, ok := <-st.kvChan: |
| if !ok { |
| break outer |
| } |
| y.AssertTrue(kvs != nil) |
| batch = kvs |
| |
| // Otherwise, slurp more keys into this batch. |
| if err := slurp(batch); err != nil { |
| return err |
| } |
| } |
| } |
| |
| st.db.opt.Infof("%s Sent data of size %s\n", st.LogPrefix, humanize.IBytes(bytesSent)) |
| return nil |
| } |
| |
// Orchestrate runs Stream. It picks up ranges from the SSTables, then runs NumGo
// goroutines to iterate over these ranges and batch up KVs in lists. It concurrently runs
// a single goroutine to pick up these lists, batch them up further, and send them to
// Output.Send. Orchestrate also emits progress logs via Infof, using the provided LogPrefix.
// Note that all calls to Output.Send are serial. If any of these steps encounters an error,
// Orchestrate stops execution and returns that error. Orchestrate can be called multiple
// times, but only serially.
| func (st *Stream) Orchestrate(ctx context.Context) error { |
| ctx, cancel := context.WithCancel(ctx) |
| defer cancel() |
| st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists. |
| |
	// kvChan should only have a small capacity to ensure that we don't buffer up too much data
	// if sending is slow. Each buffer is lazily capped at batchSize (16 MB), so a channel
	// capacity of 32 bounds the buffered data at roughly 512 MB.
| st.kvChan = make(chan *z.Buffer, 32) |
| |
| if st.KeyToList == nil { |
| st.KeyToList = st.ToList |
| } |
| |
| // Picks up ranges from Badger, and sends them to rangeCh. |
| go st.produceRanges(ctx) |
| |
	errCh := make(chan error, st.NumGo) // Stores errors from produceKVs.
| var wg sync.WaitGroup |
| for i := 0; i < st.NumGo; i++ { |
| wg.Add(1) |
| |
| go func(threadId int) { |
| defer wg.Done() |
| // Picks up ranges from rangeCh, generates KV lists, and sends them to kvChan. |
| if err := st.produceKVs(ctx, threadId); err != nil { |
| select { |
| case errCh <- err: |
| default: |
| } |
| } |
| }(i) |
| } |
| |
| // Pick up key-values from kvChan and send to stream. |
| kvErr := make(chan error, 1) |
| go func() { |
| // Picks up KV lists from kvChan, and sends them to Output. |
| err := st.streamKVs(ctx) |
| if err != nil { |
| cancel() // Stop all the go routines. |
| } |
| kvErr <- err |
| }() |
| wg.Wait() // Wait for produceKVs to be over. |
| close(st.kvChan) // Now we can close kvChan. |
| defer func() { |
| // If due to some error, we have buffers left in kvChan, we should release them. |
| for buf := range st.kvChan { |
| _ = buf.Release() |
| } |
| }() |
| |
| select { |
| case err := <-errCh: // Check error from produceKVs. |
| return err |
| default: |
| } |
| |
| // Wait for key streaming to be over. |
| err := <-kvErr |
| return err |
| } |
| |
| func (db *DB) newStream() *Stream { |
| return &Stream{ |
| db: db, |
| NumGo: db.opt.NumGoroutines, |
| LogPrefix: "Badger.Stream", |
| MaxSize: maxStreamSize, |
| } |
| } |
| |
| // NewStream creates a new Stream. |
| func (db *DB) NewStream() *Stream { |
| if db.opt.managedTxns { |
		panic("This API cannot be called in managed mode.")
| } |
| return db.newStream() |
| } |
| |
| // NewStreamAt creates a new Stream at a particular timestamp. Should only be used with managed DB. |
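// For example (the timestamp is illustrative; in managed mode the caller owns versioning):
//
//	stream := db.NewStreamAt(readTs) // readTs: the snapshot version to stream at.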
| func (db *DB) NewStreamAt(readTs uint64) *Stream { |
| if !db.opt.managedTxns { |
| panic("This API can only be called in managed mode.") |
| } |
| stream := db.newStream() |
| stream.readTs = readTs |
| return stream |
| } |
| |
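// BufferToKVList decodes a z.Buffer, as filled by KVToBuffer, back into a pb.KVList.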
| func BufferToKVList(buf *z.Buffer) (*pb.KVList, error) { |
| var list pb.KVList |
| err := buf.SliceIterate(func(s []byte) error { |
| kv := new(pb.KV) |
| if err := proto.Unmarshal(s, kv); err != nil { |
| return err |
| } |
| list.Kv = append(list.Kv, kv) |
| return nil |
| }) |
| return &list, err |
| } |
| |
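// KVToBuffer marshals kv into a slice allocated from buf. Together with BufferToKVList
// it round-trips a KVList; a minimal sketch (the buffer size and tag are illustrative):
//
//	buf := z.NewBuffer(1<<10, "example")
//	defer func() { _ = buf.Release() }()
//	KVToBuffer(&pb.KV{Key: []byte("k"), Value: []byte("v")}, buf)
//	list, err := BufferToKVList(buf)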
| func KVToBuffer(kv *pb.KV, buf *z.Buffer) { |
| in := buf.SliceAllocate(proto.Size(kv))[:0] |
| _, err := proto.MarshalOptions{}.MarshalAppend(in, kv) |
| y.AssertTrue(err == nil) |
| } |