ES集群节点扩建

1. ES服务扩建

  系统准备

# 添加es用户组用户,更改用户
groupadd elk
useradd elk -g elk -p elk    #注意:useradd 的 -p 参数需要传入已加密的密码,明文不会生效,建议改用 passwd elk 设置密码
#更改es文件目录权限
chown -R elk:elk elk
#添加sudo权限
%elk  ALL=(ALL)        ALL

#    添加host记录
sudo cat >> /etc/hosts <<-'EOF'
1.1.1.1  xxx
EOF

#/etc/sysctl.conf添加如下内容
fs.file-max=655360
vm.max_map_count=655360
#执行
sysctl -p
#修改vim /etc/security/limits.conf
  * soft nofile 65536
  * hard nofile 65536
  * soft nproc 65536
  * hard nproc 65536
  * soft memlock unlimited
  * hard memlock unlimited
#开放 ES 所需端口(执行后需 firewall-cmd --reload 使配置生效)
firewall-cmd --zone=public --add-port=9200/tcp --permanent
firewall-cmd --zone=public --add-port=9300/tcp --permanent

ES集群搭建

cluster.name: autopat
#节点名称 (不同节点修改不一样的name)
node.name: node-150
#是不是有资格竞选主节点
node.master: true
#是否存储数据
node.data: true
#最大集群节点数
#node.max_local_storage_nodes: 3
#数据存储路径
path.data: /home/elk/storage/data
path.logs: /home/elk/storage/logs
#节点所绑定的IP地址,并且该节点会被通知到集群中的其他节点
#通过指定相同网段的其他节点会加入该集群中 0.0.0.0任意IP都可以访问elasticsearch
network.host: 0.0.0.0
#对外提供服务的http端口,默认为9200
http.port: 9200
#内部节点之间沟通端口
transport.tcp.port: 9300
#es7.x 之后新增的配置,写入候选主节点的设备地址,在开启服务后可以被选为主节点
discovery.seed_hosts: [ ]
#
##es7.x 之后新增的配置,初始化一个新的集群时需要此配置来选举master
cluster.initial_master_nodes: [  ]
#
gateway.recover_after_nodes: 3
#集群中至少有 3 个节点启动后才开始数据恢复(gateway.recover_after_nodes)
#ES默认开启了内存地址锁定,为了避免内存交换提高性能。但是Centos6不支持SecComp功能,启动会报错,所以需要将其设置为false
bootstrap.memory_lock: true
bootstrap.system_call_filter: false
# 是否支持跨域
http.cors.enabled: true
# *表示支持所有域名
http.cors.allow-origin: "*"

2. 基于系统的数据迁移

2.1  基于clone API

               使用克隆索引 API 将现有索引克隆到新索引中,其中每个 原始主分片被克隆到新索引中的新主分片中。

                使用场景: 如果是在新的机器上 搭建相同规模的集群分支,建议使用clone API进行索引复制

               使用步骤:

1. 关闭原来index的写入

PUT /source_index/_settings 
{ 
  "settings" :  { 
    "index.blocks.write" :  true 
  } 
} 

2.   调用clone API

curl -X POST "localhost:9200/my_source_index/_clone/my_target_index?pretty" 
-H 'Content-Type: application/json' -d'
{
  "settings": {
    "index.number_of_shards": 5 
  },
  "aliases": {
    "my_search_indices": {}
  }
}
'

2.2 基于split-index API

               将现有索引拆分为具有更多主分片的新索引

          使用步骤 : 1. 关闭原来index的写入

                             2. 调用split API  注意:目标索引的分片数量,必须是原索引分片数的整数倍,且不超过1024

例如,一个 number_of_routing_shards 设置为 30(5 x 2 x 3)的 5 分片索引,可以按因子 2 或因子 3 进行拆分。

它可以拆分如下:

  • 5 → 10 → 30(先按因子 2 拆分,再按因子 3 拆分)
  • 5 → 15 → 30(先按因子 3 拆分,再按因子 2 拆分)
  • 5 → 30(按因子 6 拆分)
    curl -X POST "localhost:9200/my_source_index/_split/my_target_index?pretty" -H 'Content-Type: application/json' -d'
    {
      "settings": {
        "index.number_of_shards": 2
      }
    }
    '

       2.3 基于reindex api

     当数据的mapping需要修改,但是大量的数据已经导入到索引中了,重新导入数据到新的索引太耗时;或者说需要跨集群复制index数据,则可以使用reindex的api

        使用步骤:1.创建新的索引,设置好分片

                         2.调用api

POST _reindex
{
  "source": {
    "index": "old_index"
  },
  "dest": {
    "index": "new_index",
    "version_type": "internal"
  }
}

      调优问题: 

           1. 目标索引调优 调整目标索引的写入效率,例如  减少refresh时间间隔,减少index副本

           2. scroll 查询调优 将scroll查询分片,分成多个并行执行的任务,分片数量设置等于数据分片数

            3. 基于主机配置调整  bulk数量和 并行数,直到达到集群写入瓶颈

3. 程序迁移功能

     提供了 基于transport Client的 reindex的迁移代码,功能与reindex类似,只是加入了 对新索引的数据存在校验,

适合在执行完官方的reindexAPI以后,再使用程序执行,确认数据是否丢失的情况

  

/**
 * Command-line reindex tool: scroll-reads document ids from a source index and
 * copies the missing documents into a destination index via a {@link BulkProcessor}.
 * <p>
 * Intended as a verification/repair pass after the official _reindex API: it only
 * indexes documents whose id is not yet present in the destination, so it can be
 * re-run safely to confirm no data was lost.
 */
public class Reindex {

    private static final boolean DEBUG = false;
    private static final Gson gson = new Gson();

    private final static String LINE = "---------------------------------";

    //ARGS NAMES
    //source
    private final static String ARGS_SRC_HOST = "--shost";
    private final static String ARGS_SRC_PORT = "--sport";
    private final static String ARGS_SRC_CLUSTER_NAME = "--sclsname";
    private final static String ARGS_SRC_INDEX = "--sidx";
    //destination
    private final static String ARGS_DST_HOST = "--dhost";
    private final static String ARGS_DST_PORT = "--dport";
    private final static String ARGS_DST_CLUSTER_NAME = "--dclsname";
    private final static String ARGS_DST_INDEX = "--didx";
    private final static String ARGS_USE_SAME_CLS = "--usesamecls";
    //config
    private final static String ARGS_CFG_SCROLL_SIZE = "--scrollsize";
    private final static String ARGS_CFG_BULK_SIZE = "--bulksize";
    private final static String ARGS_CFG_BULK_CONC_REQ = "--bulkconcreq";
    private final static String ARGS_CFG_SNIFF = "--sniff";
    private final static String ARGS_CFG_BULK_QUEUE_WARN_TH = "--bulkqwth";
    //auth
    private final static String ARGS_AUTH_SUSER = "--suser";
    private final static String ARGS_AUTH_SPASS = "--spass";
    private final static String ARGS_AUTH_DUSER = "--duser";
    private final static String ARGS_AUTH_DPASS = "--dpass";


    //DEFAULT PARAMS
    private static int srcPort = 9300;
    private static String srcHost = "localhost";
    private static String srcClusterName = "elasticsearch";
    private static String srcIndex = "";

    private static int dstPort = 9300;
    private static String dstHost = "localhost";
    private static String dstClusterName = "elasticsearch";
    private static String dstIndex = "";

    private static String srcClsUser = "";
    private static String srcClsPass = "";
    private static String dstClsUser = "";
    private static String dstClsPass = "";

    private static int cfgScrollSize = 200;
    private static int cfgBulkSize = 200;
    private static int cfgBulkConcReq = 5;
    private static boolean cfgSniff = true;


    //RUNTIME VALUES
    private static Client srcClient = null;
    private static Client dstClient = null;
    private static boolean useSameCls = false;
    private static long srcIdxDocCount = -1;
    // Number of documents confirmed present in the destination (bulk-indexed or already there).
    private static final AtomicLong dstIdxDocCount = new AtomicLong(0);

    private static int bulkQWarnThreshold = 10;
    private static final long sleepOnBulkRejected = 10000;
    // Single-threaded scheduler used only for the periodic progress report.
    private static final ScheduledExecutorService scheduledExecutorService =
            Executors.newScheduledThreadPool(1);
    private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(12, 24,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());

    // Number of ReTask jobs submitted but not yet finished; used for back-pressure
    // in reIndex() and for the drain loop in main().
    private static final AtomicInteger threadSize = new AtomicInteger(0);


    public static void main(String[] args) throws UnknownHostException, InterruptedException {

        parseArgs(args);
        printParams();
        srcClient = EsOperation.initClient(srcHost, srcPort, srcClusterName, srcClsUser, srcClsPass, false);
        if (useSameCls) {
            // Reuse the source connection parameters for the destination cluster.
            dstHost = srcHost;
            dstPort = srcPort;
            dstClusterName = srcClusterName;
            dstClsUser = srcClsUser;
            dstClsPass = srcClsPass;
        }
        dstClient = EsOperation.initClient(dstHost, dstPort, dstClusterName, dstClsUser, dstClsPass, cfgSniff);

        srcIdxDocCount = countIndexDocs(srcClient, srcIndex);
        System.out.println("Reindexing " + srcIdxDocCount + " documents from " +
                srcClusterName + "@" + srcHost + ":" + srcPort + "//" + srcClusterName + "/" + srcIndex + " to " +
                dstClusterName + "@" + dstHost + ":" + dstPort + "//" + dstClusterName + "/" + dstIndex);

        // Periodic progress reporter. The scheduler is single-threaded, so plain
        // instance fields are sufficient here (the original ThreadLocal wrapping
        // added nothing but indirection).
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            private long dstIdxDocCountLast = 0L;
            private long dstIdxDocCountTimeLast = System.currentTimeMillis();

            @Override
            public void run() {
                try {
                    long currentDocCount = dstIdxDocCount.get();
                    long currentTimeMillis = System.currentTimeMillis();
                    // Guard against an empty source index: dividing by zero here
                    // would kill the report with an ArithmeticException.
                    String percentageDoneString = "100";
                    if (srcIdxDocCount > 0 && srcIdxDocCount != currentDocCount) {
                        percentageDoneString = new BigDecimal(currentDocCount)
                                .divide(new BigDecimal(srcIdxDocCount), 4, BigDecimal.ROUND_DOWN)
                                .multiply(new BigDecimal(100)).toString();
                    }
                    System.out.println("Indexed " + currentDocCount + "/" + srcIdxDocCount + " [ " + percentageDoneString + "% ] ,Speed ["
                            + dive(currentDocCount, dstIdxDocCountLast, currentTimeMillis, dstIdxDocCountTimeLast) + " doc/s ]  from " + srcHost + ":" + srcPort + "/" + srcIndex + " " + " to "
                            + dstHost
                            + ":" + dstPort
                            + "/" + dstIndex + " ");
                    dstIdxDocCountLast = currentDocCount;
                    dstIdxDocCountTimeLast = currentTimeMillis;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, 10, 30, TimeUnit.SECONDS);

        BulkProcessor bulkProcessor = BulkProcessor.builder(dstClient,
                createLoggingBulkProcessorListener())
                .setBulkActions(cfgBulkSize)
                .setConcurrentRequests(cfgBulkConcReq)
                .setFlushInterval(TimeValue.timeValueSeconds(1000))
                .build();

        System.out.println("Starting reindex at  " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
        reIndex(srcClient, dstClient, bulkProcessor, matchAllQuery(), srcIndex, dstIndex);
        // Drain: wait until every submitted ReTask has finished before shutting down.
        while (threadSize.get() >= 1) {
            System.out.println("Wait for all thread submit...sleeping 10 seconds");
            Thread.sleep(sleepOnBulkRejected);
        }
        threadPool.shutdown();
        threadPool.awaitTermination(1, TimeUnit.HOURS); // wait for worker pool shutdown

        bulkProcessor.flush();
        bulkProcessor.awaitClose(1, TimeUnit.HOURS); // wait until all pending bulks are sent

        closeClient(srcClient);
        closeClient(dstClient);
        scheduledExecutorService.shutdownNow();
        System.out.println("Ending reindex at  " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
        System.exit(0);

    }


    /** Closes an ES client, tolerating {@code null}. */
    public static void closeClient(Client client) {
        if (client != null) {
            client.close();
        }
    }

    /** Query selecting every document of the source index. */
    private static QueryBuilder matchAllQuery() {
        return QueryBuilders.matchAllQuery();
    }


    /**
     * Scrolls over the source index and submits one {@link ReTask} per page of ids.
     * Applies back-pressure when too many tasks are in flight, and occasionally
     * samples destination-node thread-pool stats to throttle the bulk size.
     *
     * @param srcClient     client for the source cluster
     * @param dstClient     client for the destination cluster
     * @param bulkProcessor shared bulk processor writing to the destination
     * @param qb            document selection query (match_all here)
     * @param srcIndex      source index name
     * @param dstIndex      destination index name
     */
    public static void reIndex(Client srcClient,
                               Client dstClient, BulkProcessor bulkProcessor, QueryBuilder qb, String srcIndex, String dstIndex) {
        try {
            SearchResponse scrollResp = srcClient.prepareSearch(srcIndex)
                    .setScroll(new TimeValue(120000))
                    .setQuery(qb)
                    // Only the ids are needed here; ReTask fetches full sources itself.
                    // (The original also called setFetchSource(false) first, which this
                    // call immediately overrode.)
                    .setFetchSource(new String[]{"_id"}, null)
                    .setSize(cfgScrollSize).execute().actionGet();

            // BUG FIX: the original issued one extra prepareSearchScroll() call here,
            // before the loop, which silently discarded the first page of hits.
            // The initial search response must be processed too.
            int b = 1;
            while (true) {
                if (scrollResp.getHits().getHits().length == 0) {
                    System.out.println("Finished scrollResp job.....");
                    break;
                }
                List<String> hisList = new LinkedList<>();
                for (SearchHit hit : scrollResp.getHits()) {
                    hisList.add(hit.getId());
                }
                threadSize.incrementAndGet();
                threadPool.submit(new ReTask(hisList, srcClient, dstClient, dstIndex, srcIndex, bulkProcessor, dstIdxDocCount, threadSize));
                if (threadSize.get() >= 30) {
                    // Quadratic back-off while the worker pool is saturated.
                    long sleep = sleepOnBulkRejected * b * b;
                    System.out.println("WARN: too many ReTask...sleeping [" + sleep / 1000 + "] seconds");
                    Thread.sleep(sleep); // wait for in-flight tasks to finish
                    b++;
                } else {
                    b = 1;
                }
                scrollResp = srcClient.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(120000)).execute().actionGet();
                // Roughly every 10th page, sample destination thread-pool stats and
                // throttle if the bulk queue is backing up or rejecting.
                if (Math.random() <= 0.1) {
                    ActionFuture<NodesStatsResponse> nodesStatsResponse = getStats(dstClient);
                    double avgBulkQueueSize = getBulkQueueAvg(nodesStatsResponse);
                    boolean bulkRejects = isBulkRejecting(nodesStatsResponse);
                    int cfgBulkSizeDecrease;
                    if (bulkRejects) {
                        System.out.println("WARN: bulk thread rejected...sleeping 10 seconds");
                        Thread.sleep(sleepOnBulkRejected);
                        System.out.println("WARN: resuming operations");
                    }
                    if (avgBulkQueueSize > bulkQWarnThreshold) {
                        System.out.println("WARN: detected non-null average bulk queue size " + avgBulkQueueSize);
                        cfgBulkSizeDecrease = cfgBulkSize / 5;
                        cfgBulkSize = cfgBulkSize - cfgBulkSizeDecrease;
                        System.out.println("WARN: decreasing bulk-size to " + cfgBulkSize);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /** Requests thread-pool statistics from every node of the given cluster. */
    private static ActionFuture<NodesStatsResponse> getStats(Client client) {
        NodesStatsRequest nodesStatsRequest = new NodesStatsRequest()
                .addMetric(Metric.THREAD_POOL.metricName());
        return client.admin().cluster().nodesStats(nodesStatsRequest);
    }

    /**
     * Returns the average bulk thread-pool queue size across all nodes, parsed
     * from the raw JSON of the nodes-stats response. Returns 0 when no node
     * reports a bulk pool or parsing fails.
     */
    private static double getBulkQueueAvg(ActionFuture<NodesStatsResponse> nodesStatsResponse) {
        double result = 0;
        // BUG FIX: this counter was initialized to 1, so the average was always
        // computed over (nodes + 1) and under-reported the queue pressure.
        int howManyNodes = 0;

        String rawJSON = nodesStatsResponse.actionGet().toString();
        try {
            JsonObject rootObject = gson.fromJson(rawJSON, JsonObject.class);
            JsonObject nodes = rootObject.getAsJsonObject("nodes");
            Set<?> keys = nodes.keySet();
            Iterator<?> iterator = keys.iterator();
            while (iterator.hasNext()) {
                // Iterate on nodes; each key is a node id.
                String nodeID = iterator.next().toString();
                howManyNodes++;
                JsonObject node = nodes.getAsJsonObject(nodeID);
                JsonObject threadpool = node.getAsJsonObject("thread_pool");
                JsonObject bulk = threadpool.getAsJsonObject("bulk");
                if (bulk != null && bulk.has("queue")) {
                    result += bulk.get("queue").getAsLong();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (howManyNodes == 0) {
            return 0;
        }
        return result / howManyNodes;
    }

    /**
     * Returns {@code true} when any node reports a non-zero bulk thread-pool
     * rejection count in the nodes-stats response.
     */
    private static boolean isBulkRejecting(ActionFuture<NodesStatsResponse> nodesStatsResponse) {
        String rawJSON = nodesStatsResponse.actionGet().toString();
        try {
            JsonObject rootObject = gson.fromJson(rawJSON, JsonObject.class);
            JsonObject nodes = rootObject.getAsJsonObject("nodes");
            Set<?> keys = nodes.keySet();
            Iterator<?> iterator = keys.iterator();
            while (iterator.hasNext()) {
                // Iterate on nodes; each key is a node id.
                String nodeID = iterator.next().toString();
                JsonObject node = nodes.getAsJsonObject(nodeID);
                JsonObject threadpool = node.getAsJsonObject("thread_pool");
                JsonObject bulk = threadpool.getAsJsonObject("bulk");
                if (bulk != null && bulk.has("rejected")) {
                    long rejected = bulk.get("rejected").getAsLong();
                    if (rejected > 0) {
                        return true;
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /** Counts documents on the primaries of the given index (excludes replicas). */
    private static long countIndexDocs(Client client, String idxName) {
        IndicesStatsResponse searchResponse = client.admin().indices()
                .prepareStats(idxName)
                .all()
                .execute().actionGet();
        return searchResponse.getPrimaries().docs.getCount();
    }


    /**
     * Bulk listener that counts indexed documents into {@link #dstIdxDocCount}
     * and logs bulk failures.
     */
    private static BulkProcessor.Listener createLoggingBulkProcessorListener() {
        return new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                if (DEBUG) {
                    System.out.println("DEBUG: Going to execute new bulk composed of {} actions: " + request.numberOfActions());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (DEBUG) {
                    System.out.println("DEBUG: Executed bulk composed of {} actions: " + request.numberOfActions());
                }
                dstIdxDocCount.addAndGet(request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                System.out.println("ERROR: Error executing bulk: " + failure);
                failure.printStackTrace();
            }
        };
    }


    /**
     * Computes an indexing rate in documents per second.
     *
     * @param p1 current document count
     * @param p2 previous document count
     * @param p3 current time in milliseconds
     * @param p4 previous time in milliseconds
     * @return rate as a decimal string; "0" when the elapsed time rounds to zero
     */
    public static String dive(long p1, long p2, long p3, long p4) {
        BigDecimal docDelta = BigDecimal.valueOf(p1).subtract(BigDecimal.valueOf(p2));
        BigDecimal seconds = BigDecimal.valueOf(p3).subtract(BigDecimal.valueOf(p4)).divide(new BigDecimal(1000), 2, BigDecimal.ROUND_HALF_UP);
        if (seconds.compareTo(BigDecimal.ZERO) == 0) {
            return "0";
        }
        return docDelta.divide(seconds, 4, BigDecimal.ROUND_HALF_UP).toString();
    }


    /** Prints the CLI usage text and terminates the JVM with exit code 1. */
    private static void printUsage() {
        System.out.println(LINE);
        System.out.println("Elasticsearch Java Reindex tool v0.5");
        System.out.println(LINE);
        System.out.println("Usage: java -jar es-java-cli.jar [OPTIONS]");
        System.out.println(LINE);
        System.out.println("Generic Options:");
        String options = ARGS_SRC_HOST + " <source_host> [default localhost]\n" +
                ARGS_SRC_PORT + " <source_port> [default 9300]\n" +
                ARGS_SRC_CLUSTER_NAME + " <source_cluster_name> [default elasticsearch]\n" +
                ARGS_SRC_INDEX + " <source_index_name>\n" +
                ARGS_DST_HOST + " <destination_host> [default localhost]\n" +
                ARGS_DST_PORT + " <destination_port> [default 9300]\n" +
                ARGS_DST_CLUSTER_NAME + " <destination_cluster_name> [default elasticsearch]\n" +
                ARGS_DST_INDEX + " <destination_index_name>\n" +
                ARGS_USE_SAME_CLS + " <use_src_cluster_params(host/port/clsname)_for_dst_cluster> [default false]";
        System.out.println(options);
        System.out.println(LINE);
        System.out.println("Scroll/Bulk Options:");
        System.out.println(
                ARGS_CFG_SCROLL_SIZE + " <scroll_size>  [default 200]\n" +
                        // The actual default is 200 (cfgBulkSize); the old text said 1000.
                        ARGS_CFG_BULK_SIZE + " <bulk_doc_size>  [default 200]\n" +
                        ARGS_CFG_BULK_CONC_REQ + " <bulk_concurrent_requests> [default 5]\n" +
                        ARGS_CFG_SNIFF + " <use_sniffing> [default true]\n" +
                        ARGS_CFG_BULK_QUEUE_WARN_TH + " <when_bulk_threadpool_q_passes_this_threshold_decrease_bulk_size_by_20%> [default " + bulkQWarnThreshold + "]");
        System.out.println(LINE);
        System.out.println("Shield options:");
        options = ARGS_AUTH_SUSER + " <source_cluster_username>\n" +
                ARGS_AUTH_SPASS + " <source_cluster_password>\n" +
                ARGS_AUTH_DUSER + " <destination_cluster_username>\n" +
                ARGS_AUTH_DPASS + " <destination_cluster_password>\n";
        System.out.println(options);
        System.exit(1);
    }

    /**
     * Parses the command-line arguments into the static configuration fields.
     * Arguments come in "--flag value" pairs; any parse failure exits the JVM.
     */
    private static void parseArgs(String[] args) {
        if ((args.length == 0) || (args.length % 2 != 0)) {
            // printUsage() calls System.exit(1); the original had dead code after it.
            printUsage();
        }

        try {
            for (int i = 0; i < args.length; i += 2) {
                if (!args[i].startsWith("--")) {
                    throw new Exception("Invalid parameter " + args[i]);
                }
                switch (args[i]) {
                    case ARGS_SRC_HOST:
                        srcHost = args[i + 1];
                        break;
                    case ARGS_SRC_PORT:
                        srcPort = Integer.parseInt(args[i + 1]);
                        break;
                    case ARGS_SRC_INDEX:
                        srcIndex = args[i + 1];
                        break;
                    case ARGS_SRC_CLUSTER_NAME:
                        srcClusterName = args[i + 1];
                        break;
                    case ARGS_DST_HOST:
                        dstHost = args[i + 1];
                        break;
                    case ARGS_DST_PORT:
                        dstPort = Integer.parseInt(args[i + 1]);
                        break;
                    case ARGS_DST_INDEX:
                        dstIndex = args[i + 1];
                        break;
                    case ARGS_DST_CLUSTER_NAME:
                        dstClusterName = args[i + 1];
                        break;
                    case ARGS_CFG_SCROLL_SIZE:
                        cfgScrollSize = Integer.parseInt(args[i + 1]);
                        break;
                    case ARGS_CFG_BULK_CONC_REQ:
                        cfgBulkConcReq = Integer.parseInt(args[i + 1]);
                        break;
                    case ARGS_CFG_BULK_SIZE:
                        cfgBulkSize = Integer.parseInt(args[i + 1]);
                        break;
                    case ARGS_CFG_SNIFF:
                        cfgSniff = Boolean.parseBoolean(args[i + 1]);
                        break;
                    case ARGS_AUTH_SUSER:
                        srcClsUser = args[i + 1];
                        break;
                    case ARGS_AUTH_SPASS:
                        srcClsPass = args[i + 1];
                        break;
                    case ARGS_AUTH_DUSER:
                        dstClsUser = args[i + 1];
                        break;
                    case ARGS_AUTH_DPASS:
                        dstClsPass = args[i + 1];
                        break;
                    case ARGS_USE_SAME_CLS:
                        useSameCls = Boolean.parseBoolean(args[i + 1]);
                        break;
                    case ARGS_CFG_BULK_QUEUE_WARN_TH:
                        bulkQWarnThreshold = Integer.parseInt(args[i + 1]);
                        break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);

        }
    }

    /** Dumps the effective configuration (NOTE: prints passwords in clear text). */
    public static void printParams() {
        System.out.println("#### SETTINGS ####");
        System.out.println("USESAMECLS -> " + useSameCls);
        System.out.println("#### SOURCE ####");
        System.out.println("SRC HOST -> " + srcHost);
        System.out.println("SRC PORT -> " + srcPort);
        System.out.println("SRC CNAME -> " + srcClusterName);
        System.out.println("SRC IDX -> " + srcIndex);
        System.out.println("#### DESTINATION ####");
        System.out.println("DST HOST -> " + dstHost);
        System.out.println("DST PORT -> " + dstPort);
        System.out.println("DST CNAME -> " + dstClusterName);
        System.out.println("DST IDX -> " + dstIndex);
        System.out.println("USE SAME CLS -> " + useSameCls);
        System.out.println("#### CONFIG ####");
        System.out.println("CFG SCROLL SIZE -> " + cfgScrollSize + " [default 200]");
        System.out.println("CFG BULK SIZE -> " + cfgBulkSize + " [default 200]");
        System.out.println("CFG BULK CONCUR REQ -> " + cfgBulkConcReq + " [default 5]");
        System.out.println("CFG SNIFF -> " + cfgSniff + " [default true]");
        System.out.println("#### AUTH ####");
        System.out.println("SRC USERNAME -> " + srcClsUser);
        System.out.println("SRC PASSWORD -> " + srcClsPass);
        System.out.println("DST USERNAME -> " + dstClsUser);
        System.out.println("DST PASSWORD -> " + dstClsPass);
    }
}



/**
 * Worker task for one scroll page: copies every listed document id from the
 * source index into the destination index via the shared {@link BulkProcessor},
 * skipping ids that already exist in the destination.
 */
public class ReTask extends Thread {

    // Document ids from one scroll page.
    private final List<String> searchHits;
    private final Client srcClient;
    private final Client dstClient;
    private final String dstIndex;
    private final String srcIndex;
    private final BulkProcessor bulkProcessor;
    // Shared counters owned by the driver (Reindex): docs present in destination,
    // and in-flight task count used for back-pressure.
    private final AtomicLong dstIdxDocCount;
    private final AtomicInteger threadSize;


    public ReTask(List<String> searchHits,
                  Client srcClient, Client dstClient,
                  String dstIndex, String srcIndex, BulkProcessor bulkProcessor, AtomicLong dstIdxDocCount, AtomicInteger threadSize) {
        this.searchHits = searchHits;
        this.srcClient = srcClient;
        this.dstClient = dstClient;
        this.dstIndex = dstIndex;
        this.srcIndex = srcIndex;
        this.bulkProcessor = bulkProcessor;
        this.dstIdxDocCount = dstIdxDocCount;
        this.threadSize = threadSize;
    }

    @Override
    public void run() {
        try {
            for (String hit : this.searchHits) {
                if (!EsOperation.isExist(this.dstClient, this.dstIndex, hit)) {
                    // Missing in destination: fetch the source document and queue it for bulk indexing.
                    this.bulkProcessor.add(Requests.indexRequest(this.dstIndex).id(hit).source(EsOperation.getDoc(this.srcClient, this.srcIndex, hit)));
                } else {
                    // Already present: count it directly (bulk listener won't see it).
                    this.dstIdxDocCount.incrementAndGet();
                }
            }
            // Sampled progress heartbeat (~5% of tasks).
            if (Math.random() <= 0.05) {
                long currentDocCount = dstIdxDocCount.get();
                System.out.println(Thread.currentThread() + "keep indexing...[" + currentDocCount + "] ");
            }
        } finally {
            // BUG FIX: must decrement even when an ES call throws, otherwise the
            // driver's busy-wait on threadSize never terminates.
            threadSize.decrementAndGet();
        }
    }
}
/**
 * Thin helpers around the Elasticsearch transport client used by the reindex tool.
 * NOTE(review): this is the legacy TransportClient API (pre-REST high-level client);
 * port "9300" style transport addresses are expected, not HTTP 9200.
 */
public class EsOperation {


  /**
   * Builds a transport client for one cluster node.
   *
   * @param host        node hostname or IP
   * @param port        transport port (typically 9300)
   * @param clusterName cluster.name the client must match
   * @param userName    Shield/X-Pack username; auth is skipped when user or password is empty
   * @param password    Shield/X-Pack password
   * @param sniff       whether the client discovers and connects to other cluster nodes
   * @throws UnknownHostException when the host cannot be resolved
   */
  public static Client initClient(String host, int port,
      String clusterName, String userName, String password, boolean sniff) throws UnknownHostException {
    Builder build = Settings.builder()
        .put("cluster.name", clusterName)
        .put("client.transport.ping_timeout", "60s")
        .put("client.transport.nodes_sampler_interval", "60s")
        .put("client.transport.sniff", sniff);
    if (!StringUtil.isNullOrEmpty(userName) && !StringUtil.isNullOrEmpty(password)) {
      // Credentials are passed via the legacy Shield "user:password" setting.
      build.put("shield.user", userName + ":" + password);
    }
    TransportClient transportClient = new PreBuiltTransportClient(build.build());
    transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName(host), port));
    return transportClient;
  }


  /**
   * Returns whether a document with the given id exists in the index.
   * Fetches no _source, so this is a cheap existence probe.
   */
  public static boolean isExist(Client client, String index, String id) {
    return client.prepareGet()
        .setIndex(index)
        .setId(id)
        .setFetchSource(false).get().isExists();
  }

  /**
   * Fetches the _source of a document as a map.
   * (The original javadoc was a copy-paste of isExist's description.)
   * NOTE(review): returns null when the document does not exist or has no source —
   * callers are expected to check existence first via isExist.
   */
  public static Map<String, Object> getDoc(Client client, String index, String id) {
    return client.get(new GetRequest(index, id)).actionGet().getSourceAsMap();
  }

}

关于作者

小小鼠标垫
哼嗬哈嘿
获得点赞
文章被阅读