国内有没有claude这种ai的代理网站集合? |
当下分布式系统的 日志收集、日志分析、日志处理、可视化 的热门技术栈方案当然非 ELK(ElasticSearch、Logstash、Kibana)莫属,从 L → E → K 构成了一条数据的 Pipeline管道:
而且在我的前文《利用 ELK搭建 Docker容器化应用日志中心》之中,曾利用 ELK 搭建了一条数据管道,用作 Docker容器化应用的日志中心。
注: 本文首发于 My 公众号 CodeSheep ,可 长按 或 扫描 下面的 小心心 来订阅 ↓ ↓ ↓
作为与数据源 “直接对接” 的 Logstash,位置处于 ELK 数据管道的 最前端,其主要作用是 收集、过滤分析、输出 各种结构化或者非结构化的原始数据(典型的如日志数据),原始数据从 “无序变有序” 的重担就落在了Logstash的肩上了,因此其作用举足轻重。
说到Logstash,不得不说其中的 插件机制,其几乎所有的功能都是靠插件来实现的,因此灵活易用:
Logstash 插件是使用 Ruby开发的,Logstash 从很早的1.5.0+版开始,其插件模块和核心模块便分开维护,其插件使用的是 RubyGems包管理器来管理维护。所以 Logstash插件本质上就是自包含的RubyGems。
RubyGems(简称 gems)是一个用于对 Ruby组件进行打包的 Ruby 打包系统。 它提供一个分发 Ruby 程序和库的标准格式,还提供一个管理程序包安装的工具。
可以在网址 rubygems.org
上搜索所有Logstash插件:
关于插件的常用操作如下:
可以在线安装:
bin/plugin install [插件名称]
当然也可以将插件提前下载到本地,然后本地安装:
bin/plugin install path/logstash-xxx-x.x.x.gem
bin/plugin uninstall [插件名称]
bin/plugin update [插件名称]
其会将插件更新到最新的版本
Logstash 插件的定义其实使用的就是一套其自定义的 DSL语法,我还是习惯用图来说明吧:
从图中可以看出主要包含以下几大部分内容:
该部分一般会用require语法引入如下依赖:
require "logstash/XXX/base"
require "logstash/namespace"
需要用 class
语法给每一个插件定义一个类,后面我会用实际代码说明
通过 config_name
语法来给插件取一个名字,这个名字将会用到 Logstash.conf
配置文件的插件配置之中
可以使用 config
语法来按需定义任意个配置项。可以设置配置选项的名字、数据类型、默认值以及是否为必选项:
举例:
config :percentage, :validate => :number, :default =>100
:percentage
:定义配置项的名字:validate
:配置指定参数的数据类型,如此处为 number类型:default
:指定配置项的默认值:required
:用于指定配置项是否必选每一种类型的插件都需要实现一些方法,如下表所示:
插件类型 | 插件方法 |
---|---|
输入插件 | register、 run |
过滤器插件 | register、 filter |
输出插件 | register、 receive |
编解码插件 | register、 encode、 decode |
Logstash 插件所具备的业务处理功能就来源于上述插件方法业务逻辑实现!
好了,理论部分总结到这,下面结合一份Logstash插件定义的源码来例析一下!
我们以 Logstash 插件的官网给出的一个 Logstash 过滤器插件 logstash-filter-example 的源码为例来进行分析,麻雀虽小,五脏俱全!代码解析已经标注于图中,不再赘述。
当然此处的实例给出的是一个入门实例,毕竟不可能在一篇篇幅有限的文章里给出一个太过复杂的 Logstash的插件源码。对照该源码和上一节的内容,我想应该不难理解Logstash的插件源码结构了吧。
计划后续展示一个 根据具体数据需求 来自定义开发一个满足特定需求的 Logstash插件的实例。
作者更多的SpringBt实践文章在此:
如果有兴趣,也可以抽点时间看看作者一些关于容器化、微服务化方面的文章:
可 长按 或 扫描 下面的 小心心 来订阅 CodeSheep,获取更多 务实、能看懂、可复现的 原创文 ↓↓↓
kafka -> logstash -> es
一个请求响应 两条数据进了 kafka logstash这边就两条 进了 es 根据 uuid做数据更新 楼主有了解过么 , 多层json 目前内层json还是字符串 才开始玩 能指点迷津不咯 谢啦.
ConsumerRecord(topic = apiTest, partition = 2, offset = 135676, CreateTime = 1532496191672, checksum = 775077821, serialized key size = -1, serialized value size = 629, key = null, value = {"logType":"apilog","logId":"c627625ee7054191bb3febf637c13440","hostIp":"192.168.5.34","hostName":"Dell-PC","dealIp":"0:0:0:0:0:0:0:1","externalIp":null,"domainName":null,"userloginId":null,"createTime":"2018-07-25 13:23:01","url":"http://localhost:9191/tracking/v1/importbill/subStage","datatKey":null,"dataId":null,"method":"qurey","status":"begin","request":"{ \"voyage\": \"1811\", \"vslName\": \"ARCTIC APIRIT\", \"blNo\": \"0418WHG04ASP35\", \"subStage\": \"CUS_DCLR_IM_RLS\"}","response":null,"spendTime":null,"httpMethod":"POST","classMethod":"com.easipass.api.controller.TrackingV1Controller.getImportbillInfo"})
ConsumerRecord(topic = apiTest, partition = 1, offset = 135679, CreateTime = 1532496204494, checksum = 2573605207, serialized key size = -1, serialized value size = 637, key = null, value = {"logType":"apilog","logId":"c627625ee7054191bb3febf637c13440","hostIp":"192.168.5.34","hostName":"Dell-PC","dealIp":null,"externalIp":null,"domainName":null,"userloginId":null,"createTime":null,"url":null,"datatKey":null,"dataId":null,"method":"qurey","status":"end","request":null,"response":"{\"statusCode\":200,\"msg\":\"\",\"dataList\":[{\"stage\":\"CUS_DCLR_IM\",\"cusLetpasStatusCode\":\"1\",\"cusLetpasStatus\":\"\u5DF2\u653E\u884C\",\"vslName\":\"ARCTIC APIRIT\",\"subStage\":\"CUS_DCLR_IM_RLS\",\"cusLetpasTime\":\"2018-06-04T09:14:00+08:00\",\"voyage\":\"1811\"}]}","spendTime":"19883","httpMethod":null,"classMethod":null})
input {
kafka {
bootstrap_servers => "192.168.129.122:9092,192.168.129.123:9092"
group_id => "elasticconsumer"
topics => "apiTest"
codec => json{
charset =>"UTF-8"
}
}
}
filter{
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => ["192.168.129.171:9200"]
index => "daaslog-%{+YYYY-MM}"
}
}
@zhangergou 保证es的_id是你的uuid即可,es根据_id判断是否是存在,不存在就插入,存在就更新。
过早客微信公众号:guozaoke • 过早客新浪微博:@过早客 • 广告投放合作微信:fullygroup50 鄂ICP备2021016276号-2 • 鄂公网安备42018502001446号