Formatting DSP data for Parquet files in Amazon S3
When you send data from the Splunk Data Stream Processor (DSP) to an Amazon S3 bucket, you can choose to store the data in Parquet format. However, when the records in the pipeline are converted to Parquet format, any data that is treated as a union of multiple data types instead of being explicitly cast to a specific data type becomes nested one level deeper under container fields. These container fields use generic names such as member0, member1, and so on. As a result, union-typed data becomes difficult to retrieve from the Parquet files.
To prevent important data from becoming obscured this way, extract relevant data from union-typed fields into top-level fields that are cast to a data type other than union. For more information about union-typed data, see Data types in the User Manual. For information about the functions that you can use to extract and cast your data, see the Evaluation scalar functions chapter in the Function Reference manual.
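The difference is easiest to see in the resulting column structure. The following sketch uses pyarrow, which is not part of DSP and is shown here only for illustration; the member names and types are assumptions, since the actual members depend on the data types in your union, and the top-level field names mirror Example 1 below.

import pyarrow as pa

# Hypothetical illustration: a union-typed field written to Parquet surfaces
# as a container of generically named members rather than a single typed column.
schema_with_union = pa.schema([
    ("body", pa.struct([
        ("member0", pa.string()),
        ("member1", pa.int64()),
    ])),
])

# After extracting and casting, the relevant values are ordinary top-level
# columns that can be queried by name.
schema_with_top_level_fields = pa.schema([
    ("Region", pa.string()),
    ("UserID", pa.string()),
])

print(schema_with_union)
print(schema_with_top_level_fields)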
Guidelines for formatting union-typed data for Parquet output
The exact data transformations that you need to include in your pipeline vary depending on the specific data that you are working with. The following are some general guidelines for how to check and format your data:
- If your pipeline is streaming records that use the standard DSP event or metrics schemas, check the body and attributes fields for data that needs to be extracted. The body field is a union-typed field, and the attributes field is a map of string type to union type (map<string, union<...>>).
  - To format data from the body field, start by casting the field to a basic data type such as string. Casting the field to a data type other than union allows it to be used as input in streaming functions. Then, extract the relevant values from the body field into top-level fields. If necessary, cast the resulting top-level fields to other appropriate data types. See Example 1: Formatting body data for Parquet.
  - To format data from the attributes field, use dot notation to select relevant values from the map and promote them into top-level fields. If necessary, cast the resulting top-level fields to other appropriate data types. See Example 2: Formatting attributes data for Parquet.
- You can confirm the data types of your fields by selecting the sink function in your pipeline, then selecting the View Configurations tab, and then checking the list of fields under Input Fields. The names of union-typed fields have multiple data type symbols displayed beside them.
Union-typed data can be contained by hierarchical types such as map or list. In such cases, the symbol for the hierarchical type is shown instead. For example, a map field is represented by the map symbol ({}) even if it's a map of string type to union type.
Example 1: Formatting body data for Parquet
Assume that records with the following body content are streaming through your pipeline:
Record 1:
body: "region:us-west-1,userid:user1"
Record 2:
body: "region:us-west-2,userid:user2"
The body field is a union-typed field. If you send these records as is to Amazon S3 using Parquet format, the data from body gets nested under a container field with a generic name like member0. To retrieve that data afterwards, you would need to know that the body data got nested under member0, and then run queries on body.member0.
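To make the inconvenience concrete, here is a minimal pyarrow sketch of reading such an untransformed output file. The file name is hypothetical, and the assumption that the string value lands under member0 follows the description above; in practice you would have to inspect the file to find out which member holds the data.

import pyarrow.parquet as pq

# Hypothetical file name; in practice the object is stored in your S3 bucket.
table = pq.read_table("untransformed-records.parquet")
rows = table.to_pylist()

# The original value is buried under a generically named member field,
# so you need to know in advance which member it landed in.
first_body = rows[0]["body"]["member0"]
print(first_body)  # expected: "region:us-west-1,userid:user1"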
You can make the Parquet-formatted data easier to work with by transforming those records in the pipeline before they reach the Send to Amazon S3 sink function:
- Cast the body field from union to string so that the field can be used as input in streaming functions. Then, extract the key-value pairs from the body string into a map field named extracted_body:
  - Add an Eval function to your pipeline.
  - On the View Configurations tab, enter the following expression in the Function field:
    extracted_body=extract_key_value(cast(body, "string"), ":", ",")
The resulting extracted_body fields look like this:
Record 1:
extracted_body: { "region": "us-west-1", "userid": "user1" }
Record 2:
extracted_body: { "region": "us-west-2", "userid": "user2" }
- Promote the region key from the extracted_body field into a top-level string field named Region.
  - Add an Eval function to your pipeline. Make sure to place this Eval function somewhere after the one that you added during step 1.
  - On the View Configurations tab, enter the following expression in the Function field:
    Region=ucast(extracted_body.region, "string", null)
- Promote the userid key from the extracted_body field into a top-level string field named UserID.
  - Add an Eval function to your pipeline. Make sure to place this Eval function somewhere after the one that you added during step 1.
  - On the View Configurations tab, enter the following expression in the Function field:
    UserID=ucast(extracted_body.userid, "string", null)
- Now that you have extracted the relevant data from the body field, drop it from your records. You can also drop the extracted_body field, which was only used earlier in the pipeline to support the extraction and casting of the body data.
  - Add a Fields function to your pipeline. Make sure to place this Fields function somewhere after all the Eval functions that you added during the previous steps.
  - On the View Configurations tab, in the Field List field, enter body.
  - Click Add, and in the newly added field, enter extracted_body.
  - In the Operator field, enter a minus sign ( - ).
The following SPL2 statement shows what a complete pipeline containing these transformations looks like:
| from splunk_firehose() | eval extracted_body=extract_key_value(cast(body, "string"), ":", ",") | eval Region=ucast(extracted_body.region, "string", null) | eval UserID=ucast(extracted_body.userid, "string", null) | fields - body, extracted_body | into into_s3("0d5690ec-7da0-446f-b2b8-ccc18e84a634", "my-bucket", "#{datetime:yyyy-MM}", 0, 0, "Parquet", allow_dropping_events:false);
When the transformed records are converted to Parquet format, Region and UserID remain as top-level fields. You can then retrieve the data by querying Region and UserID directly.
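For example, a minimal pyarrow sketch (with a hypothetical file name) shows that no knowledge of container member names is needed after the transformation:

import pyarrow.parquet as pq

# Hypothetical file name for the transformed output downloaded from S3.
table = pq.read_table("transformed-records.parquet")

# Region and UserID are ordinary top-level columns.
print(table.column("Region").to_pylist())  # ["us-west-1", "us-west-2"]
print(table.column("UserID").to_pylist())  # ["user1", "user2"]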
Example 2: Formatting attributes data for Parquet
Assume that records with the following attributes content are streaming through your pipeline:
Record 1:
attributes: { "purchase_status": "complete", "item_count": "3" }
Record 2:
attributes: { "purchase_status": "pending", "item_count": "5" }
The attributes field is a map of string type to union type (map<string, union<...>>). If you send these records as is to Amazon S3 using Parquet format, the data from the attributes field gets nested under container fields with generic names like member0, member1, and so on. To retrieve that data afterwards, you would need to know which container fields purchase_status and item_count got nested under, and run queries on entities like attributes.member0 and attributes.member1.
You can make the Parquet-formatted data easier to work with by transforming those records in the pipeline before they reach the Send to Amazon S3 sink function:
- Extract the purchase_status key from the attributes field into a top-level string field named PurchaseStatus.
  - Add an Eval function to your pipeline.
  - On the View Configurations tab, enter the following expression in the Function field:
    PurchaseStatus=ucast(attributes.purchase_status, "string", null)
- Extract the item_count key from the attributes field into a top-level integer field named ItemCount.
  - Add an Eval function to your pipeline.
  - On the View Configurations tab, enter the following expression in the Function field:
    ItemCount=ucast(attributes.item_count, "integer", null)
- Now that you have extracted the relevant data from the attributes field, drop it from your records.
  - Add a Fields function to your pipeline. Make sure to place this Fields function somewhere after all the Eval functions that you added during the previous steps.
  - On the View Configurations tab, in the Field List field, enter attributes.
  - In the Operator field, enter a minus sign ( - ).
The following SPL2 statement shows what a complete pipeline containing these transformations looks like:
| from splunk_firehose() | eval PurchaseStatus=ucast(attributes.purchase_status, "string", null) | eval ItemCount=ucast(attributes.item_count, "integer", null) | fields - attributes | into into_s3("0d5690ec-7da0-446f-b2b8-ccc18e84a634", "my-bucket", "#{datetime:yyyy-MM}", 0, 0, "Parquet", allow_dropping_events:false);
When the transformed records are converted to Parquet format, PurchaseStatus and ItemCount remain as top-level fields. You can then retrieve the data by querying PurchaseStatus and ItemCount directly.
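As in Example 1, a short pyarrow sketch (with a hypothetical file name) illustrates the result; note that ItemCount comes back as integers because of the cast applied in the pipeline:

import pyarrow.parquet as pq

# Hypothetical file name for the transformed output downloaded from S3.
table = pq.read_table("transformed-purchases.parquet")

print(table.column("PurchaseStatus").to_pylist())  # ["complete", "pending"]
print(table.column("ItemCount").to_pylist())       # [3, 5]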
See also
- Functions
  - Eval
  - Casting
  - Fields
  - Send data to Amazon S3
  - String manipulation
- Related topics
  - Accessing map elements using dot notation
This documentation applies to the following versions of Splunk® Data Stream Processor: 1.2.0, 1.2.1-patch02, 1.2.1, 1.2.2-patch02, 1.2.4, 1.2.5