Skip to content

Commit

Permalink
Merge pull request #3 from ShubhanshuJha/release
Browse files Browse the repository at this point in the history
Added Kinesis Reading Feature
  • Loading branch information
ShubhanshuJha authored Apr 20, 2024
2 parents b817023 + 26aec48 commit 5b1be8a
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 0 deletions.
38 changes: 38 additions & 0 deletions kinesis/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Kinesis Stream Reader

This Python script reads data from an Amazon Kinesis stream and provides options for displaying, filtering, and saving the data to a file.
___
## Prerequisites
* **Python 3.x**
* **AWS credentials configured**
___
## Usage
#### Run the script with the desired arguments:
`python kds_reader.py --stream_name YOUR_KDS_NAME --shard_id KDS_SHARD_ID --region KDS_REGION --display true --max_records 100 --filter "your_filter_value" --save_data true --output_file kds_data_points.json`

#### Example 1: Read from Kinesis Stream
`python kinesis_reader.py --stream_name my_kinesis_stream --shard_id shard-00001 --region us-west-2`

#### Example 2: Read and Display Data
`python kinesis_reader.py --stream_name my_kinesis_stream --shard_id shard-00001 --region us-west-2 --display true`

#### Example 3: Read, Filter, and Save Data
`python kinesis_reader.py --stream_name my_kinesis_stream --shard_id shard-00001 --region us-west-2 --filter keyword --save_data true --output_file filtered_data.json`
___
## Command Line Arguments
- `--stream_name`: Name of the Kinesis stream (default: `kds_name`)
- `--shard_id`: ID of the shard to read from (default: `12345678`)
- `--region`: AWS region where the Kinesis stream is located (default: `us-south-1`)
- `--display`: Display data (true/false) (default: `false`)
- `--max_records`: Maximum number of records to collect (optional)
- `--filter`: Property/word for filtering (optional)
- `--save_data`: Save the result to a file (true/false) (optional)
- `--output_file`: Output file name (default: `kds_data_points.json`)

> Note: To save data, either `--output_file` or `--save_data` must be passed.
___
## Author
#### Shubhanshu Jha
- GitHub: [ShubhanshuJha](https://github.com/ShubhanshuJha)
- LinkedIn: [Shubhanshu Jha](https://www.linkedin.com/in/shubhanshu-jha/)
___
74 changes: 74 additions & 0 deletions kinesis/kds_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import boto3
import json
import argparse


def read_kinesis_stream(kds_stream_name, stream_shard_id, kds_region, max_records, display_data, filter_value, get_records):
kinesis_client = boto3.client('kinesis', region_name=kds_region)
shard_iterator = kinesis_client.get_shard_iterator(
StreamName=kds_stream_name,
ShardId=stream_shard_id,
ShardIteratorType='TRIM_HORIZON' # Read from the beginning of the shard
)['ShardIterator']
display_data = display_data.lower() == 'true'
data_collection = []
if not get_records:
record_counter = 0

while True:
records_response = kinesis_client.get_records(
ShardIterator=shard_iterator,
Limit=100 # Adjust batch size as needed
)

records = records_response.get('Records', [])
for record in records:
data = record['Data'].decode('utf-8')
if filter_value and filter_value not in data:
continue
if get_records:
data_collection.append(data)
else:
record_counter += 1
if display_data:
print(data)
if max_records and ((get_records and len(data_collection) >= max_records) or (not get_records and record_counter == max_records)):
return data_collection
shard_iterator = records_response.get('NextShardIterator')
if not shard_iterator:
break
return data_collection


def write_to_file(file_name, stream_data):
if not stream_data:
print('No Data To Be Saved.')
return
try:
stream_data_json = [json.loads(item) for item in stream_data]
with open(file_name, 'w') as f:
json.dump(stream_data_json, f)
print('*' * 100)
print('Data Saved Successfully.')
print('*' * 100)
except Exception as e:
print('Data Saving Failed:', str(e))


if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Read data from a Kinesis stream')
parser.add_argument('--stream_name', type=str, default='kds_name', help='Name of the Kinesis stream') # Change kds name
parser.add_argument('--shard_id', type=str, default='12345678', help='ID of the shard to read from') # Change shard id
parser.add_argument('--region', type=str, default='us-south-1', help='AWS region where the Kinesis stream is located') # Adjust region as needed
parser.add_argument('--display', type=str, default='false', help='Display data: true or false [By default false]')
parser.add_argument('--max_records', type=int, help='Maximum number of records to collect (optional)')
parser.add_argument('--filter', type=str, help='Enter the property/word for filtering (optional)')
parser.add_argument('--save_data', type=str, help='Want to save the result: true/false (optional)')
parser.add_argument('--output_file', type=str, default='kds_data_points.json', help='Enter the output file name (optional)')
args = parser.parse_args()

write_output = args.save_data.lower() == 'true' if args.save_data else (True if args.output_file else False)
data = read_kinesis_stream(kds_stream_name=args.stream_name, stream_shard_id=args.shard_id, kds_region=args.region,
max_records=args.max_records, display_data=args.display, filter_value=args.filter, get_records=write_output)
if write_output:
write_to_file(file_name=args.output_file, stream_data=data)

0 comments on commit 5b1be8a

Please sign in to comment.