BoltDB Instead of etcd: Embedded State Store

Written by:

Igor Gorovyy
DevOps Engineer Lead & Senior Solutions Architect


Kubernetes uses etcd -- a distributed key-value store with Raft consensus. For a cluster with hundreds of nodes, that's necessary. But for a learning project, etcd is overkill. In Shepherd, we use BoltDB -- an embedded database that lives in a single file.

Buckets -- BoltDB Tables

var (
    bucketPods        = []byte("pods")
    bucketServices    = []byte("services")
    bucketDeployments = []byte("deployments")
    bucketNodes       = []byte("nodes")
    bucketEvents      = []byte("events")
)

graph TD
    DB[("shepherd.db")]
    DB --> PODS["<b>pods</b>"]
    DB --> SVCS["<b>services</b>"]
    DB --> DEPS["<b>deployments</b>"]
    DB --> NODES["<b>nodes</b>"]
    DB --> EVENTS["<b>events</b>"]
    PODS --> PV["default/web-0 → Pod JSON<br/>default/web-1 → Pod JSON"]
    SVCS --> SV["default/web-service → Service JSON"]
    DEPS --> DV["default/web → Deployment JSON"]
    NODES --> NV["node-1 → Node JSON"]
    EVENTS --> EV["1714300000000000000-pod/web-0 → Event JSON"]

Initialization

func NewStore(path string) (*Store, error) {
    db, err := bolt.Open(path, 0600,
        &bolt.Options{Timeout: 1 * time.Second})
    if err != nil {
        return nil, fmt.Errorf("open store: %w", err)
    }

    err = db.Update(func(tx *bolt.Tx) error {
        for _, b := range [][]byte{
            bucketPods, bucketServices,
            bucketDeployments, bucketNodes, bucketEvents,
        } {
            if _, err := tx.CreateBucketIfNotExists(b); err != nil {
                return err
            }
        }
        return nil
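    })
    if err != nil {
        db.Close() // release the file lock if bucket setup fails
        return nil, fmt.Errorf("init buckets: %w", err)
    }

    return &Store{db: db}, nil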
}

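bolt.Open creates or opens the shepherd.db file. The Timeout option limits how long Open waits for the file lock when another process already has the database open, so a second instance fails after one second instead of hanging. Buckets are created if they don't exist yet.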

Keys with Namespace

Pods, services, and deployments are namespaced resources. The key is formed as namespace/name:

func nsKey(namespace, name string) []byte {
    if namespace == "" {
        namespace = "default"
    }
    return []byte(namespace + "/" + name)
}

Nodes don't have a namespace, so the key is just the node name.
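To make the key layout concrete, here is a minimal sketch that builds keys with nsKey and splits them back into their parts. splitKey is a hypothetical helper added for illustration; it is not part of the Shepherd code shown here.

```go
package main

import (
	"fmt"
	"strings"
)

// nsKey mirrors the helper above: "namespace/name", with an empty
// namespace falling back to "default".
func nsKey(namespace, name string) []byte {
	if namespace == "" {
		namespace = "default"
	}
	return []byte(namespace + "/" + name)
}

// splitKey is a hypothetical inverse of nsKey. It cuts at the first "/"
// and reports ok=false for keys without one, such as node keys.
func splitKey(key []byte) (namespace, name string, ok bool) {
	return strings.Cut(string(key), "/")
}

func main() {
	fmt.Println(string(nsKey("", "web-0")))     // default/web-0
	fmt.Println(string(nsKey("prod", "web-0"))) // prod/web-0

	ns, name, ok := splitKey([]byte("prod/web-0"))
	fmt.Println(ns, name, ok) // prod web-0 true

	_, _, ok = splitKey([]byte("node-1"))
	fmt.Println(ok) // false
}
```

Cutting at the first slash assumes namespaces never contain "/", which this key scheme relies on anyway.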

CRUD Helpers

Three simple functions for all operations:

func (s *Store) put(bucket []byte, key []byte, v any) error {
    data, err := json.Marshal(v)
    if err != nil {
        return fmt.Errorf("encode value: %w", err)
    }
    return s.db.Update(func(tx *bolt.Tx) error {
        return tx.Bucket(bucket).Put(key, data)
    })
}

func (s *Store) get(bucket []byte, key []byte, v any) error {
    return s.db.View(func(tx *bolt.Tx) error {
        data := tx.Bucket(bucket).Get(key)
        if data == nil {
            return fmt.Errorf("not found")
        }
        return json.Unmarshal(data, v)
    })
}

func (s *Store) delete(bucket []byte, key []byte) error {
    return s.db.Update(func(tx *bolt.Tx) error {
        return tx.Bucket(bucket).Delete(key)
    })
}

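Update runs a read-write transaction: it commits when the function returns nil and rolls back when it returns an error. BoltDB allows only one read-write transaction at a time. View runs a read-only transaction, and any number of Views can run concurrently without blocking each other.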
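The get helper reports a missing key with a plain fmt.Errorf("not found"), which callers can only detect by comparing strings. One possible refinement is a sentinel error checked with errors.Is. The sketch below is an assumption, not Shepherd's code: ErrNotFound, isNotFound, the Pod type, and memBucket (an in-memory map standing in for a BoltDB bucket so the example runs without the bolt package) are all hypothetical names.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// ErrNotFound is a hypothetical sentinel error for missing keys.
var ErrNotFound = errors.New("not found")

// memBucket stands in for a BoltDB bucket; it is NOT the bolt API.
type memBucket map[string][]byte

// put encodes v as JSON and stores it, returning any encoding error.
func (b memBucket) put(key string, v any) error {
	data, err := json.Marshal(v)
	if err != nil {
		return fmt.Errorf("encode %q: %w", key, err)
	}
	b[key] = data
	return nil
}

// get decodes the value stored at key into v, or returns an error
// that wraps ErrNotFound when the key is absent.
func (b memBucket) get(key string, v any) error {
	data, ok := b[key]
	if !ok {
		return fmt.Errorf("get %q: %w", key, ErrNotFound)
	}
	return json.Unmarshal(data, v)
}

// isNotFound lets callers tell "missing" apart from decoding failures.
func isNotFound(err error) bool { return errors.Is(err, ErrNotFound) }

// Pod is a hypothetical minimal resource used only for this example.
type Pod struct {
	Name string `json:"name"`
}

func main() {
	b := memBucket{}
	if err := b.put("default/web-0", Pod{Name: "web-0"}); err != nil {
		panic(err)
	}

	var p Pod
	err := b.get("default/web-0", &p)
	fmt.Println(err, p.Name) // <nil> web-0

	err = b.get("default/missing", &p)
	fmt.Println(isNotFound(err)) // true
}
```

With a sentinel like this, an HTTP handler could map a missing key to 404 and any other error to 500 without inspecting the message text.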

Listing with Namespace Filter

func (s *Store) list(bucket []byte, prefix string,
    fn func([]byte) error) error {
    return s.db.View(func(tx *bolt.Tx) error {
        b := tx.Bucket(bucket)
        return b.ForEach(func(k, v []byte) error {
            if prefix == "" ||
                strings.HasPrefix(string(k), prefix+"/") {
                return fn(v)
            }
            return nil
        })
    })
}

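If prefix is empty, list passes every value to fn. If a namespace is given, only keys that start with namespace/ are passed; the trailing slash keeps namespace dev from also matching dev-tools. Either way, ForEach visits every key in the bucket.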
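Because list uses ForEach, it touches every key even when only one namespace is wanted. Since keys are sorted, an alternative is a prefix scan: jump to the first key at or after "namespace/" and stop at the first key without that prefix. BoltDB's Cursor offers Seek for this purpose. The sketch below only simulates the idea over a sorted string slice so it runs with the standard library alone; scanNamespace is a hypothetical name.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// scanNamespace returns the keys that belong to namespace ns.
// sortedKeys must be in ascending byte order, as in a BoltDB bucket.
// It binary-searches to the first key >= "ns/" and stops at the first
// key that no longer carries that prefix.
func scanNamespace(sortedKeys []string, ns string) []string {
	prefix := ns + "/"
	start := sort.SearchStrings(sortedKeys, prefix)
	var out []string
	for _, k := range sortedKeys[start:] {
		if !strings.HasPrefix(k, prefix) {
			break
		}
		out = append(out, k)
	}
	return out
}

func main() {
	keys := []string{
		"dev/web", "default/web-0", "prod/web", "dev-tools/ci", "dev/api",
	}
	sort.Strings(keys)

	// "dev-tools/ci" is skipped because the prefix includes the slash.
	fmt.Println(scanNamespace(keys, "dev")) // [dev/api dev/web]
}
```

For a learning project with a handful of objects the full ForEach pass is fine; the prefix scan starts to matter once a bucket holds many namespaces.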

Watch -- Change Notifications

type Store struct {
    db *bolt.DB
    mu sync.RWMutex

    podWatchers        []chan Event
    deploymentWatchers []chan Event
    watchMu            sync.Mutex
}

func (s *Store) WatchPods() chan Event {
    s.watchMu.Lock()
    defer s.watchMu.Unlock()
    ch := make(chan Event, 64)
    s.podWatchers = append(s.podWatchers, ch)
    return ch
}

func (s *Store) notify(watchers []chan Event, evt Event) {
    s.watchMu.Lock()
    defer s.watchMu.Unlock()
    for _, ch := range watchers {
        select {
        case ch <- evt:
        default: // don't block if channel is full
        }
    }
}

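Controllers subscribe to changes via WatchPods() and WatchDeployments(). When a pod is updated, the Store sends an event to every subscribed channel. The send is non-blocking: if a watcher's 64-event buffer is full, that watcher misses the event rather than stalling the Store.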
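The non-blocking send has a visible consequence: a slow watcher loses events. The following stand-alone sketch reproduces the pattern with a small buffer so the drop is easy to see. The hub type, its method names, and the Event fields are assumptions made for this example; unlike the notify above, this version reads the watcher list from the struct while holding the lock instead of taking it as an argument.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical stand-in for the Store's event type.
type Event struct {
	Type   string
	Object string
}

// hub holds the subscriber channels behind a mutex.
type hub struct {
	mu       sync.Mutex
	watchers []chan Event
}

// watch registers a new subscriber with the given buffer size.
func (h *hub) watch(buf int) <-chan Event {
	h.mu.Lock()
	defer h.mu.Unlock()
	ch := make(chan Event, buf)
	h.watchers = append(h.watchers, ch)
	return ch
}

// notify offers evt to every watcher without blocking and returns
// how many watchers missed it because their buffer was full.
func (h *hub) notify(evt Event) (dropped int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, ch := range h.watchers {
		select {
		case ch <- evt:
		default:
			dropped++
		}
	}
	return dropped
}

func main() {
	h := &hub{}
	ch := h.watch(2) // tiny buffer so the third event overflows

	for i := 0; i < 3; i++ {
		evt := Event{Type: "MODIFIED", Object: fmt.Sprintf("default/web-%d", i)}
		fmt.Println("dropped:", h.notify(evt)) // 0, 0, then 1
	}
	fmt.Println("buffered:", len(ch)) // buffered: 2
}
```

A controller that must not miss changes would need to re-list the bucket periodically, since a dropped event is never redelivered.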

Events -- Event Log

func (s *Store) RecordEvent(evt Event) error {
    key := fmt.Sprintf("%d-%s",
        evt.Timestamp.UnixNano(), evt.Object)
    return s.put(bucketEvents, []byte(key), evt)
}

func (s *Store) ListEvents(limit int) ([]Event, error) {
    var events []Event
    err := s.db.View(func(tx *bolt.Tx) error {
        c := tx.Bucket(bucketEvents).Cursor()
        count := 0
        // From newest to oldest
        for k, v := c.Last(); k != nil && count < limit;
            k, v = c.Prev() {
            var evt Event
            if err := json.Unmarshal(v, &evt); err != nil {
                return fmt.Errorf("decode event %s: %w", k, err)
            }
            events = append(events, evt)
            count++
        }
        return nil
    })
    if err != nil {
        return nil, err
    }
    return events, nil
}

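The event key is a nanosecond timestamp followed by the object. BoltDB stores keys in byte-sorted order, and present-day UnixNano values all have 19 digits, so byte order matches chronological order and Cursor.Last() gives you the newest event.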
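The ordering depends on the timestamps having the same number of digits, because keys are compared as bytes, not as numbers. The sketch below builds keys in the same "<nanos>-<object>" shape and shows where plain decimal formatting would break. paddedKey is a hypothetical variant that zero-pads the number, not something the Shepherd code does.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// eventKey builds a "<unix-nanos>-<object>" key, the same shape
// RecordEvent produces.
func eventKey(nanos int64, object string) string {
	return fmt.Sprintf("%d-%s", nanos, object)
}

// paddedKey is a hypothetical variant that zero-pads the number to a
// fixed width, so byte order equals numeric order for any value.
func paddedKey(nanos int64, object string) string {
	return fmt.Sprintf("%020d-%s", nanos, object)
}

func main() {
	base := time.Date(2024, 4, 28, 10, 0, 0, 0, time.UTC)
	keys := []string{
		eventKey(base.Add(2*time.Second).UnixNano(), "pod/web-0"),
		eventKey(base.UnixNano(), "pod/web-1"),
		eventKey(base.Add(time.Second).UnixNano(), "node/node-1"),
	}
	sort.Strings(keys) // the byte order a BoltDB bucket keeps
	for _, k := range keys {
		fmt.Println(k) // oldest first: pod/web-1, node/node-1, pod/web-0
	}

	// With different digit counts, byte order and numeric order disagree.
	fmt.Println(eventKey(9, "x") < eventKey(10, "x"))   // false
	fmt.Println(paddedKey(9, "x") < paddedKey(10, "x")) // true
}
```

Two events for the same object in the same nanosecond would produce the same key, and the later Put would overwrite the earlier one.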

One Thing to Keep in Mind

BoltDB is single-node. If the API Server crashes, the data stays in the file, but:

- No replication
- No distributed watch
- One writer at a time (though many readers)

For a production cluster, you need etcd with its Raft consensus and linearizable reads. For learning, BoltDB is a perfect choice: zero external dependencies, everything in a single file.

Try It Yourself

# Check the database size:
ls -lh /var/lib/shepherd/shepherd.db
# Inspect contents via API:
curl -s localhost:9876/api/v1/nodes | jq .
curl -s localhost:9876/api/v1/events | jq '.[0:3]'

State is persisted. Next up -- Scheduler: how to pick the best node for a pod.

Resources

Source code for the series: github.com/igorgorovoy/sheep-shepherd-meadow