分享一款非常好用的kafka可视化web管理工具

使用过kafka的小伙伴应该都知道kafka本身是没有管理界面的,所有操作都需要手动执行命令来完成。但有些命令又多又长,如果没有做笔记,别说是新手,就连老手也不一定能记得住,每次想要使用的时候都要上网搜索一下。有些崇尚geek精神的人或许觉得命令行才是真爱,但使用一款好用的可视化管理工具真的可以极大的提升效率。 今天给大家介绍的这款工具叫做kafka-map,是我针对日常工作中高频使用的场景开发的,使用了这款工具之后就不必费心费力的去查资料某个命令要怎么写,就像是:“给编程插上翅膀,给kafka装上导航”。 kafka-map 介绍 kafka map是使用Java11和React开发的一款kafka可视化工具。 目前支持的功能有: 多集群管理 集群状态监控(分区数量、副本数量、存储大小、offset) 主题创建、删除、扩容(删除需配置delete.topic.enable = true) broker状态监控 消费者组查看、删除 重置offset 消息查询(支持String和json方式展示) 发送消息(支持向指定的topic和partition发送字符串消息) 功能截图 添加集群 集群管理 broker 主题管理 消费组 查看消费组已订阅主题 topic详情——分区 topic详情——broker topic详情——消费组 topic详情——消费组重置offset topic详情——配置信息 生产消息 消费消息 docker 方式安装 一行命令即可完成安装 docker run -d \ -p 8080:8080 \ -v /opt/kafka-map/data:/usr/local/kafka-map/data \ -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin --name kafka-map \ --restart always dushixiang/kafka-map:latest 更多安装方式以及相信信息可查看: https://github.com/dushixiang/kafka-map ...

六月 13, 2021 · 1 分钟 · dushixiang

openstack victoria版安装

近期公司业务需求,需要安装一套Openstack环境学习,看了一下现在已经出了wallaby版了,我果断选择了上一个版本victoria。因为没有足够多的物理服务器了,只好找了一台64核256G内存6T硬盘的机器来创建几台虚拟机来搭环境了。 实验环境 此次实验使用到了三台虚拟机,都是使用centos8系统,一台机器当作控制和网络节点,另外两台当作计算节点,使用OVS+VLAN的网络模式,eth0作为管理网络,eth1互相连接到OVS网桥上模拟trunk网卡,controller多增加一个eth2用于访问外部网络。 节点 作用 eth0 eth1 eth2 controller 控制节点、网络节点 172.16.10.100 无IP 桥接,无IP compute-101 计算节点 172.16.10.101 无IP ❌ compute-102 计算节点 172.16.10.102 无IP ❌ 安装虚拟机 安装依赖 安装KVM和Linux网桥 yum install -y qemu-kvm libvirt virt-install bridge-utils virt-manager dejavu-lgc-sans-fonts dejavu-lgc-sans-fonts用于解决 virt-manaer 乱码 启动 systemctl enable libvirtd && systemctl start libvirtd 安装OVS yum install openvswitch 启动OVS systemctl enable openvswitch && systemctl start openvswitch 创建虚拟机 使用 virt-manager 创建三台虚拟机 配置网络 配置管理网卡 给虚拟机配置桥接网络,参考Linux虚拟化技术KVM,效果如图 配置trunk网卡 使用ovs创建一个虚拟网桥。 ovs-vsctl add-br br-vlan 此时网桥br-vlan上是没有任何虚拟网卡的,然后关闭虚拟机,在virt-manager上添加一个网络设备 ...

六月 12, 2021 · 10 分钟 · dushixiang

KVM 虚拟机磁盘扩容

一、镜像扩容 注意:需要先关闭虚拟机才能操作,+号前面有空格,后面没有空格。 qemu-img resize test.qcow2 +80G 原镜像磁盘大小20GB,扩容完成后可使用以下命令查看 qemu-img info test.qcow2 输出 image: test.qcow2 file format: qcow2 virtual size: 100G (107374182400 bytes) disk size: 885M cluster_size: 65536 Format specific information: compat: 1.1 lazy refcounts: false refcount bits: 16 corrupt: false 二、Windows磁盘扩容 Windows磁盘扩容比较方便,进入 计算机管理>磁盘管理 找到新增的分区把它添加到需要的分区即可。 三、Linux磁盘扩容 启动虚拟机后,进入虚拟机控制台,使用fdisk -l命令查看磁盘信息。 Disk /dev/vda: 100 GiB, 107374182400 bytes, 209715200 sectors Units: sectors of 1 * 512 = 512 bytes Sector size (logical/physical): 512 bytes / 512 bytes I/O size (minimum/optimal): 512 bytes / 512 bytes Disklabel type: dos Disk identifier: 0xe11f7f01 Device Boot Start End Sectors Size Id Type /dev/vda1 * 2048 2099199 2097152 1G 83 Linux /dev/vda2 2099200 41943039 39843840 19G 8e Linux LVM Disk /dev/mapper/cl-root: 17 GiB, 18249416704 bytes, 35643392 sectors Units: sectors of 1 * 512 = 512 bytes Sector size (logical/physical): 512 bytes / 512 bytes I/O size (minimum/optimal): 512 bytes / 512 bytes Disk /dev/mapper/cl-swap: 2 GiB, 2147483648 bytes, 4194304 sectors Units: sectors of 1 * 512 = 512 bytes Sector size (logical/physical): 512 bytes / 512 bytes I/O size (minimum/optimal): 512 bytes / 512 bytes 可以看到这台虚拟机的磁盘大小已经有100GB了,但分区大小还是没有变化,只有初始大小20GB。 ...

五月 31, 2021 · 4 分钟 · dushixiang

Linux虚拟化技术KVM

在Windows平台上我们习惯于使用VmWare或者virtual box来实现虚拟化,虽然它们拥有Linux版本,但大多数企业都选择了使用KVM来做Linux平台的虚拟化,因此学习掌握KVM是一项必不可少的技能。 安装KVM 以centos为例,下面是安装KVM虚拟化的命令。 yum install -y qemu-kvm libvirt virt-install bridge-utils 这么多软件都是什么作用? 软件 作用 qemu-kvm 整合了QEMU 和 KVM 的一个软件。 libvirt 封装了QEMU的接口,可以更加方便的操作虚拟机,并且提供了很多种编程语言的SDK。 virt-install 用来创建虚拟机的命令行工具。 bridge-utils Linux网桥,用来配置虚拟机的桥接网络。 kvm、qemu、qemu-kvm和libvirt到底有什么关系? KVM(Kernel Virtual Machine)是Linux的一个内核驱动模块,它需要CPU的支持,采用硬件辅助虚拟化技术Intel-VT、AMD-V;内存相关如Intel的EPT和AMD的RVI技术,使得它能够让Linux主机成为一个Hypervisor(虚拟机监控器)。 QEMU是一个纯软件实现的虚拟机,它可以模拟CPU、内存、磁盘等其他硬件,让虚拟机认为自己底层就是硬件,其实这些都是QEMU模拟的,虚拟机的所有操作都要经过QEMU转译一层,也就导致了QEMU本身的性能较差。 qemu-kvm是QEMU整合了KVM,把CPU虚拟化和内存虚拟化交给了KVM来做,自己来模拟IO设备,例如网卡和磁盘。这一套组合拳打下来,性能损失大大降低,相较于直接使用硬件,带来的损耗大概在1%-2%之间。 libvirt是目前使用最为广泛的对KVM虚拟机进行管理的工具和API。Libvirtd是一个daemon进程,可以被本地的virsh调用,也可以被远程的virsh调用,Libvirtd调用qemu-kvm操作虚拟机。 启动libvirt systemctl start libvirtd systemctl enable libvirtd 如果你不想使用命令行工具来管理虚拟机,可以安装 virt-manager 。 yum install -y virt-manager 在支持x11转发的ssh客户端(例如:MobaXterm)上可以直接输入 virt-manager 来启动。 虚拟网络类型 和vmware类型,kvm也支持多种类型的网络,主要分为三种。 NAT模式 虚拟机需要把流量发送到宿主机,宿主机器转换网络信息后再发出,外部机器无法感知到虚拟机的存在。此种方式宿主机器相当于一个路由器,因此宿主机上会有一个和虚拟机同网段的IP,并且虚拟机的网关地址是宿主机的这个IP。 主机模式 虚拟机只能互相访问,不能访问宿主机。此种方式与NAT模式类似,但它没有与虚拟机同网段的IP,因此虚拟机也不能借助于宿主机来访问外部网络。 桥接模式 虚拟机和宿主机都关联在一个网桥上,因此虚拟机可以与宿主机在同一个网段,并且外部机器可以直接访问到虚拟机,虚拟机也可以借助网桥来访问外部网络。 还有一种模式在openstack等云平台上使用较为广泛,网桥上绑定的物理网卡没有IP,对应交换机配置端口为trunk模式,虚拟机 端口连接到网桥上,并配置端口不同的VLAN tag以达到隔离和互联的目的。 NAT模式和主机模式都无需单独配置,接下来我们看下如何配置桥接网络。 配置桥接网络 物理网卡绑定到网桥上之后就会导致网络断开,因此我们需要把原IP配置到网桥上。 # 进入网卡配置文件夹 cd /etc/sysconfig/network-scripts/ # 拷贝原网卡配置文件作为桥接网卡 cp ifcfg-enp134s0f0 ifcfg-br0 修改 ifcfg-br0 中的 TYPE=Ethernet 为 TYPE=Bridge,最终效果如下: ...

五月 29, 2021 · 2 分钟 · dushixiang

容器网络——如何为docker添加网卡?

之前我们介绍Network Namespace(以下简称netns)和veth pair时说过docker是使用这些技术来实现的网络隔离,今天我们就来一探究竟,看下docker到底是如何做到的。 启动一个无网络的容器 首先我们使用 --net=none 参数启动一个无网络的容器,为了方便调试,这里我们使用了centos镜像。 docker run -itd --name centos-test --net=none centos 启动成功之后我们进入容器内部确认一下是否无网卡。 [root@localhost ~]# docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 28dc2e8853df centos "/bin/bash" 24 seconds ago Up 23 seconds centos-test [root@localhost ~]# docker exec -it 28dc2e8853df bash [root@28dc2e8853df /]# ip a 1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000 link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 inet 127.0.0.1/8 scope host lo valid_lft forever preferred_lft forever 可以看到确实只有一个本地环回网卡。 ...

五月 23, 2021 · 2 分钟 · dushixiang

使用libvirt-java采集KVM虚拟机状态信息

虚拟化开发相较于普通开发是一个冷门的方向,大多数是使用Python开发,其中使用Java来做虚拟化的少之又少,资料更是少的可怜,为了实现需求我也是踩了不少坑,今天就为大家分享一下如何使用 libvirt-java 来采集KVM虚拟机的资源使用信息。 CPU使用率 libvirt并没有直接提供获取虚拟机CPU使用率的接口,需要我们自己来计算,网上分享的代码或者公式五花八门,大部分都是错误的,经过我的测试,找到了一个相对准确的计算公式。 cpu_usage = (cpu_time_now - cpu_time_t_second_ago) * 100 / (t * vCpus * 10^9) Java代码如下 // t秒前的CPU时间 long c1 = domain.getInfo().cpuTime; Thread.sleep(1000); // 当前CPU时间 long c2 = domain.getInfo().cpuTime; // 虚拟CPU数量 int vCpus = domain.getMaxVcpus(); // t 为1秒 Double cpuUsage = 100 * (c2 - c1) / (1 * vCpus * Math.pow(10, 9)); log.debug("虚拟机[{}]CPU使用率为: {}", uuid, cpuUsage); 内存使用率 不要使用domain.getInfo()返回的 memory字段,虽然它注释写的是the memory in KBytes used by the domain,但它的意思真的不是虚拟机内部进程已使用的内存大小,而是从宿主机器的角度来看分配给这个虚拟机的内存它使用了多少,如果没有特殊配置,它会和maxMem字段的值是相同的。 正确做法是使用domain.memoryStats(10)来获取,那为什么参数要输入一个10呢?这是因为10代表的是要返回的信息数量,经过我手动执行virsh dommemstat uuid 测试发现有10个参数返回,所以需要填入10。另外命令返回的unused 字段值与数组中tag=8的数据一致,最终我们获取到了未使用的内存大小,计算内存使用率更是轻轻松松。 Java代码如下 MemoryStatistic[] memoryStatistics = domain.memoryStats(10); Optional<MemoryStatistic> first = Arrays.stream(memoryStatistics).filter(x -> x.getTag() == 8).findFirst(); if (first.isPresent()) { MemoryStatistic memoryStatistic = first.get(); long unusedMemory = memoryStatistic.getValue(); long maxMemory = domain.getMaxMemory(); double memoryUsage = (maxMemory - unusedMemory) * 100.0 / maxMemory; log.debug("虚拟机[{}]内存使用率为: {}", uuid, memoryUsage); } 网卡数据包信息 同样libvirt并没有提供获取虚拟机网卡的接口,因此需要获取虚拟机的xml文件来查询。 ...

五月 19, 2021 · 2 分钟 · dushixiang

基于kafka实现延迟队列

基于kafka实现延迟队列 kafka作为一个使用广泛的消息队列,很多人都不会陌生,但当你在网上搜索“kafka 延迟队列”,出现的都是一些讲解时间轮或者只是提供了一些思路,并没有一份真实可用的代码实现,今天我们就来打破这个现象,提供一份可运行的代码,抛砖引玉,吸引更多的大神来分享。 基于kafka如何实现延迟队列? 想要解决一个问题,我们需要先分解问题。kafka作为一个高性能的消息队列,只要消费能力足够,发出的消息都是会立刻收到的,因此我们需要想一个办法,让消息延迟发送出去。 网上已经有大神给出了如下方案: 在发送延迟消息时不直接发送到目标topic,而是发送到一个用于处理延迟消息的topic,例如delay-minutes-1 写一段代码拉取delay-minutes-1中的消息,将满足条件的消息发送到真正的目标主题里。 就像画一匹马一样简单。 方案是好的,但是我们还需要更多细节。 完善细节 问题出在哪里? 问题出在延迟消息发出去之后,代码程序就会立刻收到延迟消息,要如何处理才能让延迟消息等待一段时间才发送到真正的topic里面。 可能有同学会觉得很简单嘛,在代码程序收到消息之后判断条件不满足,就调用sleep方法,过了一段时间我再进行下一个循环拉取消息。 真的可行吗? 一切好像都很美好,但这是不可行的。 这是因为在轮询kafka拉取消息的时候,它会返回由max.poll.records配置指定的一批消息,但是当程序代码不能在max.poll.interval.ms配置的期望时间内处理这些消息的话,kafka就会认为这个消费者已经挂了,会进行rebalance,同时你这个消费者就无法再拉取到任何消息了。 举个例子:当你需要一个24小时的延迟消息队列,在代码里面写下了Thread.sleep(1000*60*60*24);,为了不发生rebalance,你把max.poll.interval.ms 也改成了1000*60*60*24,这个时候你或许会感觉到一丝丝的怪异,我是谁?我在哪?我为什么要写出来这样的代码? 其实我们可以更优雅的处理这个问题。 KafkaConsumer 提供了暂停和恢复的API函数,调用消费者的暂停方法后就无法再拉取到新的消息,同时长时间不消费kafka也不会认为这个消费者已经挂掉了。另外为了能够更加优雅,我们会启动一个定时器来替换sleep。,完整流程如下图,当消费者发现消息不满足条件时,我们就暂停消费者,并把偏移量seek到上一次消费的位置以便等待下一个周期再次消费这条消息。 Java代码实现 import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; @SpringBootTest public class DelayQueueTest { private KafkaConsumer<String, String> consumer; private KafkaProducer<String, String> producer; private volatile Boolean exit = false; private final Object lock = new Object(); private final String servers = ""; @BeforeEach void initConsumer() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ConsumerConfig.GROUP_ID_CONFIG, "d"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000"); consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer()); } @BeforeEach void initProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producer = new KafkaProducer<>(props); } @Test void testDelayQueue() throws JsonProcessingException, InterruptedException { String topic = "delay-minutes-1"; List<String> topics = Collections.singletonList(topic); consumer.subscribe(topics); Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { synchronized (lock) { consumer.resume(consumer.paused()); lock.notify(); } } }, 0, 1000); do { synchronized (lock) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(200)); if (consumerRecords.isEmpty()) { lock.wait(); continue; } boolean timed = false; for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { long timestamp = consumerRecord.timestamp(); TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition()); if (timestamp + 60 * 1000 < System.currentTimeMillis()) { String value = consumerRecord.value(); ObjectMapper objectMapper = new ObjectMapper(); JsonNode jsonNode = objectMapper.readTree(value); JsonNode jsonNodeTopic = jsonNode.get("topic"); String appTopic = null, appKey = null, appValue = null; if (jsonNodeTopic != null) { appTopic = jsonNodeTopic.asText(); } if (appTopic == null) { continue; } JsonNode jsonNodeKey = jsonNode.get("key"); if (jsonNodeKey != null) { appKey = jsonNode.asText(); } JsonNode jsonNodeValue = jsonNode.get("value"); if (jsonNodeValue != null) { appValue = jsonNodeValue.asText(); } // send to application topic ProducerRecord<String, String> producerRecord = new ProducerRecord<>(appTopic, appKey, appValue); try { producer.send(producerRecord).get(); // success. commit message OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1); HashMap<TopicPartition, OffsetAndMetadata> metadataHashMap = new HashMap<>(); metadataHashMap.put(topicPartition, offsetAndMetadata); consumer.commitSync(metadataHashMap); } catch (ExecutionException e) { consumer.pause(Collections.singletonList(topicPartition)); consumer.seek(topicPartition, consumerRecord.offset()); timed = true; break; } } else { consumer.pause(Collections.singletonList(topicPartition)); consumer.seek(topicPartition, consumerRecord.offset()); timed = true; break; } } if (timed) { lock.wait(); } } } while (!exit); } } 这段程序是基于SpringBoot 2.4.4版本和 kafka-client 2.7.0版本编写的一个单元测试,需要修改私有变量servers为kafka broker的地址。 ...

四月 18, 2021 · 2 分钟 · dushixiang

Java的奇技淫巧

Java是一种广泛使用的计算机编程语言、面向对象、泛型编程的特性,广泛应用于企业级Web应用开发和移动应用开发。 1995年3月23日Sun公司发布了Java,至今已有近26年,可以说是一门十分成熟的开发语言了,但在某些不为人知的地方存在着一些意料之外的特性。 Java的保留关键字 goto和const 在Java里面没有goto这个功能,但它作为保留字是无法当做变量来使用的,const也是同样。 int goto = 0; int const = 0; 上面这两行代码的写法存在问题,无法正常编译通过。 Java标签Label 上面说了在Java里面没有goto这个功能,但为了处理多重循环引入了Label,目的是为了在多重循环中方便的使用 break 和coutinue ,但好像在其他地方也可以用。 outerLoop: while (true) { System.out.println("I'm the outer loop"); int i = 0; while (true) { System.out.println("I am the inner loop"); i++; if (i >= 3) { break outerLoop; } } } System.out.println("Complete the loop"); // 输出 I'm the outer loop I am the inner loop I am the inner loop I am the inner loop Complete the loop test: { System.out.println("hello"); if (true) { break test; // works } System.out.println("world"); } // 输出 hello test: if (true) { System.out.println("hello"); if (true) { break test; // works } System.out.println("world"); } // 输出 hello test: try { System.out.println("hello"); if (true) { break test; // works } System.out.println("world"); } finally { } // 输出 hello Integer的是否相等问题 日常开发使用到Java基本数据类型是不可避免的一件事,但它却包含了一些很容易犯错的点,踩过一些坑的同学可能了解Java基本包装类型的常量池技术,例如Integer就具有数值[-128,127] 的相应类型的缓存数据,但下面定义的4个变量是否相等你是否能说的出来呢? ...

三月 13, 2021 · 2 分钟 · dushixiang

Linux 环回网络接口

在开发或者调试时,我们经常需要和本地的服务器进行通信,例如启动nginx之后,在浏览器输入lcoalhost或者127.0.0.1就可以访问到本机上面的http服务。 Linux是如何访问本机IP的? 大多数操作系统都在网络层实现了环回能力,通常是使用一个虚拟的环回网络接口来实现。这个虚拟的环回网络接口看着像是一个真实的网卡,实际上是操作系统用软件模拟的,它可以通过TCP/IP与同一台主机上的其他服务进行通信,以127开头的IPv4地址就是为它保留的,主流Linux操作系统为环回网卡分配的地址都是127.0.0.1,主机名是localhost。 环回网络接口之所以被称之为环回网络接口,是因为从本机发送到本机任意一个IP的数据报文都会在网络层交给环回网络接口,不再下发到数据链路层进行处理,环回网络接口直接发送回网络层,最终交由应用层软件程序进行处理。这种方式对于性能测试非常有用,因为省去了硬件的开销,可以直接测试协议栈软件所需要的时间。 那环回网络接口是如何判断目的IP是否为本机地址的呢? 答案就是网络层在进行路由转发的时候会先查本地的路由表,发现是本机IP后交给环回网络接口。查看本地路由表的命令如下: ip route show table local 输出内容如下: broadcast 10.141.128.0 dev eth0 proto kernel scope link src 10.141.155.131 local 10.141.155.131 dev eth0 proto kernel scope host src 10.141.155.131 broadcast 10.141.191.255 dev eth0 proto kernel scope link src 10.141.155.131 broadcast 127.0.0.0 dev lo proto kernel scope link src 127.0.0.1 local 127.0.0.0/8 dev lo proto kernel scope host src 127.0.0.1 local 127.0.0.1 dev lo proto kernel scope host src 127.0.0.1 其中local开头的便是本地IP,dev后面是网卡名称。 ...

一月 28, 2021 · 1 分钟 · dushixiang

Linux 修改最大文件描述符

echo "fs.file-max=655350" >>/etc/sysctl.conf echo "* soft nofile 655350" >> /etc/security/limits.conf echo "* hard nofile 655350" >> /etc/security/limits.conf ulimit -n 655350

一月 11, 2021 · 1 分钟 · dushixiang