Kafka Kerberos: Enabling and Testing.

May 16, 2017
Apache Kafka is a distributed streaming platform. Kafka 2.0 supports Kerberos authentication, and here we enable it using the Kerberos wizard in Cloudera Manager. (The introduction below is courtesy of the Apache Kafka documentation.)
Before we start, a little about Kafka.
We think of a streaming platform as having three key capabilities:
  • It lets you publish and subscribe to streams of records. In this respect it is similar to a message queue or enterprise messaging system.
  • It lets you store streams of records in a fault-tolerant way.
  • It lets you process streams of records as they occur.
What is Kafka good for?
It gets used for two broad classes of application:
  • Building real-time streaming data pipelines that reliably get data between systems or applications
  • Building real-time streaming applications that transform or react to the streams of data
To understand how Kafka does these things, let’s dive in and explore Kafka’s capabilities from the bottom up.
First a few concepts:
  • Kafka is run as a cluster on one or more servers.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.
Kafka has four core APIs:
  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.

Kafka Kerberos Testing.

Let us get started with enabling Kerberos for Kafka and testing the setup.

Step 1: Enable Kerberos authentication for Kafka.

Follow the steps below.
  • Install the CDH Kafka v2.0.x parcel and Cloudera Manager 5.5.3 or higher.
  • Enable Kerberos using Cloudera Manager.
  • From Cloudera Manager, navigate to Kafka > Configuration.
  • Set SSL client authentication to none.
  • Set Inter Broker Protocol to SASL_PLAINTEXT.
  • Click Save Changes.
  • Restart the Kafka service.

Step 2: Check the broker logs to confirm that listeners = SASL_PLAINTEXT is set.

Follow the step below.
  • Look for listeners = SASL_PLAINTEXT in the Kafka broker log /var/log/kafka/server.log.

Step 3: Create a keytab file for the current login user.

Creating a keytab.
[ahmed@ahmed-server-kafka-001 ~]$ ktutil
ktutil:  addent -password -p ahmed@AHMED.AHMEDINC.COM -k 1 -e RC4-HMAC
Password for ahmed@AHMED.AHMEDINC.COM: ********
ktutil:  wkt ahmed.keytab
ktutil:  quit

Step 4a: Create a jaas.conf file in the $HOME directory.

Add the configuration below to the file.
[ahmed@ahmed-server-kafka-001 ~]$ cat jaas.conf
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  debug=true
  useTicketCache=true
  serviceName="kafka"
  renewTicker=true
  doNotPrompt=true
  client=true
  principal="ahmed@AHMED.AHMEDINC.COM"
  keyTab="/export/home/ahmed/ahmed.keytab"
  useKeyTab=true;
};

Step 4b: Create a client.properties file in the $HOME directory.

Add the configuration below to the file.
[ahmed@ahmed-server-kafka-001 ~]$ cat client.properties
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
[ahmed@ahmed-server-kafka-001 ~]$

Step 5: Adding KAFKA_OPTS to the environment.

Command to set KAFKA_OPTS.
[ahmed@ahmed-server-kafka-001 ~]$ export KAFKA_OPTS="-Djava.security.auth.login.config=/export/home/ahmed/jaas.conf"         

Step 6: Creating a topic on the Kafka cluster.

Command to use.
kafka-topics --create --zookeeper ahmed-server-kafka-001:2181/kafka --replication-factor 3 --partitions 3 --topic test1
Output.
[ahmed@ahmed-server-kafka-001 ~]$ kafka-topics --create --zookeeper ahmed-server-kafka-001:2181/kafka --replication-factor 3 --partitions 3 --topic test1
......
17/05/15 07:01:26 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
17/05/15 07:01:26 INFO zookeeper.ZooKeeper: Client environment:java.compiler=
17/05/15 07:01:26 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
17/05/15 07:01:26 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
17/05/15 07:01:26 INFO zookeeper.ZooKeeper: Client environment:os.version=2.6.32-642.6.2.el6.centos.plus.x86_64
17/05/15 07:01:26 INFO zookeeper.ZooKeeper: Client environment:user.name=ahmed
17/05/15 07:01:26 INFO zookeeper.ZooKeeper: Client environment:user.home=/export/home/ahmed
17/05/15 07:01:26 INFO zookeeper.ZooKeeper: Client environment:user.dir=/export/home/ahmed
17/05/15 07:01:26 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=ahmed-server-kafka-001.ahmedinc.com:2181/kafka sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@27d9954b
17/05/15 07:01:26 INFO zkclient.ZkClient: Waiting for keeper state SyncConnected
17/05/15 07:01:26 INFO zookeeper.ClientCnxn: Opening socket connection to server ahmed-server-kafka-001.ahmedinc.com/192.168.12.100:2181. Will not attempt to authenticate using SASL (unknown error)
17/05/15 07:01:26 INFO zookeeper.ClientCnxn: Socket connection established to ahmed-server-kafka-001.ahmedinc.com/192.168.12.100:2181, initiating session
17/05/15 07:01:26 INFO zookeeper.ClientCnxn: Session establishment complete on server ahmed-server-kafka-001.ahmedinc.com/192.168.12.100:2181, sessionid = 0x655bfe5b19d311e5, negotiated timeout = 30000
17/05/15 07:01:26 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
17/05/15 07:01:26 INFO admin.AdminUtils$: Topic creation {"version":1,"partitions":{"0":[274]}}
Created topic "test1".
17/05/15 07:01:26 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
17/05/15 07:01:26 INFO zookeeper.ZooKeeper: Session: 0x655bfe5b19d311e5 closed
17/05/15 07:01:26 INFO zookeeper.ClientCnxn: EventThread shut down

Step 7: Checking for the created topics.

Command to execute.
kafka-topics --list --zookeeper ahmed-server-kafka-001:2181/kafka
Output.
[ahmed@ahmed-server-kafka-001 ~]$ kafka-topics --list --zookeeper ahmed-server-kafka-001:2181/kafka
17/05/15 08:51:13 INFO zkclient.ZkClient: JAAS File name: /export/home/ahmed/jaas.conf
17/05/15 08:51:13 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.5-cdh5.4.0-SNAPSHOT--1, built on 04/01/2015 08:35 GMT
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Client environment:host.name=ahmed-server-kafka-001.ahmedinc.com
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Client environment:java.version=1.7.0_67
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Client environment:java.home=/usr/java/jdk1.7.0_67-cloudera/jre
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Client environment:java.class.path=:/opt/cloudera/parcels/KAFKA-2.0.2-1.2.0.2.p ...... /bin/../libs/guava-18.0.jar
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Client environment:java.compiler=
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Client environment:os.version=2.6.32-642.6.2.el6.centos.plus.x86_64
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Client environment:user.name=ahmed
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Client environment:user.home=/export/home/ahmed
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Client environment:user.dir=/export/home/ahmed
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=ahmed-server-kafka-001:2181/kafka sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@1381ee41
17/05/15 08:51:13 INFO zkclient.ZkClient: Waiting for keeper state SyncConnected
17/05/15 08:51:13 WARN zookeeper.ClientCnxn: SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/export/home/ahmed/jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
17/05/15 08:51:13 INFO zookeeper.ClientCnxn: Opening socket connection to server ahmed-server-kafka-001.ahmedinc.com/192.168.12.100:2181
17/05/15 08:51:13 INFO zkclient.ZkClient: zookeeper state changed (AuthFailed)
17/05/15 08:51:13 INFO zookeeper.ClientCnxn: Socket connection established to ahmed-server-kafka-001.ahmedinc.com/192.168.12.100:2181, initiating session
17/05/15 08:51:13 INFO zookeeper.ClientCnxn: Session establishment complete on server ahmed-server-kafka-001.ahmedinc.com/192.168.12.100:2181, sessionid = 0x655bfe5b19d3127d, negotiated timeout = 30000
17/05/15 08:51:13 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
__consumer_offsets
test1
testz
17/05/15 08:51:13 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
17/05/15 08:51:13 INFO zookeeper.ZooKeeper: Session: 0x655bfe5b19d3127d closed
17/05/15 08:51:13 INFO zookeeper.ClientCnxn: EventThread shut down

Step 8: Creating a console producer to generate messages.

Command to execute.
kafka-console-producer --broker-list ahmed-server-kafka-001.ahmedinc.com:9092 --topic test1 --producer.config client.properties
Output.
[ahmed@ahmed-server-kafka-001 ~]$ kafka-console-producer --broker-list ahmed-server-kafka-001.ahmedinc.com:9092 --topic test1 --producer.config client.properties
17/05/15 08:34:13 INFO producer.ProducerConfig: ProducerConfig values:
        request.timeout.ms = 1500
        retry.backoff.ms = 100
        buffer.memory = 33554432
        ssl.truststore.password = null
        batch.size = 16384
        ssl.keymanager.algorithm = SunX509
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.key.password = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.provider = null
        sasl.kerberos.service.name = kafka
        max.in.flight.requests.per.connection = 5
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [ahmed-server-kafka-001.ahmedinc.com:9092]
        client.id = console-producer
        max.request.size = 1048576
        acks = 0
        linger.ms = 1000
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        metadata.fetch.timeout.ms = 60000
        ssl.endpoint.identification.algorithm = null
        ssl.keystore.location = null
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        ssl.truststore.location = null
        ssl.keystore.password = null
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        block.on.buffer.full = false
        metrics.sample.window.ms = 30000
        metadata.max.age.ms = 300000
        security.protocol = SASL_PLAINTEXT
        ssl.protocol = TLS
        sasl.kerberos.min.time.before.relogin = 60000
        timeout.ms = 30000
        connections.max.idle.ms = 540000
        ssl.trustmanager.algorithm = PKIX
        metric.reporters = []
        compression.type = none
        ssl.truststore.type = JKS
        max.block.ms = 60000
        retries = 3
        send.buffer.bytes = 102400
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        reconnect.backoff.ms = 50
        metrics.num.samples = 2
        ssl.keystore.type = JKS

Debug is  true storeKey false useTicketCache true useKeyTab true doNotPrompt true ticketCache is null isInitiator true KeyTab is /export/home/ahmed/ahmed.keytab refreshKrb5Config is false principal is ahmed@AHMED.AHMEDINC.COM tryFirstPass is false useFirstPass is false storePass is false clearPass is false
Acquire TGT from Cache
Principal is ahmed@AHMED.AHMEDINC.COM
null credentials from Ticket Cache
principal is ahmed@AHMED.AHMEDINC.COM
Will use keytab
Commit Succeeded

17/05/15 08:34:14 INFO kerberos.Login: Successfully logged in.
17/05/15 08:34:14 INFO kerberos.Login: TGT refresh thread started.
17/05/15 08:34:14 INFO kerberos.Login: TGT valid starting at: Mon May 15 08:34:13 UTC 2017
17/05/15 08:34:14 INFO kerberos.Login: TGT expires: Mon May 15 18:34:13 UTC 2017
17/05/15 08:34:14 INFO kerberos.Login: TGT refresh sleeping until: Mon May 15 16:52:23 UTC 2017
17/05/15 08:34:14 INFO utils.AppInfoParser: Kafka version : 0.9.0-kafka-2.0.2
17/05/15 08:34:14 INFO utils.AppInfoParser: Kafka commitId : unknown
Hello World
How are you
This is a test message from test1 Topic
This is a test message 2 from test1 topic
message 3
^C17/05/15 08:37:05 INFO producer.KafkaProducer: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
17/05/15 08:37:05 WARN kerberos.Login: TGT renewal thread has been interrupted and will exit.
[ahmed@ahmed-server-kafka-001 ~]$

Step 9: Create a duplicate session and log in as a consumer.

Follow the steps below.
  1. Log in to a different server (or create a duplicate session).
  2. Set KAFKA_OPTS for the environment.
  3. Execute the command below.
Command.
kafka-console-consumer --new-consumer --topic test1 --from-beginning --bootstrap-server ahmed-server-kafka-001.ahmedinc.com:9092 --consumer.config client.properties
Output.
[ahmed@ahmed-server-kafka-001 ~]$ kafka-console-consumer --new-consumer --topic test1 --from-beginning --bootstrap-server ahmed-server-kafka-001.ahmedinc.com:9092 --consumer.config client.properties
17/05/15 08:36:21 INFO consumer.ConsumerConfig: ConsumerConfig values:
        request.timeout.ms = 40000
        check.crcs = true
        retry.backoff.ms = 100
        ssl.truststore.password = null
        ssl.keymanager.algorithm = SunX509
        receive.buffer.bytes = 65536
        ssl.cipher.suites = null
        ssl.key.password = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.provider = null
        sasl.kerberos.service.name = kafka
        session.timeout.ms = 30000
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [ahmed-server-kafka-001.ahmedinc.com:9092]
        client.id =
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        auto.offset.reset = earliest
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        ssl.endpoint.identification.algorithm = null
        max.partition.fetch.bytes = 1048576
        ssl.keystore.location = null
        ssl.truststore.location = null
        ssl.keystore.password = null
        metrics.sample.window.ms = 30000
        metadata.max.age.ms = 300000
        security.protocol = SASL_PLAINTEXT
        auto.commit.interval.ms = 5000
        ssl.protocol = TLS
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.trustmanager.algorithm = PKIX
        group.id = console-consumer-7657
        enable.auto.commit = true
        metric.reporters = []
        ssl.truststore.type = JKS
        send.buffer.bytes = 131072
        reconnect.backoff.ms = 50
        metrics.num.samples = 2
        ssl.keystore.type = JKS
        heartbeat.interval.ms = 3000

Debug is  true storeKey false useTicketCache true useKeyTab true doNotPrompt true ticketCache is null isInitiator true KeyTab is /export/home/ahmed/ahmed.keytab refreshKrb5Config is false principal is ahmed@AHMED.AHMEDINC.COM tryFirstPass is false useFirstPass is false storePass is false clearPass is false
Acquire TGT from Cache
Principal is ahmed@AHMED.AHMEDINC.COM
null credentials from Ticket Cache
principal is ahmed@AHMED.AHMEDINC.COM
Will use keytab
Commit Succeeded

17/05/15 08:36:21 INFO kerberos.Login: Successfully logged in.
17/05/15 08:36:21 INFO kerberos.Login: TGT refresh thread started.
17/05/15 08:36:21 INFO kerberos.Login: TGT valid starting at: Mon May 15 08:36:20 UTC 2017
17/05/15 08:36:21 INFO kerberos.Login: TGT expires: Mon May 15 18:36:20 UTC 2017
17/05/15 08:36:21 INFO kerberos.Login: TGT refresh sleeping until: Mon May 15 16:55:07 UTC 2017
17/05/15 08:36:21 INFO utils.AppInfoParser: Kafka version : 0.9.0-kafka-2.0.2
17/05/15 08:36:21 INFO utils.AppInfoParser: Kafka commitId : unknown
17/05/15 08:36:22 INFO internals.AbstractCoordinator: Discovered coordinator ahmed-server-kafka-006.ahmedinc.com:9092 (id: 2147483367) for group console-consumer-7657.
17/05/15 08:36:22 INFO internals.ConsumerCoordinator: Revoking previously assigned partitions [] for group console-consumer-7657
17/05/15 08:36:22 INFO internals.AbstractCoordinator: (Re-)joining group console-consumer-7657
17/05/15 08:36:22 INFO internals.AbstractCoordinator: Marking the coordinator ahmed-server-kafka-006.ahmedinc.com:9092 (id: 2147483367) dead for group console-consumer-7657
17/05/15 08:36:22 INFO internals.AbstractCoordinator: Discovered coordinator ahmed-server-kafka-006.ahmedinc.com:9092 (id: 2147483367) for group console-consumer-7657.
17/05/15 08:36:22 INFO internals.AbstractCoordinator: (Re-)joining group console-consumer-7657
17/05/15 08:36:23 INFO internals.AbstractCoordinator: Successfully joined group console-consumer-7657 with generation 1
17/05/15 08:36:23 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [test1-0] for group console-consumer-7657
Hello World
How are you
This is a test message from test1 Topic
This is a test message 2 from test1 topic
message 3
^C17/05/15 08:37:10 WARN kerberos.Login: TGT renewal thread has been interrupted and will exit.
Processed a total of 5 messages
17/05/15 08:37:10 INFO zkclient.ZkClient: JAAS File name: /export/home/ahmed/jaas.conf
17/05/15 08:37:10 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.5-cdh5.4.0-SNAPSHOT--1, built on 04/01/2015 08:35 GMT
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Client environment:host.name=ahmed-server-kafka-001.ahmedinc.com
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Client environment:java.version=1.7.0_67
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Client environment:java.home=/usr/java/jdk1.7.0_67-cloudera/jre
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Client environment:java.class.path=:/opt/cloudera/parcels/KAFKA-2.0...... n/../libs/guava-18.0.jar
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Client environment:java.compiler=
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Client environment:os.version=2.6.32-642.6.2.el6.centos.plus.x86_64
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Client environment:user.name=ahmed
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Client environment:user.home=/export/home/ahmed
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Client environment:user.dir=/export/home/ahmed
17/05/15 08:37:10 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=null sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@26132588
17/05/15 08:37:10 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
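Beyond the console tools, the same Kerberos settings can also be exercised from application code. The snippet below is a minimal sketch, not part of the original walkthrough, using the third-party kafka-python client; it assumes the kafka-python and gssapi packages are installed and that a ticket has already been obtained from the keytab created in Step 3 (for example with kinit -kt ahmed.keytab ahmed@AHMED.AHMEDINC.COM), since kafka-python uses the ticket cache rather than the JAAS file.
# Minimal sketch (assumes kafka-python + gssapi are installed and a valid
# Kerberos ticket is present in the ticket cache).
from kafka import KafkaProducer, KafkaConsumer

BROKER = 'ahmed-server-kafka-001.ahmedinc.com:9092'

# Producer over SASL_PLAINTEXT with the 'kafka' Kerberos service name,
# mirroring client.properties above.
producer = KafkaProducer(bootstrap_servers=BROKER,
                         security_protocol='SASL_PLAINTEXT',
                         sasl_mechanism='GSSAPI',
                         sasl_kerberos_service_name='kafka')
producer.send('test1', b'Hello from kafka-python')
producer.flush()

# Consumer reading the same topic from the beginning.
consumer = KafkaConsumer('test1',
                         bootstrap_servers=BROKER,
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=10000,
                         security_protocol='SASL_PLAINTEXT',
                         sasl_mechanism='GSSAPI',
                         sasl_kerberos_service_name='kafka')
for message in consumer:
    print(message.value)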


Cloudera Manager – Duplicate entry ‘zookeeper’ for key ‘NAME’.

May 16, 2017
We had recently built a cluster using the Cloudera API and had all the services running on it with Kerberos enabled.
Next we had a requirement to add another Kafka cluster alongside our already existing cluster in Cloudera Manager. Since it is a quick task to get ZooKeeper and Kafka up and running,
we decided to do this through Cloudera Manager instead of the API, but we hit the Duplicate entry 'zookeeper' for key 'NAME' issue described in the bug report below.
I have set up two clusters that share a Cloudera Manager.
The first I set up with the API and created the services with capital letter names, e.g., ZOOKEEPER, HDFS, HIVE.
Now, I add the second cluster using the Wizard.

Add Cluster->Select Hosts->Distribute Parcels->Select base HDFS Cluster install

On the next page I get SQL errors telling me that the services I want to add already exist.
I suspect that the check for existing service names does not account for capitalized letters.
Note that renaming the services does not help, as it would only change the DisplayName in the database and not the Name column, 
which is unfortunately also a key column. 

Here is an excerpt of the error message (full log attached).
javax.persistence.PersistenceException:org.hibernate.exception.ConstraintViolationException: could not perform addBatch

at AbstractEntityManagerImpl.java line 1387
in org.hibernate.ejb.AbstractEntityManagerImpl convert()
Caused by: java.sql.BatchUpdateException:Duplicate entry 'zookeeper' for key 'NAME'

at PreparedStatement.java line 2024
in com.mysql.jdbc.PreparedStatement executeBatchSerially()
Solution:
For now the solution was to use the API and choose names different from the ones we had used earlier.
For example, we used `ZOOKEEPER` in our earlier build, so we changed it to `ZOOKEEPER001` in the new build.

NOTE: Apparently Cloudera Manager does not check for the capitalized name `ZOOKEEPER`;
if it did, CM would generate a different name automatically.
This bug was fixed recently, but it might take a couple of versions to appear in a stable release.
The issue is that when we deployed our services using the API we named them ZOOKEEPER (all caps), but the Cloudera Manager wizard checks for every variant except the capitalized one,
so it continues with the build and then fails with the duplicate-entry error. If it detected the clash, it would generate a different name automatically.
Since this was not working, the current workaround is to deploy the services through the Cloudera API under a different name (currently ZOOKEEPER001/KAFKA001), as sketched below.
Another fix would be to change the API script to suffix the service name, for example ‘ZOOKEEPER_HAD’, ‘ZOOKEEPER_KAF’, or a number such as ZOOKEEPER_1.
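For reference, here is a minimal sketch of that workaround using the cm_api Python client. The host, credentials, cluster name, and API version are placeholders/assumptions, not values from our environment.
# Sketch: create the second cluster's services under non-conflicting names.
from cm_api.api_client import ApiResource

api = ApiResource('cm-host.ahmedinc.com', 7180, 'admin', 'admin', version=13)  # assumed values
cluster = api.get_cluster('SecondCluster')  # assumed cluster name

# Use ZOOKEEPER001 / KAFKA001 so the generated lowercase NAME keys
# do not collide with the existing 'zookeeper' / 'kafka' entries.
zk_service = cluster.create_service('ZOOKEEPER001', 'ZOOKEEPER')
kafka_service = cluster.create_service('KAFKA001', 'KAFKA')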


Parcel Not Distributing Cloudera CDH.

March 27, 2017
We were deploying a cluster in our lab environment, which is shared by everyone,
so the lab has its own share of stale configuration on it.
During installation we noticed that parcel distribution was not working; this was the second time we had hit this issue.
There are a couple of possible reasons.
  1. Check the /etc/hosts file to confirm all the server names are added correctly.
  2. One of the earlier installations was terminated midway, leaving a stale config that set the CDH parcel status to ACTIVATING; when we then try to install, the parcel does not distribute.
  3. A similar issue can occur if there is not enough space on the node for /opt/cloudera.

Solution:

  • Deactivate the parcel and retry (a Python equivalent is sketched after this list).
    curl -u username:password -X POST http://adminnode:7180/api/v14/clusters//parcels/products/CDH/versions/5.10.0-1.cdh5.10.0.p0.41/commands/deactivate
    
  • Check for space and increase the space available for /opt/cloudera/.
  • Most of the time you should see the issue in the logs. Server logs: /var/log/cloudera-scm-server/cloudera-scm-server.log; agent logs: /var/log/cloudera-scm-agent/cloudera-scm-agent.log
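The same deactivation can also be done with the cm_api Python client; the sketch below is illustrative only, and the admin node, credentials, and cluster name are placeholders.
# Sketch: deactivate a stuck CDH parcel via the Python cm_api client.
from cm_api.api_client import ApiResource

api = ApiResource('adminnode', 7180, 'username', 'password', version=14)  # placeholders
cluster = api.get_cluster('your-cluster-name')                            # placeholder
parcel = cluster.get_parcel('CDH', '5.10.0-1.cdh5.10.0.p0.41')
print(parcel.stage)       # e.g. ACTIVATING for the stale install
parcel.deactivate()       # returns an ApiCommand we could wait on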


Creating /etc/hosts file in Chef.

March 26, 2017
We had a cluster environment in which we needed to update the /etc/hosts file so that the servers could communicate over a private network. Our servers have multiple interfaces, and we need them to talk to each other over the private network.
The goal is to create an /etc/hosts file listing all the nodes in the cluster with their private IP addresses.

Chef Setup (Assumptions).

  • We have multiple clusters.
  • Each cluster has an environment set.
  • We have multiple interfaces on each node (but this solution should work for a single interface as well).

    Steps we do to get information from each node.

  1. We use three attributes to search for in the cluster.
    1. Each server within the cluster has a specific fqdn.
    2. The private IP (string search).
    3. The interface to look for on the node.
  2. all_nodes_in_cluster will get all the nodes with that fqdn (this can be changed, as required, to search by node, tags, roles, or hostname).
  3. For every node we look for the specific interface and get the private IP.

    Before we start.

    Before we start, we need to check our search criteria using knife search (more details here).
    Below is an example that looks for servers whose fqdn contains the string env-lab-1.
    knife search node "fqdn:*env-lab-1*"
    

    About the Recipe.

The interface information in the node hash is located at:
 node_in_cluster['network']['interfaces']['bond.007']['addresses']
This is a hash with multiple values; we are specifically looking for the IP.
 if private_interface[0].include? node['private_ip_search_filter']
Above, we look for the interface that matches our search filter; the required information is in private_interface[0].
Here is how we write the information into our /etc/hosts file: IP, FQDN, HOSTNAME.
 puts "#{private_interface[0]} #{node_in_cluster['fqdn']} #{node_in_cluster['hostname']}"
Here is the complete Ruby code, which does the same thing as the ERB template file.
 all_nodes_in_cluster.each do |node_in_cluster|
   node_in_cluster['network']['interfaces'][int_to_look]['addresses'].each do |private_interface|
     if private_interface[0].include? node['private_ip_search_filter']
       puts "#{private_interface[0]} #{node_in_cluster['fqdn']} #{node_in_cluster['hostname']}"
     end
   end
 end

Attribute File.

 # Example :  We take the search Criteria to generate /etc/hosts
 default['env_search_filter'] = "fqdn:*lab-env-1*"
 default['private_ip_search_filter'] = "192.168.149"
 default['interface_to_look'] = 'bond.007'

Recipe

 # Search Criteria
 all_nodes_in_cluster = search(:node, node['env_search_filter'])
 int_to_look = node['interface_to_look']

 template '/etc/hosts' do
   source 'etc_hosts_file.erb'
   mode '0755'
   owner 'root'
   group 'root'
   variables({
     all_nodes_in_cluster: all_nodes_in_cluster,
     int_to_look: int_to_look,
     private_ip_search_filter: node['private_ip_search_filter']
   })
 end

Template File etc_hosts_file.erb.

 127.0.0.1       localhost localhost.localdomain localhost4 localhost4.localdomain4
 <% @all_nodes_in_cluster.each do |node_in_cluster| -%>
   <% node_in_cluster['network']['interfaces'][@int_to_look]['addresses'].each do |private_interface| -%>
     <% if private_interface[0].include? @private_ip_search_filter -%>
 <%= private_interface[0] %>     <%= node_in_cluster['fqdn'] %>      <%= node_in_cluster['hostname'] %>  # Serial Number: <%= node_in_cluster['dmi']['system']['serial_number'] %> ( <%= node_in_cluster['dmi']['system']['manufacturer'] %> ) <%= node_in_cluster['dmi']['system']['product_name'] %>
     <% end -%>
   <% end -%>
 <% end -%>
 ::1     localhost localhost.localdomain localhost6 localhost6.localdomain6

Disclaimer.

  1. This may not be the most optimized solution, but it is something that worked for me.
  2. The search call runs on every Chef client run (every 30 minutes) and queries the full node object of every matching node, which can be a time- and bandwidth-consuming operation on a very large cluster (a single node's information was about 10000 lines of Ruby hash for our nodes).
  3. If anyone has a better way to do it, please post it in the comments below. Thanks 🙂


Enable Kerberos Using Cloudera API.

March 25, 2017
The Python API for Cloudera is really nice: apart from setting up the cluster, we can also use it for configuration and automation. We do a lot of automation using Chef/Ansible, but the Cloudera API gives more control over the cluster.
One of its most useful features is setting up Kerberos for the cluster, which done manually (on the command line) is a very tricky task with lots of things that can go wrong.
For our Kerberos setup we will need to complete the steps below.
  1. Cloudera Manager Configuration.
  2. Stop Cluster.
  3. Stop Cloudera Manager service.
  4. Service configuration updates for all the services that need them: HDFS, HBASE, KAFKA, ZOOKEEPER, HUE.
  5. Creating KT_RENEWER for HUE service.
  6. Start Cloudera Manager Services.
  7. Start Cluster.

Steps to Kerberize the Cloudera Cluster

Step 1: Configure Cloudera Manager with LDAP information.
 #
 # Deploy LDAP configuration for CM
 #
 self.deploy_cm_ldap_configuration()
Step 2: Import admin credentials, using a username/password that has permission to create/delete AD service accounts.
 #
 # Creating Admin Credentials
 #
 self.cm_import_admin_credentials()
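Here self.cm_import_admin_credentials() is a thin wrapper in the author's script whose body is not shown. A minimal sketch of what it likely wraps, assuming the class keeps an ApiResource handle (called self.cm_api_handle here) and the cm_kdc_import_credentials config block shown further below, would be:
 #
 # Sketch (assumption, not the original code): import the AD admin
 # credentials so CM can create/delete Kerberos principals for us.
 #
 def cm_import_admin_credentials(self):
     cm = self.cm_api_handle.get_cloudera_manager()
     cmd = cm.import_admin_credentials(
         self.config['cm_kdc_import_credentials']['kdc_username'],
         self.config['cm_kdc_import_credentials']['kdc_password'])
     if not cmd.wait(120).success:
         logging.error("Importing KDC admin credentials FAILED.")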
Step 3: Stopping Cloudera cluster services.
 #
 # Stopping Cluster
 #
 self.gets_cm_cluster.stop().wait()
 logging.debug("Waiting for CLUSTER to stop completely !!!")
 time.sleep(5)
Step 4: Stopping Cloudera management services.
 #
 # Stopping CM services
 #
 self.get_mgmt_services.stop().wait()
 logging.debug("Waiting for CM to stop completely !!!")
 time.sleep(5)
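Similarly, self.gets_cm_cluster and self.get_mgmt_services are small helper properties whose bodies are not shown in the post; a likely equivalent using cm_api directly (an assumption, not the author's exact code) is:
 #
 # Sketch (assumption): fetch the cluster and the Cloudera Management
 # Service from the API handle so they can be stopped/started.
 #
 @property
 def gets_cm_cluster(self):
     return self.cm_api_handle.get_cluster(self.config['cluster']['name'])

 @property
 def get_mgmt_services(self):
     return self.cm_api_handle.get_cloudera_manager().get_service()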
Step 5: Update Zookeeper configuration.
 #
 # Deploy Kerberos Config Zookeeper
 #
 logging.debug("Deploy Service Configuration!!!")
 zk_service = Zookeeper(self.cluster, self.config, self.cloudera_manager)
 zk_service.update_configuration()
 logging.debug("Deploy Service Configuration COMPLETE!!!")
Step 6: Update HDFS configuration.
 #
 # Deploy Kerberos Config Hdfs
 #
 logging.debug("Deploy Service Configuration!!!")
 hdfs_service = Hdfs(self.cluster, self.config, self.cloudera_manager)
 hdfs_service.update_configuration()
 logging.debug("Deploy Service Configuration COMPLETE!!!")
Step 7: Update HBASE configuration.
 #
 # Deploy Kerberos Config Hbase
 #
 logging.debug("Deploy Service Configuration!!!")
 hbase_service = Hbase(self.cluster, self.config, self.cloudera_manager)
 hbase_service.update_configuration()
 logging.debug("Deploy Service Configuration COMPLETE!!!")
Step 8: Update KAFKA configuration.
 #
 # Deploy Kerberos Config Kafka
 #
 logging.debug("Deploy Service Configuration!!!")
 kafka_service = Kafka(self.cluster, self.config, self.cloudera_manager)
 kafka_service.update_configuration()
 logging.debug("Deploy Service Configuration COMPLETE!!!")
Step 9: Update HUE configuration.
 #
 # Deploy Kerberos Config Hue
 #
 logging.debug("Deploy Service Configuration!!!")
 hue_service = Hue(self.cluster, self.config, self.cloudera_manager)
 hue_service.update_configuration()
 hue_service.add_service_kt_renewer_to_cluster()
 logging.debug("Deploy Service Configuration COMPLETE!!!")
Step 10: Generating Credentials.
 #
 # Generated kerberos credentials in AD.
 #
 self.cm_generate_credentials()
 time.sleep(5)
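self.cm_generate_credentials() wraps the call that actually creates the missing Kerberos principals in AD; a minimal sketch of the wrapper (an assumption, built around the generate_credentials() call) is:
 #
 # Sketch (assumption): trigger generation of missing Kerberos
 # credentials and wait for the command to finish.
 #
 def cm_generate_credentials(self):
     cm = self.cm_api_handle.get_cloudera_manager()
     cmd = cm.generate_credentials()
     if not cmd.wait(300).success:
         logging.error("Generating Kerberos credentials FAILED.")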
Step 11: Deploy Client Configuration.
 #
 # Deploy Client Configuration
 #
 logging.info("Deploying Client Config...")
 self.deploy_client_configuration()
 logging.debug("Waiting for CLUSTER to deploy completely !!!")
 time.sleep(5)
Step 12: Starting CM Services.
 #
 # Starting CM services.
 #
 self.get_mgmt_services.start().wait()
 logging.debug("Waiting for CM to start completely !!!")
 time.sleep(5)
Step 13: Starting cluster services.
 #
 # Restart Cluster based on stale configuration and redeploy config if required.
 #
 self.gets_cm_cluster.start().wait()
 logging.info("Cluster Kerberos Deployment Complete.")

Important configuration information.

LDAP setup.
 'cm_ldap': {
     'KDC_HOST': 'adserver1.service.ahmedinc.com',
     'SECURITY_REALM': 'SERVICE.AHMEDINC.COM',
     'KRB_ENC_TYPES': 'rc4-hmac',
     'KDC_ACCOUNT_CREATION_HOST_OVERRIDE': 'adserver2.service.ahmedinc.com',
     'AD_KDC_DOMAIN': 'OU=accounts,OU=test-lab-ou,OU=bigdata,DC=service,DC=ahmedinc,DC=com',
     'AD_DELETE_ON_REGENERATE': True,
     'AD_ACCOUNT_PREFIX': 'Lab1Test'
   }
LDAP credentials for a user with create, delete, and modify permissions.
 'cm_kdc_import_credentials': {
     'kdc_password': 'Ahm3d@123',
     'kdc_username': 'service-acc-lab@SERVICE.AHMEDINC.COM'
 }
Configuration to connect to Cloudera Manager.
 'cm': {
     'username': 'admin',
     'tls': False,
     'host': 'server-admin-node.ahmedinc.com',
     'api-version': 13,
     'password': 'admin',
     'port': 7180
 }
Cluster configuration.
 'cluster': {
     'version': 'CDH5',
     'hosts': [
       'server-admin-node.ahmedinc.com',
       'server-edge-node.ahmedinc.com',
       'server-worker-node.ahmedinc.com'
     ],
     'name': 'AutomatedHadoopCluster',
     'fullVersion': '5.8.3'
 }
Service Configuration Hdfs.
 'HDFS': {
   'config': {
     'hadoop_security_authentication': 'kerberos',
     'trusted_realms': 'USERACC.AHMEDINC.COM',
     'hadoop_security_authorization': True
   },
   'roles': [
     {
       'config': {
         'dfs_datanode_http_port': 1006,
         'dfs_datanode_data_dir_perm': 700,
         'dfs_datanode_port': 1004
       },
       'hosts': [
         'server-admin-node.ahmedinc.com',
         'server-edge-node.ahmedinc.com',
         'server-worker-node.ahmedinc.com'
       ],
       'group': 'DATANODE'
     }
   ]
 }
Service Configuration Hbase.
 'HBASE': {
   'config': {
     'hbase_thriftserver_security_authentication': 'auth',
     'hbase_security_authorization': True,
     'hbase_security_authentication': 'kerberos'
   }
 }
Service Configuration Zookeeper.
 'ZOOKEEPER': {
   'config': {
     'enableSecurity': True
   }
 }
Service Configuration Kafka.
 'KAFKA': {
   'config': {
     'kerberos.auth.enable': True
   }
 }
Service Configuration Hue.
 'HUE': {
   'roles': [
     {
       'group': 'KT_RENEWER',
       'hosts': [
         'server-admin-node.ahmedinc.com'
       ]
     }
   ]
 }
NOTE: In a Kerberos setup, when we do an update_config, a generate_missing_credentials command is triggered.
  • We need to wait for this command to complete before we start the next update_config, or else the generate_credentials command will FAIL.
  • get_commands returns a list of ApiCommand responses; we iterate through them and wait for each to complete (currently we will receive only one command).
  • Code snippet.
    current_running_commands = self.cm.get_commands()
    logging.debug("Current Running Commands: " + str(current_running_commands))
    if not current_running_commands:
      logging.info("Currently no command is running.")
    elif current_running_commands:
      for cmd in current_running_commands:
          if cmd.wait().success:
              logging.info("Command Completed. moving on..")
    else:
      logging.error("We are in ELSE, something went wrong. :(")
      sys.exit(1)
    
    logging.info("Configuration Updated Successfully..")
    time.sleep(5)
    
    Update Configuration for Cloudera Manager LDAP.
    def deploy_cm_ldap_configuration(self):
      """
           Updating LDAP configuration on the CM server.
           This will require a `sudo service cloudera-scm-server restart`; we do a restart at the end of the script.
      :return:
      """
      cm_api_handle = ApiResource(config['cm']['host'],
                                            config['cm']['port'],
                                            config['cm']['username'],
                                            config['cm']['password'],
                                            config['cm']['tls'],
                                            version=config['cm']['api-version'])
      cluster = cm_api_handle.get_cluster(config['cluster']['name'])
      cloudera_manager = cm_api_handle.get_cloudera_manager()
      cloudera_manager.update_config(self.config['cm_ldap'])
    
    Example Configuration update for Zookeeper.
    def update_configuration(self):
      """
          Update service configurations
      :return:
      """
      cm_api_handle = ApiResource(config['cm']['host'],
                                            config['cm']['port'],
                                            config['cm']['username'],
                                            config['cm']['password'],
                                            config['cm']['tls'],
                                            version=config['cm']['api-version'])
      cluster = cm_api_handle.get_cluster(config['cluster']['name'])
      get_service = cluster.get_service('ZOOKEEPER')
      get_service.update_config(config['ZOOKEEPER']['config'])
      logging.info("Service Configuration Updated.")
    
      #   In a kerberos setup, when we do a `update_config`
      #      `generate_missing_credentials` command is triggered.
      #      we need to wait for this command to complete,
      #      before we start the next `update_config`
      #      or else `generate_credential` command will FAIL.
    
      cm = cm_api_handle.get_cloudera_manager()
      current_running_commands = cm.get_commands()
      logging.debug("Current Running Commands: " + str(current_running_commands))
      if not current_running_commands:
          logging.info("Currently no command is running.")
      elif current_running_commands:
          for cmd in current_running_commands:
              if cmd.wait().success:
                  logging.info("Command Completed. moving on..")
      else:
          logging.error("We are in ELSE, something went wrong. :(")
          sys.exit(1)
    
      logging.info("Configuration Updated Successfully..")
      time.sleep(5)
    
    Creating KT_RENEWER code snippet.
 def create_kt_renewer_service(self):
     
     
     cm_api_handle = ApiResource(config['cm']['host'],
                                           config['cm']['port'],
                                           config['cm']['username'],
                                           config['cm']['password'],
                                           config['cm']['tls'],
                                           version=config['cm']['api-version'])
     cluster = cm_api_handle.get_cluster(config['cluster']['name'])
     get_service = cluster.get_service('HUE')
     
     
     for role in self.config['HUE']['roles']:
          role_group = get_service.get_role_config_group("{0}-{1}-BASE".format('HUE', role['group']))
         # Update the group's configuration.
         # [https://cloudera.github.io/cm_api/epydoc/5.10.0/cm_api.endpoints.role_config_groups.ApiRoleConfigGroup-class.html#update_config]
         role_group.update_config(role.get('config', {}))
         self.create_roles(get_service, role, role['group']) 
Creating Roles.
 def create_roles(self, service, role, group):
     """
     Create individual roles for all the hosts under a specific role group
     
     :param role: Role configuration from yaml
     :param group: Role group name
     """
     role_id = 0
     for host in role.get('hosts', []):
         role_id += 1
         role_name = '{0}-{1}-{2}'.format('HUE', group, role_id)
         logging.info("Creating Role name as: " + str(role_name))
         try:
             service.get_role(role_name)
         except ApiException:
             service.create_role(role_name, group, host) 


Setting Up HDFS Services Using Cloudera API [Part 3]

March 24, 2017
This is the second follow-up post. The overall flow is:
  1. Create a cluster.
  2. Install the HDFS service on our cluster.
    Creating the cluster and setting up parcels was covered in the earlier post, so here we focus on HDFS.

Install HDFS Service.

The HDFS service is installed in stages.
  1. Create the HDFS service (if it does not exist).
  2. Update the configuration for our newly created HDFS service.
  3. Create the HDFS roles (NAMENODE, SECONDARYNAMENODE, DATANODE, GATEWAY) on the cluster.
  4. Format the NameNode.
  5. Start the HDFS service.
  6. Create the temporary /tmp directory in HDFS.

    Create a HDFS service.

    This is simple: create the service if it does not exist.
    def create_service(cluster):
     try:
         hdfs_service = cluster.get_service('HDFS')
         logging.debug("Service {0} already present on the cluster".format('HDFS'))
     except ApiException:
         #
         # Create the service if it does not exist yet.
         #
         hdfs_service = cluster.create_service('HDFS', 'HDFS')
         logging.info("Created New Service: HDFS")
    
     return hdfs_service
    

    Update configuration for HDFS.

    This information is picked up from the configuration yaml file.
    yaml file.
    HDFS:
     config:
       dfs_replication: 3
       dfs_permissions: false
       dfs_block_local_path_access_user: impala,hbase,mapred,spark
     roles:
       - group: NAMENODE
         hosts:
           - mycmhost.ahmed.com
         config:
           dfs_name_dir_list: /data/1/dfs/nn,/data/2/dfs/nn
           dfs_namenode_handler_count: 30
       - group: SECONDARYNAMENODE
         hosts:
           - mycmhost.ahmed.com
         config:
           fs_checkpoint_dir_list: /data/1/dfs/snn,/data/2/dfs/snn
    
       - group: DATANODE
         hosts:
           - mycmhost.ahmed.com
         config:
           dfs_data_dir_list: /data/1/dfs/dn,/data/2/dfs/dn
           dfs_datanode_handler_count: 30
           #dfs_datanode_du_reserved: 1073741824
           dfs_datanode_failed_volumes_tolerated: 0
           dfs_datanode_data_dir_perm: 755
       - group: GATEWAY
         hosts:
           - mycmhost.ahmed.com
         config:
           dfs_client_use_trash: true
    
    Code snippet.
    def service_update_configuration(zk_service):
     """
         Update service configurations
     :return:
     """
     zk_service.update_config(config['services']['HDFS']['config'])
     logging.info("Service Configuration Updated.")
    

    Create HDFS roles (NAMENODE, SECONDARYNAMENODE, DATANODE, GATEWAY) on the Cluster.

    To create all the roles:
  • Each role needs to be unique on each host.
  • We create a unique role_name for each node.
    Each role name is built from the set of strings (service_name, group, role_id):
    role_name = '{0}-{1}-{2}'.format(service_name, group, role_id)
    
    Here is the code snippet.
    def hdfs_create_cluster_services(config, service, service_name):
      """
          Creating Cluster services
      :return:
      """
    
      #
      # Get the role config for the group
      # Update group configuration and create roles.
      #
      for role in config['services'][service_name]['roles']:
          role_group = service.get_role_config_group("{0}-{1}-BASE".format(service_name, role['group']))
          #
          # Update the group's configuration.
          # [http://ift.tt/2o1ijoO]
          #
          role_group.update_config(role.get('config', {}))
          #
          # Create roles now.
          #
          hdfs_create_roles(service, service_name, role, role['group'])
    
    def hdfs_create_roles(service, service_name, role, group):
      """
      Create individual roles for all the hosts under a specific role group
    
      :param role: Role configuration from yaml
      :param group: Role group name
      """
      role_id = 0
      for host in role.get('hosts', []):
          role_id += 1
          role_name = '{0}-{1}-{2}'.format(service_name, group, role_id)
          logging.info("Creating Role name as: " + str(role_name))
          try:
              service.get_role(role_name)
          except ApiException:
              service.create_role(role_name, group, host)
    

Format Namenode.

The first time we create an HDFS environment we need to format the NameNode; this initializes the HDFS cluster.
The format_hdfs method returns an ApiCommand, which lets us track the progress of execution.
 def format_namenode(hdfs_service, namenode):
     try:
         #
         # Formatting HDFS - this will have no effect the second time it runs.
         # Format NameNode instances of an HDFS service.
         #
         # http://ift.tt/2nc7voC
         #

         cmd = hdfs_service.format_hdfs(namenode)[0]
         logging.debug("Command Response: " + str(cmd))
         if not cmd.wait(300).success:
             print "WARNING: Failed to format HDFS, attempting to continue with the setup"
     except ApiException:
         logging.info("HDFS cannot be formatted. May be already in use.")

Start HDFS service

We do this using the service.start() method. This method returns an ApiCommand, so we can track the progress and wait for the service to start using cmd.wait().success.
More details about the API are available here.
Our service should then be up and running (a short sketch follows after the next section).

Finally Creating /tmp directory on HDFS.

When we create an HDFS cluster we also create a /tmp directory; the HDFS /tmp directory is used as temporary storage during MapReduce operations.
MapReduce artifacts and intermediate data are kept under this directory. If we delete the /tmp contents, any MR jobs currently running will lose their intermediate data.
Any MR job run after /tmp is cleared will still work without issues.
Creating /tmp is done using the create_hdfs_tmp method, which returns an ApiCommand response.
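A minimal sketch of these last two steps, following the same ApiCommand pattern used above (the function name is illustrative), could look like this:
 def start_hdfs_and_create_tmp(hdfs_service):
     #
     # Start the HDFS service and wait for the start command to finish.
     #
     cmd = hdfs_service.start()
     if not cmd.wait(300).success:
         print("WARNING: HDFS service did not start cleanly")

     #
     # Create the /tmp directory in HDFS with appropriate permissions.
     #
     cmd = hdfs_service.create_hdfs_tmp()
     if not cmd.wait(60).success:
         print("WARNING: Failed to create /tmp on HDFS")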

Code Location


Setting Up Zookeeper Services Using Cloudera API [Part 2]

March 23, 2017
This is the first follow-up post. In the earlier post, Setting Up Cloudera Manager Services Using Cloudera API [Part 1], we installed the Cloudera management services. Now we will install the ZooKeeper service on the cluster.
But first we need to do a couple of things before we install ZooKeeper.
  1. Create a cluster.
  2. Download, Distribute, Activate CDH Parcels.
  3. Install Zookeeper service to our cluster.

Creating a Cluster

First we create a cluster if it does not exist. Config details below.
def init_cluster(cm_api_handle):
    try:
        cluster = cm_api_handle.get_cluster(config['cluster']['name'])
        return cluster
    except ApiException:
        cluster = cm_api_handle.create_cluster(config['cluster']['name'],
                                                config['cluster']['version'],
                                                config['cluster']['fullVersion'])
If it is the first time, we need to add all the hosts to the cluster (including the admin node); this information comes from the configuration yaml file.
# Basic cluster information
cluster:
  name: AutomatedHadoopCluster
  version: CDH5
  fullVersion: 5.8.3
  hosts:
     - mycmhost.ahmed.com
In the above yaml snippet, hosts lists all the hosts in the cluster; since we are testing on a VM with just one host, only that host is added there.

    cluster_hosts = []
    #
    # Picking up all the nodes from the yaml configuration.
    #
    for host_in_cluster in cluster.list_hosts():
        cluster_hosts.append(host_in_cluster)

    hosts = []

    #
    # Create a host list, make sure we dont have duplicates.
    #
    for host in config['cluster']['hosts']:
        if host not in cluster_hosts:
            hosts.append(host)

    #
    # Adding all hosts to the cluster.
    #
    cluster.add_hosts(hosts)
    return cluster
Once we have the cluster ready, we can install the parcels on it.

Download, Distribute, Activate CDH Parcels

For parcels there are three major methods which drive the parcel distribution.
  1. parcel.start_download()
  2. parcel.start_distribution()
  3. parcel.activate()
These are the three methods which do all the work. While the start_download command is running we need to keep track of the progress; this is done using the get_parcel method.
get_parcel: http://ift.tt/2nD3tbG
#
# When we execute and parcel download/distribute/activate command
# we can track the progress using the `get_parcel` method.
# This return a JSON described here : http://ift.tt/2nD3tbG
# We can check progress by checking `stage`
#
#   AVAILABLE_REMOTELY: Stable stage - the parcel can be downloaded to the server.
#   DOWNLOADING: Transient stage - the parcel is in the process of being downloaded to the server.
#   DOWNLOADED: Stable stage - the parcel is downloaded and ready to be distributed or removed from the server.
#   DISTRIBUTING: Transient stage - the parcel is being sent to all the hosts in the cluster.
#   DISTRIBUTED: Stable stage - the parcel is on all the hosts in the cluster. The parcel can now be activated, or removed from all the hosts.
#   UNDISTRIBUTING: Transient stage - the parcel is being removed from all the hosts in the cluster>
#   ACTIVATING: Transient stage - the parcel is being activated on the hosts in the cluster. New in API v7
#   ACTIVATED: Steady stage - the parcel is set to active on every host in the cluster. If desired, a parcel can be deactivated from this stage.
#
We track the progress of each stage using the snippet below.
def check_current_state(cluster, product, version, states):
    logging.info("Checking Status for Parcel.")
    while True:
        parcel = cluster.get_parcel(product, version)
        logging.info("Parcel Current Stage: " + str(parcel.stage))
        if parcel.stage in states:
            break
        if parcel.state.errors:
            raise Exception(str(parcel.state.errors))

        logging.info("%s progress: %s / %s" % (states[0], parcel.state.progress,
                                               parcel.state.totalProgress))
        time.sleep(15)
The rest of the parcel execution is straightforward; a sketch of the full sequence follows.
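Putting the three calls together with the check_current_state helper above, a sketch (the function name and the example parcel version string are illustrative) looks like this:
def deploy_parcel(cluster, product, version):
    # Download the parcel to the Cloudera Manager server.
    cluster.get_parcel(product, version).start_download()
    check_current_state(cluster, product, version, ['DOWNLOADED'])

    # Distribute the parcel to every host in the cluster.
    cluster.get_parcel(product, version).start_distribution()
    check_current_state(cluster, product, version, ['DISTRIBUTED'])

    # Activate the parcel on all hosts.
    cluster.get_parcel(product, version).activate()
    check_current_state(cluster, product, version, ['ACTIVATED'])

# Example call; the exact parcel version string depends on the CDH release.
# deploy_parcel(cluster, 'CDH', '5.8.3-1.cdh5.8.3.p0.2')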

Install Zookeeper Service.

Zookeeper service is installed in stages.
  1. Create a service (if it does not exist).
  2. Update the configuration for our newly created ZooKeeper service.
  3. Create the ZooKeeper role (SERVER) on the cluster.
  4. Initialize ZooKeeper using the init_zookeeper() command.
  5. Start the ZooKeeper service.

Create a service.

This is simple: create the service if it does not exist.
def zk_create_service(cluster):
    try:
        zk_service = cluster.get_service('ZOOKEEPER')
        logging.debug("Service {0} already present on the cluster".format(self.name))
    except ApiException:
        #
        # Create service if it the first time.
        #
        zk_service = cluster.create_service('ZOOKEEPER', 'ZOOKEEPER')
        logging.info("Created New Service: ZOOKEEPER")

    return zk_service

Update configuration for Zookeeper.

This information is picked up from the configuration yaml file.
yaml file.
  ZOOKEEPER:
    config:
      zookeeper_datadir_autocreate: true
Code snippet.
def zk_update_configuration(zk_service):
    """
        Update service configurations
    :return:
    """
    zk_service.update_config(config['services']['ZOOKEEPER']['config'])
    logging.info("Service Configuration Updated.")

Create Zookeeper role (SERVER) on the Cluster.

This is the important part: here we create the ZooKeeper SERVER roles (each instance of ZooKeeper on each server is referred to as a SERVER role).
Role names must be unique, so we combine the service_name, role, and zookeeper_id to form a unique identifier. Example: here it would be ZOOKEEPER-SERVER-1.
Here is the code snippet.
zookeeper_host_id = 0

    #
    # Configure all the host.
    #
    for zookeeper_host in config['services']['ZOOKEEPER']['roles'][0]['hosts']:
        zookeeper_host_id += 1
        zookeeper_role_config = config['services']['ZOOKEEPER']['roles'][0]['config']
        role_name = "{0}-{1}-{2}".format('ZOOKEEPER', 'SERVER', zookeeper_host_id)
Next, once we create the role (with zk_service.create_role(role_name, 'SERVER', zookeeper_host), just like the create_role calls in the HDFS post), we update its configuration, which we get from the yaml file.
Yaml file.
roles:
  - group: SERVER
    hosts:
      - mycmhost.ahmed.com
    config:
      quorumPort: 2888
      electionPort: 3888
      dataLogDir: /var/lib/zookeeper
      dataDir: /var/lib/zookeeper
Code snippet.
#
# Configuring Zookeeper server ID
#
zookeeper_role_config['serverId'] = zookeeper_host_id

#
# Update configuration
#
role.update_config(zookeeper_role_config)
Now we are set to start the Zookeeper.

Initialize Zookeeper using the init_zookeeper() command.

Before we start ZooKeeper we need to initialize the service, which creates an ID for each server instance (it writes the myid file for each instance under /var/lib/zookeeper). This lets each ZooKeeper server identify itself uniquely.
If there were 3 ZooKeeper servers (the recommended minimum), we would have something like below.
  1. Role ZOOKEEPER-SERVER-1, Server 1, Zookeeper ID myid 1
  2. Role ZOOKEEPER-SERVER-2, Server 2, Zookeeper ID myid 2
  3. Role ZOOKEEPER-SERVER-3, Server 3, Zookeeper ID myid 3
And so on.
Once we have the service initialized we are ready to start the service.

Start Zookeeper service

We do this using the zk_service.start() method. This method returns an ApiCommand, so we can track the progress and wait for the service to start using cmd.wait().success.
More details about the API are available here.
Our service should then be up and running; a minimal sketch of these two calls follows.
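The sketch below is illustrative (the function name is assumed); init_zookeeper() and start() are the cm_api calls referred to above.
def zk_init_and_start(zk_service):
    # init_zookeeper() issues the ZooKeeper init command, which writes the
    # myid file for each SERVER role; depending on the cm_api version it may
    # return a single ApiCommand or a list of them, so handle both.
    init_cmds = zk_service.init_zookeeper()
    if not isinstance(init_cmds, list):
        init_cmds = [init_cmds]
    for cmd in init_cmds:
        cmd.wait(60)

    # Start the service and block until the start command finishes.
    if not zk_service.start().wait(300).success:
        print("WARNING: ZooKeeper service failed to start")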

Yaml File

Code File.

Executing Code
