A. 电脑怎么用es文件管理器传文件
1、首先需要打开ES文件管理器,界面直观,文件自动分类显示。
B. java导数据到esid重复
,代码主要逻辑如下:
// 读取要导入数据的文件
BufferedReader br = new BufferedReader(new FileReader(
"D:\\test\\test.txt"));
String json = null;
int count = 0;
// 开启批量插入
BulkRequestBuilder bulkRequest = client.prepareBulk();
while ((json = br.readLine()) != null) {
bulkRequest.add(client.prepareIndex("test", "all")
.setSource(json));
// 每一千条提交一次
if (count % 1000 == 0) {
bulkRequest.execute().actionGet();
System.out.println("提交了:" + count);
}
count++;
}
bulkRequest.execute().actionGet();
System.out.println("插入完毕");
br.close();
登录后复制
运行后发现一个问题,我100多万条的数据,导入到es中怎么生成了1000多万条,而且还是在没有完全导入的情况下
然后用小批量数据导入到es,再把这些数据导出来,发现有好多重复的数据
为什么会重复呢,原因是在每一千条提交一次代码这块,第一次一千条提交了,并没有把bulkRequest置空,所以第二次提交的时候,会提交两千条,包括第一次已经提交的一千条,然后我们自己也没有设置_id,所以es会自动给数据生成一个_id,即使是重复的数据,搞清楚了原因,下面来说解决方法,主要有两种:
第一种就是在提交了一千条后,对bulkRequest进行重置,因为bulkRequest并没有重置的方法,所以可以新建一个bulkRequest,类似于重置,具体代码如下:
// 读取要导入数据的文件
BufferedReader br = new BufferedReader(new FileReader(
"D:\\test\\test.txt"));
String json = null;
int count = 0;
// 开启批量插入
BulkRequestBuilder bulkRequest = client.prepareBulk();
while ((json = br.readLine()) != null) {
bulkRequest.add(client.prepareIndex("test", "all")
.setSource(json));
// 每一千条提交一次
if (count % 1000 == 0) {
bulkRequest.execute().actionGet();
//此处新建一个bulkRequest,类似于重置效果
bulkRequest = client.prepareBulk();
System.out.println("提交了:" + count);
}
count++;
}
bulkRequest.execute().actionGet();
System.out.println("插入完毕");
br.close();
登录后复制
第二种就是自己设置_id,确保每一条数据只有一个_id,这样的话,即使数据重复了,因为_id是一样的,所以es会进行更新,这样的话并没有从根源上解决数据重复的问题,只是重复数据会更新,这样的话效率会慢,具体代码如下:
// 读取要导入数据的文件
BufferedReader br = new BufferedReader(new FileReader(
"D:\\test\\test.txt"));
String json = null;
int count = 0;
// 开启批量插入
BulkRequestBuilder bulkRequest = client.prepareBulk();
while ((json = br.readLine()) != null) {
//设置_id为count
bulkRequest.add(client.prepareIndex("test", "all",
String.valueOf(count)).setSource(json));
// 每一千条提交一次
if (count % 1000 == 0) {
bulkRequest.execute().actionGet();
//此处新建一个bulkRequest,类似于重置效果
System.out.println("提交了:" + count);
}
count++;
}
bulkRequest.execute().actionGet();
System.out.println("插入完毕");
br.close();
登录后复制
建议使用第一种方法,效率会快很多。
C. spark->es快速导入数据
elasticsearch-spark 提供了saveToEs api以支持快速导入数据。但es集群线程池有限,在大量写入数据的同时,对cpu的压力非常大,影响线上es的查询服务。如果能参考hbase 的bulkload方法,对es也采用“bulkload”模式,写入性能会有巨大提升。核心思想是通过spark作业生成es的lucene文件,并通过网络传输,写入es的数据文件。
本方案参考滴滴的fastIndex: 滴滴FastIndex
采用spark改写,部分特性适应了公司的原始流程,会有不一样的地方。如您采用的是spark saveToEs需要通过该方法进行改写,可参考。
git地址为: https://github.com/Dengyu123/fast-es-rdd
D. 记事本的数据怎么导入到excel中
1、使用记事本编辑数据之后,我们导入到excel中。
E. 【elasticsearch实战】mysql的数据如何迁移到es中
如果你被上述问题困扰过,可以参考以下方案
这里需要介绍三种字段的type,分别是 object 、 nested 、 join
现在有个问题,下面的数据如何存入到es中呢,它对应的mapping应该是什么样的呢
name、url这些字段好处理,直接设定字段 "type" : "text" 或者 "type" : "keword" 或者
就行了,但是对于address和links,这种里面包含json对象,或者数组的,怎么处理呢。这里可以采用 "type" : "object" 来处理。如下
可能会对links有疑问,它明明是数组,却怎么和address的设置类似。其实es中是没有单独的数组这一类型,因为他所有的字段都支持数组,比如你是text,你可以放多个值进去,以name为例,你可以放 "name":["张三", "李四"] 这样的数据进去。
而且,es默认对这种嵌套结构建立的索引就是object类型, "type": "object" 可以省略 😂
于是可以变为下面这样
甚至,通过添加properties,可以无限嵌套下去。
下面说object类型的缺点了,缺点也是由它本身结构导致的
对于数组结构,是这么存储数据的,以上面的address为例,他会把json结构平铺开,然后把所有这个字段的值放在平铺后的字段上:
这在查询时就出现问题了,本来Google和 http://www.google.com 是绑定的,但是这种结构无法满足这种绑定的关系,也就是如果你想查name是Bai,并且url是 http://www.google.com 的,竟然也能查出来😂,而这和前面所插入的文档内容不符。
所以需要nested结构和join结构出场了
嵌套结构解决了我们查询嵌套文档字段的问题,同样的,也可以解决,在es中实现类似mysql的join查询的问题。
外键就需要设置为nested(虽然现在设计表几乎不用外键约束了,但外键的逻辑还是在的 😂 )
另外,nested字段本身会形成一个文档,只不过是嵌套在大的文档下,所以在统计索引的文档数时,实际上是最外层的文档数加上nested字段形成的文档数
这里需要注意,以nested里面的字段为查询条件,需要修改下查询DSL,在外层加一层nested,每有一层nested嵌套关系,就需要加一层
由于es本身对文档通过nested字段进行了绑定,索引更新数据时,整个文档都会被替换,代价会大一些,但是由于关系绑定好了,查询会快一些。这里的代价大一些,查询快一些自然就是和join类型对比啦。
join 其实有父子文档的概念,父文档通过一个字段关联一个子文档,
这个结构比较复杂的是在你推数据时,需要指定对应的父文档是哪个
mapping结构如下
解释一下
优点就是更新数据时,不用连带着父子文档一起改,缺点是查询效率不如nested结构
以后再说吧😂
F. ES跨集群数据迁移
reindex是Elasticsearch提供的一个api接口,可以把数据从源ES集群导入到当前的ES集群,同样实现了数据的迁移
如果是A集群 --> B集群,就需要在B中的elasticsearch.yml 设置A地址为白名单
在目标集群(B集群 116)的elasticsearch.yml配置文件,设置远程集群的白名单,添加如下配置:
对mapping有要求的,提前创建好索引,再执行数据迁移。
1、get /索引名称 获取索引
2、put /索引名称 + mapping信息。创建索引
这个帖子写的很好,很好
https://blog.csdn.net/qq_21383435/article/details/108953326
G. 使用canal将mysql同步到es中
因为自己项目中需要用到mysql数据同步到es中,查找了相关资料最后决定用canal来做,所以便有了本文,下面一起来看如何使用canal吧
根据 https://github.com/alibaba/canal 上的原理解释,我们知道 canal 会模拟 mysql slave 的交互协议,伪装自己为 mysql slave,然后向 mysql master 发送 mp 协议。
mysql master 收到 mp 请求,开始推送 binary log 给 slave(也就是 canal),然后 canal 解析 binary log 对象(原始为 byte流)。
经 canal 解析过的对象,我们使用起来就非常的方便了。
再根据 https://github.com/alibaba/canal/releases 提供的版本信息,你会发现 canal 其实相当于一个中间件,专门用来解析 MySQL 的 binlog 日志。canal 解析好了之后,会封装成一个数据对象,通过 protobuf3.0 协议进行交互,让 canal 客户端进行消费。
根据上面的解释,以及 canal 提供的版本信息,我们在使用 canal 的时候,首选要安装一个 canal.deployer-1.1.4.tar.gz 进行解析 MySQL 的 binlog 日志。
下载后,复制 canal.deployer-1.1.4.tar.gz 到 MySQL 主机上,比如放在 /usr/local/soft/目录下。然后依次执行下面的命令:
然后修改 canal 的配置文件 vim conf/example/instance.properties
这三项改成你自己的,比如我的配置如下:
然后保存并退出。(VI 模式下,按 Esc 输入 :wq 回车退出。)
接着,我们检查一下 MySQL 的配置。确定版本和是否开启了 binlog 日志,以及日志格式。
canal 支持 binlog 格式为 ROW 的模式。如果你没开启 binlog,并且格式是非 row 的,建议修改一下 mysql 的配置文件。
执行 mysql –help | grep my.cnf 找到 mysql 的 my.cnf 文件。
执行 vim /etc/my.cnf 命令。添加下面 3 个配置。
然后保存并退出。
接着执行 sudo service mysqld restart 重启 MySQL。
需要注意的是你的 mysql 用户,必须要有 REPLICATION SLAVE 权限。该权限授予 slave 服务器以该账户连接 master 后可以执行 replicate 操作的权利。
如果没有权限,则使用 root 账户登录进 MySQL,执行下面的语句,创建用户,分配权限。
MySQL 启动后,就可以开启 canal 服务了。
开启后,观察 canal 服务的日志,确保服务正常。
查看 canal 的日志
确定没有问题后,开始编写我们的测试程序。
pom.xml 中导入下面的依赖。
使用JAVA进行测试
然后执行 main 方法。你再修改修改 MySQL 中的数据,你会发现所有改变都同步过来了。上面是使用的Java代码进行运行,如果想用canal.adapter来进行运行可以下载
放入服务器中,依次执行下面命令
然后修改配置文件 :
然后将需要运行存储到es的的yml文件放入到
目录下。例如:
然后开启canal-adapter服务
/usr/local/soft/canal-adapter/bin/startup.sh
查看 canal-adapter 的日志,确定没有问题后修改数据 就可以同步到es了
注意:
1、canal-adapter自带mysql连接使用的5.x的,如果自己安装的是高版本的mysql需要自己去/usr/local/soft/canal-adapter/lib增加对应的jar包
2、因项目中同步es使用的sql中有数据库中没有的字段,导致原生程序一直报异常,后修改源码中
加了一个判断后才可以
3、es中使用的date字段类型和数据库中不一致,所以这里又修改了部分源码兼容我们项目中的类型
可以根据各自情况修改。
H. elasticmp实现es数据导入导出
elasticmp 提供了多种导入导出数据的方式,可以 index <-> index 、 index <-> .json 文件,还支持将 index 查询结果导出到 .json 文件。执行的命令也很简单,只需指定数据来源 input 、数据输出 output 、数据类型 type 即可。
a.导出 index 数据到 .json 文件
b.导入 .json 文件中的数据到 es
在导入数据前可以先把测试用的index数据清空
I. logstash 怎么将数据导入ES
在配置文件
input {
file {
type => "json"
path => "/home/hadoop/xinwang_XW351464_2110"
}
}
output {
elasticsearch {
cluster => "es_master"
#manage_template => false
embedded => true
index => "huhu"
}
}
版本是logstash-1.3.2,es版本是1.1.1
错误提示
java -jar logstash-1.3.2-flatjar.jar agent -f ogstash-syslog.conf
Using milestone 2 input plugin 'file'. This plugin should be stable, but if you see strange behavior, please let us know! For more information on plugin milestones, see http://logstash.net/docs/1.3.2/plugin-milestones {:level=>:warn}
log4j, [2014-10-31T11:55:01.977] WARN: org.elasticsearch.discovery: [Jocasta] waited for 30s and no initial state was set by the discovery
Unable to check template. Automatic template management disabled. {:error=>"waited for [30s]",
Redhat 5.7 64bit / CentOS 5.x
JDK 1.6.0_45
logstash 1.3.2 (内带kibana)
elasticsearch 0.90.10
redis 2.8.4
对应下版本
J. Logstash导入csv到es
[TOC]
如导入的文件为 phone_area.csv ,文件格式如下:
定义配置文件 phone_area_imp.conf :
执行导入命令:
导入界面, 将一直停留在命令行,不退出 :
当csv文件发生变动时,仍然会同步新数据到es中 。