diff --git a/hkube_python_wrapper/config/config.py b/hkube_python_wrapper/config/config.py index cc23e91e..f1797731 100644 --- a/hkube_python_wrapper/config/config.py +++ b/hkube_python_wrapper/config/config.py @@ -16,8 +16,7 @@ def getBoolEnv(name, defaultValue): "host": os.environ.get('WORKER_SOCKET_HOST', "127.0.0.1"), "protocol": os.environ.get('WORKER_SOCKET_PROTOCOL', "ws"), "url": os.environ.get('WORKER_SOCKET_URL', None), - "encoding": os.environ.get('WORKER_ALGORITHM_ENCODING', 'bson'), - "delay": getIntEnv('WORKER_TOLERABLE_DELAY',1500) + "encoding": os.environ.get('WORKER_ALGORITHM_ENCODING', 'bson') } discovery = { "host": os.environ.get('POD_IP', '127.0.0.1'), @@ -25,13 +24,14 @@ def getBoolEnv(name, defaultValue): "encoding": os.environ.get('DISCOVERY_ENCODING', 'msgpack'), "enable": getBoolEnv('DISCOVERY_ENABLE', 'True'), "timeout": getIntEnv('DISCOVERY_TIMEOUT', 10000), + "delay": getIntEnv('TOLERABLE_DELAY_MS',10), "networkTimeout": getIntEnv('DISCOVERY_NETWORK_TIMEOUT', 1000), "maxCacheSize": getIntEnv('DISCOVERY_MAX_CACHE_SIZE', 400), "num_threads": getIntEnv('DISCOVERY_SERVER_NUM_THREADS', 5), "num_ping_threads": getIntEnv('DISCOVERY_SERVER_NUM_PING_THREADS', 5), "servingReportInterval": getIntEnv('DISCOVERY_SERVING_REPORT_INTERVAL', 5000), "streaming": { - "messagesMemoryBuff": getIntEnv('STREAMING_MAX_BUFFER_MB', 10), + "messagesMemoryBuff": getIntEnv('STREAMING_MAX_BUFFER_MB', 1500), "port": os.environ.get('STREAMING_DISCOVERY_PORT', 9022), "statisticsInterval": os.environ.get('STREAMING_STATISTICS_INTERVAL', 2), "stateful": getBoolEnv('STREAMING_STATEFUL', 'True') diff --git a/hkube_python_wrapper/wrapper/algorunner.py b/hkube_python_wrapper/wrapper/algorunner.py index bfad7aea..d3094fdc 100644 --- a/hkube_python_wrapper/wrapper/algorunner.py +++ b/hkube_python_wrapper/wrapper/algorunner.py @@ -343,7 +343,7 @@ def _init(self, options): def _discovery_update(self, discovery): log.debug('Got discovery update {discovery}', discovery=discovery) - messageListenerConfig = {'encoding': config.discovery['encoding'],'delay':config.socket['delay']} + messageListenerConfig = {'encoding': config.discovery['encoding'],'delay':config.discovery['delay']} self.streamingManager.setupStreamingListeners( messageListenerConfig, discovery, self._job.nodeName)