티스토리 뷰

728x170

 포스팅은 J2EE Spec 중 JMS Topic에 대해 알아보겠습니다.


JEUS에서 JMS(Java Message Service)를 운영하는 방법을 설명한다.

JEUS JMS의 특징, 구성요소와 구조, JMS 서버를 관리하고 모니터링 하는 방법을 설명하고, JMS 프로그래밍 방법을 예제와 함께 설명한다.

 

소개
Java Message Service
Java Message Service(이하 JMS)는 어플리케이션간의 통신을 메시지 기반으로 수행하기 위한 Java 표준 API를 정의한 것이다. 여기에는 메시징에 필요한 구성 요소 및 메시지 모델에 해당하는 인터페이스를 정의하고 이들 간의 관계를 설명하고 있다.

 

JMS 특징
JMS는 다음과 같은 특징을 갖는다.

 

• 느슨한 결합 구조: 메시지 송신자와 수신자는 서로에 대해 알 필요가 없다.
• 비동기 통신: 메시지 수신자는 요청하지 않아도 서버에 도착하는 대로 메시지를 전달 받을 수 있다.
• 신뢰성 있는 메시지 전달: 메시지가 반드시 한 번(Once-and-Only-Once) 전달되는 것을 보장한다.

[그림 1] JMS messaging

Java 표준 API를 정의한 것이다. 여기에는 메시징에 필요한 구성 요소 및 메시지 모델에 해당


JEUS JMS 개요
 

JEUS JMS 모듈은 JEUS 시스템의 유용한 컴포넌트이다. 엔터프라이즈 메시징 시스템에 접근하기 위해서 필요한 서브 컴포넌트들을 포함하고 있는 엔진이다.
JEUS JMS는 또한 JEUS Web Application Server와 함께 완전히 연동이 된다. 예를 들어 Message Driven Bean (JEUS WAS EJB 엔진에서 deploy 됨)은 JEUS JMS를 이용한다. 

 

JEUS JMS 특징
JEUS JMS는 JMS 버전 1.1 스펙을 지원하며 다음과 같은 주요 특징이 있다.
 

Non-Blocking I/O 지원
Non-Blocking I/O를 사용하여 같은 시스템 리소스로 더 많은 클라이언트를 동시에 처리할 수 있다.
XA 트랜잭션 지원
JMS 스펙에서 선택 사항으로 정의되어 있는 XA 트랜잭션을 지원한다.
고가용성(High Availability)
클라이언트는 일시적인 커넥션 장애로부터 자동으로 복구된다.
또한 다수의 JEUS JMS 서버를 클러스터로 구성하여 클라이언트 커넥션 및 메시지 부하를 분산시킬 수 있고, 일부 서버에서 장애가 발생하더라도 나머지 서버들을 통해 지속적인 서비스 제공이 가능하다.


JEUS JMS 컴포넌트
이 절에서는 JEUS JMS가 자신이 포함하고 있는 컴포넌트, 함께 상호작용을 요구하는 기본적이고 중요한 컴포넌트들을 설명한다.

메인 컴포넌트 

1. JMS engine
Administration 객체의 서버 컴포넌트들을 포함하고 있고 JEUS Server의 엔진 컨테이너 내에 위치한다.
2. JNDI (Java Naming and Directory Interface)
bind 또는 lookup 기능을 제공한다.
3. Client application
메시지를 보내고 받는 역할을 한다.
4. Persistent Storage
Persistent 메시지 데이터와 트랜잭션 데이터를 저장한다.


서브 컴포넌트

1. ConnectionFactory
클라이언트 어플리케이션이 JEUS JMS 엔진과 연결을 위해서 사용된다.

2. Topic

publish-subscripe 메시지 모델을 지원하는 destination이다.
3. Queue
point-to-point 메시지 모델을 지원하는 destination이다.


JEUS JMS 아키텍처


JEUS JMS 관리 툴
JEUS JMS를 제어하기 위한 3 가지 툴이 있다. 일반적인 엔진과 관련된 명령은 jeusadmin, 웹 관리자 그리고 jmsadmin을 통해서 통제 가능하다.
웹 관리자는 JEUS JMS 엔진을 생성하고, 생성된 엔진의 설정을 세팅할 수 있다. 또한 엔진을 제어하고 엔진의 상태를 체크하는 등의 관리 기능도 수행할 수 있다.

 

JMS 디렉토리 구조

[그림 3] JEUS 디렉토리 구조
 

• JEUS_HOME
JEUS 제품이 설치된 디렉토리이다(예, c:\TmaxSoft\jeus5).
• JEUS_HOME\bin
jmsadmin console tool을 포함한다.
• JEUS_HOME\config
JEUS의 환경구성 파일들을 노드별로 관리한다. 이 디렉토리 아래에는 노드 이름의 디렉토리가 있는데 여기에 해당 노드의 모든 엔진들의 환경구성 파일들이 위치한다. 각 엔진들의 환경구성 파일은 엔진들의 이름으로 된 디렉토리 아래에 놓인다.
• JEUS_HOME\config\<node name>\<node name>_jms_<eninge name>
JMS Server의 환경설정 파일인 JMSMain.xml이 위치한다. JMSMain.xml은 JMS Server에 관한 모든 설정들을 담고 있다.
• logs
JMS 엔진에 대한 로그 정보를 저장한다.


JMS Server 구성
Connection factory
클라이언트 어플리케이션 사용을 편리하게 하기 위해서 JEUS JMS Server에서 connection을 만들고 저장하는데 사용된다.

Destination

JMS 클라이언트 어플리케이션들이 message를 보내고 받는 장소이다.

Thread pool
클라이언트가 요구하는 connection을 만들고, 메시지를 보내고 메시지를 받는 모든 처리를 위해서 사용하는 Worker Thread를 포함한다.

Logger
두 종류의 로그 정보를 제공한다. 하나는 JMS Server 처리에 대한 정보를 저장하는 error-log로 JEUSMain.xml의 <engine-command>에 지정한다. 다른 하나는 클라이언트의 connection과 disconnection에 대한 정보를 기록하는 access-log로 JMSMain.xml 파일 내의 <jms-server> element 아래의 태그에 위치한다. 설정 방법은 JEUS의 다른 logger 설정과 같다.

<< JMSMain.xml >> 

<jms-server>

    <connection-factory>
        <type>queue</type> queue 또는 topic
        <name>ConFacSports</name>
        <export-name>ConFacSportsJndi</export-name> JNDI를 통해서 bind 되는 이름
        <client-id>client_sports</client-id> topic 형태의 connection factory에 대해서 default client-id 값을 설정
    </connection-factory>
    <destination>
        <type>queue</type> queue 또는 topic
        <name>myDestName</name>
        <export-name>myDestJndi</export-name> JNDI를 통해서 bind 되는 이름
        <multiple-receiver>false</multiple-receiver>
    </destination>
    <thread-pool>
        <min>10</min>
        <max>20</max>
    </thread-pool>

</jms-server>


Persistent Storage
JMS 서버가 다운 될 경우에, 백업이나 failover 기능을 제공하기 위해서, JMS 환경파일에 persistent storage 관련 부분을 구성해야 한다. 파일이나 database 둘 중에 하나를 JMS 환경 파일에 지정해야 한다.
Database 벤더로 oracle, mssql, informix, cloudscape, db2, sybase 등을 지원한다. 만일 JEUSMain.xml 파일내의 벤더 이름이 database element 내의 unknown 또는 others 이면, JEUS JMS는 자동적으로 벤더 타입을 결정 짓는다. 이런 경우에는, 다음의 옵션은 특정한 database 벤더 타입을 결정하는데 사용된다(jeus 스크립트내의 속성값을 설정).

-Djeus.jms.server.blackboxDB=<database vendor name>

 

<< JMSMain.xml >>  

<jms-server>

<storage>
    <db-storage>
    <data-source-name>datasource1</data-source-name>
    <destination-table-name>jms_dest</destination-tablename>
    <message-table-name>jms_message</message-table-name>
    <relation-table-name>jms_relation</relation-tablename>
    <subscription-table-name>jms_sub</subscription-tablename>
    <transaction-table-name>jms_tx</transaction-tablename>
    <key-table-name>jms_key</key-table-name>
    </db-storage>
</storage>

</jms-server>


Clustering
JMS Servers의 클러스터링은 <destination> element의 relay 설정으로 구성된다. 이를 통해 얻을 수 있는 효과는 부하분산과 failover이다. 즉, 어떤 Queue에 대해 다른 JMS Server의 Queue로 relay를 걸어두고 이 Queue의 메시지를 처리할 client를 반씩 나누어 어느 하나의 Server에 접속해 놓는다면 JMS Server에 가해지는 부하가 반으로 줄어들 것이다. 또한 이런 경우 하나의 Server가 down 된다면 나머지 한 Server가 message 처리를 계속해 줄 수 있을 것이다.
 

JMS 서버 관리 및 모니터링
JEUS JMS Server의 관리 및 모니터링 방법에 대해 설명한다.

JEUS administration

jeusadmin에 접속한다.

 

>jeusadmin shhan
Login name>administrator
Password>
JEUS 5.0 (fix #23) Jeus Manager Controller
shhan>
shhan>exit
>


JMS engine이 실행 중인지 확인한다.

 

shhan>allenglist
========================================
engines in the conshhan_jms_engine1
tainer shhan_container1
shhan_ejb_engine1
shhan_servlet_engine1
========================================

 

JMS engine을 기동 및 정지할 수 있다.

 

shhan>starteng shhan_jms_engine1
shhan_jms_engine1 engine started successful
shhan>downeng shhan_jms_engine1
shhan_jms_engine1 engine down successful

 

JMS administration jmsadmin 툴에 접속한다.

 

>jmsadmin shhan_container1
Login name>administrator
Password>
JMS Engine Administration
shhan_container1>exit

툴에서 제공하는 명령어를 ‘h’ 명령어로 확인할 수 있다.

 

shhan_container1>h
type 'help <command>' for the detailed help of each command.
usage : help(h) [-l] [-a] [-g group_name] [command]
[ JMSAdmin Commands ]______________________________________________________
cluster conf confall createconf createdest dest destall
durable durableall entry entryall mbeanlist removeconf removedest
restat sdb server stat takeover
[ General ]________________________________________________________________
exit help p setProperty unsetProperty


Connection factory 목록을 조회하고 connection factory를 생성하거나 삭제할 수 있다.

 

shhan_container1>confall
QueueConnectionFactory
TopicConnectionFactory
shhan_container1>createconf -t Topic2ConnectionFactory Topic2ConnFactory
shhan_container1>confall
QueueConnectionFactory
Topic2ConnectionFactory
TopicConnectionFactory
shhan_container1>removeconf Topic2ConnectionFactory
shhan_container1>confall
QueueConnectionFactory
TopicConnectionFactory

 

상세한 connection factory 정보를 ‘conf’ 명령어로 확인할 수 있다.

 

shhan_container1>conf TopicConnectionFactory
CONNECTION FACTORY INFORMATIONS
===========================================================================
FACTORY NAME : TopicConnectionFactory
EXPORT NAME : TopicConnectionFactory
FACTORY NAME : topic
CLIENT ID : not-set
MAX THREAD : 300
CLUSTER : false
BROKER SELECTION POLICY : round-robin
===========================================================================


 

Destination 목록을 조회하고 destination을 생성하거나 삭제할 수 있다.

 

shhan_container1>destall
ExamplesQueue
ExamplesTopic
shhan_JMS_DLQ
shhan_container1>createdest -t ExamplesTopic2 ExamTopic
shhan_container1>destall
ExamplesQueue
ExamplesTopic ExamplesTopic2
shhan_JMS_DLQ
shhan_container1>removedest ExamplesTopic2
shhan_container1>destall
ExamplesQueue
ExamplesTopic
shhan_JMS_DLQ


상세한 destination 정보를 ‘dest’ 명령어로 확인할 수 있다.

 

shhan_container1>dest ExamplesTopic
Type: Topic
Destination name:ExamplesTopic
Export name:ExamplesTopic
Consumers count:0
Durable Subscriber count:0


Destination 통계 정보를 확인할 수 있으며 반복 실행도 가능하다(초 단위 설정).

 

shhan_container1>stat ExamplesTopic –i 10
========================================================================
Statistics for ExamplesTopic
------------------------------------------------------------------------
Arrival Completion
------------------------------------------------------------------------
Number of messages: 0 0
Start time: N/A N/A
Last sample time: N/A N/A
Messages per second: N/A N/A
------------------------------------------------------------------------


JMS 프로그래밍

JEUS JMS 클라이언트 어플리케이션을 어떻게 개발하는지에 대해서 설명한다.
어플리케이션 구조
JMS 클라이언트 어플리케이션의 일반적인 구조를 보여준다.

 

• Step 1: ConnectionFactory lookup 한다.
• Step 2: Connection을 만든다.
• Step 3: Session을 만든다.
• Step 4: Destination lookup 한다.


Step 1에서 4까지는 JMS 클라이언트 어플리케이션에 대해 리소스를 설정하는 단계이다.

 

• Step 5: Message Producer 또는 Message Consumer를 만들거나 Message Listener를 등록한다.
• Step 6: Connection을 시작한다.
• Step 7: 어떤 특정한 작업의 어플리케이션을 실행한다.
• Step 8: Connection을 닫는다.


Topic 메시지 보내기
어플리케이션 전체 구조
Topic에 메시지를 보내는 전체적인 어플리케이션 구조이다.

 

public class tPublisher {
public tPublisher()
public void initResource() // 1. 리소스 초기화
public void startConnection() // 2. Connection 시작
public void publishToTopic() // 3. 메시지 보내기
public void releaseResource() // 4. 리소스 해제
public static void main(String[] args) {
tPublisher publisher = new tPublisher();
publisher.initResource();
publisher.startConnection();
publisher.publishToTopic();
publisher.releaseResource();
}
}

 

1. 리소스 초기화
Topic에 메시지를 보내는 클래스의 이름은 tPublisher이다.
<< tPublisher.java >>

 

package sample.manual;
import javax.jms.*;
import javax.naming.*;
public class tPublisher {
Context ctx = null;
TopicConnectionFactory tConFactory = null;
TopicConnection tCon = null;
TopicSession tSes = null;
Topic topic = null;
TopicPublisher tpublisher = null;


 

initResource() method는 리소스를 사용하기 전에 초기화한다.

 

public void initResource() {
try {
ctx = new InitialContext();
} catch (NamingException ex) {
System.err.print("Cannot create Context :" + ex.toString() );
System.exit(1);
}
//Step 1: Lookup a ConnectionFactory
try {
tConFactory = (TopicConnectionFactory)
ctx.lookup("TopicConnectionFactory");
} catch (NamingException ex) {
System.err.print("Cannot find
TopicConnectionFactory in JNDI :" + ex.toString());
System.exit(1);
}
//Step 2: Create a Connection
try {
tCon = tConFactory.createTopicConnection();
} catch (JMSException ex) {
System.err.println("Cannot create
TopicConnection :" + ex.toString());
System.exit(1);
}
//Step 3: Create a Session
try {
tSes = tCon.createTopicSession( false, Session.AUTO_ACKNOWLEDGE );
} catch (JMSException ex) {
System.err.println("Cannot create
TopicSession :" + ex.toString());
}
//Step 4: Lookup a Destination
try { topic = (Topic)ctx.lookup("ExamplesTopic");
} catch (NamingException ex) {
System.err.println("Cannot find ExamplesTopic
in JNDI :" + ex.toString());
}
//Step 5: Create a MessageProducer or a MessageConsumer,
// or register a MessageListener
© 2008 TmaxSoft Co., Ltd. All Rights Reserved.
JEUS Java Message Service
17/22
try { tpublisher = tSes.createPublisher(topic);
} catch (JMSException ex) {
System.err.println("Cannot create TopicPublisher :" + ex.toString());
System.exit(1);
}
}

 

2. Connection 시작
initResource() method에서 초기화된 connection을 시작(start) 한다.

 

public void startConnection() {
try {
//Step 6: Start a Connection
if ( tCon != null tCon.start();
) {
}
} catch (JMSException e) {
System.err.println("Cannot start
TopicConnection :" + e.toString());
System.exit(1);
}
}


 

3. 메시지 보내기
topic에 메시지를 보내기 위한 method이다.

 

public void publishToTopic() {
//Step 7: Do some application-specific operations
TextMessage textMsg = null;
try {
textMsg = tSes.createTextMessage("Hello World!");
} catch (JMSException ex) {
System.err.println("Cannot create
TextMessage :" + ex.toString());
System.exit(1);
}
try { tpublisher.publish(textMsg);
} catch ( JMSException ex) {
System.err.println("Cannot publish
TextMessage :" + ex.toString());
System.exit(1);
}
}


4. 리소스 해제
Connection을 끝내기 위한 method이다.

 

public void releaseResource() {
try {
//Step 8: Close a Connection
if ( tCon != null ) {
tCon.close();
}
} catch (JMSException ex) {
System.err.println("Cannot close TopicConnection :" + ex.toString());
}
}

Topic 메시지 받기
Topic destination에서 async 메시지를 받는 것에 대해 설명한다.
※ JMS 스펙에는 동기적으로 메시지를 받는 것을 queue 그리고 비동기적으로 메시지를 받는 것을 topic으로 엄격히 제한을 두지 않고 있다.

어플리케이션 전체 구조
Topic으로부터 메시지를 받는 전체적인 어플리케이션 구조이다.

 

public class tSubscriber {
public tSubscriber()
public void initResource() // 1. 리소스 초기화
public void startConnection() // 2. Connection 시작
public void receiveFromQueue() // 3. 메시지 받기
public void releaseResource() // 4. 리소스 해제
public void onMessage(javax.jms.Message msg) { //

allDone();
}
public void waitTillDone() { //

monitor.wait();
}
public void allDone() { //

monitor.notify();
}
public static void main(String[] args) {
tSubscriber subscriber = new tSubscriber();
subscriber.initResource();
subscriber.startConnection();
subscriber.waitTillDone();
subscriber.releaseResource();
}
}

 

1. 리소스 초기화
Topic에서 메시지를 받는 클래스의 이름은 tSubscriber이다.
<< tSubscriber.java >>

 

package sample.manual;
import javax.jms.*;
import javax.naming.*;
public class tSubscriber implements MessageListener {
Context ctx = null;
TopicConnectionFactory tConFactory = null;
TopicConnection tCon = null;
TopicSession tSes = null;
Topic topic = null;
TopicSubscriber subscriber = null;
private boolean[] monitor = new boolean[1];
public tSubscriber() {
monitor[0] = false;
}
public void onMessage(javax.jms.Message msg) {

}

Topic에서는 모니터 변수가 있는 것이 Queue와 차이가 있다. 이 예제와 같이 async 하게 메시지를 받는 MessageListener를 등록하는 경우에는, 특정 이벤트를 기다리는 모니터 변수가 main program에서 사용된다.
예를 들어 non-text message가 도착하면 그것을 통지 받고서 main thread에게 상태를 알려준다.
initResource() method는 리소스를 사용하기 전에 초기화한다. Async 메시지를 받도록 하기 위해서 topic subscriber에 MessageListener를 등록한다.

 

public void initResource() {
try {
ctx = new InitialContext();
} catch (NamingException ex) {
System.err.print("Cannot create Context :" + ex.toString() );
System.exit(1);
}
//Step 1: Lookup a ConnectionFactory
try { tConFactory = (TopicConnectionFactory)
ctx.lookup("TopicConnectionFactory");
} catch (NamingException ex) {
System.err.print("Cannot find
TopicConnectionFactory in JNDI :" + ex.toString());
System.exit(1);
}
//Step 2: Create a Connection
try { tCon = tConFactory.createTopicConnection();
} catch (JMSException ex) {
System.err.println("Cannot create
TopicConnection :" + ex.toString());
System.exit(1);
}
//Step 3: Create a Session
try { tSes = tCon.createTopicSession( false, Session.AUTO_ACKNOWLEDGE );
} catch (JMSException ex) {
System.err.println("Cannot create
TopicSession :" + ex.toString());
System.exit(1);
}
//Step 4: Lookup a Destination
try {
topic = (Topic)ctx.lookup("ExamplesTopic");
} catch (NamingException ex) {
System.err.println("Cannot find ExamplesTopic
in JNDI :" + ex.toString());
System.exit(1);
}
//Step 5: Create a MessageProducer or a MessageConsumer,
// or register a MessageListener
try {
subscriber = tSes.createSubscriber(topic);
} catch (JMSException ex) {
System.err.println("Cannot create
Subscriber :" + ex.toString());
System.exit(1);
}
try {
subscriber.setMessageListener(this);
} catch (JMSException ex) {
System.err.println("Cannot register
MessageListener :" + ex.toString());
System.exit(1);
}
}


2. Connection 시작
initResource() method에서 초기화된 coneciont을 시작(start) 한다.

 

public void startConnection() {
try {
//Step 6: Start a Connection
if ( tCon != null ) {
tCon.start();
}
} catch (JMSException e) {
System.err.println("Cannot start
TopicConnection :" + e.toString());
System.exit(1);
}
}

 

3. 메시지 받기
메시지가 도착하면 onMessage() method가 자동으로 실행된다.

 

public void onMessage(javax.jms.Message msg){
//Step 7: Do some application-specific operations
if ( msg instanceof TextMessage ) {
System.out.println("TextMessage arrived :" + msg);
} else {
System.out.println("Non-TextMessage arrived :
" + msg + " allDone();
, so stop listening");
}
}
private void allDone() {
synchronized (monitor) {
monitor[0] = true;
monitor.notify();
}
}

Main thread(main() method가 실행 중인 thread)는 onMessage() method thread가 monitor 변수를 notify 하는 것을 기다린다. 이런 내용은 다음 waitTillDone() method에서 처리된다.

 

private void waitTillDone() {
synchronized (monitor) {
while (! monitor[0]) {
try {
monitor.wait();
} catch (InterruptedException ie)
{}
}
}

}

 

4. 리소스 해제
Connection을 끝내기 위한 method이다.

 

public void releaseResource() {
try {
//Step 8: Close a Connection
if ( tCon != null tCon.close();
) {
}
} catch (JMSException ex) {
System.err.println("Cannot close TopicConnection :" + ex.toString());
}
}


예제 실행
메시지 수신 대기

 

>tSubscriber
java -Djava.naming.factory.initial=jeus.jndi.JEUSContextFactory -Djeus.baseport= sample.manual.tSubscriber
[2008.11.12 21:32:45][2][] [client-10] [ChannelManager/createSocket] connection creating to shhan(0) [192.168.10.100:19741(null)] by Thread[main,5,main] remaining 5000 msec


메시지 보내기

 

>tPublisher
C:\TmaxSoft\JEUS5\samples\manual_examples\JMSGuide\bin>java -Djava.naming.factory.initial=jeus.jndi.JEUSContextFactory -Djeus.baseport= sample.manual.tPublisher
[2008.11.12 22:32:31][2][] [client-10] [ChannelManager/createSocket] connection creating to shhan(0) [192.168.10.100:19741(null)] by Thread[main,5,main] remaining 5000 msec


메시지 받기

 

>tSubscriber

TextMessage arrived :F:{N:{ID:0:5:1:1}T31(0):5:1:1:1:x::x->T(null:ExamplesTopic)},text={(0xcb)Hello World!}


결 론
본 문서에서는 JEUS 환경에서 JMS를 구축하는 방법을 설명하였다. JEUS JMS의 환경 설정과 JEUS JMS 어플리케이션 개발을 위한 간략한 설명을 하였다.

그리드형
댓글
댓글쓰기 폼