Vertx-EventBus篇
- 开源代码
- 2025-08-12 18:06:01

简介
EventBus,又称消息总线,类似我们常见的消息中间件。支持点对点、请求与响应、发布订阅模式,支持跨服务跨语言通讯,分布式消息系统。
常见用法 点对点send消息发送到某个地址上,Vertx把消息分发到注册到这个地址上的某个消费者上,若存在多个消费者,使用轮询算法选择一个消费者接收消息。
Producer public class Producer extends AbstractVerticle { @Override public void start(Promise<Void> startPromise) throws Exception { vertx.setPeriodic(5000L, h -> { System.out.println("***********************"); vertx.eventBus().send("test", "你吃了吗"); }); super.start(startPromise); } } Consumer public class Consumer extends AbstractVerticle { @Override public void start(Promise<Void> startPromise) throws Exception { Random random = new Random(); int num = random.nextInt(10000); vertx.eventBus().<String>consumer("test", msg -> { System.out.println(String.format("消费者%s接收到消息:%s", num, msg.body())); }); super.start(startPromise); } } App public class App { public static void main(String[] args) { Vertx vertx = Vertx.vertx(); vertx.deployVerticle(Consumer.class.getName(), new DeploymentOptions().setInstances(3)) pose(res -> vertx.deployVerticle(Producer.class.getName())); } } AppRunLog *********************** 消费者7414接收到消息:你吃了吗 *********************** 消费者6421接收到消息:你吃了吗 *********************** 消费者5802接收到消息:你吃了吗 请求响应request请求与响应也是点对点模式的一种,区别在于消费者可以回复结果,两者可以进行会话交流。
Producer public class Producer extends AbstractVerticle { @Override public void start(Promise<Void> startPromise) throws Exception { vertx.setPeriodic(5000L, h -> { System.out.println("***********************"); vertx.eventBus().<String>request("test", "你吃了吗") .onComplete(res -> { if (res.succeeded()) { System.out.println("生产者收到回应:"+ res.result().body()); } }); }); super.start(startPromise); } } Consumer public class Consumer extends AbstractVerticle { @Override public void start(Promise<Void> startPromise) throws Exception { Random random = new Random(); int num = random.nextInt(10000); vertx.eventBus().<String>consumer("test", msg -> { System.out.println(String.format("消费者%s接收到消息:%s", num, msg.body())); msg.reply(num+"吃过啦"); }); super.start(startPromise); } } App public class App { public static void main(String[] args) { Vertx vertx = Vertx.vertx(); vertx.deployVerticle(Consumer.class, new DeploymentOptions().setInstances(3)) pose(res -> vertx.deployVerticle(Producer.class.getName())); } } AppRunLog *********************** 消费者8585接收到消息:你吃了吗 生产者收到回应:8585吃过啦 *********************** 消费者9672接收到消息:你吃了吗 生产者收到回应:9672吃过啦 *********************** 消费者3615接收到消息:你吃了吗 生产者收到回应:3615吃过啦 发布订阅publish消息发送到某个地址上,Vertx把消息分发到注册到这个地址上的所有消费者上。
Producer public class Producer extends AbstractVerticle { @Override public void start(Promise<Void> startPromise) throws Exception { vertx.setPeriodic(5000L, h -> { System.out.println("***********************"); vertx.eventBus().publish("test", "你吃了吗"); }); super.start(startPromise); } } Consumer public class Consumer extends AbstractVerticle { @Override public void start(Promise<Void> startPromise) throws Exception { Random random = new Random(); int num = random.nextInt(10000); vertx.eventBus().<String>consumer("test", msg -> { System.out.println(String.format("消费者%s接收到消息:%s", num, msg.body())); }); super.start(startPromise); } } App public class App { public static void main(String[] args) { Vertx vertx = Vertx.vertx(); vertx.deployVerticle(Consumer.class.getName(), new DeploymentOptions().setInstances(3)) pose(res -> vertx.deployVerticle(Producer.class.getName())); } } AppRunLog *********************** 消费者5195接收到消息:你吃了吗 消费者9192接收到消息:你吃了吗 消费者2572接收到消息:你吃了吗 *********************** 消费者2572接收到消息:你吃了吗 消费者5195接收到消息:你吃了吗 消费者9192接收到消息:你吃了吗 编码解码器Vertx消息总线默认只对一些基础类型的消息提供编码解码,若想要发送一个实体对象消息,那么需要自定义消息编码解码器,不然会报错 No message codec for type.
自定义单个编解码器 User @Data public class User { private String id; private String name; } UserCodec public class UserCodec implements MessageCodec<User, User> { /** * 编码 * @param buffer * @param user */ @Override public void encodeToWire(Buffer buffer, User user) { Buffer encoded = Json.CODEC.toBuffer(user, false); buffer.appendInt(encoded.length()); buffer.appendBuffer(encoded); } /** * 集群传输解码 * @param pos * @param buffer * @return */ @Override public User decodeFromWire(int pos, Buffer buffer) { int length = buffer.getInt(pos); pos += 4; Buffer slice = buffer.slice(pos, pos + length); String json = slice.toString(); return JacksonUtil.jsonToBean(json, User.class); } /** * 本地传输解码 * @param user * @return */ @Override public User transform(User user) { return user; } @Override public String name() { return "user"; } @Override public byte systemCodecID() { return -1; } } Producer public class Producer extends AbstractVerticle { @Override public void start(Promise<Void> startPromise) throws Exception { vertx.setTimer(2000L, h -> { System.out.println("***********************"); User user = new User(); user.setId("1"); user.setName("张三"); vertx.eventBus().<User>request("test", user) .onComplete(res -> { if (res.succeeded()) { System.out.println("生产者收到回应:"+ res.result().body()); } }); }); super.start(startPromise); } } Consumer public class Consumer extends AbstractVerticle { @Override public void start(Promise<Void> startPromise) throws Exception { vertx.eventBus().<User>consumer("test", msg -> { System.out.println(String.format("消费者接收到消息:%s",msg.body())); User user = new User(); user.setId("2"); user.setName("李四"); msg.reply(user); }); super.start(startPromise); } } App public class App { public static void main(String[] args) { Vertx vertx = Vertx.vertx(); vertx.eventBus().registerDefaultCodec(User.class, new UserCodec()); vertx.deployVerticle(Consumer.class.getName()) pose(res -> vertx.deployVerticle(Producer.class.getName())); } } AppRunLog *********************** 消费者接收到消息:User(id=1, name=张三) 生产者收到回应:User(id=2, name=李四) 自定义公用编解码器 RestRequest公用请求参数类
@Data public class RestRequest<T> { private String reqTopic; private T reqBody; } ReqMessageCodec公用请求参数消息编解码类
public class ReqMessageCodec<T> implements MessageCodec<RestRequest<T>, T> { @Override public void encodeToWire(Buffer buffer, RestRequest<T> request) { Buffer encoded = Json.CODEC.toBuffer(request, false); buffer.appendInt(encoded.length()); buffer.appendBuffer(encoded); } @Override public T decodeFromWire(int pos, Buffer buffer) { int length = buffer.getInt(pos); pos += 4; Buffer slice = buffer.slice(pos, pos + length); String s = slice.toString(); RestRequest<T> request = JacksonUtil.jsonToBean(s, new TypeReference<>() { }); return request.getReqBody(); } @Override public T transform(RestRequest<T> request) { return request.getReqBody(); } @Override public String name() { return "req"; } @Override public byte systemCodecID() { return -1; } } RestResponse公用响应消息类
@Data public class RestResponse<T> { private int code; private T data; private String msg; } RespMessageCodec公用响应消息编解码类
public class RespMessageCodec<T> implements MessageCodec<RestResponse<T>, RestResponse<T>> { @Override public void encodeToWire(Buffer buffer, RestResponse<T> response) { Buffer encoded = Json.CODEC.toBuffer(response, false); buffer.appendInt(encoded.length()); buffer.appendBuffer(encoded); } @Override public RestResponse<T> decodeFromWire(int pos, Buffer buffer) { int length = buffer.getInt(pos); pos += 4; Buffer slice = buffer.slice(pos, pos + length); return JacksonUtil.jsonToBean(slice.toString(), new TypeReference<>() { }); } @Override public RestResponse<T> transform(RestResponse<T> response) { return response; } @Override public String name() { return "resp"; } @Override public byte systemCodecID() { return -1; } } Producer public class Producer extends AbstractVerticle { @Override public void start(Promise<Void> startPromise) throws Exception { vertx.setTimer(2000L, h -> { System.out.println("***********************"); User user = new User(); user.setId("1"); user.setName("张三"); RestRequest<User> req = new RestRequest<>(); req.setReqTopic("addUser"); req.setReqBody(user); vertx.eventBus().<RestResponse>request(req.getReqTopic(), req) .onComplete(res -> { if (res.succeeded()) { System.out.println("生产者收到回应:"+ res.result().body()); }else { System.out.println("生产者发送消息失败:"+res.cause().getMessage()); } }); }); super.start(startPromise); } } Consumer public class Consumer extends AbstractVerticle { @Override public void start(Promise<Void> startPromise) throws Exception { vertx.eventBus().<User>consumer("addUser", msg -> { System.out.println(String.format("消费者接收到消息:%s",msg.body())); RestResponse<String> response = new RestResponse(); response.setCode(200); response.setMsg("插入成功"); msg.reply(response); }); super.start(startPromise); } } SingleApp本地传输数据
public class SingleApp { public static void main(String[] args) { Vertx vertx = Vertx.vertx(); vertx.eventBus().registerDefaultCodec(RestRequest.class, new ReqMessageCodec()) .registerDefaultCodec(RestResponse.class, new RespMessageCodec()); vertx.deployVerticle(Consumer.class.getName()) pose(res -> vertx.deployVerticle(Producer.class.getName())); } } SingleAppRunLog *********************** 消费者接收到消息:User(id=1, name=张三) 生产者收到回应:RestResponse(code=200, data=null, msg=插入成功) ClusterApp集群模式传输数据
public class ClusterApp { static Vertx vertx; public static void initCluster(Verticle service) { ClusterManager mgr = new HazelcastClusterManager(); VertxOptions options = new VertxOptions().setClusterManager(mgr); Vertx.clusteredVertx(options) pose(v -> { vertx = v; vertx.eventBus().registerDefaultCodec(RestRequest.class, new ReqMessageCodec()) .registerDefaultCodec(RestResponse.class, new RespMessageCodec()); return vertx.deployVerticle(service); }).onSuccess(h -> { System.out.println("App Start Complete!"); }) .onFailure(err -> { err.printStackTrace(); System.err.println("App Start Failed "+ err.getMessage()); }); } } ConsumerApp消费服务
public class ConsumerApp extends ClusterApp{ public static void main(String[] args) { initCluster(new Consumer()); } } ConsumerAppRunLog Members {size:2, ver:2} [ Member [127.0.0.1]:5703 - 29afc653-a4ef-4933-ab6a-ed0bbbb7041e this Member [127.0.0.1]:5704 - 289a8535-cb6f-4fef-8501-04c40c56f189 ] 消费者接收到消息:{id=1, name=张三} ProducerApp生产服务
public class ProducerApp extends ClusterApp{ public static void main(String[] args) { initCluster(new Producer()); } } ProducerAppRunLog Members {size:2, ver:2} [ Member [127.0.0.1]:5703 - 29afc653-a4ef-4933-ab6a-ed0bbbb7041e Member [127.0.0.1]:5704 - 289a8535-cb6f-4fef-8501-04c40c56f189 this ] *********************** 生产者收到回应:RestResponse(code=200, data=null, msg=插入成功) DeliveryOptions设置请求响应超时时间
public class Producer extends AbstractVerticle { @Override public void start(Promise<Void> startPromise) throws Exception { System.out.println(new Date()+"***********************"); vertx.eventBus().<String>request("test", "你吃了吗", new DeliveryOptions().setSendTimeout(5000)) .onComplete(res -> { if (res.succeeded()) { System.out.println(new Date()+"生产者接收应答消息:"+res.result().body()); } else { System.out.println(new Date()+"生产者发送消息失败"+res.cause().getMessage()); } }); super.start(startPromise); } }Vertx-EventBus篇由讯客互联开源代码栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Vertx-EventBus篇”