MQTT 實現消息接收(接收消息需實現 MqttSimpleCallback 接口並實現它的 publishArrived 方法)。必須先註冊接收消息的方法:
mqttClient.registerSimpleHandler(simpleCallbackHandler); // 註冊接收消息方法
然後訂閱要接收的主題:
mqttClient.subscribe(TOPICS, QOS_VALUES); // 訂閱接收主題
服務端:
package com.gmcc.kuchuan.business; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.ibm.mqtt.MqttClient; import com.ibm.mqtt.MqttException; import com.ibm.mqtt.MqttSimpleCallback; /** * MQTT消息發送與接收 * @author Join * */ public class MqttBroker { private final static Log logger = LogFactory.getLog(MqttBroker.class);// 日誌對象 // 鏈接參數 private final static String CONNECTION_STRING = "tcp://localhost:9901"; private final static boolean CLEAN_START = true; private final static short KEEP_ALIVE = 30;// 低耗網絡,可是又須要及時獲取數據,心跳30s private final static String CLIENT_ID = "master";// 客戶端標識 private final static int[] QOS_VALUES = { 0, 0, 2, 0 };// 對應主題的消息級別 private final static String[] TOPICS = { "Test/TestTopics/Topic1", "Test/TestTopics/Topic2", "Test/TestTopics/Topic3", "client/keepalive" }; private static MqttBroker instance = new MqttBroker(); private MqttClient mqttClient; /** * 返回實例對象 * * @return */ public static MqttBroker getInstance() { return instance; } /** * 從新鏈接服務 */ private void connect() throws MqttException { logger.info("connect to mqtt broker."); mqttClient = new MqttClient(CONNECTION_STRING); logger.info("***********register Simple Handler***********"); SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler(); mqttClient.registerSimpleHandler(simpleCallbackHandler);// 註冊接收消息方法 mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE); logger.info("***********subscribe receiver topics***********"); mqttClient.subscribe(TOPICS, QOS_VALUES);// 訂閱接主題 logger.info("***********CLIENT_ID:" + CLIENT_ID); /** * 完成訂閱後,能夠增長心跳,保持網絡通暢,也能夠發佈本身的消息 */ mqttClient.publish("keepalive", "keepalive".getBytes(), QOS_VALUES[0], true);// 增長心跳,保持網絡通暢 } /** * 發送消息 * * @param clientId * @param messageId */ public void sendMessage(String clientId, String message) { try { if (mqttClient == null || !mqttClient.isConnected()) { connect(); } logger.info("send message to " + clientId + ", message is " + message); // 發佈本身的消息 
mqttClient.publish("GMCC/client/" + clientId, message.getBytes(), 0, false); } catch (MqttException e) { logger.error(e.getCause()); e.printStackTrace(); } } /** * 簡單回調函數,處理server接收到的主題消息 * * @author Join * */ class SimpleCallbackHandler implements MqttSimpleCallback { /** * 當客戶機和broker意外斷開時觸發 能夠再此處理從新訂閱 */ @Override public void connectionLost() throws Exception { // TODO Auto-generated method stub System.out.println("客戶機和broker已經斷開"); } /** * 客戶端訂閱消息後,該方法負責回調接收處理消息 */ @Override public void publishArrived(String topicName, byte[] payload, int Qos, boolean retained) throws Exception { // TODO Auto-generated method stub System.out.println("訂閱主題: " + topicName); System.out.println("消息數據: " + new String(payload)); System.out.println("消息級別(0,1,2): " + Qos); System.out.println("是不是實時發送的消息(false=實時,true=服務器上保留的最後消息): " + retained); } } public static void main(String[] args) { new MqttBroker().sendMessage("client", "message"); } }Android客戶端: 核心代碼:MQTTConnection內部類
import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; import java.net.HttpURLConnection; import java.net.URL; import java.util.ArrayList; import java.util.Timer; import java.util.TimerTask; import android.app.AlarmManager; import android.app.Notification; import android.app.NotificationManager; import android.app.PendingIntent; import android.app.Service; import android.content.BroadcastReceiver; import android.content.Context; import android.content.Intent; import android.content.IntentFilter; import android.content.SharedPreferences; import android.database.Cursor; import android.net.ConnectivityManager; import android.net.NetworkInfo; import android.os.Binder; import android.os.Bundle; import android.os.IBinder; import android.provider.ContactsContract; import android.util.Log; import com.ibm.mqtt.MqttClient; import com.ibm.mqtt.MqttException; import com.ibm.mqtt.MqttPersistence; import com.ibm.mqtt.MqttPersistenceException; import com.ibm.mqtt.MqttSimpleCallback; /* * PushService that does all of the work. * Most of the logic is borrowed from KeepAliveService. * http://code.google.com/p/android-random/source/browse/trunk/TestKeepAlive/src/org/devtcg/demo/keepalive/KeepAliveService.java?r=219 */ public class PushService extends Service { private MyBinder mBinder = new MyBinder(); // this is the log tag public static final String TAG = "PushService"; // the IP address, where your MQTT broker is running. private static final String MQTT_HOST = "120.197.230.53"; // "209.124.50.174";// // the port at which the broker is running. private static int MQTT_BROKER_PORT_NUM = 9901; // Let's not use the MQTT persistence. private static MqttPersistence MQTT_PERSISTENCE = null; // We don't need to remember any state between the connections, so we use a // clean start. private static boolean MQTT_CLEAN_START = true; // Let's set the internal keep alive for MQTT to 15 mins. 
I haven't tested // this value much. It could probably be increased. private static short MQTT_KEEP_ALIVE = 60 * 15; // Set quality of services to 0 (at most once delivery), since we don't want // push notifications // arrive more than once. However, this means that some messages might get // lost (delivery is not guaranteed) private static int[] MQTT_QUALITIES_OF_SERVICE = { 0 }; private static int MQTT_QUALITY_OF_SERVICE = 0; // The broker should not retain any messages. private static boolean MQTT_RETAINED_PUBLISH = false; // MQTT client ID, which is given the broker. In this example, I also use // this for the topic header. // You can use this to run push notifications for multiple apps with one // MQTT broker. public static String MQTT_CLIENT_ID = "client"; // These are the actions for the service (name are descriptive enough) public static final String ACTION_START = MQTT_CLIENT_ID + ".START"; private static final String ACTION_STOP = MQTT_CLIENT_ID + ".STOP"; private static final String ACTION_KEEPALIVE = MQTT_CLIENT_ID + ".KEEP_ALIVE"; private static final String ACTION_RECONNECT = MQTT_CLIENT_ID + ".RECONNECT"; // Connection log for the push service. Good for debugging. private ConnectionLog mLog; // Connectivity manager to determining, when the phone loses connection private ConnectivityManager mConnMan; // Notification manager to displaying arrived push notifications private NotificationManager mNotifMan; // Whether or not the service has been started. private boolean mStarted; // This the application level keep-alive interval, that is used by the // AlarmManager // to keep the connection active, even when the device goes to sleep. private static final long KEEP_ALIVE_INTERVAL = 1000 * 60 * 28; // Retry intervals, when the connection is lost. 
private static final long INITIAL_RETRY_INTERVAL = 1000 * 10; private static final long MAXIMUM_RETRY_INTERVAL = 1000 * 60 * 30; // Preferences instance private SharedPreferences mPrefs; // We store in the preferences, whether or not the service has been started public static final String PREF_STARTED = "isStarted"; // We also store the deviceID (target) public static final String PREF_DEVICE_ID = "deviceID"; // We store the last retry interval public static final String PREF_RETRY = "retryInterval"; // Notification title public static String NOTIF_TITLE = "client"; // Notification id private static final int NOTIF_CONNECTED = 0; // This is the instance of an MQTT connection. private MQTTConnection mConnection; private long mStartTime; boolean mShowFlag = true;// 是否顯示通知 public static Context ctx; private boolean mRunFlag = true;// 是否向服務器發送心跳 Timer mTimer = new Timer(); // Static method to start the service public static void actionStart(Context ctx) { Intent i = new Intent(ctx, PushService.class); i.setAction(ACTION_START); ctx.startService(i); PushService.ctx = ctx; } // Static method to stop the service public static void actionStop(Context ctx) { Intent i = new Intent(ctx, PushService.class); i.setAction(ACTION_STOP); ctx.startService(i); } // Static method to send a keep alive message public static void actionPing(Context ctx) { Intent i = new Intent(ctx, PushService.class); i.setAction(ACTION_KEEPALIVE); ctx.startService(i); } @Override public void onCreate() { super.onCreate(); log("Creating service"); mStartTime = System.currentTimeMillis(); try { mLog = new ConnectionLog(); Log.i(TAG, "Opened log at " + mLog.getPath()); } catch (IOException e) { Log.e(TAG, "Failed to open log", e); } // Get instances of preferences, connectivity manager and notification // manager mPrefs = getSharedPreferences(TAG, MODE_PRIVATE); mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE); mNotifMan = (NotificationManager) getSystemService(NOTIFICATION_SERVICE); 
/* * If our process was reaped by the system for any reason we need to * restore our state with merely a call to onCreate. We record the last * "started" value and restore it here if necessary. */ handleCrashedService(); } // This method does any necessary clean-up need in case the server has been // destroyed by the system // and then restarted private void handleCrashedService() { if (wasStarted() == true) { log("Handling crashed service..."); // stop the keep alives stopKeepAlives(); // Do a clean start start(); } } @Override public void onDestroy() { log("Service destroyed (started=" + mStarted + ")"); // Stop the services, if it has been started if (mStarted == true) { stop(); } try { if (mLog != null) mLog.close(); } catch (IOException e) { } } @Override public void onStart(Intent intent, int startId) { super.onStart(intent, startId); log("Service started with intent=" + intent); if (intent == null) { return; } // Do an appropriate action based on the intent. if (intent.getAction().equals(ACTION_STOP) == true) { stop(); stopSelf(); } else if (intent.getAction().equals(ACTION_START) == true) { start(); } else if (intent.getAction().equals(ACTION_KEEPALIVE) == true) { keepAlive(); } else if (intent.getAction().equals(ACTION_RECONNECT) == true) { if (isNetworkAvailable()) { reconnectIfNecessary(); } } } public class MyBinder extends Binder { public PushService getService() { return PushService.this; } } @Override public IBinder onBind(Intent intent) { return mBinder; } // log helper function private void log(String message) { log(message, null); } private void log(String message, Throwable e) { if (e != null) { Log.e(TAG, message, e); } else { Log.i(TAG, message); } if (mLog != null) { try { mLog.println(message); } catch (IOException ex) { } } } // Reads whether or not the service has been started from the preferences private boolean wasStarted() { return mPrefs.getBoolean(PREF_STARTED, false); } // Sets whether or not the services has been started in the 
preferences. private void setStarted(boolean started) { mPrefs.edit().putBoolean(PREF_STARTED, started).commit(); mStarted = started; } private synchronized void start() { log("Starting service..."); // Do nothing, if the service is already running. if (mStarted == true) { Log.w(TAG, "Attempt to start connection that is already active"); return; } // Establish an MQTT connection connect(); // 向服務器定時發送心跳,一分鐘一次 mRunFlag = true; mTimer.schedule(new TimerTask() { @Override public void run() { if (!mRunFlag) { // this.cancel(); // PushService.this.stopSelf(); return; } System.out.println("run"); try { if (isNetworkAvailable()) { SharedPreferences pref = getSharedPreferences( "client", 0); String MOBILE_NUM = pref.getString("MOBILE_NUM", ""); HttpUtil.post(Constants.KEEPALIVE + "&mobile=" + MOBILE_NUM + "&online_flag=1"); } } catch (Exception e) { e.printStackTrace(); // TODO: handle exception } } }, 0, 60 * 1000); // Register a connectivity listener registerReceiver(mConnectivityChanged, new IntentFilter( ConnectivityManager.CONNECTIVITY_ACTION)); } private synchronized void stop() { // Do nothing, if the service is not running. if (mStarted == false) { Log.w(TAG, "Attempt to stop connection not active."); return; } // Save stopped state in the preferences setStarted(false); // Remove the connectivity receiver unregisterReceiver(mConnectivityChanged); // Any existing reconnect timers should be removed, since we explicitly // stopping the service. cancelReconnect(); // Destroy the MQTT connection if there is one if (mConnection != null) { mConnection.disconnect(); mConnection = null; } } // private synchronized void connect() { log("Connecting..."); // Thread t = new Thread() { // @Override // public void run() { // fetch the device ID from the preferences. 
String deviceID = "GMCC/client/" + mPrefs.getString(PREF_DEVICE_ID, null); // Create a new connection only if the device id is not NULL try { mConnection = new MQTTConnection(MQTT_HOST, deviceID); } catch (MqttException e) { // Schedule a reconnect, if we failed to connect log("MqttException: " + (e.getMessage() != null ? e.getMessage() : "NULL")); if (isNetworkAvailable()) { scheduleReconnect(mStartTime); } } setStarted(true); // } // }; // t.start(); // 向服務器定時發送心跳,一分鐘一次 mRunFlag = true; } private synchronized void keepAlive() { try { // Send a keep alive, if there is a connection. if (mStarted == true && mConnection != null) { mConnection.sendKeepAlive(); } } catch (MqttException e) { log("MqttException: " + (e.getMessage() != null ? e.getMessage() : "NULL"), e); mConnection.disconnect(); mConnection = null; cancelReconnect(); } } // Schedule application level keep-alives using the AlarmManager private void startKeepAlives() { Intent i = new Intent(); i.setClass(this, PushService.class); i.setAction(ACTION_KEEPALIVE); PendingIntent pi = PendingIntent.getService(this, 0, i, 0); AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE); alarmMgr.setRepeating(AlarmManager.RTC_WAKEUP, System.currentTimeMillis() + KEEP_ALIVE_INTERVAL, KEEP_ALIVE_INTERVAL, pi); } // Remove all scheduled keep alives private void stopKeepAlives() { Intent i = new Intent(); i.setClass(this, PushService.class); i.setAction(ACTION_KEEPALIVE); PendingIntent pi = PendingIntent.getService(this, 0, i, 0); AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE); alarmMgr.cancel(pi); } // We schedule a reconnect based on the starttime of the service public void scheduleReconnect(long startTime) { // the last keep-alive interval long interval = mPrefs.getLong(PREF_RETRY, INITIAL_RETRY_INTERVAL); // Calculate the elapsed time since the start long now = System.currentTimeMillis(); long elapsed = now - startTime; // Set an appropriate interval based on the elapsed time 
since start if (elapsed < interval) { interval = Math.min(interval * 4, MAXIMUM_RETRY_INTERVAL); } else { interval = INITIAL_RETRY_INTERVAL; } log("Rescheduling connection in " + interval + "ms."); // Save the new internval mPrefs.edit().putLong(PREF_RETRY, interval).commit(); // Schedule a reconnect using the alarm manager. Intent i = new Intent(); i.setClass(this, PushService.class); i.setAction(ACTION_RECONNECT); PendingIntent pi = PendingIntent.getService(this, 0, i, 0); AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE); alarmMgr.set(AlarmManager.RTC_WAKEUP, now + interval, pi); } // Remove the scheduled reconnect public void cancelReconnect() { Intent i = new Intent(); i.setClass(PushService.this, PushService.class); i.setAction(ACTION_RECONNECT); PendingIntent pi = PendingIntent.getService(PushService.this, 0, i, 0); AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE); alarmMgr.cancel(pi); } private synchronized void reconnectIfNecessary() { log("mStarted" + mStarted); log("mConnection" + mConnection); if (mStarted == true && mConnection == null) { log("Reconnecting..."); connect(); } } // This receiver listeners for network changes and updates the MQTT // connection // accordingly private BroadcastReceiver mConnectivityChanged = new BroadcastReceiver() { @Override public void onReceive(Context context, final Intent intent) { // Get network info // Thread mReconnect = new Thread(){ // public void run() { NetworkInfo info = (NetworkInfo) intent .getParcelableExtra(ConnectivityManager.EXTRA_NETWORK_INFO); // Is there connectivity? boolean hasConnectivity = (info != null && info.isConnected()) ? 
true : false; log("Connectivity changed: connected=" + hasConnectivity); if (hasConnectivity) { reconnectIfNecessary(); } else if (mConnection != null) { // Thread cancelConn = new Thread(){ // public void run() { // // if there no connectivity, make sure MQTT connection is // destroyed log("cancelReconnect"); mConnection.disconnect(); mConnection = null; log("cancelReconnect" + mConnection); cancelReconnect(); // } // }; // cancelConn.start(); } // }; // // }; // mReconnect.start(); } }; // Display the topbar notification private void showNotification(String text, Request request) { Notification n = new Notification(); n.flags |= Notification.FLAG_SHOW_LIGHTS; n.flags |= Notification.FLAG_AUTO_CANCEL; n.defaults = Notification.DEFAULT_ALL; n.icon = R.drawable.ico; n.when = System.currentTimeMillis(); Intent intent = new Intent(); Bundle bundle = new Bundle(); bundle.putSerializable("request", request); bundle.putString("currentTab", "1"); intent.putExtras(bundle); intent.setClass(this, MainActivity.class); intent.setAction(Intent.ACTION_MAIN); intent.addCategory(Intent.CATEGORY_LAUNCHER); intent.setFlags(Intent.FLAG_ACTIVITY_NEW_TASK | Intent.FLAG_ACTIVITY_RESET_TASK_IF_NEEDED); // Simply open the parent activity PendingIntent pi = PendingIntent.getActivity(this, 0, intent, 0); // Change the name of the notification here n.setLatestEventInfo(this, NOTIF_TITLE, text, pi); mNotifMan.notify(NOTIF_CONNECTED, n); } // Check if we are online private boolean isNetworkAvailable() { NetworkInfo info = mConnMan.getActiveNetworkInfo(); if (info == null) { return false; } return info.isConnected(); } // This inner class is a wrapper on top of MQTT client. 
private class MQTTConnection implements MqttSimpleCallback { IMqttClient mqttClient = null; // Creates a new connection given the broker address and initial topic public MQTTConnection(String brokerHostName, String initTopic) throws MqttException { // Create connection spec String mqttConnSpec = "tcp://" + brokerHostName + "@" + MQTT_BROKER_PORT_NUM; // Create the client and connect mqttClient = MqttClient.createMqttClient(mqttConnSpec, MQTT_PERSISTENCE); String clientID = MQTT_CLIENT_ID + "/" + mPrefs.getString(PREF_DEVICE_ID, ""); Log.d(TAG, "mqttConnSpec:" + mqttConnSpec + " clientID:" + clientID); mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE); // register this client app has being able to receive messages mqttClient.registerSimpleHandler(this); // Subscribe to an initial topic, which is combination of client ID // and device ID. // initTopic = MQTT_CLIENT_ID + "/" + initTopic; subscribeToTopic(initTopic); log("Connection established to " + brokerHostName + " on topic " + initTopic); // Save start time mStartTime = System.currentTimeMillis(); // Star the keep-alives startKeepAlives(); } // Disconnect public void disconnect() { // try { stopKeepAlives(); log("stopKeepAlives"); Thread t = new Thread() { public void run() { try { mqttClient.disconnect(); log("mqttClient.disconnect();"); } catch (MqttPersistenceException e) { log("MqttException" + (e.getMessage() != null ? e.getMessage() : " NULL"), e); } }; }; t.start(); // } catch (MqttPersistenceException e) { // log("MqttException" // + (e.getMessage() != null ? e.getMessage() : " NULL"), // e); // } } /* * Send a request to the message broker to be sent messages published * with the specified topic name. Wildcards are allowed. 
*/ private void subscribeToTopic(String topicName) throws MqttException { if ((mqttClient == null) || (mqttClient.isConnected() == false)) { // quick sanity check - don't try and subscribe if we don't have // a connection log("Connection error" + "No connection"); } else { String[] topics = { topicName }; mqttClient.subscribe(topics, MQTT_QUALITIES_OF_SERVICE); } } /* * Sends a message to the message broker, requesting that it be * published to the specified topic. */ private void publishToTopic(String topicName, String message) throws MqttException { if ((mqttClient == null) || (mqttClient.isConnected() == false)) { // quick sanity check - don't try and publish if we don't have // a connection log("No connection to public to"); } else { mqttClient.publish(topicName, message.getBytes(), MQTT_QUALITY_OF_SERVICE, MQTT_RETAINED_PUBLISH); } } /* * Called if the application loses it's connection to the message * broker. */ public void connectionLost() throws Exception { log("Loss of connection" + "connection downed"); stopKeepAlives(); // 取消定時發送心跳 mRunFlag = false; // 向服務器發送請求,更改在線狀態 // SharedPreferences pref = getSharedPreferences("client",0); // String MOBILE_NUM=pref.getString("MOBILE_NUM", ""); // HttpUtil.post(Constants.KEEPALIVE + "&mobile=" // + MOBILE_NUM+"&online_flag=0"); // null itself mConnection = null; if (isNetworkAvailable() == true) { reconnectIfNecessary(); } } /* * Called when we receive a message from the message broker. 
*/ public void publishArrived(String topicName, byte[] payload, int qos, boolean retained) throws MqttException { // Show a notification // synchronized (lock) { String s = new String(payload); Request request = null; try {// 解析服務端推送過來的消息 request = XmlPaserTool.getMessage(new ByteArrayInputStream(s .getBytes())); // request=Constants.request; } catch (Exception e) { e.printStackTrace(); } final Request mRequest = request; DownloadInfo down = new DownloadInfo(mRequest); down.setDownLoad(down); downloadInfos.add(down); sendUpdateBroast(); down.start(); showNotification("您有一條新的消息!", mRequest); Log.d(PushService.TAG, s); Log.d(PushService.TAG, mRequest.getMessageId()); // 再向服務端推送消息 new AdvancedCallbackHandler().sendMessage(MQTT_CLIENT_ID + "/keepalive", "***********send message**********"); } public void sendKeepAlive() throws MqttException { log("Sending keep alive"); // publish to a keep-alive topic publishToTopic(MQTT_CLIENT_ID + "/keepalive", mPrefs.getString(PREF_DEVICE_ID, "")); } } class AdvancedCallbackHandler { IMqttClient mqttClient = null; public final int[] QOS_VALUES = { 0, 0, 2, 0 };// 對應主題的消息級別 /** * 從新鏈接服務 */ private void connect() throws MqttException { String mqttConnSpec = "tcp://" + MQTT_HOST + "@" + MQTT_BROKER_PORT_NUM; // Create the client and connect mqttClient = MqttClient.createMqttClient(mqttConnSpec, MQTT_PERSISTENCE); String clientID = MQTT_CLIENT_ID + "/" + mPrefs.getString(PREF_DEVICE_ID, ""); mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE); Log.d(TAG, "鏈接服務器,推送消息"); Log.d(TAG, "**mqttConnSpec:" + mqttConnSpec + " clientID:" + clientID); Log.d(TAG, MQTT_CLIENT_ID + "/keepalive"); // 增長心跳,保持網絡通暢 mqttClient.publish(MQTT_CLIENT_ID + "/keepalive", "keepalive".getBytes(), QOS_VALUES[0], true); } /** * 發送消息 * * @param clientId * @param messageId */ public void sendMessage(String clientId, String message) { try { if (mqttClient == null || !mqttClient.isConnected()) { connect(); } Log.d(TAG, "send message to " + clientId + ", 
message is " + message); // 發佈本身的消息 // mqttClient.publish(MQTT_CLIENT_ID + "/keepalive", // message.getBytes(), 0, false); mqttClient.publish(MQTT_CLIENT_ID + "/keepalive", message.getBytes(), 0, false); } catch (MqttException e) { Log.d(TAG, e.getCause() + ""); e.printStackTrace(); } } } public String getPeople(String phone_number) { String name = ""; String[] projection = { ContactsContract.PhoneLookup.DISPLAY_NAME, ContactsContract.CommonDataKinds.Phone.NUMBER }; Log.d(TAG, "getPeople ---------"); // 將本身添加到 msPeers 中 Cursor cursor = this.getContentResolver().query( ContactsContract.CommonDataKinds.Phone.CONTENT_URI, projection, // Which columns to return. ContactsContract.CommonDataKinds.Phone.NUMBER + " = '" + phone_number + "'", // WHERE clause. null, // WHERE clause value substitution null); // Sort order. if (cursor == null) { Log.d(TAG, "getPeople null"); return ""; } Log.d(TAG, "getPeople cursor.getCount() = " + cursor.getCount()); if (cursor.getCount() > 0) { cursor.moveToPosition(0); // 取得聯繫人名字 int nameFieldColumnIndex = cursor .getColumnIndex(ContactsContract.PhoneLookup.DISPLAY_NAME); name = cursor.getString(nameFieldColumnIndex); Log.i("Contacts", "" + name + " .... 
" + nameFieldColumnIndex); // 這裏提示 // force // close System.out.println("聯繫人姓名:" + name); return name; } return phone_number; } public void sendUpdateBroast() { Intent intent = new Intent(); intent.setAction("update"); sendBroadcast(intent); } public void sendUpdateFinishBroast() { Intent intent = new Intent(); intent.setAction("updateFinishList"); sendBroadcast(intent); } public class DownloadInfo extends Thread { boolean runflag = true; Request mRequest; public float progress; public MessageBean bean = null; DownloadInfo download = null; MessageDetailDao dao = new MessageDetailDao( PushService.this.getApplicationContext()); public synchronized void stopthread() { runflag = false; } public synchronized boolean getrunflag() { return runflag; } DownloadInfo(Request mRequest) { this.mRequest = mRequest; } public void setDownLoad(DownloadInfo download) { this.download = download; } @Override public void run() { try { File dir = new File(Constants.DOWNLOAD_PATH); if (!dir.exists()) { dir.mkdirs(); } String filePath = Constants.DOWNLOAD_PATH + mRequest.getMessageId() + "." 
+ mRequest.getExt(); bean = new MessageBean(); bean.setPath(filePath); bean.setStatus(0); bean.setDate(mRequest.getTimestamp()); bean.setLayoutID(R.layout.list_say_he_item); bean.setPhotoID(R.drawable.receive_ico); bean.setMessage_key(mRequest.getMessageId()); bean.setPhone_number(mRequest.getReceiver()); bean.setAction(1); String name = getPeople(mRequest.getSender()); bean.setName(name); bean.setFileType(Integer.parseInt(mRequest.getCommand())); if (mRequest.getCommand().equals(Request.TYPE_MUSIC)) { bean.setMsgIco(R.drawable.music_temp); bean.setText(name + "給你發送了音樂"); mRequest.setBody(Base64.encodeToString(bean.getText() .getBytes(), Base64.DEFAULT)); } else if (mRequest.getCommand().equals(Request.TYPE_CARD)) { bean.setMsgIco(R.drawable.card_temp); bean.setText(new String(Base64.decode(mRequest.getBody(), Base64.DEFAULT))); mRequest.setBody(Base64.encodeToString(bean.getText() .getBytes(), Base64.DEFAULT)); } else if (mRequest.getCommand().equals(Request.TYPE_LBS)) { bean.setMsgIco(R.drawable.address_temp); bean.setText(new String(Base64.decode(mRequest.getBody(), Base64.DEFAULT))); mRequest.setBody(Base64.encodeToString(bean.getText() .getBytes(), Base64.DEFAULT)); } else if (mRequest.getCommand().equals(Request.TYPE_PHOTO)) { bean.setText(name + "向你發送了照片"); bean.setMsgIco(-1); } else if (mRequest.getCommand().equals(Request.TYPE_PIC)) { bean.setText(name + "向你發送了圖片"); bean.setMsgIco(-1); } else if (mRequest.getCommand().equals(Request.TYPE_SMS)) { bean.setFileType(0); } if (!mRequest.getCommand().equals(Request.TYPE_CARD) && !mRequest.getCommand().equals(Request.TYPE_SMS)) { String path = Constants.FILE_DOWNLOAD_URL + mRequest.getMessageId(); URL url = new URL(path); HttpURLConnection hurlconn = (HttpURLConnection) url .openConnection();// 基於HTTP協議的鏈接對象 hurlconn.setConnectTimeout(5000);// 請求超時時間 5s hurlconn.setRequestMethod("GET");// 請求方式 hurlconn.connect(); long fileSize = hurlconn.getContentLength(); InputStream instream = hurlconn.getInputStream(); byte[] 
buffer = new byte[1024]; int len = 0; int number = 0; RandomAccessFile rasf = new RandomAccessFile(filePath, "rwd"); while ((len = instream.read(buffer)) != -1) {// 開始下載文件 if (getrunflag() && progress < 100) { rasf.seek(number); number += len; rasf.write(buffer, 0, len); progress = (((float) number) / fileSize) * 100; // 發送廣播,修改進度條進度 sendUpdateBroast(); } else { this.interrupt(); if (number != fileSize) {// 取消下載,將已經緩存的未下載完成的文件刪除 File file = new File(filePath); if (file.exists()) file.delete(); } PushService.downloadInfos.remove(download); sendUpdateBroast(); return; } } instream.close(); PushService.downloadInfos.remove(download); sendUpdateBroast(); } else {// 收到消息,將信息保存到數據庫 PushService.downloadInfos.remove(download); sendUpdateBroast(); } // 將文件信息保存到數據庫 dao.create(bean); sendUpdateFinishBroast(); } catch (Exception e) { PushService.downloadInfos.remove(download); sendUpdateBroast(); e.printStackTrace(); } } } public static ArrayList<DownloadInfo> downloadInfos = new ArrayList<DownloadInfo>(); public ArrayList<DownloadInfo> getDownloadInfos() { return PushService.downloadInfos; } public void setDownloadInfos(ArrayList<DownloadInfo> downloadInfos) { PushService.downloadInfos = downloadInfos; } }