Using the Decodable CLI: Tutorial

This guide steps you through an end-to-end example of using Decodable to parse and structure Envoy logs in real time. It should give you enough exposure to the platform to get rolling with a wide array of use cases in short order.

For your convenience, we’ve created some resources in your account for use with this guide. We’ll use the CLI throughout, but you can follow along in the app if you prefer.

Connections

Connections are reusable links to your data infrastructure. They pipe data from an external system into a Decodable stream, or from a stream out to an external system.

Let’s list the connections in your account to see what has been pre-created for you:

```shell
decodable connection list
# Output:
#
# id               name                      connector  type    create time           update time
# <connection id>  datagen_envoy_connection  datagen    source  2021-11-03T16:14:06Z  2021-11-05T02:29:01Z
```

The connection we see here uses the 'datagen' connector, which generates test data. Let’s get some more information about it:

```shell
decodable connection get <connection id>
# Output:
#
# datagen_envoy_connection
#   id             <connection id>
#   description    A source connection that generates envoy-style logs
#   connector      datagen
#   type           source
#   stream id      <stream id>
#   schema
#     0  value     STRING
#   properties
#     data.type    envoy
#   target state   STOPPED
#   actual state   STOPPED
#   create time    2021-11-03T16:14:06Z
#   update time    2021-11-05T04:08:41Z
```

Here we see that the type is 'source', meaning this connection feeds new data from an external system into a stream.
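This guide refers to the connection by a `<connection id>` placeholder. If you are scripting these steps, a small helper can look the id up by name instead. This is a minimal sketch that assumes `decodable connection list` prints a header row followed by one whitespace-separated row per connection, with the id in the first column and the name in the second, as in the sample output above; the function name `connection_id_by_name` is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the id of the connection whose name matches $1.
# Assumes `decodable connection list` output uses whitespace-separated columns
# with the id in column 1 and the name in column 2 (layout taken from this guide).
# Prints nothing if no connection has that name.
connection_id_by_name() {
  local name="$1"
  decodable connection list | awk -v name="$name" '$2 == name { print $1; exit }'
}

# Example (requires the Decodable CLI):
#   conn_id=$(connection_id_by_name datagen_envoy_connection)
#   decodable connection get "$conn_id"
```

The header row never matches because its second column is the literal word `name`, so only data rows can produce an id.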
Let’s try activating it to get some data flowing:

```shell
decodable connection activate <connection id>
# Output:
#
# datagen_envoy_connection
#   id             <connection id>
#   description    A source connection that generates envoy-style logs
#   connector      datagen
#   type           source
#   stream id      <stream id>
#   schema
#     0  value     STRING
#   properties
#     data.type    envoy
#   target state   RUNNING
#   actual state   STARTING
#   create time    2021-11-03T16:14:06Z
#   update time    2021-11-05T04:16:36Z
```

If you wait a moment and run `decodable connection get <connection id>` again, you should see the actual state move from 'STARTING' to 'RUNNING'.

Now let’s get some more information about the stream this connection writes to:

```shell
decodable stream get <stream id>
# Output:
#
# envoy_raw
#   id             <stream id>
#   description    A stream of records in envoy format
#   schema
#     0  value     STRING
#   create time    2021-11-03T16:14:06Z
#   update time    2021-11-03T16:14:06Z
```

Now that we have some data in this stream, we can write a preview pipeline to take a look. We’ll describe previews in more detail later, but a simple SELECT query referencing the stream’s name will do the trick:

```shell
decodable pipeline preview "SELECT * from envoy_raw"
# Output:
#
# Submitting query... done! (took 8.79s)
# Waiting for results...
# {"value":"[2021-11-05T04:48:03Z] \"GET /products/3 HTTP/1.1\" 500 URX 2001 6345 82 32 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64)\" \"5c70092a-ed05-4d0c-9d2b-f7bf361ad17e\" \"localhost\" \"192.168.0.11:443\""}
# {"value":"[2021-11-05T04:48:03Z] \"DELETE /users/1 HTTP/2.0\" 200 NC 4044 3860 41 39 \"-\" \"Mozilla/5.0 (Linux; Android 10) \" \"5ca9fd79-afee-44db-9352-2ee9949dc6df\" \"aws.gateway\" \"10.0.0.1\""}
# {"value":"[2021-11-05T04:48:03Z] \"DELETE /products/2 HTTP/2.0\" 500 UH 3826 8831 14 33 \"-\" \"Mozilla/5.0 (Linux; Android 10) \" \"5f0ae73d-c76b-471f-9458-3efc45128509\" \"aws.gateway\" \"10.0.0.1\""}
# {"value":"[2021-11-05T04:48:03Z] \"POST /users/1 HTTP/1.1\" 500 - 8303 6274 0 15 \"-\" \"curl/7.64.1\" \"b45ce679-1cdd-4de8-965f-b9a4d821b2bd\" \"locations\" \"127.0.0.1:8080\""}
# {"value":"[2021-11-05T04:48:03Z] \"PATCH /products/2 HTTP/1.1\" 422 URX 8246 2097 68 84 \"-\" \"Mozilla/5.0 (Linux; Android 10) \" \"2508af73-93c8-4ab8-bac2-8f6aab2b0292\" \"localhost\" \"10.0.0.2\""}
# {"value":"[2021-11-05T04:48:03Z] \"DELETE /products/3 HTTP/2.0\" 400 UF 3386 659 9 83 \"-\" \"Chrome/90.0.4430.212 Safari/537.36\" \"00eeed10-c956-49bf-a56c-a2511cd033fb\" \"auth.default.svc.cluster.local\" \"192.168.0.12\""}
# {"value":"[2021-11-05T04:48:03Z] \"PATCH /users/1 HTTP/2.0\" 422 UO 4730 6425 62 37 \"-\" \"Mozilla/5.0 (Linux; Android 10) \" \"97964510-ef91-40bc-b3c6-d094c10618e4\" \"aws.gateway\" \"192.168.0.11:443\""}
# {"value":"[2021-11-05T04:48:03Z] \"DELETE /products/3 HTTP/1.1\" 422 UO 5857 2137 17 47 \"-\" \"curl/7.64.1\" \"3b539fe4-7b5c-4593-b2d2-13fe621a794d\" \"envoy.app.mesh\" \"10.0.0.2:443\""}
# {"value":"[2021-11-05T04:48:04Z] \"PUT /products/3 HTTP/2.0\" 201 UO 8430 4054 53 51 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64)\" \"112ea83e-df10-4f13-aae2-f1d651f24221\" \"locations\" \"192.168.0.11:443\""}
# {"value":"[2021-11-05T04:48:04Z] \"DELETE /products/2 HTTP/2.0\" 201 UO 846 3398 98 42 \"-\" \"Mobile Safari/537.36\" \"9bdc6885-628d-4b98-8018-553aa5b12704\" \"locations\" \"10.0.0.1\""}
# Records received: 10
# Time to first record: 26.85s
# Total time: 28.06s
```

Pipelines

Once we can see our raw data flowing, we can try to apply some structure to it with a pipeline. A pipeline is a streaming SQL query that processes data from one or more input streams and writes the results to an output stream. First, we’ll need to make an output stream for the pipeline:

```shell
decodable stream create \
  --name http_events \
  --description "Parsed Envoy proxy logs from production" \
  --watermark "timestamp=\`timestamp\` - INTERVAL '0.001' SECOND" \
  --field timestamp="timestamp(3)" \
  --field method=string \
  --field original_path=string \
  --field protocol=string \
  --field response_code=int \
  --field response_flags=string \
  --field bytes_rcvd=int \
  --field bytes_sent=int \
  --field duration=int \
  --field upstream_svc_time=int \
  --field x_forwarded_for=string \
  --field useragent=string \
  --field request_id=string \
  --field authority=string \
  --field upstream_host=string
# Output:
#
# Created stream http_events (06064c97)
```

The --watermark argument specifies that the timestamp field represents the event time and that we’ll allow 1 millisecond for late-arriving data.

When we create a pipeline, we can supply the SQL inline as we did for our preview earlier, or we can store it in a separate file. Let’s put it in a file called parse_envoy_logs.sql:

```sql
-- parse_envoy_logs.sql
-- Extract Envoy fields from a map as top level fields and insert them into the
-- http_events stream.
INSERT INTO http_events
SELECT
  TO_TIMESTAMP(CAST(envoy['timestamp'] AS STRING), 'yyyy-MM-dd''T''HH:mm:ss''Z''') AS `timestamp`,
  CAST(envoy['method'] AS STRING) AS `method`,
  CAST(envoy['original_path'] AS STRING) AS original_path,
  CAST(envoy['protocol'] AS STRING) AS protocol,
  CAST(envoy['response_code'] AS INT) AS response_code,
  CAST(envoy['response_flags'] AS STRING) AS response_flags,
  CAST(envoy['bytes_rcvd'] AS INT) AS bytes_rcvd,
  CAST(envoy['bytes_sent'] AS INT) AS bytes_sent,
  CAST(envoy['duration'] AS INT) AS duration,
  CAST(envoy['upstream_svc_time'] AS INT) AS upstream_svc_time,
  CAST(envoy['x_forwarded_for'] AS STRING) AS x_forwarded_for,
  CAST(envoy['useragent'] AS STRING) AS useragent,
  CAST(envoy['request_id'] AS STRING) AS request_id,
  CAST(envoy['authority'] AS STRING) AS authority,
  CAST(envoy['upstream_host'] AS STRING) AS upstream_host
FROM (
  -- Match and parse Envoy records in the value field of the envoy_raw stream.
  -- grok() produces a map<field name, value> we call envoy.
  SELECT
    grok(
      `value`,
      '\[%{TIMESTAMP_ISO8601:timestamp}\] "%{DATA:method} %{DATA:original_path} %{DATA:protocol}" %{DATA:response_code} %{DATA:response_flags} %{NUMBER:bytes_rcvd} %{NUMBER:bytes_sent} %{NUMBER:duration} %{DATA:upstream_svc_time} "%{DATA:x_forwarded_for}" "%{DATA:useragent}" "%{DATA:request_id}" "%{DATA:authority}" "%{DATA:upstream_host}"'
    ) AS envoy
  FROM envoy_raw
)
```

The create command normally takes the SQL statement as an argument, but we can replace it with - (a single dash), which causes the command to read the SQL from standard input. We use this to read the SQL from the file we just created. Feel free to use whatever works best for you!

```shell
decodable pipeline create --name parse_envoy_logs \
  --description "Parse and structure Envoy logs for analysis" \
  - < parse_envoy_logs.sql
# Output:
#
# Created pipeline parse_envoy_logs (f2de95de)
```

Using the pipeline id returned, we can get the pipeline definition back from Decodable to make sure it looks right before we activate it.
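If you are scripting these steps, you can capture that id from the create command’s output instead of copying it by hand. This is a minimal sketch that assumes the CLI writes a line of the form `Created pipeline <name> (<id>)` (or `Created stream <name> (<id>)`) to standard output, as in the sample output above; `extract_created_id` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read CLI output on stdin and print the id found in a
# "Created pipeline <name> (<id>)" or "Created stream <name> (<id>)" line.
# Assumes the output format shown in this guide; prints nothing if no line matches.
extract_created_id() {
  sed -n -E 's/^[[:space:]]*Created (pipeline|stream) [^ ]+ \(([^)]+)\).*$/\2/p'
}

# Example (requires the Decodable CLI and parse_envoy_logs.sql from this guide):
#   pipeline_id=$(decodable pipeline create --name parse_envoy_logs \
#     --description "Parse and structure Envoy logs for analysis" \
#     - < parse_envoy_logs.sql | extract_created_id)
#   decodable pipeline get "$pipeline_id"
```

Using `sed -n` with the `p` flag means only lines that match the pattern are printed, so any other output from the command is ignored.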
```shell
decodable pipeline get <pipeline id>
# Output:
#
# parse_envoy_logs
#   id             <pipeline id>
#   version        1
#   is latest      true
#   target state   STOPPED
#   actual state   STOPPED
#   description    Parse and structure Envoy logs for analysis
#   create time    2021-08-06 20:03:41.42 +0000 +0000
#
# -- Extract Envoy fields from a map as top level fields and insert them into the
# -- http_events stream.
# INSERT INTO http_events
# SELECT
#   TO_TIMESTAMP(CAST(envoy['timestamp'] AS STRING), 'yyyy-MM-dd''T''HH:mm:ss''Z''') AS `timestamp`,
#   CAST(envoy['method'] AS STRING) AS `method`,
#   CAST(envoy['original_path'] AS STRING) AS original_path,
#   CAST(envoy['protocol'] AS STRING) AS `protocol`,
#   CAST(envoy['response_code'] AS INT) AS response_code,
#   CAST(envoy['response_flags'] AS STRING) AS response_flags,
#   CAST(envoy['bytes_rcvd'] AS INT) AS bytes_rcvd,
#   CAST(envoy['bytes_sent'] AS INT) AS bytes_sent,
#   CAST(envoy['duration'] AS INT) AS `duration`,
#   CAST(envoy['upstream_svc_time'] AS INT) AS upstream_svc_time,
#   CAST(envoy['x_forwarded_for'] AS STRING) AS x_forwarded_for,
#   CAST(envoy['useragent'] AS STRING) AS useragent,
#   CAST(envoy['request_id'] AS STRING) AS request_id,
#   CAST(envoy['authority'] AS STRING) AS authority,
#   CAST(envoy['upstream_host'] AS STRING) AS upstream_host
# FROM (
#   -- Match and parse Envoy records in the value field of the envoy_raw stream.
#   -- grok() produces a map<field name, value> we call envoy.
#   SELECT
#     grok(
#       `value`,
#       '\[%{TIMESTAMP_ISO8601:timestamp}\] "%{DATA:method} %{DATA:original_path} %{DATA:protocol}" %{DATA:response_code} %{DATA:response_flags} %{NUMBER:bytes_rcvd} %{NUMBER:bytes_sent} %{NUMBER:duration} %{DATA:upstream_svc_time} "%{DATA:x_forwarded_for}" "%{DATA:useragent}" "%{DATA:request_id}" "%{DATA:authority}" "%{DATA:upstream_host}"'
#     ) AS envoy
#   FROM envoy_raw
# )
```

Activate the Pipeline

Like connections, pipelines must be activated in order to start the flow of data.
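Activation is asynchronous for both connections and pipelines: the target state changes right away, while the actual state passes through 'STARTING' before it reaches 'RUNNING'. Rather than re-running `get` by hand, you could poll it with a small shell function. This is a minimal sketch that assumes the `get` output contains a line beginning with `actual state` followed by the state, as in the sample outputs in this guide; the name `wait_for_running`, the default of 30 attempts, and the 5-second interval are hypothetical choices.

```shell
#!/usr/bin/env bash
# Hypothetical helper: poll `decodable <kind> get <id>` until the actual state
# is RUNNING. <kind> is "connection" or "pipeline".
# Usage: wait_for_running <kind> <id> [attempts=30] [interval_seconds=5]
# Assumes the get output has a line like "actual state   RUNNING".
# Returns 0 once RUNNING is seen, 1 if it is not seen within the attempt limit.
wait_for_running() {
  local kind="$1" id="$2"
  local attempts="${3:-30}" interval="${4:-5}"
  local i state
  for ((i = 1; i <= attempts; i++)); do
    state=$(decodable "$kind" get "$id" | awk '$1 == "actual" && $2 == "state" { print $3; exit }')
    echo "attempt $i: actual state is ${state:-unknown}" >&2
    if [ "$state" = "RUNNING" ]; then
      return 0
    fi
    # Only sleep if another attempt will follow.
    if [ "$i" -lt "$attempts" ]; then
      sleep "$interval"
    fi
  done
  return 1
}

# Example (requires the Decodable CLI):
#   decodable pipeline activate <pipeline id> && wait_for_running pipeline <pipeline id>
```

The same function works for the connection activated earlier, for example `wait_for_running connection <connection id>`, because both `get` subcommands report an actual state.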
Let’s activate our pipeline:

```shell
decodable pipeline activate <pipeline id>
# Output:
#
# id             version  target state
# <pipeline id>  1        RUNNING
```

Pipeline activation involves provisioning fault-tolerant infrastructure within the Decodable platform and can take up to 30 seconds before data starts being processed. Feel free to run `decodable pipeline get <pipeline id>` a few times until you see the actual state move to 'RUNNING'.

See your data

```shell
decodable pipeline preview "select * from http_events"
```

Preview lets you see live records from your streams. Submit any SQL using the same grammar you would use for a pipeline, but the results are printed to your terminal instead of being written to a stream. For example, here we select the parsed HTTP events from the http_events stream that our pipeline writes to. Previews are short-lived, lighter weight, and more responsive than full pipelines, which makes them well suited to iterating on pipeline SQL or checking the structure of your input and output streams.

Cleanup

Active connections and pipelines both consume resources while they’re running. If you’re not using them, it’s best to deactivate them. Use the decodable connection deactivate command to deactivate the datagen connection:

```shell
decodable connection deactivate <connection id>
# Output:
#
# datagen_envoy_connection
#   id             <connection id>
#   description    A source connection that generates envoy-style logs
#   connector      datagen
#   type           source
#   stream id      <stream id>
#   schema
#     0  value     STRING
#   properties
#     data.type    envoy
#   target state   STOPPED
#   actual state   STOPPED
#   create time    2021-11-03T16:14:06Z
#   update time    2021-11-05T04:16:36Z
```

Similarly, use decodable pipeline deactivate to deactivate the pipeline:

```shell
decodable pipeline deactivate <pipeline id>
# Output:
#
# id             version  target state
# <pipeline id>  1        STOPPED
```
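If you want to tear everything down in one step, the two deactivate commands above can be wrapped in a small function. This is a minimal sketch that only calls the `deactivate` subcommands shown in this guide; the name `cleanup_tutorial` is hypothetical, and it attempts both deactivations even if the first one fails so that nothing is left running by accident.

```shell
#!/usr/bin/env bash
# Hypothetical helper: deactivate the tutorial's connection and pipeline.
# Usage: cleanup_tutorial <connection id> <pipeline id>
# Returns 2 on a usage error, 1 if either deactivate command fails, 0 otherwise.
cleanup_tutorial() {
  local connection_id="$1" pipeline_id="$2"
  if [ -z "$connection_id" ] || [ -z "$pipeline_id" ]; then
    echo "usage: cleanup_tutorial <connection id> <pipeline id>" >&2
    return 2
  fi
  local status=0
  # Run both commands even if the first fails, and remember any failure.
  decodable connection deactivate "$connection_id" || status=1
  decodable pipeline deactivate "$pipeline_id" || status=1
  return "$status"
}

# Example (requires the Decodable CLI):
#   cleanup_tutorial <connection id> <pipeline id>
```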