Tuesday, August 21, 2018

Exporting data into a file from an AWS DynamoDB table

I have created a utility to export data from a DynamoDB table into a CSV file without using AWS Data Pipeline. The utility queries DynamoDB for all the records and writes them to a CSV file. Please note that this is counted against the read capacity (number of reads) of your DynamoDB table. If you are interested in the code, it's on my GitHub repo. The utility uses DynamoDB's Scan operation, which you can read about in the AWS documentation.
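Since a Scan returns items in pages of at most 1 MB, a utility like this has to follow LastEvaluatedKey until it is absent and turn each page of items into CSV rows. The row-building part can be sketched as below; the fixed header list, the flattening of items into string maps, and the RFC 4180-style quoting are my assumptions for illustration, not necessarily how the utility on GitHub does it:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CsvExport {

 // Quote a single CSV field in RFC 4180 style: wrap it in quotes when it
 // contains a comma, quote, or newline, and double any embedded quotes.
 static String escape(final String field) {
  if (field == null) {
   return "";
  }
  if (field.contains(",") || field.contains("\"") || field.contains("\n")) {
   return "\"" + field.replace("\"", "\"\"") + "\"";
  }
  return field;
 }

 // Build one CSV row from a scanned item (flattened to attribute-name ->
 // string value), emitting columns in a fixed header order so every row
 // lines up even when an item is missing some attributes.
 static String toCsvRow(final List<String> header, final Map<String, String> item) {
  return header.stream()
    .map(col -> escape(item.getOrDefault(col, "")))
    .collect(Collectors.joining(","));
 }
}
```

In the scan loop itself you would call `toCsvRow` for every item in each `ScanResult` page and keep re-issuing the scan with the page's LastEvaluatedKey until DynamoDB stops returning one.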

Monday, August 20, 2018

Sending data to Kinesis Firehose on AWS

In a recent project, we had a use case to send API metrics to Kinesis Firehose, which then delivers the data to an S3 bucket. The Java code to send data to a Firehose delivery stream is given below:



@Component
public class ApiLoggingService {

 private static final Logger LOGGER = LoggerFactory.getLogger(ApiLoggingService.class);

 @Autowired
 private AwsCredentialsProperties awsCredentialsProperties;

 @Autowired
 private AwsKinesisProperties firehoseProperties;

 private AmazonKinesisFirehose firehoseClient;

 ExecutorService executorService;

 @PostConstruct
 private void initFirehoseClient() {
  firehoseClient = AmazonKinesisFirehoseClientBuilder.standard()
    .withCredentials(AwsUtils.createCredentialsProvider(awsCredentialsProperties.getKeyId(),
      awsCredentialsProperties.getSecret()))
    .withRegion(firehoseProperties.getFirehoseRegion()).build();
  
  executorService = firehoseProperties.getThreadpoolSize() > 0
    ? Executors.newFixedThreadPool(firehoseProperties.getThreadpoolSize())
    : Executors.newFixedThreadPool(20);
 }

 /**
  * Asynchronous method to log data into AWS Kinesis Firehose delivery
  * stream. This method will not log data if value is null or empty.
  * Properties like the region and the name of the delivery stream are read
  * properties file. This method uses putRecord API method which writes
  * single record into delivery stream. By default, each delivery stream can
  * take in up to 2,000 transactions per second, 5,000 records per second, or
  * 5 MB per second. The PutRecord operation returns a RecordId, which is a
  * unique string assigned to each record. Producer applications can use this
  * ID for purposes such as auditability and investigation.
  * 
  * @param dataSet
  *             data to log
  */
 public void logEvent(final Object dataSet) {
  final String streamName = firehoseProperties.getFirehoseDeliveryStream();
  if (!StringUtils.isNullOrEmpty(streamName) && null != dataSet) {
   // Spawn a thread to log data into Firehose.
   executorService.submit(() -> {
    LOGGER.debug("Going to log data into Firehose delivery stream: {}", streamName);
    try {
     final String data = new ObjectMapper().writeValueAsString(dataSet);
     LOGGER.debug("Following data is being logged: {}", data);
     if (!StringUtils.isNullOrEmpty(data)) {
      final PutRecordRequest putRequest = new PutRecordRequest();
      putRequest.setDeliveryStreamName(streamName);
       putRequest.setRecord(
         new Record().withData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8))));
      final PutRecordResult putResult = firehoseClient.putRecord(putRequest);
       if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Data added to Firehose stream {} with Record Id: {} ", streamName,
         putResult.getRecordId());
      }
     } else {
      LOGGER.info("Nothing is logged into Firehose because data to be logged is null or empty.");
     }

    } catch (final IOException ex) {
     LOGGER.error("Exception occurred when trying to parse the LogData object.", ex);
    } catch (final Exception ex) {
     LOGGER.error("Exception while trying to log data.", ex);
    }
   });
  } else {
   LOGGER.info("Either the stream name or the dataset is null or empty; nothing was logged.");
  }
 }
}


You can invoke this as follows:


public class A {

 // Inject an instance of the service.
 @Autowired
 private ApiLoggingService loggingService;

 public void someMethod(final Object dataSet) {
  // Invoke the logging service.
  loggingService.logEvent(dataSet);
 }
}
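One detail the service above leaves open is shutting down its ExecutorService when the application stops; otherwise queued log tasks can be dropped silently. A minimal sketch of a graceful shutdown, which in the service above would live in a @PreDestroy method (the timeout value is my assumption):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdown {

 // Stop accepting new tasks, wait briefly for in-flight puts to finish,
 // then force-cancel anything still running.
 static void shutdownGracefully(final ExecutorService pool, final long timeoutSeconds) {
  pool.shutdown();
  try {
   if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
    pool.shutdownNow();
   }
  } catch (final InterruptedException ex) {
   pool.shutdownNow();
   Thread.currentThread().interrupt();
  }
 }
}
```

With this in place, records already submitted to the thread pool get a chance to reach Firehose before the Spring context closes.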