Tuesday, August 21, 2018

Exporting data from an AWS DynamoDB table into a file

I have created a utility to export data from a DynamoDB table into a CSV file without using AWS Data Pipeline. The utility uses DynamoDB's Scan operation to read every item in the table and writes the items to a CSV file. Please note that a scan reads the whole table, so it is counted against the read capacity (number of reads) of your DynamoDB table. If you are interested in the code, it's on my GitHub repo. You can read more about the Scan operation in the AWS documentation.
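The utility's own code is in the repo. What follows is only a sketch of the general pattern a scan-based export follows, not that code. A single Scan call returns at most 1 MB of data, together with a `LastEvaluatedKey` when more items remain, so an export has to call Scan again with `ExclusiveStartKey` set to that key until no key comes back. To keep the sketch runnable without the AWS SDK, the `ScanPage` class and the `pager` function are hypothetical stand-ins. With the AWS SDK for Java (v1), the pager would wrap `AmazonDynamoDB.scan(new ScanRequest(tableName).withExclusiveStartKey(...))` and convert each `AttributeValue` to a string, and the resume key would be a `Map<String, AttributeValue>` rather than a `String`.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Sketch of a paginated scan-to-CSV export (ScanPage and the pager are hypothetical stand-ins). */
class ScanToCsv {

    /** One page of scan output: the items plus the key to resume from (null on the last page). */
    static final class ScanPage {
        final List<Map<String, String>> items;
        final String lastEvaluatedKey;

        ScanPage(List<Map<String, String>> items, String lastEvaluatedKey) {
            this.items = items;
            this.lastEvaluatedKey = lastEvaluatedKey;
        }
    }

    /** Calls the pager until no resume key is returned, then renders every item as CSV. */
    static String export(Function<String, ScanPage> pager) {
        List<Map<String, String>> rows = new ArrayList<>();
        String startKey = null; // null means "start from the beginning of the table"
        do {
            ScanPage page = pager.apply(startKey);
            rows.addAll(page.items);
            startKey = page.lastEvaluatedKey;
        } while (startKey != null);

        // Items need not share attributes, so the header is the union of all attribute names.
        Set<String> columns = new LinkedHashSet<>();
        for (Map<String, String> row : rows) {
            columns.addAll(row.keySet());
        }

        StringBuilder csv = new StringBuilder();
        csv.append(joinEscaped(new ArrayList<>(columns))).append('\n');
        for (Map<String, String> row : rows) {
            List<String> values = new ArrayList<>();
            for (String column : columns) {
                values.add(row.getOrDefault(column, "")); // missing attribute -> empty field
            }
            csv.append(joinEscaped(values)).append('\n');
        }
        return csv.toString();
    }

    /** Quotes a field containing a comma, quote or line break, doubling inner quotes (RFC 4180 style). */
    static String escape(String field) {
        if (field.contains(",") || field.contains("\"") || field.contains("\n") || field.contains("\r")) {
            return "\"" + field.replace("\"", "\"\"") + "\"";
        }
        return field;
    }

    private static String joinEscaped(List<String> fields) {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                line.append(',');
            }
            line.append(escape(fields.get(i)));
        }
        return line.toString();
    }
}
```

Collecting every item in memory before writing keeps the header logic simple and is fine for small tables; a large export would rather stream rows to a file using a fixed column list. Either way, each page read this way consumes read capacity.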

Monday, August 20, 2018

Sending data to Kinesis Firehose on AWS

In a recent project, we needed to send API metrics to Kinesis Firehose, which then delivers the data to an S3 bucket. The Java code below (a Spring component that uses the AWS SDK for Java) sends data to a Firehose delivery stream:


       

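```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClientBuilder;
import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
import com.amazonaws.services.kinesisfirehose.model.PutRecordResult;
import com.amazonaws.services.kinesisfirehose.model.Record;
import com.amazonaws.util.StringUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

// AwsCredentialsProperties, AwsKinesisProperties and AwsUtils are this project's own classes.

@Component
public class ApiLoggingService {

    private static final Logger LOGGER = LoggerFactory.getLogger(ApiLoggingService.class);

    // ObjectMapper is thread-safe once configured, so one shared instance serves all threads.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    // Pool size used when the properties file does not set a positive value.
    private static final int DEFAULT_THREADPOOL_SIZE = 20;

    @Autowired
    private AwsCredentialsProperties awsCredentialsProperties;

    @Autowired
    private AwsKinesisProperties firehoseProperties;

    private AmazonKinesisFirehose firehoseClient;

    private ExecutorService executorService;

    @PostConstruct
    private void initFirehoseClient() {
        firehoseClient = AmazonKinesisFirehoseClientBuilder.standard()
                .withCredentials(AwsUtils.createCredentialsProvider(awsCredentialsProperties.getKeyId(),
                        awsCredentialsProperties.getSecret()))
                .withRegion(firehoseProperties.getFirehoseRegion()).build();

        final int poolSize = firehoseProperties.getThreadpoolSize() > 0
                ? firehoseProperties.getThreadpoolSize()
                : DEFAULT_THREADPOOL_SIZE;
        executorService = Executors.newFixedThreadPool(poolSize);
    }

    @PreDestroy
    private void shutdown() {
        // Stop accepting new work and give queued records a chance to be sent.
        executorService.shutdown();
        try {
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        // Release the client's HTTP connection pool.
        firehoseClient.shutdown();
    }

    /**
     * Asynchronously logs data into an AWS Kinesis Firehose delivery stream.
     * Nothing is logged if the stream name or the data set is null or empty.
     * Properties like the region and the name of the delivery stream are read
     * from the properties file. This method uses the PutRecord API, which writes
     * a single record into the delivery stream. By default, each delivery stream
     * can take in up to 2,000 transactions per second, 5,000 records per second,
     * or 5 MB per second (default limits vary by region). The PutRecord operation
     * returns a RecordId, which is a unique string assigned to each record.
     * Producer applications can use this ID for purposes such as auditability and
     * investigation.
     *
     * @param dataSet
     *            data to log; it is serialized to JSON with Jackson
     */
    public void logEvent(final Object dataSet) {
        final String streamName = firehoseProperties.getFirehoseDeliveryStream();
        if (StringUtils.isNullOrEmpty(streamName) || dataSet == null) {
            LOGGER.info("Either stream name or dataset is null/empty.");
            return;
        }
        // Hand the work to the thread pool so the caller is not blocked by the AWS call.
        executorService.submit(() -> {
            LOGGER.debug("Going to log data into Firehose delivery stream: {}", streamName);
            try {
                final String data = OBJECT_MAPPER.writeValueAsString(dataSet);
                LOGGER.debug("Following data is being logged: {}", data);
                if (StringUtils.isNullOrEmpty(data)) {
                    LOGGER.info("Nothing is logged into Firehose because data to be logged is null or empty.");
                    return;
                }
                final PutRecordRequest putRequest = new PutRecordRequest()
                        .withDeliveryStreamName(streamName)
                        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                        .withRecord(new Record().withData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8))));
                final PutRecordResult putResult = firehoseClient.putRecord(putRequest);
                LOGGER.debug("Data added to Firehose stream {} with Record Id: {}", streamName,
                        putResult.getRecordId());
            } catch (final JsonProcessingException ex) {
                LOGGER.error("Exception occurred when trying to serialize the data set to JSON.", ex);
            } catch (final Exception ex) {
                LOGGER.error("Exception while trying to log data.", ex);
            }
        });
    }
}
```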


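You can invoke it from another Spring bean as follows:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class A {

    // Spring injects the ApiLoggingService bean defined above.
    @Autowired
    private ApiLoggingService loggingService;

    public void someMethod(final Object dataSet) {
        // Returns immediately; the record is sent to Firehose on a pool thread.
        loggingService.logEvent(dataSet);
    }
}
```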
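One detail worth handling when the destination is S3: Firehose does not add any delimiter between records when it writes them to the bucket, so JSON records sent as above end up back to back in the same object (for example `{...}{...}`). A common remedy is to terminate each record with a newline, giving newline-delimited JSON that tools reading from S3 can split line by line. The sketch below assumes that format suits your downstream consumers. `FirehosePayloads` is a hypothetical helper; its result would be passed to `new Record().withData(...)` in place of the `ByteBuffer.wrap(...)` call in `logEvent`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that builds a newline-terminated UTF-8 payload for a Firehose Record. */
final class FirehosePayloads {

    private FirehosePayloads() {
    }

    /**
     * Firehose concatenates records as-is when writing to S3, so a trailing newline
     * keeps each JSON document on its own line. A newline is only added if missing.
     */
    static ByteBuffer toNewlineDelimited(String json) {
        String line = json.endsWith("\n") ? json : json + "\n";
        return ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8));
    }
}
```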



Monday, March 26, 2018

Setting up Mesos/Marathon on OS X

It's very helpful to have Mesos/Marathon set up on your local computer to run containers. You can also use this setup to reproduce issues that you are facing in real environments. In this post, I describe how to set up Mesos/Marathon on your Mac. First, install the following components:

Java 1.8: Please update to Java 8 or higher.

ZooKeeper: a centralized service for maintaining configuration information; Marathon uses it to store its state. Please read the official documentation here to learn more about ZooKeeper. Run the following command to install ZooKeeper:

brew install zookeeper

Mesos: You can read about Mesos in detail in the official documentation here.
Run the following command to install Mesos:

brew install mesos

Marathon: a container orchestration framework for Mesos. Marathon provides a REST API and a web UI for starting, stopping, and scaling applications. More details about Marathon are in the official documentation here.

Download the Marathon tarball from here and extract it into a local directory.

Once everything above is installed, start the components in the following order:
1. Start ZooKeeper
In a terminal, run the following command:
$ zkServer start
You will see the following output in the console:

ZooKeeper JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg

Starting zookeeper ... STARTED
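To check ZooKeeper from code instead of reading the console output, you can send it the `ruok` four-letter command on its client port (2181 by default, the same port Marathon's `--zk` URL uses later); a healthy server replies `imok`. The sketch below assumes ZooKeeper is listening on `localhost:2181` and that `ruok` is allowed: ZooKeeper 3.5 and later only answer four-letter commands listed in the `4lw.commands.whitelist` setting. The class name `ZkHealthCheck` is hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Sends ZooKeeper's "ruok" four-letter command and checks for the "imok" reply. */
final class ZkHealthCheck {

    private ZkHealthCheck() {
    }

    static boolean isOk(String host, int port) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 2000); // 2 s connect timeout
            socket.setSoTimeout(2000);                               // 2 s read timeout
            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes(StandardCharsets.US_ASCII));
            out.flush();
            // The server writes a short reply and then closes the connection.
            InputStream in = socket.getInputStream();
            byte[] buffer = new byte[16];
            int total = 0;
            int read;
            while (total < buffer.length && (read = in.read(buffer, total, buffer.length - total)) != -1) {
                total += read;
            }
            return "imok".equals(new String(buffer, 0, total, StandardCharsets.US_ASCII).trim());
        } catch (IOException ex) {
            // Connection refused, timeout or reset: treat ZooKeeper as not healthy.
            return false;
        }
    }
}
```

Once `zkServer start` reports STARTED, `ZkHealthCheck.isOk("localhost", 2181)` is expected to return `true`.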

2. Start Mesos


Master:
$ sudo /usr/local/sbin/mesos-master --registry=in_memory --ip=127.0.0.1 --external_log_file=/tmp/mesos/logs/master.log --log_dir=/tmp/mesos/logs
You will see the following output in the console:

WARNING: Logging before InitGoogleLogging() is written to STDERR
I0326 19:03:45.263824 2310370112 main.cpp:232] Build: 2017-07-16 19:12:27 by brew
I0326 19:03:45.264577 2310370112 main.cpp:233] Version: 1.3.0
I0326 19:03:45.284205 2310370112 logging.cpp:194] INFO level logging started!
I0326 19:03:45.289458 2310370112 main.cpp:339] Using 'HierarchicalDRF' allocator
I0326 19:03:45.299046 2310370112 master.cpp:436] Master 944136a2-d93a-49f8-a51a-bf090e4e115e (localhost) started on 127.0.0.1:5050
I0326 19:03:45.299108 2310370112 master.cpp:438] Flags at startup: --agent_ping_timeout="15secs" --agent_reregister_timeout="10mins" --allocation_interval="1secs" --allocator="HierarchicalDRF" --authenticate_agents="false" --authenticate_frameworks="false" --authenticate_http_frameworks="false" --authenticate_http_readonly="false" --authenticate_http_readwrite="false" --authenticators="crammd5" --authorizers="local" --external_log_file="/tmp/mesos/logs/master.log" --framework_sorter="drf" --help="false" --hostname_lookup="true" --http_authenticators="basic" --initialize_driver_logging="true" --ip="127.0.0.1" --log_auto_initialize="true" --log_dir="/tmp/mesos/logs" --logbufsecs="0" --logging_level="INFO" --max_agent_ping_timeouts="5" --max_completed_frameworks="50" --max_completed_tasks_per_framework="1000" --max_unreachable_tasks_per_framework="1000" --port="5050" --quiet="false" --recovery_agent_removal_limit="100%" --registry="in_memory" --registry_fetch_timeout="1mins" --registry_gc_interval="15mins" --registry_max_agent_age="2weeks" --registry_max_agent_count="102400" --registry_store_timeout="20secs" --registry_strict="false" --root_submissions="true" --user_sorter="drf" --version="false" --webui_dir="/usr/local/Cellar/mesos/1.3.0/share/mesos/webui" --zk_session_timeout="10secs"
W0326 19:03:45.299901 2310370112 master.cpp:441]
**************************************************
Master bound to loopback interface! Cannot communicate with remote schedulers or agents. You might want to set '--ip' flag to a routable IP address.
**************************************************
I0326 19:03:45.301147 2310370112 master.cpp:490] Master allowing unauthenticated frameworks to register
I0326 19:03:45.301209 2310370112 master.cpp:504] Master allowing unauthenticated agents to register
I0326 19:03:45.301228 2310370112 master.cpp:518] Master allowing HTTP frameworks to register without authentication
I0326 19:03:45.301699 2310370112 master.cpp:560] Using default 'crammd5' authenticator
W0326 19:03:45.301784 2310370112 authenticator.cpp:512] No credentials provided, authentication requests will be refused
I0326 19:03:45.301827 2310370112 authenticator.cpp:519] Initializing server SASL
I0326 19:03:45.377665 2310370112 master.cpp:2161] Elected as the leading master!
I0326 19:03:45.377724 2310370112 master.cpp:1700] Recovering from registrar
E0326 19:03:45.379003 2310370112 master.cpp:2097] Failed to attach file '/tmp/mesos/logs/master.log': Failed to get realpath of '/tmp/mesos/logs/master.log': No such file or directory
I0326 19:03:45.387356 26779648 registrar.cpp:389] Successfully fetched the registry (0B) in 8.036864ms
I0326 19:03:45.389446 26779648 registrar.cpp:493] Applied 1 operations in 440002ns; attempting to update the registry
I0326 19:03:45.391685 26779648 registrar.cpp:550] Successfully updated the registry in 2.155008ms
I0326 19:03:45.391811 26779648 registrar.cpp:422] Successfully recovered registrar
I0326 19:03:45.392280 25169920 master.cpp:1799] Recovered 0 agents from the registry (118B); allowing 10mins for agents to re-register

Slave (agent):

$ sudo /usr/local/sbin/mesos-slave --master=127.0.0.1:5050 --work_dir=/tmp/mesos/slave --containerizers=docker --image_providers=DOCKER --isolation=filesystem/linux,docker/runtime --external_log_file=/tmp/mesos/logs/slave.log --docker=/usr/local/bin/docker --no-hostname_lookup --log_dir=/tmp/mesos/logs
You will see the following output in the console:

WARNING: Logging before InitGoogleLogging() is written to STDERR
I0326 19:05:05.834830 2310370112 main.cpp:322] Build: 2017-07-16 19:12:27 by brew
I0326 19:05:05.835503 2310370112 main.cpp:323] Version: 1.3.0
I0326 19:05:05.839320 2310370112 logging.cpp:194] INFO level logging started!
I0326 19:05:06.060097 248102912 slave.cpp:225] Mesos agent started on (1)@10.156.62.125:5051
I0326 19:05:06.060209 248102912 slave.cpp:226] Flags at startup: --appc_simple_discovery_uri_prefix="http://" --appc_store_dir="/tmp/mesos/store/appc" --authenticate_http_readonly="false" --authenticate_http_readwrite="false" --authenticatee="crammd5" --authentication_backoff_factor="1secs" --authorizer="local" --container_disk_watch_interval="15secs" --containerizers="docker" --default_role="*" --disk_watch_interval="1mins" --docker="/usr/local/bin/docker" --docker_kill_orphans="true" --docker_registry="https://registry-1.docker.io" --docker_remove_delay="6hrs" --docker_socket="/var/run/docker.sock" --docker_stop_timeout="0ns" --docker_store_dir="/tmp/mesos/store/docker" --docker_volume_checkpoint_dir="/var/run/mesos/isolators/docker/volume" --enforce_container_disk_quota="false" --executor_registration_timeout="1mins" --executor_shutdown_grace_period="5secs" --external_log_file="/tmp/mesos/logs/slave.log" --fetcher_cache_dir="/tmp/mesos/fetch" --fetcher_cache_size="2GB" --frameworks_home="" --gc_delay="1weeks" --gc_disk_headroom="0.1" --hadoop_home="" --help="false" --hostname_lookup="false" --http_command_executor="false" --http_heartbeat_interval="30secs" --image_providers="DOCKER" --initialize_driver_logging="true" --isolation="filesystem/linux,docker/runtime" --launcher="posix" --launcher_dir="/usr/local/Cellar/mesos/1.3.0/libexec/mesos" --log_dir="/tmp/mesos/logs" --logbufsecs="0" --logging_level="INFO" --master="127.0.0.1:5050" --max_completed_executors_per_framework="150" --oversubscribed_resources_interval="15secs" --port="5051" --qos_correction_interval_min="0ns" --quiet="false" --recover="reconnect" --recovery_timeout="15mins" --registration_backoff_factor="1secs" --runtime_dir="/var/run/mesos" --sandbox_directory="/mnt/mesos/sandbox" --strict="true" --switch_user="true" --version="false" --work_dir="/tmp/mesos/slave"
I0326 19:05:06.063592 248102912 slave.cpp:525] Agent resources: cpus(*):8; mem(*):15360; disk(*):471682; ports(*):[31000-32000]
I0326 19:05:06.063694 248102912 slave.cpp:533] Agent attributes: [  ]
I0326 19:05:06.063719 248102912 slave.cpp:538] Agent hostname: 10.156.62.125
I0326 19:05:06.064898 248639488 status_update_manager.cpp:177] Pausing sending status updates
I0326 19:05:06.078497 247566336 state.cpp:62] Recovering state from '/tmp/mesos/slave/meta'
E0326 19:05:06.079037 248102912 slave.cpp:873] Failed to attach file '/tmp/mesos/logs/slave.log': Failed to get realpath of '/tmp/mesos/logs/slave.log': No such file or directory
I0326 19:05:06.079982 245956608 status_update_manager.cpp:203] Recovering status update manager
I0326 19:05:06.080816 246493184 docker.cpp:912] Recovering Docker containers
I0326 19:05:06.155416 248102912 slave.cpp:5970] Finished recovery
I0326 19:05:06.158782 249176064 status_update_manager.cpp:177] Pausing sending status updates
I0326 19:05:06.159219 248102912 slave.cpp:918] New master detected at master@127.0.0.1:5050
I0326 19:05:06.160120 248102912 slave.cpp:942] No credentials provided. Attempting to register without authentication
I0326 19:05:06.160567 248102912 slave.cpp:953] Detecting new master

You can verify that Mesos is up by opening http://localhost:5050 in your browser.
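The same check can be scripted. The sketch below issues a plain HTTP GET with `HttpURLConnection` and treats a 200 response as healthy. It assumes the master's `/master/health` endpoint on port 5050; the class name `HttpHealthCheck` is hypothetical. Marathon's `/ping` endpoint on port 8080 can be probed the same way once Marathon is running.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;

/** Minimal HTTP health probe for the local Mesos master (or Marathon). */
final class HttpHealthCheck {

    private HttpHealthCheck() {
    }

    /** Returns true if a GET on the URL answers with HTTP 200 within the timeouts. */
    static boolean isUp(String url) {
        HttpURLConnection connection = null;
        try {
            connection = (HttpURLConnection) URI.create(url).toURL().openConnection();
            connection.setRequestMethod("GET");
            connection.setConnectTimeout(2000); // milliseconds
            connection.setReadTimeout(2000);
            return connection.getResponseCode() == HttpURLConnection.HTTP_OK;
        } catch (IOException ex) {
            // Connection refused or timed out: the service is not up (yet).
            return false;
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }
}
```

For example, `HttpHealthCheck.isUp("http://localhost:5050/master/health")` for the master, or `HttpHealthCheck.isUp("http://localhost:8080/ping")` for Marathon.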

3. Start Marathon

Go to the Marathon directory and start Marathon, pointing it at the local Mesos master and ZooKeeper:
$ cd ~/marathon-1.5.0/
$ MESOS_NATIVE_JAVA_LIBRARY=/usr/local/lib/libmesos.dylib ./bin/marathon --master localhost:5050 --zk zk://localhost:2181/marathon --hostname localhost --webui_url localhost:8080 --logging_level debug

Go to http://localhost:8080 to see the Marathon UI.
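Besides the UI, Marathon's REST API can start and scale applications. The sketch below posts an app definition to `/v2/apps` (Marathon answers `201 Created` when it accepts a new app) and scales it with a `PUT` to `/v2/apps/{appId}` that carries only the new `instances` count. Because the agent above was started with `--containerizers=docker`, the app definition asks for a Docker container. The class `MarathonClient` and the app id, image, command and resource numbers are illustrative assumptions, and the JSON is built by hand only to keep the example free of third-party libraries.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

/** Sketch of driving Marathon's REST API: POST /v2/apps to deploy, PUT /v2/apps/{id} to scale. */
final class MarathonClient {

    private final String baseUrl; // e.g. "http://localhost:8080"

    MarathonClient(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    /** Builds a minimal app definition that runs `cmd` inside a Docker container. */
    static String dockerAppJson(String appId, String image, String cmd, double cpus, int memMb, int instances) {
        return "{\"id\":" + quote(appId)
                + ",\"cmd\":" + quote(cmd)
                + ",\"cpus\":" + cpus
                + ",\"mem\":" + memMb
                + ",\"instances\":" + instances
                + ",\"container\":{\"type\":\"DOCKER\",\"docker\":{\"image\":" + quote(image) + "}}}";
    }

    /** POST /v2/apps creates the app; returns the HTTP status code. */
    int deploy(String appJson) throws IOException {
        return send("POST", "/v2/apps", appJson);
    }

    /** PUT /v2/apps/{appId} with only "instances" changes the number of running tasks. */
    int scale(String appId, int instances) throws IOException {
        return send("PUT", "/v2/apps/" + appId.replaceFirst("^/", ""), "{\"instances\":" + instances + "}");
    }

    private int send(String method, String path, String json) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) URI.create(baseUrl + path).toURL().openConnection();
        try {
            connection.setRequestMethod(method);
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setConnectTimeout(2000);  // milliseconds
            connection.setReadTimeout(10000);
            connection.setDoOutput(true);
            try (OutputStream out = connection.getOutputStream()) {
                out.write(json.getBytes(StandardCharsets.UTF_8));
            }
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }

    /** Wraps a string in JSON quotes, escaping backslashes and double quotes. */
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}
```

For example, `new MarathonClient("http://localhost:8080").deploy(MarathonClient.dockerAppJson("/hello", "busybox", "while true; do echo hello; sleep 5; done", 0.1, 32, 1))` would submit a small BusyBox app, and `scale("/hello", 3)` would ask for three instances; the status codes are returned so you can check them. In a real client, a JSON library such as Jackson (already used in the Firehose example) is the safer way to build the request body.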