IPFS如何建立DataStore

語言: CN / TW / HK

概述

本文件目標在於解析DataStore相關配置以及搞清楚其內部實現。

配置格式

以下是摘自我本地的IPFS相關與DataStore相關的配置選項:

"Datastore": {
    "BloomFilterSize": 0,
    "GCPeriod": "1h",
    "HashOnRead": false,
    "Spec": {
      "mounts": [
        {
          "child": {
            "path": "blocks",
            "shardFunc": "/repo/flatfs/shard/v1/next-to-last/2",
            "sync": true,
            "type": "flatfs"
          },
          "mountpoint": "/blocks",
          "prefix": "flatfs.datastore",
          "type": "measure"
        },
        {
          "child": {
            "compression": "none",
            "path": "datastore",
            "type": "levelds"
          },
          "mountpoint": "/",
          "prefix": "leveldb.datastore",
          "type": "measure"
        }
      ],
      "type": "mount"
    },
    "StorageGCWatermark": 90,
    "StorageMax": "10GB"
  },

其中:

  • StorageMax: 本機可以使用的最大儲存容量,這裡使用預設值,10GB
  • StorageGCWatermark: 垃圾回收閾值,是一個百分比
  • Spec.mounts: 不同的掛載點

資料結構

與DataStore相關配置檔案解析資料結構包括DatastoreConfig及其子類。

DatastoreConfig

type DatastoreConfig interface {
    // DiskSpec returns a minimal configuration of the datastore
    // represting what is stored on disk.  Run time values are
    // excluded.
    DiskSpec() DiskSpec

    // Create instantiate a new datastore from this config
    Create(path string) (repo.Datastore, error)
}

DataStoreConfig 目的是解析配置,並根據配置建立Datastore物件。它有很多實現的子類,包括:

mountDatastoreConfig

type premount struct {
    ds     DatastoreConfig
    prefix ds.Key
}

type mountDatastoreConfig struct {
    mounts []premount
}

可以看到,mountDatastoreConfig內維護了premount陣列,premount中維護了

  • prefix:尚不清楚該結構的作用是什麼,其解析來自mountpoint欄位
  • ds:內嵌了另外一個DatastoreConfig,這個跟我們前面看到的示例配置一樣,都是層層包裹,類似洋蔥

其建立的DataStore型別為:

type Mount struct {
        Prefix    ds.Key
        Datastore ds.Datastore
}

type Datastore struct {
        mounts []Mount
}

memDatastoreConfig

這個用來建立基於Memory的DataStore,具體來說,建立的是MapDatastore,忽略。

logDatastoreConfig

暫時不清楚其作用

measureDatastoreConfig

type measureDatastoreConfig struct {
        child  DatastoreConfig
        prefix string
}

這個類似mountDatastoreConfig,但measureDatastoreConfig內部並非一個數組,而是隻包含一個child,當然,child又是一個巢狀的DatastoreConfig。其prefix欄位目前含義不明。其建立的DataStore型別為:

type measure struct {
        backend datastore.Datastore
        putNum     metrics.Counter
        putErr     metrics.Counter
        putLatency metrics.Histogram
        ...
}

measure只是對更底層DataStore的二次封裝,在其上增加了一些統計資訊。其中backend代表了其更底層的DataStore。

另外,通過measure的建立方法發現,prefix欄位是作用就是為各種統計欄位增加字首:

// New wraps the datastore, providing metrics on the operations. The
// metrics are registered with names starting with prefix and a dot.
func New(prefix string, ds datastore.Datastore) *measure {
        m := &measure{
                backend: ds,

                putNum: metrics.New(prefix+".put_total", "Total number of Datastore.Put calls").Counter(),
                putErr: metrics.New(prefix+".put.errors_total", "Number of errored Datastore.Put calls").Counter(),
                ...
             }
         ...
 }

實現分析

解析入口

在建立FSRepo的時候會解析DataStore相關配置:

func Init(repoPath string, conf *config.Config) error {
    ...
    if err := initSpec(repoPath, conf.Datastore.Spec); err != nil {
        return err
    }
    ...
}

func initSpec(path string, conf map[string]interface{}) error {
    dsc, err := AnyDatastoreConfig(conf)
}

真正的解析留到了AnyDatastoreConfig()函式。

AnyDatastoreConfig

AnyDatastoreConfig會解析上述格式的配置,然後建立DatastoreConfig物件。

func AnyDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
    which, ok := params["type"].(string)
    fun, ok := datastores[which]
    return fun(params)
}

由於上述的配置格式中Spec.mounts可以看到是類似一個遞迴的結構,這裡的解析也差不多采用遞迴的解析方法。我們以上面的配置為例來描述這個函式的執行過程。

首先從引數中獲得type,通過上面的配置我們可以看到拿到的type為"mount"。查詢datastores["mount"],我們看看拿到的是什麼:

func init() {
    datastores = map[string]ConfigFromMap{
        "mount":   MountDatastoreConfig,
        "mem":     MemDatastoreConfig,
        "log":     LogDatastoreConfig,
        "measure": MeasureDatastoreConfig,
    }
}

可以發現,拿到的是MountDatastoreConfig。接下來呼叫該方法:

func MountDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
    var res mountDatastoreConfig
    mounts, ok := params["mounts"].([]interface{})
    for _, iface := range mounts {
        cfg, ok := iface.(map[string]interface{})
        child, err := AnyDatastoreConfig(cfg)
        prefix, found := cfg["mountpoint"]
        res.mounts = append(res.mounts, premount{
            ds:     child,
            prefix: ds.NewKey(prefix.(string)),
        })
    }
    sort.Slice(res.mounts,
        func(i, j int) bool {
            return res.mounts[i].prefix.String() > res.mounts[j].prefix.String()
        })

    return &res, nil
}

這裡會解析Spec.mounts下的所有配置,為每一個配置建立一個premount物件。我們先來看看其格式如何:

"mounts": [
        {
          "child": {
            "path": "blocks",
            "shardFunc": "/repo/flatfs/shard/v1/next-to-last/2",
            "sync": true,
            "type": "flatfs"
          },
          "mountpoint": "/blocks",
          "prefix": "flatfs.datastore",
          "type": "measure"
        },
        {
          "child": {
            "compression": "none",
            "path": "datastore",
            "type": "levelds"
          },
          "mountpoint": "/",
          "prefix": "leveldb.datastore",
          "type": "measure"
        }
      ],

本例子中mounts下一共有兩個子配置,其type均為measure,但其mountpoint和prefix有所不同,目前還不清楚mountpoint和prefix到底起什麼作用。

上面MountDatastoreConfig函式中會掃描每一個子配置項,獲得其type,然後呼叫AnyDatastoreConfig從datastores這個map中查詢,由於本例中兩個子配置項的type全部是"measure",因此獲得的是MeasureDatastoreConfig。因而,在AnyDatastoreConfig中會為每個子配置項呼叫MeasureDatastoreConfig方法。

func MeasureDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
    childField, ok := params["child"].(map[string]interface{})
    ...
    child, err := AnyDatastoreConfig(childField)
    ...
    prefix, ok := params["prefix"].(string)
    ...
    return &measureDatastoreConfig{child, prefix}, nil
}

這裡解析具體的每個子配置項,例如第一個子配置項的格式為:

{
          "child": {
            "path": "blocks",
            "shardFunc": "/repo/flatfs/shard/v1/next-to-last/2",
            "sync": true,
            "type": "flatfs"
          },
          "mountpoint": "/blocks",
          "prefix": "flatfs.datastore",
          "type": "measure"
},

上述MeasureDatastoreConfig函式中首先獲得其child欄位,然後獲得其type(得到的值為"flatfs"),然後繼續通過AnyDatastoreConfig從datastores內找到對於該型別的配置的解析方法。datastores內預設沒有內建該型別(flatfs)的配置解析方法,而是通過外掛的方式支援:

func (loader *PluginLoader) Inject() error {
    for _, pl := range loader.plugins {
        ...
        if pl, ok := pl.(plugin.PluginDatastore); ok {
            err := injectDatastorePlugin(pl)
        }
    }
}

func injectDatastorePlugin(pl plugin.PluginDatastore) error {
    return fsrepo.AddDatastoreConfigHandler(pl.DatastoreTypeName(), pl.DatastoreConfigParser())
}

func AddDatastoreConfigHandler(name string, dsc ConfigFromMap) error {
    _, ok := datastores[name]
    if ok {
        return fmt.Errorf("already have a datastore named %q", name)
    }

    datastores[name] = dsc
    return nil
}

而支援flatfs型別的外掛為flatfsPlugin:

func (*flatfsPlugin) DatastoreConfigParser() fsrepo.ConfigFromMap {
    return func(params map[string]interface{}) (fsrepo.DatastoreConfig, error) {
        var c datastoreConfig
        c.path, ok = params["path"].(string)
        
        sshardFun, ok := params["shardFunc"].(string)
        
        c.shardFun, err = flatfs.ParseShardFunc(sshardFun)
        
        c.syncField, ok = params["sync"].(bool)
        
        return &c, nil
    }
}

而所謂的flatfs,其實現上很簡單,就是將block資料通過hash打散到磁碟上不同的目錄下儲存。這也是最簡單的儲存方式。

而另外一個子配置項的型別為levelds(leveldb datastore),也是通過外掛的方式注入系統,其外掛為leveldsPlugin,其建立的DataStore底層儲存依賴levelDB。

func (*leveldsPlugin) DatastoreConfigParser() fsrepo.ConfigFromMap {
    return func(params map[string]interface{}) (fsrepo.DatastoreConfig, error) {
        var c datastoreConfig
        c.path, ok = params["path"].(string)
        switch cm := params["compression"]; cm {
        case "none":
            c.compression = ldbopts.NoCompression
        case "snappy":
          ...
        }
        return &c, nil
    }
}

func (c *datastoreConfig) Create(path string) (repo.Datastore, error) {
    p := c.path
    if !filepath.IsAbs(p) {
        p = filepath.Join(path, p)
    }

    return levelds.NewDatastore(p, &levelds.Options{
        Compression: c.compression,
    })
}

上面的示例配置中形成的各種DatastoreConfig的結構圖如下:

而據此建立的DataStore的結構如下圖:

讀寫請求訪問Datastore

我們以上傳物件舉例來說明如何使用上面構建的Datastore:

func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options.UnixfsAddOption) (path.Resolved, error) {
    ...
    if settings.OnlyHash {
        ...
    } else {
        syncDserv = &syncDagService{
            DAGService: dserv,
            syncFn: func() error {
                ds := api.repo.Datastore()
                // 這裡的BlockPrefix為"/blocks"
                if err := ds.Sync(ctx, bstore.BlockPrefix); err != nil {
                    return err
                }
                // 這裡的BlockPrefix為"/filestore"
                return ds.Sync(ctx, filestore.FilestorePrefix)
            },
        }
    }
}

我們之前說過,寫入的時候構建Merkle-Tree,將原始檔案按照固定大小切分成block,並將其快取,直到最後sync。而最終的Sync方法就是呼叫上面的ds.Sync,這裡的ds來自repo.Datastore,也就是我們上面通過Config一步步創建出來的,最頂層的為mount.Datastore。

我們看看mount.Datastore的Sync實現( https:// github.com/ipfs/go-data store/blob/master/mount/mount.go ):

func (d *Datastore) Sync(ctx context.Context, prefix ds.Key) error {
        dstores, prefixes, rest := d.lookupAll(prefix)
        for i, suffix := range rest {
                if err := dstores[i].Sync(ctx, suffix); err != nil {
                        merr = multierr.Append(merr, fmt.Errorf(
                                "syncing datastore at %s: %w",
                                prefixes[i].String(),
                                err,
                        ))
                }
        }
        return merr
}

這裡根據prefix(mountpoint)找到對應的Datastore,然後繼續呼叫下層Datastore的Sync。由於傳入的prefix為"/blocks",match了我們之前建立的,於是繼續呼叫measure.Datastore的Sync方法( https:// github.com/ipfs/go-ds-m easure/blob/master/measure.go )。

func (m *measure) Sync(ctx context.Context, prefix datastore.Key) error {
        defer recordLatency(m.syncLatency, time.Now())
        m.syncNum.Inc()
        err := m.backend.Sync(ctx, prefix)
        if err != nil {
                m.syncErr.Inc()
        }
        return err
}

這裡只是在更底層的flatfs.Datastore的Sync基礎上封裝了一些metric。而flatfs.Datastore的Sync如下( https:// github.com/ipfs/go-ds-f latfs/blob/75f430808ac11601ddda8d4fb578c45da9304285/flatfs.go#L120 ):

func (fs *Datastore) Sync(ctx context.Context, prefix datastore.Key) error {
        fs.shutdownLock.RLock()
        defer fs.shutdownLock.RUnlock()
        if fs.shutdown {
                return ErrClosed
        }

        return nil
}

這裡什麼都不做,因為對於flatfs來說,每次寫入block檔案時已經針對檔案進行了sync,無需再針對檔案系統進行sync。