Skip to content

Commit

Permalink
Copy CM-generated streamsets.keytab to fixed location and refer to it
Browse files Browse the repository at this point in the history
  • Loading branch information
kirtiv1 authored and dimaspivak committed May 22, 2018
1 parent 0c6dba9 commit bd7f762
Showing 1 changed file with 33 additions and 5 deletions.
38 changes: 33 additions & 5 deletions start.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
DEFAULT_CLUSTER_NAME = 'cluster'
SECONDARY_NODE_TEMPLATE_NAME = 'Secondary'

KERBEROS_CONFIG_CONTAINER_DIR = '/etc/clusterdock/kerberos'
KERBEROS_CONFIG_CONTAINER_DIR = '/etc/clusterdock/client/kerberos'
KDC_HOSTNAME = 'kdc'
KDC_IMAGE = 'clusterdock/topology_nodebase_kerberos:centos6.6'
KDC_ACL_FILENAME = '/var/kerberos/krb5kdc/kadm5.acl'
Expand All @@ -54,10 +54,10 @@
JAAS_CONFIG = """KafkaClient {{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="_KEYTAB_PATH"
principal="{0}";
keyTab="{}/streamsets.keytab"
principal="{}";
}};"""
JAAS_CONFIG_FILE_PATH = '/etc/kafka/kafka_client_jaas.conf'
JAAS_CONFIG_FILE_PATH = '/etc/clusterdock/client/kafka/kafka_client_jaas.conf'

KUDU_PARCEL_VERSION_REGEX = r'(.*)-.*\.cdh(.*)\.p'
KAFKA_PARCEL_REPO_URL = 'https://archive.cloudera.com/kafka/parcels'
Expand Down Expand Up @@ -357,6 +357,8 @@ def cm_server_not_dead(primary_node):
if args.sdc_version:
logger.info('Configuring StreamSets Data Collector ...')
_configure_sdc(deployment, cluster, args)
logger.info('Adding sdc user to YARN user whitelist ..')
_configure_yarn(deployment, cluster, cluster_name=DEFAULT_CLUSTER_NAME)

if args.kerberos:
logger.info('Configure Cloudera Manager for Kerberos ...')
Expand Down Expand Up @@ -914,6 +916,7 @@ def _setup_ssl_encryption_authentication(cluster, service):

cluster.primary_node.execute(' && '.join(ssl_authentication_commands + ssl_commands))


def _configure_sdc(deployment, cluster, args):
logger.info('Adding StreamSets service to cluster (%s) ...', DEFAULT_CLUSTER_NAME)
datacollector_role = {'type': 'DATACOLLECTOR',
Expand Down Expand Up @@ -942,7 +945,7 @@ def _configure_sdc(deployment, cluster, args):
primary_node = cluster.primary_node
sdc_principal = 'sdc/{kafka_node_name}@{realm}'.format(kafka_node_name=primary_node.fqdn,
realm=cluster.network.upper())
primary_node.put_file(JAAS_CONFIG_FILE_PATH, JAAS_CONFIG.format(sdc_principal))
primary_node.put_file(JAAS_CONFIG_FILE_PATH, JAAS_CONFIG.format(KERBEROS_CONFIG_CONTAINER_DIR, sdc_principal))
# Configure SDC to use the JAAS config file created above.
opts = '-Djava.security.auth.login.config={0} -Dsun.security.krb5.debug=true'.format(JAAS_CONFIG_FILE_PATH)
configs['java.opts'] = opts
Expand Down Expand Up @@ -1167,7 +1170,32 @@ def _configure_for_streamsets_before_start(deployment, cluster_name):
break


def _configure_yarn(deployment, cluster, cluster_name):
logger.info('Configuring Yarn ...')

for role_config_group in deployment.get_service_role_config_groups(cluster_name, 'yarn'):
if role_config_group['roleType'] == 'NODEMANAGER':
configs = deployment.get_service_role_config_group_config(cluster_name,
'yarn',
role_config_group['name'],
view='full')
whitelisted_users = configs['container_executor_allowed_system_users']
configs = {'container_executor_allowed_system_users': '{},sdc'.format(whitelisted_users)}
deployment.update_service_role_config_group_config(cluster_name,
'yarn',
role_config_group['name'],
configs)


def _configure_for_streamsets_after_start(deployment, cluster_name, cluster, quiet):
# Following is needed for Kerberos and Kafka to work correctly.
logger.info('Copying streamsets keytab to a fixed location which is shared on all clustered nodes ...')
commands = [('cp "$(find /var/run/cloudera-scm-agent/process/*streamsets-DATACOLLECTOR -maxdepth 0 -mindepth 0 | '
'sort -rs | head -1)/streamsets.keytab" {}/streamsets.keytab').format(KERBEROS_CONFIG_CONTAINER_DIR),
'chown sdc:sdc {}/streamsets.keytab'.format(KERBEROS_CONFIG_CONTAINER_DIR),
'chown sdc:sdc {}'.format(JAAS_CONFIG_FILE_PATH)]
cluster.primary_node.execute(' && '.join(commands))

cluster_service_types = {service['type']
for service
in deployment.get_cluster_services(cluster_name=DEFAULT_CLUSTER_NAME)}
Expand Down

0 comments on commit bd7f762

Please sign in to comment.