diff --git a/services/storehost/spaceMon.go b/services/storehost/spaceMon.go new file mode 100644 index 00000000..6a126b6a --- /dev/null +++ b/services/storehost/spaceMon.go @@ -0,0 +1,215 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storehost + +import ( + "os" + "sync" + "syscall" + "time" + + "github.com/uber-common/bark" + "github.com/uber/cherami-server/common" + "github.com/uber/cherami-server/common/metrics" + "github.com/uber/cherami-server/services/storehost/load" +) + +const ( + warnThreshold = 200 * gigaBytes + alertThreshold = 100 * gigaBytes + resumeWritesThreshold = 200 * gigaBytes +) + +// interval at which to monitor space +const spaceMonInterval = 2 * time.Minute + +const ( + kiloBytes = 1024 + megaBytes = 1024 * kiloBytes + gigaBytes = 1024 * megaBytes + teraBytes = 1024 * gigaBytes +) + +// StorageMode defines the read write mode of the storage host +type StorageMode int32 + +const ( + // StorageModeReadWrite read/write + StorageModeReadWrite StorageMode = iota + // StorageModeReadOnly read only + StorageModeReadOnly +) + +type ( + // SpaceMon keep monitoring disk usage, and log/alert/trigger necessary handling + SpaceMon interface { + common.Daemon + GetMode() StorageMode + } + + // SpaceMon is an implementation of SpaceMon. + spaceMon struct { + sync.RWMutex + + storeHost *StoreHost // TODO: use this to trigger turning into read only mode + logger bark.Logger + m3Client metrics.Client + hostMetrics *load.HostMetrics + + stopCh chan struct{} + path string + mode StorageMode + } +) + +// NewSpaceMon returns an instance of SpaceMon. +func NewSpaceMon(store *StoreHost, m3Client metrics.Client, hostMetrics *load.HostMetrics, logger bark.Logger, path string) SpaceMon { + + return &spaceMon{ + storeHost: store, + logger: logger, + m3Client: m3Client, + hostMetrics: hostMetrics, + path: path, + mode: StorageModeReadWrite, + } +} + +// Start starts the monitoring +func (s *spaceMon) Start() { + + s.logger.Info("SpaceMon: started") + s.stopCh = make(chan struct{}) + go s.pump() +} + +// Stop stops the monitoring +func (s *spaceMon) Stop() { + + close(s.stopCh) + s.logger.Info("SpaceMon: stopped") +} + +// GetMode returns the read/write mode of storage +func (s *spaceMon) GetMode() StorageMode { + + s.RLock() + defer s.RUnlock() + + return s.mode +} + +func (s *spaceMon) pump() { + + var stat syscall.Statfs_t + + path := s.path + + if len(path) <= 0 { + + s.logger.Warn("SpaceMon: monitoring path is empty, trying working directory") + + cwd, err := os.Getwd() + if err != nil { + s.logger.Error("SpaceMon: os.Getwd() failed", err) + return + } + + path = cwd + } + + log := s.logger.WithField(`path`, path) + + ticker := time.NewTicker(spaceMonInterval) + defer ticker.Stop() + + for range ticker.C { + + select { + case <-s.stopCh: + return + default: + // continue below + } + + // query available/total space + err := syscall.Statfs(path, &stat) + if err != nil { + log.WithField(common.TagErr, err).Error("SpaceMon: syscall.Statfs failed", path) + continue + } + + avail := stat.Bavail * uint64(stat.Bsize) + total := stat.Blocks * uint64(stat.Bsize) + + if total <= 0 { + log.Error(`SpaceMon: total space unavailable`) + continue + } + + availMiBs, totalMiBs := avail/megaBytes, total/megaBytes + availPercent := 100.0 * float64(avail) / float64(total) + + s.hostMetrics.Set(load.HostMetricFreeDiskSpaceBytes, int64(avail)) + s.m3Client.UpdateGauge(metrics.SystemResourceScope, metrics.StorageDiskAvailableSpaceMB, int64(availMiBs)) + + xlog := log.WithFields(bark.Fields{ + `avail`: availMiBs, + `total`: totalMiBs, + `percent`: availPercent, + }) + + s.Lock() + defer s.Unlock() + + switch { + case s.mode == StorageModeReadOnly: + + // disable read-only, if above resume-writes threshold + if avail > resumeWritesThreshold { + + s.storeHost.EnableWrite() + s.mode = StorageModeReadWrite + + xlog.Info("SpaceMon: disabling read-only") + + } else { + + xlog.Warn(`SpaceMon: continuing in read-only mode`) + } + + case avail < alertThreshold: // enable read-only, if below alert-threshold + + xlog.Error("SpaceMon: available space less than alert-threshold") + + if s.mode != StorageModeReadOnly { + s.mode = StorageModeReadOnly + s.storeHost.DisableWrite() + } + + case avail < warnThreshold: // warn, if below warn-threshold + xlog.Warn("SpaceMon: available space less than warn-threshold") + + default: + xlog.Debug("SpaceMon: monitoring") + } + } +} diff --git a/services/storehost/storagemonitor.go b/services/storehost/storagemonitor.go deleted file mode 100644 index eeb85878..00000000 --- a/services/storehost/storagemonitor.go +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright (c) 2016 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package storehost - -import ( - "os" - "sync" - "syscall" - "time" - - "github.com/uber-common/bark" - "github.com/uber/cherami-server/common" - "github.com/uber/cherami-server/common/metrics" - "github.com/uber/cherami-server/services/storehost/load" -) - -// Monitoring housekeeping will happen every 2 minutes -const storageMonitoringInterval = time.Duration(2 * time.Minute) - -const ( - thresholdWarn = 75 * gigaBytes - thresholdReadOnly = 50 * gigaBytes - thresholdResumeWrites = 100 * gigaBytes -) - -const ( - kiloBytes = 1024 - megaBytes = 1024 * kiloBytes - gigaBytes = 1024 * megaBytes - teraBytes = 1024 * gigaBytes -) - -// StorageStatus defines the different storage status -type StorageStatus int32 - -// StorageMode defines the read write mode of the storage host -type StorageMode int32 - -const ( - // SMReadWrite allows both read and write - SMReadWrite = iota - // SMReadOnly allows read only - SMReadOnly -) - -type ( - // StorageMonitor keep monitoring disk usage, and log/alert/trigger necessary handling - StorageMonitor interface { - common.Daemon - GetStorageMode() StorageMode - } - - // StorageMonitor is an implementation of StorageMonitor. - storageMonitor struct { - sync.RWMutex - - storeHost *StoreHost // TODO: use this to trigger turning into read only mode - - logger bark.Logger - m3Client metrics.Client - hostMetrics *load.HostMetrics - - closeChannel chan struct{} - - monitoringTicker *time.Ticker - - monitoringPath string - mode StorageMode - } -) - -// NewStorageMonitor returns an instance of NewStorageMonitor. -func NewStorageMonitor(store *StoreHost, m3Client metrics.Client, hostMetrics *load.HostMetrics, logger bark.Logger, path string) StorageMonitor { - return &storageMonitor{ - storeHost: store, - logger: logger, - m3Client: m3Client, - hostMetrics: hostMetrics, - monitoringPath: path, - mode: SMReadWrite, - } -} - -// Start starts the monitoring -func (s *storageMonitor) Start() { - - s.closeChannel = make(chan struct{}) - - go func() { - - ticker := time.NewTicker(storageMonitoringInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - go s.checkStorage() - case <-s.closeChannel: - return - } - } - }() - - s.logger.Info("StorageMonitor: started") -} - -// Stop stops the monitoring -func (s *storageMonitor) Stop() { - close(s.closeChannel) - - s.logger.Info("StorageMonitor: stopped") -} - -// GetStorageMode returns the read/write mode of storage -func (s *storageMonitor) GetStorageMode() StorageMode { - s.RLock() - defer s.RUnlock() - - return s.mode -} - -func (s *storageMonitor) checkStorage() { - var stat syscall.Statfs_t - - path := s.monitoringPath - - if len(path) <= 0 { - s.logger.Warn("StorageMonitor: monitoring path is empty, try working directory") - wd, err := os.Getwd() - if err != nil { - s.logger.Error("StorageMonitor: os.Getwd() failed", err) - return - } - path = wd - } - - log := s.logger.WithField(`path`, path) - - err := syscall.Statfs(path, &stat) - if err != nil { - log.WithField(common.TagErr, err).Error("StorageMonitor: syscall.Statfs failed") - return - } - - avail := int64(stat.Bavail) * int64(stat.Bsize) - total := int64(stat.Blocks) * int64(stat.Bsize) - - xlog := log.WithFields(bark.Fields{ - `availMiB`: avail / megaBytes, - `totalMiB`: total / megaBytes, - }) - - if total <= 0 { - xlog.Error(`StorageMonitor: total space unavailable`) - return - } - - s.hostMetrics.Set(load.HostMetricFreeDiskSpaceBytes, avail) - s.m3Client.UpdateGauge(metrics.SystemResourceScope, metrics.StorageDiskAvailableSpaceMB, avail/megaBytes) - - s.Lock() - defer s.Unlock() - - switch { - case s.mode == SMReadOnly: - - // disable read-only, if above resume-writes threshold - if avail > thresholdResumeWrites { - xlog.Info("StorageMonitor: disabling read-only") - s.storeHost.EnableWrite() - s.mode = SMReadWrite - } else { - xlog.Warn("StorageMonitor: continuing read-only") - } - - case avail < thresholdReadOnly: // enable read-only, if below readonly-threshold - xlog.Error("StorageMonitor: available space less than readonly-threshold") - - if s.mode != SMReadOnly { - s.mode = SMReadOnly - s.storeHost.DisableWrite() - } - - case avail < thresholdWarn: // warn, if below warn-threshold - xlog.Warn("StorageMonitor: available space less than warn-threshold") - - default: - xlog.Debug("StorageMonitor: monitoring") - } -} diff --git a/services/storehost/storehost.go b/services/storehost/storehost.go index e4f0f0ae..eaa62a8b 100644 --- a/services/storehost/storehost.go +++ b/services/storehost/storehost.go @@ -196,7 +196,7 @@ type ( extStatsReporter *ExtStatsReporter // Storage Monitoring - storageMonitor StorageMonitor + spaceMon SpaceMon // metrics aggregated at host level and reported to controller hostMetrics *load.HostMetrics @@ -283,8 +283,8 @@ func (t *StoreHost) Start(thriftService []thrift.TChanServer) { t.xMgr = NewExtentManager(storeMgr, t.m3Client, t.hostMetrics, t.logger) - t.storageMonitor = NewStorageMonitor(t, t.m3Client, t.hostMetrics, t.logger, baseDir) - t.storageMonitor.Start() + t.spaceMon = NewSpaceMon(t, t.m3Client, t.hostMetrics, t.logger, baseDir) + t.spaceMon.Start() t.replMgr = NewReplicationManager(t.xMgr, t.m3Client, t.mClient, t.logger, hostID, t.GetWSConnector()) @@ -316,7 +316,7 @@ func (t *StoreHost) Stop() { t.loadReporter.Stop() t.hostIDHeartbeater.Stop() - t.storageMonitor.Stop() + t.spaceMon.Stop() t.replicationJobRunner.Stop() t.extStatsReporter.Stop() t.SCommon.Stop() @@ -361,7 +361,7 @@ func (t *StoreHost) OpenAppendStreamHandler(w http.ResponseWriter, r *http.Reque req, err := common.GetOpenAppendStreamRequestHTTP(r.Header) if err != nil { - t.logger.WithField(`error`, err).Error("unable to parse all needed headers") + t.logger.WithField(common.TagErr, err).Error("OpenAppendStreamHandler: unable to parse all needed headers") t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) http.Error(w, err.Error(), http.StatusBadRequest) return @@ -370,7 +370,7 @@ func (t *StoreHost) OpenAppendStreamHandler(w http.ResponseWriter, r *http.Reque // setup websocket wsStream, err := t.GetWSConnector().AcceptAppendStream(w, r) if err != nil { - t.logger.WithField(`error`, err).Error("unable to upgrade websocket connection") + t.logger.WithField(common.TagErr, err).Error("OpenAppendStreamHandler: unable to upgrade websocket connection") t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) return } @@ -382,7 +382,7 @@ func (t *StoreHost) OpenAppendStreamHandler(w http.ResponseWriter, r *http.Reque // create thrift stream call wrapper and deligate to streaming call if err = t.OpenAppendStream(ctx, wsStream); err != nil { - t.logger.WithField(`error`, err).Error("unable to open append stream") + t.logger.WithField(common.TagErr, err).Error("OpenAppendStreamHandler: unable to open append stream") return } } @@ -392,25 +392,13 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageRequests) - if atomic.LoadInt32(&t.started) == 0 { - call.Done() - t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) - return newInternalServiceError("StoreHost not started") - } - - // If the disk available space is low, we should fail any request to write extent - if t.storageMonitor != nil && t.storageMonitor.GetStorageMode() == SMReadOnly { - call.Done() - t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) - return newInternalServiceError("StoreHost in read-only mode") - } - // read in args passed in via the Thrift context headers args, err := getInConnArgs(ctx) if err != nil { call.Done() t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) + t.logger.Error("OpenAppendStream: error parsing args") return err } @@ -419,8 +407,20 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore common.TagDst: common.FmtDst(args.destID.String()), }) - log.WithField("args", fmt.Sprintf("destType=%v mode=%v", args.destType, args.mode)). - Info("OpenAppendStream: starting inConn") + if atomic.LoadInt32(&t.started) == 0 { + call.Done() + t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) + log.Error("OpenAppendStream: storehost not started") + return newInternalServiceError("StoreHost not started") + } + + // If the disk available space is low, we should fail any request to write extent + if t.spaceMon != nil && t.spaceMon.GetMode() == StorageModeReadOnly { + call.Done() + t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) + log.Error("OpenAppendStream: storehost currently readonly") + return newInternalServiceError("StoreHost in read-only mode") + } in := newInConn(args, call, t.xMgr, t.m3Client, log) @@ -431,7 +431,12 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore numInConn := atomic.AddInt64(&t.numInConn, 1) t.m3Client.UpdateGauge(metrics.OpenAppendStreamScope, metrics.StorageWriteStreams, numInConn) - log.WithField(`numInConn`, numInConn).Info("OpenAppendStream: write stream opened") + + log.WithFields(bark.Fields{ + `destType`: args.destType, + `mode`: args.mode, + `numInConn`: numInConn, + }).Info("OpenAppendStream: inConn started") select { // wait for inConn to be done @@ -439,19 +444,19 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore // .. or wait for shutdown to be triggered case <-t.shutdownC: + log.Info("OpenAppendStream: shutdown, stopping inConn") err = in.Stop() // attempt to stop connection // listen to extreme situations case <-t.disableWriteC: - log.Info("Stop write due to available disk space is extremely low") + log.Error("OpenAppendStream: writes disabled, stopping inConn") err = in.Stop() } numInConn = atomic.AddInt64(&t.numInConn, -1) t.m3Client.UpdateGauge(metrics.OpenAppendStreamScope, metrics.StorageWriteStreams, numInConn) - log.WithField(`numInConn`, numInConn).Info("OpenAppendStream: write stream closed") + log.WithField(`numInConn`, numInConn).Info("OpenAppendStream: inConn done") - log.Info("OpenAppendStream done") return err // FIXME: tchannel does *not* currently propagate this to the remote caller } @@ -534,12 +539,6 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageRequests) - if atomic.LoadInt32(&t.started) == 0 { - call.Done() - t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageFailures) - return newInternalServiceError("StoreHost not started") - } - // read in args passed in via the Thrift context headers args, e := getOutConnArgs(ctx) @@ -566,6 +565,13 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp common.TagCnsm: common.FmtCnsm(args.consGroupID.String()), }) + if atomic.LoadInt32(&t.started) == 0 { + call.Done() + t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageFailures) + log.Error("OpenReadStream: storehost not started") + return newInternalServiceError("StoreHost not started") + } + out := newOutConn(args, call, t.xMgr, t.m3Client, log) t.shutdownWG.Add(1) @@ -590,6 +596,7 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp // .. or wait for shutdown to be triggered case <-t.shutdownC: + log.Info("OpenReadStream: shutdown, stopping inConn") out.Stop() // attempt to stop connection } @@ -1342,14 +1349,14 @@ func (t *StoreHost) reportHostMetric(reporter common.LoadReporter, diffSecs int6 } // check and notify read-only state - if t.storageMonitor.GetStorageMode() == SMReadOnly { + if t.spaceMon.GetMode() == StorageModeReadOnly { hostMetrics.NodeState = common.Int64Ptr(controller.NODE_STATE_READONLY) } remDiskSpaceBytes := t.hostMetrics.Get(load.HostMetricFreeDiskSpaceBytes) if remDiskSpaceBytes > 0 { // the remaining disk space computation happens - // as part of the storageMonitor thread and the + // as part of the spaceMon thread and the // load reporter could be called before the storage // monitor gets a chance to do this computation. // Make sure we don't report zero values in the